From 35eb466a60cbf7d0d4508c072855429d92d95a38 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Sat, 30 May 2026 19:32:19 -0400 Subject: [PATCH] OClCollMeshEngn,PcloudStimProd: port to sscl::co coros We've now ported the OpenClCollMeshEngn and PcloudStimProd::produceFrameReq portions of the Livox pipeline to coros. --- .../adapters/opencl/clKernelCompletionAReq.h | 93 +++++ include/adapters/smo/assembleFrameAReq.h | 44 +++ stimBuffApis/livoxGen1/CMakeLists.txt | 1 + .../openClCollatingAndMeshingEngine.cpp | 324 +++++++----------- .../openClCollatingAndMeshingEngine.h | 15 +- .../livoxGen1/pcloudStimulusProducer.cpp | 276 +++++++-------- .../livoxGen1/pcloudStimulusProducer.h | 18 +- 7 files changed, 398 insertions(+), 373 deletions(-) create mode 100644 include/adapters/opencl/clKernelCompletionAReq.h create mode 100644 include/adapters/smo/assembleFrameAReq.h diff --git a/include/adapters/opencl/clKernelCompletionAReq.h b/include/adapters/opencl/clKernelCompletionAReq.h new file mode 100644 index 0000000..eedcd32 --- /dev/null +++ b/include/adapters/opencl/clKernelCompletionAReq.h @@ -0,0 +1,93 @@ +#ifndef ADAPTERS_OPENCL_CL_KERNEL_COMPLETION_AREQ_H +#define ADAPTERS_OPENCL_CL_KERNEL_COMPLETION_AREQ_H + +#include +#include +#include +#include + +#define CL_TARGET_OPENCL_VERSION 120 +#include + +#include +#include + +namespace smo { +namespace openclBoundary { + +/** Eager-start std::function callback -> coroutine adapter for OpenCL kernel + * completion (cl_int event_command_exec_status posted to io_context). + */ +template +class ClKernelCompletionAReq +{ +public: + struct AsyncState + { + std::atomic settled{false}; + cl_int result{}; + std::coroutine_handle<> callerSchedHandle; + }; + + explicit ClKernelCompletionAReq( + boost::asio::io_context &resumeIoContext, + StartFn startFn) + : asyncState(std::make_shared()), + resumeIoContext(resumeIoContext) + { + startFn([this](cl_int eventCommandExecStatus) + { + asyncState->result = eventCommandExecStatus; + signalSettledAndResumeCaller(); + }); + } + + bool await_ready() const noexcept + { + return asyncState->settled.load(std::memory_order_acquire); + } + + bool await_suspend(std::coroutine_handle<> callerSchedHandle) noexcept + { + if (asyncState->settled.load(std::memory_order_acquire)) { + return false; + } + + asyncState->callerSchedHandle = callerSchedHandle; + return true; + } + + cl_int await_resume() noexcept + { + return asyncState->result; + } + +private: + void signalSettledAndResumeCaller() noexcept + { + asyncState->settled.store(true, std::memory_order_release); + + std::coroutine_handle<> handle = asyncState->callerSchedHandle; + if (!handle) { + return; + } + + boost::asio::post(resumeIoContext, handle); + } + + std::shared_ptr asyncState; + boost::asio::io_context &resumeIoContext; +}; + +template +ClKernelCompletionAReq getClKernelCompletionAReqAwaiter( + boost::asio::io_context &resumeIoContext, + StartFn startFn) +{ + return ClKernelCompletionAReq(resumeIoContext, std::move(startFn)); +} + +} // namespace openclBoundary +} // namespace smo + +#endif // ADAPTERS_OPENCL_CL_KERNEL_COMPLETION_AREQ_H diff --git a/include/adapters/smo/assembleFrameAReq.h b/include/adapters/smo/assembleFrameAReq.h new file mode 100644 index 0000000..82e428a --- /dev/null +++ b/include/adapters/smo/assembleFrameAReq.h @@ -0,0 +1,44 @@ +#ifndef ADAPTERS_SMO_ASSEMBLE_FRAME_AREQ_H +#define ADAPTERS_SMO_ASSEMBLE_FRAME_AREQ_H + +#include +#include +#include +#include + +namespace smo { +namespace cpsBoundary { + +struct AssembleFrameResult +{ + bool success = false; + sscl::AsynchronousLoop loop{0}; +}; + +inline CpsCallbackAReq< + AssembleFrameResult, + stim_buff::IoUringAssemblyEngine::assembleFrameReqCbFn, + std::function)>> +getAssembleFrameReqAReqAwaiter( + boost::asio::io_context &resumeIoContext, + stim_buff::IoUringAssemblyEngine &engine) +{ + return CpsCallbackAReq< + AssembleFrameResult, + stim_buff::IoUringAssemblyEngine::assembleFrameReqCbFn, + std::function)>>( + resumeIoContext, + [&engine]( + sscl::cps::Callback< + stim_buff::IoUringAssemblyEngine::assembleFrameReqCbFn> cb) + { + engine.assembleFrameReq(std::move(cb)); + }); +} + +} // namespace cpsBoundary +} // namespace smo + +#endif // ADAPTERS_SMO_ASSEMBLE_FRAME_AREQ_H diff --git a/stimBuffApis/livoxGen1/CMakeLists.txt b/stimBuffApis/livoxGen1/CMakeLists.txt index 550d60c..5572f44 100644 --- a/stimBuffApis/livoxGen1/CMakeLists.txt +++ b/stimBuffApis/livoxGen1/CMakeLists.txt @@ -38,6 +38,7 @@ if(ENABLE_STIMBUFFAPI_livoxGen1) ${Boost_INCLUDE_DIRS} ${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/smocore/include + ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_SOURCE_DIR}/commonLibs ${URING_INCLUDE_DIRS} ${OPENCL_INCLUDE_DIRS} diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp index ac224fb..06e8cf8 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp @@ -8,10 +8,10 @@ #include #include #include -#include -#include -#include #include +#include +#include +#include #include #include #include @@ -998,162 +998,135 @@ void OpenClCollatingAndMeshingEngine::produceAmbienceStimulusFrame( passbandCountOut = passbandCount; } -class OpenClCollatingAndMeshingEngine::CompactCollateAndMeshFrameReq -: public sscl::cps::PostedAsynchronousContinuation +sscl::co::ViralNonPostingInvoker +OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameCReq( + sscl::AsynchronousLoop& frameAssemblyResult, + StimulusFrame& /*stimulusFrame*/, + std::optional> intensityStimFrame, + std::optional lightAmbienceProductionDesc, + std::optional darkAmbienceProductionDesc) { -private: - OpenClCollatingAndMeshingEngine& engine; - sscl::AsynchronousLoop frameAssemblyResult; - StimulusFrame& stimulusFrame; - std::optional> intensityStimFrame; - std::optional lightAmbienceProductionDesc; - std::optional darkAmbienceProductionDesc; - -public: - CompactCollateAndMeshFrameReq( - OpenClCollatingAndMeshingEngine& engine_, - sscl::AsynchronousLoop& asyncLoop, - StimulusFrame& stimulusFrame_, - std::optional> intensityStimFrame_, - std::optional lightAmbienceProductionDesc_, - std::optional darkAmbienceProductionDesc_, - const std::shared_ptr& caller, - sscl::cps::Callback cb) - : sscl::cps::PostedAsynchronousContinuation( - caller, cb), - engine(engine_), - frameAssemblyResult(asyncLoop), stimulusFrame(stimulusFrame_), - intensityStimFrame(intensityStimFrame_), - lightAmbienceProductionDesc(std::move(lightAmbienceProductionDesc_)), - darkAmbienceProductionDesc(std::move(darkAmbienceProductionDesc_)) - {} - - bool anyAmbienceAttached() const { - return lightAmbienceProductionDesc.has_value() - || darkAmbienceProductionDesc.has_value(); + sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); + if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) + { co_return false; } } -public: - void callOriginalCallback(bool success) - { callOriginalCb(success, std::ref(stimulusFrame)); } + auto& resumeIoContext = parent.device->componentThread->getIoContext(); -public: - void compactCollateAndMeshFrameReq1_doCompact_posted( - std::shared_ptr context) + bool needsCompaction = IoUringAssemblyEngine::compactionIsNeeded( + frameAssemblyResult.nSucceeded.load(), frameAssemblyResult.nTotal); + + bool anyAmbienceAttached = lightAmbienceProductionDesc.has_value() + || darkAmbienceProductionDesc.has_value(); + + if (needsCompaction) { - sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); - if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) + // compactCollateAndMeshFrameReq1_doCompact_posted { - callOriginalCallback(false); - return; + sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); + if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) + { co_return false; } + + // Record compact kernel start time + compactKernelStartTime = std::chrono::high_resolution_clock::now(); } - // Record compact kernel start time - engine.compactKernelStartTime = std::chrono::high_resolution_clock::now(); - - bool success = engine.startCompactKernel( - engine.parent.assemblyBuffer, - static_cast(context->frameAssemblyResult.nSucceeded.load()), - std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq2_compactDone_posted, - context.get(), context, - std::placeholders::_1)); - - if (!success) + cl_int compactStatus = co_await openclBoundary::getClKernelCompletionAReqAwaiter( + resumeIoContext, + [this, &frameAssemblyResult](std::function completionCb) { - engine.compactKernelComplete(); - callOriginalCallback(false); - return; - } - } + bool success = startCompactKernel( + parent.assemblyBuffer, + static_cast( + frameAssemblyResult.nSucceeded.load()), + std::move(completionCb)); + if (!success) + { + compactKernelComplete(); + completionCb(CL_INVALID_OPERATION); + } + }); - void compactCollateAndMeshFrameReq2_compactDone_posted( - std::shared_ptr context, - cl_int compactStatus) - { - sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); - if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) + if (compactStatus == CL_INVALID_OPERATION) { - /** EXPLANATION: - * We intentionally don't call compactKernelComplete() here because - * if shouldAcceptRequests is false, then the caller that called - * finalize() will also be forced to call compactKernelComplete() - * inside of finalize(). - */ - callOriginalCallback(false); - return; + co_return false; } - engine.compactKernelComplete(); - // Record compact kernel end time - engine.compactKernelEndTime = std::chrono::high_resolution_clock::now(); - - // If compact failed, call callback directly with failure - if (compactStatus != CL_SUCCESS) + // compactCollateAndMeshFrameReq2_compactDone_posted { - callOriginalCallback(false); - return; - } + sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); + if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) + { + /** EXPLANATION: + * We intentionally don't call compactKernelComplete() here because + * if shouldAcceptRequests is false, then the caller that called + * finalize() will also be forced to call compactKernelComplete() + * inside of finalize(). + */ + co_return false; + } + + compactKernelComplete(); + // Record compact kernel end time + compactKernelEndTime = std::chrono::high_resolution_clock::now(); + + // If compact failed, call callback directly with failure + if (compactStatus != CL_SUCCESS) { + co_return false; + } #if 0 - // Print first 4 bytes of each slot - if (engine.frameAssemblyDesc) - { - for (size_t i = 0; i < engine.frameAssemblyDesc->numSlots; ++i) { - engine.parent.ioUringAssemblyEngine.printSlotBytes(i, 4); + // Print first 4 bytes of each slot + if (frameAssemblyDesc) + { + for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { + parent.ioUringAssemblyEngine.printSlotBytes(i, 4); + } } - } #endif - - guard.unlockPrematurely(); - context->compactCollateAndMeshFrameReq3_doCollate_posted(context); + } } - void compactCollateAndMeshFrameReq3_doCollate_posted( - std::shared_ptr context) + // compactCollateAndMeshFrameReq3_doCollate_posted { - sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); - if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) - { - callOriginalCallback(false); - return; + sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); + if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { + co_return false; } // Record collate kernel start time - engine.collateKernelStartTime = std::chrono::high_resolution_clock::now(); - - bool success = engine.startCollateKernel( - context->intensityStimFrame, context->anyAmbienceAttached(), - std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq4_collateDone_maybePosted, - context.get(), context, - std::placeholders::_1)); - - if (!success) - { - engine.collateKernelComplete( - context->intensityStimFrame, context->anyAmbienceAttached()); - - callOriginalCallback(false); - return; - } + collateKernelStartTime = std::chrono::high_resolution_clock::now(); } - void compactCollateAndMeshFrameReq4_collateDone_maybePosted( - [[maybe_unused]] std::shared_ptr context, - cl_int collateStatus) + cl_int collateStatus = co_await openclBoundary::getClKernelCompletionAReqAwaiter( + resumeIoContext, + [this, intensityStimFrame, anyAmbienceAttached]( + std::function completionCb) { - sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); - if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) + bool success = startCollateKernel( + intensityStimFrame, anyAmbienceAttached, + std::move(completionCb)); + if (!success) { + collateKernelComplete( + intensityStimFrame, anyAmbienceAttached); + completionCb(CL_INVALID_OPERATION); + } + }); + + if (collateStatus == CL_INVALID_OPERATION) { + co_return false; + } + + // compactCollateAndMeshFrameReq4_collateDone_maybePosted + { + sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); + if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { /* We intentionally don't call collateKernelComplete() here for the * same reason as above. */ - callOriginalCallback(false); - return; + co_return false; } /** EXPLANATION: @@ -1165,55 +1138,51 @@ public: * Therefore it's finalize()'s responsibility to ensure that it properly * completes/cleans up any in-flight operations. */ - engine.collateKernelComplete( - context->intensityStimFrame, context->anyAmbienceAttached()); + collateKernelComplete( + intensityStimFrame, anyAmbienceAttached); // Produce each attached ambience stimbuff's passband count from // the per-slot averages the collate kernel staged. uint32_t nSucceededForAmbience = - context->frameAssemblyResult.nSucceeded.load(); + frameAssemblyResult.nSucceeded.load(); - if (context->lightAmbienceProductionDesc.has_value()) + if (lightAmbienceProductionDesc.has_value()) { - engine.produceAmbienceStimulusFrame( - context->lightAmbienceProductionDesc->frame.get(), - context->lightAmbienceProductionDesc->comparator, + produceAmbienceStimulusFrame( + lightAmbienceProductionDesc->frame.get(), + lightAmbienceProductionDesc->comparator, nSucceededForAmbience); } - if (context->darkAmbienceProductionDesc.has_value()) + if (darkAmbienceProductionDesc.has_value()) { - engine.produceAmbienceStimulusFrame( - context->darkAmbienceProductionDesc->frame.get(), - context->darkAmbienceProductionDesc->comparator, + produceAmbienceStimulusFrame( + darkAmbienceProductionDesc->frame.get(), + darkAmbienceProductionDesc->comparator, nSucceededForAmbience); } // Record collate kernel end time - engine.collateKernelEndTime = std::chrono::high_resolution_clock::now(); - - bool success = (collateStatus == CL_SUCCESS); + collateKernelEndTime = std::chrono::high_resolution_clock::now(); // Early callback + return pattern - if (!success) - { - callOriginalCallback(false); - return; - } + bool success = (collateStatus == CL_SUCCESS); + if (!success) { co_return false; } - uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load(); + uint32_t nSucceeded = frameAssemblyResult.nSucceeded.load(); - int returnMode = static_cast(engine.parent.device->currentReturnMode); + int returnMode = static_cast(parent.device->currentReturnMode); size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram( returnMode); size_t totalPoints = nSucceeded * pointsPerDgram; // Count points with intensity greater than 116 size_t highIntensityCount = 0; - if (context->intensityStimFrame.has_value()) + if (intensityStimFrame.has_value()) { - StimulusFrame& intensityFrame = context->intensityStimFrame->get(); - float* intensityFloats = reinterpret_cast(intensityFrame.slotDesc.vaddr); + StimulusFrame& intensityFrame = intensityStimFrame->get(); + float* intensityFloats = reinterpret_cast( + intensityFrame.slotDesc.vaddr); for (size_t i = 0; i < totalPoints; ++i) { float intensity = intensityFloats[i]; @@ -1227,62 +1196,15 @@ public: #if 0 std::cout << __func__ << ": intensityRingBufferIndex=" - << (context->intensityStimFrame.has_value() ? - context->intensityStimFrame->get().ringBufferIndex : SIZE_MAX) + << (intensityStimFrame.has_value() ? + intensityStimFrame->get().ringBufferIndex : SIZE_MAX) << ", pointsPerDgram=" << pointsPerDgram << ", nSucceeded=" << nSucceeded << ", totalPoints=" << totalPoints << ", highIntensityCount=" << highIntensityCount << std::endl; #endif - callOriginalCallback(success); - } -}; - -void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq( - sscl::AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame, - std::optional> intensityStimFrame, - std::optional lightAmbienceProductionDesc, - std::optional darkAmbienceProductionDesc, - sscl::cps::Callback callback) -{ - { - sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); - if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) - { - callback.callbackFn(false, stimulusFrame); - return; - } - } - - auto caller = smoHooksPtr->ComponentThread_getSelf(); - auto request = std::make_shared( - *this, asyncLoop, stimulusFrame, intensityStimFrame, - std::move(lightAmbienceProductionDesc), std::move(darkAmbienceProductionDesc), - caller, - std::move(callback)); - - // Check if compaction is needed - bool needsCompaction = IoUringAssemblyEngine::compactionIsNeeded( - asyncLoop.nSucceeded.load(), asyncLoop.nTotal); - - // Start with compaction if needed, then chain to collation - if (needsCompaction) - { - boost::asio::post(parent.device->componentThread->getIoContext(), - STC(std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq1_doCompact_posted, - request.get(), request))); - } - else - { - // Skip compaction, go straight to collation - boost::asio::post(parent.device->componentThread->getIoContext(), - STC(std::bind( - &CompactCollateAndMeshFrameReq - ::compactCollateAndMeshFrameReq3_doCollate_posted, - request.get(), request))); + co_return success; } } diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h index 4477164..92741fe 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h @@ -13,7 +13,7 @@ #define CL_TARGET_OPENCL_VERSION 120 #include #include -#include +#include #include #include #include @@ -86,14 +86,12 @@ public: bool setup(); void finalize(); - typedef std::function - compactCollateAndMeshFrameReqCbFn; - void compactCollateAndMeshFrameReq( - sscl::AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame, + sscl::co::ViralNonPostingInvoker compactCollateAndMeshFrameCReq( + sscl::AsynchronousLoop& frameAssemblyResult, + StimulusFrame& stimulusFrame, std::optional> intensityStimFrame, std::optional lightAmbienceProductionDesc, - std::optional darkAmbienceProductionDesc, - sscl::cps::Callback callback); + std::optional darkAmbienceProductionDesc); private: // Callback function types @@ -214,9 +212,6 @@ private: bool mapAverageIntensityBuffer(cl_map_flags mapFlags = CL_MAP_READ); bool unmapAverageIntensityBuffer(); - // Forward declaration for continuation class - class CompactCollateAndMeshFrameReq; - // Unified kernel start function template bool startKernel( diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp index b5e2e8e..5d41179 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp @@ -11,6 +11,8 @@ #include #include #include +#include +#include #include #include #include @@ -428,219 +430,205 @@ PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer( void PcloudStimulusProducer::stimFrameProductionTimesliceInd() { - produceFrameReq({nullptr, nullptr}); + holdProduceFrameCReq(); } -class PcloudStimulusProducer::ProduceFrameReq -: public sscl::cps::PostedAsynchronousContinuation +struct AllowNextStimulusFrameGuard { -private: - PcloudStimulusProducer& pcloudProducer; - sscl::AsynchronousLoop frameAssemblyResult; - StimulusFrame& stimulusFrame; + PcloudStimulusProducer& producer; + + explicit AllowNextStimulusFrameGuard(PcloudStimulusProducer& _producer) + : producer(_producer) + {} + + ~AllowNextStimulusFrameGuard() + { producer.allowNextStimulusFrame(); } +}; + +void PcloudStimulusProducer::holdProduceFrameCReq() +{ + /** EXPLANATION: + * We shouldn't acquire stimulusProducerCanceler.s.lock here because + * this function is called from + * StimulusProducer::stimFrameProductionTimesliceInd(), which is already + * holding the lock. + */ + activeProduceFrameInvoker.emplace(produceFrameCReq( + produceFrameExceptionPtr, + [this]() { activeProduceFrameInvoker.reset(); })); +} + +sscl::co::NonViralNonPostingInvoker PcloudStimulusProducer::produceFrameCReq( + [[maybe_unused]] std::exception_ptr& exceptionPtr, + [[maybe_unused]] std::function completion) +{ + AllowNextStimulusFrameGuard allowNextGuard(*this); + StimulusFrame& stimulusFrame = tempStimulusFrame; + sscl::AsynchronousLoop frameAssemblyResult(0); std::optional> intensityStimFrame; std::optional> lightAmbienceStimFrame; std::optional> darkAmbienceStimFrame; -public: - ProduceFrameReq( - PcloudStimulusProducer& producer, - const std::shared_ptr& caller, - sscl::cps::Callback cb) - : sscl::cps::PostedAsynchronousContinuation(caller, cb), - pcloudProducer(producer), - frameAssemblyResult(0), - stimulusFrame(producer.tempStimulusFrame) - {} + auto& resumeIoContext = device->componentThread->getIoContext(); -public: - void callOriginalCallback() + // produceFrameReq1_doAssemble_posted + /** EXPLANATION: + * stimFrameProductionTimesliceInd() is entered with + * stimulusProducerCanceler.s.lock held; do not re-acquire here. + * + * This function is called from + * StimulusProducer::stimFrameProductionTimesliceInd(), whose caller is + * already holding the lock. + */ + if (stimulusProducerCanceler.isCancellationRequestedUnlocked()) + { co_return; } + + cpsBoundary::AssembleFrameResult assembleResult = co_await + cpsBoundary::getAssembleFrameReqAReqAwaiter( + resumeIoContext, ioUringAssemblyEngine); + + // produceFrameReq2_assembleDone + std::optional lightAmbienceProductionDescDesc; + std::optional darkAmbienceProductionDescDesc; { - pcloudProducer.allowNextStimulusFrame(); - callOriginalCb(); - } + sscl::SpinLock::Guard guard(stimulusProducerCanceler.s.lock); + if (stimulusProducerCanceler.isCancellationRequestedUnlocked()) + { co_return; } -public: - void produceFrameReq1_doAssemble_posted( - std::shared_ptr context) - { - sscl::SpinLock::Guard guard( - pcloudProducer.stimulusProducerCanceler.s.lock); - if (pcloudProducer.stimulusProducerCanceler - .isCancellationRequestedUnlocked()) + if (!assembleResult.success) { - callOriginalCallback(); - return; - } - - pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( - {context, std::bind( - &ProduceFrameReq::produceFrameReq2_assembleDone, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); - } - - void produceFrameReq2_assembleDone( - std::shared_ptr context, - bool success, sscl::AsynchronousLoop loop) - { - sscl::SpinLock::Guard guard( - pcloudProducer.stimulusProducerCanceler.s.lock); - if (pcloudProducer.stimulusProducerCanceler - .isCancellationRequestedUnlocked()) - { - callOriginalCallback(); - return; - } - - if (!success) - { - callOriginalCallback(); - - if (pcloudProducer.attachedStimulusBuffers.size() > 0) { + if (attachedStimulusBuffers.size() > 0) { std::cerr << __func__ << ": Failed to assemble frame.\n"; } - return; + co_return; } - context->frameAssemblyResult = loop; + frameAssemblyResult = assembleResult.loop; // Check if intensity buffer is attached and acquire frame if so - if (auto intensityBuff = pcloudProducer.intensityStimulusBuffer.load( + if (auto intensityBuff = intensityStimulusBuffer.load( std::memory_order_acquire)) { size_t intensityRingbuffIndex = intensityBuff ->ringBuffer.getIndexToProduceInto(); - StimulusFrame& intensityStimFrame = intensityBuff + StimulusFrame& intensityStimFrameRef = intensityBuff ->ringBuffer.getDataAtSlot( intensityRingbuffIndex); - intensityStimFrame.lock.writeAcquire(); - context->intensityStimFrame = std::make_optional( - std::ref(intensityStimFrame)); + intensityStimFrameRef.lock.writeAcquire(); + intensityStimFrame = std::make_optional( + std::ref(intensityStimFrameRef)); } else { - context->intensityStimFrame = std::nullopt; + intensityStimFrame = std::nullopt; } // Check if light ambience buffer is attached and acquire frame if so - std::optional lightAmbienceProductionDescDesc; if (auto lightAmbienceBuff = - pcloudProducer.lightAmbienceStimulusBuffer.load( + lightAmbienceStimulusBuffer.load( std::memory_order_acquire)) { size_t lightAmbienceRingbuffIndex = lightAmbienceBuff ->ringBuffer.getIndexToProduceInto(); - StimulusFrame& lightAmbienceStimFrame = lightAmbienceBuff + StimulusFrame& lightAmbienceStimFrameRef = lightAmbienceBuff ->ringBuffer.getDataAtSlot(lightAmbienceRingbuffIndex); - lightAmbienceStimFrame.lock.writeAcquire(); - context->lightAmbienceStimFrame = std::make_optional( - std::ref(lightAmbienceStimFrame)); + lightAmbienceStimFrameRef.lock.writeAcquire(); + lightAmbienceStimFrame = std::make_optional( + std::ref(lightAmbienceStimFrameRef)); lightAmbienceProductionDescDesc = AmbienceProductionDesc{ - std::ref(lightAmbienceStimFrame), + std::ref(lightAmbienceStimFrameRef), lightAmbienceBuff->passbandCountGtComparator}; } else { - context->lightAmbienceStimFrame = std::nullopt; + lightAmbienceStimFrame = std::nullopt; } // Check if dark ambience buffer is attached and acquire frame if so - std::optional darkAmbienceProductionDescDesc; if (auto darkAmbienceBuff = - pcloudProducer.darkAmbienceStimulusBuffer.load( + darkAmbienceStimulusBuffer.load( std::memory_order_acquire)) { size_t darkAmbienceRingbuffIndex = darkAmbienceBuff ->ringBuffer.getIndexToProduceInto(); - StimulusFrame& darkAmbienceStimFrame = darkAmbienceBuff + StimulusFrame& darkAmbienceStimFrameRef = darkAmbienceBuff ->ringBuffer.getDataAtSlot(darkAmbienceRingbuffIndex); - darkAmbienceStimFrame.lock.writeAcquire(); - context->darkAmbienceStimFrame = std::make_optional( - std::ref(darkAmbienceStimFrame)); + darkAmbienceStimFrameRef.lock.writeAcquire(); + darkAmbienceStimFrame = std::make_optional( + std::ref(darkAmbienceStimFrameRef)); darkAmbienceProductionDescDesc = AmbienceProductionDesc{ - std::ref(darkAmbienceStimFrame), + std::ref(darkAmbienceStimFrameRef), darkAmbienceBuff->passbandCountLtComparator}; } else { - context->darkAmbienceStimFrame = std::nullopt; + darkAmbienceStimFrame = std::nullopt; } - - pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq( - loop, stimulusFrame, - context->intensityStimFrame, - std::move(lightAmbienceProductionDescDesc), - std::move(darkAmbienceProductionDescDesc), - {context, std::bind( - &ProduceFrameReq::produceFrameReq3_compactCollateDone, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); } - void produceFrameReq3_compactCollateDone( - [[maybe_unused]] std::shared_ptr context, - bool success, StimulusFrame& /*stimulusFrame*/) - { + bool compactCollateSuccess = co_await + openClCollatingAndMeshingEngine.compactCollateAndMeshFrameCReq( + frameAssemblyResult, stimulusFrame, + intensityStimFrame, + std::move(lightAmbienceProductionDescDesc), + std::move(darkAmbienceProductionDescDesc)); + + // produceFrameReq3_compactCollateDone #if SMO_DEBUG_PCLOUD_AMBIENCE_INTRIN uint32_t logLightPassbandCount = 0; uint32_t logDarkPassbandCount = 0; bool logLightAmbience = false; bool logDarkAmbience = false; - if (success) + if (compactCollateSuccess) { - if (context->lightAmbienceStimFrame.has_value()) + if (lightAmbienceStimFrame.has_value()) { logLightPassbandCount = *reinterpret_cast( - context->lightAmbienceStimFrame->get().slotDesc.vaddr); + lightAmbienceStimFrame->get().slotDesc.vaddr); logLightAmbience = true; } - if (context->darkAmbienceStimFrame.has_value()) + if (darkAmbienceStimFrame.has_value()) { logDarkPassbandCount = *reinterpret_cast( - context->darkAmbienceStimFrame->get().slotDesc.vaddr); + darkAmbienceStimFrame->get().slotDesc.vaddr); logDarkAmbience = true; } } #endif - // Release intensity frame if it was used - if (context->intensityStimFrame.has_value()) { - context->intensityStimFrame->get().lock.writeRelease(); - } - // Release ambience frames if they were used - if (context->lightAmbienceStimFrame.has_value()) { - context->lightAmbienceStimFrame->get().lock.writeRelease(); - } - if (context->darkAmbienceStimFrame.has_value()) { - context->darkAmbienceStimFrame->get().lock.writeRelease(); - } + // Release intensity frame if it was used + if (intensityStimFrame.has_value()) { + intensityStimFrame->get().lock.writeRelease(); + } + // Release ambience frames if they were used + if (lightAmbienceStimFrame.has_value()) { + lightAmbienceStimFrame->get().lock.writeRelease(); + } + if (darkAmbienceStimFrame.has_value()) { + darkAmbienceStimFrame->get().lock.writeRelease(); + } - sscl::SpinLock::Guard guard( - pcloudProducer.stimulusProducerCanceler.s.lock); - if (pcloudProducer.stimulusProducerCanceler - .isCancellationRequestedUnlocked()) - { - callOriginalCallback(); - return; - } + { + sscl::SpinLock::Guard guard(stimulusProducerCanceler.s.lock); + if (stimulusProducerCanceler.isCancellationRequestedUnlocked()) + { co_return; } - if (!success) { - std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; + if (!compactCollateSuccess) { + std::cerr << "produceFrameReq3_compactCollateDone" + << ": Failed to compact and collate frame" << std::endl; } else { guard.unlockPrematurely(); - if (pcloudProducer.pcloudFrameDumper.isEnabled()) + if (pcloudFrameDumper.isEnabled()) { try { - pcloudProducer.pcloudFrameDumper.dumpProducedFrame( - *pcloudProducer.device, - pcloudProducer.collationBuffer, - context->frameAssemblyResult); + pcloudFrameDumper.dumpProducedFrame( + *device, collationBuffer, + frameAssemblyResult); } catch (const std::exception& e) { @@ -653,7 +641,7 @@ public: if (logLightAmbience) { auto lightBuff = - pcloudProducer.lightAmbienceStimulusBuffer.load( + lightAmbienceStimulusBuffer.load( std::memory_order_acquire); if (lightBuff) { @@ -687,7 +675,7 @@ public: if (logDarkAmbience) { auto darkBuff = - pcloudProducer.darkAmbienceStimulusBuffer.load( + darkAmbienceStimulusBuffer.load( std::memory_order_acquire); if (darkBuff) { @@ -728,12 +716,12 @@ public: std::chrono::duration_cast( logNow.time_since_epoch()) % 1000; auto assemblyDuration = - pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration(); + ioUringAssemblyEngine.getAssemblyDuration(); auto compactDuration = - pcloudProducer.openClCollatingAndMeshingEngine + openClCollatingAndMeshingEngine .getCompactKernelDuration(); auto collateDuration = - pcloudProducer.openClCollatingAndMeshingEngine + openClCollatingAndMeshingEngine .getCollateKernelDuration(); std::cout << std::put_time(std::localtime(&logTime), "%T") << '.' << std::setfill('0') << std::setw(3) @@ -745,29 +733,9 @@ public: << "ms" << std::endl; #endif } - - callOriginalCallback(); } -}; -void PcloudStimulusProducer::produceFrameReq( - sscl::cps::Callback callback) -{ - /** EXPLANATION: - * We shouldn't acquire stimulusProducerCanceler.s.lock here because - * this function is called from - * StimulusProducer::stimFrameProductionTimesliceInd(), which is already - * holding the lock. - */ - auto caller = smoHooksPtr->ComponentThread_getSelf(); - auto request = std::make_shared( - *this, caller, std::move(callback)); - - // Post the doAssemble method to the component thread - boost::asio::post(device->componentThread->getIoContext(), - STC(std::bind( - &ProduceFrameReq::produceFrameReq1_doAssemble_posted, - request.get(), request))); + co_return; } } // namespace stim_buff diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.h b/stimBuffApis/livoxGen1/pcloudStimulusProducer.h index 43acdb2..b8ab308 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.h +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.h @@ -3,11 +3,12 @@ #include #include +#include +#include #include #include #include -#include -#include +#include #include #include "ioUringAssemblyEngine.h" #include "livoxPcloudFrameDumper.h" @@ -83,12 +84,13 @@ public: protected: void stimFrameProductionTimesliceInd() override; - // Callback function type for produceFrameReq - typedef std::function produceFrameReqCbFn; + void holdProduceFrameCReq(); + + sscl::co::NonViralNonPostingInvoker produceFrameCReq( + std::exception_ptr& exceptionPtr, + std::function completion); public: - void produceFrameReq(sscl::cps::Callback callback); - size_t nDgramsPerStagingBufferFrame; std::shared_ptr device; PcloudFormatDesc formatDesc; @@ -111,8 +113,8 @@ public: std::atomic> darkAmbienceStimulusBuffer; -private: - class ProduceFrameReq; + std::optional activeProduceFrameInvoker; + std::exception_ptr produceFrameExceptionPtr; }; } // namespace stim_buff