Files
salmanoff/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp
T
hayodea 8eb7eaba3d Add PcloudAmbienceStencil, LG1PcloudAmbienceStencil
These two classes represent our first foray into stencil
construction. One of them standardizes PcloudAmbience stencils
across all stimbuffs, and the other specifies the internal
memory constraints and requirements for a LivoxGen1 device's
stencils.
2025-11-26 12:32:42 -04:00

532 lines
17 KiB
C++

#include <config.h>
#include <opts.h>
#include <algorithm>
#include <unistd.h>
#include <iomanip>
#include <cstddef>
#include <user/spMcRingBuffer.h>
#include <componentThread.h>
#include <asynchronousLoop.h>
#include <user/stimulusFrame.h>
#include <user/frameAssemblyDesc.h>
#include <user/pcloudAmbienceStencil.h>
#include <livoxProto1/device.h>
#include "livoxGen1.h"
#include "pcloudStimulusProducer.h"
namespace smo {
namespace stim_buff {
extern const SmoCallbacks* smoHooksPtr;
// Staging-buffer constraints for buffers consumed by the OpenCL kernels that
// collate and produce our StimFrames. The four constructor arguments are
// labelled below with the names used by the existing comments in this file.
// NOTE(review): the constructor in this file passes the class-scoped name
// StagingBuffer::IOEngineConstraints::openClInputConstraints, not this
// file-static object -- confirm which of the two is meant to be used.
static StagingBuffer::IOEngineConstraints openClInputConstraints(
/** FIXME:
* This should eventually be aligned to 4B and padded to 12B.
* NOTE(review): the values below are already sizeof(float) and
* 3 * sizeof(float); confirm whether this FIXME is stale.
*/
// slotStartAlignmentByteVal: aligned to one float (NOT page aligned)
sizeof(float),
// slotPadToNBytes (XYZ = 3 floats per point)
sizeof(float) * 3,
// frameStartAlignmentByteVal (page alignment)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)),
// framePadToNBytes (padded to a whole page)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)));
// Constraints handed to MeshStimulusBuffer as both of its constraint
// arguments (see getOrCreateAttachedStimulusBuffer() in this file).
// This object is mutable on purpose: its slotPadToNBytes member is
// overwritten before each MeshStimulusBuffer is created.
static StagingBuffer::IOEngineConstraints openClMeshInputConstraints(
// slotStartAlignmentByteVal (page alignment)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)),
// slotPadToNBytes: placeholder of one XYZ point (3 floats). The real value is
// calculated in getOrCreateAttachedStimulusBuffer() from the return mode.
sizeof(float) * 3,
// frameStartAlignmentByteVal (page alignment)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)),
// framePadToNBytes (padded to a whole page)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)));
// Constraints handed to PcloudIntensityStimulusBuffer as both of its
// constraint arguments (see getOrCreateAttachedStimulusBuffer() in this
// file). slotPadToNBytes is overwritten there before each buffer is created.
static StagingBuffer::IOEngineConstraints openClIntensityInputConstraints(
// slotStartAlignmentByteVal (page alignment)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)),
// slotPadToNBytes: placeholder of one float per point. The real value is
// calculated in getOrCreateAttachedStimulusBuffer() from the return mode.
sizeof(float),
// frameStartAlignmentByteVal (page alignment)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)),
// framePadToNBytes (padded to a whole page)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)));
// Constraints handed to PcloudAmbienceStimulusBuffer as both of its
// constraint arguments (see getOrCreateAttachedStimulusBuffer() in this
// file). slotPadToNBytes is overwritten there before each buffer is created.
static StagingBuffer::IOEngineConstraints openClAmbienceInputConstraints(
// slotStartAlignmentByteVal (page alignment)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)),
// slotPadToNBytes: placeholder of one ambience stimulus value. The real value
// is calculated in getOrCreateAttachedStimulusBuffer().
// NOTE(review): that calculation uses sizeof(uint32_t) rather than this
// type's size -- confirm the two are meant to be the same width.
sizeof(PcloudAmbienceStencil::PcloudAmbienceStimulusValue),
// frameStartAlignmentByteVal (page alignment)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)),
// framePadToNBytes (padded to a whole page)
static_cast<size_t>(sysconf(_SC_PAGE_SIZE)));
/**
 * Construct a point-cloud stimulus producer bound to one Livox Gen1 device.
 *
 * The base StimulusProducer is given the io service of the device's
 * component thread. Both staging buffers (assembly and collation) are sized
 * by nDgramsPerStagingBufferFrame.
 *
 * @param deviceAttachmentSpec Attachment spec forwarded to the base class.
 * @param device Device whose component thread drives this producer.
 * @param formatDesc Point-cloud format description, stored as a member.
 * @param nDgramsPerStagingBufferFrame Number of datagrams per staging
 *        buffer frame.
 *
 * Calling this from a thread other than the expected world/body thread is
 * only logged; it is not treated as an error (the throw is disabled).
 */
PcloudStimulusProducer::PcloudStimulusProducer(
    const std::shared_ptr<device::DeviceAttachmentSpec> &deviceAttachmentSpec,
    std::shared_ptr<livoxProto1::Device> &device,
    const PcloudFormatDesc& formatDesc,
    size_t nDgramsPerStagingBufferFrame)
: StimulusProducer(
    deviceAttachmentSpec,
    device->componentThread->getIoService()),
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame),
device(device),
formatDesc(formatDesc),
openClCollatingAndMeshingEngine(*this),
assemblyBuffer(
    StagingBuffer::IOEngineConstraints::ioUringConstraints,
    StagingBuffer::IOEngineConstraints::openClInputConstraints,
    nDgramsPerStagingBufferFrame),
ioUringAssemblyEngine(*this, nDgramsPerStagingBufferFrame),
collationBuffer(
    StagingBuffer::IOEngineConstraints::openClInputConstraints,
    StagingBuffer::IOEngineConstraints::openClInputConstraints,
    nDgramsPerStagingBufferFrame),
tempStimulusFrameMem(0),
tempStimulusFrame(
    FrameAssemblyDesc::SlotDesc{
        0,
        reinterpret_cast<uint8_t*>(&tempStimulusFrameMem),
        sizeof(tempStimulusFrameMem)},
    *smoHooksPtr, 0, SIZE_MAX)
{
    // Buffer layout is only dumped when the user asked for verbose output.
    if (smoHooksPtr->OptionParser_getOptions().verbose)
    {
        std::cout << __func__ << ": assembly buffer : "
            << assemblyBuffer.stringify()
            << "\n\tcollation buffer : " << collationBuffer.stringify()
            << "\n";
    }

    std::cout << __func__ << ": Device's component thread is "
        << device->componentThread->name << std::endl;

    // Which thread is expected to construct us depends on the build config.
#ifndef CONFIG_WORLD_USE_BODY_THREAD
    const auto expectedThreadId = ComponentThread::WORLD;
#else
    const auto expectedThreadId = ComponentThread::BODY;
#endif

    const auto self = smoHooksPtr->ComponentThread_getSelf();
    if (self->id == expectedThreadId)
        { return; }

    std::string errMsg(__func__);
    errMsg += ": PcloudStimulusProducer constructor called on non-world/body thread ";
    errMsg += self->name;
    std::cout << errMsg << std::endl;
    // throw std::runtime_error(errMsg);
}
/**
 * Set up both processing engines, then start the base StimulusProducer.
 *
 * Order matters: the io_uring assembly engine is set up first, the OpenCL
 * collating/meshing engine second, and the base class is started last.
 *
 * If either engine fails to set up, the failure is logged and the function
 * returns without starting the base class. When the OpenCL engine is the one
 * that fails, the already set-up io_uring engine is finalized again so the
 * producer is not left half-initialized.
 */
void PcloudStimulusProducer::start()
{
    std::cout << __func__ << ": Starting PcloudStimulusProducer for device "
        << device->discoveredDevice.deviceIdentifier << std::endl;

    // Stage 1: io_uring assembly engine.
    if (!ioUringAssemblyEngine.setup())
    {
        std::cout << __func__ << ": Failed to setup() "
            << "IOUringAssemblyEngine.\n";
        return;
    }

    // Stage 2: OpenCL collating/meshing engine.
    if (!openClCollatingAndMeshingEngine.setup())
    {
        std::cout << __func__ << ": Failed to setup() "
            << "OClCollMeshEngine.\n";
        // Undo stage 1 with the same call stop() uses for teardown, so a
        // later start() does not set up an engine that is still set up.
        ioUringAssemblyEngine.finalize();
        return;
    }

    // Stage 3: only start producing once both engines are ready.
    StimulusProducer::start();
}
void PcloudStimulusProducer::stop()
{
std::cout << __func__ << ": Stopping PcloudStimulusProducer for device "
<< device->discoveredDevice.deviceIdentifier << std::endl;
// Call base class stop() as the first step
StimulusProducer::stop();
// Call ioUringAssemblyEngine stop() as the final step
openClCollatingAndMeshingEngine.finalize();
ioUringAssemblyEngine.finalize();
}
/**
 * Acknowledgement hook for stimulus-frame production.
 * Currently an empty stub: it takes nothing, returns nothing and does nothing.
 */
void produceStimFrameAck()
{
}
/**
 * Read the history-buffer duration (in milliseconds, per the parameter
 * names) from a device attachment spec.
 *
 * Several synonymous parameter names are accepted; the lookup itself is
 * delegated to DeviceAttachmentSpec::parseOptionalParamAsIntWithSynonyms().
 *
 * @param spec Attachment spec whose qualeIfaceApiParams are searched.
 * @return The parsed value, or 30000 when none of the names is present.
 */
static int parseHistbuffMs(
    const std::shared_ptr<device::DeviceAttachmentSpec>& spec)
{
    // Fallback used when the spec carries none of the accepted names.
    constexpr int defaultHistbuffMs = 30000;

    const std::vector<std::string> acceptedNames = {
        "history-buffer-duration-ms",
        "hist-buff-duration-ms",
        "histbuff-duration-ms",
        "histbuff-ms"
    };

    return device::DeviceAttachmentSpec::parseOptionalParamAsIntWithSynonyms(
        spec->qualeIfaceApiParams, acceptedNames, defaultHistbuffMs);
}
/**
 * Look up the stimulus buffer attached for the given spec and verify that
 * its concrete type matches the spec's qualeIfaceApi.
 *
 * @param spec Attachment spec identifying the buffer.
 * @return The attached buffer when one exists and its dynamic type matches
 *         the requested API ("mesh", "pcloudIntensity" or "pcloudAmbience");
 *         nullptr when no buffer is attached, the type does not match, or
 *         the API name is not one of those three.
 */
std::shared_ptr<StimulusBuffer>
PcloudStimulusProducer::getAttachedStimulusBuffer(
    const std::shared_ptr<device::DeviceAttachmentSpec>& spec) const
{
    // The base class owns the actual lookup.
    auto attached = StimulusProducer::getAttachedStimulusBuffer(spec);
    if (!attached)
        { return nullptr; }

    const std::string& requestedApi = spec->qualeIfaceApi;

    // Stays false for unknown API names, so they fall through to nullptr.
    bool typeMatches = false;
    if (requestedApi == "mesh")
    {
        typeMatches = static_cast<bool>(
            std::dynamic_pointer_cast<MeshStimulusBuffer>(attached));
    }
    else if (requestedApi == "pcloudIntensity")
    {
        typeMatches = static_cast<bool>(
            std::dynamic_pointer_cast<PcloudIntensityStimulusBuffer>(attached));
    }
    else if (requestedApi == "pcloudAmbience")
    {
        typeMatches = static_cast<bool>(
            std::dynamic_pointer_cast<PcloudAmbienceStimulusBuffer>(attached));
    }

    if (!typeMatches)
        { return nullptr; }
    return attached;
}
void PcloudStimulusProducer::destroyAttachedStimulusBuffer(
const std::shared_ptr<StimulusBuffer>& buffer)
{
if (!buffer) { return; }
this->stop();
// Clear specialized buffer pointers if they match
auto meshBuff = meshStimulusBuffer.load(std::memory_order_acquire);
if (meshBuff == buffer)
{
meshBuff.reset();
meshStimulusBuffer.store(nullptr, std::memory_order_release);
}
auto intensityBuff = intensityStimulusBuffer.load(std::memory_order_acquire);
if (intensityBuff == buffer)
{
intensityBuff.reset();
intensityStimulusBuffer.store(nullptr, std::memory_order_release);
}
auto ambienceBuff = ambienceStimulusBuffer.load(std::memory_order_acquire);
if (ambienceBuff == buffer)
{
ambienceBuff.reset();
ambienceStimulusBuffer.store(nullptr, std::memory_order_release);
}
// Call base class implementation to remove from attachedStimulusBuffers
StimulusProducer::destroyAttachedStimulusBuffer(buffer);
this->start();
}
/**
 * Return the stimulus buffer attached for the given spec, creating it first
 * if it does not exist yet.
 *
 * The spec's qualeIfaceApi selects the buffer type:
 *   - "mesh"            -> MeshStimulusBuffer
 *   - "pcloudIntensity" -> PcloudIntensityStimulusBuffer
 *   - "pcloudAmbience"  -> PcloudAmbienceStimulusBuffer
 *
 * A new buffer is constructed while the producer is still running. The
 * producer is then stopped, the buffer is registered and published through
 * its specialized atomic pointer, and the producer is started again.
 *
 * @param deviceAttachmentSpec Spec describing the requested buffer.
 * @return The existing or newly created buffer.
 * @throws std::runtime_error if qualeIfaceApi is not one of the supported
 *         values, or if the pcloudAmbience intolerable threshold is negative
 *         (it is stored as an unsigned value and would otherwise wrap).
 *
 * NOTE(review): each branch writes slotPadToNBytes into a file-static
 * constraints object that every producer instance shares. Confirm the buffer
 * constructors copy the constraints, otherwise two devices with different
 * return modes would interfere with each other.
 */
std::shared_ptr<StimulusBuffer>
PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer(
    const std::shared_ptr<device::DeviceAttachmentSpec>& deviceAttachmentSpec
    )
{
    // Idempotent: hand back a matching buffer if one is already attached.
    auto existingBuffer = getAttachedStimulusBuffer(deviceAttachmentSpec);
    if (existingBuffer)
        { return existingBuffer; }

    int histbuffMs = parseHistbuffMs(deviceAttachmentSpec);
    const std::string& qualeIfaceApi = deviceAttachmentSpec->qualeIfaceApi;

    // Points per datagram depends on the device's current return mode.
    size_t nPointsPerDgram = livoxProto1::Device::getNPointsPerDgram(
        static_cast<int>(device->currentReturnMode));

    /* Register a freshly built buffer and publish it through its specialized
     * atomic pointer, with the producer stopped for the duration.
     */
    auto attachWhileStopped = [this](
        const auto& newBuffer, auto& specializedSlot)
    {
        this->stop();
        addAttachedStimulusBufferIfNotExists(newBuffer);
        specializedSlot.store(newBuffer, std::memory_order_release);
        this->start();
    };

    if (qualeIfaceApi == "mesh")
    {
        /* slotStrideNBytes:
         * nDgramsPerStagingBufferFrame * nPointsPerDgram * sizeof(float) * 3
         */
        size_t slotStrideNBytes = this->nDgramsPerStagingBufferFrame
            * nPointsPerDgram * sizeof(float) * 3;
        // Reuse openClMeshInputConstraints, only modify slotPadToNBytes
        openClMeshInputConstraints.slotPadToNBytes = slotStrideNBytes;

        std::cout << __func__ << ": $$$$$$$ Creating MeshStimulusBuffer" << std::endl;
        auto meshBuffer = std::make_shared<MeshStimulusBuffer>(
            *this, deviceAttachmentSpec, histbuffMs,
            openClMeshInputConstraints, openClMeshInputConstraints,
            *smoHooksPtr, CL_MEM_READ_WRITE);
        std::cout << __func__ << ": $$$$$$$ Created MeshStimulusBuffer" << std::endl;

        attachWhileStopped(meshBuffer, meshStimulusBuffer);
        return meshBuffer;
    }
    else if (qualeIfaceApi == "pcloudIntensity")
    {
        /* slotStrideNBytes:
         * nDgramsPerStagingBufferFrame * nPointsPerDgram * sizeof(float) * 1
         */
        size_t slotStrideNBytes = this->nDgramsPerStagingBufferFrame
            * nPointsPerDgram * sizeof(float) * 1;
        // Reuse openClIntensityInputConstraints, only modify slotPadToNBytes
        openClIntensityInputConstraints.slotPadToNBytes = slotStrideNBytes;

        std::cout << __func__ << ": $$$$$$$ Creating PcloudIntensityStimulusBuffer" << std::endl;
        auto intensityBuffer = std::make_shared<PcloudIntensityStimulusBuffer>(
            *this, deviceAttachmentSpec, histbuffMs,
            openClIntensityInputConstraints, openClIntensityInputConstraints,
            *smoHooksPtr, CL_MEM_READ_WRITE);
        std::cout << __func__ << ": $$$$$$$ Created PcloudIntensityStimulusBuffer" << std::endl;

        attachWhileStopped(intensityBuffer, intensityStimulusBuffer);
        return intensityBuffer;
    }
    else if (qualeIfaceApi == "pcloudAmbience")
    {
        // Parse ambienceHighVal from qualeIfaceApiParams (temporary, undocumented)
        const std::vector<std::string> ambienceHighValParamNames = {
            "negtrin-intolerable-threshold",
            "negtrin-intolerable",
            "intolerable-threshold",
            "intolerable"
        };
        int ambienceHighValInt = device::DeviceAttachmentSpec
            ::parseOptionalParamAsIntWithSynonyms(
                deviceAttachmentSpec->qualeIfaceApiParams,
                ambienceHighValParamNames, 116);

        // A negative value would wrap to a huge unsigned threshold; reject it
        // the same way an unsupported qualeIfaceApi is rejected.
        if (ambienceHighValInt < 0)
        {
            throw std::runtime_error(
                "Invalid pcloudAmbience intolerable threshold: "
                + std::to_string(ambienceHighValInt)
                + ". The value must be non-negative.");
        }
        uint32_t ambienceHighVal = static_cast<uint32_t>(ambienceHighValInt);

        /* slotStrideNBytes:
         * nDgramsPerStagingBufferFrame * sizeof(uint32_t)
         */
        size_t slotStrideNBytes = this->nDgramsPerStagingBufferFrame
            * sizeof(uint32_t);
        // Reuse openClAmbienceInputConstraints, only modify slotPadToNBytes
        openClAmbienceInputConstraints.slotPadToNBytes = slotStrideNBytes;

        auto ambienceBuffer = std::make_shared<PcloudAmbienceStimulusBuffer>(
            *this, deviceAttachmentSpec, histbuffMs,
            openClAmbienceInputConstraints, openClAmbienceInputConstraints,
            *smoHooksPtr, CL_MEM_READ_WRITE, ambienceHighVal);
        std::cout << __func__ << ": $$$$$$$ Created PcloudAmbienceStimulusBuffer" << std::endl;

        attachWhileStopped(ambienceBuffer, ambienceStimulusBuffer);
        return ambienceBuffer;
    }
    else
    {
        throw std::runtime_error(
            "Unsupported qualeIfaceApi: '" + qualeIfaceApi + "' for "
            "PcloudStimulusProducer. "
            "Supported values: mesh, pcloudIntensity, pcloudAmbience");
    }
}
void PcloudStimulusProducer::stimFrameProductionTimesliceInd()
{
produceFrameReq({nullptr, nullptr});
}
/**
 * Continuation object for one frame-production request.
 *
 * The request moves through three steps, each a member function below:
 *   1. produceFrameReq1_doAssemble_posted  - asks the io_uring assembly
 *      engine to assemble a frame.
 *   2. produceFrameReq2_assembleDone       - write-locks the intensity and
 *      ambience frames (if those buffers are attached) and asks the OpenCL
 *      engine to compact, collate and mesh.
 *   3. produceFrameReq3_compactCollateDone - releases those write locks,
 *      logs the outcome and invokes the original callback.
 *
 * Every step is handed a shared_ptr to this object ("context"), which is also
 * bound into the callback given to the next step.
 * NOTE(review): presumably this is what keeps the request alive between
 * steps -- confirm against PostedAsynchronousContinuation.
 */
class PcloudStimulusProducer::ProduceFrameReq
: public PostedAsynchronousContinuation<produceFrameReqCbFn>
{
private:
// Producer that issued this request.
PcloudStimulusProducer& pcloudProducer;
// Result handed over by the assembly step; set in step 2.
AsynchronousLoop frameAssemblyResult;
// Always refers to the producer's tempStimulusFrame (see constructor).
StimulusFrame& stimulusFrame;
// Set in step 2 when the matching buffer is attached; empty otherwise.
// NOTE(review): only a reference to the frame is kept. The shared_ptr to the
// owning buffer is a local in step 2 and is dropped when step 2 returns, so
// nothing here keeps the buffer alive until step 3 -- confirm that stop()
// guarantees no request is in flight before a buffer is destroyed.
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame;
std::optional<std::reference_wrapper<StimulusFrame>> ambienceStimFrame;
public:
// Binds the request to its producer and to the producer's temporary frame.
ProduceFrameReq(
PcloudStimulusProducer& producer,
const std::shared_ptr<ComponentThread>& caller,
Callback<produceFrameReqCbFn> cb)
: PostedAsynchronousContinuation<produceFrameReqCbFn>(caller, cb),
pcloudProducer(producer),
frameAssemblyResult(0),
stimulusFrame(producer.tempStimulusFrame)
{}
public:
// Common exit path for every step: calls the producer's
// allowNextStimulusFrame() first, then the original callback.
void callOriginalCallback()
{
pcloudProducer.allowNextStimulusFrame();
callOriginalCb();
}
public:
// Step 1. Runs with shouldContinueLock held for the whole function.
// If the producer should no longer continue, the request ends here.
void produceFrameReq1_doAssemble_posted(
std::shared_ptr<ProduceFrameReq> context)
{
SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
if (!pcloudProducer.shouldContinue)
{
callOriginalCallback();
return;
}
pcloudProducer.ioUringAssemblyEngine.assembleFrameReq(
{context, std::bind(
&ProduceFrameReq::produceFrameReq2_assembleDone,
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
}
// Step 2. Invoked with the assembly outcome. Runs with shouldContinueLock
// held for the whole function. Ends the request early if the producer should
// not continue or if assembly failed; in both cases no frame lock has been
// taken yet.
void produceFrameReq2_assembleDone(
std::shared_ptr<ProduceFrameReq> context,
bool success, AsynchronousLoop loop)
{
SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
if (!pcloudProducer.shouldContinue)
{
callOriginalCallback();
return;
}
if (!success)
{
std::cerr << __func__ << ": Failed to assemble frame" << std::endl;
callOriginalCallback();
return;
}
context->frameAssemblyResult = loop;
// Check if intensity buffer is attached and acquire frame if so
if (auto intensityBuff = pcloudProducer.intensityStimulusBuffer.load(
std::memory_order_acquire))
{
size_t intensityRingbuffIndex = intensityBuff
->ringBuffer.getIndexToProduceInto();
StimulusFrame& intensityStimFrame = intensityBuff
->ringBuffer.getDataAtSlot(
intensityRingbuffIndex);
// Held until step 3 releases it.
intensityStimFrame.lock.writeAcquire();
context->intensityStimFrame = std::make_optional(
std::ref(intensityStimFrame));
}
else {
context->intensityStimFrame = std::nullopt;
}
// Check if ambience buffer is attached and acquire frame if so
if (auto ambienceBuff = pcloudProducer.ambienceStimulusBuffer.load(
std::memory_order_acquire))
{
size_t ambienceRingbuffIndex = ambienceBuff
->ringBuffer.getIndexToProduceInto();
StimulusFrame& ambienceStimFrame = ambienceBuff
->ringBuffer.getDataAtSlot(
ambienceRingbuffIndex);
// Held until step 3 releases it.
ambienceStimFrame.lock.writeAcquire();
context->ambienceStimFrame = std::make_optional(
std::ref(ambienceStimFrame));
}
else {
context->ambienceStimFrame = std::nullopt;
}
// Hand the assembled data and the (optional) locked frames to the OpenCL
// engine; step 3 is its completion callback.
pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq(
loop, stimulusFrame,
context->intensityStimFrame, context->ambienceStimFrame,
{context, std::bind(
&ProduceFrameReq::produceFrameReq3_compactCollateDone,
context.get(), context,
std::placeholders::_1, std::placeholders::_2)});
}
// Step 3. Invoked with the compact/collate outcome. The frame write locks
// taken in step 2 are released first, before shouldContinueLock is acquired
// and regardless of success.
// NOTE(review): context is marked [[maybe_unused]] but is used below.
void produceFrameReq3_compactCollateDone(
[[maybe_unused]] std::shared_ptr<ProduceFrameReq> context,
bool success, StimulusFrame& /*stimulusFrame*/)
{
// Release intensity frame if it was used
if (context->intensityStimFrame.has_value()) {
context->intensityStimFrame->get().lock.writeRelease();
}
// Release ambience frame if it was used
if (context->ambienceStimFrame.has_value()) {
context->ambienceStimFrame->get().lock.writeRelease();
}
SpinLock::Guard lock(pcloudProducer.shouldContinueLock);
if (!pcloudProducer.shouldContinue)
{
callOriginalCallback();
return;
}
if (!success) {
std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl;
} else
{
// Print execution durations
// NOTE(review): the log labels these counts as "ms"; confirm the duration
// types returned by the engines really are milliseconds.
auto assemblyDuration = pcloudProducer.ioUringAssemblyEngine.getAssemblyDuration();
auto compactDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCompactKernelDuration();
auto collateDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCollateKernelDuration();
std::cout << __func__ << ": Successfully compacted and collated frame: assemblyDuration=" << assemblyDuration.count()
<< "ms, compactKernelDuration=" << compactDuration.count()
<< "ms, collateKernelDuration=" << collateDuration.count() << "ms" << std::endl;
}
// The original callback is invoked on both success and failure.
callOriginalCallback();
}
};
/**
 * Start an asynchronous frame-production request.
 *
 * A ProduceFrameReq continuation is created for the calling component thread
 * and its first step is posted to the io service of the device's component
 * thread. This function returns once the step has been posted.
 *
 * @param callback Invoked by the continuation when the request finishes.
 */
void PcloudStimulusProducer::produceFrameReq(
    smo::Callback<produceFrameReqCbFn> callback)
{
    /** EXPLANATION:
     * StimulusProducer::shouldContinueLock must not be acquired here: this
     * function is called from
     * StimulusProducer::stimFrameProductionTimesliceInd(), which is already
     * holding the lock.
     */
    auto callingThread = smoHooksPtr->ComponentThread_getSelf();
    auto pendingRequest = std::make_shared<ProduceFrameReq>(
        *this, callingThread, std::move(callback));

    // The bound shared_ptr travels with the posted task.
    auto&& deviceIoService = device->componentThread->getIoService();
    deviceIoService.post(
        STC(std::bind(
            &ProduceFrameReq::produceFrameReq1_doAssemble_posted,
            pendingRequest.get(), pendingRequest)));
}
} // namespace stim_buff
} // namespace smo