diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index 8882094..0dacf6e 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -1,9 +1,13 @@ #include #include +#include +#include #include #include #include +#include #include +#include #include #include #include @@ -36,7 +40,8 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_) : parent(parent_), frameAssemblyDesc(nullptr), ring{}, isSetup(false), -eventfdFd(-1), stallTimer(parent_.device->componentThread->getIoService()) +eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0), +stallTimer(parent_.device->componentThread->getIoService()) {} bool IoUringAssemblyEngine::setup() @@ -70,10 +75,13 @@ bool IoUringAssemblyEngine::setup() /** EXPLANATION: * Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly - * One SQE per slot (one datagram per slot) + * One SQE per slot (one datagram per slot), plus one extra for cancel + * operations, since io_uring_prep_cancel() requires a valid SQE. So we + * alloc 1 extra SQE to guarantee that we will always have an available SQE + * for cancel operations. */ ret = io_uring_queue_init( - static_cast(frameAssemblyDesc->numSlots), &ring, 0); + static_cast(frameAssemblyDesc->numSlots + 1), &ring, 0); if (ret < 0) { goto cleanup; } @@ -130,28 +138,232 @@ void IoUringAssemblyEngine::finalize() frameAssemblyDesc = nullptr; } -void IoUringAssemblyEngine::resetAndAssembleFrame() +void IoUringAssemblyEngine::resetAndAssembleFrame( + std::function onCqeReady) { - // Design/stub: This method should: - // 1. Submit frameAssemblyDesc->numSlots RECVMSG SQEs using io_uring_prep_recvmsg() - // - Each SQE receives into frameAssemblyDesc->slots[i].vaddr - // - With size frameAssemblyDesc->slots[i].nBytes - // - Socket FD from pcloudDataFdDesc->native_handle() - // 2. Submit batch via io_uring_submit(&ring) - // 3. Set up stall timer using stallTimer with appropriate timeout - // - SQEs are independent and can arrive out of order - // - Timer detects if SQEs get stalled + if (!onCqeReady) + { + throw std::runtime_error(std::string(__func__) + + ": onCqeReady callback is invalid"); + } + + if (!frameAssemblyDesc || !pcloudDataFdDesc || eventfdFd < 0) + { + throw std::runtime_error(std::string(__func__) + + ": invalid state: " + + ( !frameAssemblyDesc ? "frameAssemblyDesc is null; " : "" ) + + ( !pcloudDataFdDesc ? "pcloudDataFdDesc is null; " : "" ) + + ( eventfdFd < 0 ? "eventfdFd is invalid." : "" )); + } + + // eventfdDesc should not be valid when resetAndAssembleFrame is called + if (eventfdDesc) + { + throw std::runtime_error(std::string(__func__) + + ": eventfdDesc is already set"); + } + + // Store the callback for re-arming + onCqeReadyCallback = std::move(onCqeReady); + + /** EXPLANATION: + * Flush eventfd state: poll and read any pending events before creating + * descriptor. + * Use poll() to check if data is available (non-blocking check). + * If data is available, read it to flush. + */ + struct pollfd pfd; + pfd.fd = eventfdFd; + pfd.events = POLLIN; + pfd.revents = 0; + int poll_ret = poll(&pfd, 1, 0); // Timeout 0 = non-blocking + if (poll_ret > 0 && (pfd.revents & POLLIN)) + { + uint64_t discard; + ssize_t ret = read(eventfdFd, &discard, sizeof(discard)); + (void)ret; // Ignore errors - just trying to flush + } + + eventfdDesc = std::make_unique( + parent.device->componentThread->getIoService(), eventfdFd); + + if (!eventfdDesc) + { + throw std::runtime_error(std::string(__func__) + + ": failed to create eventfd stream descriptor"); + } + + // Get UDP socket file descriptor + int udpFd = pcloudDataFdDesc->native_handle(); + if (udpFd < 0) + { + throw std::runtime_error(std::string(__func__) + + ": invalid UDP socket file descriptor"); + } + + // Prepare SQEs for each slot in the frame + struct io_uring_sqe *sqe; + for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) + { + sqe = io_uring_get_sqe(&ring); + if (!sqe) + { + throw std::runtime_error(std::string(__func__) + + ": failed to get SQE for slot " + std::to_string(i)); + } + + const auto& slot = frameAssemblyDesc->slots[i]; + + // Prepare recvmsg SQE for this slot + struct msghdr msg = {}; + struct iovec iov; + iov.iov_base = slot.vaddr; + iov.iov_len = slot.nBytes; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + io_uring_prep_recvmsg(sqe, udpFd, &msg, 0); + // Set user_data to slot index for tracking + io_uring_sqe_set_data(sqe, reinterpret_cast(i)); + } + + // Submit all SQEs + int ret = io_uring_submit(&ring); + if (ret < 0) + { + throw std::runtime_error(std::string(__func__) + + ": io_uring_submit failed: " + std::strerror(errno) + + " (errno=" + std::to_string(errno) + ")"); + } + + // Start listening for CQE notifications on eventfd + eventfdDesc->async_read_some( + boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), + std::bind( + &IoUringAssemblyEngine::onEventfdRead, this, + std::placeholders::_1, + std::placeholders::_2)); } void IoUringAssemblyEngine::stop() { - // Design/stub: This method should: - // 1. Cancel all pending SQEs using io_uring cancellation mechanisms - // 2. Cancel in-flight stall timeout timer via stallTimer.cancel() - // 3. Set appropriate state flags - // Cancel in-flight stall timeout timer stallTimer.cancel(); + + if (isSetup) + { + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (!sqe) + { + std::cerr << __func__ << ": failed to get SQE for cancel op. " + << "Continuing cleanup without cancelling.\n"; + + goto cleanup_eventfd; + } + + /* Cancel all in-flight operations on our ring + * using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel + * op as numSlots since numSlots is an invalid slot index for a + * real slot. + */ + io_uring_prep_cancel( + sqe, reinterpret_cast(frameAssemblyDesc->numSlots), + IORING_ASYNC_CANCEL_ANY); + + io_uring_submit(&ring); + + /* Wait for cancellation to complete. According to the man page, + * cancellation is synchronous and a CQE is guaranteed to be + * generated by the time submission returns. + */ + struct io_uring_cqe *cqe; + bool sawCancelCqe = false; + while (io_uring_peek_cqe(&ring, &cqe) == 0) + { + // Call seen() on all CQEs for completeness/correctness. + io_uring_cqe_seen(&ring, cqe); + void *user_data = io_uring_cqe_get_data(cqe); + if (user_data == reinterpret_cast( + frameAssemblyDesc->numSlots)) + { + sawCancelCqe = true; + } + } + + if (!sawCancelCqe) { + std::cerr << __func__ << ": no CQE seen for cancel operation\n"; + } + } + +cleanup_eventfd: + if (eventfdDesc) + { + /** EXPLANATION: + * The goal here is to ensure that our io_service's event loop will not + * get any events from the eventfd after we've called stop(). So we + * completely deinitialize the eventfd descriptor. + * + * But we still want to reuse the underlying eventfd file descriptor, + * itself in the next resetAndAssembleFrame() cycle, so we call + * release() instead of reset() to ensure that the underlying fd + * is not closed. + */ + eventfdDesc->cancel(); + eventfdDesc.release(); + } + + onCqeReadyCallback = nullptr; +} + +void IoUringAssemblyEngine::onEventfdRead( + const boost::system::error_code& error, + std::size_t bytes_transferred) +{ + (void)bytes_transferred; + + // Ignore cancellation errors + if (error == boost::asio::error::operation_aborted) + { return; } + + /** FIXME: + * It may be necessary to specifically check for and handle the cancel op + * CQE here. I'm not sure as yet though, but I'll highlight it here for now. + */ + + // Process all available CQEs and call callback for each one + struct io_uring_cqe *cqe; + while (io_uring_peek_cqe(&ring, &cqe) == 0) + { + // Get user_data from the CQE + void* user_data = io_uring_cqe_get_data(cqe); + // Mark the CQE as seen + io_uring_cqe_seen(&ring, cqe); + + /** EXPLANATION: + * Call the user-provided callback for this CQE with its user_data. + * + * 1. Notice that we call the caller's cb *after* marking the CQE as + * seen. We may later need to change this if the caller needs + * information from the CQE before it is marked as seen. + * + * 2. Notice that we do not check for or filter out the cancel op CQE + * here. The caller's handler will be able to see the cancel op CQE + * because of this. + */ + if (onCqeReadyCallback) { + onCqeReadyCallback(user_data); + } + } + + // Re-arm the eventfd read for next CQE notification + if (eventfdDesc && eventfdFd >= 0) + { + eventfdDesc->async_read_some( + boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)), + std::bind(&IoUringAssemblyEngine::onEventfdRead, this, + std::placeholders::_1, + std::placeholders::_2)); + } } void IoUringAssemblyEngine::cancelIncompleteAndFillDummies() diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 8dc9f92..7c7e13e 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include "frameAssemblyDesc.h" @@ -28,7 +29,8 @@ public: bool setup(); void finalize(); - void resetAndAssembleFrame(); + void resetAndAssembleFrame( + std::function onCqeReady); void stop(); // Telemetry helpers @@ -48,13 +50,19 @@ private: // Eventfd for CQE notifications (used with boost's unified loop) int eventfdFd; + std::unique_ptr eventfdDesc; + uint64_t eventfd_value; // Buffer for async_read_some // Point cloud data socket descriptor std::shared_ptr pcloudDataFdDesc; // Stall detection timer boost::asio::deadline_timer stallTimer; + // Callback for CQE notifications (called with user_data from each CQE) + std::function onCqeReadyCallback; void cancelIncompleteAndFillDummies(); + void onEventfdRead( + const boost::system::error_code& error, std::size_t bytes_transferred); }; } // namespace stim_buff