From 4c93ee32df15826de7f49c6b99e81b865d1a6375 Mon Sep 17 00:00:00 2001 From: Benjamin Ruesink Date: Tue, 1 Oct 2024 17:06:10 +0200 Subject: [PATCH] Testing TCP streaming --- samples/AiMaskStreamApp/AiMaskStreamApp.cpp | 333 +++++++++++++------- 1 file changed, 221 insertions(+), 112 deletions(-) diff --git a/samples/AiMaskStreamApp/AiMaskStreamApp.cpp b/samples/AiMaskStreamApp/AiMaskStreamApp.cpp index cc1534b..8d0cff4 100644 --- a/samples/AiMaskStreamApp/AiMaskStreamApp.cpp +++ b/samples/AiMaskStreamApp/AiMaskStreamApp.cpp @@ -37,12 +37,20 @@ #include "nvVideoEffects.h" #include "opencv2/opencv.hpp" -#include "../../../utils/httplib.h" +#include "httplib.h" +#include "mjpeg_streamer.hpp" +//#include + +#include +#include + +#pragma comment(lib, "Ws2_32.lib") + +using MJPEGStreamer = nadjieb::MJPEGStreamer; enum StreamType { ST_ORIGINAL, ST_MASKED, - ST_PROCESSED, // testing ST_MAX }; @@ -53,16 +61,47 @@ struct FrameQueue { std::vector frameQueues(ST_MAX); +struct FrameHolder { + cv::Mat frame; + std::mutex frameMutex; +}; + +std::array frameHolders; + // Convert cv::Mat to JPEG buffer std::vector matToBytes(const cv::Mat& img) { + + std::vector buf; + cv::imencode(".jpg", img, buf); + return buf; +} + +std::vector matToJpeg(const cv::Mat& img) { std::vector buf; cv::imencode(".jpg", img, buf); return buf; } + +void updateFrame(int type, const cv::Mat& frame) { + std::lock_guard lock(frameHolders[type].frameMutex); + frameHolders[type].frame = frame.clone(); +} + + void pushToFrameQueue(int type, const cv::Mat& frame) { + std::vector buffer; + std::vector params = { cv::IMWRITE_JPEG_QUALITY, 80 }; + cv::imencode(".jpg", frame, buffer, params); + std::lock_guard lock(frameQueues[type].frameMutex); - frameQueues[type].frameQueue.push(frame.clone()); + + cv::Mat finalimg = cv::imdecode(buffer, cv::IMREAD_COLOR); + + std::queue empty; + std::swap(frameQueues[type].frameQueue, empty); + + frameQueues[type].frameQueue.push(finalimg.clone()); } #ifdef _MSC_VER @@ -738,6 +777,108 @@ bail: return (FXApp::Err)vfxErr; } +MJPEGStreamer streamer; +std::mutex clients_mutex; +struct PerSocketData { +}; +//std::set*> clients; + +class TcpServer { +public: + TcpServer(int port) : port_(port) {} + void start() { + WSADATA wsaData; + int result = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (result != 0) { + std::cerr << "WSAStartup failed: " << result << std::endl; + return; + } + SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (listenSocket == INVALID_SOCKET) { + std::cerr << "Error creating socket: " << WSAGetLastError() << std::endl; + WSACleanup(); + return; + } + sockaddr_in serverAddr; + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(port_); + serverAddr.sin_addr.s_addr = INADDR_ANY; + if (bind(listenSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) { + std::cerr << "Bind failed: " << WSAGetLastError() << std::endl; + closesocket(listenSocket); + WSACleanup(); + return; + } + if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR) { + std::cerr << "Listen failed: " << WSAGetLastError() << std::endl; + closesocket(listenSocket); + WSACleanup(); + return; + } + std::cout << "Server listening on port " << port_ << std::endl; + + while (true) { + SOCKET clientSocket = accept(listenSocket, NULL, NULL); + if (clientSocket == INVALID_SOCKET) { + std::cerr << "Accept failed: " << WSAGetLastError() << std::endl; + continue; + } + + std::cout << "New client connected" << std::endl; + std::thread(&TcpServer::handleClient, this, clientSocket).detach(); + } + + closesocket(listenSocket); + WSACleanup(); + } + + void close() { + WSACleanup(); + } + +private: + void handleClient(SOCKET clientSocket) { + try { + while (true) { + cv::Mat frame1, frame2; + { + std::lock_guard lock1(frameQueues[ST_ORIGINAL].frameMutex); + std::lock_guard lock2(frameQueues[ST_MASKED].frameMutex); + if (!frameQueues[ST_ORIGINAL].frameQueue.empty() && !frameQueues[ST_MASKED].frameQueue.empty()) { + frame1 = frameQueues[ST_ORIGINAL].frameQueue.front(); + frame2 = frameQueues[ST_MASKED].frameQueue.front(); + frameQueues[ST_ORIGINAL].frameQueue.pop(); + frameQueues[ST_MASKED].frameQueue.pop(); + } + } + if (!frame1.empty() && !frame2.empty()) { + std::vector bytes1 = matToBytes(frame1); + std::vector bytes2 = matToBytes(frame2); + + uint32_t size1 = bytes1.size(); + uint32_t size2 = bytes2.size(); + + send(clientSocket, (char*)&size1, sizeof(size1), 0); + send(clientSocket, (char*)bytes1.data(), bytes1.size(), 0); + send(clientSocket, (char*)&size2, sizeof(size2), 0); + send(clientSocket, (char*)bytes2.data(), bytes2.size(), 0); + } + else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + } + catch (std::exception& e) { + std::cerr << "Exception in thread: " << e.what() << std::endl; + } + + closesocket(clientSocket); + } + + const char* ip_; + int port_; +}; + FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) { float ms = 0.0f; FXApp::Err appErr = errNone; @@ -752,6 +893,29 @@ FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) { if (inFile && !inFile[0]) inFile = nullptr; // Set file paths to NULL if zero length if (outFile && !outFile[0]) outFile = nullptr; + + /*uWS::App().ws("/*", { + .open = [](auto* ws) { + std::cout << "A WebSocket connected!" << std::endl; + std::lock_guard lock(clients_mutex); + clients.insert(ws); + }, + .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) { + ws->send(message, opCode); + }, + .close = [](auto* ws, int code, std::string_view message) { + std::cout << "WebSocket closed" << std::endl; + std::lock_guard lock(clients_mutex); + clients.erase(ws); + } + }).listen(9001, [](auto* listen_socket) { + if (listen_socket) { + std::cout << "Listening on port " << 9001 << std::endl; + } + }).run(); + +*/ + if (inFile) { reader.open(inFile); @@ -954,89 +1118,41 @@ FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) { if (_show) { drawFrameRate(result); - // Ensure _dstImg is grayscale - if (_dstImg.channels() == 3) { - cv::cvtColor(_dstImg, _dstImg, cv::COLOR_BGR2GRAY); - } + static std::vector params = { cv::IMWRITE_JPEG_QUALITY, 90 }; - // Find edges - cv::Mat edges; - cv::Canny(_dstImg, edges, 100, 200); + /*std::vector buff_bgr; + cv::imencode(".jpg", originalImg, buff_bgr, params); + streamer.publish("/original", std::string(buff_bgr.begin(), buff_bgr.end()));*/ - // Dilate the edges - cv::Mat dilatedEdges; - const int iterations = 3; // higher = less detail but less errors - cv::dilate(edges, dilatedEdges, cv::Mat(), cv::Point(-1, -1), iterations); + //cv::Mat hsv; + //cv::cvtColor(result, hsv, cv::COLOR_BGR2HSV); - // Ensure _srcImg and outlineResult are the same size and type - cv::Mat outlineResult = cv::Mat::zeros(_srcImg.size(), _srcImg.type()); - cv::Mat dilatedMask; - cv::dilate(_dstImg, dilatedMask, cv::Mat(), cv::Point(-1, -1), iterations); + // http://localhost:8080/hsv + /*std::vector buff_hsv; + cv::imencode(".jpg", result, buff_hsv, params); + streamer.publish("/masked", std::string(buff_hsv.begin(), buff_hsv.end()));*/ - // Convert dilatedMask to three channels if needed - if (dilatedMask.channels() == 1) { - cv::cvtColor(dilatedMask, dilatedMask, cv::COLOR_GRAY2BGR); - } - // Find contours - std::vector> contours; - std::vector hierarchy; - cv::findContours(dilatedEdges, contours, hierarchy, cv::RETR_EXTERNAL, cv::CHAIN_APPROX_SIMPLE); - - // Find the largest contour - cv::Mat contourTest = cv::Mat::zeros(_srcImg.size(), _srcImg.type()); - double maxArea = 0; - int largestContourIndex = -1; - for (size_t i = 0; i < contours.size(); ++i) { - double area = cv::contourArea(contours[i]); - if (area > maxArea) { - maxArea = area; - largestContourIndex = i; - } - } + pushToFrameQueue(ST_ORIGINAL, originalImg); + pushToFrameQueue(ST_MASKED, result); - if (largestContourIndex >= 0) { - // Fill the largest contour - cv::drawContours(contourTest, contours, largestContourIndex, cv::Scalar(255, 255, 0), cv::FILLED); - } + /* std::vector buf; + cv::imencode(".jpg", originalImg, buf); + std::string encoded(buf.begin(), buf.end()); - // Convert contourTest to three channels if needed - if (contourTest.channels() == 1) { - cv::cvtColor(contourTest, contourTest, cv::COLOR_GRAY2BGR); - } + std::lock_guard lock(clients_mutex);*/ + /*for (auto ws : clients) { + ws->send(encoded, uWS::OpCode::BINARY); + }*/ - // Combine the dilated mask and filled contour - cv::Mat filledRegion; - cv::bitwise_or(dilatedMask, contourTest, filledRegion); - - // Apply the outline color to the filled region - cv::Vec3b outlineColor(0, 0, 255); - for (int y = 0; y < filledRegion.rows; ++y) { - for (int x = 0; x < filledRegion.cols; ++x) { - if (filledRegion.at(y, x) != cv::Vec3b(0, 0, 0)) { - outlineResult.at(y, x) = outlineColor; - } - } - } + //std::this_thread::sleep_for(std::chrono::milliseconds(33)); // ~30 fps - // Regular outeline - cv::Vec3b outlineColor2(255, 0, 255); - for (int y = 0; y < dilatedEdges.rows; ++y) { - for (int x = 0; x < dilatedEdges.cols; ++x) { - if (dilatedEdges.at(y, x) > 0) { - outlineResult.at(y, x) = outlineColor2; - } - } - } - - pushToFrameQueue(ST_ORIGINAL, originalImg); - pushToFrameQueue(ST_MASKED, result); - pushToFrameQueue(ST_PROCESSED, outlineResult); + //pushToFrameQueue(ST_PROCESSED, outlineResult); // Display the results - cv::imshow("Original", originalImg); - cv::imshow("Overlay", result); - cv::imshow("OutlineTest", outlineResult); + //cv::imshow("Original", originalImg); + //cv::imshow("Overlay", result); + //cv::imshow("OutlineTest", outlineResult); int key = cv::waitKey(1); if (key > 0) { @@ -1092,54 +1208,38 @@ bool isCompModeEnumValid(const FXApp::CompMode& mode) void startHttpServer() { httplib::Server svr; - auto streamHandler = [](StreamType streamType) { + auto frameHandler = [](StreamType streamType) { return [streamType](const httplib::Request&, httplib::Response& res) { - res.set_content_provider( - "multipart/x-mixed-replace; boundary=frame", - [streamType](size_t offset, httplib::DataSink& sink) { - while (true) { - cv::Mat frame; - { - std::lock_guard lock(frameQueues[streamType].frameMutex); - if (!frameQueues[streamType].frameQueue.empty()) { - frame = frameQueues[streamType].frameQueue.front(); - frameQueues[streamType].frameQueue.pop(); - } - } - - if (!frame.empty()) { - std::vector bytes = matToBytes(frame); - - // Write multipart frame - std::string header = "--frame\r\nContent-Type: image/jpeg\r\n\r\n"; - sink.write(header.data(), header.size()); - sink.write(reinterpret_cast(bytes.data()), bytes.size()); - sink.write("\r\n", 2); - } - else { - std::this_thread::sleep_for(std::chrono::milliseconds(30)); // Wait to avoid busy-waiting - } - } - - return true; // Continue streaming + cv::Mat frame; + { + std::lock_guard lock(frameQueues[streamType].frameMutex); + if (!frameQueues[streamType].frameQueue.empty()) { + frame = frameQueues[streamType].frameQueue.front(); + frameQueues[streamType].frameQueue.pop(); } - ); + } + if (!frame.empty()) { + std::vector jpegData = matToBytes(frame); + res.set_content(std::string(jpegData.begin(), jpegData.end()), "image/jpeg"); + } + else { + res.status = 404; + res.set_content("No frame available", "text/plain"); + } }; }; - // Array of route names corresponding to StreamType enum const std::array routeNames = { - "original", "masked", "processed" + "original", "masked" }; - // Set up routes for each frame type for (int i = 0; i < ST_MAX; ++i) { std::string route = "/video_" + routeNames[i]; - svr.Get(route, streamHandler((StreamType)i)); - printf("Streaming %s at http://localhost:8080%s\n", routeNames[i].c_str(), route.c_str()); + svr.Get(route, frameHandler((StreamType)i)); + printf("Serving %s frames at http://localhost:8080%s\n", routeNames[i].c_str(), route.c_str()); } - svr.listen("0.0.0.0", 8080); // Start server on port 8080 + svr.listen("0.0.0.0", 8090); } int main(int argc, char **argv) { @@ -1150,7 +1250,11 @@ int main(int argc, char **argv) { return nErrs; } - std::thread serverThread(startHttpServer); + /*std::thread serverThread(startHttpServer); + streamer.start(8001);*/ + + TcpServer server(8093); + std::thread serverThread2(&TcpServer::start, &server); //httplib::Server svr; @@ -1239,6 +1343,11 @@ int main(int argc, char **argv) { } } + server.close(); + + //serverThread.join(); + serverThread2.join(); + if (fxErr) std::cerr << "Error: " << app.errorStringFromCode(fxErr) << std::endl; return (int)fxErr; } \ No newline at end of file