diff --git a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp index 21b6f43..eeb718c 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp @@ -1,10 +1,118 @@ -#include "pcloudStimulusBuffer.h" +#include #include #include #include +#include +#include +#include +#include "pcloudStimulusBuffer.h" namespace smo { namespace stim_buff { +PcloudStimulusBuffer::PcloudStimulusBuffer( + const device::DeviceAttachmentSpec& deviceAttachmentSpec, + std::shared_ptr &device, + const PcloudFormatDesc& formatDesc, + size_t nDgramsPerStagingBufferFrame) +: StimulusBuffer(deviceAttachmentSpec), +deviceAttachmentSpec(deviceAttachmentSpec), device(device), +formatDesc(formatDesc), stagingBuffer( + StagingBuffer::InputEngineConstraints::ioUringConstraints, + OpenClConstraints(), nDgramsPerStagingBufferFrame), +shouldContinue(false), timer(device->componentThread->getIoService()) +{ +} + +void PcloudStimulusBuffer::start() +{ + shouldContinue.store(true); + scheduleNextTimeout(); +} + +void PcloudStimulusBuffer::stop() +{ + shouldContinue.store(false); + + // Set up a timeout bridge using the device's component thread's io_service + auto& ioService = device->componentThread->getIoService(); + boost::asio::deadline_timer delayTimer(ioService); + AsynchronousBridge bridge(ioService); + + // Set up the delay for CONFIG_STIMBUFF_FRAME_PERIOD_MS to let in-flight + // operation finish + delayTimer.expires_from_now( + boost::posix_time::milliseconds(CONFIG_STIMBUFF_FRAME_PERIOD_MS)); + + delayTimer.async_wait( + [&bridge](const boost::system::error_code& error) + { + (void)error; + + // Always signal complete, whether timeout expired or was cancelled + bridge.setAsyncOperationComplete(); + }); + + bridge.waitForAsyncOperationCompleteOrIoServiceStopped(); + + // After delay, cancel timer and perform cleanup + timer.cancel(); +} + +void PcloudStimulusBuffer::scheduleNextTimeout(int delayMs) +{ + if (!shouldContinue.load()) + { return; } + + // Schedule the next timeout using the provided delay + timer.expires_from_now( + boost::posix_time::milliseconds(delayMs)); + + timer.async_wait( + std::bind( + &PcloudStimulusBuffer::onTimeout, this, std::placeholders::_1)); +} + +void PcloudStimulusBuffer::onTimeout(const boost::system::error_code& error) +{ + // Timer was cancelled, which is expected when stopping + if (error == boost::asio::error::operation_aborted) { + return; + } + + if (error) + { + std::cerr << "PcloudStimulusBuffer: Timer error: " << error.message() + << std::endl; + return; + } + + if (!shouldContinue.load()) + { return; } + + /** EXPLANATION: + * We need to ensure that there's only ever one stimframe being produced + * during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we + * use a spinlock. + * + * When a new frame is to be produced, the async producer will first acquire + * the frameAssemblyLimiter spinlock. This way, when the next timeout is + * fired it can check whether its predecessor stimframe has finished being + * produced. If the preceding stimframe is still being produced, then we'll + * sleep for CONFIG_STIMBUFF_FRAME_RETRY_PERIOD_MS ms before trying again. + */ + int nextWakeupDelayMs; + if (frameAssemblyRateLimiter.tryAcquire()) + { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; } + else + { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; } + + // Placeholder handler (empty for now) + // Note: The lock should be released when frame production completes + + // Schedule next timeout with the pre-determined duration + scheduleNextTimeout(nextWakeupDelayMs); +} + } // namespace stim_buff } // namespace smo diff --git a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h index 504b114..dfedee2 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h +++ b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h @@ -1,6 +1,10 @@ #ifndef _LIVOX_GEN1_PCLOUD_STIMULUS_BUFFER_H #define _LIVOX_GEN1_PCLOUD_STIMULUS_BUFFER_H +#include +#include +#include +#include #include #include #include @@ -26,13 +30,7 @@ public: const device::DeviceAttachmentSpec& deviceAttachmentSpec, std::shared_ptr &device, const PcloudFormatDesc& formatDesc, - size_t nDgramsPerStagingBufferFrame) - : StimulusBuffer(deviceAttachmentSpec), - deviceAttachmentSpec(deviceAttachmentSpec), device(device), - formatDesc(formatDesc), stagingBuffer( - StagingBuffer::InputEngineConstraints::ioUringConstraints, - OpenClConstraints(), nDgramsPerStagingBufferFrame) - {} + size_t nDgramsPerStagingBufferFrame); ~PcloudStimulusBuffer() = default; @@ -42,12 +40,25 @@ public: PcloudStimulusBuffer(PcloudStimulusBuffer&&) = default; PcloudStimulusBuffer& operator=(PcloudStimulusBuffer&&) = default; + // Control methods + void start(); + void stop(); + public: device::DeviceAttachmentSpec deviceAttachmentSpec; std::shared_ptr device; PcloudFormatDesc formatDesc; StagingBuffer stagingBuffer; IoUringAssemblyEngine ioUringAssemblyEngine; + +private: + std::atomic shouldContinue; + boost::asio::deadline_timer timer; + SpinLock frameAssemblyRateLimiter; + +private: + void scheduleNextTimeout(int delayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS); + void onTimeout(const boost::system::error_code& error); }; } // namespace stim_buff