#ifndef STAGINGBUFFER_H #define STAGINGBUFFER_H #include #include #include #include #include #include #include #include #include "frameAssemblyDesc.h" namespace smo { namespace stim_buff { /** * StagingBuffer manages a large buffer to guide io_uring in assembling some * number of Livox Avia pcloud UDP dgrams into a single stim frame. * * The buffer operates in a cycle: * 1. io_uring assembles UDP dgrams into the buffer until it's full * 2. Buffer is handed off to the stimbuff layer to be appended to the stimbuff. * 3. When the stimbuff layer has appended the current assembled frame, the * assembly buffer is reset and cycle repeats. */ class StagingBuffer { public: class InputEngineConstraints { public: InputEngineConstraints( size_t slotStartAlignmentByteVal_, size_t slotPadToNBytes_) : slotStartAlignmentByteVal(slotStartAlignmentByteVal_), slotPadToNBytes(slotPadToNBytes_) {} ~InputEngineConstraints() = default; // Input-engine layout/constraints size_t slotStartAlignmentByteVal; // power-of-2 alignment (e.g., 4096) size_t slotPadToNBytes; // minimum size per datagram slot // Static defaults for io_uring static const InputEngineConstraints ioUringConstraints; inline std::string stringify() const { std::ostringstream oss; oss << "InputEngineConstraints{" << "slotStartAlignmentByteVal=" << slotStartAlignmentByteVal << ", slotPadToNBytes=" << slotPadToNBytes << "}"; return oss.str(); } }; class OutputEngineConstraints { public: OutputEngineConstraints() = default; ~OutputEngineConstraints() = default; }; public: /** EXPLANATION: * We use the input and output engine constraints to determine the total * amount of memory required internally to assemble a single frame with * the given number of points per frame. */ explicit StagingBuffer( const InputEngineConstraints& inputEngineConstraints, const OutputEngineConstraints& outputEngineConstraints, size_t nDgramsPerFrame); ~StagingBuffer() = default; // Non-copyable, movable StagingBuffer(const StagingBuffer&) = delete; StagingBuffer& operator=(const StagingBuffer&) = delete; StagingBuffer(StagingBuffer&&) = default; StagingBuffer& operator=(StagingBuffer&&) = default; public: /** EXPLANATION: * Returns an input-engine-agnostic descriptor describing per-frame packet * slot layout. Different input engines should be able to convert this into * engine-specific metadata. E.g: io_uring's SQE descriptor. */ operator std::shared_ptr() const { return frameDesc; } // operator OpenClSharedBufferDescriptor() const; bool isAssembling() const { return assemblingFlag.load(); } void startAssembly() { assemblingFlag.store(true); } void stopAssembly() { assemblingFlag.store(false); } inline std::string stringify() const { std::ostringstream oss; oss << "StagingBuffer{" << "nDgramsPerFrame=" << nDgramsPerFrame << ", bufferNBytes=" << bufferNBytes << ", slotStrideNBytes=" << slotStrideNBytes << ", constraints=" << inputConstraints.stringify() << "}"; return oss.str(); } private: void computeSlotStrideAndBufferSize(); // Buffer data std::unique_ptr buffer; size_t bufferNBytes; // Layout/invariants size_t nDgramsPerFrame; size_t slotStrideNBytes; InputEngineConstraints inputConstraints; // Descriptor (computed once; reused across frames) mutable std::shared_ptr frameDesc; // Current state std::atomic currentNBytes; std::atomic assemblingFlag; }; /** Inline implementations ******************************************************************************/ inline StagingBuffer::StagingBuffer( const InputEngineConstraints& inputEngineConstraints_, const OutputEngineConstraints& /*outputEngineConstraints*/, size_t nDgramsPerFrame) : buffer(nullptr), bufferNBytes(0), nDgramsPerFrame(nDgramsPerFrame), slotStrideNBytes(0), inputConstraints(inputEngineConstraints_), assemblingFlag(false) { if (nDgramsPerFrame == 0) { throw std::invalid_argument(std::string(__func__) + ": StagingBuffer: nDgramsPerFrame must be > 0"); } computeSlotStrideAndBufferSize(); buffer = std::make_unique(bufferNBytes); currentNBytes.store(0); // Build FrameAssemblyDesc once std::vector slots; slots.reserve(nDgramsPerFrame); uint8_t *frameBase = buffer.get(); for (size_t i = 0; i < nDgramsPerFrame; ++i) { size_t off = i * slotStrideNBytes; FrameAssemblyDesc::SlotDesc s{ off, frameBase + off, inputConstraints.slotPadToNBytes}; slots.push_back(s); } frameDesc = std::make_shared( nDgramsPerFrame, inputConstraints.slotPadToNBytes, bufferNBytes, std::move(slots)); } inline void StagingBuffer::computeSlotStrideAndBufferSize() { // Slot stride is the maximum of alignment and padding slotStrideNBytes = std::max( inputConstraints.slotStartAlignmentByteVal, inputConstraints.slotPadToNBytes); // Buffer size is nDgramsPerFrame * slotStrideNBytes, aligned up to alignment size_t rawSize = nDgramsPerFrame * slotStrideNBytes; bufferNBytes = ((rawSize + inputConstraints.slotStartAlignmentByteVal - 1) / inputConstraints.slotStartAlignmentByteVal) * inputConstraints.slotStartAlignmentByteVal; } /** Specific input/output engine constraints ******************************************************************************/ class OpenClConstraints : public StagingBuffer::OutputEngineConstraints { public: OpenClConstraints() : StagingBuffer::OutputEngineConstraints() {} ~OpenClConstraints() = default; }; } // namespace stim_buff } // namespace smo #endif // STAGINGBUFFER_H