#include #include #include #include #include #include #include #include #include #include "pcloudStimulusProducer.h" #include "frameAssemblyDesc.h" namespace smo { namespace stim_buff { extern const SmoCallbacks* smoHooksPtr; // OpenCL kernels are used to collate and produce our StimFrames. static SpMcRingBuffer::InputEngineConstraints openClInputConstraints( static_cast(sysconf(_SC_PAGE_SIZE)), sizeof(void *)); PcloudStimulusProducer::PcloudStimulusProducer( const std::shared_ptr &deviceAttachmentSpec, std::shared_ptr &device, const PcloudFormatDesc& formatDesc, size_t nDgramsPerStagingBufferFrame) : StimulusProducer( deviceAttachmentSpec, device->componentThread->getIoService()), device(device), formatDesc(formatDesc), openClCollatingAndMeshingEngine(*this), assemblyBuffer( StagingBuffer::IOEngineConstraints::ioUringConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame), ioUringAssemblyEngine(*this, nDgramsPerStagingBufferFrame), collationBuffer( StagingBuffer::IOEngineConstraints::openClInputConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame) { if (smoHooksPtr->OptionParser_getOptions().verbose) { std::cout << __func__ << ": assembly buffer : " << assemblyBuffer.stringify() << "\n\tcollation buffer : " << collationBuffer.stringify() << "\n"; } std::cout << __func__ << ": Device's component thread is " << device->componentThread->name << std::endl; #ifndef CONFIG_WORLD_USE_BODY_THREAD if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::WORLD) #else if (smoHooksPtr->ComponentThread_getSelf()->id != ComponentThread::BODY) #endif { std::string errMsg = std::string(__func__) + ": PcloudStimulusProducer constructor called on non-world/body thread " + smoHooksPtr->ComponentThread_getSelf()->name; std::cout << errMsg << std::endl; // throw std::runtime_error(errMsg); } } void PcloudStimulusProducer::start() { std::cout << __func__ << ": Starting PcloudStimulusProducer for device " << device->discoveredDevice.deviceIdentifier << std::endl; // Call ioUringAssemblyEngine setup() as the first step if (!ioUringAssemblyEngine.setup()) { std::cout <<__func__ <<"Failed to setup() " <<"IOUringAssemblyEngine.\n"; return; } if (!openClCollatingAndMeshingEngine.setup()) { std::cout <<__func__ <<"Failed to setup() " <<"OClCollMeshEngine.\n"; return; } // Call base class start() as the final step StimulusProducer::start(); } void PcloudStimulusProducer::stop() { std::cout << __func__ << ": Stopping PcloudStimulusProducer for device " << device->discoveredDevice.deviceIdentifier << std::endl; // Call base class stop() as the first step StimulusProducer::stop(); // Call ioUringAssemblyEngine stop() as the final step openClCollatingAndMeshingEngine.finalize(); ioUringAssemblyEngine.finalize(); } void produceStimFrameAck(void) { } std::shared_ptr PcloudStimulusProducer::getOrCreateAttachedStimulusBuffer( const std::shared_ptr& deviceAttachmentSpec, int histbuffMs ) { // Check if buffer already exists (idempotent) auto existingBuffer = getAttachedStimulusBuffer(deviceAttachmentSpec); if (existingBuffer) { return existingBuffer; } // Create new PcloudXyzStimulusBuffer (for now, always use XYZ type) auto buffer = std::make_shared( *this, deviceAttachmentSpec, histbuffMs, openClInputConstraints); // Add to collection attachedStimulusBuffers.push_back(buffer); // Update specialized member xyzStimulusBuffer = buffer; return buffer; } void PcloudStimulusProducer::stimFrameProductionTimesliceInd() { produceFrameReq({nullptr, nullptr}); } class PcloudStimulusProducer::ProduceFrameReq : public PostedAsynchronousContinuation { private: PcloudStimulusProducer& pcloudProducer; AsynchronousLoop frameAssemblyResult; StimulusFrame& stimulusFrame; public: ProduceFrameReq( PcloudStimulusProducer& producer, const std::shared_ptr& caller, Callback cb) : PostedAsynchronousContinuation(caller, cb), pcloudProducer(producer), frameAssemblyResult(0), stimulusFrame(producer.tempStimulusFrame) {} public: void callOriginalCallback() { pcloudProducer.allowNextStimulusFrame(); callOriginalCb(); } public: void produceFrameReq1_doAssemble_posted( std::shared_ptr context) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } pcloudProducer.ioUringAssemblyEngine.assembleFrameReq( {context, std::bind( &ProduceFrameReq::produceFrameReq2_assembleDone, context.get(), context, std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq2_assembleDone( std::shared_ptr context, bool success, AsynchronousLoop loop) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } if (!success) { std::cerr << __func__ << ": Failed to assemble frame" << std::endl; callOriginalCallback(); return; } context->frameAssemblyResult = loop; pcloudProducer.openClCollatingAndMeshingEngine.compactCollateAndMeshFrameReq( loop, stimulusFrame, {context, std::bind( &ProduceFrameReq::produceFrameReq3_compactCollateDone, context.get(), context, std::placeholders::_1, std::placeholders::_2)}); } void produceFrameReq3_compactCollateDone( [[maybe_unused]] std::shared_ptr context, bool success, StimulusFrame& /*stimulusFrame*/) { SpinLock::Guard lock(pcloudProducer.shouldContinueLock); if (!pcloudProducer.shouldContinue) { callOriginalCallback(); return; } if (!success) { std::cerr << __func__ << ": Failed to compact and collate frame" << std::endl; } else { std::cout << __func__ << ": Successfully compacted and collated frame" << std::endl; } // Print kernel execution durations auto compactDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCompactKernelDuration(); auto collateDuration = pcloudProducer.openClCollatingAndMeshingEngine.getCollateKernelDuration(); std::cout << __func__ << ": compactKernelDuration=" << compactDuration.count() << "ms, collateKernelDuration=" << collateDuration.count() << "ms" << std::endl; callOriginalCallback(); } }; void PcloudStimulusProducer::produceFrameReq( smo::Callback callback) { /** EXPLANATION: * We shouldn't acquire the StimulusProducer::shouldContinueLock here because * this function is called from * StimulusProducer::stimFrameProductionTimesliceInd(), which is already * holding the lock. */ auto caller = smoHooksPtr->ComponentThread_getSelf(); auto request = std::make_shared( *this, caller, std::move(callback)); // Post the doAssemble method to the component thread device->componentThread->getIoService().post( STC(std::bind( &ProduceFrameReq::produceFrameReq1_doAssemble_posted, request.get(), request))); } } // namespace stim_buff } // namespace smo