From 59688b4eabccf40f465d0b05e79cfc2097ff4f3e Mon Sep 17 00:00:00 2001
From: Benjamin Ruesink
Date: Tue, 8 Oct 2024 14:25:19 +0200
Subject: [PATCH] Separated websocket code to another header file

---
 README.MD                                   |   2 +-
 samples/AiMaskStreamApp/AiMaskStreamApp.cpp | 142 +-----------------
 samples/AiMaskStreamApp/CMakeLists.txt      |   7 +-
 samples/AiMaskStreamApp/video_stream.h      | 153 ++++++++++++++++++++
 4 files changed, 163 insertions(+), 141 deletions(-)
 create mode 100644 samples/AiMaskStreamApp/video_stream.h

diff --git a/README.MD b/README.MD
index 92d3b53..ea39a3a 100644
--- a/README.MD
+++ b/README.MD
@@ -13,7 +13,7 @@ vcpkg install libuv:x64-windows uwebsockets-json:x64-windows zlib:x64-windows us
 ```
 - Configure and generate the Visual Studio solution file
-- If using CMake GUI, set the source folder to the root of the repository and the build folder to `build`, and set the toolchain file (-T) to `C:/vcpkg/scripts/buildsystems/vcpkg.cmake`
+- If using CMake GUI, set the source folder to the root of the repository and the build folder to `build`, and set the toolchain file for cross-compiling to `C:/vcpkg/scripts/buildsystems/vcpkg.cmake`
 ```bash
 mkdir build
 ```
diff --git a/samples/AiMaskStreamApp/AiMaskStreamApp.cpp b/samples/AiMaskStreamApp/AiMaskStreamApp.cpp
index dda4500..8746542 100644
--- a/samples/AiMaskStreamApp/AiMaskStreamApp.cpp
+++ b/samples/AiMaskStreamApp/AiMaskStreamApp.cpp
@@ -28,85 +28,16 @@
 #include
 #include
 #include
-#include <queue>
-#include <mutex>
-#include <array>
-#include <vector>
+
 #include "nvCVOpenCV.h"
 #include "nvVideoEffects.h"
 #include "opencv2/opencv.hpp"
-#define UWS_NO_ZLIB
-#include "App.h"
+#include "video_stream.h"
 
 #pragma comment(lib, "Ws2_32.lib")
 
-enum StreamType {
-    ST_ORIGINAL,
-    ST_MASKED,
-    ST_MAX
-};
-
-struct FrameQueue {
-    std::queue<cv::Mat> frameQueue;
-    std::mutex frameMutex;
-};
-
-std::vector<FrameQueue> frameQueues(ST_MAX);
-
-struct FrameHolder {
-    cv::Mat frame;
-    std::mutex frameMutex;
-};
-
-std::array<FrameHolder, ST_MAX> frameHolders;
-
-// Convert cv::Mat to JPEG buffer
-std::vector<uchar> matToBytes(const cv::Mat& img)
-{
-    std::vector<uchar> buf;
-    cv::imencode(".jpg", img, buf);
-    return buf;
-}
-
-std::vector<uchar> matToJpeg(const cv::Mat& img)
-{
-    std::vector<uchar> buf;
-    cv::imencode(".jpg", img, buf);
-    return buf;
-}
-
-void updateFrame(int type, const cv::Mat& frame)
-{
-    std::lock_guard<std::mutex> lock(frameHolders[type].frameMutex);
-    frameHolders[type].frame = frame.clone();
-}
-
-cv::Mat compressFrame(const cv::Mat& frame, int quality = 80)
-{
-    std::vector<uchar> buf;
-    std::vector<int> params = { cv::IMWRITE_JPEG_QUALITY, quality };
-    cv::imencode(".jpg", frame, buf, params);
-    return cv::imdecode(buf, cv::IMREAD_COLOR);
-}
-
-void pushToFrameQueue(int type, const cv::Mat& frame) {
-    //cv::Mat compressedFrame = compressFrame(frame, 50);
-
-    // clear the queue
-    std::queue<cv::Mat> empty;
-    std::swap(frameQueues[type].frameQueue, empty);
-
-    frameQueues[type].frameQueue.push(frame.clone());
-}
-
-struct PerSocketData {
-};
-
-std::mutex wsMutex;
-std::vector<uWS::WebSocket<false, true, PerSocketData>*> activeWebSockets;
-
 #ifdef _MSC_VER
   #define strcasecmp _stricmp
   #include
@@ -965,73 +896,6 @@ bool isCompModeEnumValid(const FXApp::CompMode& mode)
     return true;
 }
 
-void sendFramesToAllClients() {
-    std::lock_guard<std::mutex> lock(wsMutex);
-    for (auto ws : activeWebSockets) {
-        for (int type = 0; type < ST_MAX; ++type) {
-            cv::Mat frame;
-            {
-                std::lock_guard<std::mutex> lock(frameQueues[type].frameMutex);
-                if (!frameQueues[type].frameQueue.empty()) {
-                    frame = frameQueues[type].frameQueue.front();
-                    frameQueues[type].frameQueue.pop();
-                }
-            }
-
-            if (!frame.empty()) {
-                std::vector<uchar> jpeg = matToJpeg(frame);
-                uint32_t size = jpeg.size();
-
-                // Prepare the message: type (1 byte) + size (4 bytes) + image data
-                std::string message(1, static_cast<char>(type));
-                message.append(reinterpret_cast<const char*>(&size), 4);
-                message.append(jpeg.begin(), jpeg.end());
-
-                ws->send(message, uWS::OpCode::BINARY);
-            }
-        }
-    }
-}
-
-void RunWebSocketServer() {
-    auto loop = uWS::Loop::get();
-
-    struct us_timer_t* frameTimer = us_create_timer((struct us_loop_t*)loop, 0, 0);
-
-    us_timer_set(frameTimer, [](struct us_timer_t*) {
-        sendFramesToAllClients();
-    }, 33, 33); // 33ms interval for approximately 30 FPS
-
-    uWS::App().ws<PerSocketData>("/*", {
-        .compression = uWS::SHARED_COMPRESSOR,
-        .maxPayloadLength = 100 * 1024 * 1024,
-        .idleTimeout = 0,
-        .maxBackpressure = 1 * 1024 * 1024,
-        .closeOnBackpressureLimit = false,
-        .resetIdleTimeoutOnSend = false,
-        .sendPingsAutomatically = true,
-        .open = [](auto* ws) {
-            std::cout << "New WebSocket connection" << std::endl;
-            std::lock_guard<std::mutex> lock(wsMutex);
-            activeWebSockets.push_back(ws);
-        },
-        .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
-            // Handle incoming messages if needed
-        },
-        .close = [](auto* ws, int code, std::string_view message) {
-            std::cout << "WebSocket connection closed" << std::endl;
-            std::cout << "Active connections: " << activeWebSockets.size() - 1 << std::endl;
-            std::lock_guard<std::mutex> lock(wsMutex);
-            activeWebSockets.erase(std::remove(activeWebSockets.begin(), activeWebSockets.end(), ws), activeWebSockets.end());
-        }
-    }).listen(FLAG_wsPort, [](auto* listen_socket) {
-        if (listen_socket) {
-            std::cout << "Listening on port " << FLAG_wsPort << std::endl;
-        }
-    }).run();
-}
-
-
 int main(int argc, char **argv) {
   int nErrs = 0;
   nErrs = ParseMyArgs(argc, argv);
@@ -1040,7 +904,7 @@
     return nErrs;
   }
 
-  std::thread websocketThread(RunWebSocketServer);
+  std::thread websocketThread(RunWebSocketServer, FLAG_wsPort);
 
   FXApp::Err fxErr = FXApp::errNone;
   FXApp app;
diff --git a/samples/AiMaskStreamApp/CMakeLists.txt b/samples/AiMaskStreamApp/CMakeLists.txt
index 748a69c..077b662 100644
--- a/samples/AiMaskStreamApp/CMakeLists.txt
+++ b/samples/AiMaskStreamApp/CMakeLists.txt
@@ -6,7 +6,12 @@ set(SOURCE_FILES
 
 # Set Visual Studio source filters
 source_group("Source Files" FILES ${SOURCE_FILES})
-add_executable(AiMaskStreamApp ${SOURCE_FILES})
+set(HEADER_FILES
+    video_stream.h)
+# Set Visual Studio source filters
+source_group("Header Files" FILES ${HEADER_FILES})
+
+add_executable(AiMaskStreamApp ${SOURCE_FILES} ${HEADER_FILES})
 target_include_directories(AiMaskStreamApp PRIVATE
     ${CMAKE_CURRENT_SOURCE_DIR}
     ${CMAKE_CURRENT_SOURCE_DIR}/../utils
diff --git a/samples/AiMaskStreamApp/video_stream.h b/samples/AiMaskStreamApp/video_stream.h
new file mode 100644
index 0000000..be28b74
--- /dev/null
+++ b/samples/AiMaskStreamApp/video_stream.h
@@ -0,0 +1,153 @@
+#pragma once
+
+#include <queue>
+#include <mutex>
+#include <array>
+#include <vector>
+
+#define UWS_NO_ZLIB
+#include "App.h"
+
+void Logger(const char* format, ...)
+{
+    time_t now = time(0);
+    tm* ltm = localtime(&now);
+    char buffer[32];
+    strftime(buffer, 32, "[%H:%M:%S] ", ltm);
+    printf("%s", buffer);
+
+    va_list args;
+    va_start(args, format);
+    vprintf(format, args);
+    va_end(args);
+}
+
+enum StreamType {
+    ST_ORIGINAL,
+    ST_MASKED,
+    ST_MAX
+};
+
+struct FrameQueue {
+    std::queue<cv::Mat> frameQueue;
+    std::mutex frameMutex;
+};
+
+std::vector<FrameQueue> frameQueues(ST_MAX);
+
+struct FrameHolder {
+    cv::Mat frame;
+    std::mutex frameMutex;
+};
+
+std::array<FrameHolder, ST_MAX> frameHolders;
+
+// Convert cv::Mat to JPEG buffer
+std::vector<uchar> matToBytes(const cv::Mat& img)
+{
+    std::vector<uchar> buf;
+    cv::imencode(".jpg", img, buf);
+    return buf;
+}
+
+std::vector<uchar> matToJpeg(const cv::Mat& img)
+{
+    std::vector<uchar> buf;
+    cv::imencode(".jpg", img, buf);
+    return buf;
+}
+
+void updateFrame(int type, const cv::Mat& frame)
+{
+    std::lock_guard<std::mutex> lock(frameHolders[type].frameMutex);
+    frameHolders[type].frame = frame.clone();
+}
+
+cv::Mat compressFrame(const cv::Mat& frame, int quality = 80)
+{
+    std::vector<uchar> buf;
+    std::vector<int> params = { cv::IMWRITE_JPEG_QUALITY, quality };
+    cv::imencode(".jpg", frame, buf, params);
+    return cv::imdecode(buf, cv::IMREAD_COLOR);
+}
+
+void pushToFrameQueue(int type, const cv::Mat& frame) {
+    //cv::Mat compressedFrame = compressFrame(frame, 50);
+
+    // clear the queue
+    std::queue<cv::Mat> empty;
+    std::swap(frameQueues[type].frameQueue, empty);
+
+    frameQueues[type].frameQueue.push(frame.clone());
+}
+
+struct PerSocketData {
+};
+
+std::mutex wsMutex;
+std::vector<uWS::WebSocket<false, true, PerSocketData>*> activeWebSockets;
+
+
+void SendFramesToAllClients() {
+    std::lock_guard<std::mutex> lock(wsMutex);
+    for (auto ws : activeWebSockets) {
+        for (int type = 0; type < ST_MAX; ++type) {
+            cv::Mat frame;
+            {
+                std::lock_guard<std::mutex> lock(frameQueues[type].frameMutex);
+                if (!frameQueues[type].frameQueue.empty()) {
+                    frame = frameQueues[type].frameQueue.front();
+                    frameQueues[type].frameQueue.pop();
+                }
+            }
+
+            if (!frame.empty()) {
+                std::vector<uchar> jpeg = matToJpeg(frame);
+                uint32_t size = jpeg.size();
+
+                // Prepare the message: type (1 byte) + size (4 bytes) + image data
+                std::string message(1, static_cast<char>(type));
+                message.append(reinterpret_cast<const char*>(&size), 4);
+                message.append(jpeg.begin(), jpeg.end());
+
+                ws->send(message, uWS::OpCode::BINARY);
+            }
+        }
+    }
+}
+
+void RunWebSocketServer(int websocketPort) {
+    auto loop = uWS::Loop::get();
+
+    struct us_timer_t* frameTimer = us_create_timer((struct us_loop_t*)loop, 0, 0);
+
+    us_timer_set(frameTimer, [](struct us_timer_t*) {
+        SendFramesToAllClients();
+    }, 33, 33); // 33ms interval for approximately 30 FPS
+
+    uWS::App().ws<PerSocketData>("/*", {
+        .compression = uWS::SHARED_COMPRESSOR,
+        .maxPayloadLength = 100 * 1024 * 1024,
+        .idleTimeout = 120,
+        .maxBackpressure = 1 * 1024 * 1024,
+        .closeOnBackpressureLimit = false,
+        .resetIdleTimeoutOnSend = false,
+        .sendPingsAutomatically = true,
+        .open = [](auto* ws) {
+            Logger("WebSocket connection opened\n");
+            std::lock_guard<std::mutex> lock(wsMutex);
+            activeWebSockets.push_back(ws);
+        },
+        .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
+            // Handle incoming messages if needed
+        },
+        .close = [](auto* ws, int code, std::string_view message) {
+            Logger("WebSocket connection closed, current connections: %d\n", activeWebSockets.size() - 1);
+            std::lock_guard<std::mutex> lock(wsMutex);
+            activeWebSockets.erase(std::remove(activeWebSockets.begin(), activeWebSockets.end(), ws), activeWebSockets.end());
+        }
+    }).listen(websocketPort, [websocketPort](auto* listen_socket) {
+        if (listen_socket) {
+            Logger("WebSocket listening on port %i\n", websocketPort);
+        }
+    }).run();
+}
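
For reference, each binary WebSocket message produced by SendFramesToAllClients() is framed as: 1 byte of stream type (ST_ORIGINAL or ST_MASKED), 4 bytes of payload size (the raw bytes of a uint32_t, so little-endian on the x64 Windows build targeted here), followed by the JPEG data. A minimal client-side decoding sketch is shown below; ParsedFrame and parseFrameMessage are illustrative names, not part of this patch, and it assumes the client shares the server's little-endian byte order.

```cpp
#include <cstdint>
#include <cstring>
#include <optional>
#include <string_view>
#include <vector>

// Hypothetical helper mirroring the framing written by SendFramesToAllClients():
// [1-byte stream type][4-byte payload size][JPEG bytes].
struct ParsedFrame {
    uint8_t streamType;          // 0 = ST_ORIGINAL, 1 = ST_MASKED
    std::vector<uint8_t> jpeg;   // raw JPEG bytes, e.g. for cv::imdecode
};

std::optional<ParsedFrame> parseFrameMessage(std::string_view message) {
    // Header is 1 byte of type plus 4 bytes of size.
    if (message.size() < 5) return std::nullopt;

    ParsedFrame out;
    out.streamType = static_cast<uint8_t>(message[0]);

    // The server appends the raw bytes of a uint32_t, so this assumes the
    // client uses the same (little-endian) byte order as the server.
    uint32_t size = 0;
    std::memcpy(&size, message.data() + 1, sizeof(size));
    if (message.size() < static_cast<size_t>(size) + 5) return std::nullopt;

    out.jpeg.assign(message.begin() + 5, message.begin() + 5 + size);
    return out;
}
```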