#ifndef _SP_MC_RING_BUFFER_H #define _SP_MC_RING_BUFFER_H #include #include #include #include #include namespace smo { /** * @brief Single-producer, multi-consumer ring buffer w/per-slot sequence locks * * A ring buffer that maintains data alignment constraints while providing * lock-free read access through per-slot sequence locks. The locks are kept * separate from the data to preserve alignment requirements for the input * engine. */ class SpMcRingBuffer { public: class InputEngineConstraints { public: InputEngineConstraints( size_t slotStartAlignmentNBytes_, size_t slotPadToNBytes_) : slotStartAlignmentNBytes(slotStartAlignmentNBytes_), slotPadToNBytes(slotPadToNBytes_) {} ~InputEngineConstraints() = default; // Input-engine layout/constraints size_t slotStartAlignmentNBytes; // power-of-2 alignment (e.g., 4096) size_t slotPadToNBytes; // minimum size per slot }; public: /** EXPLANATION: * Constructor initializes the ring buffer with the given constraints and * number of slots. Calculates stride and allocates data buffer and sequence * locks array. */ explicit SpMcRingBuffer( const InputEngineConstraints& constraints_, size_t nSlots_) : nSlots(nSlots_), strideNBytes(0), bufferNBytes(0), constraints(constraints_) { if (nSlots == 0) { throw std::invalid_argument(std::string(__func__) + ": SpMcRingBuffer: nSlots must be > 0"); } computeStrideAndBufferSize(); // Allocate data buffer: bufferNBytes (aligned up to alignment) data.resize(bufferNBytes); // Initialize sequence locks array: one lock per slot sequenceLocks.resize(nSlots); } ~SpMcRingBuffer() = default; // Non-copyable, movable SpMcRingBuffer(const SpMcRingBuffer&) = delete; SpMcRingBuffer& operator=(const SpMcRingBuffer&) = delete; SpMcRingBuffer(SpMcRingBuffer&&) = default; SpMcRingBuffer& operator=(SpMcRingBuffer&&) = default; public: /** * @brief Get a reference to data at the specified slot * * @tparam T The type of data stored in the slot * @param slotIndex The index of the slot (0-based) * @return Reference to T at the slot * @throws std::out_of_range if slotIndex >= nSlots */ template T& getDataAtSlot(size_t slotIndex) { if (slotIndex >= nSlots) { throw std::out_of_range(std::string(__func__) + ": SpMcRingBuffer: slotIndex must be < nSlots"); } size_t offset = slotIndex * strideNBytes; return *reinterpret_cast(data.data() + offset); } SequenceLock& getSequenceLockAtSlot(size_t slotIndex) { if (slotIndex >= nSlots) { throw std::out_of_range(std::string(__func__) + ": SpMcRingBuffer: slotIndex must be < nSlots"); } return sequenceLocks[slotIndex]; } private: void computeStrideAndBufferSize() { // Stride is the maximum of alignment and padding strideNBytes = std::max( constraints.slotStartAlignmentNBytes, constraints.slotPadToNBytes); // Buffer size is nSlots * strideNBytes, aligned up to alignment size_t rawSize = nSlots * strideNBytes; bufferNBytes = ((rawSize + constraints.slotStartAlignmentNBytes - 1) / constraints.slotStartAlignmentNBytes) * constraints.slotStartAlignmentNBytes; } // Buffer data std::vector data; // Sequence locks array: one lock per slot std::vector sequenceLocks; public: // Layout/invariants size_t nSlots; size_t strideNBytes; size_t bufferNBytes; InputEngineConstraints constraints; }; } // namespace smo #endif // _SP_MC_RING_BUFFER_H