From 322a8137b23930ce62cf0bd5fbb687db706dd2c5 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Sat, 30 May 2026 07:18:29 -0400 Subject: [PATCH] Revert "LivoxGen1: Use syncCancelerForAsyncWork in producer pipeline" This reverts commit d788810a05d16b8010d2253fe0e50eac1dfa25ca. We're doing this because it's not necessary. We will be porting to coros soon and we can just use brace-scopes. --- .../attachmentSupport/stimulusProducer.cpp | 66 +-- include/user/stimulusProducer.h | 9 +- .../livoxGen1/ioUringAssemblyEngine.cpp | 111 ++--- .../livoxGen1/ioUringAssemblyEngine.h | 9 +- .../openClCollatingAndMeshingEngine.cpp | 290 ++++++----- .../openClCollatingAndMeshingEngine.h | 5 +- .../livoxGen1/pcloudStimulusProducer.cpp | 461 ++++++++---------- 7 files changed, 443 insertions(+), 508 deletions(-) diff --git a/commonLibs/attachmentSupport/stimulusProducer.cpp b/commonLibs/attachmentSupport/stimulusProducer.cpp index cc13b21..a33d053 100644 --- a/commonLibs/attachmentSupport/stimulusProducer.cpp +++ b/commonLibs/attachmentSupport/stimulusProducer.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -90,7 +91,10 @@ void StimulusProducer::destroyAttachedStimulusBuffer( void StimulusProducer::stop() { - (void)stimulusProducerCanceler.requestStop(); + { + sscl::SpinLock::Guard lock(shouldContinueLock); + shouldContinue = false; + } // Cancel timer immediately timer.cancel(); @@ -101,7 +105,7 @@ void StimulusProducer::stop() void StimulusProducer::scheduleNextTimeout(int delayMs) { - if (stimulusProducerCanceler.isCancellationRequestedUnlocked()) + if (!shouldContinue) { return; } // Schedule the next timeout using the provided delay @@ -127,6 +131,10 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error) return; } + sscl::SpinLock::Guard lock(shouldContinueLock); + if (!shouldContinue) + { return; } + /** EXPLANATION: * We need to ensure that there's only ever one stimframe being produced * during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we @@ -140,39 +148,35 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error) */ int nextWakeupDelayMs; bool deferred = false; - bool shouldContinue; - shouldContinue = stimulusProducerCanceler.execUncancelableSegmentOrAbort( - [&]() + if (frameAssemblyRateLimiter.tryAcquire()) { - if (frameAssemblyRateLimiter.tryAcquire()) + nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; + + // Check if we're ending a deferral period + if (nDeferrals > 0) { - nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; + auto deferralEndTime = std::chrono::high_resolution_clock::now(); + auto duration = deferralEndTime - deferralStartTime; + auto durationMs = std::chrono::duration_cast< + std::chrono::milliseconds>(duration); - // Check if we're ending a deferral period - if (nDeferrals > 0) - { - auto deferralEndTime = std::chrono::high_resolution_clock::now(); - auto duration = deferralEndTime - deferralStartTime; - auto durationMs = std::chrono::duration_cast< - std::chrono::milliseconds>(duration); + std::cout << __func__ << ": Deferral period ended. " + << "Total deferrals: " << nDeferrals + << ", Duration: " << durationMs.count() << "ms" << std::endl; - std::cout << __func__ << ": Deferral period ended. " - << "Total deferrals: " << nDeferrals - << ", Duration: " << durationMs.count() << "ms" << std::endl; - - nDeferrals = 0; - } - - /** EXPLANATION: - * Call the derived class's frame production handler - * Note: The derived class's frame production handler (aka - * its implementation of stimFrameProductionTimesliceInd()) must - * release the lock when frame production completes - */ - stimFrameProductionTimesliceInd(); - return; + nDeferrals = 0; } + /** EXPLANATION: + * Call the derived class's frame production handler + * Note: The derived class's frame production handler (aka + * its implementation of stimFrameProductionTimesliceInd()) must + * release the lock when frame production completes + */ + stimFrameProductionTimesliceInd(); + } + else + { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; deferred = true; @@ -185,9 +189,7 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error) "Configured deferral period: " << nextWakeupDelayMs << "ms" << std::endl; } - }); - - if (!shouldContinue) { return; } + } scheduleNextTimeout(nextWakeupDelayMs); diff --git a/include/user/stimulusProducer.h b/include/user/stimulusProducer.h index df7d373..5d868a4 100644 --- a/include/user/stimulusProducer.h +++ b/include/user/stimulusProducer.h @@ -14,7 +14,6 @@ #include #include #include -#include #include "deviceAttachmentSpec.h" namespace smo { @@ -41,7 +40,8 @@ public: &deviceAttachmentSpec, boost::asio::io_service& ioService_) : deviceAttachmentSpec(deviceAttachmentSpec), - ioService(ioService_), timer(ioService), + ioService(ioService_), + shouldContinue(false), timer(ioService), nDeferrals(0) {} @@ -59,7 +59,7 @@ public: std::cout << __func__ << ": Starting stimulus producer for device " << deviceAttachmentSpec->deviceSelector << std::endl; - stimulusProducerCanceler.startAcceptingWork(); + shouldContinue = true; nDeferrals = 0; scheduleNextTimeout(); } @@ -109,7 +109,8 @@ public: private: boost::asio::io_service& ioService; protected: - sscl::SyncCancelerForAsyncWork stimulusProducerCanceler; + sscl::SpinLock shouldContinueLock; + bool shouldContinue; private: boost::asio::deadline_timer timer; size_t nDeferrals; diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index 52848c7..f9d73f4 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "ioUringAssemblyEngine.h" #include "pcloudStimulusProducer.h" #include "livoxGen1.h" @@ -58,6 +59,7 @@ IoUringAssemblyEngine::IoUringAssemblyEngine( frameAssemblyDesc(nullptr), ring{}, eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()), +shouldAcceptRequests(false), nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_), assembledSlotsTracker(nDgramsPerStagingBufferFrame_), randomDevice(), randomGenerator(randomDevice()) @@ -66,10 +68,13 @@ randomDevice(), randomGenerator(randomDevice()) bool IoUringAssemblyEngine::setup() { // Defensive check to prevent double-calling - if (!ioUringAssemblyEngnCanceler.isCancellationRequested()) { - throw std::runtime_error(std::string(__func__) + ": setup() called " - "while already set up"); + sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + if (shouldAcceptRequests) + { + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); + } } // Get FrameAssemblyDesc from staging buffer @@ -151,7 +156,7 @@ bool IoUringAssemblyEngine::setup() if (ret < 0) { goto cleanup_eventfd; } - ioUringAssemblyEngnCanceler.startAcceptingWork(); + shouldAcceptRequests = true; return true; cleanup_eventfd: @@ -224,7 +229,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( + ": onCqeReady callback is invalid"); } - if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()) + if (!shouldAcceptRequests) { throw std::runtime_error(std::string(__func__) + ": engine is not accepting requests"); @@ -316,7 +321,11 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( bool IoUringAssemblyEngine::stop() { - return ioUringAssemblyEngnCanceler.requestStop(); + // Acquire and release lock tightly around setting the flag + sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + bool wasAcceptingRequests = shouldAcceptRequests; + shouldAcceptRequests = false; + return wasAcceptingRequests; } void IoUringAssemblyEngine::assemblyCycleComplete() @@ -435,44 +444,39 @@ public: void assembleFrameReq1_posted( std::shared_ptr context) { - auto& canceler = engine.ioUringAssemblyEngnCanceler; - const bool started = canceler.execUncancelableSegmentOrAbort( - [context, this]() - { - // Initialize loop with number of slots - context->loop = sscl::AsynchronousLoop( - engine.frameAssemblyDesc->numSlots); + sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - // Record assembly start time - engine.assemblyStartTime = - std::chrono::high_resolution_clock::now(); - - /** FIXME: - * I'm suspicious of this std::bind return object here. What if us - * setting it to null inside of stop() doesn't actually cause the - * object to be destroyed? This would cause this contin's sh_ptr's - * reference count to never reach 0, causing a memory leak. - */ - engine.resetAndAssembleFrame( - std::bind(&AssembleFrameReq::assembleFrameReq2_2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)); - - // Set up timeout timer for IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS ms - engine.stallTimer.expires_from_now( - boost::posix_time::milliseconds( - IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS)); - engine.stallTimer.async_wait( - std::bind(&AssembleFrameReq::assembleFrameReq2_1, - context.get(), context, - std::placeholders::_1)); - }); - - if (!started) + if (!engine.shouldAcceptRequests) { context->callOriginalCallback(false, sscl::AsynchronousLoop(0)); return; } + + // Initialize loop with number of slots + context->loop = sscl::AsynchronousLoop(engine.frameAssemblyDesc->numSlots); + + // Record assembly start time + engine.assemblyStartTime = std::chrono::high_resolution_clock::now(); + + /** FIXME: + * I'm suspicious of this std::bind return object here. What if us + * setting it to null inside of stop() doesn't actually cause the + * object to be destroyed? This would cause this contin's sh_ptr's + * reference count to never reach 0, causing a memory leak. + */ + engine.resetAndAssembleFrame( + std::bind(&AssembleFrameReq::assembleFrameReq2_2, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)); + + // Set up timeout timer for IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS ms + engine.stallTimer.expires_from_now( + boost::posix_time::milliseconds( + IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS)); + engine.stallTimer.async_wait( + std::bind(&AssembleFrameReq::assembleFrameReq2_1, + context.get(), context, + std::placeholders::_1)); } void assembleFrameReq2_1( @@ -494,22 +498,19 @@ public: * indeed seen a SEGFAULT even in the current code with locking, so * I'm going to hold the lock here for now. */ - auto& canceler = context->engine.ioUringAssemblyEngnCanceler; - const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( - [context]() - { - // Set timer fired flag - context->timerFired.store(true); - context->assembleFrameReq3(context); - }); + sscl::SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock); - if (!shouldContinue) + if (!context->engine.shouldAcceptRequests) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); context->callOriginalCallback(false, context->loop); return; } + + // Set timer fired flag + context->timerFired.store(true); + context->assembleFrameReq3(context); } void assembleFrameReq2_2( @@ -517,8 +518,7 @@ public: void *user_data, int cqe_result) { // NB: The lock was acquired by onEventFdRead before calling this func - if (context->engine.ioUringAssemblyEngnCanceler - .isCancellationRequestedUnlocked()) + if (!context->engine.shouldAcceptRequests) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); @@ -549,8 +549,8 @@ public: { /** EXPLANATION: * All branch paths that invoke this unifyig oracle function are - * expected to already hold ioUringAssemblyEngnCanceler's lock before - * calling it. + * expected to already hold the shouldAcceptRequestsLock before calling + * it. */ // Ensure we only execute once using atomic exchange if (context->handlerExecuted.exchange(true)) { return; } @@ -638,7 +638,8 @@ void IoUringAssemblyEngine::assembleFrameReq( sscl::cps::Callback cb) { { - if (ioUringAssemblyEngnCanceler.isCancellationRequested()) + sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + if (!shouldAcceptRequests) { cb.callbackFn(false, sscl::AsynchronousLoop(0)); return; @@ -669,7 +670,7 @@ void IoUringAssemblyEngine::onEventfdRead( * IoUringAssemblyEngine's per-assembly state isn't destroyed while this * handler is running. */ - sscl::SpinLock::Guard lock(ioUringAssemblyEngnCanceler.s.lock); + sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); /** EXPLANATION: * You'd think we should put check for shouldAcceptRequests here and * `return` here if !shouldAcceptRequests, but we shouldn't because @@ -721,7 +722,7 @@ void IoUringAssemblyEngine::onEventfdRead( * But we do put a `return` here because we know that at this point, the * caller's callback has already been invoked. */ - if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked() + if (!shouldAcceptRequests || eventfdDesc == nullptr || !eventfdDesc->is_open()) { return; diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 489997a..82ec0ac 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include @@ -80,7 +80,12 @@ private: boost::asio::deadline_timer stallTimer; // Callback for CQE ntfns (called with user_data+result from each CQE) resetAndAssembleFrameCbFn onCqeReadyCallback; - sscl::SyncCancelerForAsyncWork ioUringAssemblyEngnCanceler; + /** EXPLANATION: + * Flag to indicate whether engine should accept new requests. + * Set by setup(), cleared by stop(). + */ + sscl::SpinLock shouldAcceptRequestsLock; + bool shouldAcceptRequests; size_t nDgramsPerStagingBufferFrame; diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp index b8ec2d1..c0661fa 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp @@ -39,6 +39,7 @@ clAverageIntensityBufferClBuffer(nullptr), clAssemblyBuffer(nullptr), clCollationBuffer(nullptr), clAverageIntensityBuffer(nullptr), +shouldAcceptRequests(false), compactIsRunning(false), collateIsRunning(false), currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr), @@ -63,10 +64,13 @@ OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine() bool OpenClCollatingAndMeshingEngine::setup() { // Defensive check to prevent double-calling - if (!openClCollMeshEngnCanceler.isCancellationRequested()) { - throw std::runtime_error(std::string(__func__) + ": setup() called " - "while already set up"); + sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + if (shouldAcceptRequests) + { + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); + } } if (!smoHooksPtr || !smoHooksPtr->ComputeManager_getDevice) @@ -198,7 +202,7 @@ bool OpenClCollatingAndMeshingEngine::setup() clFlush(computeDevice->commandQueue); clFinish(computeDevice->commandQueue); - openClCollMeshEngnCanceler.startAcceptingWork(); + shouldAcceptRequests = true; return true; } @@ -767,7 +771,11 @@ bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs( bool OpenClCollatingAndMeshingEngine::stop() { - return openClCollMeshEngnCanceler.requestStop(); + // Acquire and release lock tightly around setting the flag + sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + bool wasAcceptingRequests = shouldAcceptRequests; + shouldAcceptRequests = false; + return wasAcceptingRequests; } void OpenClCollatingAndMeshingEngine::compactKernelComplete(bool isFinalizing) @@ -1043,33 +1051,28 @@ public: void compactCollateAndMeshFrameReq1_doCompact_posted( std::shared_ptr context) { - auto& canceler = engine.openClCollMeshEngnCanceler; - const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( - [context, this]() + sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) { - // Record compact kernel start time - engine.compactKernelStartTime = - std::chrono::high_resolution_clock::now(); + callOriginalCallback(false); + return; + } - bool success = engine.startCompactKernel( - engine.parent.assemblyBuffer, - static_cast( - context->frameAssemblyResult.nSucceeded.load()), - std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq2_compactDone_posted, - context.get(), context, - std::placeholders::_1)); + // Record compact kernel start time + engine.compactKernelStartTime = std::chrono::high_resolution_clock::now(); - if (!success) - { - engine.compactKernelComplete(); - callOriginalCallback(false); - } - }); + bool success = engine.startCompactKernel( + engine.parent.assemblyBuffer, + static_cast(context->frameAssemblyResult.nSucceeded.load()), + std::bind( + &CompactCollateAndMeshFrameReq + ::compactCollateAndMeshFrameReq2_compactDone_posted, + context.get(), context, + std::placeholders::_1)); - if (!shouldContinue) + if (!success) { + engine.compactKernelComplete(); callOriginalCallback(false); return; } @@ -1079,27 +1082,8 @@ public: std::shared_ptr context, cl_int compactStatus) { - bool compactFailed = false; - - auto& canceler = engine.openClCollMeshEngnCanceler; - const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( - [context, this, compactStatus, &compactFailed]() - { - engine.compactKernelComplete(); - // Record compact kernel end time - engine.compactKernelEndTime = - std::chrono::high_resolution_clock::now(); - - // If compact failed, call callback directly with failure - if (compactStatus != CL_SUCCESS) - { - compactFailed = true; - callOriginalCallback(false); - return; - } - }); - - if (!shouldContinue) + sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) { /** EXPLANATION: * We intentionally don't call compactKernelComplete() here because @@ -1111,7 +1095,16 @@ public: return; } - if (compactFailed) { return; } + engine.compactKernelComplete(); + // Record compact kernel end time + engine.compactKernelEndTime = std::chrono::high_resolution_clock::now(); + + // If compact failed, call callback directly with failure + if (compactStatus != CL_SUCCESS) + { + callOriginalCallback(false); + return; + } #if 0 // Print first 4 bytes of each slot @@ -1123,39 +1116,36 @@ public: } #endif + lock.unlockPrematurely(); context->compactCollateAndMeshFrameReq3_doCollate_posted(context); } void compactCollateAndMeshFrameReq3_doCollate_posted( std::shared_ptr context) { - auto& canceler = engine.openClCollMeshEngnCanceler; - const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( - [context, this]() + sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) { - // Record collate kernel start time - engine.collateKernelStartTime = - std::chrono::high_resolution_clock::now(); + callOriginalCallback(false); + return; + } - bool success = engine.startCollateKernel( - context->intensityStimFrame, context->anyAmbienceAttached(), - std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq4_collateDone_maybePosted, - context.get(), context, - std::placeholders::_1)); + // Record collate kernel start time + engine.collateKernelStartTime = std::chrono::high_resolution_clock::now(); - if (!success) - { - engine.collateKernelComplete( - context->intensityStimFrame, context->anyAmbienceAttached()); + bool success = engine.startCollateKernel( + context->intensityStimFrame, context->anyAmbienceAttached(), + std::bind( + &CompactCollateAndMeshFrameReq + ::compactCollateAndMeshFrameReq4_collateDone_maybePosted, + context.get(), context, + std::placeholders::_1)); - callOriginalCallback(false); - } - }); - - if (!shouldContinue) + if (!success) { + engine.collateKernelComplete( + context->intensityStimFrame, context->anyAmbienceAttached()); + callOriginalCallback(false); return; } @@ -1165,6 +1155,16 @@ public: [[maybe_unused]] std::shared_ptr context, cl_int collateStatus) { + sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) + { + /* We intentionally don't call collateKernelComplete() here for the + * same reason as above. + */ + callOriginalCallback(false); + return; + } + /** EXPLANATION: * The reason we don't call collateKernelComplete before checking * shouldAcceptRequests is because if shouldAcceptRequests is false, then @@ -1174,92 +1174,77 @@ public: * Therefore it's finalize()'s responsibility to ensure that it properly * completes/cleans up any in-flight operations. */ - auto& canceler = engine.openClCollMeshEngnCanceler; - const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( - [context, this, collateStatus]() + engine.collateKernelComplete( + context->intensityStimFrame, context->anyAmbienceAttached()); + + // Produce each attached ambience stimbuff's passband count from + // the per-slot averages the collate kernel staged. + uint32_t nSucceededForAmbience = + context->frameAssemblyResult.nSucceeded.load(); + + if (context->lightAmbienceProductionDesc.has_value()) { - engine.collateKernelComplete( - context->intensityStimFrame, context->anyAmbienceAttached()); + engine.produceAmbienceStimulusFrame( + context->lightAmbienceProductionDesc->frame.get(), + context->lightAmbienceProductionDesc->comparator, + nSucceededForAmbience); + } - // Produce each attached ambience stimbuff's passband count from - // the per-slot averages the collate kernel staged. - uint32_t nSucceededForAmbience = - context->frameAssemblyResult.nSucceeded.load(); - - if (context->lightAmbienceProductionDesc.has_value()) - { - engine.produceAmbienceStimulusFrame( - context->lightAmbienceProductionDesc->frame.get(), - context->lightAmbienceProductionDesc->comparator, - nSucceededForAmbience); - } - - if (context->darkAmbienceProductionDesc.has_value()) - { - engine.produceAmbienceStimulusFrame( - context->darkAmbienceProductionDesc->frame.get(), - context->darkAmbienceProductionDesc->comparator, - nSucceededForAmbience); - } - - // Record collate kernel end time - engine.collateKernelEndTime = - std::chrono::high_resolution_clock::now(); - - bool success = (collateStatus == CL_SUCCESS); - - // Early callback + return pattern - if (!success) - { - callOriginalCallback(false); - return; - } - - uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load(); - - int returnMode = static_cast(engine.parent.device->currentReturnMode); - size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram( - returnMode); - size_t totalPoints = nSucceeded * pointsPerDgram; - - // Count points with intensity greater than 116 - size_t highIntensityCount = 0; - if (context->intensityStimFrame.has_value()) - { - StimulusFrame& intensityFrame = context->intensityStimFrame->get(); - float* intensityFloats = reinterpret_cast(intensityFrame.slotDesc.vaddr); - for (size_t i = 0; i < totalPoints; ++i) - { - float intensity = intensityFloats[i]; - if (intensity >= 116.0f) - { - ++highIntensityCount; - } - } - } - (void)highIntensityCount; - - #if 0 - std::cout << __func__ << ": intensityRingBufferIndex=" - << (context->intensityStimFrame.has_value() ? - context->intensityStimFrame->get().ringBufferIndex : SIZE_MAX) - << ", pointsPerDgram=" << pointsPerDgram - << ", nSucceeded=" << nSucceeded - << ", totalPoints=" << totalPoints - << ", highIntensityCount=" << highIntensityCount << std::endl; - #endif - - callOriginalCallback(success); - }); - - if (!shouldContinue) + if (context->darkAmbienceProductionDesc.has_value()) + { + engine.produceAmbienceStimulusFrame( + context->darkAmbienceProductionDesc->frame.get(), + context->darkAmbienceProductionDesc->comparator, + nSucceededForAmbience); + } + + // Record collate kernel end time + engine.collateKernelEndTime = std::chrono::high_resolution_clock::now(); + + bool success = (collateStatus == CL_SUCCESS); + + // Early callback + return pattern + if (!success) { - /* We intentionally don't call collateKernelComplete() here for the - * same reason as above. - */ callOriginalCallback(false); return; } + + uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load(); + + int returnMode = static_cast(engine.parent.device->currentReturnMode); + size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram( + returnMode); + size_t totalPoints = nSucceeded * pointsPerDgram; + + // Count points with intensity greater than 116 + size_t highIntensityCount = 0; + if (context->intensityStimFrame.has_value()) + { + StimulusFrame& intensityFrame = context->intensityStimFrame->get(); + float* intensityFloats = reinterpret_cast(intensityFrame.slotDesc.vaddr); + for (size_t i = 0; i < totalPoints; ++i) + { + float intensity = intensityFloats[i]; + if (intensity >= 116.0f) + { + ++highIntensityCount; + } + } + } + (void)highIntensityCount; + +#if 0 + std::cout << __func__ << ": intensityRingBufferIndex=" + << (context->intensityStimFrame.has_value() ? + context->intensityStimFrame->get().ringBufferIndex : SIZE_MAX) + << ", pointsPerDgram=" << pointsPerDgram + << ", nSucceeded=" << nSucceeded + << ", totalPoints=" << totalPoints + << ", highIntensityCount=" << highIntensityCount << std::endl; +#endif + + callOriginalCallback(success); } }; @@ -1271,7 +1256,8 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq( sscl::cps::Callback callback) { { - if (openClCollMeshEngnCanceler.isCancellationRequested()) + sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + if (!shouldAcceptRequests) { callback.callbackFn(false, stimulusFrame); return; diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h index e65b99c..3ef79c9 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -150,7 +150,8 @@ private: cl_mem clAverageIntensityBuffer; // State tracking - sscl::SyncCancelerForAsyncWork openClCollMeshEngnCanceler; + sscl::SpinLock shouldAcceptRequestsLock; + bool shouldAcceptRequests; bool compactIsRunning; bool collateIsRunning; cl_event currentCompactKernelEvent; diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp index 3498396..6d74b04 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp @@ -464,137 +464,117 @@ public: void produceFrameReq1_doAssemble_posted( std::shared_ptr context) { - const bool shouldContinue = pcloudProducer.stimulusProducerCanceler - .execUncancelableSegmentOrAbort( - [this, context]() - { - pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( - {context, std::bind( - &ProduceFrameReq::produceFrameReq2_assembleDone, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); - }); - - if (!shouldContinue) + sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); + if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } + + pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( + {context, std::bind( + &ProduceFrameReq::produceFrameReq2_assembleDone, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq2_assembleDone( std::shared_ptr context, bool success, sscl::AsynchronousLoop loop) { - bool shouldContinue = pcloudProducer.stimulusProducerCanceler - .execUncancelableSegmentOrAbort( - [this, context, success, loop]() - { - if (!success) - { - callOriginalCallback(); - - if (pcloudProducer.attachedStimulusBuffers.size() > 0) { - std::cerr << __func__ - << ": Failed to assemble frame.\n"; - } - return; - } - - context->frameAssemblyResult = loop; - - // Check if intensity buffer is attached and acquire frame if so - if (auto intensityBuff = pcloudProducer - .intensityStimulusBuffer.load( - std::memory_order_acquire)) - { - size_t intensityRingbuffIndex = intensityBuff - ->ringBuffer.getIndexToProduceInto(); - - StimulusFrame& intensityStimFrame = intensityBuff - ->ringBuffer.getDataAtSlot( - intensityRingbuffIndex); - - intensityStimFrame.lock.writeAcquire(); - context->intensityStimFrame = std::make_optional( - std::ref(intensityStimFrame)); - } - else { - context->intensityStimFrame = std::nullopt; - } - - // Check if light ambience buffer is attached and acquire frame if so - std::optional - lightAmbienceProductionDescDesc; - if (auto lightAmbienceBuff = - pcloudProducer.lightAmbienceStimulusBuffer.load( - std::memory_order_acquire)) - { - size_t lightAmbienceRingbuffIndex = lightAmbienceBuff - ->ringBuffer.getIndexToProduceInto(); - - StimulusFrame& lightAmbienceStimFrame = - lightAmbienceBuff->ringBuffer.getDataAtSlot( - lightAmbienceRingbuffIndex); - - lightAmbienceStimFrame.lock.writeAcquire(); - context->lightAmbienceStimFrame = std::make_optional( - std::ref(lightAmbienceStimFrame)); - lightAmbienceProductionDescDesc = - AmbienceProductionDesc{ - std::ref(lightAmbienceStimFrame), - lightAmbienceBuff - ->passbandCountGtComparator}; - } - else { - context->lightAmbienceStimFrame = std::nullopt; - } - - // Check if dark ambience buffer is attached and acquire frame if so - std::optional - darkAmbienceProductionDescDesc; - if (auto darkAmbienceBuff = - pcloudProducer.darkAmbienceStimulusBuffer.load( - std::memory_order_acquire)) - { - size_t darkAmbienceRingbuffIndex = darkAmbienceBuff - ->ringBuffer.getIndexToProduceInto(); - - StimulusFrame& darkAmbienceStimFrame = - darkAmbienceBuff->ringBuffer.getDataAtSlot( - darkAmbienceRingbuffIndex); - - darkAmbienceStimFrame.lock.writeAcquire(); - context->darkAmbienceStimFrame = std::make_optional( - std::ref(darkAmbienceStimFrame)); - darkAmbienceProductionDescDesc = - AmbienceProductionDesc{ - std::ref(darkAmbienceStimFrame), - darkAmbienceBuff - ->passbandCountLtComparator}; - } - else { - context->darkAmbienceStimFrame = std::nullopt; - } - - pcloudProducer.openClCollatingAndMeshingEngine - .compactCollateAndMeshFrameReq( - context->frameAssemblyResult, stimulusFrame, - context->intensityStimFrame, - std::move(lightAmbienceProductionDescDesc), - std::move(darkAmbienceProductionDescDesc), - {context, std::bind( - &ProduceFrameReq - ::produceFrameReq3_compactCollateDone, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); - }); - - if (!shouldContinue) + sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); + if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } + + if (!success) + { + callOriginalCallback(); + + if (pcloudProducer.attachedStimulusBuffers.size() > 0) { + std::cerr << __func__ << ": Failed to assemble frame.\n"; + } + return; + } + + context->frameAssemblyResult = loop; + + // Check if intensity buffer is attached and acquire frame if so + if (auto intensityBuff = pcloudProducer.intensityStimulusBuffer.load( + std::memory_order_acquire)) + { + size_t intensityRingbuffIndex = intensityBuff + ->ringBuffer.getIndexToProduceInto(); + + StimulusFrame& intensityStimFrame = intensityBuff + ->ringBuffer.getDataAtSlot( + intensityRingbuffIndex); + + intensityStimFrame.lock.writeAcquire(); + context->intensityStimFrame = std::make_optional( + std::ref(intensityStimFrame)); + } + else { + context->intensityStimFrame = std::nullopt; + } + + // Check if light ambience buffer is attached and acquire frame if so + std::optional lightAmbienceProductionDescDesc; + if (auto lightAmbienceBuff = + pcloudProducer.lightAmbienceStimulusBuffer.load( + std::memory_order_acquire)) + { + size_t lightAmbienceRingbuffIndex = lightAmbienceBuff + ->ringBuffer.getIndexToProduceInto(); + + StimulusFrame& lightAmbienceStimFrame = lightAmbienceBuff + ->ringBuffer.getDataAtSlot(lightAmbienceRingbuffIndex); + + lightAmbienceStimFrame.lock.writeAcquire(); + context->lightAmbienceStimFrame = std::make_optional( + std::ref(lightAmbienceStimFrame)); + lightAmbienceProductionDescDesc = AmbienceProductionDesc{ + std::ref(lightAmbienceStimFrame), + lightAmbienceBuff->passbandCountGtComparator}; + } + else { + context->lightAmbienceStimFrame = std::nullopt; + } + + // Check if dark ambience buffer is attached and acquire frame if so + std::optional darkAmbienceProductionDescDesc; + if (auto darkAmbienceBuff = + pcloudProducer.darkAmbienceStimulusBuffer.load( + std::memory_order_acquire)) + { + size_t darkAmbienceRingbuffIndex = darkAmbienceBuff + ->ringBuffer.getIndexToProduceInto(); + + StimulusFrame& darkAmbienceStimFrame = darkAmbienceBuff + ->ringBuffer.getDataAtSlot(darkAmbienceRingbuffIndex); + + darkAmbienceStimFrame.lock.writeAcquire(); + context->darkAmbienceStimFrame = std::make_optional( + std::ref(darkAmbienceStimFrame)); + darkAmbienceProductionDescDesc = AmbienceProductionDesc{ + std::ref(darkAmbienceStimFrame), + darkAmbienceBuff->passbandCountLtComparator}; + } + else { + context->darkAmbienceStimFrame = std::nullopt; + } + + pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq( + loop, stimulusFrame, + context->intensityStimFrame, + std::move(lightAmbienceProductionDescDesc), + std::move(darkAmbienceProductionDescDesc), + {context, std::bind( + &ProduceFrameReq::produceFrameReq3_compactCollateDone, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq3_compactCollateDone( @@ -623,10 +603,7 @@ public: } #endif - /** EXPLANATION: - * Release intensity/ambience frames if they were supplied/used, - * regardless of whether or not a cancelation request occurred. - */ + // Release intensity frame if it was used if (context->intensityStimFrame.has_value()) { context->intensityStimFrame->get().lock.writeRelease(); } @@ -638,169 +615,130 @@ public: context->darkAmbienceStimFrame->get().lock.writeRelease(); } - if (!success) - { - callOriginalCallback(); - - std::cerr << __func__ - << ": Failed to compact and collate frame" - << std::endl; - - return; - } - - /** EXPLANATION: - * Cancellation early exit for the success path. Analogous to the - * (legacy, removed) pre-canceler check under shouldContinueLock before - * lock.unlockPrematurely() in the former CPS version of this handler. - * - * Assumptions that make the unlocked tail (dump, debug logging, stage - * durations, final callOriginalCallback()) safe without holding - * stimulusProducerCanceler.s.lock: - * - * 1. This handler is only entered from - * CompactCollateAndMeshFrameReq4's execUncancelableSegmentOrAbort - * body, so openClCollatingAndMeshingEngine::finalize() cannot - * tear down OpenCL/engine state until this function returns. - * - * 2. PcloudStimulusProducer::stop() finalizes OpenCL before io_uring, - * so getCompactKernelDuration()/getCollateKernelDuration() and - * getAssemblyDuration() are not racing engine finalize here. - * - * 3. dump/logging reads only producer-owned state (pcloudFrameDumper, - * collationBuffer, device, context->frameAssemblyResult) that - * stop()/finalize() do not destroy. - * - * 4. A stop() that lands after this read but before the tail is - * intentional stale work (same as unlockPrematurely()), not UAF. - * - * Rework to run portions inside stimulusProducerCanceler - * execUncancelableSegmentOrAbort if any of the above ceases to hold: - * e.g. this callback is invoked outside the OpenCL uncancelable - * segment, the tail begins touching engine or buffer state that - * finalize() resets, or stop() ordering changes so teardown can - * interleave with the unlocked tail. - */ - const bool shouldContinue = - !pcloudProducer.stimulusProducerCanceler.isCancellationRequested(); - - if (!shouldContinue) + sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); + if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } - if (pcloudProducer.pcloudFrameDumper.isEnabled()) + if (!success) { + std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; + } else { - try + lock.unlockPrematurely(); + if (pcloudProducer.pcloudFrameDumper.isEnabled()) { - pcloudProducer.pcloudFrameDumper.dumpProducedFrame( - *pcloudProducer.device, - pcloudProducer.collationBuffer, - context->frameAssemblyResult); + try + { + pcloudProducer.pcloudFrameDumper.dumpProducedFrame( + *pcloudProducer.device, + pcloudProducer.collationBuffer, + context->frameAssemblyResult); + } + catch (const std::exception& e) + { + std::cerr << __func__ << ": Failed to dump pcloud frame: " + << e.what() << std::endl; + } } - catch (const std::exception& e) - { - std::cerr << __func__ << ": Failed to dump pcloud frame: " - << e.what() << std::endl; - } - } #if SMO_DEBUG_PCLOUD_AMBIENCE_INTRIN - if (logLightAmbience) - { - auto lightBuff = - pcloudProducer.lightAmbienceStimulusBuffer.load( - std::memory_order_acquire); - if (lightBuff) + if (logLightAmbience) { - std::cerr << __func__ << ": pcloudLightAmbience " - << "passbandCount=" << logLightPassbandCount - << " (per-slot avg intensity " - << ambienceComparatorOpChar( - lightBuff->passbandCountGtComparator.op) - << " " << lightBuff->passbandCountGtComparator.value - << ")"; - if (lightBuff->negtrinInterestConfig.has_value()) + auto lightBuff = + pcloudProducer.lightAmbienceStimulusBuffer.load( + std::memory_order_acquire); + if (lightBuff) { - const auto& nc = *lightBuff->negtrinInterestConfig; - std::cerr << " negtrinInterestThr=" << nc.threshold; - if (nc.percentage != 0U) + std::cerr << __func__ << ": pcloudLightAmbience " + << "passbandCount=" << logLightPassbandCount + << " (per-slot avg intensity " + << ambienceComparatorOpChar( + lightBuff->passbandCountGtComparator.op) + << " " << lightBuff->passbandCountGtComparator.value + << ")"; + if (lightBuff->negtrinInterestConfig.has_value()) { - std::cerr << " (from " << nc.percentage << "%)"; + const auto& nc = *lightBuff->negtrinInterestConfig; + std::cerr << " negtrinInterestThr=" << nc.threshold; + if (nc.percentage != 0U) + { + std::cerr << " (from " << nc.percentage << "%)"; + } + std::cerr << " meetsNegtrinInterest=" + << (lightBuff->shouldTriggerNegtrinEvent( + logLightPassbandCount) + ? "yes" : "no"); } - std::cerr << " meetsNegtrinInterest=" - << (lightBuff->shouldTriggerNegtrinEvent( - logLightPassbandCount) - ? "yes" : "no"); + else + { + std::cerr << " negtrinInterest(n/a)"; + } + std::cerr << std::endl; } - else - { - std::cerr << " negtrinInterest(n/a)"; - } - std::cerr << std::endl; } - } - if (logDarkAmbience) - { - auto darkBuff = - pcloudProducer.darkAmbienceStimulusBuffer.load( - std::memory_order_acquire); - if (darkBuff) + if (logDarkAmbience) { - std::cerr << __func__ << ": pcloudDarkAmbience " - << "passbandCount=" << logDarkPassbandCount - << " (per-slot avg intensity " - << ambienceComparatorOpChar( - darkBuff->passbandCountLtComparator.op) - << " " << darkBuff->passbandCountLtComparator.value - << ")"; - if (darkBuff->postrinInterestConfig.has_value()) + auto darkBuff = + pcloudProducer.darkAmbienceStimulusBuffer.load( + std::memory_order_acquire); + if (darkBuff) { - const auto& pc = *darkBuff->postrinInterestConfig; - std::cerr << " postrinInterestThr=" << pc.threshold; - if (pc.percentage != 0U) + std::cerr << __func__ << ": pcloudDarkAmbience " + << "passbandCount=" << logDarkPassbandCount + << " (per-slot avg intensity " + << ambienceComparatorOpChar( + darkBuff->passbandCountLtComparator.op) + << " " << darkBuff->passbandCountLtComparator.value + << ")"; + if (darkBuff->postrinInterestConfig.has_value()) { - std::cerr << " (from " << pc.percentage << "%)"; + const auto& pc = *darkBuff->postrinInterestConfig; + std::cerr << " postrinInterestThr=" << pc.threshold; + if (pc.percentage != 0U) + { + std::cerr << " (from " << pc.percentage << "%)"; + } + std::cerr << " meetsPostrinInterest=" + << (darkBuff->shouldTriggerPostrinEvent( + logDarkPassbandCount) + ? "yes" : "no"); } - std::cerr << " meetsPostrinInterest=" - << (darkBuff->shouldTriggerPostrinEvent( - logDarkPassbandCount) - ? "yes" : "no"); + else + { + std::cerr << " postrinInterest(n/a)"; + } + std::cerr << std::endl; } - else - { - std::cerr << " postrinInterest(n/a)"; - } - std::cerr << std::endl; } - } #endif #if SMO_PRINT_PCLOUD_STAGE_DURATIONS - const auto logNow = std::chrono::system_clock::now(); - const std::time_t logTime = - std::chrono::system_clock::to_time_t(logNow); - const auto logSubsecMs = - std::chrono::duration_cast( - logNow.time_since_epoch()) % 1000; - auto assemblyDuration = - pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration(); - auto compactDuration = - pcloudProducer.openClCollatingAndMeshingEngine - .getCompactKernelDuration(); - auto collateDuration = - pcloudProducer.openClCollatingAndMeshingEngine - .getCollateKernelDuration(); - std::cout << std::put_time(std::localtime(&logTime), "%T") - << '.' << std::setfill('0') << std::setw(3) - << logSubsecMs.count() << ' ' - << __func__ << ": stage durations: assembly=" - << assemblyDuration.count() - << "ms, compactKernel=" << compactDuration.count() - << "ms, collateKernel=" << collateDuration.count() - << "ms" << std::endl; + const auto logNow = std::chrono::system_clock::now(); + const std::time_t logTime = + std::chrono::system_clock::to_time_t(logNow); + const auto logSubsecMs = + std::chrono::duration_cast( + logNow.time_since_epoch()) % 1000; + auto assemblyDuration = + pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration(); + auto compactDuration = + pcloudProducer.openClCollatingAndMeshingEngine + .getCompactKernelDuration(); + auto collateDuration = + pcloudProducer.openClCollatingAndMeshingEngine + .getCollateKernelDuration(); + std::cout << std::put_time(std::localtime(&logTime), "%T") + << '.' << std::setfill('0') << std::setw(3) + << logSubsecMs.count() << ' ' + << __func__ << ": stage durations: assembly=" + << assemblyDuration.count() + << "ms, compactKernel=" << compactDuration.count() + << "ms, collateKernel=" << collateDuration.count() + << "ms" << std::endl; #endif + } callOriginalCallback(); } @@ -810,9 +748,10 @@ void PcloudStimulusProducer::produceFrameReq( sscl::cps::Callback callback) { /** EXPLANATION: - * We don't do any additional canceler-lock acquisition here because - * callback segment methods already use stimulusProducerCanceler - * checkpoints before running uncancelable work. + * We shouldn't acquire the StimulusProducer::shouldContinueLock here because + * this function is called from + * StimulusProducer::stimFrameProductionTimesliceInd(), which is already + * holding the lock. */ auto caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared(