Replace TCP server with websocket implementation, add stream testing web app

master
Benjamin Ruesink 2 months ago
parent c3f5f11d11
commit 5532b45bbb

2
.gitignore vendored

@ -53,3 +53,5 @@ include/*
lib/*
bin/*
test/test_runner
.vscode/*

@ -105,6 +105,12 @@ void pushToFrameQueue(int type, const cv::Mat& frame) {
frameQueues[type].frameQueue.push(finalimg.clone());
}
struct PerSocketData {
};
std::mutex wsMutex;
std::vector<uWS::WebSocket<false, true, PerSocketData>*> activeWebSockets;
#ifdef _MSC_VER
#define strcasecmp _stricmp
#include <Windows.h>
@ -778,107 +784,6 @@ bail:
return (FXApp::Err)vfxErr;
}
std::mutex clients_mutex;
struct PerSocketData {
};
//std::set<uWS::WebSocket<false, true, PerSocketData>*> clients;
class TcpServer {
public:
TcpServer(int port) : port_(port) {}
void start() {
WSADATA wsaData;
int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (result != 0) {
std::cerr << "WSAStartup failed: " << result << std::endl;
return;
}
SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (listenSocket == INVALID_SOCKET) {
std::cerr << "Error creating socket: " << WSAGetLastError() << std::endl;
WSACleanup();
return;
}
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port_);
serverAddr.sin_addr.s_addr = INADDR_ANY;
if (bind(listenSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) {
std::cerr << "Bind failed: " << WSAGetLastError() << std::endl;
closesocket(listenSocket);
WSACleanup();
return;
}
if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR) {
std::cerr << "Listen failed: " << WSAGetLastError() << std::endl;
closesocket(listenSocket);
WSACleanup();
return;
}
std::cout << "Server listening on port " << port_ << std::endl;
while (true) {
SOCKET clientSocket = accept(listenSocket, NULL, NULL);
if (clientSocket == INVALID_SOCKET) {
std::cerr << "Accept failed: " << WSAGetLastError() << std::endl;
continue;
}
std::cout << "New client connected" << std::endl;
std::thread(&TcpServer::handleClient, this, clientSocket).detach();
}
closesocket(listenSocket);
WSACleanup();
}
void close() {
WSACleanup();
}
private:
void handleClient(SOCKET clientSocket) {
try {
while (true) {
cv::Mat frame1, frame2;
{
std::lock_guard<std::mutex> lock1(frameQueues[ST_ORIGINAL].frameMutex);
std::lock_guard<std::mutex> lock2(frameQueues[ST_MASKED].frameMutex);
if (!frameQueues[ST_ORIGINAL].frameQueue.empty() && !frameQueues[ST_MASKED].frameQueue.empty()) {
frame1 = frameQueues[ST_ORIGINAL].frameQueue.front();
frame2 = frameQueues[ST_MASKED].frameQueue.front();
frameQueues[ST_ORIGINAL].frameQueue.pop();
frameQueues[ST_MASKED].frameQueue.pop();
}
}
if (!frame1.empty() && !frame2.empty()) {
std::vector<uchar> bytes1 = matToBytes(frame1);
std::vector<uchar> bytes2 = matToBytes(frame2);
uint32_t size1 = bytes1.size();
uint32_t size2 = bytes2.size();
send(clientSocket, (char*)&size1, sizeof(size1), 0);
send(clientSocket, (char*)bytes1.data(), bytes1.size(), 0);
send(clientSocket, (char*)&size2, sizeof(size2), 0);
send(clientSocket, (char*)bytes2.data(), bytes2.size(), 0);
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
catch (std::exception& e) {
std::cerr << "Exception in thread: " << e.what() << std::endl;
}
closesocket(clientSocket);
}
const char* ip_;
int port_;
};
FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) {
float ms = 0.0f;
FXApp::Err appErr = errNone;
@ -894,29 +799,6 @@ FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) {
if (inFile && !inFile[0]) inFile = nullptr; // Set file paths to NULL if zero length
if (outFile && !outFile[0]) outFile = nullptr;
/*uWS::App().ws<PerSocketData>("/*", {
.open = [](auto* ws) {
std::cout << "A WebSocket connected!" << std::endl;
std::lock_guard<std::mutex> lock(clients_mutex);
clients.insert(ws);
},
.message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
ws->send(message, opCode);
},
.close = [](auto* ws, int code, std::string_view message) {
std::cout << "WebSocket closed" << std::endl;
std::lock_guard<std::mutex> lock(clients_mutex);
clients.erase(ws);
}
}).listen(9001, [](auto* listen_socket) {
if (listen_socket) {
std::cout << "Listening on port " << 9001 << std::endl;
}
}).run();
*/
if (inFile) {
reader.open(inFile);
} else {
@ -1211,83 +1093,74 @@ void startHttpServer() {
svr.listen("0.0.0.0", 8090);
}
void testWS() {
/* ws->getUserData returns one of these */
struct PerSocketData {
/* Fill with user data */
std::vector<std::string> topics;
int nr = 0;
};
{
/* Keep in mind that uWS::SSLApp({options}) is the same as uWS::App() when compiled without SSL support.
* You may swap to using uWS:App() if you don't need SSL */
uWS::SSLApp* app = new uWS::SSLApp({
/* There are example certificates in uWebSockets.js repo */
.key_file_name = "misc/key.pem",
.cert_file_name = "misc/cert.pem",
.passphrase = "1234"
});
app->ws<PerSocketData>("/*", {
/* Settings */
.compression = uWS::DISABLED,
.maxPayloadLength = 16 * 1024 * 1024,
.idleTimeout = 60,
.maxBackpressure = 16 * 1024 * 1024,
.closeOnBackpressureLimit = false,
.resetIdleTimeoutOnSend = true,
.sendPingsAutomatically = false,
/* Handlers */
.upgrade = nullptr,
.open = [](auto* ws) {
/* Open event here, you may access ws->getUserData() which points to a PerSocketData struct */
printf("opened\n");
PerSocketData* perSocketData = (PerSocketData*)ws->getUserData();
for (int i = 0; i < 32; i++) {
std::string topic = std::to_string((uintptr_t)ws) + "-" + std::to_string(i);
perSocketData->topics.push_back(topic);
ws->subscribe(topic);
void sendFramesToAllClients() {
std::lock_guard<std::mutex> lock(wsMutex);
for (auto ws : activeWebSockets) {
for (int type = 0; type < ST_MAX; ++type) {
cv::Mat frame;
{
std::lock_guard<std::mutex> lock(frameQueues[type].frameMutex);
if (!frameQueues[type].frameQueue.empty()) {
frame = frameQueues[type].frameQueue.front();
frameQueues[type].frameQueue.pop();
}
},
.message = [&app](auto* ws, std::string_view message, uWS::OpCode opCode) {
PerSocketData* perSocketData = (PerSocketData*)ws->getUserData();
printf("Message: %s\n", message.data());
app->publish(perSocketData->topics[(size_t)(++perSocketData->nr % 32)], message, opCode);
ws->publish(perSocketData->topics[(size_t)(++perSocketData->nr % 32)], message, opCode);
},
.drain = [](auto*/*ws*/) {
/* Check ws->getBufferedAmount() here */
//std::cout << "drain" << std::endl;
},
.ping = [](auto*/*ws*/, std::string_view) {
/* Not implemented yet */
},
.pong = [](auto*/*ws*/, std::string_view) {
/* Not implemented yet */
},
.close = [](auto*/*ws*/, int /*code*/, std::string_view /*message*/) {
/* You may access ws->getUserData() here */
}
}).listen(9001, [](auto* listen_s) {
if (listen_s) {
std::cout << "Listening on port " << 9001 << std::endl;
//listen_socket = listen_s;
}
});
app->run();
if (!frame.empty()) {
std::vector<uchar> jpeg = matToJpeg(frame);
uint32_t size = jpeg.size();
delete app;
// Prepare the message: type (1 byte) + size (4 bytes) + image data
std::string message(1, static_cast<char>(type));
message.append(reinterpret_cast<char*>(&size), 4);
message.append(jpeg.begin(), jpeg.end());
uWS::Loop::get()->free();
ws->send(message, uWS::OpCode::BINARY);
}
}
}
}
void runWebSocketServer() {
auto loop = uWS::Loop::get();
struct us_timer_t* frameTimer = us_create_timer((struct us_loop_t*)loop, 0, 0);
us_timer_set(frameTimer, [](struct us_timer_t*) {
sendFramesToAllClients();
}, 33, 33); // 33ms interval for approximately 30 FPS
uWS::App().ws<PerSocketData>("/*", {
.compression = uWS::SHARED_COMPRESSOR,
.maxPayloadLength = 100 * 1024 * 1024,
.idleTimeout = 0,
.maxBackpressure = 1 * 1024 * 1024,
.closeOnBackpressureLimit = false,
.resetIdleTimeoutOnSend = false,
.sendPingsAutomatically = true,
.open = [](auto* ws) {
std::cout << "New WebSocket connection" << std::endl;
std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.push_back(ws);
},
.message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
// Handle incoming messages if needed
},
.close = [](auto* ws, int code, std::string_view message) {
std::cout << "WebSocket connection closed" << std::endl;
std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.erase(std::remove(activeWebSockets.begin(), activeWebSockets.end(), ws), activeWebSockets.end());
}
}).listen(9001, [](auto* listen_socket) {
if (listen_socket) {
std::cout << "Listening on port 9001" << std::endl;
}
}).run();
}
int main(int argc, char **argv) {
int nErrs = 0;
nErrs = ParseMyArgs(argc, argv);
@ -1296,10 +1169,7 @@ int main(int argc, char **argv) {
return nErrs;
}
std::thread testws(&testWS);
TcpServer server(8093);
std::thread serverThread2(&TcpServer::start, &server);
std::thread websocketThread(runWebSocketServer);
FXApp::Err fxErr = FXApp::errNone;
FXApp app;
@ -1351,10 +1221,7 @@ int main(int argc, char **argv) {
}
}
server.close();
//serverThread.join();
serverThread2.join();
websocketThread.join();
if (fxErr) std::cerr << "Error: " << app.errorStringFromCode(fxErr) << std::endl;
return (int)fxErr;

@ -0,0 +1,120 @@
<!DOCTYPE html>
<html>
<head>
<title>Stream Web App</title>
<style>
body {
display: flex;
flex-direction: column;
align-items: center;
font-family: Arial, sans-serif;
}
.streams {
display: flex;
justify-content: center;
background: #ddd;
}
.streams img {
margin: 50px;
outline: 1px solid #000;
}
.stats {
display: flex;
justify-content: space-around;
width: 100%;
margin-top: 20px;
}
</style>
</head>
<body>
<h1>Stream Web App</h1>
<button onclick="connect()">Connect</button>
<input type="text" placeholder="Enter the WebSocket URL" />
<div class="streams">
<img class="stream-1" src="" alt="Original Stream">
<img class="stream-2" src="" alt="Masked Stream">
</div>
<div class="stats">
<h4>Messages received: <label class="message-count">0</label></h4>
<h4>FPS: <label class="fps">0</label></h4>
</div>
<script>
let websocketUrl = 'ws://localhost:9001';
let ws;
const input = document.querySelector('input');
input.value = websocketUrl;
const messageCountElement = document.querySelector('.message-count');
const fpsElement = document.querySelector('.fps');
let frameTimestamps = [];
let lastFpsUpdateTime = 0;
const connect = () => {
websocketUrl = input.value;
console.log('Connecting to the server...', websocketUrl);
ws = new WebSocket(websocketUrl);
ws.onopen = () => {
console.log('Connected to the server');
requestFrame();
};
ws.onmessage = (message) => {
messageCountElement.textContent = Number(messageCountElement.textContent) + 1;
handleMessage(message.data);
};
ws.onclose = () => {
console.log('Disconnected from the server');
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
};
const handleMessage = async (blob) => {
try {
const arrayBuffer = await blob.arrayBuffer();
const dataView = new DataView(arrayBuffer);
const type = dataView.getUint8(0);
const size = dataView.getUint32(1, true); // true for little-endian
const imageData = arrayBuffer.slice(5, 5 + size);
const imageBlob = new Blob([imageData], { type: 'image/jpeg' });
const imageUrl = URL.createObjectURL(imageBlob);
const streamElement = document.querySelector(type === 0 ? '.stream-1' : '.stream-2');
streamElement.onload = () => {
URL.revokeObjectURL(streamElement.src); // Clean up the old object URL
};
streamElement.src = imageUrl;
updateFPS();
// requestFrame();
} catch (error) {
console.error('Error processing message:', error);
// requestFrame();
}
};
// const requestFrame = () => {
// if (ws && ws.readyState === WebSocket.OPEN) {
// ws.send('Next frame');
// }
// };
const updateFPS = () => {
const now = performance.now();
frameTimestamps.push(now);
// Keep only the last 30 frame timestamps
if (frameTimestamps.length > 30) {
frameTimestamps.shift();
}
// Update FPS
if (now - lastFpsUpdateTime > 100) {
const timeElapsed = (frameTimestamps[frameTimestamps.length - 1] - frameTimestamps[0]) / 1000; // in seconds
const fps = (frameTimestamps.length - 1) / timeElapsed;
fpsElement.textContent = fps.toFixed(2);
lastFpsUpdateTime = now;
}
};
</script>
</body>
</html>
Loading…
Cancel
Save