OClCollMeshEngn,PcloudStimProd: port to sscl::co coros

We've now ported the OpenClCollMeshEngn and PcloudStimProd::produceFrameReq
portions of the Livox pipeline to coros.
This commit is contained in:
2026-05-30 19:32:19 -04:00
parent 1cf1be4194
commit 35eb466a60
7 changed files with 398 additions and 373 deletions
@@ -0,0 +1,93 @@
#ifndef ADAPTERS_OPENCL_CL_KERNEL_COMPLETION_AREQ_H
#define ADAPTERS_OPENCL_CL_KERNEL_COMPLETION_AREQ_H
#include <atomic>
#include <coroutine>
#include <functional>
#include <memory>
#define CL_TARGET_OPENCL_VERSION 120
#include <CL/cl.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
namespace smo {
namespace openclBoundary {
/** Eager-start std::function callback -> coroutine adapter for OpenCL kernel
* completion (cl_int event_command_exec_status posted to io_context).
*/
template <typename StartFn>
class ClKernelCompletionAReq
{
public:
struct AsyncState
{
std::atomic<bool> settled{false};
cl_int result{};
std::coroutine_handle<> callerSchedHandle;
};
explicit ClKernelCompletionAReq(
boost::asio::io_context &resumeIoContext,
StartFn startFn)
: asyncState(std::make_shared<AsyncState>()),
resumeIoContext(resumeIoContext)
{
startFn([this](cl_int eventCommandExecStatus)
{
asyncState->result = eventCommandExecStatus;
signalSettledAndResumeCaller();
});
}
bool await_ready() const noexcept
{
return asyncState->settled.load(std::memory_order_acquire);
}
bool await_suspend(std::coroutine_handle<> callerSchedHandle) noexcept
{
if (asyncState->settled.load(std::memory_order_acquire)) {
return false;
}
asyncState->callerSchedHandle = callerSchedHandle;
return true;
}
cl_int await_resume() noexcept
{
return asyncState->result;
}
private:
void signalSettledAndResumeCaller() noexcept
{
asyncState->settled.store(true, std::memory_order_release);
std::coroutine_handle<> handle = asyncState->callerSchedHandle;
if (!handle) {
return;
}
boost::asio::post(resumeIoContext, handle);
}
std::shared_ptr<AsyncState> asyncState;
boost::asio::io_context &resumeIoContext;
};
/** Factory that deduces `StartFn` and builds the kernel-completion awaiter.
 *
 * @param resumeIoContext io_context forwarded to the awaiter's constructor.
 * @param startFn Callable forwarded (moved) to the awaiter's constructor.
 * @return A ClKernelCompletionAReq<StartFn> constructed from the arguments.
 */
template <typename StartFn>
ClKernelCompletionAReq<StartFn> getClKernelCompletionAReqAwaiter(
	boost::asio::io_context &resumeIoContext,
	StartFn startFn)
{
	using Awaiter = ClKernelCompletionAReq<StartFn>;

	return Awaiter(resumeIoContext, std::move(startFn));
}
} // namespace openclBoundary
} // namespace smo
#endif // ADAPTERS_OPENCL_CL_KERNEL_COMPLETION_AREQ_H
+44
View File
@@ -0,0 +1,44 @@
#ifndef ADAPTERS_SMO_ASSEMBLE_FRAME_AREQ_H
#define ADAPTERS_SMO_ASSEMBLE_FRAME_AREQ_H
#include <adapters/smo/cpsCallbackAReq.h>
#include <spinscale/asynchronousLoop.h>
#include <spinscale/cps/callback.h>
#include <ioUringAssemblyEngine.h>
namespace smo {
namespace cpsBoundary {
/** Value produced by co_awaiting the assemble-frame awaiter: a success flag
 * plus an sscl::AsynchronousLoop.
 */
struct AssembleFrameResult
{
	// Defaults to failure until a result is filled in.
	bool success{false};
	// Default-constructed from 0.
	sscl::AsynchronousLoop loop{0};
};
/** Builds a CpsCallbackAReq awaiter whose start function calls
 * `engine.assembleFrameReq()` with the callback supplied by the awaiter.
 *
 * @param resumeIoContext io_context forwarded to the CpsCallbackAReq
 *        constructor.
 * @param engine Assembly engine; captured by reference in the start
 *        function, so it must remain valid until that function has run.
 * @return The constructed awaiter, yielding an AssembleFrameResult.
 */
inline CpsCallbackAReq<
	AssembleFrameResult,
	stim_buff::IoUringAssemblyEngine::assembleFrameReqCbFn,
	std::function<void(sscl::cps::Callback<
		stim_buff::IoUringAssemblyEngine::assembleFrameReqCbFn>)>>
getAssembleFrameReqAReqAwaiter(
	boost::asio::io_context &resumeIoContext,
	stim_buff::IoUringAssemblyEngine &engine)
{
	using AssembleCbFn =
		stim_buff::IoUringAssemblyEngine::assembleFrameReqCbFn;
	using AssembleCpsCallback = sscl::cps::Callback<AssembleCbFn>;
	using AssembleStartFn = std::function<void(AssembleCpsCallback)>;
	using AssembleAwaiter = CpsCallbackAReq<
		AssembleFrameResult, AssembleCbFn, AssembleStartFn>;

	return AssembleAwaiter(
		resumeIoContext,
		[&engine](AssembleCpsCallback cb)
		{
			engine.assembleFrameReq(std::move(cb));
		});
}
} // namespace cpsBoundary
} // namespace smo
#endif // ADAPTERS_SMO_ASSEMBLE_FRAME_AREQ_H
+1
View File
@@ -38,6 +38,7 @@ if(ENABLE_STIMBUFFAPI_livoxGen1)
${Boost_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}
${CMAKE_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/smocore/include ${CMAKE_SOURCE_DIR}/smocore/include
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_SOURCE_DIR}/commonLibs ${CMAKE_SOURCE_DIR}/commonLibs
${URING_INCLUDE_DIRS} ${URING_INCLUDE_DIRS}
${OPENCL_INCLUDE_DIRS} ${OPENCL_INCLUDE_DIRS}
@@ -8,10 +8,10 @@
#include <algorithm> #include <algorithm>
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <spinscale/cps/asynchronousContinuation.h>
#include <spinscale/cps/asynchronousBridge.h>
#include <spinscale/cps/callback.h>
#include <spinscale/asynchronousLoop.h> #include <spinscale/asynchronousLoop.h>
#include <spinscale/co/invokers.h>
#include <spinscale/cps/asynchronousBridge.h>
#include <adapters/opencl/clKernelCompletionAReq.h>
#include <componentThread.h> #include <componentThread.h>
#include <user/stimulusFrame.h> #include <user/stimulusFrame.h>
#include <livoxProto1/device.h> #include <livoxProto1/device.h>
@@ -998,83 +998,65 @@ void OpenClCollatingAndMeshingEngine::produceAmbienceStimulusFrame(
passbandCountOut = passbandCount; passbandCountOut = passbandCount;
} }
class OpenClCollatingAndMeshingEngine::CompactCollateAndMeshFrameReq sscl::co::ViralNonPostingInvoker<bool>
: public sscl::cps::PostedAsynchronousContinuation<compactCollateAndMeshFrameReqCbFn> OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameCReq(
sscl::AsynchronousLoop& frameAssemblyResult,
StimulusFrame& /*stimulusFrame*/,
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame,
std::optional<AmbienceProductionDesc> lightAmbienceProductionDesc,
std::optional<AmbienceProductionDesc> darkAmbienceProductionDesc)
{ {
private:
OpenClCollatingAndMeshingEngine& engine;
sscl::AsynchronousLoop frameAssemblyResult;
StimulusFrame& stimulusFrame;
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame;
std::optional<AmbienceProductionDesc> lightAmbienceProductionDesc;
std::optional<AmbienceProductionDesc> darkAmbienceProductionDesc;
public:
CompactCollateAndMeshFrameReq(
OpenClCollatingAndMeshingEngine& engine_,
sscl::AsynchronousLoop& asyncLoop,
StimulusFrame& stimulusFrame_,
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame_,
std::optional<AmbienceProductionDesc> lightAmbienceProductionDesc_,
std::optional<AmbienceProductionDesc> darkAmbienceProductionDesc_,
const std::shared_ptr<sscl::ComponentThread>& caller,
sscl::cps::Callback<compactCollateAndMeshFrameReqCbFn> cb)
: sscl::cps::PostedAsynchronousContinuation<compactCollateAndMeshFrameReqCbFn>(
caller, cb),
engine(engine_),
frameAssemblyResult(asyncLoop), stimulusFrame(stimulusFrame_),
intensityStimFrame(intensityStimFrame_),
lightAmbienceProductionDesc(std::move(lightAmbienceProductionDesc_)),
darkAmbienceProductionDesc(std::move(darkAmbienceProductionDesc_))
{}
bool anyAmbienceAttached() const
{ {
return lightAmbienceProductionDesc.has_value() sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock);
if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
{ co_return false; }
}
auto& resumeIoContext = parent.device->componentThread->getIoContext();
bool needsCompaction = IoUringAssemblyEngine::compactionIsNeeded(
frameAssemblyResult.nSucceeded.load(), frameAssemblyResult.nTotal);
bool anyAmbienceAttached = lightAmbienceProductionDesc.has_value()
|| darkAmbienceProductionDesc.has_value(); || darkAmbienceProductionDesc.has_value();
}
public: if (needsCompaction)
void callOriginalCallback(bool success)
{ callOriginalCb(success, std::ref(stimulusFrame)); }
public:
void compactCollateAndMeshFrameReq1_doCompact_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{ {
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); // compactCollateAndMeshFrameReq1_doCompact_posted
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
{ {
callOriginalCallback(false); sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock);
return; if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
} { co_return false; }
// Record compact kernel start time // Record compact kernel start time
engine.compactKernelStartTime = std::chrono::high_resolution_clock::now(); compactKernelStartTime = std::chrono::high_resolution_clock::now();
}
bool success = engine.startCompactKernel(
engine.parent.assemblyBuffer,
static_cast<uint32_t>(context->frameAssemblyResult.nSucceeded.load()),
std::bind(
&CompactCollateAndMeshFrameReq
::compactCollateAndMeshFrameReq2_compactDone_posted,
context.get(), context,
std::placeholders::_1));
cl_int compactStatus = co_await openclBoundary::getClKernelCompletionAReqAwaiter(
resumeIoContext,
[this, &frameAssemblyResult](std::function<void(cl_int)> completionCb)
{
bool success = startCompactKernel(
parent.assemblyBuffer,
static_cast<uint32_t>(
frameAssemblyResult.nSucceeded.load()),
std::move(completionCb));
if (!success) if (!success)
{ {
engine.compactKernelComplete(); compactKernelComplete();
callOriginalCallback(false); completionCb(CL_INVALID_OPERATION);
return;
} }
});
if (compactStatus == CL_INVALID_OPERATION)
{
co_return false;
} }
void compactCollateAndMeshFrameReq2_compactDone_posted( // compactCollateAndMeshFrameReq2_compactDone_posted
std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int compactStatus)
{ {
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock);
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
{ {
/** EXPLANATION: /** EXPLANATION:
* We intentionally don't call compactKernelComplete() here because * We intentionally don't call compactKernelComplete() here because
@@ -1082,78 +1064,69 @@ public:
* finalize() will also be forced to call compactKernelComplete() * finalize() will also be forced to call compactKernelComplete()
* inside of finalize(). * inside of finalize().
*/ */
callOriginalCallback(false); co_return false;
return;
} }
engine.compactKernelComplete(); compactKernelComplete();
// Record compact kernel end time // Record compact kernel end time
engine.compactKernelEndTime = std::chrono::high_resolution_clock::now(); compactKernelEndTime = std::chrono::high_resolution_clock::now();
// If compact failed, call callback directly with failure // If compact failed, call callback directly with failure
if (compactStatus != CL_SUCCESS) if (compactStatus != CL_SUCCESS) {
{ co_return false;
callOriginalCallback(false);
return;
} }
#if 0 #if 0
// Print first 4 bytes of each slot // Print first 4 bytes of each slot
if (engine.frameAssemblyDesc) if (frameAssemblyDesc)
{ {
for (size_t i = 0; i < engine.frameAssemblyDesc->numSlots; ++i) { for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) {
engine.parent.ioUringAssemblyEngine.printSlotBytes(i, 4); parent.ioUringAssemblyEngine.printSlotBytes(i, 4);
} }
} }
#endif #endif
}
guard.unlockPrematurely();
context->compactCollateAndMeshFrameReq3_doCollate_posted(context);
} }
void compactCollateAndMeshFrameReq3_doCollate_posted( // compactCollateAndMeshFrameReq3_doCollate_posted
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{ {
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock); sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock);
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) {
{ co_return false;
callOriginalCallback(false);
return;
} }
// Record collate kernel start time // Record collate kernel start time
engine.collateKernelStartTime = std::chrono::high_resolution_clock::now(); collateKernelStartTime = std::chrono::high_resolution_clock::now();
}
bool success = engine.startCollateKernel(
context->intensityStimFrame, context->anyAmbienceAttached(),
std::bind(
&CompactCollateAndMeshFrameReq
::compactCollateAndMeshFrameReq4_collateDone_maybePosted,
context.get(), context,
std::placeholders::_1));
cl_int collateStatus = co_await openclBoundary::getClKernelCompletionAReqAwaiter(
resumeIoContext,
[this, intensityStimFrame, anyAmbienceAttached](
std::function<void(cl_int)> completionCb)
{
bool success = startCollateKernel(
intensityStimFrame, anyAmbienceAttached,
std::move(completionCb));
if (!success) if (!success)
{ {
engine.collateKernelComplete( collateKernelComplete(
context->intensityStimFrame, context->anyAmbienceAttached()); intensityStimFrame, anyAmbienceAttached);
completionCb(CL_INVALID_OPERATION);
callOriginalCallback(false);
return;
} }
});
if (collateStatus == CL_INVALID_OPERATION) {
co_return false;
} }
void compactCollateAndMeshFrameReq4_collateDone_maybePosted( // compactCollateAndMeshFrameReq4_collateDone_maybePosted
[[maybe_unused]] std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int collateStatus)
{
sscl::SpinLock::Guard guard(engine.openClCollMeshEngnCanceler.s.lock);
if (engine.openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
{ {
sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock);
if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) {
/* We intentionally don't call collateKernelComplete() here for the /* We intentionally don't call collateKernelComplete() here for the
* same reason as above. * same reason as above.
*/ */
callOriginalCallback(false); co_return false;
return;
} }
/** EXPLANATION: /** EXPLANATION:
@@ -1165,55 +1138,51 @@ public:
* Therefore it's finalize()'s responsibility to ensure that it properly * Therefore it's finalize()'s responsibility to ensure that it properly
* completes/cleans up any in-flight operations. * completes/cleans up any in-flight operations.
*/ */
engine.collateKernelComplete( collateKernelComplete(
context->intensityStimFrame, context->anyAmbienceAttached()); intensityStimFrame, anyAmbienceAttached);
// Produce each attached ambience stimbuff's passband count from // Produce each attached ambience stimbuff's passband count from
// the per-slot averages the collate kernel staged. // the per-slot averages the collate kernel staged.
uint32_t nSucceededForAmbience = uint32_t nSucceededForAmbience =
context->frameAssemblyResult.nSucceeded.load(); frameAssemblyResult.nSucceeded.load();
if (context->lightAmbienceProductionDesc.has_value()) if (lightAmbienceProductionDesc.has_value())
{ {
engine.produceAmbienceStimulusFrame( produceAmbienceStimulusFrame(
context->lightAmbienceProductionDesc->frame.get(), lightAmbienceProductionDesc->frame.get(),
context->lightAmbienceProductionDesc->comparator, lightAmbienceProductionDesc->comparator,
nSucceededForAmbience); nSucceededForAmbience);
} }
if (context->darkAmbienceProductionDesc.has_value()) if (darkAmbienceProductionDesc.has_value())
{ {
engine.produceAmbienceStimulusFrame( produceAmbienceStimulusFrame(
context->darkAmbienceProductionDesc->frame.get(), darkAmbienceProductionDesc->frame.get(),
context->darkAmbienceProductionDesc->comparator, darkAmbienceProductionDesc->comparator,
nSucceededForAmbience); nSucceededForAmbience);
} }
// Record collate kernel end time // Record collate kernel end time
engine.collateKernelEndTime = std::chrono::high_resolution_clock::now(); collateKernelEndTime = std::chrono::high_resolution_clock::now();
bool success = (collateStatus == CL_SUCCESS);
// Early callback + return pattern // Early callback + return pattern
if (!success) bool success = (collateStatus == CL_SUCCESS);
{ if (!success) { co_return false; }
callOriginalCallback(false);
return;
}
uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load(); uint32_t nSucceeded = frameAssemblyResult.nSucceeded.load();
int returnMode = static_cast<int>(engine.parent.device->currentReturnMode); int returnMode = static_cast<int>(parent.device->currentReturnMode);
size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram( size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram(
returnMode); returnMode);
size_t totalPoints = nSucceeded * pointsPerDgram; size_t totalPoints = nSucceeded * pointsPerDgram;
// Count points with intensity greater than 116 // Count points with intensity greater than 116
size_t highIntensityCount = 0; size_t highIntensityCount = 0;
if (context->intensityStimFrame.has_value()) if (intensityStimFrame.has_value())
{ {
StimulusFrame& intensityFrame = context->intensityStimFrame->get(); StimulusFrame& intensityFrame = intensityStimFrame->get();
float* intensityFloats = reinterpret_cast<float*>(intensityFrame.slotDesc.vaddr); float* intensityFloats = reinterpret_cast<float*>(
intensityFrame.slotDesc.vaddr);
for (size_t i = 0; i < totalPoints; ++i) for (size_t i = 0; i < totalPoints; ++i)
{ {
float intensity = intensityFloats[i]; float intensity = intensityFloats[i];
@@ -1227,62 +1196,15 @@ public:
#if 0 #if 0
std::cout << __func__ << ": intensityRingBufferIndex=" std::cout << __func__ << ": intensityRingBufferIndex="
<< (context->intensityStimFrame.has_value() ? << (intensityStimFrame.has_value() ?
context->intensityStimFrame->get().ringBufferIndex : SIZE_MAX) intensityStimFrame->get().ringBufferIndex : SIZE_MAX)
<< ", pointsPerDgram=" << pointsPerDgram << ", pointsPerDgram=" << pointsPerDgram
<< ", nSucceeded=" << nSucceeded << ", nSucceeded=" << nSucceeded
<< ", totalPoints=" << totalPoints << ", totalPoints=" << totalPoints
<< ", highIntensityCount=" << highIntensityCount << std::endl; << ", highIntensityCount=" << highIntensityCount << std::endl;
#endif #endif
callOriginalCallback(success); co_return success;
}
};
void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq(
sscl::AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame,
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame,
std::optional<AmbienceProductionDesc> lightAmbienceProductionDesc,
std::optional<AmbienceProductionDesc> darkAmbienceProductionDesc,
sscl::cps::Callback<compactCollateAndMeshFrameReqCbFn> callback)
{
{
sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock);
if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked())
{
callback.callbackFn(false, stimulusFrame);
return;
}
}
auto caller = smoHooksPtr->ComponentThread_getSelf();
auto request = std::make_shared<CompactCollateAndMeshFrameReq>(
*this, asyncLoop, stimulusFrame, intensityStimFrame,
std::move(lightAmbienceProductionDesc), std::move(darkAmbienceProductionDesc),
caller,
std::move(callback));
// Check if compaction is needed
bool needsCompaction = IoUringAssemblyEngine::compactionIsNeeded(
asyncLoop.nSucceeded.load(), asyncLoop.nTotal);
// Start with compaction if needed, then chain to collation
if (needsCompaction)
{
boost::asio::post(parent.device->componentThread->getIoContext(),
STC(std::bind(
&CompactCollateAndMeshFrameReq
::compactCollateAndMeshFrameReq1_doCompact_posted,
request.get(), request)));
}
else
{
// Skip compaction, go straight to collation
boost::asio::post(parent.device->componentThread->getIoContext(),
STC(std::bind(
&CompactCollateAndMeshFrameReq
::compactCollateAndMeshFrameReq3_doCollate_posted,
request.get(), request)));
} }
} }
@@ -13,7 +13,7 @@
#define CL_TARGET_OPENCL_VERSION 120 #define CL_TARGET_OPENCL_VERSION 120
#include <CL/cl.h> #include <CL/cl.h>
#include <spinscale/asynchronousLoop.h> #include <spinscale/asynchronousLoop.h>
#include <spinscale/cps/callback.h> #include <spinscale/co/invokers.h>
#include <spinscale/syncCancelerForAsyncWork.h> #include <spinscale/syncCancelerForAsyncWork.h>
#include <user/stimulusFrame.h> #include <user/stimulusFrame.h>
#include <user/stagingBuffer.h> #include <user/stagingBuffer.h>
@@ -86,14 +86,12 @@ public:
bool setup(); bool setup();
void finalize(); void finalize();
typedef std::function<void(bool, StimulusFrame&)> sscl::co::ViralNonPostingInvoker<bool> compactCollateAndMeshFrameCReq(
compactCollateAndMeshFrameReqCbFn; sscl::AsynchronousLoop& frameAssemblyResult,
void compactCollateAndMeshFrameReq( StimulusFrame& stimulusFrame,
sscl::AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame,
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame, std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame,
std::optional<AmbienceProductionDesc> lightAmbienceProductionDesc, std::optional<AmbienceProductionDesc> lightAmbienceProductionDesc,
std::optional<AmbienceProductionDesc> darkAmbienceProductionDesc, std::optional<AmbienceProductionDesc> darkAmbienceProductionDesc);
sscl::cps::Callback<compactCollateAndMeshFrameReqCbFn> callback);
private: private:
// Callback function types // Callback function types
@@ -214,9 +212,6 @@ private:
bool mapAverageIntensityBuffer(cl_map_flags mapFlags = CL_MAP_READ); bool mapAverageIntensityBuffer(cl_map_flags mapFlags = CL_MAP_READ);
bool unmapAverageIntensityBuffer(); bool unmapAverageIntensityBuffer();
// Forward declaration for continuation class
class CompactCollateAndMeshFrameReq;
// Unified kernel start function // Unified kernel start function
template<typename SetupArgsFn, typename ValidateBuffersFn> template<typename SetupArgsFn, typename ValidateBuffersFn>
bool startKernel( bool startKernel(
+113 -145
View File
@@ -11,6 +11,8 @@
#include <user/spMcRingBuffer.h> #include <user/spMcRingBuffer.h>
#include <componentThread.h> #include <componentThread.h>
#include <spinscale/asynchronousLoop.h> #include <spinscale/asynchronousLoop.h>
#include <spinscale/co/invokers.h>
#include <adapters/smo/assembleFrameAReq.h>
#include <user/stimulusFrame.h> #include <user/stimulusFrame.h>
#include <user/frameAssemblyDesc.h> #include <user/frameAssemblyDesc.h>
#include <livoxProto1/device.h> #include <livoxProto1/device.h>
@@ -428,219 +430,205 @@ PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer(
void PcloudStimulusProducer::stimFrameProductionTimesliceInd() void PcloudStimulusProducer::stimFrameProductionTimesliceInd()
{ {
produceFrameReq({nullptr, nullptr}); holdProduceFrameCReq();
} }
class PcloudStimulusProducer::ProduceFrameReq struct AllowNextStimulusFrameGuard
: public sscl::cps::PostedAsynchronousContinuation<produceFrameReqCbFn>
{ {
private: PcloudStimulusProducer& producer;
PcloudStimulusProducer& pcloudProducer;
sscl::AsynchronousLoop frameAssemblyResult; explicit AllowNextStimulusFrameGuard(PcloudStimulusProducer& _producer)
StimulusFrame& stimulusFrame; : producer(_producer)
{}
~AllowNextStimulusFrameGuard()
{ producer.allowNextStimulusFrame(); }
};
void PcloudStimulusProducer::holdProduceFrameCReq()
{
/** EXPLANATION:
* We shouldn't acquire stimulusProducerCanceler.s.lock here because
* this function is called from
* StimulusProducer::stimFrameProductionTimesliceInd(), which is already
* holding the lock.
*/
activeProduceFrameInvoker.emplace(produceFrameCReq(
produceFrameExceptionPtr,
[this]() { activeProduceFrameInvoker.reset(); }));
}
sscl::co::NonViralNonPostingInvoker PcloudStimulusProducer::produceFrameCReq(
[[maybe_unused]] std::exception_ptr& exceptionPtr,
[[maybe_unused]] std::function<void()> completion)
{
AllowNextStimulusFrameGuard allowNextGuard(*this);
StimulusFrame& stimulusFrame = tempStimulusFrame;
sscl::AsynchronousLoop frameAssemblyResult(0);
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame; std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame;
std::optional<std::reference_wrapper<StimulusFrame>> lightAmbienceStimFrame; std::optional<std::reference_wrapper<StimulusFrame>> lightAmbienceStimFrame;
std::optional<std::reference_wrapper<StimulusFrame>> darkAmbienceStimFrame; std::optional<std::reference_wrapper<StimulusFrame>> darkAmbienceStimFrame;
public: auto& resumeIoContext = device->componentThread->getIoContext();
ProduceFrameReq(
PcloudStimulusProducer& producer,
const std::shared_ptr<sscl::ComponentThread>& caller,
sscl::cps::Callback<produceFrameReqCbFn> cb)
: sscl::cps::PostedAsynchronousContinuation<produceFrameReqCbFn>(caller, cb),
pcloudProducer(producer),
frameAssemblyResult(0),
stimulusFrame(producer.tempStimulusFrame)
{}
public: // produceFrameReq1_doAssemble_posted
void callOriginalCallback() /** EXPLANATION:
{ * stimFrameProductionTimesliceInd() is entered with
pcloudProducer.allowNextStimulusFrame(); * stimulusProducerCanceler.s.lock held; do not re-acquire here.
callOriginalCb(); *
} * This function is called from
* StimulusProducer::stimFrameProductionTimesliceInd(), whose caller is
* already holding the lock.
*/
if (stimulusProducerCanceler.isCancellationRequestedUnlocked())
{ co_return; }
public: cpsBoundary::AssembleFrameResult assembleResult = co_await
void produceFrameReq1_doAssemble_posted( cpsBoundary::getAssembleFrameReqAReqAwaiter(
std::shared_ptr<ProduceFrameReq> context) resumeIoContext, ioUringAssemblyEngine);
{
sscl::SpinLock::Guard guard(
pcloudProducer.stimulusProducerCanceler.s.lock);
if (pcloudProducer.stimulusProducerCanceler
.isCancellationRequestedUnlocked())
{
callOriginalCallback();
return;
}
pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( // produceFrameReq2_assembleDone
{context, std::bind( std::optional<AmbienceProductionDesc> lightAmbienceProductionDescDesc;
&ProduceFrameReq::produceFrameReq2_assembleDone, std::optional<AmbienceProductionDesc> darkAmbienceProductionDescDesc;
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
}
void produceFrameReq2_assembleDone(
std::shared_ptr<ProduceFrameReq> context,
bool success, sscl::AsynchronousLoop loop)
{ {
sscl::SpinLock::Guard guard( sscl::SpinLock::Guard guard(stimulusProducerCanceler.s.lock);
pcloudProducer.stimulusProducerCanceler.s.lock); if (stimulusProducerCanceler.isCancellationRequestedUnlocked())
if (pcloudProducer.stimulusProducerCanceler { co_return; }
.isCancellationRequestedUnlocked())
{
callOriginalCallback();
return;
}
if (!success) if (!assembleResult.success)
{ {
callOriginalCallback(); if (attachedStimulusBuffers.size() > 0) {
if (pcloudProducer.attachedStimulusBuffers.size() > 0) {
std::cerr << __func__ << ": Failed to assemble frame.\n"; std::cerr << __func__ << ": Failed to assemble frame.\n";
} }
return; co_return;
} }
context->frameAssemblyResult = loop; frameAssemblyResult = assembleResult.loop;
// Check if intensity buffer is attached and acquire frame if so // Check if intensity buffer is attached and acquire frame if so
if (auto intensityBuff = pcloudProducer.intensityStimulusBuffer.load( if (auto intensityBuff = intensityStimulusBuffer.load(
std::memory_order_acquire)) std::memory_order_acquire))
{ {
size_t intensityRingbuffIndex = intensityBuff size_t intensityRingbuffIndex = intensityBuff
->ringBuffer.getIndexToProduceInto(); ->ringBuffer.getIndexToProduceInto();
StimulusFrame& intensityStimFrame = intensityBuff StimulusFrame& intensityStimFrameRef = intensityBuff
->ringBuffer.getDataAtSlot( ->ringBuffer.getDataAtSlot(
intensityRingbuffIndex); intensityRingbuffIndex);
intensityStimFrame.lock.writeAcquire(); intensityStimFrameRef.lock.writeAcquire();
context->intensityStimFrame = std::make_optional( intensityStimFrame = std::make_optional(
std::ref(intensityStimFrame)); std::ref(intensityStimFrameRef));
} }
else { else {
context->intensityStimFrame = std::nullopt; intensityStimFrame = std::nullopt;
} }
// Check if light ambience buffer is attached and acquire frame if so // Check if light ambience buffer is attached and acquire frame if so
std::optional<AmbienceProductionDesc> lightAmbienceProductionDescDesc;
if (auto lightAmbienceBuff = if (auto lightAmbienceBuff =
pcloudProducer.lightAmbienceStimulusBuffer.load( lightAmbienceStimulusBuffer.load(
std::memory_order_acquire)) std::memory_order_acquire))
{ {
size_t lightAmbienceRingbuffIndex = lightAmbienceBuff size_t lightAmbienceRingbuffIndex = lightAmbienceBuff
->ringBuffer.getIndexToProduceInto(); ->ringBuffer.getIndexToProduceInto();
StimulusFrame& lightAmbienceStimFrame = lightAmbienceBuff StimulusFrame& lightAmbienceStimFrameRef = lightAmbienceBuff
->ringBuffer.getDataAtSlot(lightAmbienceRingbuffIndex); ->ringBuffer.getDataAtSlot(lightAmbienceRingbuffIndex);
lightAmbienceStimFrame.lock.writeAcquire(); lightAmbienceStimFrameRef.lock.writeAcquire();
context->lightAmbienceStimFrame = std::make_optional( lightAmbienceStimFrame = std::make_optional(
std::ref(lightAmbienceStimFrame)); std::ref(lightAmbienceStimFrameRef));
lightAmbienceProductionDescDesc = AmbienceProductionDesc{ lightAmbienceProductionDescDesc = AmbienceProductionDesc{
std::ref(lightAmbienceStimFrame), std::ref(lightAmbienceStimFrameRef),
lightAmbienceBuff->passbandCountGtComparator}; lightAmbienceBuff->passbandCountGtComparator};
} }
else { else {
context->lightAmbienceStimFrame = std::nullopt; lightAmbienceStimFrame = std::nullopt;
} }
// Check if dark ambience buffer is attached and acquire frame if so // Check if dark ambience buffer is attached and acquire frame if so
std::optional<AmbienceProductionDesc> darkAmbienceProductionDescDesc;
if (auto darkAmbienceBuff = if (auto darkAmbienceBuff =
pcloudProducer.darkAmbienceStimulusBuffer.load( darkAmbienceStimulusBuffer.load(
std::memory_order_acquire)) std::memory_order_acquire))
{ {
size_t darkAmbienceRingbuffIndex = darkAmbienceBuff size_t darkAmbienceRingbuffIndex = darkAmbienceBuff
->ringBuffer.getIndexToProduceInto(); ->ringBuffer.getIndexToProduceInto();
StimulusFrame& darkAmbienceStimFrame = darkAmbienceBuff StimulusFrame& darkAmbienceStimFrameRef = darkAmbienceBuff
->ringBuffer.getDataAtSlot(darkAmbienceRingbuffIndex); ->ringBuffer.getDataAtSlot(darkAmbienceRingbuffIndex);
darkAmbienceStimFrame.lock.writeAcquire(); darkAmbienceStimFrameRef.lock.writeAcquire();
context->darkAmbienceStimFrame = std::make_optional( darkAmbienceStimFrame = std::make_optional(
std::ref(darkAmbienceStimFrame)); std::ref(darkAmbienceStimFrameRef));
darkAmbienceProductionDescDesc = AmbienceProductionDesc{ darkAmbienceProductionDescDesc = AmbienceProductionDesc{
std::ref(darkAmbienceStimFrame), std::ref(darkAmbienceStimFrameRef),
darkAmbienceBuff->passbandCountLtComparator}; darkAmbienceBuff->passbandCountLtComparator};
} }
else { else {
context->darkAmbienceStimFrame = std::nullopt; darkAmbienceStimFrame = std::nullopt;
}
} }
pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq( bool compactCollateSuccess = co_await
loop, stimulusFrame, openClCollatingAndMeshingEngine.compactCollateAndMeshFrameCReq(
context->intensityStimFrame, frameAssemblyResult, stimulusFrame,
intensityStimFrame,
std::move(lightAmbienceProductionDescDesc), std::move(lightAmbienceProductionDescDesc),
std::move(darkAmbienceProductionDescDesc), std::move(darkAmbienceProductionDescDesc));
{context, std::bind(
&ProduceFrameReq::produceFrameReq3_compactCollateDone,
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
}
void produceFrameReq3_compactCollateDone( // produceFrameReq3_compactCollateDone
[[maybe_unused]] std::shared_ptr<ProduceFrameReq> context,
bool success, StimulusFrame& /*stimulusFrame*/)
{
#if SMO_DEBUG_PCLOUD_AMBIENCE_INTRIN #if SMO_DEBUG_PCLOUD_AMBIENCE_INTRIN
uint32_t logLightPassbandCount = 0; uint32_t logLightPassbandCount = 0;
uint32_t logDarkPassbandCount = 0; uint32_t logDarkPassbandCount = 0;
bool logLightAmbience = false; bool logLightAmbience = false;
bool logDarkAmbience = false; bool logDarkAmbience = false;
if (success) if (compactCollateSuccess)
{ {
if (context->lightAmbienceStimFrame.has_value()) if (lightAmbienceStimFrame.has_value())
{ {
logLightPassbandCount = *reinterpret_cast<const uint32_t*>( logLightPassbandCount = *reinterpret_cast<const uint32_t*>(
context->lightAmbienceStimFrame->get().slotDesc.vaddr); lightAmbienceStimFrame->get().slotDesc.vaddr);
logLightAmbience = true; logLightAmbience = true;
} }
if (context->darkAmbienceStimFrame.has_value()) if (darkAmbienceStimFrame.has_value())
{ {
logDarkPassbandCount = *reinterpret_cast<const uint32_t*>( logDarkPassbandCount = *reinterpret_cast<const uint32_t*>(
context->darkAmbienceStimFrame->get().slotDesc.vaddr); darkAmbienceStimFrame->get().slotDesc.vaddr);
logDarkAmbience = true; logDarkAmbience = true;
} }
} }
#endif #endif
// Release intensity frame if it was used // Release intensity frame if it was used
if (context->intensityStimFrame.has_value()) { if (intensityStimFrame.has_value()) {
context->intensityStimFrame->get().lock.writeRelease(); intensityStimFrame->get().lock.writeRelease();
} }
// Release ambience frames if they were used // Release ambience frames if they were used
if (context->lightAmbienceStimFrame.has_value()) { if (lightAmbienceStimFrame.has_value()) {
context->lightAmbienceStimFrame->get().lock.writeRelease(); lightAmbienceStimFrame->get().lock.writeRelease();
} }
if (context->darkAmbienceStimFrame.has_value()) { if (darkAmbienceStimFrame.has_value()) {
context->darkAmbienceStimFrame->get().lock.writeRelease(); darkAmbienceStimFrame->get().lock.writeRelease();
} }
sscl::SpinLock::Guard guard(
pcloudProducer.stimulusProducerCanceler.s.lock);
if (pcloudProducer.stimulusProducerCanceler
.isCancellationRequestedUnlocked())
{ {
callOriginalCallback(); sscl::SpinLock::Guard guard(stimulusProducerCanceler.s.lock);
return; if (stimulusProducerCanceler.isCancellationRequestedUnlocked())
} { co_return; }
if (!success) { if (!compactCollateSuccess) {
std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; std::cerr << "produceFrameReq3_compactCollateDone"
<< ": Failed to compact and collate frame" << std::endl;
} else } else
{ {
guard.unlockPrematurely(); guard.unlockPrematurely();
if (pcloudProducer.pcloudFrameDumper.isEnabled()) if (pcloudFrameDumper.isEnabled())
{ {
try try
{ {
pcloudProducer.pcloudFrameDumper.dumpProducedFrame( pcloudFrameDumper.dumpProducedFrame(
*pcloudProducer.device, *device, collationBuffer,
pcloudProducer.collationBuffer, frameAssemblyResult);
context->frameAssemblyResult);
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
@@ -653,7 +641,7 @@ public:
if (logLightAmbience) if (logLightAmbience)
{ {
auto lightBuff = auto lightBuff =
pcloudProducer.lightAmbienceStimulusBuffer.load( lightAmbienceStimulusBuffer.load(
std::memory_order_acquire); std::memory_order_acquire);
if (lightBuff) if (lightBuff)
{ {
@@ -687,7 +675,7 @@ public:
if (logDarkAmbience) if (logDarkAmbience)
{ {
auto darkBuff = auto darkBuff =
pcloudProducer.darkAmbienceStimulusBuffer.load( darkAmbienceStimulusBuffer.load(
std::memory_order_acquire); std::memory_order_acquire);
if (darkBuff) if (darkBuff)
{ {
@@ -728,12 +716,12 @@ public:
std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::duration_cast<std::chrono::milliseconds>(
logNow.time_since_epoch()) % 1000; logNow.time_since_epoch()) % 1000;
auto assemblyDuration = auto assemblyDuration =
pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration(); ioUringAssemblyEngine.getAssemblyDuration();
auto compactDuration = auto compactDuration =
pcloudProducer.openClCollatingAndMeshingEngine openClCollatingAndMeshingEngine
.getCompactKernelDuration(); .getCompactKernelDuration();
auto collateDuration = auto collateDuration =
pcloudProducer.openClCollatingAndMeshingEngine openClCollatingAndMeshingEngine
.getCollateKernelDuration(); .getCollateKernelDuration();
std::cout << std::put_time(std::localtime(&logTime), "%T") std::cout << std::put_time(std::localtime(&logTime), "%T")
<< '.' << std::setfill('0') << std::setw(3) << '.' << std::setfill('0') << std::setw(3)
@@ -745,29 +733,9 @@ public:
<< "ms" << std::endl; << "ms" << std::endl;
#endif #endif
} }
callOriginalCallback();
} }
};
void PcloudStimulusProducer::produceFrameReq( co_return;
sscl::cps::Callback<produceFrameReqCbFn> callback)
{
/** EXPLANATION:
* We shouldn't acquire stimulusProducerCanceler.s.lock here because
* this function is called from
* StimulusProducer::stimFrameProductionTimesliceInd(), which is already
* holding the lock.
*/
auto caller = smoHooksPtr->ComponentThread_getSelf();
auto request = std::make_shared<ProduceFrameReq>(
*this, caller, std::move(callback));
// Post the doAssemble method to the component thread
boost::asio::post(device->componentThread->getIoContext(),
STC(std::bind(
&ProduceFrameReq::produceFrameReq1_doAssemble_posted,
request.get(), request)));
} }
} // namespace stim_buff } // namespace stim_buff
@@ -3,11 +3,12 @@
#include <functional> #include <functional>
#include <atomic> #include <atomic>
#include <optional>
#include <exception>
#include <user/stimulusProducer.h> #include <user/stimulusProducer.h>
#include <user/stimulusFrame.h> #include <user/stimulusFrame.h>
#include <livoxProto1/device.h> #include <livoxProto1/device.h>
#include <spinscale/cps/asynchronousContinuation.h> #include <spinscale/co/invokers.h>
#include <spinscale/cps/callback.h>
#include <user/stagingBuffer.h> #include <user/stagingBuffer.h>
#include "ioUringAssemblyEngine.h" #include "ioUringAssemblyEngine.h"
#include "livoxPcloudFrameDumper.h" #include "livoxPcloudFrameDumper.h"
@@ -83,12 +84,13 @@ public:
protected: protected:
void stimFrameProductionTimesliceInd() override; void stimFrameProductionTimesliceInd() override;
// Callback function type for produceFrameReq void holdProduceFrameCReq();
typedef std::function<void()> produceFrameReqCbFn;
sscl::co::NonViralNonPostingInvoker produceFrameCReq(
std::exception_ptr& exceptionPtr,
std::function<void()> completion);
public: public:
void produceFrameReq(sscl::cps::Callback<produceFrameReqCbFn> callback);
size_t nDgramsPerStagingBufferFrame; size_t nDgramsPerStagingBufferFrame;
std::shared_ptr<livoxProto1::Device> device; std::shared_ptr<livoxProto1::Device> device;
PcloudFormatDesc formatDesc; PcloudFormatDesc formatDesc;
@@ -111,8 +113,8 @@ public:
std::atomic<std::shared_ptr<PcloudDarkAmbienceStimulusBuffer>> std::atomic<std::shared_ptr<PcloudDarkAmbienceStimulusBuffer>>
darkAmbienceStimulusBuffer; darkAmbienceStimulusBuffer;
private: std::optional<sscl::co::NonViralNonPostingInvoker> activeProduceFrameInvoker;
class ProduceFrameReq; std::exception_ptr produceFrameExceptionPtr;
}; };
} // namespace stim_buff } // namespace stim_buff