#include #include #include #include "ioUringAssemblyEngine.h" namespace smo { namespace stim_buff { namespace { struct DummyLivoxEthHeader { uint8_t version; uint8_t slot; uint8_t id; uint8_t rsvd; uint32_t err_code; uint8_t timestamp_type; uint8_t data_type; uint8_t timestamp[8]; }; constexpr uint32_t kInvalidErrCode = 0xFFFFFFFFu; constexpr uint8_t kInvalidTimestampType = 0xFFu; constexpr uint8_t kInvalidDataType = 0xFFu; } IoUringAssemblyEngine::IoUringAssemblyEngine() : udpFd(-1), running(false), paused(false), frameCb(nullptr), desc(nullptr), frameBase(nullptr), frameStrideBytes(0), timeoutMs(15), frameIndex(0) {} bool IoUringAssemblyEngine::setSocketFromDevice( const std::shared_ptr& device ) { if (!device) { return false; } // Expect device to expose pcloudDataSocketDesc() udpFd = device->pcloudDataSocketDesc->native_handle(); return (udpFd >= 0); } bool IoUringAssemblyEngine::start( FrameAssemblyDesc& descRef, uint8_t* frameBasePtr, size_t frameStrideBytesArg, uint32_t timeoutMsArg ) { if (udpFd < 0 || !frameBasePtr || descRef.slots.empty()) { return false; } desc = &descRef; frameBase = frameBasePtr; frameStrideBytes = frameStrideBytesArg; timeoutMs = timeoutMsArg; paused.store(false); running.store(true); // Placeholder: actual io_uring submission/handling would run in a dedicated thread // For now, assume synchronous loop stub postReceives(); handleCqesAndTimeout(); return true; } void IoUringAssemblyEngine::pause() { paused.store(true); } void IoUringAssemblyEngine::resume() { if (!running.load()) { return; } paused.store(false); postReceives(); } void IoUringAssemblyEngine::stop() { running.store(false); } void IoUringAssemblyEngine::onFrameComplete(FrameCompleteCallback cb) { frameCb = std::move(cb); } void IoUringAssemblyEngine::postReceives() { // Intentionally left minimal; real implementation will submit N RECVMSG SQEs } void IoUringAssemblyEngine::handleCqesAndTimeout() { // Intentionally left minimal; real implementation will poll CQEs with timeout cancelIncompleteAndFillDummies(); if (frameCb) { frameCb(frameIndex++); } } void IoUringAssemblyEngine::cancelIncompleteAndFillDummies() { if (!desc) { return; } for (size_t i = 0; i < desc->numSlots; ++i) { // In the real path, decide from CQE accounting whether slot i completed. // Here, demonstrate dummy header insertion API. auto* hdr = reinterpret_cast(desc->slots[i].vaddr); hdr->err_code = kInvalidErrCode; hdr->timestamp_type = kInvalidTimestampType; hdr->data_type = kInvalidDataType; } } size_t IoUringAssemblyEngine::computePointsPerDgram(int returnMode) { /* * Map modes to points per datagram based on Livox docs * 1: first, 2: strongest -> 96 samples => 96 points * 3: dual -> 48 samples * 2 points = 96 * 4: triple -> 30 samples * 3 points = 90 */ switch (returnMode) { case static_cast(livoxProto1::Device::ReturnMode::SingleFirst): case static_cast(livoxProto1::Device::ReturnMode::SingleStrongest): case static_cast(livoxProto1::Device::ReturnMode::Dual): return 96u; case static_cast(livoxProto1::Device::ReturnMode::Triple): return 90u; default: throw std::runtime_error( std::string(__func__) + ": Unknown returnMode " + std::to_string(returnMode)); } } } // namespace stim_buff } // namespace smo