IoUringAssmEngn: Implement shouldAcceptRequests daemon/async control

We've reworked the synchronous control functions that govern the
async daemon and in-flight requests for this class. The
shouldAcceptRequests flag represents the readiness state of the
whole engine class. The in-flight async operations consult the
shouldAcceptRequests flag to determine whether they should return
early.

Now the stop() method is solely for setting the locked flag
shouldAcceptRequests=false.

The pair resetAndAssembleFrame()/assemblyCycleComplete manage the
per-assembly cycle state machine, and they don't need to set or
interfere with the shouldAcceptRequests flag.
This commit is contained in:
2025-11-13 23:53:31 -04:00
parent 501effe6d5
commit 1df43665c3
2 changed files with 105 additions and 73 deletions
@@ -59,10 +59,9 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(
PcloudStimulusBuffer& parent_, size_t nDgramsPerStagingBufferFrame_)
: parent(parent_),
frameAssemblyDesc(nullptr), ring{},
isSetup(false),
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
stallTimer(parent_.device->componentThread->getIoService()),
isAssembling(false),
shouldAcceptRequests(false),
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_),
assembledSlotsTracker(nDgramsPerStagingBufferFrame_),
randomDevice(), randomGenerator(randomDevice())
@@ -70,8 +69,15 @@ randomDevice(), randomGenerator(randomDevice())
bool IoUringAssemblyEngine::setup()
{
if (isSetup)
{ return false; }
// Defensive check to prevent double-calling
{
SpinLock::Guard lock(shouldAcceptRequestsLock);
if (shouldAcceptRequests)
{
throw std::runtime_error(std::string(__func__) + ": setup() called "
"while already set up");
}
}
// Get FrameAssemblyDesc from staging buffer
frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
@@ -152,7 +158,7 @@ bool IoUringAssemblyEngine::setup()
if (ret < 0)
{ goto cleanup_eventfd; }
isSetup = true;
shouldAcceptRequests = true;
return true;
cleanup_eventfd:
@@ -170,8 +176,7 @@ cleanup:
void IoUringAssemblyEngine::finalize()
{
// Call stop() to cancel in-flight operations (stop() already cancels the timer)
stop();
bool wasAcceptingRequests = stop();
if (eventfdFd >= 0)
{
@@ -180,13 +185,12 @@ void IoUringAssemblyEngine::finalize()
eventfdFd = -1;
}
if (isSetup)
if (wasAcceptingRequests)
{
#ifdef REGISTER_IOURING_BUFFERS
io_uring_unregister_buffers(&ring);
#endif
io_uring_queue_exit(&ring);
isSetup = false;
}
// Reset state to allow setup() to be called again
@@ -202,13 +206,10 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
+ ": onCqeReady callback is invalid");
}
if (!frameAssemblyDesc || !pcloudDataFdDesc || eventfdFd < 0)
if (!shouldAcceptRequests)
{
throw std::runtime_error(std::string(__func__)
+ ": invalid state: "
+ ( !frameAssemblyDesc ? "frameAssemblyDesc is null; " : "" )
+ ( !pcloudDataFdDesc ? "pcloudDataFdDesc is null; " : "" )
+ ( eventfdFd < 0 ? "eventfdFd is invalid." : "" ));
+ ": engine is not accepting requests");
}
// eventfdDesc should not be valid when resetAndAssembleFrame is called
@@ -286,8 +287,6 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
+ " (errno=" + std::to_string(errno) + ")");
}
// Set assembly flag
isAssembling = true;
// Start listening for CQE notifications on eventfd
eventfdDesc->async_read_some(
boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
@@ -297,40 +296,22 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
std::placeholders::_2));
}
void IoUringAssemblyEngine::stop(bool doAcquireLock)
bool IoUringAssemblyEngine::stop()
{
// Clear assembly flag first to signal onEventfdRead to stop re-arming
// Acquire and release lock tightly around setting the flag
if (doAcquireLock)
{
SpinLock::Guard lock(isAssemblingLock);
isAssembling = false;
} else {
isAssembling = false;
SpinLock::Guard lock(shouldAcceptRequestsLock);
bool wasAcceptingRequests = shouldAcceptRequests;
shouldAcceptRequests = false;
return wasAcceptingRequests;
}
/** FIXME:
* There's a problem with this bridge here.
*
* We can't delay during every call to stop because under normal operating
* conditions, this whole assembly process should be able to move as fast
* as possible and to receive as much data as possible without maximum
* throughput.
*
* Yet we need to delay briefly here to ensure that the onEventfdRead loop
* has a chance to see the flag and halt.
*
* We need to analyze this carefully and figure out what the correct
* conditions are for being certain that we aren't destroying state while
* the eventfdRead loop is still running; and we need to figure out how to
* ensure that we only delay when absolutely necessary.
*/
void IoUringAssemblyEngine::assemblyCycleComplete()
{
// Cancel in-flight stall timeout timer
stallTimer.cancel();
onCqeReadyCallback = std::move([](void *, int){});
onCqeReadyCallback = std::move([](void *, int, bool&){});
if (isSetup)
if (frameAssemblyDesc)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (!sqe)
@@ -380,8 +361,9 @@ cleanup_eventfd:
{
/** EXPLANATION:
* The goal here is to ensure that our io_service's event loop will not
* get any events from the eventfd after we've called stop(). So we
* completely deinitialize the eventfd descriptor.
* get any events from the eventfd after we've called
* assemblyCycleComplete(). So we completely deinitialize the eventfd
* descriptor.
*
* But we still want to reuse the underlying eventfd file descriptor,
* itself in the next resetAndAssembleFrame() cycle, so we call
@@ -392,6 +374,18 @@ cleanup_eventfd:
* io_service before releasing it, otherwise Boost.Asio will complain
* when we try to create a new descriptor with the same fd.
*/
/** CAVEAT:
* There's a rare but real race condition here where the eventfd gets an
* event signaled on it, and while boost is internally processing that
* event to enqeue our handler, we call cancel() and release() here.
* If boost internally has locking on the stream_descriptor object,
* this should be fine. But just in case it doesn't, I'm just
* documenting that possibility here.
*
* There's nothing we can really do about it except know that it would
* be very rarely happen; and that we can't do anything about it short
* of modifying the boost.Asio code.
*/
eventfdDesc->cancel();
eventfdDesc->release();
/* Destroy the descriptor object (now that it's unregistered, destroying
@@ -427,10 +421,12 @@ public:
void assembleFrameReq1_posted(
std::shared_ptr<AssembleFrameReq> context)
{
if (!engine.frameAssemblyDesc)
SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
{
throw std::runtime_error(std::string(__func__)
+ ": frameAssemblyDesc is null");
context->callOriginalCallback(false, AsynchronousLoop(0));
return;
}
// Initialize loop with number of slots
@@ -445,7 +441,8 @@ public:
engine.resetAndAssembleFrame(
std::bind(&AssembleFrameReq::assembleFrameReq2_2,
context.get(), context,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
// Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms
engine.stallTimer.expires_from_now(
@@ -476,25 +473,30 @@ public:
* indeed seen a SEGFAULT even in the current code with locking, so
* I'm going to hold the lock here for now.
*/
SpinLock::Guard lock(context->engine.isAssemblingLock);
SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock);
if (!context->engine.isAssembling)
if (!context->engine.shouldAcceptRequests)
{
context->engine.assemblyCycleComplete();
context->loop.setRemainingIterationsToFailure();
context->callOriginalCallback(false, context->loop);
return;
}
// Set timer fired flag
context->timerFired.store(true);
context->assembleFrameReq3(context);
bool dummyStopListening = false; // Timer path doesn't need this
context->assembleFrameReq3(context, dummyStopListening);
}
void assembleFrameReq2_2(
std::shared_ptr<AssembleFrameReq> context,
void *user_data, int cqe_result)
void *user_data, int cqe_result, bool& stopListeningOnEventFd)
{
if (!context->engine.isAssembling)
// NB: The lock was acquired by onEventFdRead before calling this func
if (!context->engine.shouldAcceptRequests)
{
context->engine.assemblyCycleComplete();
context->loop.setRemainingIterationsToFailure();
context->callOriginalCallback(false, context->loop);
return;
@@ -513,16 +515,25 @@ public:
success))
{
// Loop is complete, call oracle function
context->assembleFrameReq3(context);
context->assembleFrameReq3(context, stopListeningOnEventFd);
}
}
void assembleFrameReq3(std::shared_ptr<AssembleFrameReq> context)
void assembleFrameReq3(
std::shared_ptr<AssembleFrameReq> context,
bool& stopListeningOnEventFd
)
{
/** EXPLANATION:
* All branch paths that invoke this unifyig oracle function are
* expected to already hold the shouldAcceptRequestsLock before calling
* it.
*/
// Ensure we only execute once using atomic exchange
if (context->handlerExecuted.exchange(true)) { return; }
// Cancel the timer, stop the engine and process frame, if any.
context->engine.stop(false);
context->engine.assemblyCycleComplete();
stopListeningOnEventFd = true;
/** EXPLANATION:
* Timeout doesn't necessarily mean error.
@@ -600,10 +611,13 @@ public:
void IoUringAssemblyEngine::assembleFrameReq(
Callback<assembleFrameReqCbFn> cb)
{
if (!frameAssemblyDesc)
{
throw std::runtime_error(std::string(__func__)
+ ": frameAssemblyDesc is null");
SpinLock::Guard lock(shouldAcceptRequestsLock);
if (!shouldAcceptRequests)
{
cb.callbackFn(false, AsynchronousLoop(0));
return;
}
}
const auto& caller = smoHooksPtr->ComponentThread_getSelf();
@@ -630,8 +644,20 @@ void IoUringAssemblyEngine::onEventfdRead(
* IoUringAssemblyEngine's per-assembly state isn't destroyed while this
* handler is running.
*/
SpinLock::Guard lock(isAssemblingLock);
if (!isAssembling) { return; }
SpinLock::Guard lock(shouldAcceptRequestsLock);
/** EXPLANATION:
* You'd think we should put check for shouldAcceptRequests here and
* `return` here if !shouldAcceptRequests, but we shouldn't because
* that would mean that we can't invoke the caller's callback. This would
* make the caller freeze forever.
*
* Instead we just let the onCqeReadyCallback check for
* shouldAcceptRequests. That way the onCqeReadyCallback can actually
* invoke the caller's callback, as it should. We have no knowledge of the
* caller's callback because we don't have access to the caller's
* continuation object. The onCqeReadyCallback does have access to it,
* so we leave that up to it.
*/
/** FIXME:
* It may be necessary to specifically check for and handle the cancel op
@@ -639,6 +665,7 @@ void IoUringAssemblyEngine::onEventfdRead(
*/
// Process all available CQEs and call callback for each one
bool stopListeningOnEventFd = false;
struct io_uring_cqe *cqe;
while (io_uring_peek_cqe(&ring, &cqe) == 0)
{
@@ -662,11 +689,15 @@ void IoUringAssemblyEngine::onEventfdRead(
* because of this.
*/
if (onCqeReadyCallback) {
onCqeReadyCallback(user_data, cqe_result);
onCqeReadyCallback(user_data, cqe_result, stopListeningOnEventFd);
}
}
if (!isAssembling) { return; }
/** EXPLANATION:
* But we do put a `return` here because we know that at this point, the
* caller's callback has already been invoked.
*/
if (!shouldAcceptRequests || stopListeningOnEventFd) { return; }
// Re-arm the eventfd read for next CQE notification
eventfdDesc->async_read_some(
@@ -47,9 +47,10 @@ public:
{ return nSucceeded != 0 && nTotal != 0 && nSucceeded != nTotal; }
private:
typedef std::function<void(void*, int)> resetAndAssembleFrameCbFn;
typedef std::function<void(void*, int, bool&)> resetAndAssembleFrameCbFn;
void resetAndAssembleFrame(resetAndAssembleFrameCbFn onCqeReady);
void stop(bool doAcquireLock = true);
void assemblyCycleComplete();
bool stop();
private:
PcloudStimulusBuffer& parent;
@@ -59,7 +60,6 @@ private:
// io_uring infrastructure
struct io_uring ring;
bool isSetup;
// Eventfd for CQE notifications (used with boost's unified loop)
int eventfdFd;
@@ -72,12 +72,13 @@ private:
boost::asio::deadline_timer stallTimer;
// Callback for CQE ntfns (called with user_data+result from each CQE)
resetAndAssembleFrameCbFn onCqeReadyCallback;
// Flag to indicate assembly is in progress (cleared by stop())
// Protected by isAssemblingLock
SpinLock isAssemblingLock;
bool isAssembling;
/** EXPLANATION:
* Flag to indicate whether engine should accept new requests.
* Set by setup(), cleared by stop().
*/
SpinLock shouldAcceptRequestsLock;
bool shouldAcceptRequests;
// Number of datagrams per staging buffer frame
size_t nDgramsPerStagingBufferFrame;
struct SlotAssemblyDesc