OClCollMeshEngn: Use shouldAcceptRequests stop/finalize() pattern

This makes the stop() method capable of synchronously stopping all
engine/server-type async services which don't act in a self-moved
fashion but instead wait for a request.
This commit is contained in:
2025-11-14 01:41:03 -04:00
parent 324e3d1f6a
commit a1625eb562
2 changed files with 86 additions and 36 deletions
@@ -92,8 +92,9 @@ slotCompactorProgram(nullptr), collateProgram(nullptr),
slotCompactorKernel(nullptr), collateKernel(nullptr), slotCompactorKernel(nullptr), collateKernel(nullptr),
clAssemblyBuffer(nullptr), clAssemblyBuffer(nullptr),
clCollationBuffer(nullptr), clCollationBuffer(nullptr),
compactIsSetup(false), compactIsRunning(false), shouldAcceptRequests(false),
collateIsSetup(false), collateIsRunning(false), compactIsRunning(false),
collateIsRunning(false),
currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr), currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr),
assemblyBufferPtr(nullptr), assemblyBufferPtr(nullptr),
assemblyBufferSize(0), assemblyBufferSize(0),
@@ -112,8 +113,14 @@ OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine()
bool OpenClCollatingAndMeshingEngine::setup() bool OpenClCollatingAndMeshingEngine::setup()
{ {
if (compactIsSetup && collateIsSetup) { // Defensive check to prevent double-calling
return true; {
SpinLock::Guard lock(shouldAcceptRequestsLock);
if (shouldAcceptRequests)
{
throw std::runtime_error(std::string(__func__) + ": setup() called "
"while already set up");
}
} }
cl_int err; cl_int err;
@@ -235,8 +242,7 @@ bool OpenClCollatingAndMeshingEngine::setup()
goto cleanup; goto cleanup;
} }
compactIsSetup = true; shouldAcceptRequests = true;
collateIsSetup = true;
return true; return true;
cleanup: cleanup:
@@ -246,8 +252,13 @@ cleanup:
void OpenClCollatingAndMeshingEngine::finalize() void OpenClCollatingAndMeshingEngine::finalize()
{ {
// Call stop() first // Call stop() to set shouldAcceptRequests to false and get previous state
stop(); bool wasAcceptingRequests = stop();
(void)wasAcceptingRequests;
// Complete any running kernels
compactKernelComplete();
collateKernelComplete();
// Release OpenCL buffers in reverse order // Release OpenCL buffers in reverse order
if (clCollationBuffer) if (clCollationBuffer)
@@ -302,9 +313,7 @@ void OpenClCollatingAndMeshingEngine::finalize()
// Reset state variables // Reset state variables
device = nullptr; device = nullptr;
platform = nullptr; platform = nullptr;
compactIsSetup = false;
compactIsRunning = false; compactIsRunning = false;
collateIsSetup = false;
collateIsRunning = false; collateIsRunning = false;
currentCompactKernelEvent = nullptr; currentCompactKernelEvent = nullptr;
currentCollateKernelEvent = nullptr; currentCollateKernelEvent = nullptr;
@@ -400,7 +409,6 @@ bool OpenClCollatingAndMeshingEngine::startCompactKernel(
1, // globalWorkSize 1, // globalWorkSize
compactKernelEventCallback, compactKernelEventCallback,
"slotCompactor", "slotCompactor",
compactIsSetup,
compactIsRunning); compactIsRunning);
if (!success) { return false; } if (!success) { return false; }
@@ -459,7 +467,6 @@ bool OpenClCollatingAndMeshingEngine::startCollateKernel(
globalWorkSize, globalWorkSize,
collateKernelEventCallback, collateKernelEventCallback,
"collateDgrams", "collateDgrams",
collateIsSetup,
collateIsRunning); collateIsRunning);
if (!success) { return false; } if (!success) { return false; }
@@ -683,13 +690,16 @@ bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs(
return true; return true;
} }
void OpenClCollatingAndMeshingEngine::stop() bool OpenClCollatingAndMeshingEngine::stop()
{ {
stopCompactKernel(); // Acquire and release lock tightly around setting the flag
stopCollateKernel(); SpinLock::Guard lock(shouldAcceptRequestsLock);
bool wasAcceptingRequests = shouldAcceptRequests;
shouldAcceptRequests = false;
return wasAcceptingRequests;
} }
void OpenClCollatingAndMeshingEngine::stopCompactKernel() void OpenClCollatingAndMeshingEngine::compactKernelComplete()
{ {
/** EXPLANATION: /** EXPLANATION:
* Technically we should only need to do this if we plan to read the * Technically we should only need to do this if we plan to read the
@@ -706,12 +716,12 @@ void OpenClCollatingAndMeshingEngine::stopCompactKernel()
clWaitForEvents(1, &currentCompactKernelEvent); clWaitForEvents(1, &currentCompactKernelEvent);
clReleaseEvent(currentCompactKernelEvent); clReleaseEvent(currentCompactKernelEvent);
currentCompactKernelEvent = nullptr; currentCompactKernelEvent = nullptr;
compactIsRunning = false;
} }
compactKernelCb = [](cl_int){}; compactKernelCb = [](cl_int){};
compactIsRunning = false;
} }
void OpenClCollatingAndMeshingEngine::stopCollateKernel() void OpenClCollatingAndMeshingEngine::collateKernelComplete()
{ {
/** EXPLANATION: /** EXPLANATION:
* Technically we should only need to do this if we plan to read the * Technically we should only need to do this if we plan to read the
@@ -727,9 +737,9 @@ void OpenClCollatingAndMeshingEngine::stopCollateKernel()
clWaitForEvents(1, &currentCollateKernelEvent); clWaitForEvents(1, &currentCollateKernelEvent);
clReleaseEvent(currentCollateKernelEvent); clReleaseEvent(currentCollateKernelEvent);
currentCollateKernelEvent = nullptr; currentCollateKernelEvent = nullptr;
collateIsRunning = false;
} }
collateKernelCb = [](cl_int){}; collateKernelCb = [](cl_int){};
collateIsRunning = false;
} }
bool OpenClCollatingAndMeshingEngine::mapBuffer( bool OpenClCollatingAndMeshingEngine::mapBuffer(
@@ -869,6 +879,13 @@ public:
void compactCollateAndMeshFrameReq1_doCompact_posted( void compactCollateAndMeshFrameReq1_doCompact_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context) std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{ {
SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
{
callOriginalCallback(false);
return;
}
// Record compact kernel start time // Record compact kernel start time
engine.compactKernelStartTime = std::chrono::high_resolution_clock::now(); engine.compactKernelStartTime = std::chrono::high_resolution_clock::now();
@@ -883,7 +900,7 @@ public:
if (!success) if (!success)
{ {
engine.stopCompactKernel(); engine.compactKernelComplete();
callOriginalCallback(false); callOriginalCallback(false);
return; return;
} }
@@ -893,7 +910,20 @@ public:
std::shared_ptr<CompactCollateAndMeshFrameReq> context, std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int compactStatus) cl_int compactStatus)
{ {
engine.stopCompactKernel(); SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
{
/** EXPLANATION:
* We intentionally don't call compactKernelComplete() here because
* if shouldAcceptRequests is false, then the caller that called
* finalize() will also be forced to call compactKernelComplete()
* inside of finalize().
*/
callOriginalCallback(false);
return;
}
engine.compactKernelComplete();
// Record compact kernel end time // Record compact kernel end time
engine.compactKernelEndTime = std::chrono::high_resolution_clock::now(); engine.compactKernelEndTime = std::chrono::high_resolution_clock::now();
@@ -914,12 +944,20 @@ public:
} }
#endif #endif
lock.unlockPrematurely();
context->compactCollateAndMeshFrameReq3_doCollate_posted(context); context->compactCollateAndMeshFrameReq3_doCollate_posted(context);
} }
void compactCollateAndMeshFrameReq3_doCollate_posted( void compactCollateAndMeshFrameReq3_doCollate_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context) std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{ {
SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
{
callOriginalCallback(false);
return;
}
// Record collate kernel start time // Record collate kernel start time
engine.collateKernelStartTime = std::chrono::high_resolution_clock::now(); engine.collateKernelStartTime = std::chrono::high_resolution_clock::now();
@@ -933,7 +971,7 @@ public:
if (!success) if (!success)
{ {
engine.stopCollateKernel(); engine.collateKernelComplete();
callOriginalCallback(false); callOriginalCallback(false);
return; return;
} }
@@ -943,7 +981,17 @@ public:
[[maybe_unused]] std::shared_ptr<CompactCollateAndMeshFrameReq> context, [[maybe_unused]] std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int collateStatus) cl_int collateStatus)
{ {
engine.stopCollateKernel(); SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
if (!engine.shouldAcceptRequests)
{
/* We intentionally don't call collateKernelComplete() here for the
* same reason as above.
*/
callOriginalCallback(false);
return;
}
engine.collateKernelComplete();
// Record collate kernel end time // Record collate kernel end time
engine.collateKernelEndTime = std::chrono::high_resolution_clock::now(); engine.collateKernelEndTime = std::chrono::high_resolution_clock::now();
@@ -986,6 +1034,15 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq(
AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame, AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame,
Callback<compactCollateAndMeshFrameReqCbFn> callback) Callback<compactCollateAndMeshFrameReqCbFn> callback)
{ {
{
SpinLock::Guard lock(shouldAcceptRequestsLock);
if (!shouldAcceptRequests)
{
callback.callbackFn(false, stimulusFrame);
return;
}
}
auto caller = smoHooksPtr->ComponentThread_getSelf(); auto caller = smoHooksPtr->ComponentThread_getSelf();
auto request = std::make_shared<CompactCollateAndMeshFrameReq>( auto request = std::make_shared<CompactCollateAndMeshFrameReq>(
*this, asyncLoop, stimulusFrame, *this, asyncLoop, stimulusFrame,
@@ -13,6 +13,7 @@
#include <CL/cl.h> #include <CL/cl.h>
#include <asynchronousLoop.h> #include <asynchronousLoop.h>
#include <callback.h> #include <callback.h>
#include <spinLock.h>
#include <user/stimulusFrame.h> #include <user/stimulusFrame.h>
#include "stagingBuffer.h" #include "stagingBuffer.h"
#include "frameAssemblyDesc.h" #include "frameAssemblyDesc.h"
@@ -59,9 +60,9 @@ private:
StagingBuffer& assemblyBuff, StagingBuffer& collationBuff, StagingBuffer& assemblyBuff, StagingBuffer& collationBuff,
collateKernelCbFn callback); collateKernelCbFn callback);
void stopCompactKernel(); void compactKernelComplete();
void stopCollateKernel(); void collateKernelComplete();
void stop(); bool stop();
public: public:
// Get kernel execution durations in milliseconds // Get kernel execution durations in milliseconds
@@ -80,16 +81,15 @@ private:
cl_program collateProgram; cl_program collateProgram;
cl_kernel slotCompactorKernel; cl_kernel slotCompactorKernel;
cl_kernel collateKernel; cl_kernel collateKernel;
bool isSetup;
// OpenCL buffers // OpenCL buffers
cl_mem clAssemblyBuffer; cl_mem clAssemblyBuffer;
cl_mem clCollationBuffer; cl_mem clCollationBuffer;
// State tracking // State tracking
bool compactIsSetup; SpinLock shouldAcceptRequestsLock;
bool shouldAcceptRequests;
bool compactIsRunning; bool compactIsRunning;
bool collateIsSetup;
bool collateIsRunning; bool collateIsRunning;
cl_event currentCompactKernelEvent; cl_event currentCompactKernelEvent;
cl_event currentCollateKernelEvent; cl_event currentCollateKernelEvent;
@@ -155,15 +155,8 @@ private:
size_t globalWorkSize, size_t globalWorkSize,
void (CL_CALLBACK *eventCallback)(cl_event, cl_int, void*), void (CL_CALLBACK *eventCallback)(cl_event, cl_int, void*),
const char* kernelName, const char* kernelName,
bool& isSetup,
bool& isRunning) bool& isRunning)
{ {
if (!isSetup)
{
std::cerr << __func__ << ": engine not set up" << std::endl;
return false;
}
if (isRunning) if (isRunning)
{ {
std::cerr << __func__ << ": already running, call stop() first" std::cerr << __func__ << ": already running, call stop() first"