#include #include #include #include #include #include #include #include #include #include "pcloudStimulusProducer.h" #include namespace smo { namespace stim_buff { extern const SmoCallbacks* smoHooksPtr; // OpenCL kernels are used to collate and produce our StimFrames. static SpMcRingBuffer::InputEngineConstraints openClInputConstraints( static_cast(sysconf(_SC_PAGE_SIZE)), sizeof(void *)); PcloudStimulusProducer::PcloudStimulusProducer( const std::shared_ptr &deviceAttachmentSpec, std::shared_ptr &device, const PcloudFormatDesc& formatDesc, size_t nDgramsPerStagingBufferFrame) : StimulusProducer( deviceAttachmentSpec, device->componentThread->getIoService()), device(device), formatDesc(formatDesc), openClCollatingAndMeshingEngine(*this), assemblyBuffer( StagingBuffer::IOEngineConstraints::ioUringConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame), ioUringAssemblyEngine(*this, nDgramsPerStagingBufferFrame), collationBuffer( StagingBuffer::IOEngineConstraints::openClInputConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame) { if (smoHooksPtr->OptionParser_getOptions().verbose) { std::cout << __func__ << ": assembly buffer : " << assemblyBuffer.stringify() << "\n\tcollation buffer : " << collationBuffer.stringify() << "\n"; } std::cout << __func__ << ": Device's component thread is " << device->componentThread->name << std::endl; #ifndef CONFIG_WORLD_USE_BODY_THREAD if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::WORLD) #else if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::BODY) #endif { std::string errMsg = std::string(__func__) + ": PcloudStimulusProducer constructor called on non-world/body thread " + smoHooksPtr->ComponentThread_getSelf()->name; std::cout << errMsg << std::endl; // throw std::runtime_error(errMsg); } } void PcloudStimulusProducer::start() { std::cout << __func__ << ": Starting PcloudStimulusProducer for device " << device->discoveredDevice.deviceIdentifier << std::endl; // Call ioUringAssemblyEngine setup() as the first step if (!ioUringAssemblyEngine.setup()) { std::cout <<__func__ <<"Failed to setup() " <<"IOUringAssemblyEngine.\n"; return; } if (!openClCollatingAndMeshingEngine.setup()) { std::cout <<__func__ <<"Failed to setup() " <<"OClCollMeshEngine.\n"; return; } // Call base class start() as the final step StimulusProducer::start(); } void PcloudStimulusProducer::stop() { std::cout << __func__ << ": Stopping PcloudStimulusProducer for device " << device->discoveredDevice.deviceIdentifier << std::endl; // Call base class stop() as the first step StimulusProducer::stop(); // Call ioUringAssemblyEngine stop() as the final step openClCollatingAndMeshingEngine.finalize(); ioUringAssemblyEngine.finalize(); } void produceStimFrameAck(void) { } // Helper function to parse histbuffMs from device attachment spec static int parseHistbuffMs( const std::shared_ptr& spec) { int histbuffMs = 30000; // Default: 30000ms (30 seconds) const std::vector histbuffParamNames = { "history-buffer-duration-ms", "hist-buff-duration-ms", "histbuff-duration-ms", "histbuff-ms" }; // Loop through synonyms in reverse order; lattermost synonym wins. for (auto synIt = histbuffParamNames.rbegin(); synIt != histbuffParamNames.rend(); ++synIt) { const auto& paramName = *synIt; try { histbuffMs = device::DeviceAttachmentSpec ::parseRequiredParamAsInt( spec->qualeIfaceApiParams, paramName); break; // Found and parsed successfully } catch (const std::exception&) { // Parameter not found or parse error, continue to next synonym continue; } } return histbuffMs; } std::shared_ptr PcloudStimulusProducer::getAttachedStimulusBuffer( const std::shared_ptr& spec) const { // Call base class implementation auto buffer = StimulusProducer::getAttachedStimulusBuffer(spec); if (!buffer) { return nullptr; } // Optionally validate/upcast the buffer type matches expected type // based on qualeIfaceApi (for type safety) const std::string& qualeIfaceApi = spec->qualeIfaceApi; if (qualeIfaceApi == "mesh") { if (std::dynamic_pointer_cast(buffer)) { return buffer; } } else if (qualeIfaceApi == "pcloudIntensity") { if (std::dynamic_pointer_cast(buffer)) { return buffer; } } else if (qualeIfaceApi == "pcloudAmbience") { if (std::dynamic_pointer_cast(buffer)) { return buffer; } } // Type mismatch - return nullptr return nullptr; } std::shared_ptr PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer( const std::shared_ptr& deviceAttachmentSpec ) { // Check if buffer already exists (idempotent) auto existingBuffer = getAttachedStimulusBuffer(deviceAttachmentSpec); if (existingBuffer) { return existingBuffer; } // Parse histbuffMs from device attachment spec int histbuffMs = parseHistbuffMs(deviceAttachmentSpec); // Parse qualeIfaceApi to determine buffer type const std::string& qualeIfaceApi = deviceAttachmentSpec->qualeIfaceApi; if (qualeIfaceApi == "mesh") { auto meshBuffer = std::make_shared( *this, deviceAttachmentSpec, histbuffMs, openClInputConstraints); meshStimulusBuffer = meshBuffer; attachedStimulusBuffers.push_back(meshBuffer); return meshBuffer; } else if (qualeIfaceApi == "pcloudIntensity") { auto intensityBuffer = std::make_shared( *this, deviceAttachmentSpec, histbuffMs, openClInputConstraints); intensityStimulusBuffer = intensityBuffer; attachedStimulusBuffers.push_back(intensityBuffer); return intensityBuffer; } else if (qualeIfaceApi == "pcloudAmbience") { auto ambienceBuffer = std::make_shared( *this, deviceAttachmentSpec, histbuffMs, openClInputConstraints); ambienceStimulusBuffer = ambienceBuffer; attachedStimulusBuffers.push_back(ambienceBuffer); return ambienceBuffer; } else { throw std::runtime_error( "Unsupported qualeIfaceApi: '" + qualeIfaceApi + "' for " "PcloudStimulusProducer. " "Supported values: mesh, pcloudIntensity, pcloudAmbience"); } } void PcloudStimulusProducer::stimFrameProductionTimesliceInd() { produceFrameReq({nullptr, nullptr}); } class PcloudStimulusProducer::ProduceFrameReq : public PostedAsynchronousContinuation { private: PcloudStimulusProducer& pcloudProducer; AsynchronousLoop frameAssemblyResult; StimulusFrame& stimulusFrame; public: ProduceFrameReq( PcloudStimulusProducer& producer, const std::shared_ptr& caller, Callback cb) : PostedAsynchronousContinuation(caller, cb), pcloudProducer(producer), frameAssemblyResult(0), stimulusFrame(producer.tempStimulusFrame) {} public: void callOriginalCallback() { pcloudProducer.allowNextStimulusFrame(); callOriginalCb(); } public: void produceFrameReq1_doAssemble_posted( std::shared_ptr context) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( {context, std::bind( &ProduceFrameReq::produceFrameReq2_assembleDone, context.get(), context, std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq2_assembleDone( std::shared_ptr context, bool success, AsynchronousLoop loop) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } if (!success) { std::cerr << __func__ << ": Failed to assemble frame" << std::endl; callOriginalCallback(); return; } context->frameAssemblyResult = loop; pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq( loop, stimulusFrame, {context, std::bind( &ProduceFrameReq::produceFrameReq3_compactCollateDone, context.get(), context, std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq3_compactCollateDone( [[maybe_unused]] std::shared_ptr context, bool success, StimulusFrame& /*stimulusFrame*/) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } if (!success) { std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; } else { std::cout << __func__ << ": Successfully compacted and collated frame" << std::endl; } // Print kernel execution durations auto compactDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCompactKernelDuration(); auto collateDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCollateKernelDuration(); std::cout << __func__ << ": compactKernelDuration=" << compactDuration.count() << "ms, collateKernelDuration=" << collateDuration.count() << "ms" << std::endl; callOriginalCallback(); } }; void PcloudStimulusProducer::produceFrameReq( smo::Callback callback) { /** EXPLANATION: * We shouldn't acquire the StimulusProducer::shouldContinueLock here because * this function is called from * StimulusProducer::stimFrameProductionTimesliceInd(), which is already * holding the lock. */ auto caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared( *this, caller, std::move(callback)); // Post the doAssemble method to the component thread device->componentThread->getIoService().post( STC(std::bind( &ProduceFrameReq::produceFrameReq1_doAssemble_posted, request.get(), request))); } } // namespace stim_buff } // namespace smo