diff --git a/commonLibs/attachmentSupport/stimulusProducer.cpp b/commonLibs/attachmentSupport/stimulusProducer.cpp index a33d053..ae38126 100644 --- a/commonLibs/attachmentSupport/stimulusProducer.cpp +++ b/commonLibs/attachmentSupport/stimulusProducer.cpp @@ -91,10 +91,7 @@ void StimulusProducer::destroyAttachedStimulusBuffer( void StimulusProducer::stop() { - { - sscl::SpinLock::Guard lock(shouldContinueLock); - shouldContinue = false; - } + (void)stimulusProducerCanceler.requestStop(); // Cancel timer immediately timer.cancel(); @@ -105,7 +102,7 @@ void StimulusProducer::stop() void StimulusProducer::scheduleNextTimeout(int delayMs) { - if (!shouldContinue) + if (stimulusProducerCanceler.isCancellationRequestedUnlocked()) { return; } // Schedule the next timeout using the provided delay @@ -131,8 +128,8 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error) return; } - sscl::SpinLock::Guard lock(shouldContinueLock); - if (!shouldContinue) + sscl::SpinLock::Guard guard(stimulusProducerCanceler.s.lock); + if (stimulusProducerCanceler.isCancellationRequestedUnlocked()) { return; } /** EXPLANATION: diff --git a/include/user/stimulusProducer.h b/include/user/stimulusProducer.h index 5d868a4..dab5cee 100644 --- a/include/user/stimulusProducer.h +++ b/include/user/stimulusProducer.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "deviceAttachmentSpec.h" namespace smo { @@ -41,8 +42,7 @@ public: boost::asio::io_service& ioService_) : deviceAttachmentSpec(deviceAttachmentSpec), ioService(ioService_), - shouldContinue(false), timer(ioService), - nDeferrals(0) + timer(ioService), nDeferrals(0) {} virtual ~StimulusProducer() = default; @@ -59,7 +59,7 @@ public: std::cout << __func__ << ": Starting stimulus producer for device " << deviceAttachmentSpec->deviceSelector << std::endl; - shouldContinue = true; + stimulusProducerCanceler.startAcceptingWork(); nDeferrals = 0; scheduleNextTimeout(); } @@ -109,8 +109,7 @@ public: private: boost::asio::io_service& ioService; protected: - sscl::SpinLock shouldContinueLock; - bool shouldContinue; + sscl::SyncCancelerForAsyncWork stimulusProducerCanceler; private: boost::asio::deadline_timer timer; size_t nDeferrals; diff --git a/smocore/deviceManager/deviceReattacher.cpp b/smocore/deviceManager/deviceReattacher.cpp index d1eae41..d5eff81 100644 --- a/smocore/deviceManager/deviceReattacher.cpp +++ b/smocore/deviceManager/deviceReattacher.cpp @@ -18,8 +18,7 @@ constexpr unsigned int reattachInFlightStaleThresholdMultiplier = 4; DeviceReattacher::DeviceReattacher( DeviceManager& parent, std::shared_ptr ioThread) -: parent(parent), ioThread(ioThread), shouldContinue(false), -timer(ioThread->getIoService()) +: parent(parent), ioThread(ioThread), timer(ioThread->getIoService()) { } @@ -41,16 +40,16 @@ mrntt::MrnttNonViralPostingInvoker DeviceReattacher::reattachKnownListCReq( void DeviceReattacher::start() { - shouldContinue = true; + deviceReattacherCanceler.startAcceptingWork(); scheduleNextTimeout(); } void DeviceReattacher::stop() { { - sscl::SpinLock::Guard lock(shouldContinueLock); - shouldContinue = false; + sscl::SpinLock::Guard guard(deviceReattacherCanceler.s.lock); reattachOpInFlight = false; + deviceReattacherCanceler.s.rsrc.shouldContinue = false; /** EXPLANATION: * Do not call reattachCReqInvoker.reset() here. Forcibly destroying * the invoker would tear down an in-flight reattach coroutine frame @@ -65,7 +64,7 @@ void DeviceReattacher::stop() void DeviceReattacher::scheduleNextTimeout() { - if (!shouldContinue) { + if (deviceReattacherCanceler.isCancellationRequestedUnlocked()) { return; } @@ -88,7 +87,7 @@ void DeviceReattacher::holdReattachCReq() reattachLifetimeExceptionPtr, [this]() { - sscl::SpinLock::Guard lock(shouldContinueLock); + sscl::SpinLock::Guard guard(deviceReattacherCanceler.s.lock); reattachOpInFlight = false; })); } @@ -107,8 +106,8 @@ void DeviceReattacher::onTimeout(const boost::system::error_code& error) return; } - sscl::SpinLock::Guard lock(shouldContinueLock); - if (!shouldContinue) { + sscl::SpinLock::Guard guard(deviceReattacherCanceler.s.lock); + if (deviceReattacherCanceler.isCancellationRequestedUnlocked()) { return; } diff --git a/smocore/include/deviceManager/deviceReattacher.h b/smocore/include/deviceManager/deviceReattacher.h index 3d54d84..4750000 100644 --- a/smocore/include/deviceManager/deviceReattacher.h +++ b/smocore/include/deviceManager/deviceReattacher.h @@ -11,7 +11,7 @@ #include #include #include -#include +#include namespace smo { @@ -44,8 +44,7 @@ private: DeviceManager &parent; std::shared_ptr ioThread; - sscl::SpinLock shouldContinueLock; - bool shouldContinue; + sscl::SyncCancelerForAsyncWork deviceReattacherCanceler; boost::asio::deadline_timer timer; std::exception_ptr reattachLifetimeExceptionPtr; std::optional reattachCReqInvoker; diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index f9d73f4..f9c1bed 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -59,7 +59,6 @@ IoUringAssemblyEngine::IoUringAssemblyEngine( frameAssemblyDesc(nullptr), ring{}, eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()), -shouldAcceptRequests(false), nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_), assembledSlotsTracker(nDgramsPerStagingBufferFrame_), randomDevice(), randomGenerator(randomDevice()) @@ -68,13 +67,10 @@ randomDevice(), randomGenerator(randomDevice()) bool IoUringAssemblyEngine::setup() { // Defensive check to prevent double-calling + if (!ioUringAssemblyEngnCanceler.isCancellationRequested()) { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (shouldAcceptRequests) - { - throw std::runtime_error(std::string(__func__) + ": setup() called " - "while already set up"); - } + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); } // Get FrameAssemblyDesc from staging buffer @@ -156,7 +152,7 @@ bool IoUringAssemblyEngine::setup() if (ret < 0) { goto cleanup_eventfd; } - shouldAcceptRequests = true; + ioUringAssemblyEngnCanceler.startAcceptingWork(); return true; cleanup_eventfd: @@ -229,7 +225,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( + ": onCqeReady callback is invalid"); } - if (!shouldAcceptRequests) + if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()) { throw std::runtime_error(std::string(__func__) + ": engine is not accepting requests"); @@ -321,11 +317,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( bool IoUringAssemblyEngine::stop() { - // Acquire and release lock tightly around setting the flag - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - bool wasAcceptingRequests = shouldAcceptRequests; - shouldAcceptRequests = false; - return wasAcceptingRequests; + return ioUringAssemblyEngnCanceler.requestStop(); } void IoUringAssemblyEngine::assemblyCycleComplete() @@ -444,9 +436,9 @@ public: void assembleFrameReq1_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + sscl::SpinLock::Guard guard(engine.ioUringAssemblyEngnCanceler.s.lock); - if (!engine.shouldAcceptRequests) + if (engine.ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()) { context->callOriginalCallback(false, sscl::AsynchronousLoop(0)); return; @@ -498,9 +490,11 @@ public: * indeed seen a SEGFAULT even in the current code with locking, so * I'm going to hold the lock here for now. */ - sscl::SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock); + sscl::SpinLock::Guard guard( + context->engine.ioUringAssemblyEngnCanceler.s.lock); - if (!context->engine.shouldAcceptRequests) + if (context->engine.ioUringAssemblyEngnCanceler + .isCancellationRequestedUnlocked()) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); @@ -518,7 +512,8 @@ public: void *user_data, int cqe_result) { // NB: The lock was acquired by onEventFdRead before calling this func - if (!context->engine.shouldAcceptRequests) + if (context->engine.ioUringAssemblyEngnCanceler + .isCancellationRequestedUnlocked()) { context->engine.assemblyCycleComplete(); context->loop.setRemainingIterationsToFailure(); @@ -549,7 +544,7 @@ public: { /** EXPLANATION: * All branch paths that invoke this unifyig oracle function are - * expected to already hold the shouldAcceptRequestsLock before calling + * expected to already hold ioUringAssemblyEngnCanceler.s.lock before calling * it. */ // Ensure we only execute once using atomic exchange @@ -638,8 +633,8 @@ void IoUringAssemblyEngine::assembleFrameReq( sscl::cps::Callback cb) { { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (!shouldAcceptRequests) + sscl::SpinLock::Guard guard(ioUringAssemblyEngnCanceler.s.lock); + if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()) { cb.callbackFn(false, sscl::AsynchronousLoop(0)); return; @@ -670,7 +665,7 @@ void IoUringAssemblyEngine::onEventfdRead( * IoUringAssemblyEngine's per-assembly state isn't destroyed while this * handler is running. */ - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); + sscl::SpinLock::Guard guard(ioUringAssemblyEngnCanceler.s.lock); /** EXPLANATION: * You'd think we should put check for shouldAcceptRequests here and * `return` here if !shouldAcceptRequests, but we shouldn't because @@ -722,7 +717,7 @@ void IoUringAssemblyEngine::onEventfdRead( * But we do put a `return` here because we know that at this point, the * caller's callback has already been invoked. */ - if (!shouldAcceptRequests + if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked() || eventfdDesc == nullptr || !eventfdDesc->is_open()) { return; diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 82ec0ac..489997a 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include @@ -80,12 +80,7 @@ private: boost::asio::deadline_timer stallTimer; // Callback for CQE ntfns (called with user_data+result from each CQE) resetAndAssembleFrameCbFn onCqeReadyCallback; - /** EXPLANATION: - * Flag to indicate whether engine should accept new requests. - * Set by setup(), cleared by stop(). - */ - sscl::SpinLock shouldAcceptRequestsLock; - bool shouldAcceptRequests; + sscl::SyncCancelerForAsyncWork ioUringAssemblyEngnCanceler; size_t nDgramsPerStagingBufferFrame; diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp index c0661fa..a050d5d 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp @@ -39,7 +39,6 @@ clAverageIntensityBufferClBuffer(nullptr), clAssemblyBuffer(nullptr), clCollationBuffer(nullptr), clAverageIntensityBuffer(nullptr), -shouldAcceptRequests(false), compactIsRunning(false), collateIsRunning(false), currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr), @@ -64,13 +63,10 @@ OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine() bool OpenClCollatingAndMeshingEngine::setup() { // Defensive check to prevent double-calling + if (!openClCollMeshEngnCanceler.isCancellationRequested()) { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (shouldAcceptRequests) - { - throw std::runtime_error(std::string(__func__) + ": setup() called " - "while already set up"); - } + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); } if (!smoHooksPtr || !smoHooksPtr->ComputeManager_getDevice) @@ -202,13 +198,13 @@ bool OpenClCollatingAndMeshingEngine::setup() clFlush(computeDevice->commandQueue); clFinish(computeDevice->commandQueue); - shouldAcceptRequests = true; + openClCollMeshEngnCanceler.startAcceptingWork(); return true; } void OpenClCollatingAndMeshingEngine::finalize() { - // Call stop() to set shouldAcceptRequests to false and get previous state + // Call stop() to clear shouldContinue and get previous state bool wasAcceptingRequests = stop(); (void)wasAcceptingRequests; @@ -771,11 +767,7 @@ bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs( bool OpenClCollatingAndMeshingEngine::stop() { - // Acquire and release lock tightly around setting the flag - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - bool wasAcceptingRequests = shouldAcceptRequests; - shouldAcceptRequests = false; - return wasAcceptingRequests; + return openClCollMeshEngnCanceler.requestStop(); } void OpenClCollatingAndMeshingEngine::compactKernelComplete(bool isFinalizing) @@ -1051,8 +1043,8 @@ public: void compactCollateAndMeshFrameReq1_doCompact_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) + sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); + if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { callOriginalCallback(false); return; @@ -1082,8 +1074,8 @@ public: std::shared_ptr context, cl_int compactStatus) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) + sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); + if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { /** EXPLANATION: * We intentionally don't call compactKernelComplete() here because @@ -1116,15 +1108,15 @@ public: } #endif - lock.unlockPrematurely(); + guard.unlockPrematurely(); context->compactCollateAndMeshFrameReq3_doCollate_posted(context); } void compactCollateAndMeshFrameReq3_doCollate_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) + sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); + if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { callOriginalCallback(false); return; @@ -1155,8 +1147,8 @@ public: [[maybe_unused]] std::shared_ptr context, cl_int collateStatus) { - sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock); - if (!engine.shouldAcceptRequests) + sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); + if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { /* We intentionally don't call collateKernelComplete() here for the * same reason as above. @@ -1256,8 +1248,8 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq( sscl::cps::Callback callback) { { - sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); - if (!shouldAcceptRequests) + sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); + if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { callback.callbackFn(false, stimulusFrame); return; diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h index 3ef79c9..e65b99c 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -150,8 +150,7 @@ private: cl_mem clAverageIntensityBuffer; // State tracking - sscl::SpinLock shouldAcceptRequestsLock; - bool shouldAcceptRequests; + sscl::SyncCancelerForAsyncWork openClCollMeshEngnCanceler; bool compactIsRunning; bool collateIsRunning; cl_event currentCompactKernelEvent; diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp index 6d74b04..e5d6075 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp @@ -464,8 +464,10 @@ public: void produceFrameReq1_doAssemble_posted( std::shared_ptr context) { - sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); - if (!pcloudProducer.shouldContinue) + sscl::SpinLock::Guard guard( + pcloudProducer.stimulusProducerCanceler.s.lock); + if (pcloudProducer.stimulusProducerCanceler + .isCancellationRequestedUnlocked()) { callOriginalCallback(); return; @@ -482,8 +484,10 @@ public: std::shared_ptr context, bool success, sscl::AsynchronousLoop loop) { - sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); - if (!pcloudProducer.shouldContinue) + sscl::SpinLock::Guard guard( + pcloudProducer.stimulusProducerCanceler.s.lock); + if (pcloudProducer.stimulusProducerCanceler + .isCancellationRequestedUnlocked()) { callOriginalCallback(); return; @@ -615,8 +619,10 @@ public: context->darkAmbienceStimFrame->get().lock.writeRelease(); } - sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock); - if (!pcloudProducer.shouldContinue) + sscl::SpinLock::Guard guard( + pcloudProducer.stimulusProducerCanceler.s.lock); + if (pcloudProducer.stimulusProducerCanceler + .isCancellationRequestedUnlocked()) { callOriginalCallback(); return; @@ -626,7 +632,7 @@ public: std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; } else { - lock.unlockPrematurely(); + guard.unlockPrematurely(); if (pcloudProducer.pcloudFrameDumper.isEnabled()) { try @@ -748,7 +754,7 @@ void PcloudStimulusProducer::produceFrameReq( sscl::cps::Callback callback) { /** EXPLANATION: - * We shouldn't acquire the StimulusProducer::shouldContinueLock here because + * We shouldn't acquire stimulusProducerCanceler.s.lock here because * this function is called from * StimulusProducer::stimFrameProductionTimesliceInd(), which is already * holding the lock.