From 1df43665c3607c4d36a1f002b386fd6050e355b2 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Thu, 13 Nov 2025 23:53:31 -0400 Subject: [PATCH] IoUringAssmEngn: Implement shouldAcceptRequests daemon/async control We've reworked the synchronous control functions that govern the async daemon and in-flight requests for this class. The shouldAcceptRequests flag represents the readiness state of the whole engine class. The in-flight async operations consult the shouldAcceptRequests flag to determine whether they should return early. Now the stop() method is solely for setting the locked flag shouldAcceptRequests=false. The pair resetAndAssembleFrame()/assemblyCycleComplete manage the per-assembly cycle state machine, and they don't need to set or interfere with the shouldAcceptRequests flag. --- .../livoxGen1/ioUringAssemblyEngine.cpp | 161 +++++++++++------- .../livoxGen1/ioUringAssemblyEngine.h | 17 +- 2 files changed, 105 insertions(+), 73 deletions(-) diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index e2ba7a0..ef22923 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -59,10 +59,9 @@ IoUringAssemblyEngine::IoUringAssemblyEngine( PcloudStimulusBuffer& parent_, size_t nDgramsPerStagingBufferFrame_) : parent(parent_), frameAssemblyDesc(nullptr), ring{}, -isSetup(false), eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()), -isAssembling(false), +shouldAcceptRequests(false), nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_), assembledSlotsTracker(nDgramsPerStagingBufferFrame_), randomDevice(), randomGenerator(randomDevice()) @@ -70,8 +69,15 @@ randomDevice(), randomGenerator(randomDevice()) bool IoUringAssemblyEngine::setup() { - if (isSetup) - { return false; } + // Defensive check to prevent double-calling + { + SpinLock::Guard lock(shouldAcceptRequestsLock); + if (shouldAcceptRequests) + { + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); + } + } // Get FrameAssemblyDesc from staging buffer frameAssemblyDesc = static_cast>( @@ -152,7 +158,7 @@ bool IoUringAssemblyEngine::setup() if (ret < 0) { goto cleanup_eventfd; } - isSetup = true; + shouldAcceptRequests = true; return true; cleanup_eventfd: @@ -170,8 +176,7 @@ cleanup: void IoUringAssemblyEngine::finalize() { - // Call stop() to cancel in-flight operations (stop() already cancels the timer) - stop(); + bool wasAcceptingRequests = stop(); if (eventfdFd >= 0) { @@ -180,13 +185,12 @@ void IoUringAssemblyEngine::finalize() eventfdFd = -1; } - if (isSetup) + if (wasAcceptingRequests) { #ifdef REGISTER_IOURING_BUFFERS io_uring_unregister_buffers(&ring); #endif io_uring_queue_exit(&ring); - isSetup = false; } // Reset state to allow setup() to be called again @@ -202,13 +206,10 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( + ": onCqeReady callback is invalid"); } - if (!frameAssemblyDesc || !pcloudDataFdDesc || eventfdFd < 0) + if (!shouldAcceptRequests) { throw std::runtime_error(std::string(__func__) - + ": invalid state: " - + ( !frameAssemblyDesc ? "frameAssemblyDesc is null; " : "" ) - + ( !pcloudDataFdDesc ? "pcloudDataFdDesc is null; " : "" ) - + ( eventfdFd < 0 ? "eventfdFd is invalid." : "" )); + + ": engine is not accepting requests"); } // eventfdDesc should not be valid when resetAndAssembleFrame is called @@ -286,8 +287,6 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( + " (errno=" + std::to_string(errno) + ")"); } - // Set assembly flag - isAssembling = true; // Start listening for CQE notifications on eventfd eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), @@ -297,40 +296,22 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( std::placeholders::_2)); } -void IoUringAssemblyEngine::stop(bool doAcquireLock) +bool IoUringAssemblyEngine::stop() { - // Clear assembly flag first to signal onEventfdRead to stop re-arming // Acquire and release lock tightly around setting the flag - if (doAcquireLock) - { - SpinLock::Guard lock(isAssemblingLock); - isAssembling = false; - } else { - isAssembling = false; - } - - /** FIXME: - * There's a problem with this bridge here. - * - * We can't delay during every call to stop because under normal operating - * conditions, this whole assembly process should be able to move as fast - * as possible and to receive as much data as possible without maximum - * throughput. - * - * Yet we need to delay briefly here to ensure that the onEventfdRead loop - * has a chance to see the flag and halt. - * - * We need to analyze this carefully and figure out what the correct - * conditions are for being certain that we aren't destroying state while - * the eventfdRead loop is still running; and we need to figure out how to - * ensure that we only delay when absolutely necessary. - */ + SpinLock::Guard lock(shouldAcceptRequestsLock); + bool wasAcceptingRequests = shouldAcceptRequests; + shouldAcceptRequests = false; + return wasAcceptingRequests; +} +void IoUringAssemblyEngine::assemblyCycleComplete() +{ // Cancel in-flight stall timeout timer stallTimer.cancel(); - onCqeReadyCallback = std::move([](void *, int){}); + onCqeReadyCallback = std::move([](void *, int, bool&){}); - if (isSetup) + if (frameAssemblyDesc) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); if (!sqe) @@ -380,8 +361,9 @@ cleanup_eventfd: { /** EXPLANATION: * The goal here is to ensure that our io_service's event loop will not - * get any events from the eventfd after we've called stop(). So we - * completely deinitialize the eventfd descriptor. + * get any events from the eventfd after we've called + * assemblyCycleComplete(). So we completely deinitialize the eventfd + * descriptor. * * But we still want to reuse the underlying eventfd file descriptor, * itself in the next resetAndAssembleFrame() cycle, so we call @@ -392,6 +374,18 @@ cleanup_eventfd: * io_service before releasing it, otherwise Boost.Asio will complain * when we try to create a new descriptor with the same fd. */ + /** CAVEAT: + * There's a rare but real race condition here where the eventfd gets an + * event signaled on it, and while boost is internally processing that + * event to enqeue our handler, we call cancel() and release() here. + * If boost internally has locking on the stream_descriptor object, + * this should be fine. But just in case it doesn't, I'm just + * documenting that possibility here. + * + * There's nothing we can really do about it except know that it would + * be very rarely happen; and that we can't do anything about it short + * of modifying the boost.Asio code. + */ eventfdDesc->cancel(); eventfdDesc->release(); /* Destroy the descriptor object (now that it's unregistered, destroying @@ -427,10 +421,12 @@ public: void assembleFrameReq1_posted( std::shared_ptr context) { - if (!engine.frameAssemblyDesc) + SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + + if (!engine.shouldAcceptRequests) { - throw std::runtime_error(std::string(__func__) - + ": frameAssemblyDesc is null"); + context->callOriginalCallback(false, AsynchronousLoop(0)); + return; } // Initialize loop with number of slots @@ -445,7 +441,8 @@ public: engine.resetAndAssembleFrame( std::bind(&AssembleFrameReq::assembleFrameReq2_2, context.get(), context, - std::placeholders::_1, std::placeholders::_2)); + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)); // Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms engine.stallTimer.expires_from_now( @@ -476,25 +473,30 @@ public: * indeed seen a SEGFAULT even in the current code with locking, so * I'm going to hold the lock here for now. */ - SpinLock::Guard lock(context->engine.isAssemblingLock); + SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock); - if (!context->engine.isAssembling) + if (!context->engine.shouldAcceptRequests) { + context->engine.assemblyCycleComplete(); + context->loop.setRemainingIterationsToFailure(); context->callOriginalCallback(false, context->loop); return; } // Set timer fired flag context->timerFired.store(true); - context->assembleFrameReq3(context); + bool dummyStopListening = false; // Timer path doesn't need this + context->assembleFrameReq3(context, dummyStopListening); } void assembleFrameReq2_2( std::shared_ptr context, - void *user_data, int cqe_result) + void *user_data, int cqe_result, bool& stopListeningOnEventFd) { - if (!context->engine.isAssembling) + // NB: The lock was acquired by onEventFdRead before calling this func + if (!context->engine.shouldAcceptRequests) { + context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); context->callOriginalCallback(false, context->loop); return; @@ -513,16 +515,25 @@ public: success)) { // Loop is complete, call oracle function - context->assembleFrameReq3(context); + context->assembleFrameReq3(context, stopListeningOnEventFd); } } - void assembleFrameReq3(std::shared_ptr context) + void assembleFrameReq3( + std::shared_ptr context, + bool& stopListeningOnEventFd + ) { + /** EXPLANATION: + * All branch paths that invoke this unifyig oracle function are + * expected to already hold the shouldAcceptRequestsLock before calling + * it. + */ // Ensure we only execute once using atomic exchange if (context->handlerExecuted.exchange(true)) { return; } // Cancel the timer, stop the engine and process frame, if any. - context->engine.stop(false); + context->engine.assemblyCycleComplete(); + stopListeningOnEventFd = true; /** EXPLANATION: * Timeout doesn't necessarily mean error. @@ -600,10 +611,13 @@ public: void IoUringAssemblyEngine::assembleFrameReq( Callback cb) { - if (!frameAssemblyDesc) { - throw std::runtime_error(std::string(__func__) - + ": frameAssemblyDesc is null"); + SpinLock::Guard lock(shouldAcceptRequestsLock); + if (!shouldAcceptRequests) + { + cb.callbackFn(false, AsynchronousLoop(0)); + return; + } } const auto& caller = smoHooksPtr->ComponentThread_getSelf(); @@ -630,8 +644,20 @@ void IoUringAssemblyEngine::onEventfdRead( * IoUringAssemblyEngine's per-assembly state isn't destroyed while this * handler is running. */ - SpinLock::Guard lock(isAssemblingLock); - if (!isAssembling) { return; } + SpinLock::Guard lock(shouldAcceptRequestsLock); + /** EXPLANATION: + * You'd think we should put check for shouldAcceptRequests here and + * `return` here if !shouldAcceptRequests, but we shouldn't because + * that would mean that we can't invoke the caller's callback. This would + * make the caller freeze forever. + * + * Instead we just let the onCqeReadyCallback check for + * shouldAcceptRequests. That way the onCqeReadyCallback can actually + * invoke the caller's callback, as it should. We have no knowledge of the + * caller's callback because we don't have access to the caller's + * continuation object. The onCqeReadyCallback does have access to it, + * so we leave that up to it. + */ /** FIXME: * It may be necessary to specifically check for and handle the cancel op @@ -639,6 +665,7 @@ void IoUringAssemblyEngine::onEventfdRead( */ // Process all available CQEs and call callback for each one + bool stopListeningOnEventFd = false; struct io_uring_cqe *cqe; while (io_uring_peek_cqe(&ring, &cqe) == 0) { @@ -662,11 +689,15 @@ void IoUringAssemblyEngine::onEventfdRead( * because of this. */ if (onCqeReadyCallback) { - onCqeReadyCallback(user_data, cqe_result); + onCqeReadyCallback(user_data, cqe_result, stopListeningOnEventFd); } } - if (!isAssembling) { return; } + /** EXPLANATION: + * But we do put a `return` here because we know that at this point, the + * caller's callback has already been invoked. + */ + if (!shouldAcceptRequests || stopListeningOnEventFd) { return; } // Re-arm the eventfd read for next CQE notification eventfdDesc->async_read_some( diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index aa1ea4c..4c3ddf0 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -47,9 +47,10 @@ public: { return nSucceeded != 0 && nTotal != 0 && nSucceeded != nTotal; } private: - typedef std::function resetAndAssembleFrameCbFn; + typedef std::function resetAndAssembleFrameCbFn; void resetAndAssembleFrame(resetAndAssembleFrameCbFn onCqeReady); - void stop(bool doAcquireLock = true); + void assemblyCycleComplete(); + bool stop(); private: PcloudStimulusBuffer& parent; @@ -59,7 +60,6 @@ private: // io_uring infrastructure struct io_uring ring; - bool isSetup; // Eventfd for CQE notifications (used with boost's unified loop) int eventfdFd; @@ -72,12 +72,13 @@ private: boost::asio::deadline_timer stallTimer; // Callback for CQE ntfns (called with user_data+result from each CQE) resetAndAssembleFrameCbFn onCqeReadyCallback; - // Flag to indicate assembly is in progress (cleared by stop()) - // Protected by isAssemblingLock - SpinLock isAssemblingLock; - bool isAssembling; + /** EXPLANATION: + * Flag to indicate whether engine should accept new requests. + * Set by setup(), cleared by stop(). + */ + SpinLock shouldAcceptRequestsLock; + bool shouldAcceptRequests; - // Number of datagrams per staging buffer frame size_t nDgramsPerStagingBufferFrame; struct SlotAssemblyDesc