diff --git a/commonLibs/attachmentSupport/stimulusProducer.cpp b/commonLibs/attachmentSupport/stimulusProducer.cpp index a33d053..cc13b21 100644 --- a/commonLibs/attachmentSupport/stimulusProducer.cpp +++ b/commonLibs/attachmentSupport/stimulusProducer.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include @@ -91,10 +90,7 @@ void StimulusProducer::destroyAttachedStimulusBuffer( void StimulusProducer::stop() { - { - sscl::SpinLock::Guard lock(shouldContinueLock); - shouldContinue = false; - } + (void)stimulusProducerCanceler.requestStop(); // Cancel timer immediately timer.cancel(); @@ -105,7 +101,7 @@ void StimulusProducer::stop() void StimulusProducer::scheduleNextTimeout(int delayMs) { - if (!shouldContinue) + if (stimulusProducerCanceler.isCancellationRequestedUnlocked()) { return; } // Schedule the next timeout using the provided delay @@ -131,10 +127,6 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error) return; } - sscl::SpinLock::Guard lock(shouldContinueLock); - if (!shouldContinue) - { return; } - /** EXPLANATION: * We need to ensure that there's only ever one stimframe being produced * during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we @@ -148,35 +140,39 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error) */ int nextWakeupDelayMs; bool deferred = false; - if (frameAssemblyRateLimiter.tryAcquire()) + bool shouldContinue; + shouldContinue = stimulusProducerCanceler.execUncancelableSegmentOrAbort( + [&]() { - nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; - - // Check if we're ending a deferral period - if (nDeferrals > 0) + if (frameAssemblyRateLimiter.tryAcquire()) { - auto deferralEndTime = std::chrono::high_resolution_clock::now(); - auto duration = deferralEndTime - deferralStartTime; - auto durationMs = std::chrono::duration_cast< - std::chrono::milliseconds>(duration); + nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; - std::cout << __func__ << ": Deferral period ended. " - << "Total deferrals: " << nDeferrals - << ", Duration: " << durationMs.count() << "ms" << std::endl; + // Check if we're ending a deferral period + if (nDeferrals > 0) + { + auto deferralEndTime = std::chrono::high_resolution_clock::now(); + auto duration = deferralEndTime - deferralStartTime; + auto durationMs = std::chrono::duration_cast< + std::chrono::milliseconds>(duration); - nDeferrals = 0; + std::cout << __func__ << ": Deferral period ended. " + << "Total deferrals: " << nDeferrals + << ", Duration: " << durationMs.count() << "ms" << std::endl; + + nDeferrals = 0; + } + + /** EXPLANATION: + * Call the derived class's frame production handler + * Note: The derived class's frame production handler (aka + * its implementation of stimFrameProductionTimesliceInd()) must + * release the lock when frame production completes + */ + stimFrameProductionTimesliceInd(); + return; } - /** EXPLANATION: - * Call the derived class's frame production handler - * Note: The derived class's frame production handler (aka - * its implementation of stimFrameProductionTimesliceInd()) must - * release the lock when frame production completes - */ - stimFrameProductionTimesliceInd(); - } - else - { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; deferred = true; @@ -189,7 +185,9 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error) "Configured deferral period: " << nextWakeupDelayMs << "ms" << std::endl; } - } + }); + + if (!shouldContinue) { return; } scheduleNextTimeout(nextWakeupDelayMs); diff --git a/include/user/stimulusProducer.h b/include/user/stimulusProducer.h index 5d868a4..df7d373 100644 --- a/include/user/stimulusProducer.h +++ b/include/user/stimulusProducer.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "deviceAttachmentSpec.h" namespace smo { @@ -40,8 +41,7 @@ public: &deviceAttachmentSpec, boost::asio::io_service& ioService_) : deviceAttachmentSpec(deviceAttachmentSpec), - ioService(ioService_), - shouldContinue(false), timer(ioService), + ioService(ioService_), timer(ioService), nDeferrals(0) {} @@ -59,7 +59,7 @@ public: std::cout << __func__ << ": Starting stimulus producer for device " << deviceAttachmentSpec->deviceSelector << std::endl; - shouldContinue = true; + stimulusProducerCanceler.startAcceptingWork(); nDeferrals = 0; scheduleNextTimeout(); } @@ -109,8 +109,7 @@ public: private: boost::asio::io_service& ioService; protected: - sscl::SpinLock shouldContinueLock; - bool shouldContinue; + sscl::SyncCancelerForAsyncWork stimulusProducerCanceler; private: boost::asio::deadline_timer timer; size_t nDeferrals; diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index f9d73f4..52848c7 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include "ioUringAssemblyEngine.h" #include "pcloudStimulusProducer.h" #include "livoxGen1.h" @@ -59,7 +58,6 @@ IoUringAssemblyEngine::IoUringAssemblyEngine( frameAssemblyDesc(nullptr), ring{}, eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()), -shouldAcceptRequests(false), nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_), assembledSlotsTracker(nDgramsPerStagingBufferFrame_), randomDevice(), randomGenerator(randomDevice()) @@ -68,13 +66,10 @@ randomDevice(), randomGenerator(randomDevice()) bool IoUringAssemblyEngine::setup() { // Defensive check to prevent double-calling + if (!ioUringAssemblyEngnCanceler.isCancellationRequested()) { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (shouldAcceptRequests) - { - throw std::runtime_error(std::string(__func__) + ": setup() called " - "while already set up"); - } + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); } // Get FrameAssemblyDesc from staging buffer @@ -156,7 +151,7 @@ bool IoUringAssemblyEngine::setup() if (ret < 0) { goto cleanup_eventfd; } - shouldAcceptRequests = true; + ioUringAssemblyEngnCanceler.startAcceptingWork(); return true; cleanup_eventfd: @@ -229,7 +224,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( + ": onCqeReady callback is invalid"); } - if (!shouldAcceptRequests) + if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()) { throw std::runtime_error(std::string(__func__) + ": engine is not accepting requests"); @@ -321,11 +316,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( bool IoUringAssemblyEngine::stop() { - // Acquire and release lock tightly around setting the flag - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - bool wasAcceptingRequests = shouldAcceptRequests; - shouldAcceptRequests = false; - return wasAcceptingRequests; + return ioUringAssemblyEngnCanceler.requestStop(); } void IoUringAssemblyEngine::assemblyCycleComplete() @@ -444,39 +435,44 @@ public: void assembleFrameReq1_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + auto& canceler = engine.ioUringAssemblyEngnCanceler; + const bool started = canceler.execUncancelableSegmentOrAbort( + [context, this]() + { + // Initialize loop with number of slots + context->loop = sscl::AsynchronousLoop( + engine.frameAssemblyDesc->numSlots); - if (!engine.shouldAcceptRequests) + // Record assembly start time + engine.assemblyStartTime = + std::chrono::high_resolution_clock::now(); + + /** FIXME: + * I'm suspicious of this std::bind return object here. What if us + * setting it to null inside of stop() doesn't actually cause the + * object to be destroyed? This would cause this contin's sh_ptr's + * reference count to never reach 0, causing a memory leak. + */ + engine.resetAndAssembleFrame( + std::bind(&AssembleFrameReq::assembleFrameReq2_2, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)); + + // Set up timeout timer for IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS ms + engine.stallTimer.expires_from_now( + boost::posix_time::milliseconds( + IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS)); + engine.stallTimer.async_wait( + std::bind(&AssembleFrameReq::assembleFrameReq2_1, + context.get(), context, + std::placeholders::_1)); + }); + + if (!started) { context->callOriginalCallback(false, sscl::AsynchronousLoop(0)); return; } - - // Initialize loop with number of slots - context->loop = sscl::AsynchronousLoop(engine.frameAssemblyDesc->numSlots); - - // Record assembly start time - engine.assemblyStartTime = std::chrono::high_resolution_clock::now(); - - /** FIXME: - * I'm suspicious of this std::bind return object here. What if us - * setting it to null inside of stop() doesn't actually cause the - * object to be destroyed? This would cause this contin's sh_ptr's - * reference count to never reach 0, causing a memory leak. - */ - engine.resetAndAssembleFrame( - std::bind(&AssembleFrameReq::assembleFrameReq2_2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)); - - // Set up timeout timer for IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS ms - engine.stallTimer.expires_from_now( - boost::posix_time::milliseconds( - IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS)); - engine.stallTimer.async_wait( - std::bind(&AssembleFrameReq::assembleFrameReq2_1, - context.get(), context, - std::placeholders::_1)); } void assembleFrameReq2_1( @@ -498,19 +494,22 @@ public: * indeed seen a SEGFAULT even in the current code with locking, so * I'm going to hold the lock here for now. */ - sscl::SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock); + auto& canceler = context->engine.ioUringAssemblyEngnCanceler; + const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( + [context]() + { + // Set timer fired flag + context->timerFired.store(true); + context->assembleFrameReq3(context); + }); - if (!context->engine.shouldAcceptRequests) + if (!shouldContinue) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); context->callOriginalCallback(false, context->loop); return; } - - // Set timer fired flag - context->timerFired.store(true); - context->assembleFrameReq3(context); } void assembleFrameReq2_2( @@ -518,7 +517,8 @@ public: void *user_data, int cqe_result) { // NB: The lock was acquired by onEventFdRead before calling this func - if (!context->engine.shouldAcceptRequests) + if (context->engine.ioUringAssemblyEngnCanceler + .isCancellationRequestedUnlocked()) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); @@ -549,8 +549,8 @@ public: { /** EXPLANATION: * All branch paths that invoke this unifyig oracle function are - * expected to already hold the shouldAcceptRequestsLock before calling - * it. + * expected to already hold ioUringAssemblyEngnCanceler's lock before + * calling it. */ // Ensure we only execute once using atomic exchange if (context->handlerExecuted.exchange(true)) { return; } @@ -638,8 +638,7 @@ void IoUringAssemblyEngine::assembleFrameReq( sscl::cps::Callback cb) { { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (!shouldAcceptRequests) + if (ioUringAssemblyEngnCanceler.isCancellationRequested()) { cb.callbackFn(false, sscl::AsynchronousLoop(0)); return; @@ -670,7 +669,7 @@ void IoUringAssemblyEngine::onEventfdRead( * IoUringAssemblyEngine's per-assembly state isn't destroyed while this * handler is running. */ - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + sscl::SpinLock::Guard lock(ioUringAssemblyEngnCanceler.s.lock); /** EXPLANATION: * You'd think we should put check for shouldAcceptRequests here and * `return` here if !shouldAcceptRequests, but we shouldn't because @@ -722,7 +721,7 @@ void IoUringAssemblyEngine::onEventfdRead( * But we do put a `return` here because we know that at this point, the * caller's callback has already been invoked. */ - if (!shouldAcceptRequests + if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked() || eventfdDesc == nullptr || !eventfdDesc->is_open()) { return; diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 82ec0ac..489997a 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include @@ -80,12 +80,7 @@ private: boost::asio::deadline_timer stallTimer; // Callback for CQE ntfns (called with user_data+result from each CQE) resetAndAssembleFrameCbFn onCqeReadyCallback; - /** EXPLANATION: - * Flag to indicate whether engine should accept new requests. - * Set by setup(), cleared by stop(). - */ - sscl::SpinLock shouldAcceptRequestsLock; - bool shouldAcceptRequests; + sscl::SyncCancelerForAsyncWork ioUringAssemblyEngnCanceler; size_t nDgramsPerStagingBufferFrame; diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp index c0661fa..b8ec2d1 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp @@ -39,7 +39,6 @@ clAverageIntensityBufferClBuffer(nullptr), clAssemblyBuffer(nullptr), clCollationBuffer(nullptr), clAverageIntensityBuffer(nullptr), -shouldAcceptRequests(false), compactIsRunning(false), collateIsRunning(false), currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr), @@ -64,13 +63,10 @@ OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine() bool OpenClCollatingAndMeshingEngine::setup() { // Defensive check to prevent double-calling + if (!openClCollMeshEngnCanceler.isCancellationRequested()) { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (shouldAcceptRequests) - { - throw std::runtime_error(std::string(__func__) + ": setup() called " - "while already set up"); - } + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); } if (!smoHooksPtr || !smoHooksPtr->ComputeManager_getDevice) @@ -202,7 +198,7 @@ bool OpenClCollatingAndMeshingEngine::setup() clFlush(computeDevice->commandQueue); clFinish(computeDevice->commandQueue); - shouldAcceptRequests = true; + openClCollMeshEngnCanceler.startAcceptingWork(); return true; } @@ -771,11 +767,7 @@ bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs( bool OpenClCollatingAndMeshingEngine::stop() { - // Acquire and release lock tightly around setting the flag - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - bool wasAcceptingRequests = shouldAcceptRequests; - shouldAcceptRequests = false; - return wasAcceptingRequests; + return openClCollMeshEngnCanceler.requestStop(); } void OpenClCollatingAndMeshingEngine::compactKernelComplete(bool isFinalizing) @@ -1051,28 +1043,33 @@ public: void compactCollateAndMeshFrameReq1_doCompact_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) + auto& canceler = engine.openClCollMeshEngnCanceler; + const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( + [context, this]() { - callOriginalCallback(false); - return; - } + // Record compact kernel start time + engine.compactKernelStartTime = + std::chrono::high_resolution_clock::now(); - // Record compact kernel start time - engine.compactKernelStartTime = std::chrono::high_resolution_clock::now(); + bool success = engine.startCompactKernel( + engine.parent.assemblyBuffer, + static_cast( + context->frameAssemblyResult.nSucceeded.load()), + std::bind( + &CompactCollateAndMeshFrameReq + ::compactCollateAndMeshFrameReq2_compactDone_posted, + context.get(), context, + std::placeholders::_1)); - bool success = engine.startCompactKernel( - engine.parent.assemblyBuffer, - static_cast(context->frameAssemblyResult.nSucceeded.load()), - std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq2_compactDone_posted, - context.get(), context, - std::placeholders::_1)); + if (!success) + { + engine.compactKernelComplete(); + callOriginalCallback(false); + } + }); - if (!success) + if (!shouldContinue) { - engine.compactKernelComplete(); callOriginalCallback(false); return; } @@ -1082,8 +1079,27 @@ public: std::shared_ptr context, cl_int compactStatus) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) + bool compactFailed = false; + + auto& canceler = engine.openClCollMeshEngnCanceler; + const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( + [context, this, compactStatus, &compactFailed]() + { + engine.compactKernelComplete(); + // Record compact kernel end time + engine.compactKernelEndTime = + std::chrono::high_resolution_clock::now(); + + // If compact failed, call callback directly with failure + if (compactStatus != CL_SUCCESS) + { + compactFailed = true; + callOriginalCallback(false); + return; + } + }); + + if (!shouldContinue) { /** EXPLANATION: * We intentionally don't call compactKernelComplete() here because @@ -1095,16 +1111,7 @@ public: return; } - engine.compactKernelComplete(); - // Record compact kernel end time - engine.compactKernelEndTime = std::chrono::high_resolution_clock::now(); - - // If compact failed, call callback directly with failure - if (compactStatus != CL_SUCCESS) - { - callOriginalCallback(false); - return; - } + if (compactFailed) { return; } #if 0 // Print first 4 bytes of each slot @@ -1116,36 +1123,39 @@ public: } #endif - lock.unlockPrematurely(); context->compactCollateAndMeshFrameReq3_doCollate_posted(context); } void compactCollateAndMeshFrameReq3_doCollate_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) + auto& canceler = engine.openClCollMeshEngnCanceler; + const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( + [context, this]() { - callOriginalCallback(false); - return; - } + // Record collate kernel start time + engine.collateKernelStartTime = + std::chrono::high_resolution_clock::now(); - // Record collate kernel start time - engine.collateKernelStartTime = std::chrono::high_resolution_clock::now(); + bool success = engine.startCollateKernel( + context->intensityStimFrame, context->anyAmbienceAttached(), + std::bind( + &CompactCollateAndMeshFrameReq + ::compactCollateAndMeshFrameReq4_collateDone_maybePosted, + context.get(), context, + std::placeholders::_1)); - bool success = engine.startCollateKernel( - context->intensityStimFrame, context->anyAmbienceAttached(), - std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq4_collateDone_maybePosted, - context.get(), context, - std::placeholders::_1)); + if (!success) + { + engine.collateKernelComplete( + context->intensityStimFrame, context->anyAmbienceAttached()); - if (!success) + callOriginalCallback(false); + } + }); + + if (!shouldContinue) { - engine.collateKernelComplete( - context->intensityStimFrame, context->anyAmbienceAttached()); - callOriginalCallback(false); return; } @@ -1155,16 +1165,6 @@ public: [[maybe_unused]] std::shared_ptr context, cl_int collateStatus) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) - { - /* We intentionally don't call collateKernelComplete() here for the - * same reason as above. - */ - callOriginalCallback(false); - return; - } - /** EXPLANATION: * The reason we don't call collateKernelComplete before checking * shouldAcceptRequests is because if shouldAcceptRequests is false, then @@ -1174,77 +1174,92 @@ public: * Therefore it's finalize()'s responsibility to ensure that it properly * completes/cleans up any in-flight operations. */ - engine.collateKernelComplete( - context->intensityStimFrame, context->anyAmbienceAttached()); - - // Produce each attached ambience stimbuff's passband count from - // the per-slot averages the collate kernel staged. - uint32_t nSucceededForAmbience = - context->frameAssemblyResult.nSucceeded.load(); - - if (context->lightAmbienceProductionDesc.has_value()) + auto& canceler = engine.openClCollMeshEngnCanceler; + const bool shouldContinue = canceler.execUncancelableSegmentOrAbort( + [context, this, collateStatus]() { - engine.produceAmbienceStimulusFrame( - context->lightAmbienceProductionDesc->frame.get(), - context->lightAmbienceProductionDesc->comparator, - nSucceededForAmbience); - } + engine.collateKernelComplete( + context->intensityStimFrame, context->anyAmbienceAttached()); - if (context->darkAmbienceProductionDesc.has_value()) - { - engine.produceAmbienceStimulusFrame( - context->darkAmbienceProductionDesc->frame.get(), - context->darkAmbienceProductionDesc->comparator, - nSucceededForAmbience); - } - - // Record collate kernel end time - engine.collateKernelEndTime = std::chrono::high_resolution_clock::now(); - - bool success = (collateStatus == CL_SUCCESS); - - // Early callback + return pattern - if (!success) + // Produce each attached ambience stimbuff's passband count from + // the per-slot averages the collate kernel staged. + uint32_t nSucceededForAmbience = + context->frameAssemblyResult.nSucceeded.load(); + + if (context->lightAmbienceProductionDesc.has_value()) + { + engine.produceAmbienceStimulusFrame( + context->lightAmbienceProductionDesc->frame.get(), + context->lightAmbienceProductionDesc->comparator, + nSucceededForAmbience); + } + + if (context->darkAmbienceProductionDesc.has_value()) + { + engine.produceAmbienceStimulusFrame( + context->darkAmbienceProductionDesc->frame.get(), + context->darkAmbienceProductionDesc->comparator, + nSucceededForAmbience); + } + + // Record collate kernel end time + engine.collateKernelEndTime = + std::chrono::high_resolution_clock::now(); + + bool success = (collateStatus == CL_SUCCESS); + + // Early callback + return pattern + if (!success) + { + callOriginalCallback(false); + return; + } + + uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load(); + + int returnMode = static_cast(engine.parent.device->currentReturnMode); + size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram( + returnMode); + size_t totalPoints = nSucceeded * pointsPerDgram; + + // Count points with intensity greater than 116 + size_t highIntensityCount = 0; + if (context->intensityStimFrame.has_value()) + { + StimulusFrame& intensityFrame = context->intensityStimFrame->get(); + float* intensityFloats = reinterpret_cast(intensityFrame.slotDesc.vaddr); + for (size_t i = 0; i < totalPoints; ++i) + { + float intensity = intensityFloats[i]; + if (intensity >= 116.0f) + { + ++highIntensityCount; + } + } + } + (void)highIntensityCount; + + #if 0 + std::cout << __func__ << ": intensityRingBufferIndex=" + << (context->intensityStimFrame.has_value() ? + context->intensityStimFrame->get().ringBufferIndex : SIZE_MAX) + << ", pointsPerDgram=" << pointsPerDgram + << ", nSucceeded=" << nSucceeded + << ", totalPoints=" << totalPoints + << ", highIntensityCount=" << highIntensityCount << std::endl; + #endif + + callOriginalCallback(success); + }); + + if (!shouldContinue) { + /* We intentionally don't call collateKernelComplete() here for the + * same reason as above. + */ callOriginalCallback(false); return; } - - uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load(); - - int returnMode = static_cast(engine.parent.device->currentReturnMode); - size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram( - returnMode); - size_t totalPoints = nSucceeded * pointsPerDgram; - - // Count points with intensity greater than 116 - size_t highIntensityCount = 0; - if (context->intensityStimFrame.has_value()) - { - StimulusFrame& intensityFrame = context->intensityStimFrame->get(); - float* intensityFloats = reinterpret_cast(intensityFrame.slotDesc.vaddr); - for (size_t i = 0; i < totalPoints; ++i) - { - float intensity = intensityFloats[i]; - if (intensity >= 116.0f) - { - ++highIntensityCount; - } - } - } - (void)highIntensityCount; - -#if 0 - std::cout << __func__ << ": intensityRingBufferIndex=" - << (context->intensityStimFrame.has_value() ? - context->intensityStimFrame->get().ringBufferIndex : SIZE_MAX) - << ", pointsPerDgram=" << pointsPerDgram - << ", nSucceeded=" << nSucceeded - << ", totalPoints=" << totalPoints - << ", highIntensityCount=" << highIntensityCount << std::endl; -#endif - - callOriginalCallback(success); } }; @@ -1256,8 +1271,7 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq( sscl::cps::Callback callback) { { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (!shouldAcceptRequests) + if (openClCollMeshEngnCanceler.isCancellationRequested()) { callback.callbackFn(false, stimulusFrame); return; diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h index 3ef79c9..e65b99c 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -150,8 +150,7 @@ private: cl_mem clAverageIntensityBuffer; // State tracking - sscl::SpinLock shouldAcceptRequestsLock; - bool shouldAcceptRequests; + sscl::SyncCancelerForAsyncWork openClCollMeshEngnCanceler; bool compactIsRunning; bool collateIsRunning; cl_event currentCompactKernelEvent; diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp index 6d74b04..3498396 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp @@ -464,117 +464,137 @@ public: void produceFrameReq1_doAssemble_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); - if (!pcloudProducer.shouldContinue) + const bool shouldContinue = pcloudProducer.stimulusProducerCanceler + .execUncancelableSegmentOrAbort( + [this, context]() + { + pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( + {context, std::bind( + &ProduceFrameReq::produceFrameReq2_assembleDone, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)}); + }); + + if (!shouldContinue) { callOriginalCallback(); return; } - - pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( - {context, std::bind( - &ProduceFrameReq::produceFrameReq2_assembleDone, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq2_assembleDone( std::shared_ptr context, bool success, sscl::AsynchronousLoop loop) { - sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); - if (!pcloudProducer.shouldContinue) + bool shouldContinue = pcloudProducer.stimulusProducerCanceler + .execUncancelableSegmentOrAbort( + [this, context, success, loop]() { - callOriginalCallback(); - return; - } + if (!success) + { + callOriginalCallback(); - if (!success) - { - callOriginalCallback(); - - if (pcloudProducer.attachedStimulusBuffers.size() > 0) { - std::cerr << __func__ << ": Failed to assemble frame.\n"; + if (pcloudProducer.attachedStimulusBuffers.size() > 0) { + std::cerr << __func__ + << ": Failed to assemble frame.\n"; + } + return; } + + context->frameAssemblyResult = loop; + + // Check if intensity buffer is attached and acquire frame if so + if (auto intensityBuff = pcloudProducer + .intensityStimulusBuffer.load( + std::memory_order_acquire)) + { + size_t intensityRingbuffIndex = intensityBuff + ->ringBuffer.getIndexToProduceInto(); + + StimulusFrame& intensityStimFrame = intensityBuff + ->ringBuffer.getDataAtSlot( + intensityRingbuffIndex); + + intensityStimFrame.lock.writeAcquire(); + context->intensityStimFrame = std::make_optional( + std::ref(intensityStimFrame)); + } + else { + context->intensityStimFrame = std::nullopt; + } + + // Check if light ambience buffer is attached and acquire frame if so + std::optional + lightAmbienceProductionDescDesc; + if (auto lightAmbienceBuff = + pcloudProducer.lightAmbienceStimulusBuffer.load( + std::memory_order_acquire)) + { + size_t lightAmbienceRingbuffIndex = lightAmbienceBuff + ->ringBuffer.getIndexToProduceInto(); + + StimulusFrame& lightAmbienceStimFrame = + lightAmbienceBuff->ringBuffer.getDataAtSlot( + lightAmbienceRingbuffIndex); + + lightAmbienceStimFrame.lock.writeAcquire(); + context->lightAmbienceStimFrame = std::make_optional( + std::ref(lightAmbienceStimFrame)); + lightAmbienceProductionDescDesc = + AmbienceProductionDesc{ + std::ref(lightAmbienceStimFrame), + lightAmbienceBuff + ->passbandCountGtComparator}; + } + else { + context->lightAmbienceStimFrame = std::nullopt; + } + + // Check if dark ambience buffer is attached and acquire frame if so + std::optional + darkAmbienceProductionDescDesc; + if (auto darkAmbienceBuff = + pcloudProducer.darkAmbienceStimulusBuffer.load( + std::memory_order_acquire)) + { + size_t darkAmbienceRingbuffIndex = darkAmbienceBuff + ->ringBuffer.getIndexToProduceInto(); + + StimulusFrame& darkAmbienceStimFrame = + darkAmbienceBuff->ringBuffer.getDataAtSlot( + darkAmbienceRingbuffIndex); + + darkAmbienceStimFrame.lock.writeAcquire(); + context->darkAmbienceStimFrame = std::make_optional( + std::ref(darkAmbienceStimFrame)); + darkAmbienceProductionDescDesc = + AmbienceProductionDesc{ + std::ref(darkAmbienceStimFrame), + darkAmbienceBuff + ->passbandCountLtComparator}; + } + else { + context->darkAmbienceStimFrame = std::nullopt; + } + + pcloudProducer.openClCollatingAndMeshingEngine + .compactCollateAndMeshFrameReq( + context->frameAssemblyResult, stimulusFrame, + context->intensityStimFrame, + std::move(lightAmbienceProductionDescDesc), + std::move(darkAmbienceProductionDescDesc), + {context, std::bind( + &ProduceFrameReq + ::produceFrameReq3_compactCollateDone, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)}); + }); + + if (!shouldContinue) + { + callOriginalCallback(); return; } - - context->frameAssemblyResult = loop; - - // Check if intensity buffer is attached and acquire frame if so - if (auto intensityBuff = pcloudProducer.intensityStimulusBuffer.load( - std::memory_order_acquire)) - { - size_t intensityRingbuffIndex = intensityBuff - ->ringBuffer.getIndexToProduceInto(); - - StimulusFrame& intensityStimFrame = intensityBuff - ->ringBuffer.getDataAtSlot( - intensityRingbuffIndex); - - intensityStimFrame.lock.writeAcquire(); - context->intensityStimFrame = std::make_optional( - std::ref(intensityStimFrame)); - } - else { - context->intensityStimFrame = std::nullopt; - } - - // Check if light ambience buffer is attached and acquire frame if so - std::optional lightAmbienceProductionDescDesc; - if (auto lightAmbienceBuff = - pcloudProducer.lightAmbienceStimulusBuffer.load( - std::memory_order_acquire)) - { - size_t lightAmbienceRingbuffIndex = lightAmbienceBuff - ->ringBuffer.getIndexToProduceInto(); - - StimulusFrame& lightAmbienceStimFrame = lightAmbienceBuff - ->ringBuffer.getDataAtSlot(lightAmbienceRingbuffIndex); - - lightAmbienceStimFrame.lock.writeAcquire(); - context->lightAmbienceStimFrame = std::make_optional( - std::ref(lightAmbienceStimFrame)); - lightAmbienceProductionDescDesc = AmbienceProductionDesc{ - std::ref(lightAmbienceStimFrame), - lightAmbienceBuff->passbandCountGtComparator}; - } - else { - context->lightAmbienceStimFrame = std::nullopt; - } - - // Check if dark ambience buffer is attached and acquire frame if so - std::optional darkAmbienceProductionDescDesc; - if (auto darkAmbienceBuff = - pcloudProducer.darkAmbienceStimulusBuffer.load( - std::memory_order_acquire)) - { - size_t darkAmbienceRingbuffIndex = darkAmbienceBuff - ->ringBuffer.getIndexToProduceInto(); - - StimulusFrame& darkAmbienceStimFrame = darkAmbienceBuff - ->ringBuffer.getDataAtSlot(darkAmbienceRingbuffIndex); - - darkAmbienceStimFrame.lock.writeAcquire(); - context->darkAmbienceStimFrame = std::make_optional( - std::ref(darkAmbienceStimFrame)); - darkAmbienceProductionDescDesc = AmbienceProductionDesc{ - std::ref(darkAmbienceStimFrame), - darkAmbienceBuff->passbandCountLtComparator}; - } - else { - context->darkAmbienceStimFrame = std::nullopt; - } - - pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq( - loop, stimulusFrame, - context->intensityStimFrame, - std::move(lightAmbienceProductionDescDesc), - std::move(darkAmbienceProductionDescDesc), - {context, std::bind( - &ProduceFrameReq::produceFrameReq3_compactCollateDone, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq3_compactCollateDone( @@ -603,7 +623,10 @@ public: } #endif - // Release intensity frame if it was used + /** EXPLANATION: + * Release intensity/ambience frames if they were supplied/used, + * regardless of whether or not a cancelation request occurred. + */ if (context->intensityStimFrame.has_value()) { context->intensityStimFrame->get().lock.writeRelease(); } @@ -615,130 +638,169 @@ public: context->darkAmbienceStimFrame->get().lock.writeRelease(); } - sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); - if (!pcloudProducer.shouldContinue) + if (!success) + { + callOriginalCallback(); + + std::cerr << __func__ + << ": Failed to compact and collate frame" + << std::endl; + + return; + } + + /** EXPLANATION: + * Cancellation early exit for the success path. Analogous to the + * (legacy, removed) pre-canceler check under shouldContinueLock before + * lock.unlockPrematurely() in the former CPS version of this handler. + * + * Assumptions that make the unlocked tail (dump, debug logging, stage + * durations, final callOriginalCallback()) safe without holding + * stimulusProducerCanceler.s.lock: + * + * 1. This handler is only entered from + * CompactCollateAndMeshFrameReq4's execUncancelableSegmentOrAbort + * body, so openClCollatingAndMeshingEngine::finalize() cannot + * tear down OpenCL/engine state until this function returns. + * + * 2. PcloudStimulusProducer::stop() finalizes OpenCL before io_uring, + * so getCompactKernelDuration()/getCollateKernelDuration() and + * getAssemblyDuration() are not racing engine finalize here. + * + * 3. dump/logging reads only producer-owned state (pcloudFrameDumper, + * collationBuffer, device, context->frameAssemblyResult) that + * stop()/finalize() do not destroy. + * + * 4. A stop() that lands after this read but before the tail is + * intentional stale work (same as unlockPrematurely()), not UAF. + * + * Rework to run portions inside stimulusProducerCanceler + * execUncancelableSegmentOrAbort if any of the above ceases to hold: + * e.g. this callback is invoked outside the OpenCL uncancelable + * segment, the tail begins touching engine or buffer state that + * finalize() resets, or stop() ordering changes so teardown can + * interleave with the unlocked tail. + */ + const bool shouldContinue = + !pcloudProducer.stimulusProducerCanceler.isCancellationRequested(); + + if (!shouldContinue) { callOriginalCallback(); return; } - if (!success) { - std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; - } else + if (pcloudProducer.pcloudFrameDumper.isEnabled()) { - lock.unlockPrematurely(); - if (pcloudProducer.pcloudFrameDumper.isEnabled()) + try { - try - { - pcloudProducer.pcloudFrameDumper.dumpProducedFrame( - *pcloudProducer.device, - pcloudProducer.collationBuffer, - context->frameAssemblyResult); - } - catch (const std::exception& e) - { - std::cerr << __func__ << ": Failed to dump pcloud frame: " - << e.what() << std::endl; - } + pcloudProducer.pcloudFrameDumper.dumpProducedFrame( + *pcloudProducer.device, + pcloudProducer.collationBuffer, + context->frameAssemblyResult); } + catch (const std::exception& e) + { + std::cerr << __func__ << ": Failed to dump pcloud frame: " + << e.what() << std::endl; + } + } #if SMO_DEBUG_PCLOUD_AMBIENCE_INTRIN - if (logLightAmbience) + if (logLightAmbience) + { + auto lightBuff = + pcloudProducer.lightAmbienceStimulusBuffer.load( + std::memory_order_acquire); + if (lightBuff) { - auto lightBuff = - pcloudProducer.lightAmbienceStimulusBuffer.load( - std::memory_order_acquire); - if (lightBuff) + std::cerr << __func__ << ": pcloudLightAmbience " + << "passbandCount=" << logLightPassbandCount + << " (per-slot avg intensity " + << ambienceComparatorOpChar( + lightBuff->passbandCountGtComparator.op) + << " " << lightBuff->passbandCountGtComparator.value + << ")"; + if (lightBuff->negtrinInterestConfig.has_value()) { - std::cerr << __func__ << ": pcloudLightAmbience " - << "passbandCount=" << logLightPassbandCount - << " (per-slot avg intensity " - << ambienceComparatorOpChar( - lightBuff->passbandCountGtComparator.op) - << " " << lightBuff->passbandCountGtComparator.value - << ")"; - if (lightBuff->negtrinInterestConfig.has_value()) + const auto& nc = *lightBuff->negtrinInterestConfig; + std::cerr << " negtrinInterestThr=" << nc.threshold; + if (nc.percentage != 0U) { - const auto& nc = *lightBuff->negtrinInterestConfig; - std::cerr << " negtrinInterestThr=" << nc.threshold; - if (nc.percentage != 0U) - { - std::cerr << " (from " << nc.percentage << "%)"; - } - std::cerr << " meetsNegtrinInterest=" - << (lightBuff->shouldTriggerNegtrinEvent( - logLightPassbandCount) - ? "yes" : "no"); + std::cerr << " (from " << nc.percentage << "%)"; } - else - { - std::cerr << " negtrinInterest(n/a)"; - } - std::cerr << std::endl; + std::cerr << " meetsNegtrinInterest=" + << (lightBuff->shouldTriggerNegtrinEvent( + logLightPassbandCount) + ? "yes" : "no"); } + else + { + std::cerr << " negtrinInterest(n/a)"; + } + std::cerr << std::endl; } - if (logDarkAmbience) + } + if (logDarkAmbience) + { + auto darkBuff = + pcloudProducer.darkAmbienceStimulusBuffer.load( + std::memory_order_acquire); + if (darkBuff) { - auto darkBuff = - pcloudProducer.darkAmbienceStimulusBuffer.load( - std::memory_order_acquire); - if (darkBuff) + std::cerr << __func__ << ": pcloudDarkAmbience " + << "passbandCount=" << logDarkPassbandCount + << " (per-slot avg intensity " + << ambienceComparatorOpChar( + darkBuff->passbandCountLtComparator.op) + << " " << darkBuff->passbandCountLtComparator.value + << ")"; + if (darkBuff->postrinInterestConfig.has_value()) { - std::cerr << __func__ << ": pcloudDarkAmbience " - << "passbandCount=" << logDarkPassbandCount - << " (per-slot avg intensity " - << ambienceComparatorOpChar( - darkBuff->passbandCountLtComparator.op) - << " " << darkBuff->passbandCountLtComparator.value - << ")"; - if (darkBuff->postrinInterestConfig.has_value()) + const auto& pc = *darkBuff->postrinInterestConfig; + std::cerr << " postrinInterestThr=" << pc.threshold; + if (pc.percentage != 0U) { - const auto& pc = *darkBuff->postrinInterestConfig; - std::cerr << " postrinInterestThr=" << pc.threshold; - if (pc.percentage != 0U) - { - std::cerr << " (from " << pc.percentage << "%)"; - } - std::cerr << " meetsPostrinInterest=" - << (darkBuff->shouldTriggerPostrinEvent( - logDarkPassbandCount) - ? "yes" : "no"); + std::cerr << " (from " << pc.percentage << "%)"; } - else - { - std::cerr << " postrinInterest(n/a)"; - } - std::cerr << std::endl; + std::cerr << " meetsPostrinInterest=" + << (darkBuff->shouldTriggerPostrinEvent( + logDarkPassbandCount) + ? "yes" : "no"); } + else + { + std::cerr << " postrinInterest(n/a)"; + } + std::cerr << std::endl; } + } #endif #if SMO_PRINT_PCLOUD_STAGE_DURATIONS - const auto logNow = std::chrono::system_clock::now(); - const std::time_t logTime = - std::chrono::system_clock::to_time_t(logNow); - const auto logSubsecMs = - std::chrono::duration_cast( - logNow.time_since_epoch()) % 1000; - auto assemblyDuration = - pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration(); - auto compactDuration = - pcloudProducer.openClCollatingAndMeshingEngine - .getCompactKernelDuration(); - auto collateDuration = - pcloudProducer.openClCollatingAndMeshingEngine - .getCollateKernelDuration(); - std::cout << std::put_time(std::localtime(&logTime), "%T") - << '.' << std::setfill('0') << std::setw(3) - << logSubsecMs.count() << ' ' - << __func__ << ": stage durations: assembly=" - << assemblyDuration.count() - << "ms, compactKernel=" << compactDuration.count() - << "ms, collateKernel=" << collateDuration.count() - << "ms" << std::endl; + const auto logNow = std::chrono::system_clock::now(); + const std::time_t logTime = + std::chrono::system_clock::to_time_t(logNow); + const auto logSubsecMs = + std::chrono::duration_cast( + logNow.time_since_epoch()) % 1000; + auto assemblyDuration = + pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration(); + auto compactDuration = + pcloudProducer.openClCollatingAndMeshingEngine + .getCompactKernelDuration(); + auto collateDuration = + pcloudProducer.openClCollatingAndMeshingEngine + .getCollateKernelDuration(); + std::cout << std::put_time(std::localtime(&logTime), "%T") + << '.' << std::setfill('0') << std::setw(3) + << logSubsecMs.count() << ' ' + << __func__ << ": stage durations: assembly=" + << assemblyDuration.count() + << "ms, compactKernel=" << compactDuration.count() + << "ms, collateKernel=" << collateDuration.count() + << "ms" << std::endl; #endif - } callOriginalCallback(); } @@ -748,10 +810,9 @@ void PcloudStimulusProducer::produceFrameReq( sscl::cps::Callback callback) { /** EXPLANATION: - * We shouldn't acquire the StimulusProducer::shouldContinueLock here because - * this function is called from - * StimulusProducer::stimFrameProductionTimesliceInd(), which is already - * holding the lock. + * We don't do any additional canceler-lock acquisition here because + * callback segment methods already use stimulusProducerCanceler + * checkpoints before running uncancelable work. */ auto caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared(