From b3d0565e11dbc7ea1154cf594cf913c75476a4e1 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Thu, 30 Oct 2025 19:07:19 -0400 Subject: [PATCH] livoxGen1: Committing intermediate state before daemon design --- stimBuffApis/livoxGen1/frameAssemblyDesc.h | 65 +++++++++ .../livoxGen1/ioUringAssemblyEngine.cpp | 123 ++++++++++++++++++ .../livoxGen1/ioUringAssemblyEngine.h | 72 ++++++++++ stimBuffApis/livoxGen1/pcloudStimulusBuffer.h | 2 + 4 files changed, 262 insertions(+) create mode 100644 stimBuffApis/livoxGen1/frameAssemblyDesc.h create mode 100644 stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp create mode 100644 stimBuffApis/livoxGen1/ioUringAssemblyEngine.h diff --git a/stimBuffApis/livoxGen1/frameAssemblyDesc.h b/stimBuffApis/livoxGen1/frameAssemblyDesc.h new file mode 100644 index 0000000..d0a8fda --- /dev/null +++ b/stimBuffApis/livoxGen1/frameAssemblyDesc.h @@ -0,0 +1,65 @@ +#ifndef _LIVOX_GEN1_FRAME_ASSEMBLY_DESC_H +#define _LIVOX_GEN1_FRAME_ASSEMBLY_DESC_H + +#include +#include +#include +#include +#include + +namespace smo { +namespace stim_buff { + +class FrameAssemblyDesc +{ +public: + struct SlotDesc + { + size_t offsetBytes; // offset from frame base + uint8_t* vaddr; // direct pointer into StagingBuffer memory + size_t nBytes; // slot capacity in bytes + }; + +public: + FrameAssemblyDesc() = default; + + FrameAssemblyDesc( + size_t n, size_t slotSize, + size_t frameStride, + std::vector slotList) + : numSlots(n), slotSizeBytes(slotSize), + frameStrideBytes(frameStride), + slots(std::move(slotList)) {} + + inline std::string stringify() const { + std::ostringstream oss; + oss << "FrameAssemblyDesc{" + << "numSlots=" << numSlots + << ", slotSizeBytes=" << slotSizeBytes + << ", frameStrideBytes=" << frameStrideBytes + << ", slots=["; + const size_t preview = slots.size() < 4 ? slots.size() : 4; + for (size_t i = 0; i < preview; ++i) { + oss << "{off=" << slots[i].offsetBytes + << ", nBytes=" << slots[i].nBytes + << ", vaddr=" << (const void*)slots[i].vaddr << "}"; + if (i + 1 < preview) oss << ","; + } + if (slots.size() > preview) oss << ", ..."; + oss << "]}"; + return oss.str(); + } + +public: + size_t numSlots; + size_t slotSizeBytes; + size_t frameStrideBytes; + std::vector slots; +}; + +} // namespace stim_buff +} // namespace smo + +#endif // _LIVOX_GEN1_FRAME_ASSEMBLY_DESC_H + + diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp new file mode 100644 index 0000000..407c3ce --- /dev/null +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -0,0 +1,123 @@ +#include +#include +#include +#include "IoUringAssemblyEngine.h" + +namespace smo { +namespace stim_buff { + +namespace { +struct DummyLivoxEthHeader +{ + uint8_t version; + uint8_t slot; + uint8_t id; + uint8_t rsvd; + uint32_t err_code; + uint8_t timestamp_type; + uint8_t data_type; + uint8_t timestamp[8]; +}; + +constexpr uint32_t kInvalidErrCode = 0xFFFFFFFFu; +constexpr uint8_t kInvalidTimestampType = 0xFFu; +constexpr uint8_t kInvalidDataType = 0xFFu; +} + +IoUringAssemblyEngine::IoUringAssemblyEngine() + : udpFd(-1), running(false), paused(false), frameCb(nullptr), + desc(nullptr), frameBase(nullptr), frameStrideBytes(0), + timeoutMs(15), frameIndex(0) + {} +IoUringAssemblyEngine::~IoUringAssemblyEngine() = default; + +bool IoUringAssemblyEngine::setSocketFromDevice(const std::shared_ptr& device) { + if (!device) return false; + // Expect device to expose pcloudDataSocketDesc() + udpFd = device->pcloudDataSocketDesc(); + return udpFd >= 0; +} + +bool IoUringAssemblyEngine::start(FrameAssemblyDesc& descRef, + uint8_t* frameBasePtr, + size_t frameStrideBytesArg, + uint32_t timeoutMsArg) { + if (udpFd < 0 || !frameBasePtr || descRef.slots.empty()) return false; + desc = &descRef; + frameBase = frameBasePtr; + frameStrideBytes = frameStrideBytesArg; + timeoutMs = timeoutMsArg; + paused.store(false); + running.store(true); + // Placeholder: actual io_uring submission/handling would run in a dedicated thread + // For now, assume synchronous loop stub + postReceives(); + handleCqesAndTimeout(); + return true; +} + +void IoUringAssemblyEngine::pause() { + paused.store(true); +} + +void IoUringAssemblyEngine::resume() { + if (!running.load()) return; + paused.store(false); + postReceives(); +} + +void IoUringAssemblyEngine::stop() { + running.store(false); +} + +void IoUringAssemblyEngine::onFrameComplete(FrameCompleteCallback cb) { + frameCb = std::move(cb); +} + +void IoUringAssemblyEngine::postReceives() { + // Intentionally left minimal; real implementation will submit N RECVMSG SQEs +} + +void IoUringAssemblyEngine::handleCqesAndTimeout() { + // Intentionally left minimal; real implementation will poll CQEs with timeout + cancelIncompleteAndFillDummies(); + if (frameCb) frameCb(frameIndex++); +} + +void IoUringAssemblyEngine::cancelIncompleteAndFillDummies() { + if (!desc) return; + for (size_t i = 0; i < desc->numSlots; ++i) { + // In the real path, decide from CQE accounting whether slot i completed. + // Here, demonstrate dummy header insertion API. + auto* hdr = reinterpret_cast(desc->slots[i].vaddr); + hdr->err_code = kInvalidErrCode; + hdr->timestamp_type = kInvalidTimestampType; + hdr->data_type = kInvalidDataType; + } +} + +size_t IoUringAssemblyEngine::computePointsPerDgram(int returnMode) +{ + /* Map modes to points per datagram based on Livox docs + * 1: first, 2: strongest -> 96 samples => 96 points + * 3: dual -> 48 samples * 2 points = 96 + * 4: triple -> 30 samples * 3 points = 90 + */ + switch (returnMode) + { + case livoxProto1::Device::ReturnMode::SingleFirst: + case livoxProto1::Device::ReturnMode::SingleStrongest: + case livoxProto1::Device::ReturnMode::Dual: + return 96u; + case livoxProto1::Device::ReturnMode::Triple: + return 90u; + default: + throw std::runtime_error(std::string(__func__) + + ": Unknown returnMode " + std::to_string(returnMode)); + } +} + +} // namespace stim_buff +} // namespace smo + + diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h new file mode 100644 index 0000000..6c16796 --- /dev/null +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -0,0 +1,72 @@ +#ifndef _LIVOX_GEN1_IOURING_ASSEMBLY_ENGINE_H +#define _LIVOX_GEN1_IOURING_ASSEMBLY_ENGINE_H + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "frameAssemblyDesc.h" + +namespace smo { +namespace stim_buff { + +class IoUringAssemblyEngine +{ +public: + using FrameCompleteCallback = std::function; + + IoUringAssemblyEngine(); + ~IoUringAssemblyEngine(); + + // Configure UDP socket by querying the device's pcloud data socket descriptor + bool setSocketFromDevice(const std::shared_ptr& device); + + // Start posting N RECVMSG SQEs based on the descriptor; timeoutMs defaults to 15ms + bool start( + FrameAssemblyDesc& desc, + uint8_t* frameBase, + size_t frameStrideBytes, + uint32_t timeoutMs = 15); + + void pause(); + void resume(); + void stop(); + + void onFrameComplete(FrameCompleteCallback cb); + + // Telemetry helpers + static size_t computePointsPerDgram(int returnMode); + static size_t computePointsPerFrame(int returnMode, size_t nDgramsPerFrame) + { return computePointsPerDgram(returnMode) * nDgramsPerFrame; } + +private: + int udpFd; + std::atomic running; + std::atomic paused; + FrameCompleteCallback frameCb; + + // Cached descriptor for reuse across iterations + FrameAssemblyDesc* desc; + uint8_t* frameBase; + size_t frameStrideBytes; + uint32_t timeoutMs; + size_t frameIndex; + + // Internal helpers + void postReceives(); + void handleCqesAndTimeout(); + void cancelIncompleteAndFillDummies(); +}; + +} // namespace stim_buff +} // namespace smo + +#endif // _LIVOX_GEN1_IOURING_ASSEMBLY_ENGINE_H + + diff --git a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h index 83341ea..504b114 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h +++ b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.h @@ -5,6 +5,7 @@ #include #include #include "stagingBuffer.h" +#include "ioUringAssemblyEngine.h" namespace smo { namespace stim_buff { @@ -46,6 +47,7 @@ public: std::shared_ptr device; PcloudFormatDesc formatDesc; StagingBuffer stagingBuffer; + IoUringAssemblyEngine ioUringAssemblyEngine; }; } // namespace stim_buff