PcloudStimBuff: add skeleton produceFrameReq :)

Big waves.
This function wraps the operation of getting a stimframe from
the SpMcRingBuffer, and then eventually assigning it a
SimultaneityStamp. For now we just always pass in the first
stim frame and we don't get any simulstamps.

Its callOriginalCallback() automatically calls
allowNextStimulusFrame() to ensure that it doesn't deadlock future
timeslices.
This commit is contained in:
2025-11-10 01:02:06 -04:00
parent eedeb4b803
commit 401c844fcc
2 changed files with 85 additions and 36 deletions
+81 -30
View File
@@ -2,9 +2,13 @@
#include <opts.h>
#include <algorithm>
#include <unistd.h>
#include <iomanip>
#include <user/spMcRingBuffer.h>
#include <componentThread.h>
#include <asynchronousLoop.h>
#include <user/stimulusFrame.h>
#include "pcloudStimulusBuffer.h"
#include "frameAssemblyDesc.h"
namespace smo {
namespace stim_buff {
@@ -83,56 +87,103 @@ void PcloudStimulusBuffer::stop()
ioUringAssemblyEngine.finalize();
}
void PcloudStimulusBuffer::stimFrameProductionTimesliceInd()
void produceStimFrameAck(void)
{
ioUringAssemblyEngine.assembleFrameReq(
{nullptr, [this](bool success, AsynchronousLoop loop) {
if (!success) {
std::cerr << __func__ << ": Failed to assemble frame" << std::endl;
} else {
std::cout << __func__ << ": Successfully assembled frame "
<< loop.nSucceeded.load() << " slots succeeded "
<< "out of " << loop.nTotal << " total slots" << std::endl;
}
}});
// Release the spinlock for now
allowNextStimulusFrame();
}
class PcloudStimulusBuffer::AssembleAndProduceStimulusFrameReq
: public smo::PostedAsynchronousContinuation<
assembleAndProduceStimulusFrameReqCbFn>
void PcloudStimulusBuffer::stimFrameProductionTimesliceInd()
{
produceFrameReq({nullptr, nullptr});
}
class PcloudStimulusBuffer::ProduceFrameReq
: public PostedAsynchronousContinuation<produceFrameReqCbFn>
{
private:
PcloudStimulusBuffer& stimBuff;
AsynchronousLoop frameAssemblyResult;
StimulusFrame& stimulusFrame;
public:
AssembleAndProduceStimulusFrameReq(
ProduceFrameReq(
PcloudStimulusBuffer& buffer,
Callback<assembleAndProduceStimulusFrameReqCbFn> callback)
: PostedAsynchronousContinuation<assembleAndProduceStimulusFrameReqCbFn>(
buffer.device->componentThread, std::move(callback)),
stimBuff(buffer)
const std::shared_ptr<ComponentThread>& caller,
Callback<produceFrameReqCbFn> cb)
: PostedAsynchronousContinuation<produceFrameReqCbFn>(caller, cb),
stimBuff(buffer),
frameAssemblyResult(0),
stimulusFrame(buffer.frames_[0])
{}
public:
void callOriginalCallback()
{
stimBuff.frameAssemblyRateLimiter.release();
callOriginalCb(static_cast<SimultaneityStamp>(0));
stimBuff.allowNextStimulusFrame();
callOriginalCb();
}
void callOriginalCallbackWithFailure()
public:
void produceFrameReq1_doAssemble_posted(
std::shared_ptr<ProduceFrameReq> context)
{
stimBuff.frameAssemblyRateLimiter.release();
callOriginalCb(static_cast<SimultaneityStamp>(0));
stimBuff.ioUringAssemblyEngine.assembleFrameReq(
{context, std::bind(
&ProduceFrameReq::produceFrameReq2_assembleDone,
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
}
void produceFrameReq2_assembleDone(
std::shared_ptr<ProduceFrameReq> context,
bool success, AsynchronousLoop loop)
{
if (!success)
{
std::cerr << __func__ << ": Failed to assemble frame" << std::endl;
callOriginalCallback();
return;
}
std::cout << __func__ << ": Successfully assembled frame "
<< loop.nSucceeded.load() << " slots succeeded "
<< "out of " << loop.nTotal << " total slots" << std::endl;
context->frameAssemblyResult = loop;
stimBuff.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq(
loop, stimulusFrame,
{context, std::bind(
&ProduceFrameReq::produceFrameReq3_compactCollateDone,
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
}
void produceFrameReq3_compactCollateDone(
[[maybe_unused]] std::shared_ptr<ProduceFrameReq> context,
bool success, StimulusFrame& /*stimulusFrame*/)
{
if (!success) {
std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl;
} else {
std::cout << __func__ << ": Successfully compacted and collated frame" << std::endl;
}
callOriginalCallback();
}
};
void PcloudStimulusBuffer::assembleAndProduceStimulusFrameReq(
smo::Callback<assembleAndProduceStimulusFrameReqCbFn> callback)
void PcloudStimulusBuffer::produceFrameReq(
smo::Callback<produceFrameReqCbFn> callback)
{
// Wireframe implementation - do nothing for now
(void)callback;
auto caller = smoHooksPtr->ComponentThread_getSelf();
auto request = std::make_shared<ProduceFrameReq>(
*this, caller, std::move(callback));
// Post the doAssemble method to the component thread
device->componentThread->getIoService().post(
STC(std::bind(
&ProduceFrameReq::produceFrameReq1_doAssemble_posted,
request.get(), request)));
}
} // namespace stim_buff
@@ -47,13 +47,11 @@ public:
protected:
void stimFrameProductionTimesliceInd() override;
// Callback function type for assembleAndProduceStimulusFrameReq
typedef std::function<void(SimultaneityStamp)>
assembleAndProduceStimulusFrameReqCbFn;
// Callback function type for produceFrameReq
typedef std::function<void()> produceFrameReqCbFn;
public:
void assembleAndProduceStimulusFrameReq(
smo::Callback<assembleAndProduceStimulusFrameReqCbFn> callback);
void produceFrameReq(smo::Callback<produceFrameReqCbFn> callback);
std::shared_ptr<livoxProto1::Device> device;
PcloudFormatDesc formatDesc;
@@ -63,7 +61,7 @@ public:
StagingBuffer collationBuffer;
private:
class AssembleAndProduceStimulusFrameReq;
class ProduceFrameReq;
};
} // namespace stim_buff