// salmanoff/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp

#include <config.h>
#include <opts.h>
#include <unistd.h>
#include <algorithm>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>
#include <user/spMcRingBuffer.h>
#include <componentThread.h>
#include <asynchronousLoop.h>
#include <user/stimulusFrame.h>
#include <user/frameAssemblyDesc.h>
#include <livoxProto1/device.h>
#include "pcloudStimulusProducer.h"
namespace smo {
namespace stim_buff {
extern const SmoCallbacks* smoHooksPtr;

/** Return the system page size in bytes.
 *
 * sysconf() reports failure by returning -1. Casting that directly to size_t
 * would yield SIZE_MAX and silently produce an absurd alignment/padding value,
 * so fall back to a conventional 4 KiB page in that case.
 */
static size_t pageSizeBytes()
{
    const long pageSize = sysconf(_SC_PAGE_SIZE);
    return (pageSize > 0) ? static_cast<size_t>(pageSize) : size_t{4096};
}

/* Staging-buffer constraints for data consumed by the OpenCL kernels that
 * collate and produce our StimFrames. Each constraint object is constructed
 * from, in order:
 *   slotStartAlignmentByteVal, slotPadToNBytes,
 *   frameStartAlignmentByteVal, framePadToNBytes.
 * All four objects below use page-aligned, page-padded frames.
 */

// Per-point XYZ input: slots start on a float (4B) boundary and are padded to
// one XYZ triple (3 floats = 12B).
// NOTE(review): a former FIXME asked for exactly this (4B alignment, 12B
// padding), so it appears resolved. Also, the constructor in this file uses
// the qualified StagingBuffer::IOEngineConstraints::openClInputConstraints,
// not this file-local object; confirm whether this one is still needed.
static StagingBuffer::IOEngineConstraints openClInputConstraints(
    sizeof(float),
    sizeof(float) * 3,
    pageSizeBytes(),
    pageSizeBytes());

// Mesh input: page-aligned slots. slotPadToNBytes starts as one XYZ triple
// but is overwritten in getOrCreateAttachedStimulusBuffer() from the device's
// return mode, which is why these objects are not const.
static StagingBuffer::IOEngineConstraints openClMeshInputConstraints(
    pageSizeBytes(),
    sizeof(float) * 3,
    pageSizeBytes(),
    pageSizeBytes());

// Intensity input: page-aligned slots, one float per point. slotPadToNBytes
// is overwritten in getOrCreateAttachedStimulusBuffer().
static StagingBuffer::IOEngineConstraints openClIntensityInputConstraints(
    pageSizeBytes(),
    sizeof(float),
    pageSizeBytes(),
    pageSizeBytes());

// Ambience input: page-aligned slots, one float per datagram.
// slotPadToNBytes is overwritten in getOrCreateAttachedStimulusBuffer().
static StagingBuffer::IOEngineConstraints openClAmbienceInputConstraints(
    pageSizeBytes(),
    sizeof(float),
    pageSizeBytes(),
    pageSizeBytes());
/** Construct a point-cloud stimulus producer bound to @p device.
 *
 * @param deviceAttachmentSpec  Forwarded to the StimulusProducer base.
 * @param device                Livox device. Its component thread's io
 *                              service is handed to the base class.
 * @param formatDesc            Point-cloud format description, stored in the
 *                              formatDesc member.
 * @param nDgramsPerStagingBufferFrame  Datagrams per staging-buffer frame.
 *                              Sizes both the assembly and the collation
 *                              staging buffers and the io_uring engine.
 *
 * Construction is expected on the WORLD thread (or the BODY thread when
 * CONFIG_WORLD_USE_BODY_THREAD is defined). A violation is currently only
 * reported on stderr, not treated as fatal.
 */
PcloudStimulusProducer::PcloudStimulusProducer(
    const std::shared_ptr<device::DeviceAttachmentSpec> &deviceAttachmentSpec,
    std::shared_ptr<livoxProto1::Device> &device,
    const PcloudFormatDesc& formatDesc,
    size_t nDgramsPerStagingBufferFrame)
: StimulusProducer(
    deviceAttachmentSpec,
    device->componentThread->getIoService()),
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame),
device(device),
formatDesc(formatDesc),
openClCollatingAndMeshingEngine(*this),
// Assembly buffer: written by io_uring, read as OpenCL kernel input.
assemblyBuffer(
    StagingBuffer::IOEngineConstraints::ioUringConstraints,
    StagingBuffer::IOEngineConstraints::openClInputConstraints,
    nDgramsPerStagingBufferFrame),
ioUringAssemblyEngine(*this, nDgramsPerStagingBufferFrame),
// Collation buffer: OpenCL on both sides.
collationBuffer(
    StagingBuffer::IOEngineConstraints::openClInputConstraints,
    StagingBuffer::IOEngineConstraints::openClInputConstraints,
    nDgramsPerStagingBufferFrame),
tempStimulusFrameMem(0),
// Frame backed by the tempStimulusFrameMem member; ProduceFrameReq uses it as
// its output frame.
tempStimulusFrame(
    FrameAssemblyDesc::SlotDesc{
        0,
        reinterpret_cast<uint8_t*>(&tempStimulusFrameMem),
        sizeof(tempStimulusFrameMem)},
    *smoHooksPtr, 0)
{
    if (smoHooksPtr->OptionParser_getOptions().verbose)
    {
        std::cout << __func__ << ": assembly buffer : "
            << assemblyBuffer.stringify()
            << "\n\tcollation buffer : " << collationBuffer.stringify()
            << "\n";
    }

    std::cout << __func__ << ": Device's component thread is "
        << device->componentThread->name << std::endl;

    // Sanity-check which component thread is constructing this producer.
#ifndef CONFIG_WORLD_USE_BODY_THREAD
    if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::WORLD)
#else
    if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::BODY)
#endif
    {
        std::string errMsg = std::string(__func__) +
            ": PcloudStimulusProducer constructor called on non-world/body thread " +
            smoHooksPtr->ComponentThread_getSelf()->name;
        // Diagnostics go to stderr, like the other failure reports in this file.
        std::cerr << errMsg << std::endl;
        // throw std::runtime_error(errMsg);
    }
}
void PcloudStimulusProducer::start()
2025-11-01 22:03:28 -04:00
{
std::cout << __func__ << ": Starting PcloudStimulusProducer for device "
<< device->discoveredDevice.deviceIdentifier << std::endl;
2025-11-14 02:08:03 -04:00
// Call ioUringAssemblyEngine setup() as the first step
if (!ioUringAssemblyEngine.setup())
{
std::cout <<__func__ <<"Failed to setup() "
<<"IOUringAssemblyEngine.\n";
return;
}
if (!openClCollatingAndMeshingEngine.setup())
{
std::cout <<__func__ <<"Failed to setup() "
<<"OClCollMeshEngine.\n";
return;
}
2025-11-01 22:03:28 -04:00
// Call base class start() as the final step
2025-11-14 19:50:51 -04:00
StimulusProducer::start();
2025-11-01 22:03:28 -04:00
}
void PcloudStimulusProducer::stop()
2025-11-01 22:03:28 -04:00
{
std::cout << __func__ << ": Stopping PcloudStimulusProducer for device "
<< device->discoveredDevice.deviceIdentifier << std::endl;
2025-11-01 22:03:28 -04:00
// Call base class stop() as the first step
2025-11-14 19:50:51 -04:00
StimulusProducer::stop();
2025-11-01 22:03:28 -04:00
// Call ioUringAssemblyEngine stop() as the final step
openClCollatingAndMeshingEngine.finalize();
ioUringAssemblyEngine.finalize();
2025-11-01 22:03:28 -04:00
}
/** Empty function: despite its name it performs no acknowledgement.
 *
 * NOTE(review): nothing in this file calls it; confirm whether it is still
 * referenced elsewhere before removing.
 */
void produceStimFrameAck()
{
    // Intentionally a no-op.
}
/** Read the history-buffer duration, in milliseconds, from an attachment spec.
 *
 * Several synonymous parameter names are accepted and looked up in
 * spec->qualeIfaceApiParams via
 * DeviceAttachmentSpec::parseOptionalParamAsIntWithSynonyms().
 * kDefaultHistbuffMs (30000) is passed as that helper's fallback value,
 * presumably used when none of the names is present (confirm against the
 * helper).
 *
 * @param spec  Attachment spec to read from. Must be non-null.
 * @return      The parsed duration in milliseconds.
 */
static int parseHistbuffMs(
    const std::shared_ptr<device::DeviceAttachmentSpec>& spec)
{
    // Built once on first use instead of on every call.
    static const std::vector<std::string> histbuffParamNames = {
        "history-buffer-duration-ms",
        "hist-buff-duration-ms",
        "histbuff-duration-ms",
        "histbuff-ms"
    };
    constexpr int kDefaultHistbuffMs = 30000;

    return device::DeviceAttachmentSpec::parseOptionalParamAsIntWithSynonyms(
        spec->qualeIfaceApiParams, histbuffParamNames, kDefaultHistbuffMs);
}
/** Look up the buffer attached for @p spec and verify its concrete type.
 *
 * The lookup is delegated to StimulusProducer::getAttachedStimulusBuffer().
 * The result is returned only when its dynamic type agrees with
 * spec->qualeIfaceApi:
 *   "mesh"            -> MeshStimulusBuffer
 *   "pcloudIntensity" -> PcloudIntensityStimulusBuffer
 *   "pcloudAmbience"  -> PcloudAmbienceStimulusBuffer
 *
 * @return The attached buffer, or nullptr if none is attached, the type
 *         disagrees, or the API name is not one of the above.
 */
std::shared_ptr<StimulusBuffer>
PcloudStimulusProducer::getAttachedStimulusBuffer(
    const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const
{
    auto candidate = StimulusProducer::getAttachedStimulusBuffer(spec);
    if (candidate == nullptr)
    {
        return nullptr;
    }

    // Exactly one API name can match, and only that branch performs a cast.
    const std::string& api = spec->qualeIfaceApi;
    const bool typeAgrees =
        (api == "mesh"
            && std::dynamic_pointer_cast<MeshStimulusBuffer>(candidate))
        || (api == "pcloudIntensity"
            && std::dynamic_pointer_cast<PcloudIntensityStimulusBuffer>(candidate))
        || (api == "pcloudAmbience"
            && std::dynamic_pointer_cast<PcloudAmbienceStimulusBuffer>(candidate));

    return typeAgrees ? candidate : nullptr;
}
void PcloudStimulusProducer::destroyAttachedStimulusBuffer(
const std::shared_ptr<StimulusBuffer>& buffer)
{
if (!buffer) { return; }
this->stop();
// Clear specialized buffer pointers if they match
if (meshStimulusBuffer == buffer)
{ meshStimulusBuffer.reset(); }
if (intensityStimulusBuffer == buffer)
{ intensityStimulusBuffer.reset(); }
if (ambienceStimulusBuffer == buffer)
{ ambienceStimulusBuffer.reset(); }
// Call base class implementation to remove from attachedStimulusBuffers
StimulusProducer::destroyAttachedStimulusBuffer(buffer);
this->start();
}
/** Return the buffer attached for @p deviceAttachmentSpec, creating it if needed.
 *
 * Idempotent: an already-attached buffer of the matching type is returned
 * unchanged. Otherwise a new buffer is built according to
 * deviceAttachmentSpec->qualeIfaceApi ("mesh", "pcloudIntensity" or
 * "pcloudAmbience"). The producer is then stopped, the buffer is recorded in
 * the matching specialised handle and in attachedStimulusBuffers, and the
 * producer is restarted.
 *
 * @throws std::runtime_error for any other qualeIfaceApi value.
 *
 * NOTE(review): each branch overwrites slotPadToNBytes on a file-static,
 * non-const constraints object shared by every PcloudStimulusProducer
 * instance. That is racy if producers attach concurrently, and surprising if
 * a buffer keeps a reference to those constraints. Confirm how the buffer
 * constructors store them before changing this.
 */
std::shared_ptr<StimulusBuffer>
PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer(
    const std::shared_ptr<device::DeviceAttachmentSpec>& deviceAttachmentSpec
    )
{
    if (auto alreadyAttached = getAttachedStimulusBuffer(deviceAttachmentSpec))
    {
        return alreadyAttached;
    }

    int histbuffMs = parseHistbuffMs(deviceAttachmentSpec);
    const std::string& api = deviceAttachmentSpec->qualeIfaceApi;
    // Points per datagram depend on the device's current return mode.
    size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram(
        static_cast<int>(device->currentReturnMode));

    // Record a freshly built buffer while the producer is stopped, then restart.
    auto publish = [this](auto freshBuffer, auto& handle)
        -> std::shared_ptr<StimulusBuffer>
    {
        this->stop();
        handle = freshBuffer;
        attachedStimulusBuffers.push_back(freshBuffer);
        this->start();
        return freshBuffer;
    };

    if (api == "mesh")
    {
        // One XYZ triple (3 floats) per point, for every point in the frame.
        size_t meshSlotStride = this->nDgramsPerStagingBufferFrame
            * pointsPerDgram * sizeof(float) * 3;
        openClMeshInputConstraints.slotPadToNBytes = meshSlotStride;

        std::cout << __func__ << ": $$$$$$$ Creating MeshStimulusBuffer" << std::endl;
        auto meshBuffer = std::make_shared<MeshStimulusBuffer>(
            *this, deviceAttachmentSpec, histbuffMs,
            openClMeshInputConstraints, openClMeshInputConstraints,
            *smoHooksPtr, CL_MEM_READ_WRITE);
        std::cout << __func__ << ": $$$$$$$ Created MeshStimulusBuffer" << std::endl;

        return publish(meshBuffer, meshStimulusBuffer);
    }

    if (api == "pcloudIntensity")
    {
        // One float per point, for every point in the frame.
        size_t intensitySlotStride = this->nDgramsPerStagingBufferFrame
            * pointsPerDgram * sizeof(float) * 1;
        openClIntensityInputConstraints.slotPadToNBytes = intensitySlotStride;

        std::cout << __func__ << ": $$$$$$$ Creating PcloudIntensityStimulusBuffer" << std::endl;
        auto intensityBuffer = std::make_shared<PcloudIntensityStimulusBuffer>(
            *this, deviceAttachmentSpec, histbuffMs,
            openClIntensityInputConstraints, openClIntensityInputConstraints,
            *smoHooksPtr, CL_MEM_READ_WRITE);
        std::cout << __func__ << ": $$$$$$$ Created PcloudIntensityStimulusBuffer" << std::endl;

        return publish(intensityBuffer, intensityStimulusBuffer);
    }

    if (api == "pcloudAmbience")
    {
        // One float per datagram (not per point).
        size_t ambienceSlotStride = this->nDgramsPerStagingBufferFrame
            * sizeof(float);
        openClAmbienceInputConstraints.slotPadToNBytes = ambienceSlotStride;

        auto ambienceBuffer = std::make_shared<PcloudAmbienceStimulusBuffer>(
            *this, deviceAttachmentSpec, histbuffMs,
            openClAmbienceInputConstraints, openClAmbienceInputConstraints,
            *smoHooksPtr, CL_MEM_READ_WRITE);
        std::cout << __func__ << ": $$$$$$$ Created PcloudAmbienceStimulusBuffer" << std::endl;

        return publish(ambienceBuffer, ambienceStimulusBuffer);
    }

    throw std::runtime_error(
        "Unsupported qualeIfaceApi: '" + api + "' for "
        "PcloudStimulusProducer. "
        "Supported values: mesh, pcloudIntensity, pcloudAmbience");
}
void PcloudStimulusProducer::stimFrameProductionTimesliceInd()
2025-11-01 21:33:35 -04:00
{
produceFrameReq({nullptr, nullptr});
2025-11-01 21:33:35 -04:00
}
/** Asynchronous continuation that produces one stimulus frame.
 *
 * Steps (each re-checks pcloudProducer.shouldContinue under
 * shouldContinueLock and finishes early if it is false):
 *   1. produceFrameReq1_doAssemble_posted: runs on the device's component
 *      thread and asks the io_uring engine to assemble a frame.
 *   2. produceFrameReq2_assembleDone: stores the assembly result. If an
 *      intensity buffer is attached, it write-locks that ring buffer's
 *      produce slot. It then hands the frame(s) to the OpenCL
 *      compact/collate/mesh engine.
 *   3. produceFrameReq3_compactCollateDone: releases the intensity slot lock
 *      (if taken) and logs engine timings on success.
 * Every exit path ends in callOriginalCallback(), which calls
 * allowNextStimulusFrame() before callOriginalCb().
 *
 * Each step's bound callback holds a shared_ptr to the request, keeping it
 * alive while that step is pending.
 */
class PcloudStimulusProducer::ProduceFrameReq
: public PostedAsynchronousContinuation<produceFrameReqCbFn>
{
private:
    PcloudStimulusProducer& pcloudProducer;
    // Assembly result recorded in step 2.
    AsynchronousLoop frameAssemblyResult;
    // Output frame for the OpenCL engine (the producer's tempStimulusFrame).
    StimulusFrame& stimulusFrame;
    // Intensity ring-buffer slot write-locked in step 2 and released in
    // step 3; empty when no intensity buffer is attached.
    std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame;

public:
    ProduceFrameReq(
        PcloudStimulusProducer& producer,
        const std::shared_ptr<ComponentThread>& caller,
        Callback<produceFrameReqCbFn> cb)
    : PostedAsynchronousContinuation<produceFrameReqCbFn>(caller, cb),
    pcloudProducer(producer),
    frameAssemblyResult(0),
    stimulusFrame(producer.tempStimulusFrame)
    {}

    /** Re-arm the producer for the next frame, then invoke the original callback. */
    void callOriginalCallback()
    {
        pcloudProducer.allowNextStimulusFrame();
        callOriginalCb();
    }

    /** Step 1: request frame assembly from the io_uring engine. */
    void produceFrameReq1_doAssemble_posted(
        std::shared_ptr<ProduceFrameReq> context)
    {
        SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
        if (!pcloudProducer.shouldContinue)
        {
            callOriginalCallback();
            return;
        }

        pcloudProducer.ioUringAssemblyEngine.assembleFrameReq(
            {context, std::bind(
                &ProduceFrameReq::produceFrameReq2_assembleDone,
                context.get(), context,
                std::placeholders::_1, std::placeholders::_2)});
    }

    /** Step 2: assembly finished; lock the intensity slot (if any) and
     *  request compaction, collation and meshing from the OpenCL engine. */
    void produceFrameReq2_assembleDone(
        std::shared_ptr<ProduceFrameReq> context,
        bool success, AsynchronousLoop loop)
    {
        SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
        if (!pcloudProducer.shouldContinue)
        {
            callOriginalCallback();
            return;
        }

        if (!success)
        {
            std::cerr << __func__ << ": Failed to assemble frame" << std::endl;
            callOriginalCallback();
            return;
        }

        context->frameAssemblyResult = loop;

        // NOTE(review): writeAcquire() runs while shouldContinueLock (a
        // SpinLock) is held; if it can block, other threads spin on
        // shouldContinueLock meanwhile — confirm this is acceptable.
        if (pcloudProducer.intensityStimulusBuffer)
        {
            size_t intensityRingbuffIndex = pcloudProducer
                .intensityStimulusBuffer->ringBuffer.getIndexToProduceInto();
            StimulusFrame& intensityFrame = pcloudProducer
                .intensityStimulusBuffer->ringBuffer.getDataAtSlot(
                    intensityRingbuffIndex);
            intensityFrame.lock.writeAcquire();
            context->intensityStimFrame = std::make_optional(
                std::ref(intensityFrame));
        }
        else
        {
            context->intensityStimFrame = std::nullopt;
        }

        pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq(
            loop, stimulusFrame, context->intensityStimFrame,
            {context, std::bind(
                &ProduceFrameReq::produceFrameReq3_compactCollateDone,
                context.get(), context,
                std::placeholders::_1, std::placeholders::_2)});
    }

    /** Step 3: OpenCL processing finished; release the intensity slot and
     *  report the outcome. */
    void produceFrameReq3_compactCollateDone(
        std::shared_ptr<ProduceFrameReq> context,
        bool success, StimulusFrame& /*stimulusFrame*/)
    {
        // Release the intensity slot taken in step 2, whatever the outcome.
        if (context->intensityStimFrame.has_value())
        {
            context->intensityStimFrame->get().lock.writeRelease();
        }

        SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
        if (!pcloudProducer.shouldContinue)
        {
            callOriginalCallback();
            return;
        }

        if (!success)
        {
            std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl;
        }
        else
        {
            // Print execution durations
            auto assemblyDuration = pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration();
            auto compactDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCompactKernelDuration();
            auto collateDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCollateKernelDuration();
            std::cout << __func__ << ": Successfully compacted and collated frame: assemblyDuration=" << assemblyDuration.count()
                << "ms, compactKernelDuration=" << compactDuration.count()
                << "ms, collateKernelDuration=" << collateDuration.count() << "ms" << std::endl;
        }

        callOriginalCallback();
    }
};
/** Start asynchronous production of one stimulus frame.
 *
 * Creates a ProduceFrameReq bound to the calling component thread and posts
 * its first step (produceFrameReq1_doAssemble_posted) to the device's
 * component thread. @p callback is moved into the request. ProduceFrameReq
 * calls callOriginalCallback() on every exit path.
 *
 * Locking: StimulusProducer::shouldContinueLock is deliberately NOT acquired
 * here. The design intent is that the caller,
 * StimulusProducer::stimFrameProductionTimesliceInd(), already holds it.
 * NOTE(review): in this file the direct caller is the override
 * PcloudStimulusProducer::stimFrameProductionTimesliceInd(), which does not
 * itself take the lock. Confirm that whatever invokes that override holds it.
 */
void PcloudStimulusProducer::produceFrameReq(
    smo::Callback<produceFrameReqCbFn> callback)
{
    auto caller = smoHooksPtr->ComponentThread_getSelf();
    auto request = std::make_shared<ProduceFrameReq>(
        *this, caller, std::move(callback));

    // Post the doAssemble step to the device's component thread; the bound
    // shared_ptr keeps the request alive until that step runs.
    device->componentThread->getIoService().post(
        STC(std::bind(
            &ProduceFrameReq::produceFrameReq1_doAssemble_posted,
            request.get(), request)));
}
} // namespace stim_buff
} // namespace smo