#include #include #include #include #include #include #include #include #include #include #include #include #include #include "ioUringAssemblyEngine.h" #include "pcloudStimulusBuffer.h" #include "livoxGen1.h" namespace smo { namespace stim_buff { inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; } struct DummyLivoxEthHeader { enum : uint32_t { INVALID_ERR_CODE = 0xFFFFFFFFu }; enum : uint8_t { INVALID_TIMESTAMP_TYPE = 0xFFu, INVALID_DATA_TYPE = 0xFFu }; uint8_t version, slot, id, rsvd; uint32_t err_code; uint8_t timestamp_type, data_type; uint8_t timestamp[8]; }; IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_) : parent(parent_), frameAssemblyDesc(nullptr), ring{}, isSetup(false), eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), stallTimer(parent_.device->componentThread->getIoService()) {} bool IoUringAssemblyEngine::setup() { if (isSetup) { return false; } // Get FrameAssemblyDesc from staging buffer frameAssemblyDesc = static_cast>( parent.stagingBuffer); if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty()) { return false; } // Get point cloud data socket descriptor from UdpCommandDemuxer auto& livoxState = getLivoxProto1State(); if (!livoxState.livoxProto1_getPcloudDataFdDesc) { return false; } pcloudDataFdDesc = (*livoxState.livoxProto1_getPcloudDataFdDesc)(); if (!pcloudDataFdDesc) { return false; } // Get UDP socket file descriptor int udpFd = pcloudDataFdDesc->native_handle(); if (udpFd < 0) { return false; } // Declare iovec early to avoid goto crossing initialization struct iovec iov; int ret; /** EXPLANATION: * Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly * One SQE per slot (one datagram per slot), plus one extra for cancel * operations, since io_uring_prep_cancel() requires a valid SQE. So we * alloc 1 extra SQE to guarantee that we will always have an available SQE * for cancel operations. */ ret = io_uring_queue_init( static_cast(frameAssemblyDesc->numSlots + 1), &ring, 0); if (ret < 0) { goto cleanup; } // Register staging buffer with io_uring for DMA-apt I/O iov = parent.stagingBuffer.getIoUringRegisterIoVec(); ret = io_uring_register_buffers(&ring, &iov, 1); if (ret < 0) { goto cleanup_ring; } // Create eventfd for CQE notifications (used with boost's unified loop) eventfdFd = eventfd(0, EFD_NONBLOCK); if (eventfdFd < 0) { goto cleanup_buffers; } // Register eventfd with io_uring ret = io_uring_register_eventfd(&ring, eventfdFd); if (ret < 0) { goto cleanup_eventfd; } isSetup = true; return true; cleanup_eventfd: close(eventfdFd); eventfdFd = -1; cleanup_buffers: io_uring_unregister_buffers(&ring); cleanup_ring: io_uring_queue_exit(&ring); cleanup: return false; } void IoUringAssemblyEngine::finalize() { // Call stop() to cancel in-flight operations (stop() already cancels the timer) stop(); if (eventfdFd >= 0) { io_uring_unregister_eventfd(&ring); close(eventfdFd); eventfdFd = -1; } if (isSetup) { io_uring_unregister_buffers(&ring); io_uring_queue_exit(&ring); isSetup = false; } // Reset state to allow setup() to be called again frameAssemblyDesc = nullptr; } void IoUringAssemblyEngine::resetAndAssembleFrame( std::function onCqeReady) { if (!onCqeReady) { throw std::runtime_error(std::string(__func__) + ": onCqeReady callback is invalid"); } if (!frameAssemblyDesc || !pcloudDataFdDesc || eventfdFd < 0) { throw std::runtime_error(std::string(__func__) + ": invalid state: " + ( !frameAssemblyDesc ? "frameAssemblyDesc is null; " : "" ) + ( !pcloudDataFdDesc ? "pcloudDataFdDesc is null; " : "" ) + ( eventfdFd < 0 ? "eventfdFd is invalid." : "" )); } // eventfdDesc should not be valid when resetAndAssembleFrame is called if (eventfdDesc) { throw std::runtime_error(std::string(__func__) + ": eventfdDesc is already set"); } // Store the callback for re-arming onCqeReadyCallback = std::move(onCqeReady); /** EXPLANATION: * Flush eventfd state: poll and read any pending events before creating * descriptor. * Use poll() to check if data is available (non-blocking check). * If data is available, read it to flush. */ struct pollfd pfd; pfd.fd = eventfdFd; pfd.events = POLLIN; pfd.revents = 0; int poll_ret = poll(&pfd, 1, 0); // Timeout 0 = non-blocking if (poll_ret > 0 && (pfd.revents & POLLIN)) { uint64_t discard; ssize_t ret = read(eventfdFd, &discard, sizeof(discard)); (void)ret; // Ignore errors - just trying to flush } eventfdDesc = std::make_unique( parent.device->componentThread->getIoService(), eventfdFd); if (!eventfdDesc) { throw std::runtime_error(std::string(__func__) + ": failed to create eventfd stream descriptor"); } // Get UDP socket file descriptor int udpFd = pcloudDataFdDesc->native_handle(); if (udpFd < 0) { throw std::runtime_error(std::string(__func__) + ": invalid UDP socket file descriptor"); } // Prepare SQEs for each slot in the frame struct io_uring_sqe *sqe; for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { sqe = io_uring_get_sqe(&ring); if (!sqe) { throw std::runtime_error(std::string(__func__) + ": failed to get SQE for slot " + std::to_string(i)); } const auto& slot = frameAssemblyDesc->slots[i]; // Prepare recvmsg SQE for this slot struct msghdr msg = {}; struct iovec iov; iov.iov_base = slot.vaddr; iov.iov_len = slot.nBytes; msg.msg_iov = &iov; msg.msg_iovlen = 1; io_uring_prep_recvmsg(sqe, udpFd, &msg, 0); // Set user_data to slot index for tracking io_uring_sqe_set_data(sqe, reinterpret_cast(i)); } // Submit all SQEs int ret = io_uring_submit(&ring); if (ret < 0) { throw std::runtime_error(std::string(__func__) + ": io_uring_submit failed: " + std::strerror(errno) + " (errno=" + std::to_string(errno) + ")"); } // Start listening for CQE notifications on eventfd eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), std::bind( &IoUringAssemblyEngine::onEventfdRead, this, std::placeholders::_1, std::placeholders::_2)); } void IoUringAssemblyEngine::stop() { // Cancel in-flight stall timeout timer stallTimer.cancel(); if (isSetup) { struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); if (!sqe) { std::cerr << __func__ << ": failed to get SQE for cancel op. " << "Continuing cleanup without cancelling.\n"; goto cleanup_eventfd; } /* Cancel all in-flight operations on our ring * using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel * op as numSlots since numSlots is an invalid slot index for a * real slot. */ io_uring_prep_cancel( sqe, reinterpret_cast(frameAssemblyDesc->numSlots), IORING_ASYNC_CANCEL_ANY); io_uring_submit(&ring); /* Wait for cancellation to complete. According to the man page, * cancellation is synchronous and a CQE is guaranteed to be * generated by the time submission returns. */ struct io_uring_cqe *cqe; bool sawCancelCqe = false; while (io_uring_peek_cqe(&ring, &cqe) == 0) { // Call seen() on all CQEs for completeness/correctness. io_uring_cqe_seen(&ring, cqe); void *user_data = io_uring_cqe_get_data(cqe); if (user_data == reinterpret_cast( frameAssemblyDesc->numSlots)) { sawCancelCqe = true; } } if (!sawCancelCqe) { std::cerr << __func__ << ": no CQE seen for cancel operation\n"; } } cleanup_eventfd: if (eventfdDesc) { /** EXPLANATION: * The goal here is to ensure that our io_service's event loop will not * get any events from the eventfd after we've called stop(). So we * completely deinitialize the eventfd descriptor. * * But we still want to reuse the underlying eventfd file descriptor, * itself in the next resetAndAssembleFrame() cycle, so we call * release() instead of reset() to ensure that the underlying fd * is not closed. */ eventfdDesc->cancel(); eventfdDesc.release(); } onCqeReadyCallback = nullptr; } void IoUringAssemblyEngine::onEventfdRead( const boost::system::error_code& error, std::size_t bytes_transferred) { (void)bytes_transferred; // Ignore cancellation errors if (error == boost::asio::error::operation_aborted) { return; } /** FIXME: * It may be necessary to specifically check for and handle the cancel op * CQE here. I'm not sure as yet though, but I'll highlight it here for now. */ // Process all available CQEs and call callback for each one struct io_uring_cqe *cqe; while (io_uring_peek_cqe(&ring, &cqe) == 0) { // Get user_data from the CQE void* user_data = io_uring_cqe_get_data(cqe); // Mark the CQE as seen io_uring_cqe_seen(&ring, cqe); /** EXPLANATION: * Call the user-provided callback for this CQE with its user_data. * * 1. Notice that we call the caller's cb *after* marking the CQE as * seen. We may later need to change this if the caller needs * information from the CQE before it is marked as seen. * * 2. Notice that we do not check for or filter out the cancel op CQE * here. The caller's handler will be able to see the cancel op CQE * because of this. */ if (onCqeReadyCallback) { onCqeReadyCallback(user_data); } } // Re-arm the eventfd read for next CQE notification if (eventfdDesc && eventfdFd >= 0) { eventfdDesc->async_read_some( boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), std::bind(&IoUringAssemblyEngine::onEventfdRead, this, std::placeholders::_1, std::placeholders::_2)); } } void IoUringAssemblyEngine::cancelIncompleteAndFillDummies() { if (!frameAssemblyDesc) { return; } for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { // In the real path, decide from CQE accounting whether slot i completed. // Here, demonstrate dummy header insertion API. auto* hdr = reinterpret_cast(frameAssemblyDesc->slots[i].vaddr); hdr->err_code = DummyLivoxEthHeader::INVALID_ERR_CODE; hdr->timestamp_type = DummyLivoxEthHeader::INVALID_TIMESTAMP_TYPE; hdr->data_type = DummyLivoxEthHeader::INVALID_DATA_TYPE; } } size_t IoUringAssemblyEngine::computePointsPerDgram(int returnMode) { /* * Map modes to points per datagram based on Livox docs * 1: first, 2: strongest -> 96 samples => 96 points * 3: dual -> 48 samples * 2 points = 96 * 4: triple -> 30 samples * 3 points = 90 */ switch (returnMode) { case static_cast(livoxProto1::Device::ReturnMode::SingleFirst): case static_cast(livoxProto1::Device::ReturnMode::SingleStrongest): case static_cast(livoxProto1::Device::ReturnMode::Dual): return 96u; case static_cast(livoxProto1::Device::ReturnMode::Triple): return 90u; default: throw std::runtime_error( std::string(__func__) + ": Unknown returnMode " + std::to_string(returnMode)); } } } // namespace stim_buff } // namespace smo