Files
salmanoff/stimBuffApis/livoxGen1/stagingBuffer.h
T

203 lines
5.7 KiB
C++

#ifndef STAGINGBUFFER_H
#define STAGINGBUFFER_H
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include "FrameAssemblyDesc.h"
namespace smo {
namespace stim_buff {
/**
* StagingBuffer manages a large buffer to guide io_uring in assembling some
* number of Livox Avia pcloud UDP dgrams into a single stim frame.
*
* The buffer operates in a cycle:
* 1. io_uring assembles UDP dgrams into the buffer until it's full
* 2. Buffer is handed off to the stimbuff layer to be appended to the stimbuff.
* 3. When the stimbuff layer has appended the current assembled frame, the
* assembly buffer is reset and cycle repeats.
*/
class StagingBuffer
{
public:
class InputEngineConstraints
{
public:
InputEngineConstraints(
size_t slotStartAlignmentByteVal_,
size_t slotPadToNBytes_)
: slotStartAlignmentByteVal(slotStartAlignmentByteVal_),
slotPadToNBytes(slotPadToNBytes_)
{}
~InputEngineConstraints() = default;
// Input-engine layout/constraints
size_t slotStartAlignmentByteVal; // power-of-2 alignment (e.g., 4096)
size_t slotPadToNBytes; // minimum size per datagram slot
// Static defaults for io_uring
static const InputEngineConstraints ioUringConstraints;
inline std::string stringify() const
{
std::ostringstream oss;
oss << "InputEngineConstraints{"
<< "slotStartAlignmentByteVal=" << slotStartAlignmentByteVal
<< ", slotPadToNBytes=" << slotPadToNBytes
<< "}";
return oss.str();
}
};
class OutputEngineConstraints
{
public:
OutputEngineConstraints() = default;
~OutputEngineConstraints() = default;
};
public:
/** EXPLANATION:
* We use the input and output engine constraints to determine the total
* amount of memory required internally to assemble a single frame with
* the given number of points per frame.
*/
explicit StagingBuffer(
const InputEngineConstraints& inputEngineConstraints,
const OutputEngineConstraints& outputEngineConstraints,
size_t nDgramsPerFrame);
~StagingBuffer() = default;
// Non-copyable, movable
StagingBuffer(const StagingBuffer&) = delete;
StagingBuffer& operator=(const StagingBuffer&) = delete;
StagingBuffer(StagingBuffer&&) = default;
StagingBuffer& operator=(StagingBuffer&&) = default;
public:
/** EXPLANATION:
* Returns an input-engine-agnostic descriptor describing per-frame packet
* slot layout. Different input engines should be able to convert this into
* engine-specific metadata. E.g: io_uring's SQE descriptor.
*/
operator std::shared_ptr<FrameAssemblyDesc>() const { return frameDesc; }
// operator OpenClSharedBufferDescriptor() const;
bool isAssembling() const { return assemblingFlag.load(); }
void startAssembly() { assemblingFlag.store(true); }
void stopAssembly() { assemblingFlag.store(false); }
inline std::string stringify() const
{
std::ostringstream oss;
oss << "StagingBuffer{"
<< "nDgramsPerFrame=" << nDgramsPerFrame
<< ", bufferNBytes=" << bufferNBytes
<< ", slotStrideNBytes=" << slotStrideNBytes
<< ", constraints=" << inputConstraints.stringify()
<< "}";
return oss.str();
}
private:
void computeSlotStrideAndBufferSize();
// Buffer data
std::unique_ptr<uint8_t[]> buffer;
size_t bufferNBytes;
// Layout/invariants
size_t nDgramsPerFrame;
size_t slotStrideNBytes;
InputEngineConstraints inputConstraints;
// Descriptor (computed once; reused across frames)
mutable std::shared_ptr<FrameAssemblyDesc> frameDesc;
// Current state
std::atomic<size_t> currentNBytes;
std::atomic<bool> assemblingFlag;
};
/** Inline implementations
******************************************************************************/
/**
 * Builds the staging buffer and its frame descriptor.
 *
 * Validates the arguments, computes the slot stride and frame size, allocates
 * the backing memory and then creates one slot descriptor per datagram. The
 * descriptor is built once here and handed out afterwards.
 *
 * @param inputEngineConstraints_ slot alignment/padding requirements (copied).
 * @param nDgramsPerFrame number of datagram slots per frame; must be > 0.
 * @throws std::invalid_argument if nDgramsPerFrame or the slot start
 *         alignment is 0.
 * @throws std::length_error if the allocation size would overflow size_t.
 */
inline StagingBuffer::StagingBuffer(
    const InputEngineConstraints& inputEngineConstraints_,
    const OutputEngineConstraints& /*outputEngineConstraints*/,
    size_t nDgramsPerFrame)
: buffer(nullptr), bufferNBytes(0),
nDgramsPerFrame(nDgramsPerFrame), slotStrideNBytes(0),
inputConstraints(inputEngineConstraints_),
currentNBytes(0), assemblingFlag(false)
{
    if (nDgramsPerFrame == 0)
    {
        throw std::invalid_argument(std::string(__func__)
            + ": StagingBuffer: nDgramsPerFrame must be > 0");
    }

    // A zero alignment would lead to a division by zero when the frame size
    // is rounded to the alignment; reject it up front with a clear error.
    const size_t alignment = inputConstraints.slotStartAlignmentByteVal;
    if (alignment == 0)
    {
        throw std::invalid_argument(std::string(__func__)
            + ": StagingBuffer: slotStartAlignmentByteVal must be > 0");
    }

    computeSlotStrideAndBufferSize();

    /* new[] only guarantees the default new-alignment, which is normally far
     * smaller than the requested slot alignment. Over-allocate by
     * (alignment - 1) bytes so that an aligned frame base always fits inside
     * the allocation. bufferNBytes keeps meaning the usable frame size.
     */
    const size_t slackNBytes = alignment - 1;
    if (bufferNBytes > SIZE_MAX - slackNBytes)
    {
        throw std::length_error(std::string(__func__)
            + ": StagingBuffer: buffer size overflows size_t");
    }
    buffer.reset(new uint8_t[bufferNBytes + slackNBytes]);

    // NOTE: the frame starts at frameBase, which may lie a few bytes past
    // buffer.get(); slot offsets are relative to frameBase.
    uint8_t *frameBase = buffer.get();
    const size_t misalignment =
        reinterpret_cast<std::uintptr_t>(frameBase) % alignment;
    if (misalignment != 0)
    {
        frameBase += alignment - misalignment;
    }

    // Build FrameAssemblyDesc once
    std::vector<FrameAssemblyDesc::SlotDesc> slots;
    slots.reserve(nDgramsPerFrame);
    for (size_t i = 0; i < nDgramsPerFrame; ++i)
    {
        const size_t off = i * slotStrideNBytes;
        FrameAssemblyDesc::SlotDesc s{
            off, frameBase + off, inputConstraints.slotPadToNBytes};
        slots.push_back(s);
    }

    frameDesc = std::make_shared<FrameAssemblyDesc>(
        nDgramsPerFrame, inputConstraints.slotPadToNBytes, bufferNBytes,
        std::move(slots));
}
/**
 * Computes slotStrideNBytes and bufferNBytes from inputConstraints and
 * nDgramsPerFrame.
 *
 * The stride is the per-slot padded size rounded UP to a multiple of the slot
 * start alignment, and never less than one alignment unit. Simply taking
 * max(alignment, pad) is not enough: with alignment 64 and pad 1400 the
 * stride would be 1400, and every slot after the first would start at an
 * offset that is not a multiple of 64.
 *
 * @throws std::invalid_argument if the slot start alignment is 0.
 * @throws std::length_error if a size computation would overflow size_t.
 */
inline void StagingBuffer::computeSlotStrideAndBufferSize()
{
    const size_t alignment = inputConstraints.slotStartAlignmentByteVal;
    const size_t padToNBytes = inputConstraints.slotPadToNBytes;

    // The rounding below divides by the alignment.
    if (alignment == 0)
    {
        throw std::invalid_argument(std::string(__func__)
            + ": StagingBuffer: slotStartAlignmentByteVal must be > 0");
    }

    // Round the padded slot size up to the next multiple of the alignment.
    size_t paddedSlotNBytes = padToNBytes;
    const size_t remainder = padToNBytes % alignment;
    if (remainder != 0)
    {
        const size_t fill = alignment - remainder;
        if (padToNBytes > SIZE_MAX - fill)
        {
            throw std::length_error(std::string(__func__)
                + ": StagingBuffer: slot stride overflows size_t");
        }
        paddedSlotNBytes += fill;
    }

    // A pad size of 0 still needs one alignment unit per slot.
    slotStrideNBytes = std::max(alignment, paddedSlotNBytes);

    // slotStrideNBytes >= alignment >= 1 here, so the division is safe.
    if (nDgramsPerFrame > SIZE_MAX / slotStrideNBytes)
    {
        throw std::length_error(std::string(__func__)
            + ": StagingBuffer: buffer size overflows size_t");
    }

    // The stride is a multiple of the alignment, hence so is the total size;
    // no further rounding is required.
    bufferNBytes = nDgramsPerFrame * slotStrideNBytes;
}
/** Specific input/output engine constraints
******************************************************************************/
/**
 * Output-engine constraints type for OpenCL (presumably for an OpenCL-based
 * consumer of the assembled frame -- confirm against the callers). It adds no
 * data or behaviour on top of StagingBuffer::OutputEngineConstraints.
 */
class OpenClConstraints
: public StagingBuffer::OutputEngineConstraints
{
public:
    // The base-class sub-object is default-constructed implicitly.
    OpenClConstraints() {}

    ~OpenClConstraints() = default;
};
} // namespace stim_buff
} // namespace smo
#endif // STAGINGBUFFER_H