From bb59f47549cbc2712244fc3848e4ec6b09e5d759 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Thu, 6 Nov 2025 00:00:23 -0400 Subject: [PATCH] IoUringAssmEngn: add assembleFrameReq Invoke it in stimFrameProductionTimesliceInd. Also, we discovered: * stream_descriptor::release() doesn't fully release all metadata from the fd it was assigned. This suggests that we should go through the codebase and do: release()=>reset() whenever we wish to release(). * We've confirmed that spinlocks can be used to prevent race conditions between stop() and handler methods. --- .../livoxGen1/ioUringAssemblyEngine.cpp | 237 +++++++++++++++++- .../livoxGen1/ioUringAssemblyEngine.h | 25 +- stimBuffApis/livoxGen1/livoxGen1.h | 2 + .../livoxGen1/pcloudStimulusBuffer.cpp | 10 + 4 files changed, 258 insertions(+), 16 deletions(-) diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index 0dacf6e..df5fade 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -11,6 +13,11 @@ #include #include #include +#include +#include +#include +#include +#include #include "ioUringAssemblyEngine.h" #include "pcloudStimulusBuffer.h" #include "livoxGen1.h" @@ -41,7 +48,8 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_) frameAssemblyDesc(nullptr), ring{}, isSetup(false), eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), -stallTimer(parent_.device->componentThread->getIoService()) +stallTimer(parent_.device->componentThread->getIoService()), +isAssembling(false) {} bool IoUringAssemblyEngine::setup() @@ -139,7 +147,7 @@ void IoUringAssemblyEngine::finalize() } void IoUringAssemblyEngine::resetAndAssembleFrame( - std::function onCqeReady) + resetAndAssembleFrameCbFn onCqeReady) { if (!onCqeReady) { @@ -236,6 +244,8 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( + " (errno=" + 
std::to_string(errno) + ")"); } + // Set assembly flag + isAssembling = true; // Start listening for CQE notifications on eventfd eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), @@ -245,8 +255,35 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( std::placeholders::_2)); } -void IoUringAssemblyEngine::stop() +void IoUringAssemblyEngine::stop(bool doAcquireLock) { + // Clear assembly flag first to signal onEventfdRead to stop re-arming + // Acquire and release lock tightly around setting the flag + if (doAcquireLock) + { + SpinLock::Guard lock(isAssemblingLock); + isAssembling = false; + } else { + isAssembling = false; + } + + /** FIXME: + * There's a problem with this bridge here. + * + * We can't delay during every call to stop because under normal operating + * conditions, this whole assembly process should be able to move as fast + * as possible and to receive as much data as possible at maximum + * throughput. + * + * Yet we need to delay briefly here to ensure that the onEventfdRead loop + * has a chance to see the flag and halt. + * + * We need to analyze this carefully and figure out what the correct + * conditions are for being certain that we aren't destroying state while + * the onEventfdRead loop is still running; and we need to figure out how to + * ensure that we only delay when absolutely necessary. + */ + // Cancel in-flight stall timeout timer stallTimer.cancel(); @@ -290,7 +327,7 @@ void IoUringAssemblyEngine::stop() } } - if (!sawCancelCqe) { + if (!sawCancelCqe && OptionParser::getOptions().verbose) { std::cerr << __func__ << ": no CQE seen for cancel operation\n"; } } @@ -307,12 +344,178 @@ cleanup_eventfd: * itself in the next resetAndAssembleFrame() cycle, so we call * release() instead of reset() to ensure that the underlying fd * is not closed. 
+ * + * However, we need to close the descriptor's association with the + * io_service before releasing it, otherwise Boost.Asio will complain + * when we try to create a new descriptor with the same fd. */ eventfdDesc->cancel(); - eventfdDesc.release(); + eventfdDesc->release(); + /* Destroy the descriptor object (now that it's unregistered, destroying + * it won't close the fd since release() transferred ownership back) + */ + eventfdDesc.reset(); + } +} + +// Continuation class for assembleFrameReq +class IoUringAssemblyEngine::AssembleFrameReq +: public PostedAsynchronousContinuation< + IoUringAssemblyEngine::assembleFrameReqCbFn> +{ +public: + AssembleFrameReq( + IoUringAssemblyEngine& engine_, + const std::shared_ptr& caller, + Callback cb) + : PostedAsynchronousContinuation< + IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb), + engine(engine_), + loop(engine_.frameAssemblyDesc->numSlots), + timerFired(false), handlerExecuted(false) + {} + +public: + void assembleFrameReq1_posted( + std::shared_ptr context) + { + if (!engine.frameAssemblyDesc) + { + throw std::runtime_error(std::string(__func__) + + ": frameAssemblyDesc is null"); + } + + // Initialize loop with number of slots + context->loop = AsynchronousLoop(engine.frameAssemblyDesc->numSlots); + + /** FIXME: + * I'm suspicious of this std::bind return object here. What if us + * setting it to null inside of stop() doesn't actually cause the + * object to be destroyed? This would cause this contin's sh_ptr's + * reference count to never reach 0, causing a memory leak. 
+ */ + engine.resetAndAssembleFrame( + std::bind(&AssembleFrameReq::assembleFrameReq2_2, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)); + + // Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms + engine.stallTimer.expires_from_now( + boost::posix_time::milliseconds( + CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2)); + engine.stallTimer.async_wait( + std::bind(&AssembleFrameReq::assembleFrameReq2_1, + context.get(), context, + std::placeholders::_1)); } - onCqeReadyCallback = nullptr; + void assembleFrameReq2_1( + std::shared_ptr context, + const boost::system::error_code& error) + { + // Check if timer was cancelled (ignore if operation_aborted) + if (error == boost::asio::error::operation_aborted) { return; } + + // Set timer fired flag + context->timerFired.store(true); + context->assembleFrameReq3(context); + } + + void assembleFrameReq2_2( + std::shared_ptr context, + void *user_data, int cqe_result) + { + (void)user_data; // Not used - we just track success/failure counts + + // Caller decides success: result >= 0 means success + bool success = (cqe_result >= 0); + if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( + success)) + { + // Loop is complete, call oracle function + context->assembleFrameReq3(context); + } + } + + void assembleFrameReq3(std::shared_ptr context) + { + // Ensure we only execute once using atomic exchange + if (context->handlerExecuted.exchange(true)) { return; } + // Cancel the timer, stop the engine and process frame, if any. + context->engine.stop(false); + + /** EXPLANATION: + * Timeout doesn't necessarily mean error. + * + * If we received zero dgrams from the device, that is indeed an error. + * But if we received some dgrams, but not all, that is not an error: + * it just means we didn't receive as much data as we would have liked. + */ + + // Error: no slots succeeded - no data received successfully. 
+ if (context->loop.nSucceeded.load() == 0) + { + context->callOriginalCb(false, context->loop); + return; + } + + if (context->loop.nSucceeded.load() >= context->loop.nTotal) + { + // Success: all or more slots succeeded + if (context->loop.nSucceeded.load() > context->loop.nTotal) + { + std::cerr << __func__ << ": nSucceeded > nTotal: succ (" + << context->loop.nSucceeded.load() + << ") > nTotal (" << context->loop.nTotal << ")\n"; + } + + context->callOriginalCb(true, context->loop); + return; + } + + if (context->loop.nSucceeded.load() < context->loop.nTotal) + { + // Success: some slots succeeded (less than total) + // Note: dummy fill for un-assembled slots will be implemented later + context->callOriginalCb(true, context->loop); + return; + } + + if (OptionParser::getOptions().verbose) + { + std::cerr << __func__ << ": Invalid state: nSucceeded (" + << context->loop.nSucceeded.load() + << ") < nTotal (" << context->loop.nTotal << ")" << std::endl; + } + + context->callOriginalCb(false, context->loop); + return; + } + +public: + IoUringAssemblyEngine& engine; + AsynchronousLoop loop; + std::atomic timerFired; + std::atomic handlerExecuted; +}; + +void IoUringAssemblyEngine::assembleFrameReq( + Callback cb) +{ + if (!frameAssemblyDesc) + { + throw std::runtime_error(std::string(__func__) + + ": frameAssemblyDesc is null"); + } + + const auto& caller = smoHooksPtr->ComponentThread_getSelf(); + auto request = std::make_shared( + *this, caller, std::move(cb)); + + parent.device->componentThread->getIoService().post( + std::bind( + &AssembleFrameReq::assembleFrameReq1_posted, + request.get(), request)); } void IoUringAssemblyEngine::onEventfdRead( @@ -322,8 +525,15 @@ void IoUringAssemblyEngine::onEventfdRead( (void)bytes_transferred; // Ignore cancellation errors - if (error == boost::asio::error::operation_aborted) - { return; } + if (error == boost::asio::error::operation_aborted) { return; } + + /** EXPLANATION: + * This lock should be held throughout this 
method to ensure that the + * IoUringAssemblyEngine's per-assembly state isn't destroyed while this + * handler is running. + */ + SpinLock::Guard lock(isAssemblingLock); + if (!isAssembling) { return; } /** FIXME: * It may be necessary to specifically check for and handle the cancel op @@ -336,11 +546,14 @@ void IoUringAssemblyEngine::onEventfdRead( { // Get user_data from the CQE void* user_data = io_uring_cqe_get_data(cqe); + // Get result from the CQE + int cqe_result = cqe->res; // Mark the CQE as seen io_uring_cqe_seen(&ring, cqe); /** EXPLANATION: - * Call the user-provided callback for this CQE with its user_data. + * Call the user-provided callback for this CQE with its user_data and + * result. * * 1. Notice that we call the caller's cb *after* marking the CQE as * seen. We may later need to change this if the caller needs @@ -351,16 +564,18 @@ void IoUringAssemblyEngine::onEventfdRead( * because of this. */ if (onCqeReadyCallback) { - onCqeReadyCallback(user_data); + onCqeReadyCallback(user_data, cqe_result); } } // Re-arm the eventfd read for next CQE notification + // Only re-arm if assembly is still active (stop() hasn't been called) if (eventfdDesc && eventfdFd >= 0) { eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), - std::bind(&IoUringAssemblyEngine::onEventfdRead, this, + std::bind( + &IoUringAssemblyEngine::onEventfdRead, this, std::placeholders::_1, std::placeholders::_2)); } diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 7c7e13e..9481d3b 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -14,6 +14,10 @@ #include #include #include +#include +#include +#include +#include #include "frameAssemblyDesc.h" namespace smo { @@ -29,9 +33,13 @@ public: bool setup(); void finalize(); - void resetAndAssembleFrame( - std::function onCqeReady); - void stop(); + + typedef std::function 
resetAndAssembleFrameCbFn; + void resetAndAssembleFrame(resetAndAssembleFrameCbFn onCqeReady); + void stop(bool doAcquireLock = true); + + typedef std::function assembleFrameReqCbFn; + void assembleFrameReq(Callback cb); // Telemetry helpers static size_t computePointsPerDgram(int returnMode); @@ -57,12 +65,19 @@ private: // Stall detection timer boost::asio::deadline_timer stallTimer; - // Callback for CQE notifications (called with user_data from each CQE) - std::function onCqeReadyCallback; + // Callback for CQE ntfns (called with user_data+result from each CQE) + resetAndAssembleFrameCbFn onCqeReadyCallback; + // Flag to indicate assembly is in progress (cleared by stop()) + // Protected by isAssemblingLock + SpinLock isAssemblingLock; + bool isAssembling; void cancelIncompleteAndFillDummies(); void onEventfdRead( const boost::system::error_code& error, std::size_t bytes_transferred); + + class AssembleFrameReq; + friend class AssembleFrameReq; }; } // namespace stim_buff diff --git a/stimBuffApis/livoxGen1/livoxGen1.h b/stimBuffApis/livoxGen1/livoxGen1.h index 1ad9176..b08b395 100644 --- a/stimBuffApis/livoxGen1/livoxGen1.h +++ b/stimBuffApis/livoxGen1/livoxGen1.h @@ -30,6 +30,8 @@ struct LivoxProto1DllState extern LivoxProto1DllState livoxProto1; +extern const SmoCallbacks* smoHooksPtr; + } // namespace stim_buff } // namespace smo diff --git a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp index f23ac79..0d5bd9a 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp @@ -68,6 +68,16 @@ void PcloudStimulusBuffer::stop() void PcloudStimulusBuffer::stimFrameProductionTimesliceInd() { + ioUringAssemblyEngine.assembleFrameReq( + {nullptr, [this](bool success, AsynchronousLoop loop) { + if (!success) { + std::cerr << __func__ << ": Failed to assemble frame" << std::endl; + } else { + std::cout << __func__ << ": Successfully assembled frame " + << 
loop.nSucceeded.load() << " slots succeeded " + << "out of " << loop.nTotal << " total slots" << std::endl; + } + }}); // Release the spinlock for now frameAssemblyRateLimiter.release(); }