Revert "LivoxGen1: Use syncCancelerForAsyncWork in producer pipeline"

This reverts commit d788810a05.

We're doing this because it's not necessary. We will be porting to
coros soon and we can just use brace-scopes.
This commit is contained in:
2026-05-30 07:18:29 -04:00
parent d788810a05
commit 322a8137b2
7 changed files with 443 additions and 508 deletions
@@ -22,6 +22,7 @@
#include <spinscale/cps/asynchronousBridge.h>
#include <spinscale/cps/callback.h>
#include <spinscale/cps/callableTracer.h>
#include <spinscale/spinLock.h>
#include "ioUringAssemblyEngine.h"
#include "pcloudStimulusProducer.h"
#include "livoxGen1.h"
@@ -58,6 +59,7 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(
frameAssemblyDesc(nullptr), ring{},
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
stallTimer(parent_.device->componentThread->getIoService()),
shouldAcceptRequests(false),
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_),
assembledSlotsTracker(nDgramsPerStagingBufferFrame_),
randomDevice(), randomGenerator(randomDevice())
@@ -66,10 +68,13 @@ randomDevice(), randomGenerator(randomDevice())
bool IoUringAssemblyEngine::setup()
{
// Defensive check to prevent double-calling
if (!ioUringAssemblyEngnCanceler.isCancellationRequested())
{
throw std::runtime_error(std::string(__func__) + ": setup() called "
"while already set up");
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
if (shouldAcceptRequests)
{
throw std::runtime_error(std::string(__func__) + ": setup() called "
"while already set up");
}
}
// Get FrameAssemblyDesc from staging buffer
@@ -151,7 +156,7 @@ bool IoUringAssemblyEngine::setup()
if (ret < 0)
{ goto cleanup_eventfd; }
ioUringAssemblyEngnCanceler.startAcceptingWork();
shouldAcceptRequests = true;
return true;
cleanup_eventfd:
@@ -224,7 +229,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
+ ": onCqeReady callback is invalid");
}
if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked())
if (!shouldAcceptRequests)
{
throw std::runtime_error(std::string(__func__)
+ ": engine is not accepting requests");
@@ -316,7 +321,11 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
bool IoUringAssemblyEngine::stop()
{
return ioUringAssemblyEngnCanceler.requestStop();
// Acquire and release lock tightly around setting the flag
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
bool wasAcceptingRequests = shouldAcceptRequests;
shouldAcceptRequests = false;
return wasAcceptingRequests;
}
void IoUringAssemblyEngine::assemblyCycleComplete()
@@ -435,44 +444,39 @@ public:
void assembleFrameReq1_posted(
std::shared_ptr<AssembleFrameReq> context)
{
auto& canceler = engine.ioUringAssemblyEngnCanceler;
const bool started = canceler.execUncancelableSegmentOrAbort(
[context, this]()
{
// Initialize loop with number of slots
context->loop = sscl::AsynchronousLoop(
engine.frameAssemblyDesc->numSlots);
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
// Record assembly start time
engine.assemblyStartTime =
std::chrono::high_resolution_clock::now();
/** FIXME:
* I'm suspicious of this std::bind return object here. What if us
* setting it to null inside of stop() doesn't actually cause the
* object to be destroyed? This would cause this contin's sh_ptr's
* reference count to never reach 0, causing a memory leak.
*/
engine.resetAndAssembleFrame(
std::bind(&AssembleFrameReq::assembleFrameReq2_2,
context.get(), context,
std::placeholders::_1, std::placeholders::_2));
// Set up timeout timer for IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS ms
engine.stallTimer.expires_from_now(
boost::posix_time::milliseconds(
IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS));
engine.stallTimer.async_wait(
std::bind(&AssembleFrameReq::assembleFrameReq2_1,
context.get(), context,
std::placeholders::_1));
});
if (!started)
if (!engine.shouldAcceptRequests)
{
context->callOriginalCallback(false, sscl::AsynchronousLoop(0));
return;
}
// Initialize loop with number of slots
context->loop = sscl::AsynchronousLoop(engine.frameAssemblyDesc->numSlots);
// Record assembly start time
engine.assemblyStartTime = std::chrono::high_resolution_clock::now();
/** FIXME:
* I'm suspicious of this std::bind return object here. What if us
* setting it to null inside of stop() doesn't actually cause the
* object to be destroyed? This would cause this contin's sh_ptr's
* reference count to never reach 0, causing a memory leak.
*/
engine.resetAndAssembleFrame(
std::bind(&AssembleFrameReq::assembleFrameReq2_2,
context.get(), context,
std::placeholders::_1, std::placeholders::_2));
// Set up timeout timer for IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS ms
engine.stallTimer.expires_from_now(
boost::posix_time::milliseconds(
IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS));
engine.stallTimer.async_wait(
std::bind(&AssembleFrameReq::assembleFrameReq2_1,
context.get(), context,
std::placeholders::_1));
}
void assembleFrameReq2_1(
@@ -494,22 +498,19 @@ public:
* indeed seen a SEGFAULT even in the current code with locking, so
* I'm going to hold the lock here for now.
*/
auto& canceler = context->engine.ioUringAssemblyEngnCanceler;
const bool shouldContinue = canceler.execUncancelableSegmentOrAbort(
[context]()
{
// Set timer fired flag
context->timerFired.store(true);
context->assembleFrameReq3(context);
});
sscl::SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock);
if (!shouldContinue)
if (!context->engine.shouldAcceptRequests)
{
context->engine.assemblyCycleComplete();
context->loop.setRemainingIterationsToFailure();
context->callOriginalCallback(false, context->loop);
return;
}
// Set timer fired flag
context->timerFired.store(true);
context->assembleFrameReq3(context);
}
void assembleFrameReq2_2(
@@ -517,8 +518,7 @@ public:
void *user_data, int cqe_result)
{
// NB: The lock was acquired by onEventFdRead before calling this func
if (context->engine.ioUringAssemblyEngnCanceler
.isCancellationRequestedUnlocked())
if (!context->engine.shouldAcceptRequests)
{
context->engine.assemblyCycleComplete();
context->loop.setRemainingIterationsToFailure();
@@ -549,8 +549,8 @@ public:
{
/** EXPLANATION:
* All branch paths that invoke this unifyig oracle function are
* expected to already hold ioUringAssemblyEngnCanceler's lock before
* calling it.
* expected to already hold the shouldAcceptRequestsLock before calling
* it.
*/
// Ensure we only execute once using atomic exchange
if (context->handlerExecuted.exchange(true)) { return; }
@@ -638,7 +638,8 @@ void IoUringAssemblyEngine::assembleFrameReq(
sscl::cps::Callback<assembleFrameReqCbFn> cb)
{
{
if (ioUringAssemblyEngnCanceler.isCancellationRequested())
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
if (!shouldAcceptRequests)
{
cb.callbackFn(false, sscl::AsynchronousLoop(0));
return;
@@ -669,7 +670,7 @@ void IoUringAssemblyEngine::onEventfdRead(
* IoUringAssemblyEngine's per-assembly state isn't destroyed while this
* handler is running.
*/
sscl::SpinLock::Guard lock(ioUringAssemblyEngnCanceler.s.lock);
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
/** EXPLANATION:
* You'd think we should put check for shouldAcceptRequests here and
* `return` here if !shouldAcceptRequests, but we shouldn't because
@@ -721,7 +722,7 @@ void IoUringAssemblyEngine::onEventfdRead(
* But we do put a `return` here because we know that at this point, the
* caller's callback has already been invoked.
*/
if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()
if (!shouldAcceptRequests
|| eventfdDesc == nullptr || !eventfdDesc->is_open())
{
return;