From 87a8de9a2b67ae7fc5ae28e432563d5e522ed57b Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Tue, 9 Jun 2026 19:47:44 -0400 Subject: [PATCH] StimProd,DevReattacher: use CDaemon nonviral nursery coro We ported these two daemons over to the new nursery mechanism and they work nicely. --- CMakeLists.txt | 10 -- .../attachmentSupport/stimulusProducer.cpp | 169 ++++++++++-------- include/config.h.in | 1 - include/user/stimulusProducer.h | 11 +- smocore/deviceManager/deviceReattacher.cpp | 46 ++++- .../livoxGen1/pcloudStimulusProducer.cpp | 23 +-- 6 files changed, 136 insertions(+), 124 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7be9b3a..2857398 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,14 +53,6 @@ if(NOT STIMBUFF_FRAME_PERIOD_MS GREATER 0) "STIMBUFF_FRAME_PERIOD_MS must be a positive integer > 0") endif() -# Stimulus buffer frame retry delay configuration -set(STIMBUFF_FRAME_RETRY_DELAY_MS 1 - CACHE STRING "Stimulus buffer frame retry delay (ms)") -if(NOT STIMBUFF_FRAME_RETRY_DELAY_MS GREATER 0) - message(FATAL_ERROR - "STIMBUFF_FRAME_RETRY_DELAY_MS must be a positive integer > 0") -endif() - # World thread configuration option(WORLD_USE_BODY_THREAD "Use body thread for world component instead of separate world thread" OFF) @@ -90,8 +82,6 @@ endif() set(CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS ${DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS}) # Set the stimulus buffer frame period variable for config.h set(CONFIG_STIMBUFF_FRAME_PERIOD_MS ${STIMBUFF_FRAME_PERIOD_MS}) -# Set the stimulus buffer frame retry delay variable for config.h -set(CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ${STIMBUFF_FRAME_RETRY_DELAY_MS}) # Configure config.h configure_file( diff --git a/commonLibs/attachmentSupport/stimulusProducer.cpp b/commonLibs/attachmentSupport/stimulusProducer.cpp index 130e40b..ec33343 100644 --- a/commonLibs/attachmentSupport/stimulusProducer.cpp +++ b/commonLibs/attachmentSupport/stimulusProducer.cpp @@ -6,13 +6,67 @@ #include #include #include -#include #include #include namespace smo { namespace stim_buff { +namespace { + +long computeTimesliceResidueMs( + long productionDurationMs, long periodMs) +{ + if (productionDurationMs >= periodMs) { + return 0; + } + return periodMs - productionDurationMs; +} + +void logProductionOverrunIfNeeded( + const char *daemonName, + long productionDurationMs, long periodMs, + size_t &nTimesliceOverruns) +{ + if (productionDurationMs <= periodMs) { + return; + } + + ++nTimesliceOverruns; + const long overrunByMs = productionDurationMs - periodMs; + std::cerr << daemonName << ": production overrun: actual=" + << productionDurationMs << "ms budget=" << periodMs + << "ms overrunBy=" << overrunByMs << "ms nOverruns=" + << nTimesliceOverruns << std::endl; +} + +long durationMsSince( + const std::chrono::high_resolution_clock::time_point &startStamp, + const std::chrono::high_resolution_clock::time_point &endStamp) +{ + const auto duration = endStamp - startStamp; + return std::chrono::duration_cast( + duration).count(); +} + +void logDaemonDurationsIfVerbose( + const char *daemonName, + long productionDurationMs, + long timesliceDurationMs, + long periodMs) +{ + if (0 && !OptionParser::getOptions().verbose) { + return; + } + + std::cerr << daemonName << ": daemon durations: production=" + << productionDurationMs << "ms timeslice=" + << timesliceDurationMs << "ms period=" << periodMs + << "ms" << std::endl; +} + +} // namespace + std::shared_ptr StimulusProducer::getAttachedStimulusBuffer( const std::shared_ptr& spec) const { @@ -91,98 +145,55 @@ sscl::co::NonViralNonPostingInvoker StimulusProducer::productionCDaemon( std::exception_ptr &, std::function, sscl::SyncCancelerForAsyncWork &canceler) { - int nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; + const long framePeriodMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; do { - bool shouldProduceFrame = false; - - if (!canceler.execUncancelableSegmentOrAbort([&]() - { - /** EXPLANATION: - * We need to ensure that there's only ever one stimframe being - * produced during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To - * guarantee this, we use a spinlock. - * - * When a new frame is to be produced, the async producer will - * first acquire the frameAssemblyLimiter spinlock. This ensures - * that only one stimframe is produced during any - * CONFIG_STIMBUFF_FRAME_PERIOD_MS interval. When the next - * timeout fires, it checks if the previous stimframe has - * finished production. If the previous stimframe is still - * being produced, we will sleep for - * CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ms before retrying. - */ - if (frameAssemblyRateLimiter.tryAcquire()) - { - nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; - - // Check if we're ending a deferral period - if (nDeferrals > 0) - { - auto deferralEndTime = - std::chrono::high_resolution_clock::now(); - auto duration = deferralEndTime - deferralStartTime; - auto durationMs = std::chrono::duration_cast< - std::chrono::milliseconds>(duration); - - std::cout << "productionCDaemon: Deferral period ended. " - << "Total deferrals: " << nDeferrals - << ", Duration: " << durationMs.count() - << "ms" << std::endl; - - nDeferrals = 0; - } - - shouldProduceFrame = true; - } - else - { - nextDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; - - ++nDeferrals; - // If this is first deferral, capture start stamp and print message - if (nDeferrals == 1) - { - deferralStartTime = - std::chrono::high_resolution_clock::now(); - std::cerr << "productionCDaemon: Deferral period " - "beginning. Configured deferral period: " - << nextDelayMs << "ms" << std::endl; - } - } - })) - { break; } - - if (shouldProduceFrame) - { - /** EXPLANATION: - * Call the derived class's frame production handler - * Note: The derived class's frame production handler (aka - * its implementation of stimFrameProductionTimesliceCInd()) must - * release the lock when frame production completes - */ - co_await stimFrameProductionTimesliceCInd(canceler); - } - else if (OptionParser::getOptions().verbose) - { - std::cerr << "productionCDaemon: Deferring frame by " - << nextDelayMs << "ms due to rate limit." << std::endl; + if (canceler.isCancellationRequested()) { + break; } - // Schedule the next timeout using the provided delay + const auto timesliceStartStamp = + std::chrono::high_resolution_clock::now(); + + const auto productionStartStamp = + std::chrono::high_resolution_clock::now(); + + co_await stimFrameProductionTimesliceCInd(canceler); + + const auto productionEndStamp = + std::chrono::high_resolution_clock::now(); + const long productionDurationMs = durationMsSince( + productionStartStamp, productionEndStamp); + + logProductionOverrunIfNeeded( + "productionCDaemon", + productionDurationMs, framePeriodMs, nTimesliceOverruns); + + const long residueMs = computeTimesliceResidueMs( + productionDurationMs, framePeriodMs); + + // Schedule the next timeout based on timeslice remaining time. const bool expiredNormally = co_await adapters::boostAsio::getDeadlineTimerAReqAwaiter( ioContext, daemonTimer, - boost::posix_time::milliseconds(nextDelayMs)); + boost::posix_time::milliseconds(residueMs)); if (!expiredNormally) { // Timer was cancelled, which is expected when stopping break; } - // FIXME: We should be able to release the start/stop lock at this point. + const auto timesliceEndStamp = + std::chrono::high_resolution_clock::now(); + const long timesliceDurationMs = durationMsSince( + timesliceStartStamp, timesliceEndStamp); + + logDaemonDurationsIfVerbose( + "productionCDaemon", + productionDurationMs, timesliceDurationMs, + framePeriodMs); } while (!canceler.isCancellationRequested()); @@ -194,7 +205,7 @@ void StimulusProducer::start() std::cout << __func__ << ": Starting stimulus producer for device " << deviceAttachmentSpec->deviceSelector << std::endl; - nDeferrals = 0; + nTimesliceOverruns = 0; taskNursery.openAdmission(); taskNursery.launch( [this](sscl::co::NonViralTaskNursery::Slot::Lease &lease) diff --git a/include/config.h.in b/include/config.h.in index 1c583bb..40413d6 100644 --- a/include/config.h.in +++ b/include/config.h.in @@ -13,7 +13,6 @@ #define CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS @MRNTT_DEVMGR_REATTACHER_PERIOD_MS@ /* Stimulus buffer frame period configuration */ #define CONFIG_STIMBUFF_FRAME_PERIOD_MS @CONFIG_STIMBUFF_FRAME_PERIOD_MS@ -#define CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS @CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS@ /* World thread configuration */ #cmakedefine CONFIG_WORLD_USE_BODY_THREAD diff --git a/include/user/stimulusProducer.h b/include/user/stimulusProducer.h index 9b1078b..14ebe65 100644 --- a/include/user/stimulusProducer.h +++ b/include/user/stimulusProducer.h @@ -12,7 +12,6 @@ #include #include #include -#include #include #include "deviceAttachmentSpec.h" @@ -39,7 +38,7 @@ public: const std::shared_ptr &deviceAttachmentSpec, boost::asio::io_context& ioContext_) - : daemonTimer(ioContext_), nDeferrals(0), + : daemonTimer(ioContext_), deviceAttachmentSpec(deviceAttachmentSpec), ioContext(ioContext_) {} @@ -54,9 +53,6 @@ public: virtual void start(); virtual void stop(); - void allowNextStimulusFrame() - { frameAssemblyRateLimiter.release(); } - virtual std::shared_ptr getAttachedStimulusBuffer( const std::shared_ptr& spec) const; @@ -77,8 +73,6 @@ public: bool hasBufferWithQualeIfaceApi(const std::string& qualeIfaceApi) const; protected: - sscl::SpinLock frameAssemblyRateLimiter; - // Virtual functions for derived classes to override virtual int getStopDelayMs() const { @@ -96,8 +90,7 @@ protected: sscl::co::NonViralTaskNursery taskNursery; boost::asio::deadline_timer daemonTimer; - size_t nDeferrals; - std::chrono::high_resolution_clock::time_point deferralStartTime; + size_t nTimesliceOverruns = 0; public: std::shared_ptr deviceAttachmentSpec; diff --git a/smocore/deviceManager/deviceReattacher.cpp b/smocore/deviceManager/deviceReattacher.cpp index ea2a7be..73c8294 100644 --- a/smocore/deviceManager/deviceReattacher.cpp +++ b/smocore/deviceManager/deviceReattacher.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -10,6 +11,28 @@ namespace smo { namespace device { +namespace { + +long computeTimesliceResidueMs( + long workDurationMs, long periodMs) +{ + if (workDurationMs >= periodMs) { + return 0; + } + return periodMs - workDurationMs; +} + +long durationMsSince( + const std::chrono::high_resolution_clock::time_point &startStamp, + const std::chrono::high_resolution_clock::time_point &endStamp) +{ + const auto duration = endStamp - startStamp; + return std::chrono::duration_cast( + duration).count(); +} + +} // namespace + DeviceReattacher::DeviceReattacher( DeviceManager& parent, std::shared_ptr ioThread) : parent(parent), @@ -19,6 +42,8 @@ ioThread(ioThread), daemonTimer(ioThread->getIoContext()) * deviceReattacherCDaemon is a dynamic posting non-viral coroutine: start() * passes ExplicitPostTarget{ioThread->getIoContext()} so the daemon body * always runs on ioThread. daemonTimer is reused each loop iteration. + * Each timeslice runs attach work first, then sleeps only the period + * residue so reattach polls stay on a wall-clock cadence. */ } @@ -31,20 +56,31 @@ DeviceReattacher::deviceReattacherCDaemon( { boost::asio::io_context &timerIoContext = sscl::ComponentThread::getSelf()->getIoContext(); - const auto periodMs = boost::posix_time::milliseconds( - CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS); + const long reattacherPeriodMs = CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS; while (!canceler.isCancellationRequested()) { + const auto workStartStamp = + std::chrono::high_resolution_clock::now(); + + co_await parent.attachAllUnattachedDevicesFromKnownListCReq(); + + const auto workEndStamp = + std::chrono::high_resolution_clock::now(); + const long workDurationMs = durationMsSince( + workStartStamp, workEndStamp); + + const long residueMs = computeTimesliceResidueMs( + workDurationMs, reattacherPeriodMs); + const bool expiredNormally = co_await adapters::boostAsio::getDeadlineTimerAReqAwaiter( - timerIoContext, daemonTimer, periodMs); + timerIoContext, daemonTimer, + boost::posix_time::milliseconds(residueMs)); if (!expiredNormally) { break; } - - co_await parent.attachAllUnattachedDevicesFromKnownListCReq(); } co_return; diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp index b86fd44..5bf52f8 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp @@ -428,28 +428,16 @@ PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer( } } -struct AllowNextStimulusFrameGuard -{ - PcloudStimulusProducer& producer; - - explicit AllowNextStimulusFrameGuard(PcloudStimulusProducer& _producer) - : producer(_producer) - {} - - ~AllowNextStimulusFrameGuard() - { producer.allowNextStimulusFrame(); } -}; - /** EXPLANATION: * productionCDaemon co_awaits this viral timeslice and passes its - * nursery-slot canceler. Cooperative shutdown uses that canceler between - * co_await steps; do not use execUncancelableSegmentOrAbort here. + * nursery-slot canceler. Cadence (timeslice stamp, residue sleep, overrun + * detection) is owned by StimulusProducer::productionCDaemon. Cooperative + * shutdown uses the canceler between co_await steps. */ sscl::co::ViralNonPostingInvoker PcloudStimulusProducer::stimFrameProductionTimesliceCInd( sscl::SyncCancelerForAsyncWork &canceler) { - AllowNextStimulusFrameGuard allowNextGuard(*this); StimulusFrame& stimulusFrame = tempStimulusFrame; sscl::AsynchronousLoop frameAssemblyResult(0); std::optional> intensityStimFrame; @@ -459,11 +447,6 @@ PcloudStimulusProducer::stimFrameProductionTimesliceCInd( auto& resumeIoContext = device->componentThread->getIoContext(); // produceFrameReq1_doAssemble_posted - /** EXPLANATION: - * productionCDaemon co_awaits this timeslice outside its uncancelable - * rate-limiter segment; use isCancellationRequested() here, not - * execUncancelableSegmentOrAbort. - */ if (canceler.isCancellationRequested()) { co_return; }