#ifndef _STIMULUS_BUFFER_H #define _STIMULUS_BUFFER_H #include #include #include #include #include #include #include #include #include #include #include #include #include "stimFrame.h" #include "deviceAttachmentSpec.h" namespace smo { namespace stim_buff { /** * StimulusBuffer manages a collection of stimulus frames with simultaneity stamps. * * This buffer is designed to hold stimulus frames that have been assembled * from raw sensor data (e.g., Livox Avia point cloud data) and are ready * for processing by the mind layer. * * The buffer provides thread-safe operations for adding frames, retrieving * frames, and managing the buffer state. */ class StimulusBuffer { public: class PcloudFormatDesc { public: enum class Format { XYZ, XYZI, }; public: Format format; }; public: explicit StimulusBuffer( const device::DeviceAttachmentSpec& deviceAttachmentSpec, boost::asio::io_service& ioService) : deviceAttachmentSpec(deviceAttachmentSpec), ioService(ioService), shouldContinue(false), timer(ioService) {} virtual ~StimulusBuffer() = default; // Non-copyable, movable StimulusBuffer(const StimulusBuffer&) = delete; StimulusBuffer& operator=(const StimulusBuffer&) = delete; StimulusBuffer(StimulusBuffer&&) = default; StimulusBuffer& operator=(StimulusBuffer&&) = default; // Control methods void start() { shouldContinue.store(true); scheduleNextTimeout(); } void stop(); protected: // Virtual functions for derived classes to override virtual int getStopDelayMs() const { return CONFIG_STIMBUFF_FRAME_PERIOD_MS; } virtual void stimFrameProductionTimesliceInd() = 0; private: void onTimeout(const boost::system::error_code& error); public: device::DeviceAttachmentSpec deviceAttachmentSpec; std::vector frames_; protected: SpinLock frameAssemblyRateLimiter; private: boost::asio::io_service& ioService; std::atomic shouldContinue; boost::asio::deadline_timer timer; void scheduleNextTimeout(int delayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS) { if (!shouldContinue.load()) { return; } // Schedule the next timeout using the provided delay timer.expires_from_now( boost::posix_time::milliseconds(delayMs)); timer.async_wait( std::bind( &StimulusBuffer::onTimeout, this, std::placeholders::_1)); } }; /** Inline methods ******************************************************************************/ inline void StimulusBuffer::stop() { shouldContinue.store(false); // Set up a timeout bridge using the io_service boost::asio::deadline_timer delayTimer(ioService); AsynchronousBridge bridge(ioService); // Set up the delay to let in-flight operation finish delayTimer.expires_from_now( boost::posix_time::milliseconds(getStopDelayMs())); delayTimer.async_wait( [&bridge](const boost::system::error_code& error) { (void)error; // Always signal complete, whether timeout expired or was cancelled bridge.setAsyncOperationComplete(); }); bridge.waitForAsyncOperationCompleteOrIoServiceStopped(); // After delay, cancel timer and perform cleanup timer.cancel(); } inline void StimulusBuffer::onTimeout(const boost::system::error_code& error) { // Timer was cancelled, which is expected when stopping if (error == boost::asio::error::operation_aborted) { return; } if (error) { std::cerr << "StimulusBuffer: Timer error: " << error.message() << std::endl; return; } if (!shouldContinue.load()) { return; } /** EXPLANATION: * We need to ensure that there's only ever one stimframe being produced * during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we * use a spinlock. * * When a new frame is to be produced, the async producer will first acquire * the frameAssemblyLimiter spinlock. This way, when the next timeout is * fired it can check whether its predecessor stimframe has finished being * produced. If the preceding stimframe is still being produced, then we'll * sleep for CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS ms before trying again. */ int nextWakeupDelayMs; if (frameAssemblyRateLimiter.tryAcquire()) { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS; } else { nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS; } // Call the derived class's frame production handler stimFrameProductionTimesliceInd(); // Note: The lock should be released when frame production completes // Schedule next timeout with the pre-determined duration scheduleNextTimeout(nextWakeupDelayMs); } } // namespace stim_buff } // namespace smo #endif // _STIMULUS_BUFFER_H