Files
salmanoff/commonLibs/attachmentSupport/stimulusProducer.cpp
T
2026-06-10 04:12:32 -04:00

235 lines
5.8 KiB
C++

#include <config.h>
#include <iostream>
#include <chrono>
#include <algorithm>
#include <boost/asio/io_context.hpp>
#include <opts.h>
#include <componentThread.h>
#include <adapters/boostAsio/deadlineTimerAReq.h>
#include <user/stimulusProducer.h>
#include <user/stimulusBuffer.h>
namespace smo {
namespace stim_buff {
namespace {
/**
 * Computes how much of a timeslice budget is left after production.
 *
 * @param productionDurationMs Time production took, in milliseconds.
 * @param periodMs Timeslice budget, in milliseconds.
 * @return periodMs - productionDurationMs when production finished strictly
 *     inside the budget; 0 when it used the whole budget or more.
 */
long computeTimesliceResidueMs(
    long productionDurationMs, long periodMs)
{
    // The subtraction is evaluated only on the "finished early" path.
    const bool finishedEarly = productionDurationMs < periodMs;
    return finishedEarly ? (periodMs - productionDurationMs) : 0;
}
/**
 * Reports a production overrun on std::cerr and counts it.
 *
 * Does nothing when production fit inside the budget (duration <= period).
 *
 * @param daemonName Prefix written at the start of the log line.
 * @param productionDurationMs Time production took, in milliseconds.
 * @param periodMs Timeslice budget, in milliseconds.
 * @param nTimesliceOverruns Running overrun counter; incremented by one when
 *     an overrun is detected, and the new value is included in the log line.
 */
void logProductionOverrunIfNeeded(
    const char *daemonName,
    long productionDurationMs, long periodMs,
    size_t &nTimesliceOverruns)
{
    if (productionDurationMs > periodMs)
    {
        // Count first so the logged total already includes this overrun.
        nTimesliceOverruns += 1;
        const long excessMs = productionDurationMs - periodMs;

        std::cerr << daemonName
            << ": production overrun: actual=" << productionDurationMs
            << "ms budget=" << periodMs
            << "ms overrunBy=" << excessMs
            << "ms nOverruns=" << nTimesliceOverruns
            << std::endl;
    }
}
/**
 * Returns the time elapsed between two clock stamps in whole milliseconds.
 *
 * @param startStamp Earlier time point.
 * @param endStamp Later time point.
 * @return (endStamp - startStamp) converted with std::chrono::duration_cast
 *     to milliseconds; negative when endStamp precedes startStamp.
 */
long durationMsSince(
    const std::chrono::high_resolution_clock::time_point &startStamp,
    const std::chrono::high_resolution_clock::time_point &endStamp)
{
    using std::chrono::milliseconds;

    const milliseconds elapsed =
        std::chrono::duration_cast<milliseconds>(endStamp - startStamp);
    return elapsed.count();
}
/**
 * Writes one line with a daemon iteration's timings to std::cerr, but only
 * when OptionParser::getOptions().verbose is set.
 *
 * @param daemonName Prefix written at the start of the log line.
 * @param productionDurationMs Value printed after "production=".
 * @param timesliceDurationMs Value printed after "timeslice=".
 * @param periodMs Value printed after "period=".
 */
void logDaemonDurationsIfVerbose(
    const char *daemonName,
    long productionDurationMs,
    long timesliceDurationMs,
    long periodMs)
{
    if (OptionParser::getOptions().verbose)
    {
        std::cerr << daemonName
            << ": daemon durations: production=" << productionDurationMs
            << "ms timeslice=" << timesliceDurationMs
            << "ms period=" << periodMs
            << "ms" << std::endl;
    }
}
} // namespace
/**
 * Looks up the attached stimulus buffer whose device attachment spec compares
 * equal to the given one.
 *
 * Entries that are null, or whose deviceAttachmentSpec is null, are skipped.
 *
 * @param spec Attachment spec to search for. A null spec matches nothing.
 * @return The first matching buffer in attachedStimulusBuffers, or nullptr
 *     when there is no match.
 */
std::shared_ptr<StimulusBuffer> StimulusProducer::getAttachedStimulusBuffer(
    const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const
{
    // Dereferencing a null shared_ptr is undefined behaviour, so refuse to
    // compare against a null spec instead of evaluating *spec below.
    if (!spec)
    {
        return nullptr;
    }

    for (const auto& buffer : attachedStimulusBuffers)
    {
        if (buffer && buffer->deviceAttachmentSpec &&
            *buffer->deviceAttachmentSpec == *spec)
        {
            return buffer;
        }
    }
    return nullptr;
}
/**
 * Tells whether any attached stimulus buffer has a device attachment spec
 * with the given qualeIfaceApi.
 *
 * Entries are examined in order and the scan stops at the first match.
 *
 * @param qualeIfaceApi Value compared against each spec's qualeIfaceApi.
 * @return true on the first matching entry; false when none matches.
 * @throws std::runtime_error when a null buffer, or a buffer with a null
 *     deviceAttachmentSpec, is reached before a match is found.
 */
bool StimulusProducer::hasBufferWithQualeIfaceApi(
    const std::string& qualeIfaceApi) const
{
    for (const auto& candidate : attachedStimulusBuffers)
    {
        const bool entryIsUsable =
            candidate && candidate->deviceAttachmentSpec;
        if (!entryIsUsable)
        {
            throw std::runtime_error(
                "StimulusProducer::hasBufferWithQualeIfaceApi: encountered "
                "null buffer or null deviceAttachmentSpec in "
                "attachedStimulusBuffers (should never happen)");
        }

        const bool apiDiffers =
            candidate->deviceAttachmentSpec->qualeIfaceApi != qualeIfaceApi;
        if (!apiDiffers)
        {
            return true;
        }
    }
    return false;
}
/**
 * Appends a stimulus buffer to attachedStimulusBuffers unless an entry with
 * an equal device attachment spec is already present.
 *
 * Two entries count as the same attachment only when both have a non-null
 * deviceAttachmentSpec and the pointed-to specs compare equal; a buffer
 * whose spec is null therefore never matches an existing entry.
 *
 * @param buffer Buffer to attach. A null pointer is rejected.
 * @return true when the buffer was appended; false when it was null or an
 *     equal attachment already existed.
 */
bool StimulusProducer::addAttachedStimulusBufferIfNotExists(
    const std::shared_ptr<StimulusBuffer>& buffer)
{
    if (!buffer)
    {
        return false;
    }

    const auto sameAttachment =
        [&buffer](const std::shared_ptr<StimulusBuffer>& existing) -> bool
        {
            if (!existing || !existing->deviceAttachmentSpec)
            {
                return false;
            }
            if (!buffer->deviceAttachmentSpec)
            {
                return false;
            }
            return *(existing->deviceAttachmentSpec)
                == *(buffer->deviceAttachmentSpec);
        };

    const auto listEnd = attachedStimulusBuffers.end();
    const bool alreadyPresent = std::find_if(
        attachedStimulusBuffers.begin(), listEnd, sameAttachment) != listEnd;
    if (alreadyPresent)
    {
        return false;
    }

    attachedStimulusBuffers.push_back(buffer);
    return true;
}
/**
 * Removes a stimulus buffer from attachedStimulusBuffers.
 *
 * The entry is located by shared_ptr equality (std::find) and only the first
 * match is erased. A null argument, or a buffer that is not in the list,
 * leaves the list unchanged.
 *
 * @param buffer Buffer to detach.
 */
void StimulusProducer::destroyAttachedStimulusBuffer(
    const std::shared_ptr<StimulusBuffer>& buffer)
{
    if (buffer)
    {
        const auto listEnd = attachedStimulusBuffers.end();
        const auto position = std::find(
            attachedStimulusBuffers.begin(), listEnd, buffer);
        if (position != listEnd)
        {
            attachedStimulusBuffers.erase(position);
        }
    }
}
/**
 * Frame-production daemon coroutine.
 *
 * Each iteration runs one production timeslice
 * (stimFrameProductionTimesliceCInd), reports an overrun when production took
 * longer than CONFIG_STIMBUFF_FRAME_PERIOD_MS, and then waits on daemonTimer
 * for whatever is left of the frame period before starting the next
 * iteration.
 *
 * The loop ends when the canceler reports a cancellation request (checked
 * before each iteration and again before the timer is armed) or when the
 * timer wait does not expire normally.
 *
 * The first two parameters (exception storage and caller callback) are part
 * of the launch signature and are not used by this body.
 *
 * @param canceler Polled for cancellation and forwarded to the production
 *     step.
 */
sscl::co::NonViralNonPostingInvoker StimulusProducer::productionCDaemon(
    std::exception_ptr &, std::function<void()>,
    sscl::SyncCancelerForAsyncWork &canceler)
{
    using Clock = std::chrono::high_resolution_clock;

    const char *const daemonName = "productionCDaemon";
    const long framePeriodMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS;

    while (!canceler.isCancellationRequested())
    {
        // Production is the first thing in the timeslice, so a single stamp
        // marks the start of both.
        const auto timesliceStartStamp = Clock::now();
        co_await stimFrameProductionTimesliceCInd(canceler);
        const auto productionEndStamp = Clock::now();

        const long productionDurationMs = durationMsSince(
            timesliceStartStamp, productionEndStamp);
        logProductionOverrunIfNeeded(
            daemonName,
            productionDurationMs, framePeriodMs, nTimesliceOverruns);

        // stop() cancels daemonTimer once. If that happened while production
        // was still in flight there was no wait to cancel, and arming the
        // timer now would keep the daemon (and therefore stop()) waiting for
        // up to a whole frame period. Leave instead of arming it.
        if (canceler.isCancellationRequested()) {
            break;
        }

        const long residueMs = computeTimesliceResidueMs(
            productionDurationMs, framePeriodMs);
        // Schedule the next timeout based on timeslice remaining time.
        const bool expiredNormally = co_await
            adapters::boostAsio::getDeadlineTimerAReqAwaiter(
                ioContext,
                daemonTimer,
                boost::posix_time::milliseconds(residueMs));
        if (!expiredNormally) {
            // Timer was cancelled, which is expected when stopping.
            break;
        }

        const long timesliceDurationMs = durationMsSince(
            timesliceStartStamp, Clock::now());
        logDaemonDurationsIfVerbose(
            daemonName,
            productionDurationMs, timesliceDurationMs,
            framePeriodMs);
    }
    co_return;
}
/**
 * Starts the stimulus producer.
 *
 * Prints a start message naming deviceAttachmentSpec->deviceSelector, resets
 * the overrun counter, opens admission on taskNursery and launches
 * productionCDaemon through it.
 */
void StimulusProducer::start()
{
std::cout << __func__ << ": Starting stimulus producer for device "
<< deviceAttachmentSpec->deviceSelector << std::endl;
// Overruns are counted from zero again on every start.
nTimesliceOverruns = 0;
taskNursery.openAdmission();
// The daemon receives its exception storage, caller callback and canceler
// from the lease handed to this lambda by the nursery.
taskNursery.launch(
[this](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return productionCDaemon(
lease.getExceptionStorage(),
lease.getCallerLambda(),
lease.getSyncCanceler());
});
}
/**
 * Stops the stimulus producer.
 *
 * Cancels daemonTimer, requests cancellation on every task in taskNursery,
 * closes admission, calls syncAwaitAllSettlements with the calling component
 * thread's io context, and finally prints a stop message naming
 * deviceAttachmentSpec->deviceSelector.
 */
void StimulusProducer::stop()
{
// Cancel timer immediately
daemonTimer.cancel();
taskNursery.requestCancelOnAll();
taskNursery.closeAdmission();
// NOTE(review): presumably this does not return until the daemon task has
// settled - confirm against the task nursery API.
taskNursery.syncAwaitAllSettlements(
sscl::ComponentThread::getSelf()->getIoContext());
std::cout << __func__ << ": Stopped stimulus producer for device "
<< deviceAttachmentSpec->deviceSelector << std::endl;
}
} // namespace stim_buff
} // namespace smo