#include #include #include #include #include #include #include #include #include #include #include namespace smo { namespace stim_buff { std::shared_ptr StimulusProducer::getAttachedStimulusBuffer( const std::shared_ptr& spec) const { for (const auto& buffer : attachedStimulusBuffers) { if (buffer && buffer->deviceAttachmentSpec && *buffer->deviceAttachmentSpec == *spec) { return buffer; } } return nullptr; } bool StimulusProducer::hasBufferWithQualeIfaceApi( const std::string& qualeIfaceApi) const { for (const auto& buffer : attachedStimulusBuffers) { if (!buffer || !buffer->deviceAttachmentSpec) { throw std::runtime_error( "StimulusProducer::hasBufferWithQualeIfaceApi: encountered " "null buffer or null deviceAttachmentSpec in " "attachedStimulusBuffers (should never happen)"); } if (buffer->deviceAttachmentSpec->qualeIfaceApi != qualeIfaceApi) { continue; } return true; } return false; } bool StimulusProducer::addAttachedStimulusBufferIfNotExists( const std::shared_ptr& buffer) { if (!buffer) { return false; } auto it = std::find_if( attachedStimulusBuffers.begin(), attachedStimulusBuffers.end(), [&](const std::shared_ptr& buf) { return buf && buffer && buf->deviceAttachmentSpec && buffer->deviceAttachmentSpec && *(buf->deviceAttachmentSpec) == *(buffer->deviceAttachmentSpec); }); if (it != attachedStimulusBuffers.end()) { return false; } attachedStimulusBuffers.push_back(buffer); return true; } void StimulusProducer::destroyAttachedStimulusBuffer( const std::shared_ptr& buffer) { if (!buffer) { return; } auto it = std::find( attachedStimulusBuffers.begin(), attachedStimulusBuffers.end(), buffer); if (it != attachedStimulusBuffers.end()) { attachedStimulusBuffers.erase(it); } } sscl::co::NonViralNonPostingInvoker StimulusProducer::productionCDaemon( std::exception_ptr &, std::function, sscl::SyncCancelerForAsyncWork &canceler) { int nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; do { bool shouldProduceFrame = false; if (!canceler.execUncancelableSegmentOrAbort([&]() { /** EXPLANATION: * We need to ensure that there's only ever one stimframe being * produced during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To * guarantee this, we use a spinlock. * * When a new frame is to be produced, the async producer will * first acquire the frameAssemblyLimiter spinlock. This ensures * that only one stimframe is produced during any * CONFIG_STIMBUFF_FRAME_PERIOD_MS interval. When the next * timeout fires, it checks if the previous stimframe has * finished production. If the previous stimframe is still * being produced, we will sleep for * CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ms before retrying. */ if (frameAssemblyRateLimiter.tryAcquire()) { nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; // Check if we're ending a deferral period if (nDeferrals > 0) { auto deferralEndTime = std::chrono::high_resolution_clock::now(); auto duration = deferralEndTime - deferralStartTime; auto durationMs = std::chrono::duration_cast< std::chrono::milliseconds>(duration); std::cout << "productionCDaemon: Deferral period ended. " << "Total deferrals: " << nDeferrals << ", Duration: " << durationMs.count() << "ms" << std::endl; nDeferrals = 0; } shouldProduceFrame = true; } else { nextDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; ++nDeferrals; // If this is first deferral, capture start stamp and print message if (nDeferrals == 1) { deferralStartTime = std::chrono::high_resolution_clock::now(); std::cerr << "productionCDaemon: Deferral period " "beginning. Configured deferral period: " << nextDelayMs << "ms" << std::endl; } } })) { break; } if (shouldProduceFrame) { /** EXPLANATION: * Call the derived class's frame production handler * Note: The derived class's frame production handler (aka * its implementation of stimFrameProductionTimesliceCInd()) must * release the lock when frame production completes */ co_await stimFrameProductionTimesliceCInd(canceler); } else if (OptionParser::getOptions().verbose) { std::cerr << "productionCDaemon: Deferring frame by " << nextDelayMs << "ms due to rate limit." << std::endl; } // Schedule the next timeout using the provided delay const bool expiredNormally = co_await adapters::boostAsio::getDeadlineTimerAReqAwaiter( ioContext, daemonTimer, boost::posix_time::milliseconds(nextDelayMs)); if (!expiredNormally) { // Timer was cancelled, which is expected when stopping break; } // FIXME: We should be able to release the start/stop lock at this point. } while (!canceler.isCancellationRequested()); co_return; } void StimulusProducer::start() { std::cout << __func__ << ": Starting stimulus producer for device " << deviceAttachmentSpec->deviceSelector << std::endl; nDeferrals = 0; taskNursery.openAdmission(); taskNursery.launch( [this](sscl::co::NonViralTaskNursery::Slot::Lease &lease) { return productionCDaemon( lease.getExceptionStorage(), lease.getCallerLambda(), lease.getSyncCanceler()); }); } void StimulusProducer::stop() { // Cancel timer immediately daemonTimer.cancel(); taskNursery.requestCancelOnAll(); taskNursery.closeAdmission(); taskNursery.syncAwaitAllSettlements( sscl::ComponentThread::getSelf()->getIoContext()); std::cout << __func__ << ": Stopped stimulus producer for device " << deviceAttachmentSpec->deviceSelector << std::endl; } } // namespace stim_buff } // namespace smo