#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "ioUringAssemblyEngine.h" #include "pcloudStimulusBuffer.h" #include "livoxGen1.h" namespace smo { namespace stim_buff { inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; } struct DummyLivoxEthHeader { enum : uint32_t { INVALID_ERR_CODE = 0xFFFFFFFFu }; enum : uint8_t { INVALID_TIMESTAMP_TYPE = 0xFFu, INVALID_DATA_TYPE = 0xFFu }; uint8_t version, slot, id, rsvd; uint32_t err_code; uint8_t timestamp_type, data_type; uint8_t timestamp[8]; }; IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_) : parent(parent_), frameAssemblyDesc(nullptr), ring{}, isSetup(false), eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()), isAssembling(false) {} bool IoUringAssemblyEngine::setup() { if (isSetup) { return false; } // Get FrameAssemblyDesc from staging buffer frameAssemblyDesc = static_cast>( parent.assemblyBuffer); if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty()) { return false; } // Get point cloud data socket descriptor from UdpCommandDemuxer auto& livoxState = getLivoxProto1State(); if (!livoxState.livoxProto1_getPcloudDataFdDesc) { return false; } pcloudDataFdDesc = (*livoxState.livoxProto1_getPcloudDataFdDesc)(); if (!pcloudDataFdDesc) { return false; } // Get UDP socket file descriptor int udpFd = pcloudDataFdDesc->native_handle(); if (udpFd < 0) { return false; } // Declare iovec early to avoid goto crossing initialization struct iovec iov; int ret; /** EXPLANATION: * Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly * One SQE per slot (one datagram per slot), plus one extra for cancel * operations, since io_uring_prep_cancel() requires a valid SQE. So we * alloc 1 extra SQE to guarantee that we will always have an available SQE * for cancel operations. */ ret = io_uring_queue_init( static_cast(frameAssemblyDesc->numSlots + 1), &ring, 0); if (ret < 0) { goto cleanup; } // Register staging buffer with io_uring for DMA-apt I/O iov = parent.assemblyBuffer.getIoUringRegisterIoVec(); ret = io_uring_register_buffers(&ring, &iov, 1); if (ret < 0) { goto cleanup_ring; } // Create eventfd for CQE notifications (used with boost's unified loop) eventfdFd = eventfd(0, EFD_NONBLOCK); if (eventfdFd < 0) { goto cleanup_buffers; } // Register eventfd with io_uring ret = io_uring_register_eventfd(&ring, eventfdFd); if (ret < 0) { goto cleanup_eventfd; } isSetup = true; return true; cleanup_eventfd: close(eventfdFd); eventfdFd = -1; cleanup_buffers: io_uring_unregister_buffers(&ring); cleanup_ring: io_uring_queue_exit(&ring); cleanup: return false; } void IoUringAssemblyEngine::finalize() { // Call stop() to cancel in-flight operations (stop() already cancels the timer) stop(); if (eventfdFd >= 0) { io_uring_unregister_eventfd(&ring); close(eventfdFd); eventfdFd = -1; } if (isSetup) { io_uring_unregister_buffers(&ring); io_uring_queue_exit(&ring); isSetup = false; } // Reset state to allow setup() to be called again frameAssemblyDesc = nullptr; } void IoUringAssemblyEngine::resetAndAssembleFrame( resetAndAssembleFrameCbFn onCqeReady) { if (!onCqeReady) { throw std::runtime_error(std::string(__func__) + ": onCqeReady callback is invalid"); } if (!frameAssemblyDesc || !pcloudDataFdDesc || eventfdFd < 0) { throw std::runtime_error(std::string(__func__) + ": invalid state: " + ( !frameAssemblyDesc ? "frameAssemblyDesc is null; " : "" ) + ( !pcloudDataFdDesc ? "pcloudDataFdDesc is null; " : "" ) + ( eventfdFd < 0 ? "eventfdFd is invalid." : "" )); } // eventfdDesc should not be valid when resetAndAssembleFrame is called if (eventfdDesc) { throw std::runtime_error(std::string(__func__) + ": eventfdDesc is already set"); } // Store the callback for re-arming onCqeReadyCallback = std::move(onCqeReady); /** EXPLANATION: * Flush eventfd state: poll and read any pending events before creating * descriptor. * Use poll() to check if data is available (non-blocking check). * If data is available, read it to flush. */ struct pollfd pfd; pfd.fd = eventfdFd; pfd.events = POLLIN; pfd.revents = 0; int poll_ret = poll(&pfd, 1, 0); // Timeout 0 = non-blocking if (poll_ret > 0 && (pfd.revents & POLLIN)) { uint64_t discard; ssize_t ret = read(eventfdFd, &discard, sizeof(discard)); (void)ret; // Ignore errors - just trying to flush } eventfdDesc = std::make_unique( parent.device->componentThread->getIoService(), eventfdFd); if (!eventfdDesc) { throw std::runtime_error(std::string(__func__) + ": failed to create eventfd stream descriptor"); } // Get UDP socket file descriptor int udpFd = pcloudDataFdDesc->native_handle(); if (udpFd < 0) { throw std::runtime_error(std::string(__func__) + ": invalid UDP socket file descriptor"); } // Prepare SQEs for each slot in the frame struct io_uring_sqe *sqe; for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { sqe = io_uring_get_sqe(&ring); if (!sqe) { throw std::runtime_error(std::string(__func__) + ": failed to get SQE for slot " + std::to_string(i)); } const auto& slot = frameAssemblyDesc->slots[i]; // Prepare recvmsg SQE for this slot struct msghdr msg = {}; struct iovec iov; iov.iov_base = slot.vaddr; iov.iov_len = slot.nBytes; msg.msg_iov = &iov; msg.msg_iovlen = 1; io_uring_prep_recvmsg(sqe, udpFd, &msg, 0); // Set user_data to slot index for tracking io_uring_sqe_set_data(sqe, reinterpret_cast(i)); } // Submit all SQEs int ret = io_uring_submit(&ring); if (ret < 0) { throw std::runtime_error(std::string(__func__) + ": io_uring_submit failed: " + std::strerror(errno) + " (errno=" + std::to_string(errno) + ")"); } // Set assembly flag isAssembling = true; // Start listening for CQE notifications on eventfd eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), std::bind( &IoUringAssemblyEngine::onEventfdRead, this, std::placeholders::_1, std::placeholders::_2)); } void IoUringAssemblyEngine::stop(bool doAcquireLock) { // Clear assembly flag first to signal onEventfdRead to stop re-arming // Acquire and release lock tightly around setting the flag if (doAcquireLock) { SpinLock::Guard lock(isAssemblingLock); isAssembling = false; } else { isAssembling = false; } /** FIXME: * There's a problem with this bridge here. * * We can't delay during every call to stop because under normal operating * conditions, this whole assembly process should be able to move as fast * as possible and to receive as much data as possible without maximum * throughput. * * Yet we need to delay briefly here to ensure that the onEventfdRead loop * has a chance to see the flag and halt. * * We need to analyze this carefully and figure out what the correct * conditions are for being certain that we aren't destroying state while * the eventfdRead loop is still running; and we need to figure out how to * ensure that we only delay when absolutely necessary. */ // Cancel in-flight stall timeout timer stallTimer.cancel(); onCqeReadyCallback = std::move([](void *, int){}); if (isSetup) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); if (!sqe) { std::cerr << __func__ << ": failed to get SQE for cancel op. " << "Continuing cleanup without cancelling.\n"; goto cleanup_eventfd; } /* Cancel all in-flight operations on our ring * using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel * op as numSlots since numSlots is an invalid slot index for a * real slot. */ io_uring_prep_cancel( sqe, reinterpret_cast(frameAssemblyDesc->numSlots), IORING_ASYNC_CANCEL_ANY); io_uring_submit(&ring); /* Wait for cancellation to complete. According to the man page, * cancellation is synchronous and a CQE is guaranteed to be * generated by the time submission returns. */ struct io_uring_cqe *cqe; bool sawCancelCqe = false; while (io_uring_peek_cqe(&ring, &cqe) == 0) { // Call seen() on all CQEs for completeness/correctness. io_uring_cqe_seen(&ring, cqe); void *user_data = io_uring_cqe_get_data(cqe); if (user_data == reinterpret_cast( frameAssemblyDesc->numSlots)) { sawCancelCqe = true; } } if (!sawCancelCqe && OptionParser::getOptions().verbose) { std::cerr << __func__ << ": no CQE seen for cancel operation\n"; } } cleanup_eventfd: if (eventfdDesc) { /** EXPLANATION: * The goal here is to ensure that our io_service's event loop will not * get any events from the eventfd after we've called stop(). So we * completely deinitialize the eventfd descriptor. * * But we still want to reuse the underlying eventfd file descriptor, * itself in the next resetAndAssembleFrame() cycle, so we call * release() instead of reset() to ensure that the underlying fd * is not closed. * * However, we need to close the descriptor's association with the * io_service before releasing it, otherwise Boost.Asio will complain * when we try to create a new descriptor with the same fd. */ eventfdDesc->cancel(); eventfdDesc->release(); /* Destroy the descriptor object (now that it's unregistered, destroying * it won't close the fd since release() transferred ownership back) */ eventfdDesc.reset(); } } // Continuation class for assembleFrameReq class IoUringAssemblyEngine::AssembleFrameReq : public PostedAsynchronousContinuation< IoUringAssemblyEngine::assembleFrameReqCbFn> { public: AssembleFrameReq( IoUringAssemblyEngine& engine_, const std::shared_ptr& caller, Callback cb) : PostedAsynchronousContinuation< IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb), engine(engine_), loop(engine_.frameAssemblyDesc->numSlots), timerFired(false), handlerExecuted(false) {} public: void assembleFrameReq1_posted( std::shared_ptr context) { if (!engine.frameAssemblyDesc) { throw std::runtime_error(std::string(__func__) + ": frameAssemblyDesc is null"); } // Initialize loop with number of slots context->loop = AsynchronousLoop(engine.frameAssemblyDesc->numSlots); /** FIXME: * I'm suspicious of this std::bind return object here. What if us * setting it to null inside of stop() doesn't actually cause the * object to be destroyed? This would cause this contin's sh_ptr's * reference count to never reach 0, causing a memory leak. */ engine.resetAndAssembleFrame( std::bind(&AssembleFrameReq::assembleFrameReq2_2, context.get(), context, std::placeholders::_1, std::placeholders::_2)); // Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms engine.stallTimer.expires_from_now( boost::posix_time::milliseconds( CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2)); engine.stallTimer.async_wait( std::bind(&AssembleFrameReq::assembleFrameReq2_1, context.get(), context, std::placeholders::_1)); } void assembleFrameReq2_1( std::shared_ptr context, const boost::system::error_code& error) { // Check if timer was cancelled (ignore if operation_aborted) if (error == boost::asio::error::operation_aborted) { return; } // Set timer fired flag context->timerFired.store(true); context->assembleFrameReq3(context); } void assembleFrameReq2_2( std::shared_ptr context, void *user_data, int cqe_result) { (void)user_data; // Not used - we just track success/failure counts // Caller decides success: result >= 0 means success bool success = (cqe_result >= 0); if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( success)) { // Loop is complete, call oracle function context->assembleFrameReq3(context); } } void assembleFrameReq3(std::shared_ptr context) { // Ensure we only execute once using atomic exchange if (context->handlerExecuted.exchange(true)) { return; } // Cancel the timer, stop the engine and process frame, if any. context->engine.stop(false); /** EXPLANATION: * Timeout doesn't necessarily mean error. * * If we received zero dgrams from the device, that is indeed an error. * But if we received some dgrams, but not all, that is not an error: * it just means we didn't receive as much data as we would have liked. */ // Error: no slots succeeded - no data received successfully. if (context->loop.nSucceeded.load() == 0) { context->callOriginalCb(false, context->loop); return; } if (context->loop.nSucceeded.load() >= context->loop.nTotal) { // Success: all or more slots succeeded if (context->loop.nSucceeded.load() > context->loop.nTotal) { std::cerr << __func__ << ": nSucceeded > nTotal: succ (" << context->loop.nSucceeded.load() << ") > nTotal (" << context->loop.nTotal << ")\n"; } context->callOriginalCb(true, context->loop); return; } if (context->loop.nSucceeded.load() < context->loop.nTotal) { // Success: some slots succeeded (less than total) // Note: dummy fill for un-assembled slots will be implemented later context->callOriginalCb(true, context->loop); return; } if (OptionParser::getOptions().verbose) { std::cerr << __func__ << ": Invalid state: nSucceeded (" << context->loop.nSucceeded.load() << ") < nTotal (" << context->loop.nTotal << ")" << std::endl; } context->callOriginalCb(false, context->loop); return; } public: IoUringAssemblyEngine& engine; AsynchronousLoop loop; std::atomic timerFired; std::atomic handlerExecuted; }; void IoUringAssemblyEngine::assembleFrameReq( Callback cb) { if (!frameAssemblyDesc) { throw std::runtime_error(std::string(__func__) + ": frameAssemblyDesc is null"); } const auto& caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared( *this, caller, std::move(cb)); parent.device->componentThread->getIoService().post( STC(std::bind( &AssembleFrameReq::assembleFrameReq1_posted, request.get(), request))); } void IoUringAssemblyEngine::onEventfdRead( const boost::system::error_code& error, std::size_t bytes_transferred) { (void)bytes_transferred; // Ignore cancellation errors if (error == boost::asio::error::operation_aborted) { return; } /** EXPLANATION: * This lock should be held throughout this method to ensure that the * IoUringAssemblyEngine's per-assembly state isn't destroyed while this * handler is running. */ SpinLock::Guard lock(isAssemblingLock); if (!isAssembling) { return; } /** FIXME: * It may be necessary to specifically check for and handle the cancel op * CQE here. I'm not sure as yet though, but I'll highlight it here for now. */ // Process all available CQEs and call callback for each one struct io_uring_cqe *cqe; while (io_uring_peek_cqe(&ring, &cqe) == 0) { // Get user_data from the CQE void* user_data = io_uring_cqe_get_data(cqe); // Get result from the CQE int cqe_result = cqe->res; // Mark the CQE as seen io_uring_cqe_seen(&ring, cqe); /** EXPLANATION: * Call the user-provided callback for this CQE with its user_data and * result. * * 1. Notice that we call the caller's cb *after* marking the CQE as * seen. We may later need to change this if the caller needs * information from the CQE before it is marked as seen. * * 2. Notice that we do not check for or filter out the cancel op CQE * here. The caller's handler will be able to see the cancel op CQE * because of this. */ if (onCqeReadyCallback) { onCqeReadyCallback(user_data, cqe_result); } } // Re-arm the eventfd read for next CQE notification // Only re-arm if assembly is still active (stop() hasn't been called) if (eventfdDesc && eventfdFd >= 0) { eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), std::bind( &IoUringAssemblyEngine::onEventfdRead, this, std::placeholders::_1, std::placeholders::_2)); } } void IoUringAssemblyEngine::cancelIncompleteAndFillDummies() { if (!frameAssemblyDesc) { return; } for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { // In the real path, decide from CQE accounting whether slot i completed. // Here, demonstrate dummy header insertion API. auto* hdr = reinterpret_cast(frameAssemblyDesc->slots[i].vaddr); hdr->err_code = DummyLivoxEthHeader::INVALID_ERR_CODE; hdr->timestamp_type = DummyLivoxEthHeader::INVALID_TIMESTAMP_TYPE; hdr->data_type = DummyLivoxEthHeader::INVALID_DATA_TYPE; } } size_t IoUringAssemblyEngine::computePointsPerDgram(int returnMode) { /* * Map modes to points per datagram based on Livox docs * 1: first, 2: strongest -> 96 samples => 96 points * 3: dual -> 48 samples * 2 points = 96 * 4: triple -> 30 samples * 3 points = 90 */ switch (returnMode) { case static_cast(livoxProto1::Device::ReturnMode::SingleFirst): case static_cast(livoxProto1::Device::ReturnMode::SingleStrongest): case static_cast(livoxProto1::Device::ReturnMode::Dual): return 96u; case static_cast(livoxProto1::Device::ReturnMode::Triple): return 90u; default: throw std::runtime_error( std::string(__func__) + ": Unknown returnMode " + std::to_string(returnMode)); } } } // namespace stim_buff } // namespace smo