Log amount of data received, extra logging, add args for showing window/changing WS port

master
Benjamin Ruesink 2 months ago
parent 5532b45bbb
commit eb0026ae08

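The new flags default to the previous port and to no preview windows (FLAG_wsPort = 9001, FLAG_showWindow = false). A hypothetical invocation overriding both, assuming a binary named AigsEffectApp.exe and the --flag[=value] syntax handled by the sample's GetFlagArgVal overloads:

    AigsEffectApp.exe --cam_index=0 --ws_port=9002 --show_window
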
@@ -38,16 +38,9 @@
#include "opencv2/opencv.hpp"
#define UWS_NO_ZLIB
#include "httplib.h"
#include "App.h"
//#include <uWebSockets/App.h>
#include <WinSock2.h>
#include <WS2tcpip.h>
#pragma comment(lib, "Ws2_32.lib")
//#pragma comment(lib, "uSockets.lib")
enum StreamType {
ST_ORIGINAL,
@@ -70,39 +63,42 @@ struct FrameHolder {
std::array<FrameHolder, ST_MAX> frameHolders;
// Convert cv::Mat to JPEG buffer
std::vector<uchar> matToBytes(const cv::Mat& img) {
std::vector<uchar> matToBytes(const cv::Mat& img)
{
std::vector<uchar> buf;
cv::imencode(".jpg", img, buf);
return buf;
}
std::vector<uchar> matToJpeg(const cv::Mat& img) {
std::vector<uchar> matToJpeg(const cv::Mat& img)
{
std::vector<uchar> buf;
cv::imencode(".jpg", img, buf);
return buf;
}
void updateFrame(int type, const cv::Mat& frame) {
void updateFrame(int type, const cv::Mat& frame)
{
std::lock_guard<std::mutex> lock(frameHolders[type].frameMutex);
frameHolders[type].frame = frame.clone();
}
cv::Mat compressFrame(const cv::Mat& frame, int quality = 80)
{
std::vector<uchar> buf;
std::vector<int> params = { cv::IMWRITE_JPEG_QUALITY, quality };
cv::imencode(".jpg", frame, buf, params);
return cv::imdecode(buf, cv::IMREAD_COLOR);
}
void pushToFrameQueue(int type, const cv::Mat& frame) {
std::vector<uchar> buffer;
std::vector<int> params = { cv::IMWRITE_JPEG_QUALITY, 80 };
cv::imencode(".jpg", frame, buffer, params);
std::lock_guard<std::mutex> lock(frameQueues[type].frameMutex);
cv::Mat finalimg = cv::imdecode(buffer, cv::IMREAD_COLOR);
//cv::Mat compressedFrame = compressFrame(frame, 50);
// clear the queue
std::queue<cv::Mat> empty;
std::swap(frameQueues[type].frameQueue, empty);
frameQueues[type].frameQueue.push(finalimg.clone());
frameQueues[type].frameQueue.push(frame.clone());
}
struct PerSocketData {
@@ -155,6 +151,8 @@ std::string FLAG_outDir;
std::string FLAG_outFile;
std::string FLAG_bgFile;
int FLAG_camIndex = 0;
int FLAG_wsPort = 9001;
bool FLAG_showWindow = false;
static bool GetFlagArgVal(const char *flag, const char *arg, const char **val) {
if (*arg != '-') return false;
@@ -254,7 +252,8 @@ static int ParseMyArgs(int argc, char **argv) {
GetFlagArgVal("cam_res", arg, &FLAG_camRes) || GetFlagArgVal("mode", arg, &FLAG_mode) ||
GetFlagArgVal("progress", arg, &FLAG_progress) || GetFlagArgVal("show", arg, &FLAG_show) ||
GetFlagArgVal("comp_mode", arg, &FLAG_compMode) || GetFlagArgVal("blur_strength", arg, &FLAG_blurStrength) ||
GetFlagArgVal("cuda_graph", arg, &FLAG_cudaGraph) || GetFlagArgVal("cam_index", arg, &FLAG_camIndex))) {
GetFlagArgVal("cuda_graph", arg, &FLAG_cudaGraph) || GetFlagArgVal("cam_index", arg, &FLAG_camIndex) ||
GetFlagArgVal("ws_port", arg, &FLAG_wsPort) || GetFlagArgVal("show_window", arg, &FLAG_showWindow))) {
continue;
} else if (GetFlagArgVal("help", arg, &help)) {
return NVCV_ERR_HELP;
@@ -424,7 +423,6 @@ struct FXApp {
void destroyEffect();
NvCV_Status allocBuffers(unsigned width, unsigned height);
NvCV_Status allocTempBuffers();
Err processImage(const char *inFile, const char *outFile);
Err processMovie(const char *inFile, const char *outFile);
Err processKey(int key);
void nextCompMode();
@@ -533,30 +531,10 @@ FXApp::Err FXApp::processKey(int key) {
case 'q':
case ESC_KEY:
return errQuit;
case 'c':
case 'C':
nextCompMode();
break;
case 'f':
case 'F':
_showFPS = !_showFPS;
break;
case 'p':
case 'P':
case '%':
_progress = !_progress;
break;
case 'm':
_blurStrength += 0.05f;
if (_blurStrength > 1.0) {
_blurStrength = 1.0;
}
break;
case 'n':
_blurStrength -= 0.05f;
if (_blurStrength < 0.0) {
_blurStrength = 0.0;
}
break;
default:
break;
@@ -712,78 +690,6 @@ static NvCV_Status WriteRGBA(const NvCVImage *bgr, const NvCVImage *a, const std
return cv::imwrite(name, ocv) ? NVCV_SUCCESS : NVCV_ERR_WRITE;
}
FXApp::Err FXApp::processImage(const char *inFile, const char *outFile) {
NvCV_Status vfxErr;
bool ok;
cv::Mat result;
NvCVImage fxSrcChunkyGPU, fxDstChunkyGPU;
// Allocate space for batchOfStates to hold state variable addresses
// Assume that MODEL_BATCH Size is enough for this scenario
unsigned int modelBatch = 1;
BAIL_IF_ERR(vfxErr = NvVFX_GetU32(_eff, NVVFX_MODEL_BATCH, &modelBatch));
_batchOfStates = (NvVFX_StateObjectHandle*) malloc(sizeof(NvVFX_StateObjectHandle) * modelBatch);
if (_batchOfStates == nullptr) {
vfxErr = NVCV_ERR_MEMORY;
goto bail;
}
if (!_eff) return errEffect;
_srcImg = cv::imread(inFile);
if (!_srcImg.data) return errRead;
_dstImg = cv::Mat::zeros(_srcImg.size(), CV_8UC1);
if (!_dstImg.data) return errMemory;
(void)NVWrapperForCVMat(&_srcImg, &_srcVFX);
(void)NVWrapperForCVMat(&_dstImg, &_dstVFX);
if (!fxSrcChunkyGPU.pixels)
{
BAIL_IF_ERR(vfxErr =
NvCVImage_Alloc(&fxSrcChunkyGPU, _srcImg.cols, _srcImg.rows, NVCV_BGR, NVCV_U8, NVCV_CHUNKY, NVCV_GPU, 1));
}
if (!fxDstChunkyGPU.pixels) {
BAIL_IF_ERR(vfxErr =
NvCVImage_Alloc(&fxDstChunkyGPU, _srcImg.cols, _srcImg.rows, NVCV_A, NVCV_U8, NVCV_CHUNKY, NVCV_GPU, 1));
}
BAIL_IF_ERR(vfxErr = NvVFX_SetImage(_eff, NVVFX_INPUT_IMAGE, &fxSrcChunkyGPU));
BAIL_IF_ERR(vfxErr = NvVFX_SetImage(_eff, NVVFX_OUTPUT_IMAGE, &fxDstChunkyGPU));
BAIL_IF_ERR(vfxErr = NvCVImage_Transfer(&_srcVFX, &fxSrcChunkyGPU, 1.0f, _stream, NULL));
// Assign states from stateArray in batchOfStates
// There is only one stream in this app
_batchOfStates[0] = _stateArray[0];
BAIL_IF_ERR(vfxErr = NvVFX_SetStateObjectHandleArray(_eff, NVVFX_STATE, _batchOfStates));
BAIL_IF_ERR(vfxErr = NvVFX_Run(_eff, 0));
BAIL_IF_ERR(vfxErr = NvCVImage_Transfer(&fxDstChunkyGPU, &_dstVFX, 1.0f, _stream, NULL));
overlay(_srcImg, _dstImg, 0.5, result);
if (!std::string(outFile).empty()) {
if(IsLossyImageFile(outFile))
fprintf(stderr, "WARNING: JPEG output file format will reduce image quality\n");
vfxErr = WriteRGBA(&_srcVFX, &_dstVFX, outFile);
if (NVCV_SUCCESS != vfxErr) {
printf("%s: \"%s\"\n", NvCV_GetErrorStringFromCode(vfxErr), outFile);
goto bail;
}
ok = cv::imwrite(std::string(outFile) + "_segmentation_mask.png", _dstImg); // save segmentation mask too
if (!ok) {
printf("Error writing: \"%s_segmentation_mask.png\"\n", outFile);
return errWrite;
}
}
if (_show) {
cv::imshow("Output", result);
cv::waitKey(3000);
}
bail:
return (FXApp::Err)vfxErr;
}
FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) {
float ms = 0.0f;
FXApp::Err appErr = errNone;
@@ -1000,11 +906,14 @@ FXApp::Err FXApp::processMovie(const char *inFile, const char *outFile) {
if (_show) {
drawFrameRate(result);
static std::vector<int> params = { cv::IMWRITE_JPEG_QUALITY, 90 };
pushToFrameQueue(ST_ORIGINAL, originalImg);
pushToFrameQueue(ST_MASKED, result);
if (FLAG_showWindow) {
cv::imshow("Original", originalImg);
cv::imshow("Masked", result);
}
int key = cv::waitKey(1);
if (key > 0) {
appErr = processKey(key);
@@ -1043,57 +952,19 @@ int chooseGPU() {
bool isCompModeEnumValid(const FXApp::CompMode& mode)
{
if (mode != FXApp::CompMode::compMatte &&
mode != FXApp::CompMode::compLight &&
mode != FXApp::CompMode::compGreen &&
mode != FXApp::CompMode::compWhite &&
mode != FXApp::CompMode::compNone &&
mode != FXApp::CompMode::compBG &&
mode != FXApp::CompMode::compBlur)
if (mode != FXApp::CompMode::compMatte &&
mode != FXApp::CompMode::compLight &&
mode != FXApp::CompMode::compGreen &&
mode != FXApp::CompMode::compWhite &&
mode != FXApp::CompMode::compNone &&
mode != FXApp::CompMode::compBG &&
mode != FXApp::CompMode::compBlur)
{
return false;
return false;
}
return true;
}
void startHttpServer() {
httplib::Server svr;
auto frameHandler = [](StreamType streamType) {
return [streamType](const httplib::Request&, httplib::Response& res) {
cv::Mat frame;
{
std::lock_guard<std::mutex> lock(frameQueues[streamType].frameMutex);
if (!frameQueues[streamType].frameQueue.empty()) {
frame = frameQueues[streamType].frameQueue.front();
frameQueues[streamType].frameQueue.pop();
}
}
if (!frame.empty()) {
std::vector<uchar> jpegData = matToBytes(frame);
res.set_content(std::string(jpegData.begin(), jpegData.end()), "image/jpeg");
}
else {
res.status = 404;
res.set_content("No frame available", "text/plain");
}
};
};
const std::array<std::string, ST_MAX> routeNames = {
"original", "masked"
};
for (int i = 0; i < ST_MAX; ++i) {
std::string route = "/video_" + routeNames[i];
svr.Get(route, frameHandler((StreamType)i));
printf("Serving %s frames at http://localhost:8080%s\n", routeNames[i].c_str(), route.c_str());
}
svr.listen("0.0.0.0", 8090);
}
void sendFramesToAllClients() {
std::lock_guard<std::mutex> lock(wsMutex);
for (auto ws : activeWebSockets) {
@@ -1122,8 +993,7 @@ void sendFramesToAllClients() {
}
}
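The body of sendFramesToAllClients is collapsed in this hunk, but the client-side parsing later in the commit (getUint8(0) for the stream type, getUint32(1, true) for the JPEG size, payload starting at byte 5) fixes the wire format. A minimal sketch of packing one frame to match it; sendFrameMessage is a hypothetical helper, and only ws->send(message, uWS::OpCode::BINARY) from uWebSockets is assumed:

// Hypothetical helper (not part of this commit): serializes one frame as
// [1-byte stream type][4-byte little-endian JPEG size][JPEG bytes], matching
// the DataView parsing in the web client further down.
#include <cstdint>
#include <string>
#include <vector>
#include "opencv2/opencv.hpp"
#include "App.h"  // uWebSockets

template <typename WebSocketT>
void sendFrameMessage(WebSocketT* ws, uint8_t streamType, const cv::Mat& frame) {
    std::vector<uchar> jpeg;
    cv::imencode(".jpg", frame, jpeg, { cv::IMWRITE_JPEG_QUALITY, 80 });

    std::string msg;
    msg.reserve(5 + jpeg.size());
    msg.push_back(static_cast<char>(streamType));                   // byte 0: stream type
    const uint32_t size = static_cast<uint32_t>(jpeg.size());
    for (int i = 0; i < 4; ++i)                                      // bytes 1-4: size, little-endian
        msg.push_back(static_cast<char>((size >> (8 * i)) & 0xFF));
    msg.append(reinterpret_cast<const char*>(jpeg.data()), jpeg.size());  // bytes 5..: JPEG payload

    ws->send(msg, uWS::OpCode::BINARY);
}
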
void runWebSocketServer() {
void RunWebSocketServer() {
auto loop = uWS::Loop::get();
struct us_timer_t* frameTimer = us_create_timer((struct us_loop_t*)loop, 0, 0);
@@ -1150,12 +1020,13 @@ void runWebSocketServer() {
},
.close = [](auto* ws, int code, std::string_view message) {
std::cout << "WebSocket connection closed" << std::endl;
std::cout << "Active connections: " << activeWebSockets.size() - 1 << std::endl;
std::lock_guard<std::mutex> lock(wsMutex);
activeWebSockets.erase(std::remove(activeWebSockets.begin(), activeWebSockets.end(), ws), activeWebSockets.end());
}
}).listen(9001, [](auto* listen_socket) {
}).listen(FLAG_wsPort, [](auto* listen_socket) {
if (listen_socket) {
std::cout << "Listening on port 9001" << std::endl;
std::cout << "Listening on port " << FLAG_wsPort << std::endl;
}
}).run();
}
@@ -1169,7 +1040,7 @@ int main(int argc, char **argv) {
return nErrs;
}
std::thread websocketThread(runWebSocketServer);
std::thread websocketThread(RunWebSocketServer);
FXApp::Err fxErr = FXApp::errNone;
FXApp app;
@@ -1209,9 +1080,6 @@ int main(int argc, char **argv) {
} else {
fxErr = app.appErrFromVfxStatus(app.createAigsEffect());
if (FXApp::errNone == fxErr) {
if (IsImageFile(FLAG_inFile.c_str()))
fxErr = app.processImage(FLAG_inFile.c_str(), FLAG_outFile.c_str());
else
fxErr = app.processMovie(FLAG_inFile.c_str(), FLAG_outFile.c_str());
if (fxErr == FXApp::errNone || fxErr == FXApp::errQuit) {
fxErr = FXApp::errNone; // Quitting isn't an error

@@ -37,6 +37,7 @@
<div class="stats">
<h4>Messages received: <label class="message-count">0</label></h4>
<h4>FPS: <label class="fps">0</label></h4>
<h4>Total MB received: <label class="total-mb">0</label></h4>
</div>
<script>
let websocketUrl = 'ws://localhost:9001';
@@ -45,9 +46,10 @@
input.value = websocketUrl;
const messageCountElement = document.querySelector('.message-count');
const fpsElement = document.querySelector('.fps');
const totalMBElement = document.querySelector('.total-mb');
let frameTimestamps = [];
let lastFpsUpdateTime = 0;
let totalBytesReceived = 0;
const connect = () => {
websocketUrl = input.value;
@@ -55,7 +57,6 @@
ws = new WebSocket(websocketUrl);
ws.onopen = () => {
console.log('Connected to the server');
requestFrame();
};
ws.onmessage = (message) => {
messageCountElement.textContent = Number(messageCountElement.textContent) + 1;
@@ -74,7 +75,7 @@
const arrayBuffer = await blob.arrayBuffer();
const dataView = new DataView(arrayBuffer);
const type = dataView.getUint8(0);
const size = dataView.getUint32(1, true); // true for little-endian
const size = dataView.getUint32(1, true);
const imageData = arrayBuffer.slice(5, 5 + size);
const imageBlob = new Blob([imageData], { type: 'image/jpeg' });
const imageUrl = URL.createObjectURL(imageBlob);
@@ -84,29 +85,23 @@
};
streamElement.src = imageUrl;
// Update total MB received
totalBytesReceived += arrayBuffer.byteLength;
totalMBElement.textContent = (totalBytesReceived / (1024 * 1024)).toFixed(2);
updateFPS();
// requestFrame();
} catch (error) {
console.error('Error processing message:', error);
// requestFrame();
}
};
// const requestFrame = () => {
// if (ws && ws.readyState === WebSocket.OPEN) {
// ws.send('Next frame');
// }
// };
const updateFPS = () => {
const now = performance.now();
frameTimestamps.push(now);
// Keep only the last 30 frame timestamps
if (frameTimestamps.length > 30) {
frameTimestamps.shift();
}
// Update FPS
if (now - lastFpsUpdateTime > 100) {
const timeElapsed = (frameTimestamps[frameTimestamps.length - 1] - frameTimestamps[0]) / 1000; // in seconds
