#include #include #include #include #include #include #include #include #include #include #include #include "pcloudStimulusProducer.h" namespace smo { namespace stim_buff { extern const SmoCallbacks* smoHooksPtr; // OpenCL kernels are used to collate and produce our StimFrames. static StagingBuffer::IOEngineConstraints openClInputConstraints( /** FIXME: * This should eventually be aligned to 4B and padded to 12B. */ // slotStartAlignmentByteVal (page alignment) sizeof(float) * 4, // slotPadToNBytes (pointer size) sizeof(float) * 4, // frameStartAlignmentByteVal (page alignment) static_cast(sysconf(_SC_PAGE_SIZE)), // framePadToNBytes (pointer size) static_cast(sysconf(_SC_PAGE_SIZE))); // OpenCL kernels are used to collate and produce our StimFrames. static StagingBuffer::IOEngineConstraints openClMeshInputConstraints( // slotStartAlignmentByteVal (page alignment) static_cast(sysconf(_SC_PAGE_SIZE)), // slotPadToNBytes (pointer size) sizeof(float) * 3, // frameStartAlignmentByteVal (page alignment) static_cast(sysconf(_SC_PAGE_SIZE)), // framePadToNBytes (pointer size) static_cast(sysconf(_SC_PAGE_SIZE))); static StagingBuffer::IOEngineConstraints openClIntensityInputConstraints( // slotStartAlignmentByteVal (page alignment) static_cast(sysconf(_SC_PAGE_SIZE)), // slotPadToNBytes (intensity value size) sizeof(float), // frameStartAlignmentByteVal (page alignment) static_cast(sysconf(_SC_PAGE_SIZE)), // framePadToNBytes (pointer size) static_cast(sysconf(_SC_PAGE_SIZE))); static StagingBuffer::IOEngineConstraints openClAmbienceInputConstraints( // slotStartAlignmentByteVal (page alignment) static_cast(sysconf(_SC_PAGE_SIZE)), // slotPadToNBytes (pointer size) sizeof(float), // frameStartAlignmentByteVal (page alignment) static_cast(sysconf(_SC_PAGE_SIZE)), // framePadToNBytes (pointer size) static_cast(sysconf(_SC_PAGE_SIZE))); PcloudStimulusProducer::PcloudStimulusProducer( const std::shared_ptr &deviceAttachmentSpec, std::shared_ptr &device, const PcloudFormatDesc& formatDesc, size_t nDgramsPerStagingBufferFrame) : StimulusProducer( deviceAttachmentSpec, device->componentThread->getIoService()), nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame), device(device), formatDesc(formatDesc), openClCollatingAndMeshingEngine(*this), assemblyBuffer( StagingBuffer::IOEngineConstraints::ioUringConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame), ioUringAssemblyEngine(*this, nDgramsPerStagingBufferFrame), collationBuffer( StagingBuffer::IOEngineConstraints::openClInputConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame), tempStimulusFrameMem(0), tempStimulusFrame( FrameAssemblyDesc::SlotDesc{ 0, reinterpret_cast(&tempStimulusFrameMem), sizeof(tempStimulusFrameMem)}, *smoHooksPtr, 0) { if (smoHooksPtr->OptionParser_getOptions().verbose) { std::cout << __func__ << ": assembly buffer : " << assemblyBuffer.stringify() << "\n\tcollation buffer : " << collationBuffer.stringify() << "\n"; } std::cout << __func__ << ": Device's component thread is " << device->componentThread->name << std::endl; #ifndef CONFIG_WORLD_USE_BODY_THREAD if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::WORLD) #else if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::BODY) #endif { std::string errMsg = std::string(__func__) + ": PcloudStimulusProducer constructor called on non-world/body thread " + smoHooksPtr->ComponentThread_getSelf()->name; std::cout << errMsg << std::endl; // throw std::runtime_error(errMsg); } } void PcloudStimulusProducer::start() { std::cout << __func__ << ": Starting PcloudStimulusProducer for device " << device->discoveredDevice.deviceIdentifier << std::endl; // Call ioUringAssemblyEngine setup() as the first step if (!ioUringAssemblyEngine.setup()) { std::cout <<__func__ <<"Failed to setup() " <<"IOUringAssemblyEngine.\n"; return; } if (!openClCollatingAndMeshingEngine.setup()) { std::cout <<__func__ <<"Failed to setup() " <<"OClCollMeshEngine.\n"; return; } // Call base class start() as the final step StimulusProducer::start(); } void PcloudStimulusProducer::stop() { std::cout << __func__ << ": Stopping PcloudStimulusProducer for device " << device->discoveredDevice.deviceIdentifier << std::endl; // Call base class stop() as the first step StimulusProducer::stop(); // Call ioUringAssemblyEngine stop() as the final step openClCollatingAndMeshingEngine.finalize(); ioUringAssemblyEngine.finalize(); } void produceStimFrameAck(void) { } // Helper function to parse histbuffMs from device attachment spec static int parseHistbuffMs( const std::shared_ptr& spec) { const std::vector histbuffParamNames = { "history-buffer-duration-ms", "hist-buff-duration-ms", "histbuff-duration-ms", "histbuff-ms" }; return device::DeviceAttachmentSpec::parseOptionalParamAsIntWithSynonyms( spec->qualeIfaceApiParams, histbuffParamNames, 30000); } std::shared_ptr PcloudStimulusProducer::getAttachedStimulusBuffer( const std::shared_ptr& spec) const { // Call base class implementation auto buffer = StimulusProducer::getAttachedStimulusBuffer(spec); if (!buffer) { return nullptr; } // Optionally validate/upcast the buffer type matches expected type // based on qualeIfaceApi (for type safety) const std::string& qualeIfaceApi = spec->qualeIfaceApi; if (qualeIfaceApi == "mesh") { if (std::dynamic_pointer_cast(buffer)) { return buffer; } } else if (qualeIfaceApi == "pcloudIntensity") { if (std::dynamic_pointer_cast(buffer)) { return buffer; } } else if (qualeIfaceApi == "pcloudAmbience") { if (std::dynamic_pointer_cast(buffer)) { return buffer; } } // Type mismatch - return nullptr return nullptr; } std::shared_ptr PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer( const std::shared_ptr& deviceAttachmentSpec ) { // Check if buffer already exists (idempotent) auto existingBuffer = getAttachedStimulusBuffer(deviceAttachmentSpec); if (existingBuffer) { return existingBuffer; } // Parse histbuffMs from device attachment spec int histbuffMs = parseHistbuffMs(deviceAttachmentSpec); // Parse qualeIfaceApi to determine buffer type const std::string& qualeIfaceApi = deviceAttachmentSpec->qualeIfaceApi; // Calculate nPointsPerDgram based on return mode size_t nPointsPerDgram = livoxProto1::Device::getNPointsPerDgram( static_cast(device->currentReturnMode)); if (qualeIfaceApi == "mesh") { /* Calculate slotStrideNBytes: * nDgramsPerStagingBufferFrame * nPointsPerDgram * sizeof(float) * 3 */ size_t slotStrideNBytes = this->nDgramsPerStagingBufferFrame * nPointsPerDgram * sizeof(float) * 3; // Reuse openClMeshInputConstraints, only modify slotPadToNBytes openClMeshInputConstraints.slotPadToNBytes = slotStrideNBytes; std::cout << __func__ << ": $$$$$$$ Creating MeshStimulusBuffer" << std::endl; auto meshBuffer = std::make_shared( *this, deviceAttachmentSpec, histbuffMs, openClMeshInputConstraints, openClMeshInputConstraints, *smoHooksPtr, CL_MEM_READ_WRITE); std::cout << __func__ << ": $$$$$$$ Created MeshStimulusBuffer" << std::endl; meshStimulusBuffer = meshBuffer; attachedStimulusBuffers.push_back(meshBuffer); return meshBuffer; } else if (qualeIfaceApi == "pcloudIntensity") { /* Calculate slotStrideNBytes: * nDgramsPerStagingBufferFrame * nPointsPerDgram * sizeof(float) * 1 */ size_t slotStrideNBytes = this->nDgramsPerStagingBufferFrame * nPointsPerDgram * sizeof(float) * 1; // Reuse openClIntensityInputConstraints, only modify slotPadToNBytes openClIntensityInputConstraints.slotPadToNBytes = slotStrideNBytes; std::cout << __func__ << ": $$$$$$$ Creating PcloudIntensityStimulusBuffer" << std::endl; auto intensityBuffer = std::make_shared( *this, deviceAttachmentSpec, histbuffMs, openClIntensityInputConstraints, openClIntensityInputConstraints, *smoHooksPtr, CL_MEM_READ_WRITE); std::cout << __func__ << ": $$$$$$$ Created PcloudIntensityStimulusBuffer" << std::endl; intensityStimulusBuffer = intensityBuffer; attachedStimulusBuffers.push_back(intensityBuffer); return intensityBuffer; } else if (qualeIfaceApi == "pcloudAmbience") { /* Calculate slotStrideNBytes: * nDgramsPerStagingBufferFrame * sizeof(float) */ size_t slotStrideNBytes = this->nDgramsPerStagingBufferFrame * sizeof(float); // Reuse openClAmbienceInputConstraints, only modify slotPadToNBytes openClAmbienceInputConstraints.slotPadToNBytes = slotStrideNBytes; auto ambienceBuffer = std::make_shared( *this, deviceAttachmentSpec, histbuffMs, openClAmbienceInputConstraints, openClAmbienceInputConstraints, *smoHooksPtr, CL_MEM_READ_WRITE); std::cout << __func__ << ": $$$$$$$ Created PcloudAmbienceStimulusBuffer" << std::endl; ambienceStimulusBuffer = ambienceBuffer; attachedStimulusBuffers.push_back(ambienceBuffer); return ambienceBuffer; } else { throw std::runtime_error( "Unsupported qualeIfaceApi: '" + qualeIfaceApi + "' for " "PcloudStimulusProducer. " "Supported values: mesh, pcloudIntensity, pcloudAmbience"); } } void PcloudStimulusProducer::stimFrameProductionTimesliceInd() { produceFrameReq({nullptr, nullptr}); } class PcloudStimulusProducer::ProduceFrameReq : public PostedAsynchronousContinuation { private: PcloudStimulusProducer& pcloudProducer; AsynchronousLoop frameAssemblyResult; StimulusFrame& stimulusFrame; public: ProduceFrameReq( PcloudStimulusProducer& producer, const std::shared_ptr& caller, Callback cb) : PostedAsynchronousContinuation(caller, cb), pcloudProducer(producer), frameAssemblyResult(0), stimulusFrame(producer.tempStimulusFrame) {} public: void callOriginalCallback() { pcloudProducer.allowNextStimulusFrame(); callOriginalCb(); } public: void produceFrameReq1_doAssemble_posted( std::shared_ptr context) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( {context, std::bind( &ProduceFrameReq::produceFrameReq2_assembleDone, context.get(), context, std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq2_assembleDone( std::shared_ptr context, bool success, AsynchronousLoop loop) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } if (!success) { std::cerr << __func__ << ": Failed to assemble frame" << std::endl; callOriginalCallback(); return; } context->frameAssemblyResult = loop; pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq( loop, stimulusFrame, {context, std::bind( &ProduceFrameReq::produceFrameReq3_compactCollateDone, context.get(), context, std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq3_compactCollateDone( [[maybe_unused]] std::shared_ptr context, bool success, StimulusFrame& /*stimulusFrame*/) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } if (!success) { std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; } else { std::cout << __func__ << ": Successfully compacted and collated frame" << std::endl; } // Print kernel execution durations auto compactDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCompactKernelDuration(); auto collateDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCollateKernelDuration(); std::cout << __func__ << ": compactKernelDuration=" << compactDuration.count() << "ms, collateKernelDuration=" << collateDuration.count() << "ms" << std::endl; callOriginalCallback(); } }; void PcloudStimulusProducer::produceFrameReq( smo::Callback callback) { /** EXPLANATION: * We shouldn't acquire the StimulusProducer::shouldContinueLock here because * this function is called from * StimulusProducer::stimFrameProductionTimesliceInd(), which is already * holding the lock. */ auto caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared( *this, caller, std::move(callback)); // Post the doAssemble method to the component thread device->componentThread->getIoService().post( STC(std::bind( &ProduceFrameReq::produceFrameReq1_doAssemble_posted, request.get(), request))); } } // namespace stim_buff } // namespace smo