From a31a21be651ab6900af6c4892655531ea56bce6e Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Tue, 9 Jun 2026 20:40:39 -0400 Subject: [PATCH] IoUringAssmEngn: fix Io_uring CQE cancel path --- .../livoxGen1/ioUringAssemblyEngine.cpp | 111 ++++++++++++++---- 1 file changed, 86 insertions(+), 25 deletions(-) diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index 1fa27c4..a28f4dc 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -33,6 +33,63 @@ namespace stim_buff { #define SMO_PRINT_PCLOUD_ASSEMBLY_RESULTS 1 #endif +namespace { + +constexpr long IOURINGASSM_ENGN_CANCEL_CQE_WAIT_MS = 3; + +void *assemblyCancelRequestUserData(size_t numSlots) +{ + return reinterpret_cast(numSlots); +} + +bool drainCancelCompletionCqe( + struct io_uring *ring, + void *cancelRequestUserData, + long waitBudgetMs) +{ + bool sawCancelCqe = false; + + auto drainPeekableCqes = [&]() + { + struct io_uring_cqe *cqe; + while (io_uring_peek_cqe(ring, &cqe) == 0) + { + void *userData = io_uring_cqe_get_data(cqe); + if (userData == cancelRequestUserData) { + sawCancelCqe = true; + } + io_uring_cqe_seen(ring, cqe); + } + }; + + drainPeekableCqes(); + if (sawCancelCqe) { + return true; + } + + struct __kernel_timespec waitTimeout = { + .tv_sec = static_cast<__kernel_time64_t>(waitBudgetMs / 1000), + .tv_nsec = static_cast((waitBudgetMs % 1000) * 1000000L), + }; + + struct io_uring_cqe *cqe; + const int waitRet = io_uring_wait_cqe_timeout( + ring, &cqe, &waitTimeout); + if (waitRet == 0) + { + void *userData = io_uring_cqe_get_data(cqe); + if (userData == cancelRequestUserData) { + sawCancelCqe = true; + } + io_uring_cqe_seen(ring, cqe); + drainPeekableCqes(); + } + + return sawCancelCqe; +} + +} // namespace + inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; } struct DummyLivoxEthHeader @@ -340,37 +397,41 @@ void IoUringAssemblyEngine::assemblyCycleComplete() goto cleanup_eventfd; } - /* Cancel all in-flight operations on our ring - * using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel - * op as numSlots since numSlots is an invalid slot index for a - * real slot. + void *cancelRequestUserData = assemblyCancelRequestUserData( + frameAssemblyDesc->numSlots); + + /* Cancel all in-flight recvmsg ops on this ring. ANY|ALL cancels + * every outstanding request. Tag the cancel SQE itself with + * numSlots (an invalid slot index) so its completion CQE is + * distinguishable from slot recv CQEs. */ io_uring_prep_cancel( - sqe, reinterpret_cast(frameAssemblyDesc->numSlots), - IORING_ASYNC_CANCEL_ANY); + sqe, nullptr, + IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_ALL); + io_uring_sqe_set_data(sqe, cancelRequestUserData); - io_uring_submit(&ring); - - /* Wait for cancellation to complete. According to the man page, - * cancellation is synchronous and a CQE is guaranteed to be - * generated by the time submission returns. - */ - struct io_uring_cqe *cqe; - bool sawCancelCqe = false; - while (io_uring_peek_cqe(&ring, &cqe) == 0) + const int submitRet = io_uring_submit(&ring); + if (submitRet < 0) { - // Call seen() on all CQEs for completeness/correctness. - io_uring_cqe_seen(&ring, cqe); - void *user_data = io_uring_cqe_get_data(cqe); - if (user_data == reinterpret_cast( - frameAssemblyDesc->numSlots)) - { - sawCancelCqe = true; - } + std::cerr << __func__ << ": io_uring_submit for cancel failed: " + << std::strerror(-submitRet) + << " (ret=" << submitRet << ")\n"; + goto cleanup_eventfd; } - if (!sawCancelCqe && smoHooksPtr->OptionParser_getOptions().verbose) { - std::cerr << __func__ << ": no CQE seen for cancel operation\n"; + /* Drain peekable CQEs first, then wait briefly for the tagged + * cancel completion. The cancel CQE may arrive asynchronously + * via eventfd rather than being immediately peekable. + */ + const bool sawCancelCqe = drainCancelCompletionCqe( + &ring, cancelRequestUserData, + IOURINGASSM_ENGN_CANCEL_CQE_WAIT_MS); + + if (!sawCancelCqe && smoHooksPtr->OptionParser_getOptions().verbose) + { + std::cerr << __func__ << ": cancel completion CQE not seen " + << "within " << IOURINGASSM_ENGN_CANCEL_CQE_WAIT_MS + << "ms\n"; } }