LivoxGen1: Use syncCancelerForAsyncWork in producer pipeline

This commit is contained in:
2026-05-29 14:10:45 -04:00
parent 5a9fe12057
commit d788810a05
7 changed files with 507 additions and 442 deletions
@@ -8,7 +8,6 @@
#include <boost/system/error_code.hpp>
#include <opts.h>
#include <componentThread.h>
#include <spinscale/spinLock.h>
#include <user/stimulusProducer.h>
#include <user/stimulusBuffer.h>
@@ -91,10 +90,7 @@ void StimulusProducer::destroyAttachedStimulusBuffer(
void StimulusProducer::stop()
{
{
sscl::SpinLock::Guard lock(shouldContinueLock);
shouldContinue = false;
}
(void)stimulusProducerCanceler.requestStop();
// Cancel timer immediately
timer.cancel();
@@ -105,7 +101,7 @@ void StimulusProducer::stop()
void StimulusProducer::scheduleNextTimeout(int delayMs)
{
if (!shouldContinue)
if (stimulusProducerCanceler.isCancellationRequestedUnlocked())
{ return; }
// Schedule the next timeout using the provided delay
@@ -131,10 +127,6 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error)
return;
}
sscl::SpinLock::Guard lock(shouldContinueLock);
if (!shouldContinue)
{ return; }
/** EXPLANATION:
* We need to ensure that there's only ever one stimframe being produced
* during any CONFIG_STIMBUFF_FRAME_PERIOD_MS period. To guarantee this, we
@@ -148,6 +140,10 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error)
*/
int nextWakeupDelayMs;
bool deferred = false;
bool shouldContinue;
shouldContinue = stimulusProducerCanceler.execUncancelableSegmentOrAbort(
[&]()
{
if (frameAssemblyRateLimiter.tryAcquire())
{
nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_PERIOD_MS;
@@ -174,9 +170,9 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error)
* release the lock when frame production completes
*/
stimFrameProductionTimesliceInd();
return;
}
else
{
nextWakeupDelayMs = CONFIG_STIMBUFF_FRAME_RETRY_DELAY_MS;
deferred = true;
@@ -189,7 +185,9 @@ void StimulusProducer::onTimeout(const boost::system::error_code& error)
"Configured deferral period: " << nextWakeupDelayMs << "ms"
<< std::endl;
}
}
});
if (!shouldContinue) { return; }
scheduleNextTimeout(nextWakeupDelayMs);
+4 -5
View File
@@ -14,6 +14,7 @@
#include <boost/asio/io_service.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <spinscale/spinLock.h>
#include <spinscale/syncCancelerForAsyncWork.h>
#include "deviceAttachmentSpec.h"
namespace smo {
@@ -40,8 +41,7 @@ public:
&deviceAttachmentSpec,
boost::asio::io_service& ioService_)
: deviceAttachmentSpec(deviceAttachmentSpec),
ioService(ioService_),
shouldContinue(false), timer(ioService),
ioService(ioService_), timer(ioService),
nDeferrals(0)
{}
@@ -59,7 +59,7 @@ public:
std::cout << __func__ << ": Starting stimulus producer for device "
<< deviceAttachmentSpec->deviceSelector << std::endl;
shouldContinue = true;
stimulusProducerCanceler.startAcceptingWork();
nDeferrals = 0;
scheduleNextTimeout();
}
@@ -109,8 +109,7 @@ public:
private:
boost::asio::io_service& ioService;
protected:
sscl::SpinLock shouldContinueLock;
bool shouldContinue;
sscl::SyncCancelerForAsyncWork stimulusProducerCanceler;
private:
boost::asio::deadline_timer timer;
size_t nDeferrals;
@@ -22,7 +22,6 @@
#include <spinscale/cps/asynchronousBridge.h>
#include <spinscale/cps/callback.h>
#include <spinscale/cps/callableTracer.h>
#include <spinscale/spinLock.h>
#include "ioUringAssemblyEngine.h"
#include "pcloudStimulusProducer.h"
#include "livoxGen1.h"
@@ -59,7 +58,6 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(
frameAssemblyDesc(nullptr), ring{},
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
stallTimer(parent_.device->componentThread->getIoService()),
shouldAcceptRequests(false),
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_),
assembledSlotsTracker(nDgramsPerStagingBufferFrame_),
randomDevice(), randomGenerator(randomDevice())
@@ -68,14 +66,11 @@ randomDevice(), randomGenerator(randomDevice())
bool IoUringAssemblyEngine::setup()
{
// Defensive check to prevent double-calling
{
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
if (shouldAcceptRequests)
if (!ioUringAssemblyEngnCanceler.isCancellationRequested())
{
throw std::runtime_error(std::string(__func__) + ": setup() called "
"while already set up");
}
}
// Get FrameAssemblyDesc from staging buffer
frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
@@ -156,7 +151,7 @@ bool IoUringAssemblyEngine::setup()
if (ret < 0)
{ goto cleanup_eventfd; }
shouldAcceptRequests = true;
ioUringAssemblyEngnCanceler.startAcceptingWork();
return true;
cleanup_eventfd:
@@ -229,7 +224,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
+ ": onCqeReady callback is invalid");
}
if (!shouldAcceptRequests)
if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked())
{
throw std::runtime_error(std::string(__func__)
+ ": engine is not accepting requests");
@@ -321,11 +316,7 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
bool IoUringAssemblyEngine::stop()
{
// Acquire and release lock tightly around setting the flag
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
bool wasAcceptingRequests = shouldAcceptRequests;
shouldAcceptRequests = false;
return wasAcceptingRequests;
return ioUringAssemblyEngnCanceler.requestStop();
}
void IoUringAssemblyEngine::assemblyCycleComplete()
@@ -444,19 +435,17 @@ public:
void assembleFrameReq1_posted(
std::shared_ptr<AssembleFrameReq> context)
{
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
auto& canceler = engine.ioUringAssemblyEngnCanceler;
const bool started = canceler.execUncancelableSegmentOrAbort(
[context, this]()
{
context->callOriginalCallback(false, sscl::AsynchronousLoop(0));
return;
}
// Initialize loop with number of slots
context->loop = sscl::AsynchronousLoop(engine.frameAssemblyDesc->numSlots);
context->loop = sscl::AsynchronousLoop(
engine.frameAssemblyDesc->numSlots);
// Record assembly start time
engine.assemblyStartTime = std::chrono::high_resolution_clock::now();
engine.assemblyStartTime =
std::chrono::high_resolution_clock::now();
/** FIXME:
* I'm suspicious of this std::bind return object here. What if us
@@ -477,6 +466,13 @@ public:
std::bind(&AssembleFrameReq::assembleFrameReq2_1,
context.get(), context,
std::placeholders::_1));
});
if (!started)
{
context->callOriginalCallback(false, sscl::AsynchronousLoop(0));
return;
}
}
void assembleFrameReq2_1(
@@ -498,19 +494,22 @@ public:
* indeed seen a SEGFAULT even in the current code with locking, so
* I'm going to hold the lock here for now.
*/
sscl::SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock);
auto& canceler = context->engine.ioUringAssemblyEngnCanceler;
const bool shouldContinue = canceler.execUncancelableSegmentOrAbort(
[context]()
{
// Set timer fired flag
context->timerFired.store(true);
context->assembleFrameReq3(context);
});
if (!context->engine.shouldAcceptRequests)
if (!shouldContinue)
{
context->engine.assemblyCycleComplete();
context->loop.setRemainingIterationsToFailure();
context->callOriginalCallback(false, context->loop);
return;
}
// Set timer fired flag
context->timerFired.store(true);
context->assembleFrameReq3(context);
}
void assembleFrameReq2_2(
@@ -518,7 +517,8 @@ public:
void *user_data, int cqe_result)
{
// NB: The lock was acquired by onEventFdRead before calling this func
if (!context->engine.shouldAcceptRequests)
if (context->engine.ioUringAssemblyEngnCanceler
.isCancellationRequestedUnlocked())
{
context->engine.assemblyCycleComplete();
context->loop.setRemainingIterationsToFailure();
@@ -549,8 +549,8 @@ public:
{
/** EXPLANATION:
* All branch paths that invoke this unifyig oracle function are
* expected to already hold the shouldAcceptRequestsLock before calling
* it.
* expected to already hold ioUringAssemblyEngnCanceler's lock before
* calling it.
*/
// Ensure we only execute once using atomic exchange
if (context->handlerExecuted.exchange(true)) { return; }
@@ -638,8 +638,7 @@ void IoUringAssemblyEngine::assembleFrameReq(
sscl::cps::Callback<assembleFrameReqCbFn> cb)
{
{
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
if (!shouldAcceptRequests)
if (ioUringAssemblyEngnCanceler.isCancellationRequested())
{
cb.callbackFn(false, sscl::AsynchronousLoop(0));
return;
@@ -670,7 +669,7 @@ void IoUringAssemblyEngine::onEventfdRead(
* IoUringAssemblyEngine's per-assembly state isn't destroyed while this
* handler is running.
*/
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
sscl::SpinLock::Guard lock(ioUringAssemblyEngnCanceler.s.lock);
/** EXPLANATION:
* You'd think we should put check for shouldAcceptRequests here and
* `return` here if !shouldAcceptRequests, but we shouldn't because
@@ -722,7 +721,7 @@ void IoUringAssemblyEngine::onEventfdRead(
* But we do put a `return` here because we know that at this point, the
* caller's callback has already been invoked.
*/
if (!shouldAcceptRequests
if (ioUringAssemblyEngnCanceler.isCancellationRequestedUnlocked()
|| eventfdDesc == nullptr || !eventfdDesc->is_open())
{
return;
@@ -19,7 +19,7 @@
#include <spinscale/cps/asynchronousContinuation.h>
#include <spinscale/asynchronousLoop.h>
#include <spinscale/cps/callback.h>
#include <spinscale/spinLock.h>
#include <spinscale/syncCancelerForAsyncWork.h>
#include <user/frameAssemblyDesc.h>
#include <user/stagingBuffer.h>
@@ -80,12 +80,7 @@ private:
boost::asio::deadline_timer stallTimer;
// Callback for CQE ntfns (called with user_data+result from each CQE)
resetAndAssembleFrameCbFn onCqeReadyCallback;
/** EXPLANATION:
* Flag to indicate whether engine should accept new requests.
* Set by setup(), cleared by stop().
*/
sscl::SpinLock shouldAcceptRequestsLock;
bool shouldAcceptRequests;
sscl::SyncCancelerForAsyncWork ioUringAssemblyEngnCanceler;
size_t nDgramsPerStagingBufferFrame;
@@ -39,7 +39,6 @@ clAverageIntensityBufferClBuffer(nullptr),
clAssemblyBuffer(nullptr),
clCollationBuffer(nullptr),
clAverageIntensityBuffer(nullptr),
shouldAcceptRequests(false),
compactIsRunning(false),
collateIsRunning(false),
currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr),
@@ -64,14 +63,11 @@ OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine()
bool OpenClCollatingAndMeshingEngine::setup()
{
// Defensive check to prevent double-calling
{
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
if (shouldAcceptRequests)
if (!openClCollMeshEngnCanceler.isCancellationRequested())
{
throw std::runtime_error(std::string(__func__) + ": setup() called "
"while already set up");
}
}
if (!smoHooksPtr || !smoHooksPtr->ComputeManager_getDevice)
{
@@ -202,7 +198,7 @@ bool OpenClCollatingAndMeshingEngine::setup()
clFlush(computeDevice->commandQueue);
clFinish(computeDevice->commandQueue);
shouldAcceptRequests = true;
openClCollMeshEngnCanceler.startAcceptingWork();
return true;
}
@@ -771,11 +767,7 @@ bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs(
bool OpenClCollatingAndMeshingEngine::stop()
{
// Acquire and release lock tightly around setting the flag
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
bool wasAcceptingRequests = shouldAcceptRequests;
shouldAcceptRequests = false;
return wasAcceptingRequests;
return openClCollMeshEngnCanceler.requestStop();
}
void OpenClCollatingAndMeshingEngine::compactKernelComplete(bool isFinalizing)
@@ -1051,19 +1043,18 @@ public:
void compactCollateAndMeshFrameReq1_doCompact_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
auto& canceler = engine.openClCollMeshEngnCanceler;
const bool shouldContinue = canceler.execUncancelableSegmentOrAbort(
[context, this]()
{
callOriginalCallback(false);
return;
}
// Record compact kernel start time
engine.compactKernelStartTime = std::chrono::high_resolution_clock::now();
engine.compactKernelStartTime =
std::chrono::high_resolution_clock::now();
bool success = engine.startCompactKernel(
engine.parent.assemblyBuffer,
static_cast<uint32_t>(context->frameAssemblyResult.nSucceeded.load()),
static_cast<uint32_t>(
context->frameAssemblyResult.nSucceeded.load()),
std::bind(
&CompactCollateAndMeshFrameReq
::compactCollateAndMeshFrameReq2_compactDone_posted,
@@ -1074,6 +1065,12 @@ public:
{
engine.compactKernelComplete();
callOriginalCallback(false);
}
});
if (!shouldContinue)
{
callOriginalCallback(false);
return;
}
}
@@ -1082,8 +1079,27 @@ public:
std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int compactStatus)
{
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
bool compactFailed = false;
auto& canceler = engine.openClCollMeshEngnCanceler;
const bool shouldContinue = canceler.execUncancelableSegmentOrAbort(
[context, this, compactStatus, &compactFailed]()
{
engine.compactKernelComplete();
// Record compact kernel end time
engine.compactKernelEndTime =
std::chrono::high_resolution_clock::now();
// If compact failed, call callback directly with failure
if (compactStatus != CL_SUCCESS)
{
compactFailed = true;
callOriginalCallback(false);
return;
}
});
if (!shouldContinue)
{
/** EXPLANATION:
* We intentionally don't call compactKernelComplete() here because
@@ -1095,16 +1111,7 @@ public:
return;
}
engine.compactKernelComplete();
// Record compact kernel end time
engine.compactKernelEndTime = std::chrono::high_resolution_clock::now();
// If compact failed, call callback directly with failure
if (compactStatus != CL_SUCCESS)
{
callOriginalCallback(false);
return;
}
if (compactFailed) { return; }
#if 0
// Print first 4 bytes of each slot
@@ -1116,22 +1123,19 @@ public:
}
#endif
lock.unlockPrematurely();
context->compactCollateAndMeshFrameReq3_doCollate_posted(context);
}
void compactCollateAndMeshFrameReq3_doCollate_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
auto& canceler = engine.openClCollMeshEngnCanceler;
const bool shouldContinue = canceler.execUncancelableSegmentOrAbort(
[context, this]()
{
callOriginalCallback(false);
return;
}
// Record collate kernel start time
engine.collateKernelStartTime = std::chrono::high_resolution_clock::now();
engine.collateKernelStartTime =
std::chrono::high_resolution_clock::now();
bool success = engine.startCollateKernel(
context->intensityStimFrame, context->anyAmbienceAttached(),
@@ -1147,6 +1151,12 @@ public:
context->intensityStimFrame, context->anyAmbienceAttached());
callOriginalCallback(false);
}
});
if (!shouldContinue)
{
callOriginalCallback(false);
return;
}
}
@@ -1155,16 +1165,6 @@ public:
[[maybe_unused]] std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int collateStatus)
{
sscl::SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
{
/* We intentionally don't call collateKernelComplete() here for the
* same reason as above.
*/
callOriginalCallback(false);
return;
}
/** EXPLANATION:
* The reason we don't call collateKernelComplete before checking
* shouldAcceptRequests is because if shouldAcceptRequests is false, then
@@ -1174,6 +1174,10 @@ public:
* Therefore it's finalize()'s responsibility to ensure that it properly
* completes/cleans up any in-flight operations.
*/
auto& canceler = engine.openClCollMeshEngnCanceler;
const bool shouldContinue = canceler.execUncancelableSegmentOrAbort(
[context, this, collateStatus]()
{
engine.collateKernelComplete(
context->intensityStimFrame, context->anyAmbienceAttached());
@@ -1199,7 +1203,8 @@ public:
}
// Record collate kernel end time
engine.collateKernelEndTime = std::chrono::high_resolution_clock::now();
engine.collateKernelEndTime =
std::chrono::high_resolution_clock::now();
bool success = (collateStatus == CL_SUCCESS);
@@ -1234,7 +1239,7 @@ public:
}
(void)highIntensityCount;
#if 0
#if 0
std::cout << __func__ << ": intensityRingBufferIndex="
<< (context->intensityStimFrame.has_value() ?
context->intensityStimFrame->get().ringBufferIndex : SIZE_MAX)
@@ -1242,9 +1247,19 @@ public:
<< ", nSucceeded=" << nSucceeded
<< ", totalPoints=" << totalPoints
<< ", highIntensityCount=" << highIntensityCount << std::endl;
#endif
#endif
callOriginalCallback(success);
});
if (!shouldContinue)
{
/* We intentionally don't call collateKernelComplete() here for the
* same reason as above.
*/
callOriginalCallback(false);
return;
}
}
};
@@ -1256,8 +1271,7 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq(
sscl::cps::Callback<compactCollateAndMeshFrameReqCbFn> callback)
{
{
sscl::SpinLock::Guard lock(shouldAcceptRequestsLock);
if (!shouldAcceptRequests)
if (openClCollMeshEngnCanceler.isCancellationRequested())
{
callback.callbackFn(false, stimulusFrame);
return;
@@ -15,7 +15,7 @@
#include <CL/cl.h>
#include <spinscale/asynchronousLoop.h>
#include <spinscale/cps/callback.h>
#include <spinscale/spinLock.h>
#include <spinscale/syncCancelerForAsyncWork.h>
#include <user/stimulusFrame.h>
#include <user/stagingBuffer.h>
#include <user/frameAssemblyDesc.h>
@@ -150,8 +150,7 @@ private:
cl_mem clAverageIntensityBuffer;
// State tracking
sscl::SpinLock shouldAcceptRequestsLock;
bool shouldAcceptRequests;
sscl::SyncCancelerForAsyncWork openClCollMeshEngnCanceler;
bool compactIsRunning;
bool collateIsRunning;
cl_event currentCompactKernelEvent;
+101 -40
View File
@@ -464,37 +464,39 @@ public:
void produceFrameReq1_doAssemble_posted(
std::shared_ptr<ProduceFrameReq> context)
{
sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
if (!pcloudProducer.shouldContinue)
const bool shouldContinue = pcloudProducer.stimulusProducerCanceler
.execUncancelableSegmentOrAbort(
[this, context]()
{
callOriginalCallback();
return;
}
pcloudProducer.ioUringAssemblyEngine.assembleFrameReq(
{context, std::bind(
&ProduceFrameReq::produceFrameReq2_assembleDone,
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
});
if (!shouldContinue)
{
callOriginalCallback();
return;
}
}
void produceFrameReq2_assembleDone(
std::shared_ptr<ProduceFrameReq> context,
bool success, sscl::AsynchronousLoop loop)
{
sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
if (!pcloudProducer.shouldContinue)
bool shouldContinue = pcloudProducer.stimulusProducerCanceler
.execUncancelableSegmentOrAbort(
[this, context, success, loop]()
{
callOriginalCallback();
return;
}
if (!success)
{
callOriginalCallback();
if (pcloudProducer.attachedStimulusBuffers.size() > 0) {
std::cerr << __func__ << ": Failed to assemble frame.\n";
std::cerr << __func__
<< ": Failed to assemble frame.\n";
}
return;
}
@@ -502,7 +504,8 @@ public:
context->frameAssemblyResult = loop;
// Check if intensity buffer is attached and acquire frame if so
if (auto intensityBuff = pcloudProducer.intensityStimulusBuffer.load(
if (auto intensityBuff = pcloudProducer
.intensityStimulusBuffer.load(
std::memory_order_acquire))
{
size_t intensityRingbuffIndex = intensityBuff
@@ -521,7 +524,8 @@ public:
}
// Check if light ambience buffer is attached and acquire frame if so
std::optional<AmbienceProductionDesc> lightAmbienceProductionDescDesc;
std::optional<AmbienceProductionDesc>
lightAmbienceProductionDescDesc;
if (auto lightAmbienceBuff =
pcloudProducer.lightAmbienceStimulusBuffer.load(
std::memory_order_acquire))
@@ -529,22 +533,26 @@ public:
size_t lightAmbienceRingbuffIndex = lightAmbienceBuff
->ringBuffer.getIndexToProduceInto();
StimulusFrame& lightAmbienceStimFrame = lightAmbienceBuff
->ringBuffer.getDataAtSlot(lightAmbienceRingbuffIndex);
StimulusFrame& lightAmbienceStimFrame =
lightAmbienceBuff->ringBuffer.getDataAtSlot(
lightAmbienceRingbuffIndex);
lightAmbienceStimFrame.lock.writeAcquire();
context->lightAmbienceStimFrame = std::make_optional(
std::ref(lightAmbienceStimFrame));
lightAmbienceProductionDescDesc = AmbienceProductionDesc{
lightAmbienceProductionDescDesc =
AmbienceProductionDesc{
std::ref(lightAmbienceStimFrame),
lightAmbienceBuff->passbandCountGtComparator};
lightAmbienceBuff
->passbandCountGtComparator};
}
else {
context->lightAmbienceStimFrame = std::nullopt;
}
// Check if dark ambience buffer is attached and acquire frame if so
std::optional<AmbienceProductionDesc> darkAmbienceProductionDescDesc;
std::optional<AmbienceProductionDesc>
darkAmbienceProductionDescDesc;
if (auto darkAmbienceBuff =
pcloudProducer.darkAmbienceStimulusBuffer.load(
std::memory_order_acquire))
@@ -552,29 +560,41 @@ public:
size_t darkAmbienceRingbuffIndex = darkAmbienceBuff
->ringBuffer.getIndexToProduceInto();
StimulusFrame& darkAmbienceStimFrame = darkAmbienceBuff
->ringBuffer.getDataAtSlot(darkAmbienceRingbuffIndex);
StimulusFrame& darkAmbienceStimFrame =
darkAmbienceBuff->ringBuffer.getDataAtSlot(
darkAmbienceRingbuffIndex);
darkAmbienceStimFrame.lock.writeAcquire();
context->darkAmbienceStimFrame = std::make_optional(
std::ref(darkAmbienceStimFrame));
darkAmbienceProductionDescDesc = AmbienceProductionDesc{
darkAmbienceProductionDescDesc =
AmbienceProductionDesc{
std::ref(darkAmbienceStimFrame),
darkAmbienceBuff->passbandCountLtComparator};
darkAmbienceBuff
->passbandCountLtComparator};
}
else {
context->darkAmbienceStimFrame = std::nullopt;
}
pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq(
loop, stimulusFrame,
pcloudProducer.openClCollatingAndMeshingEngine
.compactCollateAndMeshFrameReq(
context->frameAssemblyResult, stimulusFrame,
context->intensityStimFrame,
std::move(lightAmbienceProductionDescDesc),
std::move(darkAmbienceProductionDescDesc),
{context, std::bind(
&ProduceFrameReq::produceFrameReq3_compactCollateDone,
&ProduceFrameReq
::produceFrameReq3_compactCollateDone,
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
});
if (!shouldContinue)
{
callOriginalCallback();
return;
}
}
void produceFrameReq3_compactCollateDone(
@@ -603,7 +623,10 @@ public:
}
#endif
// Release intensity frame if it was used
/** EXPLANATION:
* Release intensity/ambience frames if they were supplied/used,
* regardless of whether or not a cancelation request occurred.
*/
if (context->intensityStimFrame.has_value()) {
context->intensityStimFrame->get().lock.writeRelease();
}
@@ -615,18 +638,58 @@ public:
context->darkAmbienceStimFrame->get().lock.writeRelease();
}
sscl::SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
if (!pcloudProducer.shouldContinue)
if (!success)
{
callOriginalCallback();
std::cerr << __func__
<< ": Failed to compact and collate frame"
<< std::endl;
return;
}
/** EXPLANATION:
* Cancellation early exit for the success path. Analogous to the
* (legacy, removed) pre-canceler check under shouldContinueLock before
* lock.unlockPrematurely() in the former CPS version of this handler.
*
* Assumptions that make the unlocked tail (dump, debug logging, stage
* durations, final callOriginalCallback()) safe without holding
* stimulusProducerCanceler.s.lock:
*
* 1. This handler is only entered from
* CompactCollateAndMeshFrameReq4's execUncancelableSegmentOrAbort
* body, so openClCollatingAndMeshingEngine::finalize() cannot
* tear down OpenCL/engine state until this function returns.
*
* 2. PcloudStimulusProducer::stop() finalizes OpenCL before io_uring,
* so getCompactKernelDuration()/getCollateKernelDuration() and
* getAssemblyDuration() are not racing engine finalize here.
*
* 3. dump/logging reads only producer-owned state (pcloudFrameDumper,
* collationBuffer, device, context->frameAssemblyResult) that
* stop()/finalize() do not destroy.
*
* 4. A stop() that lands after this read but before the tail is
* intentional stale work (same as unlockPrematurely()), not UAF.
*
* Rework to run portions inside stimulusProducerCanceler
* execUncancelableSegmentOrAbort if any of the above ceases to hold:
* e.g. this callback is invoked outside the OpenCL uncancelable
* segment, the tail begins touching engine or buffer state that
* finalize() resets, or stop() ordering changes so teardown can
* interleave with the unlocked tail.
*/
const bool shouldContinue =
!pcloudProducer.stimulusProducerCanceler.isCancellationRequested();
if (!shouldContinue)
{
callOriginalCallback();
return;
}
if (!success) {
std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl;
} else
{
lock.unlockPrematurely();
if (pcloudProducer.pcloudFrameDumper.isEnabled())
{
try
@@ -738,7 +801,6 @@ public:
<< "ms, collateKernel=" << collateDuration.count()
<< "ms" << std::endl;
#endif
}
callOriginalCallback();
}
@@ -748,10 +810,9 @@ void PcloudStimulusProducer::produceFrameReq(
sscl::cps::Callback<produceFrameReqCbFn> callback)
{
/** EXPLANATION:
* We shouldn't acquire the StimulusProducer::shouldContinueLock here because
* this function is called from
* StimulusProducer::stimFrameProductionTimesliceInd(), which is already
* holding the lock.
* We don't do any additional canceler-lock acquisition here because
* callback segment methods already use stimulusProducerCanceler
* checkpoints before running uncancelable work.
*/
auto caller = smoHooksPtr->ComponentThread_getSelf();
auto request = std::make_shared<ProduceFrameReq>(