From b3743560bb4849d2559d1521e50fb905dd05e73e Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Sat, 15 Nov 2025 22:02:30 -0400 Subject: [PATCH] IoUringAssmEngn: detect assembly end condition w/eventfdDesc validity We can simplify and universalize the logic here by acknowledging that assemblyCycleComplete() will always destroy the current eventfdDesc object, so we can just check that to see whether we should continue the assembly cycle. --- .../livoxGen1/ioUringAssemblyEngine.cpp | 25 +++++++++---------- .../livoxGen1/ioUringAssemblyEngine.h | 2 +- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index 6246d64..c586bde 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -309,7 +309,7 @@ void IoUringAssemblyEngine::assemblyCycleComplete() { // Cancel in-flight stall timeout timer stallTimer.cancel(); - onCqeReadyCallback = std::move([](void *, int, bool&){}); + onCqeReadyCallback = std::move([](void *, int){}); if (frameAssemblyDesc) { @@ -441,8 +441,7 @@ public: engine.resetAndAssembleFrame( std::bind(&AssembleFrameReq::assembleFrameReq2_2, context.get(), context, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)); + std::placeholders::_1, std::placeholders::_2)); // Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms engine.stallTimer.expires_from_now( @@ -485,13 +484,12 @@ public: // Set timer fired flag context->timerFired.store(true); - bool dummyStopListening = false; // Timer path doesn't need this - context->assembleFrameReq3(context, dummyStopListening); + context->assembleFrameReq3(context); } void assembleFrameReq2_2( std::shared_ptr context, - void *user_data, int cqe_result, bool& stopListeningOnEventFd) + void *user_data, int cqe_result) { // NB: The lock was acquired by onEventFdRead before calling this func if (!context->engine.shouldAcceptRequests) @@ -515,13 +513,12 @@ public: success)) { // Loop is complete, call oracle function - context->assembleFrameReq3(context, stopListeningOnEventFd); + context->assembleFrameReq3(context); } } void assembleFrameReq3( - std::shared_ptr context, - bool& stopListeningOnEventFd + std::shared_ptr context ) { /** EXPLANATION: @@ -533,7 +530,6 @@ public: if (context->handlerExecuted.exchange(true)) { return; } // Cancel the timer, stop the engine and process frame, if any. context->engine.assemblyCycleComplete(); - stopListeningOnEventFd = true; /** EXPLANATION: * Timeout doesn't necessarily mean error. @@ -665,7 +661,6 @@ void IoUringAssemblyEngine::onEventfdRead( */ // Process all available CQEs and call callback for each one - bool stopListeningOnEventFd = false; struct io_uring_cqe *cqe; while (io_uring_peek_cqe(&ring, &cqe) == 0) { @@ -689,7 +684,7 @@ void IoUringAssemblyEngine::onEventfdRead( * because of this. */ if (onCqeReadyCallback) { - onCqeReadyCallback(user_data, cqe_result, stopListeningOnEventFd); + onCqeReadyCallback(user_data, cqe_result); } } @@ -697,7 +692,11 @@ void IoUringAssemblyEngine::onEventfdRead( * But we do put a `return` here because we know that at this point, the * caller's callback has already been invoked. */ - if (!shouldAcceptRequests || stopListeningOnEventFd) { return; } + if (!shouldAcceptRequests + || eventfdDesc == nullptr || !eventfdDesc->is_open()) + { + return; + } // Re-arm the eventfd read for next CQE notification eventfdDesc->async_read_some( diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 2a6876a..a38b4d2 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -47,7 +47,7 @@ public: { return nSucceeded != 0 && nTotal != 0 && nSucceeded != nTotal; } private: - typedef std::function resetAndAssembleFrameCbFn; + typedef std::function resetAndAssembleFrameCbFn; void resetAndAssembleFrame(resetAndAssembleFrameCbFn onCqeReady); void assemblyCycleComplete(); bool stop();