#include #include #include #include #include #include #include #include #include #include #include #include #include namespace smo { namespace stim_buff { std::shared_ptr StimulusProducer::getAttachedStimulusBuffer( const std::shared_ptr& spec) const { for (const auto& buffer : attachedStimulusBuffers) { if (buffer && buffer->deviceAttachmentSpec && *buffer->deviceAttachmentSpec == *spec) { return buffer; } } return nullptr; } bool StimulusProducer::hasBufferWithQualeIfaceApi( const std::string& qualeIfaceApi) const { for (const auto& buffer : attachedStimulusBuffers) { if (!buffer || !buffer->deviceAttachmentSpec) { throw std::runtime_error( "StimulusProducer::hasBufferWithQualeIfaceApi: encountered " "null buffer or null deviceAttachmentSpec in " "attachedStimulusBuffers (should never happen)"); } if (buffer->deviceAttachmentSpec->qualeIfaceApi != qualeIfaceApi) { continue; } return true; } return false; } bool StimulusProducer::addAttachedStimulusBufferIfNotExists( const std::shared_ptr& buffer) { if (!buffer) { return false; } auto it = std::find_if( attachedStimulusBuffers.begin(), attachedStimulusBuffers.end(), [&](const std::shared_ptr& buf) { return buf && buffer && buf->deviceAttachmentSpec && buffer->deviceAttachmentSpec && *(buf->deviceAttachmentSpec) == *(buffer->deviceAttachmentSpec); }); if (it != attachedStimulusBuffers.end()) { return false; } attachedStimulusBuffers.push_back(buffer); return true; } void StimulusProducer::destroyAttachedStimulusBuffer( const std::shared_ptr& buffer) { if (!buffer) { return; } auto it = std::find( attachedStimulusBuffers.begin(), attachedStimulusBuffers.end(), buffer); if (it != attachedStimulusBuffers.end()) { attachedStimulusBuffers.erase(it); } } void StimulusProducer::stop() { { SpinLock::Guard lock(shouldContinueLock); shouldContinue = false; } // Cancel timer immediately timer.cancel(); std::cout << __func__ << ": Stopped stimulus producer for device " << deviceAttachmentSpec->deviceSelector << std::endl; } void StimulusProducer::scheduleNextTimeout(int delayMs) { if (!shouldContinue) { return; } // Schedule the next timeout using the provided delay timer.expires_from_now( boost::posix_time::milliseconds(delayMs)); timer.async_wait( std::bind( &StimulusProducer::onTimeout, this, std::placeholders::_1)); } void StimulusProducer::onTimeout(const boost::system::error_code& error) { // Timer was cancelled, which is expected when stopping if (error == boost::asio::error::operation_aborted) { return; } if (error) { std::cerr << "StimulusProducer: Timer error: " << error.message() << std::endl; return; } SpinLock::Guard lock(shouldContinueLock); if (!shouldContinue) { return; } /** EXPLANATION: * We need to ensure that there's only ever one stimframe being produced * during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we * use a spinlock. * * When a new frame is to be produced, the async producer will first acquire * the frameAssemblyLimiter spinlock. This way, when the next timeout is * fired it can check whether its predecessor stimframe has finished being * produced. If the preceding stimframe is still being produced, then we'll * sleep for CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ms before trying again. */ int nextWakeupDelayMs; bool deferred = false; if (frameAssemblyRateLimiter.tryAcquire()) { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; // Check if we're ending a deferral period if (nDeferrals > 0) { auto deferralEndTime = std::chrono::high_resolution_clock::now(); auto duration = deferralEndTime - deferralStartTime; auto durationMs = std::chrono::duration_cast< std::chrono::milliseconds>(duration); std::cout << __func__ << ": Deferral period ended. " << "Total deferrals: " << nDeferrals << ", Duration: " << durationMs.count() << "ms" << std::endl; nDeferrals = 0; } /** EXPLANATION: * Call the derived class's frame production handler * Note: The derived class's frame production handler (aka * its implementation of stimFrameProductionTimesliceInd()) must * release the lock when frame production completes */ stimFrameProductionTimesliceInd(); } else { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; deferred = true; ++nDeferrals; // If this is first deferral, capture start stamp and print message if (nDeferrals == 1) { deferralStartTime = std::chrono::high_resolution_clock::now(); std::cerr << __func__ << ": Deferral period beginning. " "Configured deferral period: " << nextWakeupDelayMs << "ms" << std::endl; } } scheduleNextTimeout(nextWakeupDelayMs); // FIXME: We should be able to release the start/stop lock at this point. if (deferred && OptionParser::getOptions().verbose) { std::cerr << __func__ << ": Deferring frame by " << nextWakeupDelayMs << "ms due to rate limit." << std::endl; } } } // namespace stim_buff } // namespace smo