Bugfix,IoUringEngn: fill unassembled slots w/dummy; use separate iovecs

Slots that were not assembled are now filled with dummy header values
for the Livox point-cloud (pcloud) header.

We also fixed a bug where io_uring wrote only into the last slot,
because every SQE was using the same iovec.
This commit is contained in:
2025-11-09 00:55:58 -04:00
parent 72a3415553
commit 010ba9c7bd
3 changed files with 139 additions and 32 deletions
+121 -29
View File
@@ -1,6 +1,9 @@
#include <boostAsioLinkageFix.h>
#include <config.h>
#include <opts.h>
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <cstring>
#include <stdexcept>
#include <functional>
@@ -30,13 +33,18 @@ inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; }
// Placeholder header written at the start of a slot that did not receive a
// datagram, so that such a slot can be told apart from real data.
// NOTE(review): presumably mirrors the field layout of the Livox ethernet
// packet header — confirm against the protocol definition; part of this
// struct lies outside the visible hunks.
struct DummyLivoxEthHeader
{
// All-ones marker value for the err_code field.
enum : uint32_t {
INVALID_ERR_CODE = 0xFFFFFFFFu
};
// All-ones marker values for the timestamp_type and data_type fields.
enum : uint8_t {
INVALID_TIMESTAMP_TYPE = 0xFFu,
INVALID_DATA_TYPE = 0xFFu
};
// Sets version, slot, id and rsvd to 0xFF.
// NOTE(review): err_code and timestamp are not initialised by this
// constructor, and the INVALID_* constants above are not applied here; a
// caller that needs them set must assign them after construction.
DummyLivoxEthHeader()
: version(0xFF), slot(0xFF), id(0xFF), rsvd(0xFF)
{}
// Returns true when ANY of version, slot, id or rsvd equals 0xFF.
// NOTE(review): because the test uses ||, a header with a single 0xFF in
// one of these fields is also reported as a dummy — confirm that no real
// header can carry 0xFF there, or require all four to match.
static bool isDummy(const DummyLivoxEthHeader& hdr)
{
return hdr.version == 0xFF || hdr.slot == 0xFF || hdr.id == 0xFF
|| hdr.rsvd == 0xFF;
}
// Exact negation of isDummy().
static bool isValid(const DummyLivoxEthHeader& hdr)
{ return !isDummy(hdr); }
uint8_t version, slot, id, rsvd;
uint32_t err_code;
@@ -44,13 +52,16 @@ struct DummyLivoxEthHeader
uint8_t timestamp[8];
};
// Constructs an engine bound to `parent_` without doing any setup work: the
// body is empty and every member is put into its "not set up" state (null
// descriptors, eventfdFd == -1, isSetup == false, isAssembling == false).
// The stall timer is created on the io service of the parent device's
// component thread.
//
// parent_                       : buffer that owns this engine; kept by
//                                 reference.
// nDgramsPerStagingBufferFrame_ : stored as-is and also used as the element
//                                 count of assembledSlotsTracker.
//
// NOTE(review): setup() indexes assembledSlotsTracker by
// frameAssemblyDesc->numSlots; confirm numSlots can never exceed the count
// passed here.
IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_)
IoUringAssemblyEngine::IoUringAssemblyEngine(
PcloudStimulusBuffer& parent_, size_t nDgramsPerStagingBufferFrame_)
: parent(parent_),
frameAssemblyDesc(nullptr), ring{},
isSetup(false),
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
stallTimer(parent_.device->componentThread->getIoService()),
isAssembling(false)
isAssembling(false),
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_),
// One tracker entry per datagram in a staging-buffer frame.
assembledSlotsTracker(nDgramsPerStagingBufferFrame_)
{}
bool IoUringAssemblyEngine::setup()
@@ -78,6 +89,23 @@ bool IoUringAssemblyEngine::setup()
if (udpFd < 0)
{ return false; }
// Set up iovecs for each slot
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
{
assembledSlotsTracker[i].assembled = false;
assembledSlotsTracker[i].msgHdr = {};
assembledSlotsTracker[i].msgHdr.msg_iov =
&assembledSlotsTracker[i].ioVec;
assembledSlotsTracker[i].msgHdr.msg_iovlen = 1;
}
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
{
const auto& slot = frameAssemblyDesc->slots[i];
assembledSlotsTracker[i].ioVec.iov_base = slot.vaddr;
assembledSlotsTracker[i].ioVec.iov_len = slot.nBytes;
}
// Declare iovec early to avoid goto crossing initialization
struct iovec iov;
int ret;
@@ -175,6 +203,11 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
// Store the callback for re-arming
onCqeReadyCallback = std::move(onCqeReady);
// Reset all assembled slots tracker to false
for (auto& slotDesc : assembledSlotsTracker) {
slotDesc.assembled = false;
}
/** EXPLANATION:
* Flush eventfd state: poll and read any pending events before creating
* descriptor.
@@ -221,17 +254,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
+ ": failed to get SQE for slot " + std::to_string(i));
}
const auto& slot = frameAssemblyDesc->slots[i];
// Prepare recvmsg SQE for this slot
struct msghdr msg = {};
struct iovec iov;
iov.iov_base = slot.vaddr;
iov.iov_len = slot.nBytes;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
io_uring_prep_recvmsg(sqe, udpFd, &msg, 0);
io_uring_prep_recvmsg(sqe, udpFd, &assembledSlotsTracker[i].msgHdr, 0);
// Set user_data to slot index for tracking
io_uring_sqe_set_data(sqe, reinterpret_cast<void*>(i));
}
@@ -444,10 +467,15 @@ public:
std::shared_ptr<AssembleFrameReq> context,
void *user_data, int cqe_result)
{
(void)user_data; // Not used - we just track success/failure counts
// Extract index from user_data and mark slot as assembled if successful
size_t index = reinterpret_cast<size_t>(user_data);
bool success = (cqe_result >= 0);
if (success && index < context->engine.assembledSlotsTracker.size()) {
context->engine.assembledSlotsTracker[index].assembled = true;
}
// Caller decides success: result >= 0 means success
bool success = (cqe_result >= 0);
if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(
success))
{
@@ -478,6 +506,9 @@ public:
return;
}
// Fill un-assembled slots with dummy datagrams
context->engine.fillUnAssembledSlotsWithDummyDgrams();
if (context->loop.nSucceeded.load() >= context->loop.nTotal)
{
// Success: all or more slots succeeded
@@ -598,19 +629,80 @@ void IoUringAssemblyEngine::onEventfdRead(
std::placeholders::_2));
}
void IoUringAssemblyEngine::cancelIncompleteAndFillDummies()
void IoUringAssemblyEngine::fillUnAssembledSlotsWithDummyDgrams()
{
if (!frameAssemblyDesc)
{ return; }
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
{
// In the real path, decide from CQE accounting whether slot i completed.
// Here, demonstrate dummy header insertion API.
auto* hdr = reinterpret_cast<DummyLivoxEthHeader*>(frameAssemblyDesc->slots[i].vaddr);
hdr->err_code = DummyLivoxEthHeader::INVALID_ERR_CODE;
hdr->timestamp_type = DummyLivoxEthHeader::INVALID_TIMESTAMP_TYPE;
hdr->data_type = DummyLivoxEthHeader::INVALID_DATA_TYPE;
// Only fill slots that were not successfully assembled
if (i >= assembledSlotsTracker.size()
|| assembledSlotsTracker[i].assembled)
{
continue;
}
// Placement construct DummyLivoxEthHeader in the slot
new (frameAssemblyDesc->slots[i].vaddr) DummyLivoxEthHeader();
}
}
/**	EXPLANATION:
 * Debug helper: writes a hex + ASCII dump of the first bytes of one slot to
 * std::cout, 16 bytes per row, in the form
 *     <offset>: <hex bytes> |<printable characters>|
 * Non-printable bytes are shown as '.'.
 *
 * slotIndex : index into frameAssemblyDesc->slots; an out-of-range index is
 *             reported on std::cerr and nothing is dumped.
 * nBytes    : maximum number of bytes to dump; clamped to the slot's size.
 *
 * A missing frameAssemblyDesc is reported on std::cerr as well.
 *
 * std::cout's format flags and fill character are restored before returning,
 * so the dump does not change how later output is formatted.
 */
void IoUringAssemblyEngine::printSlotBytes(size_t slotIndex, size_t nBytes)
{
    if (!frameAssemblyDesc)
    {
        std::cerr << __func__ << ": frameAssemblyDesc is null" << std::endl;
        return;
    }

    if (slotIndex >= frameAssemblyDesc->numSlots)
    {
        std::cerr << __func__ << ": slotIndex " << slotIndex
            << " out of range (numSlots=" << frameAssemblyDesc->numSlots
            << ")" << std::endl;
        return;
    }

    const auto& slot = frameAssemblyDesc->slots[slotIndex];
    const size_t bytesToPrint = std::min(
        nBytes, static_cast<size_t>(slot.nBytes));
    const uint8_t* data = reinterpret_cast<const uint8_t*>(slot.vaddr);

    std::cout << __func__ << ": Slot " << slotIndex
        << " vaddr=" << static_cast<const void*>(slot.vaddr)
        << " (" << bytesToPrint
        << " bytes):" << '\n';

    // std::hex and std::setfill are sticky; remember the caller's settings
    // so they can be put back once the dump is done.
    const std::ios_base::fmtflags savedFlags = std::cout.flags();
    const char savedFill = std::cout.fill();

    // Print hex dump format: offset | hex bytes | ASCII
    constexpr size_t bytesPerLine = 16;
    for (size_t offset = 0; offset < bytesToPrint; offset += bytesPerLine)
    {
        // Print offset
        std::cout << std::hex << std::setfill('0') << std::setw(4)
            << offset << ": ";

        // Print hex bytes
        for (size_t i = 0; i < bytesPerLine; ++i)
        {
            if (offset + i < bytesToPrint)
            {
                std::cout << std::setw(2)
                    << static_cast<unsigned>(data[offset + i]) << " ";
            }
            else
            {
                // Same width as one "xx " cell, so that the ASCII column
                // stays aligned on a short final row.
                std::cout << "   ";
            }
        }

        // Print ASCII representation
        std::cout << " |";
        for (size_t i = 0; i < bytesPerLine && offset + i < bytesToPrint; ++i)
        {
            const uint8_t byte = data[offset + i];
            const char c = (byte >= 32 && byte < 127)
                ? static_cast<char>(byte) : '.';
            std::cout << c;
        }
        std::cout << "|" << '\n';
    }

    std::cout.flags(savedFlags);
    std::cout.fill(savedFill);
    // One flush for the whole dump instead of one per row.
    std::cout.flush();
}
+17 -2
View File
@@ -28,7 +28,8 @@ class PcloudStimulusBuffer;
class IoUringAssemblyEngine
{
public:
explicit IoUringAssemblyEngine(PcloudStimulusBuffer& parent);
explicit IoUringAssemblyEngine(
PcloudStimulusBuffer& parent, size_t nDgramsPerStagingBufferFrame);
~IoUringAssemblyEngine() = default;
bool setup();
@@ -72,7 +73,21 @@ private:
SpinLock isAssemblingLock;
bool isAssembling;
void cancelIncompleteAndFillDummies();
// Number of datagrams per staging buffer frame
size_t nDgramsPerStagingBufferFrame;
/**	EXPLANATION:
 * Per-slot receive state. Each slot gets its own msghdr/iovec pair, so the
 * receive request queued for one slot never shares a buffer description
 * with another slot.
 *
 * setup() points msgHdr.msg_iov at the ioVec of the SAME element, so an
 * element must stay at a stable address once that is done: the vector
 * holding these must not be resized or reallocated afterwards.
 *
 * Every member has a default, so an instance is never left with an
 * indeterminate flag or dangling pointers, however it is created.
 */
struct SlotAssemblyDesc
{
    // True once a completion with a non-negative result was seen for this
    // slot; cleared again at the start of every frame.
    bool assembled = false;
    // Message header handed to io_uring_prep_recvmsg() for this slot.
    struct msghdr msgHdr = {};
    // Describes the slot's memory (base address and length).
    struct iovec ioVec = {};
};
// Track which slots have been successfully assembled and maintain persistent iovecs
std::vector<SlotAssemblyDesc> assembledSlotsTracker;
void fillUnAssembledSlotsWithDummyDgrams();
void printSlotBytes(size_t slotIndex, size_t nBytes);
void onEventfdRead(
const boost::system::error_code& error, std::size_t bytes_transferred);
@@ -33,7 +33,7 @@ assemblyBuffer(
StagingBuffer::IOEngineConstraints::ioUringConstraints,
StagingBuffer::IOEngineConstraints::openClInputConstraints,
nDgramsPerStagingBufferFrame),
ioUringAssemblyEngine(*this),
ioUringAssemblyEngine(*this, nDgramsPerStagingBufferFrame),
collationBuffer(
StagingBuffer::IOEngineConstraints::openClInputConstraints,
StagingBuffer::IOEngineConstraints::openClInputConstraints,