IoUringAssmEngn: fix Io_uring CQE cancel path
This commit is contained in:
@@ -33,6 +33,63 @@ namespace stim_buff {
|
|||||||
#define SMO_PRINT_PCLOUD_ASSEMBLY_RESULTS 1
|
#define SMO_PRINT_PCLOUD_ASSEMBLY_RESULTS 1
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
constexpr long IOURINGASSM_ENGN_CANCEL_CQE_WAIT_MS = 3;
|
||||||
|
|
||||||
|
void *assemblyCancelRequestUserData(size_t numSlots)
|
||||||
|
{
|
||||||
|
return reinterpret_cast<void*>(numSlots);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool drainCancelCompletionCqe(
|
||||||
|
struct io_uring *ring,
|
||||||
|
void *cancelRequestUserData,
|
||||||
|
long waitBudgetMs)
|
||||||
|
{
|
||||||
|
bool sawCancelCqe = false;
|
||||||
|
|
||||||
|
auto drainPeekableCqes = [&]()
|
||||||
|
{
|
||||||
|
struct io_uring_cqe *cqe;
|
||||||
|
while (io_uring_peek_cqe(ring, &cqe) == 0)
|
||||||
|
{
|
||||||
|
void *userData = io_uring_cqe_get_data(cqe);
|
||||||
|
if (userData == cancelRequestUserData) {
|
||||||
|
sawCancelCqe = true;
|
||||||
|
}
|
||||||
|
io_uring_cqe_seen(ring, cqe);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
drainPeekableCqes();
|
||||||
|
if (sawCancelCqe) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct __kernel_timespec waitTimeout = {
|
||||||
|
.tv_sec = static_cast<__kernel_time64_t>(waitBudgetMs / 1000),
|
||||||
|
.tv_nsec = static_cast<long long>((waitBudgetMs % 1000) * 1000000L),
|
||||||
|
};
|
||||||
|
|
||||||
|
struct io_uring_cqe *cqe;
|
||||||
|
const int waitRet = io_uring_wait_cqe_timeout(
|
||||||
|
ring, &cqe, &waitTimeout);
|
||||||
|
if (waitRet == 0)
|
||||||
|
{
|
||||||
|
void *userData = io_uring_cqe_get_data(cqe);
|
||||||
|
if (userData == cancelRequestUserData) {
|
||||||
|
sawCancelCqe = true;
|
||||||
|
}
|
||||||
|
io_uring_cqe_seen(ring, cqe);
|
||||||
|
drainPeekableCqes();
|
||||||
|
}
|
||||||
|
|
||||||
|
return sawCancelCqe;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; }
|
inline LivoxProto1DllState& getLivoxProto1State() { return livoxProto1; }
|
||||||
|
|
||||||
struct DummyLivoxEthHeader
|
struct DummyLivoxEthHeader
|
||||||
@@ -340,37 +397,41 @@ void IoUringAssemblyEngine::assemblyCycleComplete()
|
|||||||
goto cleanup_eventfd;
|
goto cleanup_eventfd;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Cancel all in-flight operations on our ring
|
void *cancelRequestUserData = assemblyCancelRequestUserData(
|
||||||
* using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel
|
frameAssemblyDesc->numSlots);
|
||||||
* op as numSlots since numSlots is an invalid slot index for a
|
|
||||||
* real slot.
|
/* Cancel all in-flight recvmsg ops on this ring. ANY|ALL cancels
|
||||||
|
* every outstanding request. Tag the cancel SQE itself with
|
||||||
|
* numSlots (an invalid slot index) so its completion CQE is
|
||||||
|
* distinguishable from slot recv CQEs.
|
||||||
*/
|
*/
|
||||||
io_uring_prep_cancel(
|
io_uring_prep_cancel(
|
||||||
sqe, reinterpret_cast<void*>(frameAssemblyDesc->numSlots),
|
sqe, nullptr,
|
||||||
IORING_ASYNC_CANCEL_ANY);
|
IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_ALL);
|
||||||
|
io_uring_sqe_set_data(sqe, cancelRequestUserData);
|
||||||
|
|
||||||
io_uring_submit(&ring);
|
const int submitRet = io_uring_submit(&ring);
|
||||||
|
if (submitRet < 0)
|
||||||
/* Wait for cancellation to complete. According to the man page,
|
|
||||||
* cancellation is synchronous and a CQE is guaranteed to be
|
|
||||||
* generated by the time submission returns.
|
|
||||||
*/
|
|
||||||
struct io_uring_cqe *cqe;
|
|
||||||
bool sawCancelCqe = false;
|
|
||||||
while (io_uring_peek_cqe(&ring, &cqe) == 0)
|
|
||||||
{
|
{
|
||||||
// Call seen() on all CQEs for completeness/correctness.
|
std::cerr << __func__ << ": io_uring_submit for cancel failed: "
|
||||||
io_uring_cqe_seen(&ring, cqe);
|
<< std::strerror(-submitRet)
|
||||||
void *user_data = io_uring_cqe_get_data(cqe);
|
<< " (ret=" << submitRet << ")\n";
|
||||||
if (user_data == reinterpret_cast<void*>(
|
goto cleanup_eventfd;
|
||||||
frameAssemblyDesc->numSlots))
|
|
||||||
{
|
|
||||||
sawCancelCqe = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sawCancelCqe && smoHooksPtr->OptionParser_getOptions().verbose) {
|
/* Drain peekable CQEs first, then wait briefly for the tagged
|
||||||
std::cerr << __func__ << ": no CQE seen for cancel operation\n";
|
* cancel completion. The cancel CQE may arrive asynchronously
|
||||||
|
* via eventfd rather than being immediately peekable.
|
||||||
|
*/
|
||||||
|
const bool sawCancelCqe = drainCancelCompletionCqe(
|
||||||
|
&ring, cancelRequestUserData,
|
||||||
|
IOURINGASSM_ENGN_CANCEL_CQE_WAIT_MS);
|
||||||
|
|
||||||
|
if (!sawCancelCqe && smoHooksPtr->OptionParser_getOptions().verbose)
|
||||||
|
{
|
||||||
|
std::cerr << __func__ << ": cancel completion CQE not seen "
|
||||||
|
<< "within " << IOURINGASSM_ENGN_CANCEL_CQE_WAIT_MS
|
||||||
|
<< "ms\n";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user