Files
salmanoff/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp
T
hayodea b3743560bb IoUringAssmEngn: detect assembly end condition w/eventfdDesc validity
We can simplify and generalize the logic here by recognizing that
assemblyCycleComplete() always destroys the current eventfdDesc
object, so checking eventfdDesc's validity is enough to decide whether
the assembly cycle should continue.
2025-11-15 22:02:30 -04:00

879 lines
25 KiB
C++

#include <boostAsioLinkageFix.h>
#include <config.h>
#include <opts.h>
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <cstring>
#include <stdexcept>
#include <functional>
#include <random>
#include <sys/socket.h>
#include <sys/eventfd.h>
#include <sys/uio.h>
#include <sys/poll.h>
#include <unistd.h>
#include <errno.h>
#include <boost/system/error_code.hpp>
#include <livoxProto1/device.h>
#include <livoxProto1/livoxProto1.h>
#include <asynchronousContinuation.h>
#include <asynchronousLoop.h>
#include <asynchronousBridge.h>
#include <callback.h>
#include <callableTracer.h>
#include <spinLock.h>
#include "ioUringAssemblyEngine.h"
#include "pcloudStimulusProducer.h"
#include "livoxGen1.h"
// #define REGISTER_IOURING_BUFFERS
namespace smo {
namespace stim_buff {
/** Accessor for the `livoxProto1` state object (declared in one of the
 * included headers). setup() uses it to reach the DLL's function pointers.
 */
inline LivoxProto1DllState& getLivoxProto1State()
{
    LivoxProto1DllState& state = livoxProto1;
    return state;
}
/** Placeholder header written over slots that did not receive a datagram.
 *
 * The four leading marker bytes (version, slot, id, rsvd) are set to 0xFF so
 * that isDummy() recognizes the slot. The remaining fields are zeroed so that
 * a dummy slot never carries stale bytes (error code, timestamp) left in the
 * reused slot buffer by a datagram from an earlier frame.
 *
 * NOTE(review): the field order appears to mirror the Livox Gen1 Ethernet
 * packet header; confirm against the SDK's packet definition.
 */
struct DummyLivoxEthHeader
{
    DummyLivoxEthHeader()
    : version(0xFF), slot(0xFF), id(0xFF), rsvd(0xFF),
    err_code(0), timestamp_type(0), data_type(0), timestamp{}
    {}
    /** True if any of the four marker bytes holds the 0xFF sentinel. */
    static bool isDummy(const DummyLivoxEthHeader& hdr)
    {
        return hdr.version == 0xFF || hdr.slot == 0xFF || hdr.id == 0xFF
            || hdr.rsvd == 0xFF;
    }
    /** Logical negation of isDummy(). */
    static bool isValid(const DummyLivoxEthHeader& hdr)
    { return !isDummy(hdr); }
    uint8_t version, slot, id, rsvd;
    uint32_t err_code;
    uint8_t timestamp_type, data_type;
    uint8_t timestamp[8];
};
/** Construct an idle engine bound to its owning PcloudStimulusProducer.
 *
 * No kernel resources are acquired here; setup() creates the io_uring and
 * the eventfd. shouldAcceptRequests starts false, so requests are rejected
 * until setup() succeeds.
 *
 * @param parent_ owning producer; its device's component-thread io_service
 *        drives the stall timer (and, later, the eventfd descriptor).
 * @param nDgramsPerStagingBufferFrame_ number of per-slot tracker entries to
 *        allocate (setup()/resetAndAssembleFrame() index them per slot).
 */
IoUringAssemblyEngine::IoUringAssemblyEngine(
PcloudStimulusProducer& parent_, size_t nDgramsPerStagingBufferFrame_)
: parent(parent_),
// Both are populated by setup(); ring{} value-initializes the io_uring struct.
frameAssemblyDesc(nullptr), ring{},
// -1 / null: no eventfd yet; setup() creates it and each cycle wraps it.
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
stallTimer(parent_.device->componentThread->getIoService()),
shouldAcceptRequests(false),
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_),
assembledSlotsTracker(nDgramsPerStagingBufferFrame_),
// NOTE(review): assumes randomDevice is declared before randomGenerator in
// the header (init order follows declaration order) — TODO confirm.
randomDevice(), randomGenerator(randomDevice())
{}
/** One-time initialization of the assembly engine.
 *
 * Binds the engine to the staging buffer's FrameAssemblyDesc and to the
 * point-cloud UDP socket. It points one recvmsg msghdr/iovec at each slot and
 * initializes an io_uring with numSlots + 1 entries. It then optionally
 * registers the staging buffer with the ring, and creates and registers a
 * non-blocking eventfd used for CQE notifications.
 *
 * @return true on success, after which the engine accepts requests. Returns
 *         false if a prerequisite is missing or a kernel call fails; the
 *         ring/eventfd resources acquired so far are released on that path.
 * @throws std::runtime_error if called while already set up.
 */
bool IoUringAssemblyEngine::setup()
{
    // Defensive check to prevent double-calling
    {
        SpinLock::Guard lock(shouldAcceptRequestsLock);
        if (shouldAcceptRequests)
        {
            throw std::runtime_error(std::string(__func__) + ": setup() called "
                "while already set up");
        }
    }
    // Get FrameAssemblyDesc from staging buffer
    frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
        parent.assemblyBuffer);
    if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty())
    { return false; }
    /* The per-slot loop below (and every later cycle) indexes both
     * assembledSlotsTracker and slots over [0, numSlots). Refuse a descriptor
     * that advertises more slots than either container holds, instead of
     * writing out of bounds.
     */
    if (frameAssemblyDesc->numSlots > assembledSlotsTracker.size()
        || frameAssemblyDesc->numSlots > frameAssemblyDesc->slots.size())
    {
        std::cerr << __func__ << ": numSlots (" << frameAssemblyDesc->numSlots
            << ") exceeds assembledSlotsTracker ("
            << assembledSlotsTracker.size() << ") or slots ("
            << frameAssemblyDesc->slots.size() << ") size\n";
        return false;
    }
    // Get point cloud data socket descriptor from UdpCommandDemuxer
    auto& livoxState = getLivoxProto1State();
    if (!livoxState.livoxProto1_getPcloudDataFdDesc)
    { return false; }
    pcloudDataFdDesc = (*livoxState.livoxProto1_getPcloudDataFdDesc)();
    if (!pcloudDataFdDesc)
    { return false; }
    // Get UDP socket file descriptor (validated here, re-read every cycle)
    int udpFd = pcloudDataFdDesc->native_handle();
    if (udpFd < 0)
    { return false; }
    // Point each slot's msghdr at its own single iovec covering the slot
    for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
    {
        auto& tracker = assembledSlotsTracker[i];
        const auto& slot = frameAssemblyDesc->slots[i];
        tracker.assembled = false;
        tracker.msgHdr = {};
        tracker.msgHdr.msg_iov = &tracker.ioVec;
        tracker.msgHdr.msg_iovlen = 1;
        tracker.ioVec.iov_base = slot.vaddr;
        tracker.ioVec.iov_len = slot.nBytes;
    }
    // Declare iovec early to avoid goto crossing initialization
#ifdef REGISTER_IOURING_BUFFERS
    struct iovec iov;
#endif
    int ret;
    /** EXPLANATION:
     * Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly
     * One SQE per slot (one datagram per slot), plus one extra for cancel
     * operations, since io_uring_prep_cancel() requires a valid SQE. So we
     * alloc 1 extra SQE to guarantee that we will always have an available SQE
     * for cancel operations.
     */
    ret = io_uring_queue_init(
        static_cast<unsigned int>(frameAssemblyDesc->numSlots + 1), &ring, 0);
    if (ret < 0)
    { goto cleanup; }
#ifdef REGISTER_IOURING_BUFFERS
    // Register staging buffer with io_uring for DMA-apt I/O
    iov = parent.assemblyBuffer.getIoUringRegisterIoVec();
    ret = io_uring_register_buffers(&ring, &iov, 1);
    if (ret < 0)
    { goto cleanup_ring; }
#endif
    // Create eventfd for CQE notifications (used with boost's unified loop)
    eventfdFd = eventfd(0, EFD_NONBLOCK);
    if (eventfdFd < 0)
    {
#ifdef REGISTER_IOURING_BUFFERS
        goto cleanup_buffers;
#else
        goto cleanup_ring;
#endif
    }
    // Register eventfd with io_uring
    ret = io_uring_register_eventfd(&ring, eventfdFd);
    if (ret < 0)
    { goto cleanup_eventfd; }
    /* Publish readiness under the same lock every other reader/writer of
     * shouldAcceptRequests uses.
     */
    {
        SpinLock::Guard lock(shouldAcceptRequestsLock);
        shouldAcceptRequests = true;
    }
    return true;
cleanup_eventfd:
    close(eventfdFd);
    eventfdFd = -1;
#ifdef REGISTER_IOURING_BUFFERS
cleanup_buffers:
    io_uring_unregister_buffers(&ring);
#endif
cleanup_ring:
    io_uring_queue_exit(&ring);
cleanup:
    return false;
}
void IoUringAssemblyEngine::finalize()
{
bool wasAcceptingRequests = stop();
if (eventfdFd >= 0)
{
io_uring_unregister_eventfd(&ring);
close(eventfdFd);
eventfdFd = -1;
}
if (wasAcceptingRequests)
{
#ifdef REGISTER_IOURING_BUFFERS
io_uring_unregister_buffers(&ring);
#endif
io_uring_queue_exit(&ring);
}
// Reset state to allow setup() to be called again
frameAssemblyDesc = nullptr;
}
/** Arm one frame-assembly cycle.
 *
 * Stores onCqeReady as the per-CQE callback and clears the per-slot
 * "assembled" flags. It discards stale CQEs and eventfd notifications left by
 * the previous cycle, then wraps eventfdFd in a fresh stream_descriptor. It
 * submits one recvmsg SQE per slot (user_data = slot index) and starts an
 * async read on the eventfd that dispatches to onEventfdRead().
 *
 * @param onCqeReady invoked from onEventfdRead() once per reaped CQE with
 *        (user_data, cqe->res).
 * @throws std::runtime_error if onCqeReady is empty, the engine is not
 *         accepting requests, a cycle is already armed (eventfdDesc set),
 *         the UDP socket fd is invalid, or SQE acquisition/submission fails.
 *         On the SQE/submission failures eventfdDesc is released again, so
 *         a later cycle can still be armed.
 */
void IoUringAssemblyEngine::resetAndAssembleFrame(
    resetAndAssembleFrameCbFn onCqeReady)
{
    if (!onCqeReady)
    {
        throw std::runtime_error(std::string(__func__)
            + ": onCqeReady callback is invalid");
    }
    if (!shouldAcceptRequests)
    {
        throw std::runtime_error(std::string(__func__)
            + ": engine is not accepting requests");
    }
    // eventfdDesc should not be valid when resetAndAssembleFrame is called
    if (eventfdDesc)
    {
        throw std::runtime_error(std::string(__func__)
            + ": eventfdDesc is already set");
    }
    /* Validate the UDP socket before touching any engine state, so that this
     * failure leaves the engine exactly as it was.
     */
    int udpFd = pcloudDataFdDesc->native_handle();
    if (udpFd < 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": invalid UDP socket file descriptor");
    }
    // Store the callback for re-arming
    onCqeReadyCallback = std::move(onCqeReady);
    // Reset all assembled slots tracker to false
    for (auto& slotDesc : assembledSlotsTracker) {
        slotDesc.assembled = false;
    }
    /* No cycle is armed (eventfdDesc is null), so any CQE still in the
     * completion queue belongs to the previous cycle. Examples are cancelled
     * or late recvmsg completions that arrived after assemblyCycleComplete()
     * drained the ring. Discard them so they are not attributed to this
     * cycle's slots. Drain before flushing the eventfd, because each posted
     * CQE also signals the eventfd.
     */
    struct io_uring_cqe *staleCqe;
    while (io_uring_peek_cqe(&ring, &staleCqe) == 0)
    {
        io_uring_cqe_seen(&ring, staleCqe);
    }
    /** EXPLANATION:
     * Flush eventfd state: poll and read any pending events before creating
     * descriptor.
     * Use poll() to check if data is available (non-blocking check).
     * If data is available, read it to flush.
     */
    struct pollfd pfd;
    pfd.fd = eventfdFd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int poll_ret = poll(&pfd, 1, 0); // Timeout 0 = non-blocking
    if (poll_ret > 0 && (pfd.revents & POLLIN))
    {
        uint64_t discard;
        ssize_t readRet = read(eventfdFd, &discard, sizeof(discard));
        (void)readRet; // Ignore errors - just trying to flush
    }
    // std::make_unique throws on failure, so no null check is needed.
    eventfdDesc = std::make_unique<boost::asio::posix::stream_descriptor>(
        parent.device->componentThread->getIoService(), eventfdFd);
    /* Undo the descriptor creation if we bail out before async_read_some()
     * is armed. release() hands the fd back without closing it, so eventfdFd
     * stays reusable. Otherwise every later call would throw "eventfdDesc is
     * already set".
     */
    auto dropEventfdDesc = [this]() {
        eventfdDesc->release();
        eventfdDesc.reset();
    };
    // Prepare SQEs for each slot in the frame
    struct io_uring_sqe *sqe;
    for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
    {
        sqe = io_uring_get_sqe(&ring);
        if (!sqe)
        {
            dropEventfdDesc();
            throw std::runtime_error(std::string(__func__)
                + ": failed to get SQE for slot " + std::to_string(i));
        }
        io_uring_prep_recvmsg(sqe, udpFd, &assembledSlotsTracker[i].msgHdr, 0);
        // Set user_data to slot index for tracking
        io_uring_sqe_set_data(sqe, reinterpret_cast<void*>(i));
    }
    // Submit all SQEs
    int ret = io_uring_submit(&ring);
    if (ret < 0)
    {
        dropEventfdDesc();
        // liburing returns -errno on failure; it does not set errno.
        throw std::runtime_error(std::string(__func__)
            + ": io_uring_submit failed: " + std::strerror(-ret)
            + " (errno=" + std::to_string(-ret) + ")");
    }
    // Start listening for CQE notifications on eventfd
    eventfdDesc->async_read_some(
        boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
        std::bind(
            &IoUringAssemblyEngine::onEventfdRead, this,
            std::placeholders::_1,
            std::placeholders::_2));
}
/** Stop accepting new assembly requests.
 *
 * @return whether the engine was accepting requests before this call.
 */
bool IoUringAssemblyEngine::stop()
{
    // The guard lives only for this function: just long enough to flip the flag.
    SpinLock::Guard guard(shouldAcceptRequestsLock);
    const bool previouslyAccepting = shouldAcceptRequests;
    shouldAcceptRequests = false;
    return previouslyAccepting;
}
void IoUringAssemblyEngine::assemblyCycleComplete()
{
// Cancel in-flight stall timeout timer
stallTimer.cancel();
onCqeReadyCallback = std::move([](void *, int){});
if (frameAssemblyDesc)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (!sqe)
{
std::cerr << __func__ << ": failed to get SQE for cancel op. "
<< "Continuing cleanup without cancelling.\n";
goto cleanup_eventfd;
}
/* Cancel all in-flight operations on our ring
* using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel
* op as numSlots since numSlots is an invalid slot index for a
* real slot.
*/
io_uring_prep_cancel(
sqe, reinterpret_cast<void*>(frameAssemblyDesc->numSlots),
IORING_ASYNC_CANCEL_ANY);
io_uring_submit(&ring);
/* Wait for cancellation to complete. According to the man page,
* cancellation is synchronous and a CQE is guaranteed to be
* generated by the time submission returns.
*/
struct io_uring_cqe *cqe;
bool sawCancelCqe = false;
while (io_uring_peek_cqe(&ring, &cqe) == 0)
{
// Call seen() on all CQEs for completeness/correctness.
io_uring_cqe_seen(&ring, cqe);
void *user_data = io_uring_cqe_get_data(cqe);
if (user_data == reinterpret_cast<void*>(
frameAssemblyDesc->numSlots))
{
sawCancelCqe = true;
}
}
if (!sawCancelCqe && smoHooksPtr->OptionParser_getOptions().verbose) {
std::cerr << __func__ << ": no CQE seen for cancel operation\n";
}
}
cleanup_eventfd:
if (eventfdDesc)
{
/** EXPLANATION:
* The goal here is to ensure that our io_service's event loop will not
* get any events from the eventfd after we've called
* assemblyCycleComplete(). So we completely deinitialize the eventfd
* descriptor.
*
* But we still want to reuse the underlying eventfd file descriptor,
* itself in the next resetAndAssembleFrame() cycle, so we call
* release() instead of reset() to ensure that the underlying fd
* is not closed.
*
* However, we need to close the descriptor's association with the
* io_service before releasing it, otherwise Boost.Asio will complain
* when we try to create a new descriptor with the same fd.
*/
/** CAVEAT:
* There's a rare but real race condition here where the eventfd gets an
* event signaled on it, and while boost is internally processing that
* event to enqeue our handler, we call cancel() and release() here.
* If boost internally has locking on the stream_descriptor object,
* this should be fine. But just in case it doesn't, I'm just
* documenting that possibility here.
*
* There's nothing we can really do about it except know that it would
* be very rarely happen; and that we can't do anything about it short
* of modifying the boost.Asio code.
*/
eventfdDesc->cancel();
eventfdDesc->release();
/* Destroy the descriptor object (now that it's unregistered, destroying
* it won't close the fd since release() transferred ownership back)
*/
eventfdDesc.reset();
}
}
/** Continuation for one assembleFrameReq() call.
 *
 * Steps, dispatched through the engine's component-thread io_service:
 *   1_posted - arms an assembly cycle via resetAndAssembleFrame() and a stall
 *              timer of CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2 ms.
 *   2_1      - the stall timer expired.
 *   2_2      - onEventfdRead() reaped one CQE.
 *   3        - "oracle": finishes the cycle and reports to the caller.
 * handlerExecuted ensures the caller is reported to at most once per request,
 * whichever path gets there first.
 */
class IoUringAssemblyEngine::AssembleFrameReq
: public PostedAsynchronousContinuation<
    IoUringAssemblyEngine::assembleFrameReqCbFn>
{
public:
    AssembleFrameReq(
        IoUringAssemblyEngine& engine_,
        const std::shared_ptr<ComponentThread>& caller,
        Callback<IoUringAssemblyEngine::assembleFrameReqCbFn> cb)
    : PostedAsynchronousContinuation<
        IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb),
    engine(engine_),
    /* Constructed without shouldAcceptRequestsLock held, so finalize() may
     * already have cleared frameAssemblyDesc. The real slot count is
     * reloaded under the lock in assembleFrameReq1_posted().
     */
    loop(engine_.frameAssemblyDesc
        ? engine_.frameAssemblyDesc->numSlots : 0),
    timerFired(false), handlerExecuted(false)
    {}
    void callOriginalCallback(bool success, AsynchronousLoop loop)
    {
        callOriginalCb(success, loop);
    }
    /** Fail the request unless another path has already finished it.
     *
     * Tears down the cycle, marks the remaining iterations as failures and
     * reports failure to the caller. Must be called with
     * engine.shouldAcceptRequestsLock held.
     */
    void failAssemblyCycle(std::shared_ptr<AssembleFrameReq> context)
    {
        /* Without this guard, a timer completion already queued when the CQE
         * path finished (or vice versa) would report to the caller a second
         * time. Its assemblyCycleComplete() call could also tear down a newer
         * cycle that belongs to a different request.
         */
        if (context->handlerExecuted.exchange(true)) { return; }
        context->engine.assemblyCycleComplete();
        context->loop.setRemainingIterationsToFailure();
        context->callOriginalCallback(false, context->loop);
    }
public:
    void assembleFrameReq1_posted(
        std::shared_ptr<AssembleFrameReq> context)
    {
        SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
        if (!engine.shouldAcceptRequests)
        {
            context->callOriginalCallback(false, AsynchronousLoop(0));
            return;
        }
        // Initialize loop with number of slots
        context->loop = AsynchronousLoop(engine.frameAssemblyDesc->numSlots);
        /** FIXME:
         * I'm suspicious of this std::bind return object here. What if
         * assemblyCycleComplete() replacing onCqeReadyCallback doesn't actually
         * cause the object to be destroyed? This would cause this contin's
         * sh_ptr's reference count to never reach 0, causing a memory leak.
         */
        engine.resetAndAssembleFrame(
            std::bind(&AssembleFrameReq::assembleFrameReq2_2,
                context.get(), context,
                std::placeholders::_1, std::placeholders::_2));
        // Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms
        engine.stallTimer.expires_from_now(
            boost::posix_time::milliseconds(
                CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2));
        engine.stallTimer.async_wait(
            std::bind(&AssembleFrameReq::assembleFrameReq2_1,
                context.get(), context,
                std::placeholders::_1));
    }
    void assembleFrameReq2_1(
        std::shared_ptr<AssembleFrameReq> context,
        const boost::system::error_code& error)
    {
        // Check if timer was cancelled (ignore if operation_aborted)
        if (error == boost::asio::error::operation_aborted) { return; }
        /** EXPLANATION:
         * This lock acquisition here will also cover the call to
         * assembleFrameReq3 below. Because of that, it means that the
         * requirement that the lock be held while accessing
         * the metadata that's destroyed in stop() is satisfied.
         *
         * In theory though, we shouldn't need to hold the lock into
         * assembleFrameReq3 below because that function doesn't really access
         * any state that's destroyed in stop()? But I'm not sure, and we have
         * indeed seen a SEGFAULT even in the current code with locking, so
         * I'm going to hold the lock here for now.
         */
        SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock);
        if (!context->engine.shouldAcceptRequests)
        {
            context->failAssemblyCycle(context);
            return;
        }
        // Set timer fired flag
        context->timerFired.store(true);
        context->assembleFrameReq3(context);
    }
    void assembleFrameReq2_2(
        std::shared_ptr<AssembleFrameReq> context,
        void *user_data, int cqe_result)
    {
        // NB: The lock was acquired by onEventfdRead before calling this func
        if (!context->engine.shouldAcceptRequests)
        {
            context->failAssemblyCycle(context);
            return;
        }
        size_t index = reinterpret_cast<size_t>(user_data);
        /* resetAndAssembleFrame() tags its recvmsg SQEs with 0..numSlots-1.
         * Any other user_data does not belong to a slot of this frame, so it
         * must not be counted towards the loop. One example is a cancel-op
         * CQE, which assemblyCycleComplete() intends to tag with numSlots.
         */
        if (index >= context->engine.frameAssemblyDesc->numSlots) { return; }
        // Mark slot as assembled if successful
        bool success = (cqe_result >= 0);
        if (success && index < context->engine.assembledSlotsTracker.size()) {
            context->engine.assembledSlotsTracker[index].assembled = true;
        }
        // Caller decides success: result >= 0 means success
        if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(
            success))
        {
            // Loop is complete, call oracle function
            context->assembleFrameReq3(context);
        }
    }
    void assembleFrameReq3(
        std::shared_ptr<AssembleFrameReq> context
        )
    {
        /** EXPLANATION:
         * All branch paths that invoke this unifying oracle function are
         * expected to already hold the shouldAcceptRequestsLock before calling
         * it.
         */
        // Ensure we only execute once using atomic exchange
        if (context->handlerExecuted.exchange(true)) { return; }
        // Cancel the timer and in-flight I/O, and detach the eventfd.
        context->engine.assemblyCycleComplete();
        /** EXPLANATION:
         * Timeout doesn't necessarily mean error.
         *
         * If we received zero dgrams from the device, that is indeed an error.
         * But if we received some dgrams, but not all, that is not an error:
         * it just means we didn't receive as much data as we would have liked.
         */
        // Error: no slots succeeded - no data received successfully.
        if (context->loop.nSucceeded.load() == 0)
        {
            context->callOriginalCallback(false, context->loop);
            return;
        }
#if 0
        // Artificially create random dummy slots for testing
        context->engine.randomDummySlotFiller(context->loop);
#endif
        // Fill un-assembled slots with dummy datagrams
        context->engine.fillUnAssembledSlotsWithDummyDgrams();
#if 0
        // Print first 4 bytes of each slot (whether assembled or not)
        if (context->engine.frameAssemblyDesc)
        {
            for (size_t i = 0; i < context->engine.frameAssemblyDesc->numSlots; ++i) {
                context->engine.printSlotBytes(i, 4);
            }
        }
#endif
        /* At least one slot succeeded, so the frame is usable: any missing
         * slots now carry dummy headers. Exceeding nTotal should not happen,
         * so log it if it does.
         */
        if (context->loop.nSucceeded.load() > context->loop.nTotal)
        {
            std::cerr << __func__ << ": nSucceeded > nTotal: succ ("
                << context->loop.nSucceeded.load()
                << ") > nTotal (" << context->loop.nTotal << ")\n";
        }
        context->callOriginalCallback(true, context->loop);
    }
public:
    IoUringAssemblyEngine& engine;
    AsynchronousLoop loop;
    std::atomic<bool> timerFired;
    std::atomic<bool> handlerExecuted;
};
/** Queue an asynchronous frame-assembly request.
 *
 * If the engine is not accepting requests, cb is invoked synchronously with
 * (false, AsynchronousLoop(0)). Otherwise an AssembleFrameReq continuation is
 * created for the calling component thread, and its first step is posted to
 * the engine's component-thread io_service. That step re-checks the flag
 * under the lock.
 */
void IoUringAssemblyEngine::assembleFrameReq(
    Callback<assembleFrameReqCbFn> cb)
{
    bool accepting = false;
    {
        SpinLock::Guard lock(shouldAcceptRequestsLock);
        accepting = shouldAcceptRequests;
    }
    /* Invoke the rejection callback only after the spinlock is released.
     * SpinLock is presumably non-recursive, so a callback that re-enters the
     * engine (stop(), finalize(), assembleFrameReq()) would otherwise spin
     * forever.
     */
    if (!accepting)
    {
        cb.callbackFn(false, AsynchronousLoop(0));
        return;
    }
    const auto& caller = smoHooksPtr->ComponentThread_getSelf();
    auto request = std::make_shared<AssembleFrameReq>(
        *this, caller, std::move(cb));
    parent.device->componentThread->getIoService().post(
        STC(std::bind(
            &AssembleFrameReq::assembleFrameReq1_posted,
            request.get(), request)));
}
/** Completion handler for the async read on the CQE-notification eventfd.
 *
 * Reaps every available CQE, hands each (user_data, res) to
 * onCqeReadyCallback, then re-arms the eventfd read unless the engine has
 * stopped or the cycle's descriptor has been torn down.
 */
void IoUringAssemblyEngine::onEventfdRead(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
{
    (void)bytes_transferred;
    // Ignore cancellation errors
    if (error == boost::asio::error::operation_aborted) { return; }
    /** EXPLANATION:
     * This lock should be held throughout this method to ensure that the
     * IoUringAssemblyEngine's per-assembly state isn't destroyed while this
     * handler is running.
     */
    SpinLock::Guard lock(shouldAcceptRequestsLock);
    /* finalize() sets eventfdFd to -1 when it tears the ring down. A read
     * completion queued before that point can still run afterwards, and
     * peeking an exited ring would touch unmapped memory. Returning here does
     * not strand the waiting caller: finalize() leaves the stall timer armed,
     * and its handler reports the failure.
     */
    if (eventfdFd < 0) { return; }
    /** EXPLANATION:
     * You'd think we should put check for shouldAcceptRequests here and
     * `return` here if !shouldAcceptRequests, but we shouldn't because
     * that would mean that we can't invoke the caller's callback. This would
     * make the caller freeze forever.
     *
     * Instead we just let the onCqeReadyCallback check for
     * shouldAcceptRequests. That way the onCqeReadyCallback can actually
     * invoke the caller's callback, as it should. We have no knowledge of the
     * caller's callback because we don't have access to the caller's
     * continuation object. The onCqeReadyCallback does have access to it,
     * so we leave that up to it.
     */
    /** FIXME:
     * It may be necessary to specifically check for and handle the cancel op
     * CQE here. I'm not sure as yet though, but I'll highlight it here for now.
     */
    // Process all available CQEs and call callback for each one
    struct io_uring_cqe *cqe;
    while (io_uring_peek_cqe(&ring, &cqe) == 0)
    {
        // Read user_data and result before releasing the CQE slot.
        void* user_data = io_uring_cqe_get_data(cqe);
        int cqe_result = cqe->res;
        io_uring_cqe_seen(&ring, cqe);
        /** EXPLANATION:
         * Call the user-provided callback for this CQE with its user_data and
         * result.
         *
         * 1. Notice that we call the caller's cb *after* marking the CQE as
         *    seen. We may later need to change this if the caller needs
         *    information from the CQE before it is marked as seen.
         *
         * 2. Notice that we do not check for or filter out the cancel op CQE
         *    here. The caller's handler will be able to see the cancel op CQE
         *    because of this.
         */
        if (onCqeReadyCallback) {
            onCqeReadyCallback(user_data, cqe_result);
        }
    }
    /** EXPLANATION:
     * But we do put a `return` here because we know that at this point, the
     * caller's callback has already been invoked. assemblyCycleComplete()
     * always destroys eventfdDesc, so a null descriptor means the cycle ended.
     */
    if (!shouldAcceptRequests
        || eventfdDesc == nullptr || !eventfdDesc->is_open())
    {
        return;
    }
    // Re-arm the eventfd read for next CQE notification
    eventfdDesc->async_read_some(
        boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
        std::bind(
            &IoUringAssemblyEngine::onEventfdRead, this,
            std::placeholders::_1,
            std::placeholders::_2));
}
/** Overwrite the start of every slot that did not receive a datagram this
 * cycle with a DummyLivoxEthHeader, so consumers can tell it apart.
 *
 * Slots beyond the tracker's size and slots marked assembled are left
 * untouched. Does nothing when no frame descriptor is bound.
 */
void IoUringAssemblyEngine::fillUnAssembledSlotsWithDummyDgrams()
{
    if (frameAssemblyDesc == nullptr) { return; }
    const size_t trackedCount = assembledSlotsTracker.size();
    for (size_t slotIdx = 0; slotIdx < frameAssemblyDesc->numSlots; ++slotIdx)
    {
        const bool isTracked = slotIdx < trackedCount;
        if (isTracked && !assembledSlotsTracker[slotIdx].assembled)
        {
            // Construct the dummy header in place at the slot's buffer.
            new (frameAssemblyDesc->slots[slotIdx].vaddr) DummyLivoxEthHeader();
        }
    }
}
/** Test helper: mark a random subset of fully assembled slots as unassembled.
 *
 * The loop counters are adjusted to match, so the dummy-fill path can be
 * exercised. It only acts when every slot succeeded (no "natural" dummies)
 * and there are at least two slots. At least one slot is always left
 * assembled.
 *
 * @param loop the cycle's loop counters; nSucceeded/nFailed are rewritten.
 */
void IoUringAssemblyEngine::randomDummySlotFiller(AsynchronousLoop& loop)
{
    if (!frameAssemblyDesc)
    { return; }
    // Check if there are already dummies (natural dummy instance)
    uint32_t nSucceeded = loop.nSucceeded.load();
    uint32_t nTotal = loop.nTotal;
    uint32_t nFailed = loop.nFailed.load();
    if (nFailed > 0 || nSucceeded < nTotal)
    {
        std::cout << __func__ << ": Natural dummy instance detected (nSucceeded="
            << nSucceeded << ", nTotal=" << nTotal << ", nFailed=" << nFailed
            << "), skipping artificial dummy creation" << std::endl;
        return;
    }
    /* numDummiesDist below spans [1, nTotal - 1]. With fewer than two slots
     * that range is empty; a > b is undefined behaviour for
     * std::uniform_int_distribution, and nTotal - 1 wraps for nTotal == 0.
     */
    if (nTotal < 2)
    {
        std::cout << __func__ << ": nTotal=" << nTotal
            << " is too small for artificial dummy creation, skipping"
            << std::endl;
        return;
    }
    // Randomly select a number of slots to make into dummies (less than total)
    std::uniform_int_distribution<size_t> numDummiesDist(1, nTotal - 1);
    size_t numDummiesToCreate = numDummiesDist(randomGenerator);
    std::uniform_int_distribution<size_t> slotIndexDist(0, nTotal - 1);
    size_t dummiesCreated = 0;
    size_t maxAttempts = nTotal * 10; // Safety limit to prevent infinite loop
    size_t attempts = 0;
    // Mark random slots as unassembled
    while (dummiesCreated < numDummiesToCreate && attempts < maxAttempts)
    {
        ++attempts;
        size_t randomIndex = slotIndexDist(randomGenerator);
        // Skip if already unassembled, re-roll
        if (randomIndex >= assembledSlotsTracker.size()
            || !assembledSlotsTracker[randomIndex].assembled)
        {
            continue;
        }
        // Mark as unassembled
        assembledSlotsTracker[randomIndex].assembled = false;
        ++dummiesCreated;
    }
    if (dummiesCreated < numDummiesToCreate)
    {
        std::cerr << __func__ << ": Warning: Could only create " << dummiesCreated
            << " dummy slots out of " << numDummiesToCreate
            << " requested (max attempts reached)" << std::endl;
        numDummiesToCreate = dummiesCreated;
    }
    // Update the AsynchronousLoop to reflect the new number of dummies
    // Since we only reach here when nSucceeded == nTotal and nFailed == 0,
    // we can directly calculate the new values
    uint32_t newSucceeded = nTotal - static_cast<uint32_t>(numDummiesToCreate);
    uint32_t newFailed = static_cast<uint32_t>(numDummiesToCreate);
    loop.nSucceeded.store(newSucceeded);
    loop.nFailed.store(newFailed);
    std::cout << __func__ << ": Artificially created " << numDummiesToCreate
        << " dummy slots (nSucceeded: " << nTotal << " -> "
        << newSucceeded << ", nFailed: 0 -> " << newFailed << ")" << std::endl;
}
/** Debug helper: hex/ASCII dump of the first nBytes of one slot to stdout.
 *
 * The dump is clamped to the slot's size. std::cout's formatting state (base
 * flags and fill character) is restored before returning.
 *
 * @param slotIndex slot to dump; out-of-range indices are reported on stderr.
 * @param nBytes maximum number of bytes to print.
 */
void IoUringAssemblyEngine::printSlotBytes(size_t slotIndex, size_t nBytes)
{
    if (!frameAssemblyDesc)
    {
        std::cerr << __func__ << ": frameAssemblyDesc is null" << std::endl;
        return;
    }
    if (slotIndex >= frameAssemblyDesc->numSlots)
    {
        std::cerr << __func__ << ": slotIndex " << slotIndex
            << " out of range (numSlots=" << frameAssemblyDesc->numSlots
            << ")" << std::endl;
        return;
    }
    const auto& slot = frameAssemblyDesc->slots[slotIndex];
    size_t bytesToPrint = std::min(nBytes, static_cast<size_t>(slot.nBytes));
    const uint8_t* data = reinterpret_cast<const uint8_t*>(slot.vaddr);
    std::cout << __func__ << ": Slot " << slotIndex << " vaddr="
        << static_cast<const void*>(slot.vaddr)
        << " (" << bytesToPrint
        << " bytes):" << std::endl;
    /* std::hex and std::setfill('0') are sticky stream state. Save std::cout's
     * flags and fill character so the dump does not leak zero-padding (or hex)
     * into unrelated output after we return.
     */
    const std::ios_base::fmtflags savedFlags = std::cout.flags();
    const char savedFill = std::cout.fill();
    // Print hex dump format: offset | hex bytes | ASCII
    const size_t bytesPerLine = 16;
    for (size_t offset = 0; offset < bytesToPrint; offset += bytesPerLine)
    {
        // Print offset
        std::cout << std::hex << std::setfill('0') << std::setw(4)
            << offset << ": ";
        // Print hex bytes, padding the final short line to full width
        for (size_t i = 0; i < bytesPerLine; ++i)
        {
            if (offset + i < bytesToPrint)
            {
                std::cout << std::setw(2) << static_cast<unsigned>(data[offset + i])
                    << " ";
            }
            else
            {
                std::cout << "   ";
            }
        }
        // Print ASCII representation (non-printables shown as '.')
        std::cout << " |";
        for (size_t i = 0; i < bytesPerLine && offset + i < bytesToPrint; ++i)
        {
            uint8_t byte = data[offset + i];
            char c = (byte >= 32 && byte < 127) ? static_cast<char>(byte) : '.';
            std::cout << c;
        }
        std::cout << "|" << std::dec << std::endl;
    }
    std::cout.flags(savedFlags);
    std::cout.fill(savedFill);
}
size_t IoUringAssemblyEngine::computePointsPerDgram(int returnMode)
{
/*
* Map modes to points per datagram based on Livox docs
* 1: first, 2: strongest -> 96 samples => 96 points
* 3: dual -> 48 samples * 2 points = 96
* 4: triple -> 30 samples * 3 points = 90
*/
switch (returnMode)
{
case static_cast<int>(livoxProto1::Device::ReturnMode::SingleFirst):
case static_cast<int>(livoxProto1::Device::ReturnMode::SingleStrongest):
case static_cast<int>(livoxProto1::Device::ReturnMode::Dual):
return 96u;
case static_cast<int>(livoxProto1::Device::ReturnMode::Triple):
return 90u;
default:
throw std::runtime_error(
std::string(__func__) + ": Unknown returnMode "
+ std::to_string(returnMode));
}
}
} // namespace stim_buff
} // namespace smo