diff --git a/include/user/stimulusBuffer.h b/include/user/stimulusBuffer.h index 0330e75..0719a1d 100644 --- a/include/user/stimulusBuffer.h +++ b/include/user/stimulusBuffer.h @@ -6,6 +6,13 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include #include "stimFrame.h" #include "deviceAttachmentSpec.h" @@ -40,8 +47,11 @@ public: public: explicit StimulusBuffer( - const device::DeviceAttachmentSpec& deviceAttachmentSpec) - : deviceAttachmentSpec(deviceAttachmentSpec) + const device::DeviceAttachmentSpec& deviceAttachmentSpec, + boost::asio::io_service& ioService) + : deviceAttachmentSpec(deviceAttachmentSpec), + ioService(ioService), + shouldContinue(false), timer(ioService) {} ~StimulusBuffer() = default; @@ -52,9 +62,107 @@ public: StimulusBuffer(StimulusBuffer&&) = default; StimulusBuffer& operator=(StimulusBuffer&&) = default; + // Control methods + void start() + { + shouldContinue.store(true); + scheduleNextTimeout(); + } + + void stop() + { + shouldContinue.store(false); + + // Set up a timeout bridge using the io_service + boost::asio::deadline_timer delayTimer(ioService); + AsynchronousBridge bridge(ioService); + + // Set up the delay for CONFIG_STIMBUFF_FRAME_PERIOD_MS to let in-flight + // operation finish + delayTimer.expires_from_now( + boost::posix_time::milliseconds(CONFIG_STIMBUFF_FRAME_PERIOD_MS)); + + delayTimer.async_wait( + [&bridge](const boost::system::error_code& error) + { + (void)error; + + // Always signal complete, whether timeout expired or was cancelled + bridge.setAsyncOperationComplete(); + }); + + bridge.waitForAsyncOperationCompleteOrIoServiceStopped(); + + // After delay, cancel timer and perform cleanup + timer.cancel(); + } + public: device::DeviceAttachmentSpec deviceAttachmentSpec; std::vector frames_; + +protected: + SpinLock frameAssemblyRateLimiter; + +private: + boost::asio::io_service& ioService; + std::atomic shouldContinue; + boost::asio::deadline_timer timer; + + void scheduleNextTimeout(int delayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS) + { + if (!shouldContinue.load()) + { return; } + + // Schedule the next timeout using the provided delay + timer.expires_from_now( + boost::posix_time::milliseconds(delayMs)); + + timer.async_wait( + std::bind( + &StimulusBuffer::onTimeout, this, std::placeholders::_1)); + } + + void onTimeout(const boost::system::error_code& error) + { + // Timer was cancelled, which is expected when stopping + if (error == boost::asio::error::operation_aborted) { + return; + } + + if (error) + { + std::cerr << "StimulusBuffer: Timer error: " << error.message() + << std::endl; + return; + } + + if (!shouldContinue.load()) + { return; } + + /** EXPLANATION: + * We need to ensure that there's only ever one stimframe being produced + * during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we + * use a spinlock. + * + * When a new frame is to be produced, the async producer will first acquire + * the frameAssemblyLimiter spinlock. This way, when the next timeout is + * fired it can check whether its predecessor stimframe has finished being + * produced. If the preceding stimframe is still being produced, then we'll + * sleep for CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ms before trying again. + */ + int nextWakeupDelayMs; + if (frameAssemblyRateLimiter.tryAcquire()) + { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; } + else + { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; } + + // Placeholder handler (empty for now) + // Note: The lock should be released when frame production completes + + // Schedule next timeout with the pre-determined duration + scheduleNextTimeout(nextWakeupDelayMs); + } }; } // namespace stim_buff diff --git a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp index eeb718c..620f148 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp @@ -1,10 +1,6 @@ -#include #include -#include #include -#include #include -#include #include "pcloudStimulusBuffer.h" namespace smo { @@ -15,104 +11,13 @@ PcloudStimulusBuffer::PcloudStimulusBuffer( std::shared_ptr &device, const PcloudFormatDesc& formatDesc, size_t nDgramsPerStagingBufferFrame) -: StimulusBuffer(deviceAttachmentSpec), +: StimulusBuffer(deviceAttachmentSpec, device->componentThread->getIoService()), deviceAttachmentSpec(deviceAttachmentSpec), device(device), formatDesc(formatDesc), stagingBuffer( StagingBuffer::InputEngineConstraints::ioUringConstraints, - OpenClConstraints(), nDgramsPerStagingBufferFrame), -shouldContinue(false), timer(device->componentThread->getIoService()) + OpenClConstraints(), nDgramsPerStagingBufferFrame) { } -void PcloudStimulusBuffer::start() -{ - shouldContinue.store(true); - scheduleNextTimeout(); -} - -void PcloudStimulusBuffer::stop() -{ - shouldContinue.store(false); - - // Set up a timeout bridge using the device's component thread's io_service - auto& ioService = device->componentThread->getIoService(); - boost::asio::deadline_timer delayTimer(ioService); - AsynchronousBridge bridge(ioService); - - // Set up the delay for CONFIG_STIMBUFF_FRAME_PERIOD_MS to let in-flight - // operation finish - delayTimer.expires_from_now( - boost::posix_time::milliseconds(CONFIG_STIMBUFF_FRAME_PERIOD_MS)); - - delayTimer.async_wait( - [&bridge](const boost::system::error_code& error) - { - (void)error; - - // Always signal complete, whether timeout expired or was cancelled - bridge.setAsyncOperationComplete(); - }); - - bridge.waitForAsyncOperationCompleteOrIoServiceStopped(); - - // After delay, cancel timer and perform cleanup - timer.cancel(); -} - -void PcloudStimulusBuffer::scheduleNextTimeout(int delayMs) -{ - if (!shouldContinue.load()) - { return; } - - // Schedule the next timeout using the provided delay - timer.expires_from_now( - boost::posix_time::milliseconds(delayMs)); - - timer.async_wait( - std::bind( - &PcloudStimulusBuffer::onTimeout, this, std::placeholders::_1)); -} - -void PcloudStimulusBuffer::onTimeout(const boost::system::error_code& error) -{ - // Timer was cancelled, which is expected when stopping - if (error == boost::asio::error::operation_aborted) { - return; - } - - if (error) - { - std::cerr << "PcloudStimulusBuffer: Timer error: " << error.message() - << std::endl; - return; - } - - if (!shouldContinue.load()) - { return; } - - /** EXPLANATION: - * We need to ensure that there's only ever one stimframe being produced - * during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we - * use a spinlock. - * - * When a new frame is to be produced, the async producer will first acquire - * the frameAssemblyLimiter spinlock. This way, when the next timeout is - * fired it can check whether its predecessor stimframe has finished being - * produced. If the preceding stimframe is still being produced, then we'll - * sleep for CONFIG_STIMBUFF_FRAME_RETRY_PERIOD_MS ms before trying again. - */ - int nextWakeupDelayMs; - if (frameAssemblyRateLimiter.tryAcquire()) - { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; } - else - { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; } - - // Placeholder handler (empty for now) - // Note: The lock should be released when frame production completes - - // Schedule next timeout with the pre-determined duration - scheduleNextTimeout(nextWakeupDelayMs); -} - } // namespace stim_buff } // namespace smo diff --git a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h index dfedee2..371c98a 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h +++ b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h @@ -1,10 +1,6 @@ #ifndef _LIVOX_GEN1_PCLOUD_STIMULUS_BUFFER_H #define _LIVOX_GEN1_PCLOUD_STIMULUS_BUFFER_H -#include -#include -#include -#include #include #include #include @@ -40,25 +36,12 @@ public: PcloudStimulusBuffer(PcloudStimulusBuffer&&) = default; PcloudStimulusBuffer& operator=(PcloudStimulusBuffer&&) = default; - // Control methods - void start(); - void stop(); - public: device::DeviceAttachmentSpec deviceAttachmentSpec; std::shared_ptr device; PcloudFormatDesc formatDesc; StagingBuffer stagingBuffer; IoUringAssemblyEngine ioUringAssemblyEngine; - -private: - std::atomic shouldContinue; - boost::asio::deadline_timer timer; - SpinLock frameAssemblyRateLimiter; - -private: - void scheduleNextTimeout(int delayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS); - void onTimeout(const boost::system::error_code& error); }; } // namespace stim_buff