// salmanoff/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp
#include <boostAsioLinkageFix.h>
2025-11-06 00:00:23 -04:00
#include <config.h>
#include <opts.h>
#include <cstring>
#include <stdexcept>
#include <functional>
#include <sys/socket.h>
#include <sys/eventfd.h>
#include <sys/uio.h>
#include <sys/poll.h>
#include <unistd.h>
#include <errno.h>
#include <boost/system/error_code.hpp>
#include <livoxProto1/device.h>
#include <livoxProto1/livoxProto1.h>
2025-11-06 00:00:23 -04:00
#include <asynchronousContinuation.h>
#include <asynchronousLoop.h>
#include <asynchronousBridge.h>
#include <callback.h>
#include <spinLock.h>
2025-10-31 11:48:31 -04:00
#include "ioUringAssemblyEngine.h"
#include "pcloudStimulusBuffer.h"
#include "livoxGen1.h"
namespace smo {
namespace stim_buff {
/** Accessor for the `livoxProto1` state object; the rest of this file goes
 * through this function rather than naming the object directly.
 */
inline LivoxProto1DllState& getLivoxProto1State()
{
	return livoxProto1;
}
/** Header overlay used to stamp "invalid" sentinel values into a slot.
 *
 * cancelIncompleteAndFillDummies() casts the start of a slot buffer to this
 * type and writes the INVALID_* sentinels into err_code, timestamp_type and
 * data_type.
 *
 * NOTE(review): the field order presumably mirrors the leading bytes of a
 * Livox Ethernet data packet - confirm against the protocol definition.
 * The member order must not change: writes through this overlay depend on
 * err_code sitting at byte 4 and timestamp_type/data_type at bytes 8/9.
 */
struct DummyLivoxEthHeader
{
	/// Sentinel written to err_code for a slot that holds no real datagram.
	enum : uint32_t {
		INVALID_ERR_CODE = 0xFFFFFFFFu
	};
	/// Sentinels written to timestamp_type / data_type for such a slot.
	enum : uint8_t {
		INVALID_TIMESTAMP_TYPE = 0xFFu,
		INVALID_DATA_TYPE = 0xFFu
	};

	uint8_t version, slot, id, rsvd;
	uint32_t err_code;
	uint8_t timestamp_type, data_type;
	uint8_t timestamp[8];
};
/** Constructs an engine bound to its owning stimulus buffer.
 *
 * Only puts members into their "not set up" state: no ring, no eventfd
 * (fd == -1), no Asio descriptor, not assembling. The stall timer is bound
 * to the io_service of the parent device's component thread. All kernel
 * resources are acquired later, in setup().
 *
 * @param parent_ Owning buffer; a reference to it is stored.
 */
IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_)
: parent(parent_),
frameAssemblyDesc(nullptr), ring{},
isSetup(false),
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
stallTimer(parent_.device->componentThread->getIoService()),
isAssembling(false)
{}
bool IoUringAssemblyEngine::setup()
{
if (isSetup)
{ return false; }
// Get FrameAssemblyDesc from staging buffer
frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
parent.assemblyBuffer);
if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty())
{ return false; }
// Get point cloud data socket descriptor from UdpCommandDemuxer
auto& livoxState = getLivoxProto1State();
if (!livoxState.livoxProto1_getPcloudDataFdDesc)
{ return false; }
pcloudDataFdDesc = (*livoxState.livoxProto1_getPcloudDataFdDesc)();
if (!pcloudDataFdDesc)
{ return false; }
// Get UDP socket file descriptor
int udpFd = pcloudDataFdDesc->native_handle();
if (udpFd < 0)
{ return false; }
// Declare iovec early to avoid goto crossing initialization
struct iovec iov;
int ret;
/** EXPLANATION:
* Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly
* One SQE per slot (one datagram per slot), plus one extra for cancel
* operations, since io_uring_prep_cancel() requires a valid SQE. So we
* alloc 1 extra SQE to guarantee that we will always have an available SQE
* for cancel operations.
*/
ret = io_uring_queue_init(
static_cast<unsigned int>(frameAssemblyDesc->numSlots + 1), &ring, 0);
if (ret < 0)
{ goto cleanup; }
// Register staging buffer with io_uring for DMA-apt I/O
iov = parent.assemblyBuffer.getIoUringRegisterIoVec();
ret = io_uring_register_buffers(&ring, &iov, 1);
if (ret < 0)
{ goto cleanup_ring; }
// Create eventfd for CQE notifications (used with boost's unified loop)
eventfdFd = eventfd(0, EFD_NONBLOCK);
if (eventfdFd < 0)
{ goto cleanup_buffers; }
// Register eventfd with io_uring
ret = io_uring_register_eventfd(&ring, eventfdFd);
if (ret < 0)
{ goto cleanup_eventfd; }
isSetup = true;
return true;
cleanup_eventfd:
close(eventfdFd);
eventfdFd = -1;
cleanup_buffers:
io_uring_unregister_buffers(&ring);
cleanup_ring:
io_uring_queue_exit(&ring);
cleanup:
return false;
}
/** Releases everything setup() acquired, leaving the engine ready for a
 * fresh setup() call.
 *
 * Order: stop() first (it cancels the stall timer and any in-flight ring
 * operations), then the eventfd, then the registered buffers and the ring
 * itself.
 */
void IoUringAssemblyEngine::finalize()
{
	stop();

	const bool eventfdIsOpen = !(eventfdFd < 0);
	if (eventfdIsOpen)
	{
		io_uring_unregister_eventfd(&ring);
		close(eventfdFd);
		eventfdFd = -1;
	}

	if (isSetup)
	{
		io_uring_unregister_buffers(&ring);
		io_uring_queue_exit(&ring);
		isSetup = false;
	}

	// Dropping the descriptor is what lets setup() start from scratch.
	frameAssemblyDesc = nullptr;
}
void IoUringAssemblyEngine::resetAndAssembleFrame(
2025-11-06 00:00:23 -04:00
resetAndAssembleFrameCbFn onCqeReady)
{
if (!onCqeReady)
{
throw std::runtime_error(std::string(__func__)
+ ": onCqeReady callback is invalid");
}
if (!frameAssemblyDesc || !pcloudDataFdDesc || eventfdFd < 0)
{
throw std::runtime_error(std::string(__func__)
+ ": invalid state: "
+ ( !frameAssemblyDesc ? "frameAssemblyDesc is null; " : "" )
+ ( !pcloudDataFdDesc ? "pcloudDataFdDesc is null; " : "" )
+ ( eventfdFd < 0 ? "eventfdFd is invalid." : "" ));
}
// eventfdDesc should not be valid when resetAndAssembleFrame is called
if (eventfdDesc)
{
throw std::runtime_error(std::string(__func__)
+ ": eventfdDesc is already set");
}
// Store the callback for re-arming
onCqeReadyCallback = std::move(onCqeReady);
/** EXPLANATION:
* Flush eventfd state: poll and read any pending events before creating
* descriptor.
* Use poll() to check if data is available (non-blocking check).
* If data is available, read it to flush.
*/
struct pollfd pfd;
pfd.fd = eventfdFd;
pfd.events = POLLIN;
pfd.revents = 0;
int poll_ret = poll(&pfd, 1, 0); // Timeout 0 = non-blocking
if (poll_ret > 0 && (pfd.revents & POLLIN))
{
uint64_t discard;
ssize_t ret = read(eventfdFd, &discard, sizeof(discard));
(void)ret; // Ignore errors - just trying to flush
}
eventfdDesc = std::make_unique<boost::asio::posix::stream_descriptor>(
parent.device->componentThread->getIoService(), eventfdFd);
if (!eventfdDesc)
{
throw std::runtime_error(std::string(__func__)
+ ": failed to create eventfd stream descriptor");
}
// Get UDP socket file descriptor
int udpFd = pcloudDataFdDesc->native_handle();
if (udpFd < 0)
{
throw std::runtime_error(std::string(__func__)
+ ": invalid UDP socket file descriptor");
}
// Prepare SQEs for each slot in the frame
struct io_uring_sqe *sqe;
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
{
sqe = io_uring_get_sqe(&ring);
if (!sqe)
{
throw std::runtime_error(std::string(__func__)
+ ": failed to get SQE for slot " + std::to_string(i));
}
const auto& slot = frameAssemblyDesc->slots[i];
// Prepare recvmsg SQE for this slot
struct msghdr msg = {};
struct iovec iov;
iov.iov_base = slot.vaddr;
iov.iov_len = slot.nBytes;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
io_uring_prep_recvmsg(sqe, udpFd, &msg, 0);
// Set user_data to slot index for tracking
io_uring_sqe_set_data(sqe, reinterpret_cast<void*>(i));
}
// Submit all SQEs
int ret = io_uring_submit(&ring);
if (ret < 0)
{
throw std::runtime_error(std::string(__func__)
+ ": io_uring_submit failed: " + std::strerror(errno)
+ " (errno=" + std::to_string(errno) + ")");
}
2025-11-06 00:00:23 -04:00
// Set assembly flag
isAssembling = true;
// Start listening for CQE notifications on eventfd
eventfdDesc->async_read_some(
boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
std::bind(
&IoUringAssemblyEngine::onEventfdRead, this,
std::placeholders::_1,
std::placeholders::_2));
}
2025-11-06 00:00:23 -04:00
void IoUringAssemblyEngine::stop(bool doAcquireLock)
{
2025-11-06 00:00:23 -04:00
// Clear assembly flag first to signal onEventfdRead to stop re-arming
// Acquire and release lock tightly around setting the flag
if (doAcquireLock)
{
SpinLock::Guard lock(isAssemblingLock);
isAssembling = false;
} else {
isAssembling = false;
}
/** FIXME:
* There's a problem with this bridge here.
*
* We can't delay during every call to stop because under normal operating
* conditions, this whole assembly process should be able to move as fast
* as possible and to receive as much data as possible without maximum
* throughput.
*
* Yet we need to delay briefly here to ensure that the onEventfdRead loop
* has a chance to see the flag and halt.
*
* We need to analyze this carefully and figure out what the correct
* conditions are for being certain that we aren't destroying state while
* the eventfdRead loop is still running; and we need to figure out how to
* ensure that we only delay when absolutely necessary.
*/
// Cancel in-flight stall timeout timer
stallTimer.cancel();
2025-11-06 08:53:44 -04:00
onCqeReadyCallback = std::move([](void *, int){});
if (isSetup)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (!sqe)
{
std::cerr << __func__ << ": failed to get SQE for cancel op. "
<< "Continuing cleanup without cancelling.\n";
goto cleanup_eventfd;
}
/* Cancel all in-flight operations on our ring
* using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel
* op as numSlots since numSlots is an invalid slot index for a
* real slot.
*/
io_uring_prep_cancel(
sqe, reinterpret_cast<void*>(frameAssemblyDesc->numSlots),
IORING_ASYNC_CANCEL_ANY);
io_uring_submit(&ring);
/* Wait for cancellation to complete. According to the man page,
* cancellation is synchronous and a CQE is guaranteed to be
* generated by the time submission returns.
*/
struct io_uring_cqe *cqe;
bool sawCancelCqe = false;
while (io_uring_peek_cqe(&ring, &cqe) == 0)
{
// Call seen() on all CQEs for completeness/correctness.
io_uring_cqe_seen(&ring, cqe);
void *user_data = io_uring_cqe_get_data(cqe);
if (user_data == reinterpret_cast<void*>(
frameAssemblyDesc->numSlots))
{
sawCancelCqe = true;
}
}
2025-11-06 00:00:23 -04:00
if (!sawCancelCqe && OptionParser::getOptions().verbose) {
std::cerr << __func__ << ": no CQE seen for cancel operation\n";
}
}
cleanup_eventfd:
if (eventfdDesc)
{
/** EXPLANATION:
* The goal here is to ensure that our io_service's event loop will not
* get any events from the eventfd after we've called stop(). So we
* completely deinitialize the eventfd descriptor.
*
* But we still want to reuse the underlying eventfd file descriptor,
* itself in the next resetAndAssembleFrame() cycle, so we call
* release() instead of reset() to ensure that the underlying fd
* is not closed.
2025-11-06 00:00:23 -04:00
*
* However, we need to close the descriptor's association with the
* io_service before releasing it, otherwise Boost.Asio will complain
* when we try to create a new descriptor with the same fd.
*/
eventfdDesc->cancel();
2025-11-06 00:00:23 -04:00
eventfdDesc->release();
/* Destroy the descriptor object (now that it's unregistered, destroying
* it won't close the fd since release() transferred ownership back)
*/
eventfdDesc.reset();
}
}
// Continuation class for assembleFrameReq
/** Continuation that drives one assembleFrameReq() call to completion.
 *
 * Flow: assembleFrameReq1_posted arms the engine and the stall timer;
 * assembleFrameReq2_1 (timer expiry) and assembleFrameReq2_2 (one call per
 * CQE) both lead to assembleFrameReq3, which runs at most once, stops the
 * engine and reports the outcome through the original callback.
 */
class IoUringAssemblyEngine::AssembleFrameReq
: public PostedAsynchronousContinuation<
	IoUringAssemblyEngine::assembleFrameReqCbFn>
{
public:
	/** @param engine_ Engine to drive; its frameAssemblyDesc is dereferenced
	 *         here, so it must be non-null (assembleFrameReq() checks this).
	 *  @param caller  Thread object passed to the continuation base class.
	 *  @param cb      Callback invoked with the final result.
	 */
	AssembleFrameReq(
		IoUringAssemblyEngine& engine_,
		const std::shared_ptr<ComponentThread>& caller,
		Callback<IoUringAssemblyEngine::assembleFrameReqCbFn> cb)
	: PostedAsynchronousContinuation<
		IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb),
	engine(engine_),
	loop(engine_.frameAssemblyDesc->numSlots),
	timerFired(false), handlerExecuted(false)
	{}

public:
	/** Step 1: arm the ring for one frame and start the stall timer.
	 *
	 * @throws std::runtime_error if the engine has no frameAssemblyDesc;
	 *         exceptions from resetAndAssembleFrame() propagate as well.
	 */
	void assembleFrameReq1_posted(
		std::shared_ptr<AssembleFrameReq> context)
	{
		if (!engine.frameAssemblyDesc)
		{
			throw std::runtime_error(std::string(__func__)
				+ ": frameAssemblyDesc is null");
		}

		// Initialize loop with number of slots
		context->loop = AsynchronousLoop(engine.frameAssemblyDesc->numSlots);

		/** FIXME:
		 * I'm suspicious of this std::bind return object here. What if us
		 * setting it to null inside of stop() doesn't actually cause the
		 * object to be destroyed? This would cause this contin's sh_ptr's
		 * reference count to never reach 0, causing a memory leak.
		 */
		engine.resetAndAssembleFrame(
			std::bind(&AssembleFrameReq::assembleFrameReq2_2,
				context.get(), context,
				std::placeholders::_1, std::placeholders::_2));

		// Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms
		engine.stallTimer.expires_from_now(
			boost::posix_time::milliseconds(
				CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2));

		engine.stallTimer.async_wait(
			std::bind(&AssembleFrameReq::assembleFrameReq2_1,
				context.get(), context,
				std::placeholders::_1));
	}

	/** Step 2 (timer path): the stall timer expired or was cancelled. */
	void assembleFrameReq2_1(
		std::shared_ptr<AssembleFrameReq> context,
		const boost::system::error_code& error)
	{
		// Check if timer was cancelled (ignore if operation_aborted)
		if (error == boost::asio::error::operation_aborted) { return; }

		// Set timer fired flag
		context->timerFired.store(true);
		context->assembleFrameReq3(context);
	}

	/** Step 2 (CQE path): called once per reaped CQE.
	 *
	 * @param user_data  CQE user_data; unused, only counts are tracked.
	 * @param cqe_result CQE result; >= 0 is counted as a success.
	 */
	void assembleFrameReq2_2(
		std::shared_ptr<AssembleFrameReq> context,
		void *user_data, int cqe_result)
	{
		(void)user_data; // Not used - we just track success/failure counts

		// Caller decides success: result >= 0 means success
		bool success = (cqe_result >= 0);

		if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(
			success))
		{
			// Loop is complete, call oracle function
			context->assembleFrameReq3(context);
		}
	}

	/** Step 3: stop the engine and report the outcome exactly once.
	 *
	 * Reports failure only when no slot succeeded; a partially filled frame
	 * is reported as success.
	 */
	void assembleFrameReq3(std::shared_ptr<AssembleFrameReq> context)
	{
		// Ensure we only execute once using atomic exchange
		if (context->handlerExecuted.exchange(true)) { return; }

		// Cancel the timer, stop the engine and process frame, if any.
		// false: the CQE path reaches here from inside onEventfdRead(),
		// which already holds isAssemblingLock.
		context->engine.stop(false);

		/** EXPLANATION:
		 * Timeout doesn't necessarily mean error.
		 *
		 * If we received zero dgrams from the device, that is indeed an error.
		 * But if we received some dgrams, but not all, that is not an error:
		 * it just means we didn't receive as much data as we would have liked.
		 */

		// One snapshot, so every decision below sees the same count.
		const auto nSucceeded = context->loop.nSucceeded.load();

		// Error: no slots succeeded - no data received successfully.
		if (nSucceeded == 0)
		{
			context->callOriginalCb(false, context->loop);
			return;
		}

		// More successes than slots should not happen; report it but still
		// treat the frame as complete.
		if (nSucceeded > context->loop.nTotal)
		{
			std::cerr << __func__ << ": nSucceeded > nTotal: succ ("
				<< nSucceeded
				<< ") > nTotal (" << context->loop.nTotal << ")\n";
		}

		// Success: all slots succeeded, or only some did.
		// Note: dummy fill for un-assembled slots will be implemented later
		context->callOriginalCb(true, context->loop);
	}

public:
	IoUringAssemblyEngine& engine;
	AsynchronousLoop loop;
	std::atomic<bool> timerFired;
	std::atomic<bool> handlerExecuted;
};
/** Starts an asynchronous frame assembly.
 *
 * Builds an AssembleFrameReq continuation and posts its first step onto the
 * io_service of the parent device's component thread; this function itself
 * returns as soon as the post has been queued.
 *
 * @param cb Callback handed to the continuation; invoked with the outcome.
 * @throws std::runtime_error if frameAssemblyDesc is null.
 */
void IoUringAssemblyEngine::assembleFrameReq(
	Callback<assembleFrameReqCbFn> cb)
{
	if (!frameAssemblyDesc)
	{
		throw std::runtime_error(std::string(__func__)
			+ ": frameAssemblyDesc is null");
	}

	const auto& caller = smoHooksPtr->ComponentThread_getSelf();
	auto request = std::make_shared<AssembleFrameReq>(
		*this, caller, std::move(cb));

	// The bound shared_ptr copy keeps the request alive until the posted
	// step has run.
	parent.device->componentThread->getIoService().post(
		std::bind(
			&AssembleFrameReq::assembleFrameReq1_posted,
			request.get(), request));
}
/** Completion handler for the eventfd read armed by resetAndAssembleFrame().
 *
 * Reaps every CQE currently available, forwards each one to the stored
 * callback, then re-arms the eventfd read if the descriptor still exists.
 *
 * @param error             Asio result; operation_aborted is ignored.
 * @param bytes_transferred Unused.
 */
void IoUringAssemblyEngine::onEventfdRead(
	const boost::system::error_code& error,
	std::size_t bytes_transferred)
{
	(void)bytes_transferred;

	// Ignore cancellation errors
	if (error == boost::asio::error::operation_aborted) { return; }

	/** EXPLANATION:
	 * This lock should be held throughout this method to ensure that the
	 * IoUringAssemblyEngine's per-assembly state isn't destroyed while this
	 * handler is running.
	 */
	SpinLock::Guard lock(isAssemblingLock);
	if (!isAssembling) { return; }

	/** FIXME:
	 * It may be necessary to specifically check for and handle the cancel op
	 * CQE here. I'm not sure as yet though, but I'll highlight it here for now.
	 */

	// Process all available CQEs and call callback for each one
	struct io_uring_cqe *cqe;
	while (io_uring_peek_cqe(&ring, &cqe) == 0)
	{
		// Get user_data from the CQE
		void* user_data = io_uring_cqe_get_data(cqe);

		// Get result from the CQE
		int cqe_result = cqe->res;

		// Mark the CQE as seen
		io_uring_cqe_seen(&ring, cqe);

		/** EXPLANATION:
		 * Call the user-provided callback for this CQE with its user_data and
		 * result.
		 *
		 * 1. Notice that we call the caller's cb *after* marking the CQE as
		 *    seen. We may later need to change this if the caller needs
		 *    information from the CQE before it is marked as seen.
		 *
		 * 2. Notice that we do not check for or filter out the cancel op CQE
		 *    here. The caller's handler will be able to see the cancel op CQE
		 *    because of this.
		 *
		 * 3. The callback is re-tested on every iteration because a callback
		 *    invocation may itself call stop(), which replaces it.
		 */
		if (onCqeReadyCallback) {
			onCqeReadyCallback(user_data, cqe_result);
		}
	}

	// Re-arm the eventfd read for next CQE notification.
	// stop() resets eventfdDesc, so a stop() issued from inside the callback
	// above is what prevents re-arming here.
	if (eventfdDesc && eventfdFd >= 0)
	{
		eventfdDesc->async_read_some(
			boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
			std::bind(
				&IoUringAssemblyEngine::onEventfdRead, this,
				std::placeholders::_1,
				std::placeholders::_2));
	}
}
2025-10-31 11:48:31 -04:00
void IoUringAssemblyEngine::cancelIncompleteAndFillDummies()
{
if (!frameAssemblyDesc)
2025-10-31 11:48:31 -04:00
{ return; }
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
2025-10-31 11:48:31 -04:00
{
// In the real path, decide from CQE accounting whether slot i completed.
// Here, demonstrate dummy header insertion API.
auto* hdr = reinterpret_cast<DummyLivoxEthHeader*>(frameAssemblyDesc->slots[i].vaddr);
2025-11-01 20:18:05 -04:00
hdr->err_code = DummyLivoxEthHeader::INVALID_ERR_CODE;
hdr->timestamp_type = DummyLivoxEthHeader::INVALID_TIMESTAMP_TYPE;
hdr->data_type = DummyLivoxEthHeader::INVALID_DATA_TYPE;
2025-10-31 11:48:31 -04:00
}
}
size_t IoUringAssemblyEngine::computePointsPerDgram(int returnMode)
{
2025-10-31 11:48:31 -04:00
/*
* Map modes to points per datagram based on Livox docs
* 1: first, 2: strongest -> 96 samples => 96 points
* 3: dual -> 48 samples * 2 points = 96
* 4: triple -> 30 samples * 3 points = 90
*/
switch (returnMode)
{
2025-10-31 11:48:31 -04:00
case static_cast<int>(livoxProto1::Device::ReturnMode::SingleFirst):
case static_cast<int>(livoxProto1::Device::ReturnMode::SingleStrongest):
case static_cast<int>(livoxProto1::Device::ReturnMode::Dual):
return 96u;
case static_cast<int>(livoxProto1::Device::ReturnMode::Triple):
return 90u;
default:
throw std::runtime_error(
std::string(__func__) + ": Unknown returnMode "
+ std::to_string(returnMode));
}
}
} // namespace stim_buff
} // namespace smo