Separated websocket code to another header file

master
Benjamin Ruesink 2 months ago
parent eb0026ae08
commit 59688b4eab

@ -13,7 +13,7 @@ vcpkg install libuv:x64-windows uwebsockets-json:x64-windows zlib:x64-windows us
```
- Configure and generate the Visual Studio solution file
- If using CMake GUI, set the source folder to the root of the repository and the build folder to `build`, and set the toolchain file (-T) to `C:/vcpkg/scripts/buildsystems/vcpkg.cmake`
- If using CMake GUI, set the source folder to the root of the repository and the build folder to `build`, and set the toolchain file for cross-compiling to `C:/vcpkg/scripts/buildsystems/vcpkg.cmake`
```bash
mkdir build
```

@ -28,85 +28,16 @@
#include <chrono>
#include <string>
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include "nvCVOpenCV.h"
#include "nvVideoEffects.h"
#include "opencv2/opencv.hpp"
#define UWS_NO_ZLIB
#include "App.h"
#include "video_stream.h"
#pragma comment(lib, "Ws2_32.lib")
enum StreamType {
ST_ORIGINAL,
ST_MASKED,
ST_MAX
};
struct FrameQueue {
std::queue<cv::Mat> frameQueue;
std::mutex frameMutex;
};
std::vector<FrameQueue> frameQueues(ST_MAX);
struct FrameHolder {
cv::Mat frame;
std::mutex frameMutex;
};
std::array<FrameHolder, ST_MAX> frameHolders;
// Convert cv::Mat to JPEG buffer
std::vector<uchar> matToBytes(const cv::Mat& img)
{
std::vector<uchar> buf;
cv::imencode(".jpg", img, buf);
return buf;
}
std::vector<uchar> matToJpeg(const cv::Mat& img)
{
std::vector<uchar> buf;
cv::imencode(".jpg", img, buf);
return buf;
}
void updateFrame(int type, const cv::Mat& frame)
{
std::lock_guard<std::mutex> lock(frameHolders[type].frameMutex);
frameHolders[type].frame = frame.clone();
}
cv::Mat compressFrame(const cv::Mat& frame, int quality = 80)
{
std::vector<uchar> buf;
std::vector<int> params = { cv::IMWRITE_JPEG_QUALITY, quality };
cv::imencode(".jpg", frame, buf, params);
return cv::imdecode(buf, cv::IMREAD_COLOR);
}
void pushToFrameQueue(int type, const cv::Mat& frame) {
//cv::Mat compressedFrame = compressFrame(frame, 50);
// clear the queue
std::queue<cv::Mat> empty;
std::swap(frameQueues[type].frameQueue, empty);
frameQueues[type].frameQueue.push(frame.clone());
}
struct PerSocketData {
};
std::mutex wsMutex;
std::vector<uWS::WebSocket<false, true, PerSocketData>*> activeWebSockets;
#ifdef _MSC_VER
#define strcasecmp _stricmp
#include <Windows.h>
@ -965,73 +896,6 @@ bool isCompModeEnumValid(const FXApp::CompMode& mode)
return true;
}
void sendFramesToAllClients() {
std::lock_guard<std::mutex> lock(wsMutex);
for (auto ws : activeWebSockets) {
for (int type = 0; type < ST_MAX; ++type) {
cv::Mat frame;
{
std::lock_guard<std::mutex> lock(frameQueues[type].frameMutex);
if (!frameQueues[type].frameQueue.empty()) {
frame = frameQueues[type].frameQueue.front();
frameQueues[type].frameQueue.pop();
}
}
if (!frame.empty()) {
std::vector<uchar> jpeg = matToJpeg(frame);
uint32_t size = jpeg.size();
// Prepare the message: type (1 byte) + size (4 bytes) + image data
std::string message(1, static_cast<char>(type));
message.append(reinterpret_cast<char*>(&size), 4);
message.append(jpeg.begin(), jpeg.end());
ws->send(message, uWS::OpCode::BINARY);
}
}
}
}
void RunWebSocketServer() {
auto loop = uWS::Loop::get();
struct us_timer_t* frameTimer = us_create_timer((struct us_loop_t*)loop, 0, 0);
us_timer_set(frameTimer, [](struct us_timer_t*) {
sendFramesToAllClients();
}, 33, 33); // 33ms interval for approximately 30 FPS
uWS::App().ws<PerSocketData>("/*", {
.compression = uWS::SHARED_COMPRESSOR,
.maxPayloadLength = 100 * 1024 * 1024,
.idleTimeout = 0,
.maxBackpressure = 1 * 1024 * 1024,
.closeOnBackpressureLimit = false,
.resetIdleTimeoutOnSend = false,
.sendPingsAutomatically = true,
.open = [](auto* ws) {
std::cout << "New WebSocket connection" << std::endl;
std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.push_back(ws);
},
.message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
// Handle incoming messages if needed
},
.close = [](auto* ws, int code, std::string_view message) {
std::cout << "WebSocket connection closed" << std::endl;
std::cout << "Active connections: " << activeWebSockets.size() - 1 << std::endl;
std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.erase(std::remove(activeWebSockets.begin(), activeWebSockets.end(), ws), activeWebSockets.end());
}
}).listen(FLAG_wsPort, [](auto* listen_socket) {
if (listen_socket) {
std::cout << "Listening on port " << FLAG_wsPort << std::endl;
}
}).run();
}
int main(int argc, char **argv) {
int nErrs = 0;
nErrs = ParseMyArgs(argc, argv);
@ -1040,7 +904,7 @@ int main(int argc, char **argv) {
return nErrs;
}
std::thread websocketThread(RunWebSocketServer);
std::thread websocketThread(RunWebSocketServer, FLAG_wsPort);
FXApp::Err fxErr = FXApp::errNone;
FXApp app;

@ -6,7 +6,12 @@ set(SOURCE_FILES
# Set Visual Studio source filters
source_group("Source Files" FILES ${SOURCE_FILES})
add_executable(AiMaskStreamApp ${SOURCE_FILES})
set(HEADER_FILES
video_stream.h)
# Set Visual Studio source filters
source_group("Header Files" FILES ${HEADER_FILES})
add_executable(AiMaskStreamApp ${SOURCE_FILES} ${HEADER_FILES})
target_include_directories(AiMaskStreamApp PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../utils

@ -0,0 +1,153 @@
#pragma once
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#define UWS_NO_ZLIB
#include "App.h"
void Logger(const char* format, ...) {
time_t now = time(0);
tm* ltm = localtime(&now);
char buffer[32];
strftime(buffer, 32, "[%H:%M:%S] ", ltm);
printf("%s", buffer);
va_list args;
va_start(args, format);
vprintf(format, args);
va_end(args);
}
enum StreamType {
ST_ORIGINAL,
ST_MASKED,
ST_MAX
};
struct FrameQueue {
std::queue<cv::Mat> frameQueue;
std::mutex frameMutex;
};
std::vector<FrameQueue> frameQueues(ST_MAX);
struct FrameHolder {
cv::Mat frame;
std::mutex frameMutex;
};
std::array<FrameHolder, ST_MAX> frameHolders;
// Convert cv::Mat to JPEG buffer
std::vector<uchar> matToBytes(const cv::Mat& img)
{
std::vector<uchar> buf;
cv::imencode(".jpg", img, buf);
return buf;
}
std::vector<uchar> matToJpeg(const cv::Mat& img)
{
std::vector<uchar> buf;
cv::imencode(".jpg", img, buf);
return buf;
}
void updateFrame(int type, const cv::Mat& frame)
{
std::lock_guard<std::mutex> lock(frameHolders[type].frameMutex);
frameHolders[type].frame = frame.clone();
}
cv::Mat compressFrame(const cv::Mat& frame, int quality = 80)
{
std::vector<uchar> buf;
std::vector<int> params = { cv::IMWRITE_JPEG_QUALITY, quality };
cv::imencode(".jpg", frame, buf, params);
return cv::imdecode(buf, cv::IMREAD_COLOR);
}
void pushToFrameQueue(int type, const cv::Mat& frame) {
//cv::Mat compressedFrame = compressFrame(frame, 50);
// clear the queue
std::queue<cv::Mat> empty;
std::swap(frameQueues[type].frameQueue, empty);
frameQueues[type].frameQueue.push(frame.clone());
}
struct PerSocketData {
};
std::mutex wsMutex;
std::vector<uWS::WebSocket<false, true, PerSocketData>*> activeWebSockets;
void SendFramesToAllClients() {
std::lock_guard<std::mutex> lock(wsMutex);
for (auto ws : activeWebSockets) {
for (int type = 0; type < ST_MAX; ++type) {
cv::Mat frame;
{
std::lock_guard<std::mutex> lock(frameQueues[type].frameMutex);
if (!frameQueues[type].frameQueue.empty()) {
frame = frameQueues[type].frameQueue.front();
frameQueues[type].frameQueue.pop();
}
}
if (!frame.empty()) {
std::vector<uchar> jpeg = matToJpeg(frame);
uint32_t size = jpeg.size();
// Prepare the message: type (1 byte) + size (4 bytes) + image data
std::string message(1, static_cast<char>(type));
message.append(reinterpret_cast<char*>(&size), 4);
message.append(jpeg.begin(), jpeg.end());
ws->send(message, uWS::OpCode::BINARY);
}
}
}
}
void RunWebSocketServer(int websocketPort) {
auto loop = uWS::Loop::get();
struct us_timer_t* frameTimer = us_create_timer((struct us_loop_t*)loop, 0, 0);
us_timer_set(frameTimer, [](struct us_timer_t*) {
SendFramesToAllClients();
}, 33, 33); // 33ms interval for approximately 30 FPS
uWS::App().ws<PerSocketData>("/*", {
.compression = uWS::SHARED_COMPRESSOR,
.maxPayloadLength = 100 * 1024 * 1024,
.idleTimeout = 120,
.maxBackpressure = 1 * 1024 * 1024,
.closeOnBackpressureLimit = false,
.resetIdleTimeoutOnSend = false,
.sendPingsAutomatically = true,
.open = [](auto* ws) {
Logger("WebSocket connection opened\n");
std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.push_back(ws);
},
.message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
// Handle incoming messages if needed
},
.close = [](auto* ws, int code, std::string_view message) {
Logger("WebSocket connection closed, current connections: %d\n", activeWebSockets.size() - 1);
std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.erase(std::remove(activeWebSockets.begin(), activeWebSockets.end(), ws), activeWebSockets.end());
}
}).listen(websocketPort, [websocketPort](auto* listen_socket) {
if (listen_socket) {
Logger("WebSocket listening on port %i\n", websocketPort);
}
}).run();
}
Loading…
Cancel
Save