From a1625eb5629b45c7948dcc585ff8ade639d3cba6 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Fri, 14 Nov 2025 01:41:03 -0400 Subject: [PATCH] OClCollMeshEngn: Use shouldAcceptRequests stop/finalize() pattern This makes the stop() method capable of synchronously stopping all engine/server-type async services which don't act in a self-moved fashion but instead wait for a request. --- .../openClCollatingAndMeshingEngine.cpp | 103 ++++++++++++++---- .../openClCollatingAndMeshingEngine.h | 19 +--- 2 files changed, 86 insertions(+), 36 deletions(-) diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp index bb4dd7e..582f7dc 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp @@ -92,8 +92,9 @@ slotCompactorProgram(nullptr), collateProgram(nullptr), slotCompactorKernel(nullptr), collateKernel(nullptr), clAssemblyBuffer(nullptr), clCollationBuffer(nullptr), -compactIsSetup(false), compactIsRunning(false), -collateIsSetup(false), collateIsRunning(false), +shouldAcceptRequests(false), +compactIsRunning(false), +collateIsRunning(false), currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr), assemblyBufferPtr(nullptr), assemblyBufferSize(0), @@ -112,8 +113,14 @@ OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine() bool OpenClCollatingAndMeshingEngine::setup() { - if (compactIsSetup && collateIsSetup) { - return true; + // Defensive check to prevent double-calling + { + SpinLock::Guard lock(shouldAcceptRequestsLock); + if (shouldAcceptRequests) + { + throw std::runtime_error(std::string(__func__) + ": setup() called " + "while already set up"); + } } cl_int err; @@ -235,8 +242,7 @@ bool OpenClCollatingAndMeshingEngine::setup() goto cleanup; } - compactIsSetup = true; - collateIsSetup = true; + shouldAcceptRequests = true; return true; cleanup: @@ -246,8 +252,13 @@ cleanup: void 
OpenClCollatingAndMeshingEngine::finalize() { - // Call stop() first - stop(); + // Call stop() to set shouldAcceptRequests to false and get previous state + bool wasAcceptingRequests = stop(); + (void)wasAcceptingRequests; + + // Complete any running kernels + compactKernelComplete(); + collateKernelComplete(); // Release OpenCL buffers in reverse order if (clCollationBuffer) @@ -302,9 +313,7 @@ void OpenClCollatingAndMeshingEngine::finalize() // Reset state variables device = nullptr; platform = nullptr; - compactIsSetup = false; compactIsRunning = false; - collateIsSetup = false; collateIsRunning = false; currentCompactKernelEvent = nullptr; currentCollateKernelEvent = nullptr; @@ -400,7 +409,6 @@ bool OpenClCollatingAndMeshingEngine::startCompactKernel( 1, // globalWorkSize compactKernelEventCallback, "slotCompactor", - compactIsSetup, compactIsRunning); if (!success) { return false; } @@ -459,7 +467,6 @@ bool OpenClCollatingAndMeshingEngine::startCollateKernel( globalWorkSize, collateKernelEventCallback, "collateDgrams", - collateIsSetup, collateIsRunning); if (!success) { return false; } @@ -683,13 +690,16 @@ bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs( return true; } -void OpenClCollatingAndMeshingEngine::stop() +bool OpenClCollatingAndMeshingEngine::stop() { - stopCompactKernel(); - stopCollateKernel(); + // Acquire and release lock tightly around setting the flag + SpinLock::Guard lock(shouldAcceptRequestsLock); + bool wasAcceptingRequests = shouldAcceptRequests; + shouldAcceptRequests = false; + return wasAcceptingRequests; } -void OpenClCollatingAndMeshingEngine::stopCompactKernel() +void OpenClCollatingAndMeshingEngine::compactKernelComplete() { /** EXPLANATION: * Technically we should only need to do this if we plan to read the @@ -706,12 +716,12 @@ void OpenClCollatingAndMeshingEngine::stopCompactKernel() clWaitForEvents(1, &currentCompactKernelEvent); clReleaseEvent(currentCompactKernelEvent); currentCompactKernelEvent = nullptr; - 
compactIsRunning = false; } compactKernelCb = [](cl_int){}; + compactIsRunning = false; } -void OpenClCollatingAndMeshingEngine::stopCollateKernel() +void OpenClCollatingAndMeshingEngine::collateKernelComplete() { /** EXPLANATION: * Technically we should only need to do this if we plan to read the @@ -727,9 +737,9 @@ void OpenClCollatingAndMeshingEngine::stopCollateKernel() clWaitForEvents(1, &currentCollateKernelEvent); clReleaseEvent(currentCollateKernelEvent); currentCollateKernelEvent = nullptr; - collateIsRunning = false; } collateKernelCb = [](cl_int){}; + collateIsRunning = false; } bool OpenClCollatingAndMeshingEngine::mapBuffer( @@ -869,6 +879,13 @@ public: void compactCollateAndMeshFrameReq1_doCompact_posted( std::shared_ptr context) { + SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) + { + callOriginalCallback(false); + return; + } + // Record compact kernel start time engine.compactKernelStartTime = std::chrono::high_resolution_clock::now(); @@ -883,7 +900,7 @@ public: if (!success) { - engine.stopCompactKernel(); + engine.compactKernelComplete(); callOriginalCallback(false); return; } @@ -893,7 +910,20 @@ public: std::shared_ptr context, cl_int compactStatus) { - engine.stopCompactKernel(); + SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) + { + /** EXPLANATION: + * We intentionally don't call compactKernelComplete() here because + * if shouldAcceptRequests is false, then the caller that called + * finalize() will also be forced to call compactKernelComplete() + * inside of finalize(). 
+ */ + callOriginalCallback(false); + return; + } + + engine.compactKernelComplete(); // Record compact kernel end time engine.compactKernelEndTime = std::chrono::high_resolution_clock::now(); @@ -914,12 +944,20 @@ public: } #endif + lock.unlockPrematurely(); context->compactCollateAndMeshFrameReq3_doCollate_posted(context); } void compactCollateAndMeshFrameReq3_doCollate_posted( std::shared_ptr context) { + SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) + { + callOriginalCallback(false); + return; + } + // Record collate kernel start time engine.collateKernelStartTime = std::chrono::high_resolution_clock::now(); @@ -933,7 +971,7 @@ public: if (!success) { - engine.stopCollateKernel(); + engine.collateKernelComplete(); callOriginalCallback(false); return; } @@ -943,7 +981,17 @@ public: [[maybe_unused]] std::shared_ptr context, cl_int collateStatus) { - engine.stopCollateKernel(); + SpinLock::Guard lock(engine.shouldAcceptRequestsLock); + if (!engine.shouldAcceptRequests) + { + /* We intentionally don't call collateKernelComplete() here for the + * same reason as above. 
+ */ + callOriginalCallback(false); + return; + } + + engine.collateKernelComplete(); // Record collate kernel end time engine.collateKernelEndTime = std::chrono::high_resolution_clock::now(); @@ -986,6 +1034,15 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq( AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame, Callback callback) { + { + SpinLock::Guard lock(shouldAcceptRequestsLock); + if (!shouldAcceptRequests) + { + callback.callbackFn(false, stimulusFrame); + return; + } + } + auto caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared( *this, asyncLoop, stimulusFrame, diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h index a339ac6..31a841c 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include "stagingBuffer.h" #include "frameAssemblyDesc.h" @@ -59,9 +60,9 @@ private: StagingBuffer& assemblyBuff, StagingBuffer& collationBuff, collateKernelCbFn callback); - void stopCompactKernel(); - void stopCollateKernel(); - void stop(); + void compactKernelComplete(); + void collateKernelComplete(); + bool stop(); public: // Get kernel execution durations in milliseconds @@ -80,16 +81,15 @@ private: cl_program collateProgram; cl_kernel slotCompactorKernel; cl_kernel collateKernel; - bool isSetup; // OpenCL buffers cl_mem clAssemblyBuffer; cl_mem clCollationBuffer; // State tracking - bool compactIsSetup; + SpinLock shouldAcceptRequestsLock; + bool shouldAcceptRequests; bool compactIsRunning; - bool collateIsSetup; bool collateIsRunning; cl_event currentCompactKernelEvent; cl_event currentCollateKernelEvent; @@ -155,15 +155,8 @@ private: size_t globalWorkSize, void (CL_CALLBACK *eventCallback)(cl_event, cl_int, void*), const char* kernelName, - bool& isSetup, bool& isRunning) { - if (!isSetup) 
- { - std::cerr << __func__ << ": engine not set up" << std::endl; - return false; - } - if (isRunning) { std::cerr << __func__ << ": already running, call stop() first"