From 010ba9c7bd04761a75ac670212dc3488e404af97 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Sun, 9 Nov 2025 00:55:58 -0400 Subject: [PATCH] Bugfix,IoUringEngn: fill unassembled slots w/dummy; use separate iovecs We implemented the feature to fill unassembled slots w/dummy header values for the livox pcloud header. We also fixed a bug where io uring was writing into the last slot only because we were using the same iovec for every SQE. --- .../livoxGen1/ioUringAssemblyEngine.cpp | 150 ++++++++++++++---- .../livoxGen1/ioUringAssemblyEngine.h | 19 ++- .../livoxGen1/pcloudStimulusBuffer.cpp | 2 +- 3 files changed, 139 insertions(+), 32 deletions(-) diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index b32034c..2a8381b 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -30,13 +33,18 @@ inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; } struct DummyLivoxEthHeader { - enum : uint32_t { - INVALID_ERR_CODE = 0xFFFFFFFFu - }; - enum : uint8_t { - INVALID_TIMESTAMP_TYPE = 0xFFu, - INVALID_DATA_TYPE = 0xFFu - }; + DummyLivoxEthHeader() + : version(0xFF), slot(0xFF), id(0xFF), rsvd(0xFF) + {} + + static bool isDummy(const DummyLivoxEthHeader& hdr) + { + return hdr.version == 0xFF || hdr.slot == 0xFF || hdr.id == 0xFF + || hdr.rsvd == 0xFF; + } + + static bool isValid(const DummyLivoxEthHeader& hdr) + { return !isDummy(hdr); } uint8_t version, slot, id, rsvd; uint32_t err_code; @@ -44,13 +52,16 @@ struct DummyLivoxEthHeader uint8_t timestamp[8]; }; -IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_) +IoUringAssemblyEngine::IoUringAssemblyEngine( + PcloudStimulusBuffer& parent_, size_t nDgramsPerStagingBufferFrame_) : parent(parent_), frameAssemblyDesc(nullptr), ring{}, isSetup(false), eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()), -isAssembling(false) +isAssembling(false), +nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_), +assembledSlotsTracker(nDgramsPerStagingBufferFrame_) {} bool IoUringAssemblyEngine::setup() @@ -78,6 +89,23 @@ bool IoUringAssemblyEngine::setup() if (udpFd < 0) { return false; } + // Set up iovecs for each slot + for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) + { + assembledSlotsTracker[i].assembled = false; + assembledSlotsTracker[i].msgHdr = {}; + assembledSlotsTracker[i].msgHdr.msg_iov = + &assembledSlotsTracker[i].ioVec; + assembledSlotsTracker[i].msgHdr.msg_iovlen = 1; + } + + for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) + { + const auto& slot = frameAssemblyDesc->slots[i]; + assembledSlotsTracker[i].ioVec.iov_base = slot.vaddr; + assembledSlotsTracker[i].ioVec.iov_len = slot.nBytes; + } + // Declare iovec early to avoid goto crossing initialization struct iovec iov; int ret; @@ -175,6 +203,11 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( // Store the callback for re-arming onCqeReadyCallback = std::move(onCqeReady); + // Reset all assembled slots tracker to false + for (auto& slotDesc : assembledSlotsTracker) { + slotDesc.assembled = false; + } + /** EXPLANATION: * Flush eventfd state: poll and read any pending events before creating * descriptor. @@ -221,17 +254,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame( + ": failed to get SQE for slot " + std::to_string(i)); } - const auto& slot = frameAssemblyDesc->slots[i]; - - // Prepare recvmsg SQE for this slot - struct msghdr msg = {}; - struct iovec iov; - iov.iov_base = slot.vaddr; - iov.iov_len = slot.nBytes; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - - io_uring_prep_recvmsg(sqe, udpFd, &msg, 0); + io_uring_prep_recvmsg(sqe, udpFd, &assembledSlotsTracker[i].msgHdr, 0); // Set user_data to slot index for tracking io_uring_sqe_set_data(sqe, reinterpret_cast(i)); } @@ -444,10 +467,15 @@ public: std::shared_ptr context, void *user_data, int cqe_result) { - (void)user_data; // Not used - we just track success/failure counts + // Extract index from user_data and mark slot as assembled if successful + size_t index = reinterpret_cast(user_data); + bool success = (cqe_result >= 0); + + if (success && index < context->engine.assembledSlotsTracker.size()) { + context->engine.assembledSlotsTracker[index].assembled = true; + } // Caller decides success: result >= 0 means success - bool success = (cqe_result >= 0); if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( success)) { @@ -478,6 +506,9 @@ public: return; } + // Fill un-assembled slots with dummy datagrams + context->engine.fillUnAssembledSlotsWithDummyDgrams(); + if (context->loop.nSucceeded.load() >= context->loop.nTotal) { // Success: all or more slots succeeded @@ -598,19 +629,80 @@ void IoUringAssemblyEngine::onEventfdRead( std::placeholders::_2)); } -void IoUringAssemblyEngine::cancelIncompleteAndFillDummies() +void IoUringAssemblyEngine::fillUnAssembledSlotsWithDummyDgrams() { if (!frameAssemblyDesc) { return; } for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { - // In the real path, decide from CQE accounting whether slot i completed. - // Here, demonstrate dummy header insertion API. - auto* hdr = reinterpret_cast(frameAssemblyDesc->slots[i].vaddr); - hdr->err_code = DummyLivoxEthHeader::INVALID_ERR_CODE; - hdr->timestamp_type = DummyLivoxEthHeader::INVALID_TIMESTAMP_TYPE; - hdr->data_type = DummyLivoxEthHeader::INVALID_DATA_TYPE; + // Only fill slots that were not successfully assembled + if (i >= assembledSlotsTracker.size() + || assembledSlotsTracker[i].assembled) + { + continue; + } + + // Placement construct DummyLivoxEthHeader in the slot + new (frameAssemblyDesc->slots[i].vaddr) DummyLivoxEthHeader(); + } +} + +void IoUringAssemblyEngine::printSlotBytes(size_t slotIndex, size_t nBytes) +{ + if (!frameAssemblyDesc) + { + std::cerr << __func__ << ": frameAssemblyDesc is null" << std::endl; + return; + } + + if (slotIndex >= frameAssemblyDesc->numSlots) + { + std::cerr << __func__ << ": slotIndex " << slotIndex + << " out of range (numSlots=" << frameAssemblyDesc->numSlots + << ")" << std::endl; + return; + } + + const auto& slot = frameAssemblyDesc->slots[slotIndex]; + size_t bytesToPrint = std::min(nBytes, static_cast(slot.nBytes)); + const uint8_t* data = reinterpret_cast(slot.vaddr); + + std::cout << __func__ << ": Slot " << slotIndex << " vaddr=" << (void*)slot.vaddr + << " (" << bytesToPrint + << " bytes):" << std::endl; + + // Print hex dump format: offset | hex bytes | ASCII + const size_t bytesPerLine = 16; + for (size_t offset = 0; offset < bytesToPrint; offset += bytesPerLine) + { + // Print offset + std::cout << std::hex << std::setfill('0') << std::setw(4) + << offset << ": "; + + // Print hex bytes + for (size_t i = 0; i < bytesPerLine; ++i) + { + if (offset + i < bytesToPrint) + { + std::cout << std::setw(2) << static_cast(data[offset + i]) + << " "; + } + else + { + std::cout << " "; + } + } + + // Print ASCII representation + std::cout << " |"; + for (size_t i = 0; i < bytesPerLine && offset + i < bytesToPrint; ++i) + { + uint8_t byte = data[offset + i]; + char c = (byte >= 32 && byte < 127) ? static_cast(byte) : '.'; + std::cout << c; + } + std::cout << "|" << std::dec << std::endl; } } diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 9481d3b..749b79f 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -28,7 +28,8 @@ class PcloudStimulusBuffer; class IoUringAssemblyEngine { public: - explicit IoUringAssemblyEngine(PcloudStimulusBuffer& parent); + explicit IoUringAssemblyEngine( + PcloudStimulusBuffer& parent, size_t nDgramsPerStagingBufferFrame); ~IoUringAssemblyEngine() = default; bool setup(); @@ -72,7 +73,21 @@ private: SpinLock isAssemblingLock; bool isAssembling; - void cancelIncompleteAndFillDummies(); + // Number of datagrams per staging buffer frame + size_t nDgramsPerStagingBufferFrame; + + struct SlotAssemblyDesc + { + bool assembled; + struct msghdr msgHdr; + struct iovec ioVec; + }; + + // Track which slots have been successfully assembled and maintain persistent iovecs + std::vector assembledSlotsTracker; + + void fillUnAssembledSlotsWithDummyDgrams(); + void printSlotBytes(size_t slotIndex, size_t nBytes); void onEventfdRead( const boost::system::error_code& error, std::size_t bytes_transferred); diff --git a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp index 825a390..a4c1624 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusBuffer.cpp @@ -33,7 +33,7 @@ assemblyBuffer( StagingBuffer::IOEngineConstraints::ioUringConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame), -ioUringAssemblyEngine(*this), +ioUringAssemblyEngine(*this, nDgramsPerStagingBufferFrame), collationBuffer( StagingBuffer::IOEngineConstraints::openClInputConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints,