StimProd,DevReattacher: use CDaemon nonviral nursery coro

We ported these two daemons over to the new nursery mechanism and
they work nicely.
This commit is contained in:
2026-06-09 19:47:44 -04:00
parent 165c846700
commit 87a8de9a2b
6 changed files with 136 additions and 124 deletions
-10
View File
@@ -53,14 +53,6 @@ if(NOT STIMBUFF_FRAME_PERIOD_MS GREATER 0)
"STIMBUFF_FRAME_PERIOD_MS must be a positive integer > 0") "STIMBUFF_FRAME_PERIOD_MS must be a positive integer > 0")
endif() endif()
# Stimulus buffer frame retry delay configuration
set(STIMBUFF_FRAME_RETRY_DELAY_MS 1
CACHE STRING "Stimulus buffer frame retry delay (ms)")
if(NOT STIMBUFF_FRAME_RETRY_DELAY_MS GREATER 0)
message(FATAL_ERROR
"STIMBUFF_FRAME_RETRY_DELAY_MS must be a positive integer > 0")
endif()
# World thread configuration # World thread configuration
option(WORLD_USE_BODY_THREAD option(WORLD_USE_BODY_THREAD
"Use body thread for world component instead of separate world thread" OFF) "Use body thread for world component instead of separate world thread" OFF)
@@ -90,8 +82,6 @@ endif()
set(CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS ${DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS}) set(CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS ${DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS})
# Set the stimulus buffer frame period variable for config.h # Set the stimulus buffer frame period variable for config.h
set(CONFIG_STIMBUFF_FRAME_PERIOD_MS ${STIMBUFF_FRAME_PERIOD_MS}) set(CONFIG_STIMBUFF_FRAME_PERIOD_MS ${STIMBUFF_FRAME_PERIOD_MS})
# Set the stimulus buffer frame retry delay variable for config.h
set(CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ${STIMBUFF_FRAME_RETRY_DELAY_MS})
# Configure config.h # Configure config.h
configure_file( configure_file(
@@ -6,13 +6,67 @@
#include <opts.h> #include <opts.h>
#include <componentThread.h> #include <componentThread.h>
#include <adapters/boostAsio/deadlineTimerAReq.h> #include <adapters/boostAsio/deadlineTimerAReq.h>
#include <spinscale/spinLock.h>
#include <user/stimulusProducer.h> #include <user/stimulusProducer.h>
#include <user/stimulusBuffer.h> #include <user/stimulusBuffer.h>
namespace smo { namespace smo {
namespace stim_buff { namespace stim_buff {
namespace {
long computeTimesliceResidueMs(
long productionDurationMs, long periodMs)
{
if (productionDurationMs >= periodMs) {
return 0;
}
return periodMs - productionDurationMs;
}
void logProductionOverrunIfNeeded(
const char *daemonName,
long productionDurationMs, long periodMs,
size_t &nTimesliceOverruns)
{
if (productionDurationMs <= periodMs) {
return;
}
++nTimesliceOverruns;
const long overrunByMs = productionDurationMs - periodMs;
std::cerr << daemonName << ": production overrun: actual="
<< productionDurationMs << "ms budget=" << periodMs
<< "ms overrunBy=" << overrunByMs << "ms nOverruns="
<< nTimesliceOverruns << std::endl;
}
long durationMsSince(
const std::chrono::high_resolution_clock::time_point &startStamp,
const std::chrono::high_resolution_clock::time_point &endStamp)
{
const auto duration = endStamp - startStamp;
return std::chrono::duration_cast<std::chrono::milliseconds>(
duration).count();
}
void logDaemonDurationsIfVerbose(
const char *daemonName,
long productionDurationMs,
long timesliceDurationMs,
long periodMs)
{
if (0 && !OptionParser::getOptions().verbose) {
return;
}
std::cerr << daemonName << ": daemon durations: production="
<< productionDurationMs << "ms timeslice="
<< timesliceDurationMs << "ms period=" << periodMs
<< "ms" << std::endl;
}
} // namespace
std::shared_ptr<StimulusBuffer> StimulusProducer::getAttachedStimulusBuffer( std::shared_ptr<StimulusBuffer> StimulusProducer::getAttachedStimulusBuffer(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const
{ {
@@ -91,98 +145,55 @@ sscl::co::NonViralNonPostingInvoker StimulusProducer::productionCDaemon(
std::exception_ptr &, std::function<void()>, std::exception_ptr &, std::function<void()>,
sscl::SyncCancelerForAsyncWork &canceler) sscl::SyncCancelerForAsyncWork &canceler)
{ {
int nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; const long framePeriodMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS;
do do
{ {
bool shouldProduceFrame = false; if (canceler.isCancellationRequested()) {
break;
if (!canceler.execUncancelableSegmentOrAbort([&]()
{
/** EXPLANATION:
* We need to ensure that there's only ever one stimframe being
* produced during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To
* guarantee this, we use a spinlock.
*
* When a new frame is to be produced, the async producer will
* first acquire the frameAssemblyLimiter spinlock. This ensures
* that only one stimframe is produced during any
* CONFIG_STIMBUFF_FRAME_PERIOD_MS interval. When the next
* timeout fires, it checks if the previous stimframe has
* finished production. If the previous stimframe is still
* being produced, we will sleep for
* CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ms before retrying.
*/
if (frameAssemblyRateLimiter.tryAcquire())
{
nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS;
// Check if we're ending a deferral period
if (nDeferrals > 0)
{
auto deferralEndTime =
std::chrono::high_resolution_clock::now();
auto duration = deferralEndTime - deferralStartTime;
auto durationMs = std::chrono::duration_cast<
std::chrono::milliseconds>(duration);
std::cout << "productionCDaemon: Deferral period ended. "
<< "Total deferrals: " << nDeferrals
<< ", Duration: " << durationMs.count()
<< "ms" << std::endl;
nDeferrals = 0;
}
shouldProduceFrame = true;
}
else
{
nextDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS;
++nDeferrals;
// If this is first deferral, capture start stamp and print message
if (nDeferrals == 1)
{
deferralStartTime =
std::chrono::high_resolution_clock::now();
std::cerr << "productionCDaemon: Deferral period "
"beginning. Configured deferral period: "
<< nextDelayMs << "ms" << std::endl;
}
}
}))
{ break; }
if (shouldProduceFrame)
{
/** EXPLANATION:
* Call the derived class's frame production handler
* Note: The derived class's frame production handler (aka
* its implementation of stimFrameProductionTimesliceCInd()) must
* release the lock when frame production completes
*/
co_await stimFrameProductionTimesliceCInd(canceler);
}
else if (OptionParser::getOptions().verbose)
{
std::cerr << "productionCDaemon: Deferring frame by "
<< nextDelayMs << "ms due to rate limit." << std::endl;
} }
// Schedule the next timeout using the provided delay const auto timesliceStartStamp =
std::chrono::high_resolution_clock::now();
const auto productionStartStamp =
std::chrono::high_resolution_clock::now();
co_await stimFrameProductionTimesliceCInd(canceler);
const auto productionEndStamp =
std::chrono::high_resolution_clock::now();
const long productionDurationMs = durationMsSince(
productionStartStamp, productionEndStamp);
logProductionOverrunIfNeeded(
"productionCDaemon",
productionDurationMs, framePeriodMs, nTimesliceOverruns);
const long residueMs = computeTimesliceResidueMs(
productionDurationMs, framePeriodMs);
// Schedule the next timeout based on timeslice remaining time.
const bool expiredNormally = co_await const bool expiredNormally = co_await
adapters::boostAsio::getDeadlineTimerAReqAwaiter( adapters::boostAsio::getDeadlineTimerAReqAwaiter(
ioContext, ioContext,
daemonTimer, daemonTimer,
boost::posix_time::milliseconds(nextDelayMs)); boost::posix_time::milliseconds(residueMs));
if (!expiredNormally) { if (!expiredNormally) {
// Timer was cancelled, which is expected when stopping // Timer was cancelled, which is expected when stopping
break; break;
} }
// FIXME: We should be able to release the start/stop lock at this point. const auto timesliceEndStamp =
std::chrono::high_resolution_clock::now();
const long timesliceDurationMs = durationMsSince(
timesliceStartStamp, timesliceEndStamp);
logDaemonDurationsIfVerbose(
"productionCDaemon",
productionDurationMs, timesliceDurationMs,
framePeriodMs);
} while (!canceler.isCancellationRequested()); } while (!canceler.isCancellationRequested());
@@ -194,7 +205,7 @@ void StimulusProducer::start()
std::cout << __func__ << ": Starting stimulus producer for device " std::cout << __func__ << ": Starting stimulus producer for device "
<< deviceAttachmentSpec->deviceSelector << std::endl; << deviceAttachmentSpec->deviceSelector << std::endl;
nDeferrals = 0; nTimesliceOverruns = 0;
taskNursery.openAdmission(); taskNursery.openAdmission();
taskNursery.launch( taskNursery.launch(
[this](sscl::co::NonViralTaskNursery::Slot::Lease &lease) [this](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
-1
View File
@@ -13,7 +13,6 @@
#define CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS @MRNTT_DEVMGR_REATTACHER_PERIOD_MS@ #define CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS @MRNTT_DEVMGR_REATTACHER_PERIOD_MS@
/* Stimulus buffer frame period configuration */ /* Stimulus buffer frame period configuration */
#define CONFIG_STIMBUFF_FRAME_PERIOD_MS @CONFIG_STIMBUFF_FRAME_PERIOD_MS@ #define CONFIG_STIMBUFF_FRAME_PERIOD_MS @CONFIG_STIMBUFF_FRAME_PERIOD_MS@
#define CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS @CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS@
/* World thread configuration */ /* World thread configuration */
#cmakedefine CONFIG_WORLD_USE_BODY_THREAD #cmakedefine CONFIG_WORLD_USE_BODY_THREAD
+2 -9
View File
@@ -12,7 +12,6 @@
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <spinscale/co/invokers.h> #include <spinscale/co/invokers.h>
#include <spinscale/co/nonViralTaskNursery.h> #include <spinscale/co/nonViralTaskNursery.h>
#include <spinscale/spinLock.h>
#include <spinscale/syncCancelerForAsyncWork.h> #include <spinscale/syncCancelerForAsyncWork.h>
#include "deviceAttachmentSpec.h" #include "deviceAttachmentSpec.h"
@@ -39,7 +38,7 @@ public:
const std::shared_ptr<device::DeviceAttachmentSpec> const std::shared_ptr<device::DeviceAttachmentSpec>
&deviceAttachmentSpec, &deviceAttachmentSpec,
boost::asio::io_context& ioContext_) boost::asio::io_context& ioContext_)
: daemonTimer(ioContext_), nDeferrals(0), : daemonTimer(ioContext_),
deviceAttachmentSpec(deviceAttachmentSpec), ioContext(ioContext_) deviceAttachmentSpec(deviceAttachmentSpec), ioContext(ioContext_)
{} {}
@@ -54,9 +53,6 @@ public:
virtual void start(); virtual void start();
virtual void stop(); virtual void stop();
void allowNextStimulusFrame()
{ frameAssemblyRateLimiter.release(); }
virtual std::shared_ptr<StimulusBuffer> getAttachedStimulusBuffer( virtual std::shared_ptr<StimulusBuffer> getAttachedStimulusBuffer(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const; const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const;
@@ -77,8 +73,6 @@ public:
bool hasBufferWithQualeIfaceApi(const std::string& qualeIfaceApi) const; bool hasBufferWithQualeIfaceApi(const std::string& qualeIfaceApi) const;
protected: protected:
sscl::SpinLock frameAssemblyRateLimiter;
// Virtual functions for derived classes to override // Virtual functions for derived classes to override
virtual int getStopDelayMs() const virtual int getStopDelayMs() const
{ {
@@ -96,8 +90,7 @@ protected:
sscl::co::NonViralTaskNursery taskNursery; sscl::co::NonViralTaskNursery taskNursery;
boost::asio::deadline_timer daemonTimer; boost::asio::deadline_timer daemonTimer;
size_t nDeferrals; size_t nTimesliceOverruns = 0;
std::chrono::high_resolution_clock::time_point deferralStartTime;
public: public:
std::shared_ptr<device::DeviceAttachmentSpec> deviceAttachmentSpec; std::shared_ptr<device::DeviceAttachmentSpec> deviceAttachmentSpec;
+41 -5
View File
@@ -1,5 +1,6 @@
#include <config.h> #include <config.h>
#include <iostream> #include <iostream>
#include <chrono>
#include <functional> #include <functional>
#include <componentThread.h> #include <componentThread.h>
#include <adapters/boostAsio/deadlineTimerAReq.h> #include <adapters/boostAsio/deadlineTimerAReq.h>
@@ -10,6 +11,28 @@
namespace smo { namespace smo {
namespace device { namespace device {
namespace {
long computeTimesliceResidueMs(
long workDurationMs, long periodMs)
{
if (workDurationMs >= periodMs) {
return 0;
}
return periodMs - workDurationMs;
}
long durationMsSince(
const std::chrono::high_resolution_clock::time_point &startStamp,
const std::chrono::high_resolution_clock::time_point &endStamp)
{
const auto duration = endStamp - startStamp;
return std::chrono::duration_cast<std::chrono::milliseconds>(
duration).count();
}
} // namespace
DeviceReattacher::DeviceReattacher( DeviceReattacher::DeviceReattacher(
DeviceManager& parent, std::shared_ptr<sscl::ComponentThread> ioThread) DeviceManager& parent, std::shared_ptr<sscl::ComponentThread> ioThread)
: parent(parent), : parent(parent),
@@ -19,6 +42,8 @@ ioThread(ioThread), daemonTimer(ioThread->getIoContext())
* deviceReattacherCDaemon is a dynamic posting non-viral coroutine: start() * deviceReattacherCDaemon is a dynamic posting non-viral coroutine: start()
* passes ExplicitPostTarget{ioThread->getIoContext()} so the daemon body * passes ExplicitPostTarget{ioThread->getIoContext()} so the daemon body
* always runs on ioThread. daemonTimer is reused each loop iteration. * always runs on ioThread. daemonTimer is reused each loop iteration.
* Each timeslice runs attach work first, then sleeps only the period
* residue so reattach polls stay on a wall-clock cadence.
*/ */
} }
@@ -31,20 +56,31 @@ DeviceReattacher::deviceReattacherCDaemon(
{ {
boost::asio::io_context &timerIoContext = boost::asio::io_context &timerIoContext =
sscl::ComponentThread::getSelf()->getIoContext(); sscl::ComponentThread::getSelf()->getIoContext();
const auto periodMs = boost::posix_time::milliseconds( const long reattacherPeriodMs = CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS;
CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS);
while (!canceler.isCancellationRequested()) while (!canceler.isCancellationRequested())
{ {
const auto workStartStamp =
std::chrono::high_resolution_clock::now();
co_await parent.attachAllUnattachedDevicesFromKnownListCReq();
const auto workEndStamp =
std::chrono::high_resolution_clock::now();
const long workDurationMs = durationMsSince(
workStartStamp, workEndStamp);
const long residueMs = computeTimesliceResidueMs(
workDurationMs, reattacherPeriodMs);
const bool expiredNormally = co_await const bool expiredNormally = co_await
adapters::boostAsio::getDeadlineTimerAReqAwaiter( adapters::boostAsio::getDeadlineTimerAReqAwaiter(
timerIoContext, daemonTimer, periodMs); timerIoContext, daemonTimer,
boost::posix_time::milliseconds(residueMs));
if (!expiredNormally) { if (!expiredNormally) {
break; break;
} }
co_await parent.attachAllUnattachedDevicesFromKnownListCReq();
} }
co_return; co_return;
@@ -428,28 +428,16 @@ PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer(
} }
} }
struct AllowNextStimulusFrameGuard
{
PcloudStimulusProducer& producer;
explicit AllowNextStimulusFrameGuard(PcloudStimulusProducer& _producer)
: producer(_producer)
{}
~AllowNextStimulusFrameGuard()
{ producer.allowNextStimulusFrame(); }
};
/** EXPLANATION: /** EXPLANATION:
* productionCDaemon co_awaits this viral timeslice and passes its * productionCDaemon co_awaits this viral timeslice and passes its
* nursery-slot canceler. Cooperative shutdown uses that canceler between * nursery-slot canceler. Cadence (timeslice stamp, residue sleep, overrun
* co_await steps; do not use execUncancelableSegmentOrAbort here. * detection) is owned by StimulusProducer::productionCDaemon. Cooperative
* shutdown uses the canceler between co_await steps.
*/ */
sscl::co::ViralNonPostingInvoker<void> sscl::co::ViralNonPostingInvoker<void>
PcloudStimulusProducer::stimFrameProductionTimesliceCInd( PcloudStimulusProducer::stimFrameProductionTimesliceCInd(
sscl::SyncCancelerForAsyncWork &canceler) sscl::SyncCancelerForAsyncWork &canceler)
{ {
AllowNextStimulusFrameGuard allowNextGuard(*this);
StimulusFrame& stimulusFrame = tempStimulusFrame; StimulusFrame& stimulusFrame = tempStimulusFrame;
sscl::AsynchronousLoop frameAssemblyResult(0); sscl::AsynchronousLoop frameAssemblyResult(0);
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame; std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame;
@@ -459,11 +447,6 @@ PcloudStimulusProducer::stimFrameProductionTimesliceCInd(
auto& resumeIoContext = device->componentThread->getIoContext(); auto& resumeIoContext = device->componentThread->getIoContext();
// produceFrameReq1_doAssemble_posted // produceFrameReq1_doAssemble_posted
/** EXPLANATION:
* productionCDaemon co_awaits this timeslice outside its uncancelable
* rate-limiter segment; use isCancellationRequested() here, not
* execUncancelableSegmentOrAbort.
*/
if (canceler.isCancellationRequested()) if (canceler.isCancellationRequested())
{ co_return; } { co_return; }