diff --git a/.gitignore b/.gitignore index 3e7a101..58e4f5e 100644 --- a/.gitignore +++ b/.gitignore @@ -52,4 +52,6 @@ build/**/* include/* lib/* bin/* -test/test_runner \ No newline at end of file +test/test_runner + +.vscode/* \ No newline at end of file diff --git a/samples/AiMaskStreamApp/AiMaskStreamApp.cpp b/samples/AiMaskStreamApp/AiMaskStreamApp.cpp index 2104a97..11e74af 100644 --- a/samples/AiMaskStreamApp/AiMaskStreamApp.cpp +++ b/samples/AiMaskStreamApp/AiMaskStreamApp.cpp @@ -105,6 +105,12 @@ void pushToFrameQueue(int type, const cv::Mat& frame) { frameQueues[type].frameQueue.push(finalimg.clone()); } +struct PerSocketData { +}; + +std::mutex wsMutex; +std::vector*> activeWebSockets; + #ifdef _MSC_VER #define strcasecmp _stricmp #include @@ -778,107 +784,6 @@ bail: return (FXApp::Err)vfxErr; } -std::mutex clients_mutex; -struct PerSocketData { -}; -//std::set*> clients; - -class TcpServer { -public: - TcpServer(int port) : port_(port) {} - void start() { - WSADATA wsaData; - int result = WSAStartup(MAKEWORD(2, 2), &wsaData); - if (result != 0) { - std::cerr << "WSAStartup failed: " << result << std::endl; - return; - } - SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (listenSocket == INVALID_SOCKET) { - std::cerr << "Error creating socket: " << WSAGetLastError() << std::endl; - WSACleanup(); - return; - } - sockaddr_in serverAddr; - serverAddr.sin_family = AF_INET; - serverAddr.sin_port = htons(port_); - serverAddr.sin_addr.s_addr = INADDR_ANY; - if (bind(listenSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) { - std::cerr << "Bind failed: " << WSAGetLastError() << std::endl; - closesocket(listenSocket); - WSACleanup(); - return; - } - if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR) { - std::cerr << "Listen failed: " << WSAGetLastError() << std::endl; - closesocket(listenSocket); - WSACleanup(); - return; - } - std::cout << "Server listening on port " << port_ << std::endl; - - while (true) { - SOCKET clientSocket = accept(listenSocket, NULL, NULL); - if (clientSocket == INVALID_SOCKET) { - std::cerr << "Accept failed: " << WSAGetLastError() << std::endl; - continue; - } - - std::cout << "New client connected" << std::endl; - std::thread(&TcpServer::handleClient, this, clientSocket).detach(); - } - - closesocket(listenSocket); - WSACleanup(); - } - - void close() { - WSACleanup(); - } - -private: - void handleClient(SOCKET clientSocket) { - try { - while (true) { - cv::Mat frame1, frame2; - { - std::lock_guard lock1(frameQueues[ST_ORIGINAL].frameMutex); - std::lock_guard lock2(frameQueues[ST_MASKED].frameMutex); - if (!frameQueues[ST_ORIGINAL].frameQueue.empty() && !frameQueues[ST_MASKED].frameQueue.empty()) { - frame1 = frameQueues[ST_ORIGINAL].frameQueue.front(); - frame2 = frameQueues[ST_MASKED].frameQueue.front(); - frameQueues[ST_ORIGINAL].frameQueue.pop(); - frameQueues[ST_MASKED].frameQueue.pop(); - } - } - if (!frame1.empty() && !frame2.empty()) { - std::vector bytes1 = matToBytes(frame1); - std::vector bytes2 = matToBytes(frame2); - - uint32_t size1 = bytes1.size(); - uint32_t size2 = bytes2.size(); - - send(clientSocket, (char*)&size1, sizeof(size1), 0); - send(clientSocket, (char*)bytes1.data(), bytes1.size(), 0); - send(clientSocket, (char*)&size2, sizeof(size2), 0); - send(clientSocket, (char*)bytes2.data(), bytes2.size(), 0); - } - else { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - } - } - catch (std::exception& e) { - std::cerr << "Exception in thread: " << e.what() << std::endl; - } - - closesocket(clientSocket); - } - - const char* ip_; - int port_; -}; - FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) { float ms = 0.0f; FXApp::Err appErr = errNone; @@ -893,29 +798,6 @@ FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) { if (inFile && !inFile[0]) inFile = nullptr; // Set file paths to NULL if zero length if (outFile && !outFile[0]) outFile = nullptr; - - /*uWS::App().ws("/*", { - .open = [](auto* ws) { - std::cout << "A WebSocket connected!" << std::endl; - std::lock_guard lock(clients_mutex); - clients.insert(ws); - }, - .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) { - ws->send(message, opCode); - }, - .close = [](auto* ws, int code, std::string_view message) { - std::cout << "WebSocket closed" << std::endl; - std::lock_guard lock(clients_mutex); - clients.erase(ws); - } - }).listen(9001, [](auto* listen_socket) { - if (listen_socket) { - std::cout << "Listening on port " << 9001 << std::endl; - } - }).run(); - -*/ - if (inFile) { reader.open(inFile); @@ -1211,83 +1093,74 @@ void startHttpServer() { svr.listen("0.0.0.0", 8090); } -void testWS() { - /* ws->getUserData returns one of these */ - struct PerSocketData { - /* Fill with user data */ - std::vector topics; - int nr = 0; - }; - { - /* Keep in mind that uWS::SSLApp({options}) is the same as uWS::App() when compiled without SSL support. - * You may swap to using uWS:App() if you don't need SSL */ - uWS::SSLApp* app = new uWS::SSLApp({ - /* There are example certificates in uWebSockets.js repo */ - .key_file_name = "misc/key.pem", - .cert_file_name = "misc/cert.pem", - .passphrase = "1234" - }); - - app->ws("/*", { - /* Settings */ - .compression = uWS::DISABLED, - .maxPayloadLength = 16 * 1024 * 1024, - .idleTimeout = 60, - .maxBackpressure = 16 * 1024 * 1024, - .closeOnBackpressureLimit = false, - .resetIdleTimeoutOnSend = true, - .sendPingsAutomatically = false, - /* Handlers */ - .upgrade = nullptr, - .open = [](auto* ws) { - /* Open event here, you may access ws->getUserData() which points to a PerSocketData struct */ - - printf("opened\n"); - PerSocketData* perSocketData = (PerSocketData*)ws->getUserData(); - - for (int i = 0; i < 32; i++) { - std::string topic = std::to_string((uintptr_t)ws) + "-" + std::to_string(i); - perSocketData->topics.push_back(topic); - ws->subscribe(topic); +void sendFramesToAllClients() { + std::lock_guard lock(wsMutex); + for (auto ws : activeWebSockets) { + for (int type = 0; type < ST_MAX; ++type) { + cv::Mat frame; + { + std::lock_guard lock(frameQueues[type].frameMutex); + if (!frameQueues[type].frameQueue.empty()) { + frame = frameQueues[type].frameQueue.front(); + frameQueues[type].frameQueue.pop(); } - }, - .message = [&app](auto* ws, std::string_view message, uWS::OpCode opCode) { - PerSocketData* perSocketData = (PerSocketData*)ws->getUserData(); - - printf("Message: %s\n", message.data()); - - app->publish(perSocketData->topics[(size_t)(++perSocketData->nr % 32)], message, opCode); - ws->publish(perSocketData->topics[(size_t)(++perSocketData->nr % 32)], message, opCode); - }, - .drain = [](auto*/*ws*/) { - /* Check ws->getBufferedAmount() here */ - //std::cout << "drain" << std::endl; - }, - .ping = [](auto*/*ws*/, std::string_view) { - /* Not implemented yet */ - }, - .pong = [](auto*/*ws*/, std::string_view) { - /* Not implemented yet */ - }, - .close = [](auto*/*ws*/, int /*code*/, std::string_view /*message*/) { - /* You may access ws->getUserData() here */ } - }).listen(9001, [](auto* listen_s) { - if (listen_s) { - std::cout << "Listening on port " << 9001 << std::endl; - //listen_socket = listen_s; - } - }); - app->run(); + if (!frame.empty()) { + std::vector jpeg = matToJpeg(frame); + uint32_t size = jpeg.size(); - delete app; + // Prepare the message: type (1 byte) + size (4 bytes) + image data + std::string message(1, static_cast(type)); + message.append(reinterpret_cast(&size), 4); + message.append(jpeg.begin(), jpeg.end()); - uWS::Loop::get()->free(); + ws->send(message, uWS::OpCode::BINARY); + } + } } } + +void runWebSocketServer() { + auto loop = uWS::Loop::get(); + + struct us_timer_t* frameTimer = us_create_timer((struct us_loop_t*)loop, 0, 0); + + us_timer_set(frameTimer, [](struct us_timer_t*) { + sendFramesToAllClients(); + }, 33, 33); // 33ms interval for approximately 30 FPS + + uWS::App().ws("/*", { + .compression = uWS::SHARED_COMPRESSOR, + .maxPayloadLength = 100 * 1024 * 1024, + .idleTimeout = 0, + .maxBackpressure = 1 * 1024 * 1024, + .closeOnBackpressureLimit = false, + .resetIdleTimeoutOnSend = false, + .sendPingsAutomatically = true, + .open = [](auto* ws) { + std::cout << "New WebSocket connection" << std::endl; + std::lock_guard lock(wsMutex); + activeWebSockets.push_back(ws); + }, + .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) { + // Handle incoming messages if needed + }, + .close = [](auto* ws, int code, std::string_view message) { + std::cout << "WebSocket connection closed" << std::endl; + std::lock_guard lock(wsMutex); + activeWebSockets.erase(std::remove(activeWebSockets.begin(), activeWebSockets.end(), ws), activeWebSockets.end()); + } + }).listen(9001, [](auto* listen_socket) { + if (listen_socket) { + std::cout << "Listening on port 9001" << std::endl; + } + }).run(); +} + + int main(int argc, char **argv) { int nErrs = 0; nErrs = ParseMyArgs(argc, argv); @@ -1296,10 +1169,7 @@ int main(int argc, char **argv) { return nErrs; } - std::thread testws(&testWS); - - TcpServer server(8093); - std::thread serverThread2(&TcpServer::start, &server); + std::thread websocketThread(runWebSocketServer); FXApp::Err fxErr = FXApp::errNone; FXApp app; @@ -1351,10 +1221,7 @@ int main(int argc, char **argv) { } } - server.close(); - - //serverThread.join(); - serverThread2.join(); + websocketThread.join(); if (fxErr) std::cerr << "Error: " << app.errorStringFromCode(fxErr) << std::endl; return (int)fxErr; diff --git a/streamWebApp/index.html b/streamWebApp/index.html new file mode 100644 index 0000000..801e0d4 --- /dev/null +++ b/streamWebApp/index.html @@ -0,0 +1,120 @@ + + + + Stream Web App + + + +

Stream Web App

+ + +
+ Original Stream + Masked Stream +
+
+

Messages received:

+

FPS:

+
+ + + \ No newline at end of file