Async: Drop-in SyncCancelerForAsyncWork without execUncancelableSegment*
We're doing this to prep for the coro port
This commit is contained in:
@@ -59,7 +59,6 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(
|
||||
frameAssemblyDesc(nullptr), ring{},
|
||||
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
|
||||
stallTimer(parent_.device->componentThread->getIoService()),
|
||||
shouldAcceptRequests(false),
|
||||
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_),
|
||||
assembledSlotsTracker(nDgramsPerStagingBufferFrame_),
|
||||
randomDevice(), randomGenerator(randomDevice())
|
||||
@@ -68,13 +67,10 @@ randomDevice(), randomGenerator(randomDevice())
|
||||
bool IoUringAssemblyEngine::setup()
|
||||
{
|
||||
// Defensive check to prevent double-calling
|
||||
if (!ioUringAssemblyEngnCanceler.isCancellationRequested())
|
||||
{
|
||||
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
|
||||
if (shouldAcceptRequests)
|
||||
{
|
||||
throw std::runtime_error(std::string(__func__) + ": setup() called "
|
||||
"while already set up");
|
||||
}
|
||||
throw std::runtime_error(std::string(__func__) + ": setup() called "
|
||||
"while already set up");
|
||||
}
|
||||
|
||||
// Get FrameAssemblyDesc from staging buffer
|
||||
@@ -156,7 +152,7 @@ bool IoUringAssemblyEngine::setup()
|
||||
if (ret < 0)
|
||||
{ goto cleanup_eventfd; }
|
||||
|
||||
shouldAcceptRequests = true;
|
||||
ioUringAssemblyEngnCanceler.startAcceptingWork();
|
||||
return true;
|
||||
|
||||
cleanup_eventfd:
|
||||
@@ -229,7 +225,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
|
||||
+ ": onCqeReady callback is invalid");
|
||||
}
|
||||
|
||||
if (!shouldAcceptRequests)
|
||||
if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
throw std::runtime_error(std::string(__func__)
|
||||
+ ": engine is not accepting requests");
|
||||
@@ -321,11 +317,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
|
||||
|
||||
bool IoUringAssemblyEngine::stop()
|
||||
{
|
||||
// Acquire and release lock tightly around setting the flag
|
||||
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
|
||||
bool wasAcceptingRequests = shouldAcceptRequests;
|
||||
shouldAcceptRequests = false;
|
||||
return wasAcceptingRequests;
|
||||
return ioUringAssemblyEngnCanceler.requestStop();
|
||||
}
|
||||
|
||||
void IoUringAssemblyEngine::assemblyCycleComplete()
|
||||
@@ -444,9 +436,9 @@ public:
|
||||
void assembleFrameReq1_posted(
|
||||
std::shared_ptr<AssembleFrameReq> context)
|
||||
{
|
||||
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
|
||||
sscl::SpinLock::Guard guard(engine.ioUringAssemblyEngnCanceler.s.lock);
|
||||
|
||||
if (!engine.shouldAcceptRequests)
|
||||
if (engine.ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
context->callOriginalCallback(false, sscl::AsynchronousLoop(0));
|
||||
return;
|
||||
@@ -498,9 +490,11 @@ public:
|
||||
* indeed seen a SEGFAULT even in the current code with locking, so
|
||||
* I'm going to hold the lock here for now.
|
||||
*/
|
||||
sscl::SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock);
|
||||
sscl::SpinLock::Guard guard(
|
||||
context->engine.ioUringAssemblyEngnCanceler.s.lock);
|
||||
|
||||
if (!context->engine.shouldAcceptRequests)
|
||||
if (context->engine.ioUringAssemblyEngnCanceler
|
||||
.isCancellationRequestedUnlocked())
|
||||
{
|
||||
context->engine.assemblyCycleComplete();
|
||||
context->loop.setRemainingIterationsToFailure();
|
||||
@@ -518,7 +512,8 @@ public:
|
||||
void *user_data, int cqe_result)
|
||||
{
|
||||
// NB: The lock was acquired by onEventFdRead before calling this func
|
||||
if (!context->engine.shouldAcceptRequests)
|
||||
if (context->engine.ioUringAssemblyEngnCanceler
|
||||
.isCancellationRequestedUnlocked())
|
||||
{
|
||||
context->engine.assemblyCycleComplete();
|
||||
context->loop.setRemainingIterationsToFailure();
|
||||
@@ -549,7 +544,7 @@ public:
|
||||
{
|
||||
/** EXPLANATION:
|
||||
* All branch paths that invoke this unifyig oracle function are
|
||||
* expected to already hold the shouldAcceptRequestsLock before calling
|
||||
* expected to already hold ioUringAssemblyEngnCanceler.s.lock before calling
|
||||
* it.
|
||||
*/
|
||||
// Ensure we only execute once using atomic exchange
|
||||
@@ -638,8 +633,8 @@ void IoUringAssemblyEngine::assembleFrameReq(
|
||||
sscl::cps::Callback<assembleFrameReqCbFn> cb)
|
||||
{
|
||||
{
|
||||
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
|
||||
if (!shouldAcceptRequests)
|
||||
sscl::SpinLock::Guard guard(ioUringAssemblyEngnCanceler.s.lock);
|
||||
if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
cb.callbackFn(false, sscl::AsynchronousLoop(0));
|
||||
return;
|
||||
@@ -670,7 +665,7 @@ void IoUringAssemblyEngine::onEventfdRead(
|
||||
* IoUringAssemblyEngine's per-assembly state isn't destroyed while this
|
||||
* handler is running.
|
||||
*/
|
||||
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
|
||||
sscl::SpinLock::Guard guard(ioUringAssemblyEngnCanceler.s.lock);
|
||||
/** EXPLANATION:
|
||||
* You'd think we should put check for shouldAcceptRequests here and
|
||||
* `return` here if !shouldAcceptRequests, but we shouldn't because
|
||||
@@ -722,7 +717,7 @@ void IoUringAssemblyEngine::onEventfdRead(
|
||||
* But we do put a `return` here because we know that at this point, the
|
||||
* caller's callback has already been invoked.
|
||||
*/
|
||||
if (!shouldAcceptRequests
|
||||
if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()
|
||||
|| eventfdDesc == nullptr || !eventfdDesc->is_open())
|
||||
{
|
||||
return;
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#include <spinscale/cps/asynchronousContinuation.h>
|
||||
#include <spinscale/asynchronousLoop.h>
|
||||
#include <spinscale/cps/callback.h>
|
||||
#include <spinscale/spinLock.h>
|
||||
#include <spinscale/syncCancelerForAsyncWork.h>
|
||||
#include <user/frameAssemblyDesc.h>
|
||||
#include <user/stagingBuffer.h>
|
||||
|
||||
@@ -80,12 +80,7 @@ private:
|
||||
boost::asio::deadline_timer stallTimer;
|
||||
// Callback for CQE ntfns (called with user_data+result from each CQE)
|
||||
resetAndAssembleFrameCbFn onCqeReadyCallback;
|
||||
/** EXPLANATION:
|
||||
* Flag to indicate whether engine should accept new requests.
|
||||
* Set by setup(), cleared by stop().
|
||||
*/
|
||||
sscl::SpinLock shouldAcceptRequestsLock;
|
||||
bool shouldAcceptRequests;
|
||||
sscl::SyncCancelerForAsyncWork ioUringAssemblyEngnCanceler;
|
||||
|
||||
size_t nDgramsPerStagingBufferFrame;
|
||||
|
||||
|
||||
@@ -39,7 +39,6 @@ clAverageIntensityBufferClBuffer(nullptr),
|
||||
clAssemblyBuffer(nullptr),
|
||||
clCollationBuffer(nullptr),
|
||||
clAverageIntensityBuffer(nullptr),
|
||||
shouldAcceptRequests(false),
|
||||
compactIsRunning(false),
|
||||
collateIsRunning(false),
|
||||
currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr),
|
||||
@@ -64,13 +63,10 @@ OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine()
|
||||
bool OpenClCollatingAndMeshingEngine::setup()
|
||||
{
|
||||
// Defensive check to prevent double-calling
|
||||
if (!openClCollMeshEngnCanceler.isCancellationRequested())
|
||||
{
|
||||
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
|
||||
if (shouldAcceptRequests)
|
||||
{
|
||||
throw std::runtime_error(std::string(__func__) + ": setup() called "
|
||||
"while already set up");
|
||||
}
|
||||
throw std::runtime_error(std::string(__func__) + ": setup() called "
|
||||
"while already set up");
|
||||
}
|
||||
|
||||
if (!smoHooksPtr || !smoHooksPtr->ComputeManager_getDevice)
|
||||
@@ -202,13 +198,13 @@ bool OpenClCollatingAndMeshingEngine::setup()
|
||||
|
||||
clFlush(computeDevice->commandQueue);
|
||||
clFinish(computeDevice->commandQueue);
|
||||
shouldAcceptRequests = true;
|
||||
openClCollMeshEngnCanceler.startAcceptingWork();
|
||||
return true;
|
||||
}
|
||||
|
||||
void OpenClCollatingAndMeshingEngine::finalize()
|
||||
{
|
||||
// Call stop() to set shouldAcceptRequests to false and get previous state
|
||||
// Call stop() to clear shouldContinue and get previous state
|
||||
bool wasAcceptingRequests = stop();
|
||||
(void)wasAcceptingRequests;
|
||||
|
||||
@@ -771,11 +767,7 @@ bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs(
|
||||
|
||||
bool OpenClCollatingAndMeshingEngine::stop()
|
||||
{
|
||||
// Acquire and release lock tightly around setting the flag
|
||||
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
|
||||
bool wasAcceptingRequests = shouldAcceptRequests;
|
||||
shouldAcceptRequests = false;
|
||||
return wasAcceptingRequests;
|
||||
return openClCollMeshEngnCanceler.requestStop();
|
||||
}
|
||||
|
||||
void OpenClCollatingAndMeshingEngine::compactKernelComplete(bool isFinalizing)
|
||||
@@ -1051,8 +1043,8 @@ public:
|
||||
void compactCollateAndMeshFrameReq1_doCompact_posted(
|
||||
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
|
||||
{
|
||||
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
|
||||
if (!engine.shouldAcceptRequests)
|
||||
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock);
|
||||
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
callOriginalCallback(false);
|
||||
return;
|
||||
@@ -1082,8 +1074,8 @@ public:
|
||||
std::shared_ptr<CompactCollateAndMeshFrameReq> context,
|
||||
cl_int compactStatus)
|
||||
{
|
||||
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
|
||||
if (!engine.shouldAcceptRequests)
|
||||
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock);
|
||||
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
/** EXPLANATION:
|
||||
* We intentionally don't call compactKernelComplete() here because
|
||||
@@ -1116,15 +1108,15 @@ public:
|
||||
}
|
||||
#endif
|
||||
|
||||
lock.unlockPrematurely();
|
||||
guard.unlockPrematurely();
|
||||
context->compactCollateAndMeshFrameReq3_doCollate_posted(context);
|
||||
}
|
||||
|
||||
void compactCollateAndMeshFrameReq3_doCollate_posted(
|
||||
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
|
||||
{
|
||||
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
|
||||
if (!engine.shouldAcceptRequests)
|
||||
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock);
|
||||
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
callOriginalCallback(false);
|
||||
return;
|
||||
@@ -1155,8 +1147,8 @@ public:
|
||||
[[maybe_unused]] std::shared_ptr<CompactCollateAndMeshFrameReq> context,
|
||||
cl_int collateStatus)
|
||||
{
|
||||
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
|
||||
if (!engine.shouldAcceptRequests)
|
||||
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock);
|
||||
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
/* We intentionally don't call collateKernelComplete() here for the
|
||||
* same reason as above.
|
||||
@@ -1256,8 +1248,8 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq(
|
||||
sscl::cps::Callback<compactCollateAndMeshFrameReqCbFn> callback)
|
||||
{
|
||||
{
|
||||
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
|
||||
if (!shouldAcceptRequests)
|
||||
sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock);
|
||||
if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
|
||||
{
|
||||
callback.callbackFn(false, stimulusFrame);
|
||||
return;
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
#include <CL/cl.h>
|
||||
#include <spinscale/asynchronousLoop.h>
|
||||
#include <spinscale/cps/callback.h>
|
||||
#include <spinscale/spinLock.h>
|
||||
#include <spinscale/syncCancelerForAsyncWork.h>
|
||||
#include <user/stimulusFrame.h>
|
||||
#include <user/stagingBuffer.h>
|
||||
#include <user/frameAssemblyDesc.h>
|
||||
@@ -150,8 +150,7 @@ private:
|
||||
cl_mem clAverageIntensityBuffer;
|
||||
|
||||
// State tracking
|
||||
sscl::SpinLock shouldAcceptRequestsLock;
|
||||
bool shouldAcceptRequests;
|
||||
sscl::SyncCancelerForAsyncWork openClCollMeshEngnCanceler;
|
||||
bool compactIsRunning;
|
||||
bool collateIsRunning;
|
||||
cl_event currentCompactKernelEvent;
|
||||
|
||||
@@ -464,8 +464,10 @@ public:
|
||||
void produceFrameReq1_doAssemble_posted(
|
||||
std::shared_ptr<ProduceFrameReq> context)
|
||||
{
|
||||
sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
|
||||
if (!pcloudProducer.shouldContinue)
|
||||
sscl::SpinLock::Guard guard(
|
||||
pcloudProducer.stimulusProducerCanceler.s.lock);
|
||||
if (pcloudProducer.stimulusProducerCanceler
|
||||
.isCancellationRequestedUnlocked())
|
||||
{
|
||||
callOriginalCallback();
|
||||
return;
|
||||
@@ -482,8 +484,10 @@ public:
|
||||
std::shared_ptr<ProduceFrameReq> context,
|
||||
bool success, sscl::AsynchronousLoop loop)
|
||||
{
|
||||
sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
|
||||
if (!pcloudProducer.shouldContinue)
|
||||
sscl::SpinLock::Guard guard(
|
||||
pcloudProducer.stimulusProducerCanceler.s.lock);
|
||||
if (pcloudProducer.stimulusProducerCanceler
|
||||
.isCancellationRequestedUnlocked())
|
||||
{
|
||||
callOriginalCallback();
|
||||
return;
|
||||
@@ -615,8 +619,10 @@ public:
|
||||
context->darkAmbienceStimFrame->get().lock.writeRelease();
|
||||
}
|
||||
|
||||
sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
|
||||
if (!pcloudProducer.shouldContinue)
|
||||
sscl::SpinLock::Guard guard(
|
||||
pcloudProducer.stimulusProducerCanceler.s.lock);
|
||||
if (pcloudProducer.stimulusProducerCanceler
|
||||
.isCancellationRequestedUnlocked())
|
||||
{
|
||||
callOriginalCallback();
|
||||
return;
|
||||
@@ -626,7 +632,7 @@ public:
|
||||
std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl;
|
||||
} else
|
||||
{
|
||||
lock.unlockPrematurely();
|
||||
guard.unlockPrematurely();
|
||||
if (pcloudProducer.pcloudFrameDumper.isEnabled())
|
||||
{
|
||||
try
|
||||
@@ -748,7 +754,7 @@ void PcloudStimulusProducer::produceFrameReq(
|
||||
sscl::cps::Callback<produceFrameReqCbFn> callback)
|
||||
{
|
||||
/** EXPLANATION:
|
||||
* We shouldn't acquire the StimulusProducer::shouldContinueLock here because
|
||||
* We shouldn't acquire stimulusProducerCanceler.s.lock here because
|
||||
* this function is called from
|
||||
* StimulusProducer::stimFrameProductionTimesliceInd(), which is already
|
||||
* holding the lock.
|
||||
|
||||
Reference in New Issue
Block a user