91fc655b25
StimulusProducer: syncAwaitAllSettlements should pump caller io_context
224 lines
5.9 KiB
C++
224 lines
5.9 KiB
C++
#include <config.h>
|
|
#include <iostream>
|
|
#include <chrono>
|
|
#include <algorithm>
|
|
#include <boost/asio/io_context.hpp>
|
|
#include <opts.h>
|
|
#include <componentThread.h>
|
|
#include <adapters/boostAsio/deadlineTimerAReq.h>
|
|
#include <spinscale/spinLock.h>
|
|
#include <user/stimulusProducer.h>
|
|
#include <user/stimulusBuffer.h>
|
|
|
|
namespace smo {
|
|
namespace stim_buff {
|
|
|
|
std::shared_ptr<StimulusBuffer> StimulusProducer::getAttachedStimulusBuffer(
|
|
const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const
|
|
{
|
|
for (const auto& buffer : attachedStimulusBuffers)
|
|
{
|
|
if (buffer && buffer->deviceAttachmentSpec &&
|
|
*buffer->deviceAttachmentSpec == *spec)
|
|
{
|
|
return buffer;
|
|
}
|
|
}
|
|
|
|
return nullptr;
|
|
}
|
|
|
|
bool StimulusProducer::hasBufferWithQualeIfaceApi(
|
|
const std::string& qualeIfaceApi) const
|
|
{
|
|
for (const auto& buffer : attachedStimulusBuffers)
|
|
{
|
|
if (!buffer || !buffer->deviceAttachmentSpec)
|
|
{
|
|
throw std::runtime_error(
|
|
"StimulusProducer::hasBufferWithQualeIfaceApi: encountered "
|
|
"null buffer or null deviceAttachmentSpec in "
|
|
"attachedStimulusBuffers (should never happen)");
|
|
}
|
|
|
|
if (buffer->deviceAttachmentSpec->qualeIfaceApi != qualeIfaceApi)
|
|
{ continue; }
|
|
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
bool StimulusProducer::addAttachedStimulusBufferIfNotExists(
|
|
const std::shared_ptr<StimulusBuffer>& buffer)
|
|
{
|
|
if (!buffer) { return false; }
|
|
|
|
auto it = std::find_if(
|
|
attachedStimulusBuffers.begin(),
|
|
attachedStimulusBuffers.end(),
|
|
[&](const std::shared_ptr<StimulusBuffer>& buf) {
|
|
return buf && buffer &&
|
|
buf->deviceAttachmentSpec && buffer->deviceAttachmentSpec &&
|
|
*(buf->deviceAttachmentSpec) == *(buffer->deviceAttachmentSpec);
|
|
});
|
|
|
|
if (it != attachedStimulusBuffers.end()) {
|
|
return false;
|
|
}
|
|
|
|
attachedStimulusBuffers.push_back(buffer);
|
|
return true;
|
|
}
|
|
|
|
void StimulusProducer::destroyAttachedStimulusBuffer(
|
|
const std::shared_ptr<StimulusBuffer>& buffer)
|
|
{
|
|
if (!buffer) { return; }
|
|
|
|
auto it = std::find(
|
|
attachedStimulusBuffers.begin(),
|
|
attachedStimulusBuffers.end(),
|
|
buffer);
|
|
|
|
if (it != attachedStimulusBuffers.end()) {
|
|
attachedStimulusBuffers.erase(it);
|
|
}
|
|
}
|
|
|
|
/**	Daemon coroutine that paces stimulus frame production.
 *
 *	Each iteration attempts to acquire frameAssemblyRateLimiter. On success
 *	it awaits the derived class's stimFrameProductionTimesliceCInd() and then
 *	waits CONFIG_STIMBUFF_FRAME_PERIOD_MS on daemonTimer. On failure it
 *	records a deferral and waits CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS before
 *	retrying.
 *
 *	The loop exits when the uncancelable segment is aborted, when
 *	cancellation has been requested, or when the timer wait does not expire
 *	normally.
 *
 *	The first two parameters are unnamed and unused in this body.
 *
 *	@param canceler Cancellation handle polled by the loop and forwarded to
 *		the frame production handler.
 */
sscl::co::NonViralNonPostingInvoker StimulusProducer::productionCDaemon(
	std::exception_ptr &, std::function<void()>,
	sscl::SyncCancelerForAsyncWork &canceler)
{
	int nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS;

	do
	{
		bool shouldProduceFrame = false;

		if (!canceler.execUncancelableSegmentOrAbort([&]()
		{
			/** EXPLANATION:
			 * We need to ensure that there's only ever one stimframe being
			 * produced during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To
			 * guarantee this, we use a spinlock.
			 *
			 * When a new frame is to be produced, the async producer will
			 * first acquire the frameAssemblyRateLimiter spinlock. This
			 * ensures that only one stimframe is produced during any
			 * CONFIG_STIMBUFF_FRAME_PERIOD_MS interval. When the next
			 * timeout fires, it checks if the previous stimframe has
			 * finished production. If the previous stimframe is still
			 * being produced, we will sleep for
			 * CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ms before retrying.
			 */
			if (frameAssemblyRateLimiter.tryAcquire())
			{
				nextDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS;

				// Check if we're ending a deferral period
				if (nDeferrals > 0)
				{
					auto deferralEndTime =
						std::chrono::high_resolution_clock::now();
					auto duration = deferralEndTime - deferralStartTime;
					auto durationMs = std::chrono::duration_cast<
						std::chrono::milliseconds>(duration);

					std::cout << "productionCDaemon: Deferral period ended. "
						<< "Total deferrals: " << nDeferrals
						<< ", Duration: " << durationMs.count()
						<< "ms" << std::endl;

					nDeferrals = 0;
				}

				shouldProduceFrame = true;
			}
			else
			{
				nextDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS;

				++nDeferrals;
				// On the first deferral, capture the start timestamp and
				// print a message.
				if (nDeferrals == 1)
				{
					deferralStartTime =
						std::chrono::high_resolution_clock::now();
					std::cerr << "productionCDaemon: Deferral period "
						"beginning. Configured deferral period: "
						<< nextDelayMs << "ms" << std::endl;
				}
			}
		}))
		{ break; }

		if (shouldProduceFrame)
		{
			/** EXPLANATION:
			 * Call the derived class's frame production handler.
			 * Note: The derived class's frame production handler (aka
			 * its implementation of stimFrameProductionTimesliceCInd()) must
			 * release the lock when frame production completes.
			 */
			co_await stimFrameProductionTimesliceCInd(canceler);
		}
		else if (OptionParser::getOptions().verbose)
		{
			std::cerr << "productionCDaemon: Deferring frame by "
				<< nextDelayMs << "ms due to rate limit." << std::endl;
		}

		/* stop() cancels daemonTimer and then requests cancellation. If that
		 * happened while this coroutine was not suspended on the timer (for
		 * example during frame production above), the timer cancel had no
		 * pending wait to abort. Re-check here so that we don't arm a fresh
		 * wait and hold up shutdown for another nextDelayMs.
		 */
		if (canceler.isCancellationRequested()) { break; }

		// Schedule the next timeout using the delay selected above.
		const bool expiredNormally = co_await
			adapters::boostAsio::getDeadlineTimerAReqAwaiter(
				ioContext,
				daemonTimer,
				boost::posix_time::milliseconds(nextDelayMs));

		if (!expiredNormally)
		{
			// Timer was cancelled, which is expected when stopping
			break;
		}

		// FIXME: We should be able to release the start/stop lock at this point.

	} while (!canceler.isCancellationRequested());

	co_return;
}
|
|
|
|
void StimulusProducer::start()
|
|
{
|
|
std::cout << __func__ << ": Starting stimulus producer for device "
|
|
<< deviceAttachmentSpec->deviceSelector << std::endl;
|
|
|
|
nDeferrals = 0;
|
|
taskNursery.openAdmission();
|
|
taskNursery.launch(
|
|
[this](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
|
|
{
|
|
return productionCDaemon(
|
|
lease.getExceptionStorage(),
|
|
lease.getCallerLambda(),
|
|
lease.getSyncCanceler());
|
|
});
|
|
}
|
|
|
|
void StimulusProducer::stop()
|
|
{
|
|
// Cancel timer immediately
|
|
daemonTimer.cancel();
|
|
taskNursery.requestCancelOnAll();
|
|
taskNursery.closeAdmission();
|
|
taskNursery.syncAwaitAllSettlements(
|
|
sscl::ComponentThread::getSelf()->getIoContext());
|
|
|
|
std::cout << __func__ << ": Stopped stimulus producer for device "
|
|
<< deviceAttachmentSpec->deviceSelector << std::endl;
|
|
}
|
|
|
|
} // namespace stim_buff
|
|
} // namespace smo
|