Only create WS message once, instead of recreating it for every client, and put both images into 1 message

master
Benjamin Ruesink 2 months ago
parent 59688b4eab
commit 90a306622d

@ -86,32 +86,40 @@ struct PerSocketData {
std::mutex wsMutex; std::mutex wsMutex;
std::vector<uWS::WebSocket<false, true, PerSocketData>*> activeWebSockets; std::vector<uWS::WebSocket<false, true, PerSocketData>*> activeWebSockets;
void SendFramesToAllClients() { void SendFramesToAllClients() {
std::lock_guard<std::mutex> lock(wsMutex); std::lock_guard<std::mutex> lock(wsMutex);
for (auto ws : activeWebSockets) { std::string message;
for (int type = 0; type < ST_MAX; ++type) { cv::Mat originalFrame, maskedFrame;
cv::Mat frame; {
std::lock_guard<std::mutex> lock(frameQueues[ST_ORIGINAL].frameMutex);
if (!frameQueues[ST_ORIGINAL].frameQueue.empty()) {
originalFrame = frameQueues[ST_ORIGINAL].frameQueue.front();
frameQueues[ST_ORIGINAL].frameQueue.pop();
}
}
{ {
std::lock_guard<std::mutex> lock(frameQueues[type].frameMutex); std::lock_guard<std::mutex> lock(frameQueues[ST_MASKED].frameMutex);
if (!frameQueues[type].frameQueue.empty()) { if (!frameQueues[ST_MASKED].frameQueue.empty()) {
frame = frameQueues[type].frameQueue.front(); maskedFrame = frameQueues[ST_MASKED].frameQueue.front();
frameQueues[type].frameQueue.pop(); frameQueues[ST_MASKED].frameQueue.pop();
} }
} }
if (!frame.empty()) { if (!originalFrame.empty() && !maskedFrame.empty()) {
std::vector<uchar> jpeg = matToJpeg(frame); std::vector<uchar> originalJpeg = matToJpeg(originalFrame);
uint32_t size = jpeg.size(); std::vector<uchar> maskedJpeg = matToJpeg(maskedFrame);
// Prepare the message: type (1 byte) + size (4 bytes) + image data uint32_t originalSize = originalJpeg.size();
std::string message(1, static_cast<char>(type)); uint32_t maskedSize = maskedJpeg.size();
message.append(reinterpret_cast<char*>(&size), 4);
message.append(jpeg.begin(), jpeg.end());
ws->send(message, uWS::OpCode::BINARY); // Prepare the message: originalSize (4 bytes) + maskedSize (4 bytes) + original image data + masked image data
} message.append(reinterpret_cast<char*>(&originalSize), 4);
message.append(reinterpret_cast<char*>(&maskedSize), 4);
message.append(originalJpeg.begin(), originalJpeg.end());
message.append(maskedJpeg.begin(), maskedJpeg.end());
} }
for (auto ws : activeWebSockets) {
ws->send(message, uWS::OpCode::BINARY);
} }
} }
@ -133,9 +141,10 @@ void RunWebSocketServer(int websocketPort) {
.resetIdleTimeoutOnSend = false, .resetIdleTimeoutOnSend = false,
.sendPingsAutomatically = true, .sendPingsAutomatically = true,
.open = [](auto* ws) { .open = [](auto* ws) {
Logger("WebSocket connection opened\n"); Logger("WebSocket connection opened, current connections: %d\n", activeWebSockets.size() + 1);
std::lock_guard<std::mutex> lock(wsMutex); std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.push_back(ws); activeWebSockets.push_back(ws);
Logger("Client address: %s\n", ws->getRemoteAddressAsText().data());
}, },
.message = [](auto* ws, std::string_view message, uWS::OpCode opCode) { .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
// Handle incoming messages if needed // Handle incoming messages if needed

@ -12,11 +12,20 @@
.streams { .streams {
display: flex; display: flex;
justify-content: center; justify-content: center;
background: #ddd; position: relative;
width: 100%;
} }
.streams img { .streams img {
margin: 50px;
outline: 1px solid #000; outline: 1px solid #000;
position: absolute;
top: 0;
left: 50%;
transform: translateX(-50%);
height: 450px;
background: #ddd;
}
.stream-2 {
mix-blend-mode: darken;
} }
.stats { .stats {
display: flex; display: flex;
@ -24,21 +33,30 @@
width: 100%; width: 100%;
margin-top: 20px; margin-top: 20px;
} }
.connect-info {
display: flex;
justify-content: center;
align-items: center;
margin-top: 20px;
}
</style> </style>
</head> </head>
<body> <body>
<h1>Stream Web App</h1> <h1>Stream Web App</h1>
<button onclick="connect()">Connect</button> <div class="connect-info">
<input type="text" placeholder="Enter the WebSocket URL" /> <input type="text" placeholder="Enter the WebSocket URL" />
<div class="streams"> <button onclick="connect()">Connect</button>
<img class="stream-1" src="" alt="Original Stream"> <button onclick="disconnect()">Disconnect</button>
<img class="stream-2" src="" alt="Masked Stream">
</div> </div>
<div class="stats"> <div class="stats">
<h4>Messages received: <label class="message-count">0</label></h4> <h4>Messages received: <label class="message-count">0</label></h4>
<h4>FPS: <label class="fps">0</label></h4> <h4>FPS: <label class="fps">0</label></h4>
<h4>Total MB received: <label class="total-mb">0</label></h4> <h4>Total MB received: <label class="total-mb">0</label></h4>
</div> </div>
<div class="streams">
<img class="stream-1" src="" alt="Stream">
<img class="stream-2" src="" alt="">
</div>
<script> <script>
let websocketUrl = 'ws://localhost:9001'; let websocketUrl = 'ws://localhost:9001';
let ws; let ws;
@ -51,7 +69,16 @@
let lastFpsUpdateTime = 0; let lastFpsUpdateTime = 0;
let totalBytesReceived = 0; let totalBytesReceived = 0;
const disconnect = () => {
if (ws) {
ws.close();
ws = null;
}
};
const connect = () => { const connect = () => {
disconnect();
websocketUrl = input.value; websocketUrl = input.value;
console.log('Connecting to the server...', websocketUrl); console.log('Connecting to the server...', websocketUrl);
ws = new WebSocket(websocketUrl); ws = new WebSocket(websocketUrl);
@ -74,21 +101,34 @@
try { try {
const arrayBuffer = await blob.arrayBuffer(); const arrayBuffer = await blob.arrayBuffer();
const dataView = new DataView(arrayBuffer); const dataView = new DataView(arrayBuffer);
const type = dataView.getUint8(0); const originalSize = dataView.getUint32(0, true);
const size = dataView.getUint32(1, true); const maskedSize = dataView.getUint32(4, true);
const imageData = arrayBuffer.slice(5, 5 + size);
const imageBlob = new Blob([imageData], { type: 'image/jpeg' }); const originalImageData = arrayBuffer.slice(8, 8 + originalSize);
const imageUrl = URL.createObjectURL(imageBlob); const maskedImageData = arrayBuffer.slice(8 + originalSize, 8 + originalSize + maskedSize);
const streamElement = document.querySelector(type === 0 ? '.stream-1' : '.stream-2');
streamElement.onload = () => { const originalImageBlob = new Blob([originalImageData], { type: 'image/jpeg' });
URL.revokeObjectURL(streamElement.src); // Clean up the old object URL const maskedImageBlob = new Blob([maskedImageData], { type: 'image/jpeg' });
const originalImageUrl = URL.createObjectURL(originalImageBlob);
const maskedImageUrl = URL.createObjectURL(maskedImageBlob);
const originalStreamElement = document.querySelector('.stream-1');
const maskedStreamElement = document.querySelector('.stream-2');
originalStreamElement.onload = () => {
URL.revokeObjectURL(originalStreamElement.src);
}; };
streamElement.src = imageUrl; maskedStreamElement.onload = () => {
URL.revokeObjectURL(maskedStreamElement.src);
};
originalStreamElement.src = originalImageUrl;
maskedStreamElement.src = maskedImageUrl;
// Update total MB received // Update total MB received
totalBytesReceived += arrayBuffer.byteLength; totalBytesReceived += arrayBuffer.byteLength;
totalMBElement.textContent = (totalBytesReceived / (1024 * 1024)).toFixed(2); totalMBElement.textContent = (totalBytesReceived / (1024 * 1024)).toFixed(2);
updateFPS(); updateFPS();
} catch (error) { } catch (error) {
console.error('Error processing message:', error); console.error('Error processing message:', error);

Loading…
Cancel
Save