#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "livoxGen1.h" #include "openClCollatingAndMeshingEngine.h" #include "pcloudStimulusProducer.h" #include "openClKernels.h" #include #include "ioUringAssemblyEngine.h" #include extern const smo::stim_buff::SmoCallbacks* smoHooksPtr; namespace smo { namespace stim_buff { #ifndef SMO_PRINT_PCLOUD_COLLATE_RESULTS #define SMO_PRINT_PCLOUD_COLLATE_RESULTS 1 #endif OpenClCollatingAndMeshingEngine::OpenClCollatingAndMeshingEngine( PcloudStimulusProducer& parent_) : parent(parent_), computeDevice(nullptr), clAssemblyBufferClBuffer(nullptr), clCollationBufferClBuffer(nullptr), clAverageIntensityBufferClBuffer(nullptr), clAssemblyBuffer(nullptr), clCollationBuffer(nullptr), clAverageIntensityBuffer(nullptr), compactIsRunning(false), collateIsRunning(false), currentCompactKernelEvent(nullptr), currentCollateKernelEvent(nullptr), assemblyBufferPtr(nullptr), assemblyBufferSize(0), collationBufferPtr(nullptr), collationBufferSize(0), averageIntensityBufferPtr(nullptr), averageIntensityBufferSize(0), mappedAssemblyBuffer(nullptr), mappedCollationBuffer(nullptr), mappedAverageIntensityBuffer(nullptr), frameAssemblyDesc(nullptr) { } OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine() { finalize(); } bool OpenClCollatingAndMeshingEngine::setup() { // Defensive check to prevent double-calling if (!openClCollMeshEngnCanceler.isCancellationRequested()) { throw std::runtime_error(std::string(__func__) + ": setup() called " "while already set up"); } if (!smoHooksPtr || !smoHooksPtr->ComputeManager_getDevice) { std::cerr << __func__ << ": smo hooks not available" << std::endl; return false; } // Get ComputeDevice from smo hooks auto wip_computeDevice = smoHooksPtr->ComputeManager_getDevice(); if (!wip_computeDevice) { std::cerr << __func__ << ": failed to get compute device" << std::endl; return false; } // Get StagingBuffer memory pointers from parent struct iovec assemblyIov = parent.assemblyBuffer.getClEngineIovec(); struct iovec collationIov = parent.collationBuffer.getClEngineIovec(); struct iovec averageIntensityIov = parent.averageIntensityBuffer .getClEngineIovec(); assemblyBufferPtr = assemblyIov.iov_base; assemblyBufferSize = assemblyIov.iov_len; collationBufferPtr = collationIov.iov_base; collationBufferSize = collationIov.iov_len; averageIntensityBufferPtr = averageIntensityIov.iov_base; averageIntensityBufferSize = averageIntensityIov.iov_len; // Get FrameAssemblyDesc from assembly buffer frameAssemblyDesc = static_cast>( parent.assemblyBuffer); if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty()) { std::cerr << __func__ << ": invalid frame descriptor" << std::endl; return false; } // Create OpenCL buffers using smo hooks if (!smoHooksPtr->ComputeManager_createUseHostPtrBuffer) { std::cerr << __func__ << ": createUseHostPtrBuffer hook not available" << std::endl; return false; } auto wip_clAssemblyBufferClBuffer = smoHooksPtr ->ComputeManager_createUseHostPtrBuffer( assemblyBufferPtr, assemblyBufferSize, CL_MEM_READ_WRITE); if (!wip_clAssemblyBufferClBuffer) { std::cerr << __func__ << ": failed to create assembly buffer" << std::endl; return false; } auto wip_clCollationBufferClBuffer = smoHooksPtr ->ComputeManager_createUseHostPtrBuffer( collationBufferPtr, collationBufferSize, CL_MEM_WRITE_ONLY); if (!wip_clCollationBufferClBuffer) { std::cerr << __func__ << ": failed to create collation buffer" << std::endl; return false; } /* CL_MEM_WRITE_ONLY describes *kernel* access: the collate kernel only * writes per-slot averages, never reads them. Host-side reads in * produceAmbienceStimulusFrame go through clEnqueueMapBuffer(CL_MAP_READ) * which is independent of this flag. */ auto wip_clAverageIntensityBufferClBuffer = smoHooksPtr ->ComputeManager_createUseHostPtrBuffer( averageIntensityBufferPtr, averageIntensityBufferSize, CL_MEM_WRITE_ONLY); if (!wip_clAverageIntensityBufferClBuffer) { std::cerr << __func__ << ": failed to create average intensity buffer" << std::endl; return false; } // Cache cl_mem handles for the device we're using cl_mem wip_clAssemblyBuffer = wip_clAssemblyBufferClBuffer ->getAssociatedBufferHandleForDevice(wip_computeDevice); cl_mem wip_clCollationBuffer = wip_clCollationBufferClBuffer ->getAssociatedBufferHandleForDevice(wip_computeDevice); cl_mem wip_clAverageIntensityBuffer = wip_clAverageIntensityBufferClBuffer ->getAssociatedBufferHandleForDevice(wip_computeDevice); if (!wip_clAssemblyBuffer || !wip_clCollationBuffer || !wip_clAverageIntensityBuffer) { std::cerr << __func__ << ": failed to get buffer handles for device" << std::endl; return false; } // Compile and prepare both kernels ClProgramPtr wip_slotCompactorProgram; ClProgramPtr wip_collateProgram; ClKernelPtr wip_slotCompactorKernel; ClKernelPtr wip_collateKernel; if (!compileAndPrepareKernels( wip_computeDevice, wip_slotCompactorProgram, wip_collateProgram, wip_slotCompactorKernel, wip_collateKernel)) { return false; } // All operations succeeded - assign to member variables computeDevice = wip_computeDevice; clAssemblyBufferClBuffer = wip_clAssemblyBufferClBuffer; clCollationBufferClBuffer = wip_clCollationBufferClBuffer; clAverageIntensityBufferClBuffer = wip_clAverageIntensityBufferClBuffer; clAssemblyBuffer = wip_clAssemblyBuffer; clCollationBuffer = wip_clCollationBuffer; clAverageIntensityBuffer = wip_clAverageIntensityBuffer; slotCompactorProgram = std::move(wip_slotCompactorProgram); collateProgram = std::move(wip_collateProgram); slotCompactorKernel = std::move(wip_slotCompactorKernel); collateKernel = std::move(wip_collateKernel); clFlush(computeDevice->commandQueue); clFinish(computeDevice->commandQueue); openClCollMeshEngnCanceler.startAcceptingWork(); return true; } void OpenClCollatingAndMeshingEngine::finalize() { // Call stop() to clear shouldContinue and get previous state bool wasAcceptingRequests = stop(); (void)wasAcceptingRequests; // Complete any running kernels if (compactIsRunning) { compactKernelComplete(true); } if (collateIsRunning) { collateKernelComplete(std::nullopt, false, true); } { /** EXPLANATION: * Calculate the delay as the maximum of the configured delay and any * future delays. The 0 is a placeholder for any delays that will be * introduced in the future. When new delays are added, they should be * included in the std::max() call (e.g., std::max( * OCLCOLLMESH_ENGN_FINALIZE_DELAY_MS, futureDelay1, futureDelay2, 0)). */ int delayMs = std::max(OCLCOLLMESH_ENGN_FINALIZE_DELAY_MS, 0); auto& ioContext = smoHooksPtr->ComponentThread_getSelf()->getIoContext(); sscl::cps::AsynchronousBridge bridge(ioContext); boost::asio::deadline_timer timeoutTimer(ioContext); /** EXPLANATION: * We wait for delayMs milliseconds to ensure that any in-flight OpenCL * kernel operations have definitely finished. OpenCL kernels cannot be * cancelled once enqueued, so in-flight kernels may still be executing * when finalize() is called. The delay ensures any running kernels * complete and their callbacks execute before we destroy resources. * This prevents use-after-free errors from resumed async continuations * accessing destroyed state. */ timeoutTimer.expires_from_now( boost::posix_time::milliseconds(delayMs)); timeoutTimer.async_wait( [&bridge](const boost::system::error_code& error) { (void)error; // Always signal complete, whether timeout expired or was cancelled bridge.setAsyncOperationComplete(); }); bridge.waitForAsyncOperationCompleteOrIoContextStopped(); } // Release OpenCL buffers via smo hooks if (smoHooksPtr && smoHooksPtr->ComputeManager_releaseUseHostPtrBuffer) { if (clAverageIntensityBufferClBuffer) { smoHooksPtr->ComputeManager_releaseUseHostPtrBuffer( clAverageIntensityBufferClBuffer); clAverageIntensityBufferClBuffer.reset(); } if (clCollationBufferClBuffer) { smoHooksPtr->ComputeManager_releaseUseHostPtrBuffer( clCollationBufferClBuffer); clCollationBufferClBuffer.reset(); } if (clAssemblyBufferClBuffer) { smoHooksPtr->ComputeManager_releaseUseHostPtrBuffer( clAssemblyBufferClBuffer); clAssemblyBufferClBuffer.reset(); } } // Reset cached cl_mem handles clAverageIntensityBuffer = nullptr; clCollationBuffer = nullptr; clAssemblyBuffer = nullptr; // Release kernels and programs (handled automatically by unique_ptr destructors) slotCompactorKernel.reset(); collateKernel.reset(); slotCompactorProgram.reset(); collateProgram.reset(); // Release compute device via smo hooks if (smoHooksPtr && smoHooksPtr->ComputeManager_releaseDevice && computeDevice) { smoHooksPtr->ComputeManager_releaseDevice(computeDevice); computeDevice.reset(); } // Reset state variables compactIsRunning = false; collateIsRunning = false; currentCompactKernelEvent = nullptr; currentCollateKernelEvent = nullptr; assemblyBufferPtr = nullptr; assemblyBufferSize = 0; collationBufferPtr = nullptr; collationBufferSize = 0; averageIntensityBufferPtr = nullptr; averageIntensityBufferSize = 0; frameAssemblyDesc = nullptr; } // Static callback for compact kernel event void CL_CALLBACK OpenClCollatingAndMeshingEngine::compactKernelEventCallback( cl_event /*event*/, cl_int event_command_exec_status, void* user_data) { OpenClCollatingAndMeshingEngine* engine = static_cast(user_data); if (!engine || !engine->compactKernelCb) { return; } // Post to io_context to call callback on the correct thread if (engine->parent.device && engine->parent.device->componentThread) { boost::asio::post(engine->parent.device->componentThread->getIoContext(), std::bind(engine->compactKernelCb, event_command_exec_status)); } } // Static callback for collate kernel event void CL_CALLBACK OpenClCollatingAndMeshingEngine::collateKernelEventCallback( cl_event /*event*/, cl_int event_command_exec_status, void* user_data) { OpenClCollatingAndMeshingEngine* engine = static_cast(user_data); if (!engine || !engine->collateKernelCb) { return; } // Post to io_context to call callback on the correct thread if (engine->parent.device && engine->parent.device->componentThread) { boost::asio::post(engine->parent.device->componentThread->getIoContext(), std::bind(engine->collateKernelCb, event_command_exec_status)); } } bool OpenClCollatingAndMeshingEngine::startCompactKernel( StagingBuffer& assemblyBuff, uint32_t nSucceeded, compactKernelCbFn callback) { // Store the caller's callback compactKernelCb = std::move(callback); // Validate buffers callable auto validateBuffers = [this, &assemblyBuff]() { struct iovec assemblyIov = assemblyBuff.getClEngineIovec(); if (assemblyIov.iov_base != assemblyBufferPtr || assemblyIov.iov_len != assemblyBufferSize) { throw std::runtime_error( std::string(__func__) + ": buffer mismatch - buffers have " "changed"); } }; // Setup args callable auto setupArgs = [this, &assemblyBuff, nSucceeded]() { return setupSlotCompactorsArgs(assemblyBuff, nSucceeded); }; /** EXPLANAITON: * Map assembly buffer as WRITE_INVALIDATE_REGION to inform OpenCL that host * (io_uring) has written data, and that the host doesn't care what the * prior contents of the device's cache see. The device must invalidate * its own view of the HOST_PTR and accept our view. * * Then immediately unmap to let OpenCL make the changes visible to the GPU */ if (!mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION)) { std::cerr << __func__ << ": failed to map assembly buffer" << std::endl; return false; } // Unmap immediately to sync host writes to GPU unmapAssemblyBuffer(); bool success = startKernel( slotCompactorKernel.get(), ¤tCompactKernelEvent, setupArgs, validateBuffers, 1, // globalWorkSize compactKernelEventCallback, "slotCompactor", compactIsRunning); if (!success) { return false; } return true; } bool OpenClCollatingAndMeshingEngine::startCollateKernel( std::optional> intensityStimFrame, bool anyAmbienceAttached, collateKernelCbFn callback) { // Store the caller's callback collateKernelCb = std::move(callback); // Validate buffers callable auto validateBuffers = [this]() { struct iovec assemblyIov = parent.assemblyBuffer.getClEngineIovec(); struct iovec collationIov = parent.collationBuffer.getClEngineIovec(); struct iovec averageIntensityIov = parent.averageIntensityBuffer .getClEngineIovec(); if (assemblyIov.iov_base != assemblyBufferPtr || assemblyIov.iov_len != assemblyBufferSize || collationIov.iov_base != collationBufferPtr || collationIov.iov_len != collationBufferSize || averageIntensityIov.iov_base != averageIntensityBufferPtr || averageIntensityIov.iov_len != averageIntensityBufferSize) { throw std::runtime_error( std::string(__func__) + ": buffer mismatch - buffers have changed"); } }; // Setup args callable auto setupArgs = [this, intensityStimFrame, anyAmbienceAttached]() { return setupCollateDgramsArgs(intensityStimFrame, anyAmbienceAttached); }; /** EXPLANATION: * It shouldn't be necessary to map the assembly/collation buffers here * since we don't need to read/write them on the host CPUs (unless we're * intervening to debug; in which case we should map them as CL_MAP_READ). * * Otherwise, the foreign GPU's view of the data in the assembly buffer * is currently up to date; and the collation buffer's state is undefined... * and also irrelevant since it's only going to be used for output anyway. */ if (!mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION)) { std::cerr << __func__ << ": failed to map assembly buffer" << std::endl; return false; } unmapAssemblyBuffer(); if (!mapCollationBuffer(CL_MAP_WRITE)) { std::cerr << __func__ << ": failed to map assembly buffer" << std::endl; return false; } unmapCollationBuffer(); // Map/unmap intensity buffer if it exists if (intensityStimFrame.has_value()) { StimulusFrame& intensityFrame = intensityStimFrame->get(); cl_mem intensityClBuffer = intensityFrame.clBuffer ->getAssociatedBufferHandleForDevice(computeDevice); if (intensityClBuffer) { void* mappedIntensityBuffer = nullptr; if (!mapBuffer( intensityClBuffer, intensityFrame.slotDesc.nBytes, CL_MAP_WRITE_INVALIDATE_REGION, mappedIntensityBuffer)) { std::cerr << __func__ << ": failed to map intensity buffer" << std::endl; return false; } unmapBuffer(intensityClBuffer, mappedIntensityBuffer); } } // Map/unmap average intensity staging buffer (collate writes per-slot // averages here when any ambience stimbuff is attached). if (anyAmbienceAttached) { if (!mapAverageIntensityBuffer(CL_MAP_WRITE_INVALIDATE_REGION)) { std::cerr << __func__ << ": failed to map average intensity buffer" << std::endl; return false; } unmapAverageIntensityBuffer(); } // Calculate global work size (just num slots in the frame) size_t globalWorkSize = static_cast(frameAssemblyDesc->numSlots); bool success = startKernel( collateKernel.get(), ¤tCollateKernelEvent, setupArgs, validateBuffers, globalWorkSize, collateKernelEventCallback, "collateDgrams", collateIsRunning); if (!success) { return false; } return true; } bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernel( const std::shared_ptr& computeDevice, const char* kernelSource, size_t kernelSourceLen, const char* kernelName, ClProgramPtr& program, ClKernelPtr& kernel) { cl_int err; // Create program from source cl_program rawProgram = clCreateProgramWithSource( computeDevice->context, 1, &kernelSource, &kernelSourceLen, &err); if (err != CL_SUCCESS || !rawProgram) { std::cerr << __func__ << ": failed to create " << kernelName << " program: " << err << std::endl; return false; } // Wrap in unique_ptr program = ClProgramPtr(rawProgram); // Build program err = clBuildProgram(program.get(), 1, &computeDevice->device, nullptr, nullptr, nullptr); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to build " << kernelName << " program: " << err << std::endl; // Print build log if available size_t logSize = 0; clGetProgramBuildInfo( program.get(), computeDevice->device, CL_PROGRAM_BUILD_LOG, 0, nullptr, &logSize); if (logSize > 0) { std::vector log(logSize); clGetProgramBuildInfo( program.get(), computeDevice->device, CL_PROGRAM_BUILD_LOG, logSize, log.data(), nullptr); std::cerr << kernelName << " build log: " << log.data() << std::endl; } return false; } // Create kernel cl_kernel rawKernel = clCreateKernel(program.get(), kernelName, &err); if (err != CL_SUCCESS || !rawKernel) { std::cerr << __func__ << ": failed to create " << kernelName << " kernel: " << err << std::endl; return false; } // Wrap in unique_ptr kernel = ClKernelPtr(rawKernel); return true; } bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernels( const std::shared_ptr& computeDevice, ClProgramPtr& slotCompactorProgram, ClProgramPtr& collateProgram, ClKernelPtr& slotCompactorKernel, ClKernelPtr& collateKernel) { // Compile slotCompactor kernel if (!compileAndPrepareKernel( computeDevice, slotCompactorKernelStart, slotCompactorKernelNBytes, "slotCompactor", slotCompactorProgram, slotCompactorKernel)) { return false; } // Compile collateDgrams kernel if (!compileAndPrepareKernel( computeDevice, collateKernelStart, collateKernelNBytes, "collate", collateProgram, collateKernel)) { return false; } return true; } bool OpenClCollatingAndMeshingEngine::setupSlotCompactorsArgs( StagingBuffer& assemblyBuff, uint32_t nSucceeded) { // Extract parameters for slotCompactor kernel uint32_t numSlots = static_cast(frameAssemblyDesc->numSlots); uint32_t slotStride = static_cast(assemblyBuff.slotStrideNBytes); uint32_t slotSize = static_cast(frameAssemblyDesc->slotSizeBytes); uint32_t nSucceededUint = static_cast(nSucceeded); // Set kernel arguments for slotCompactor cl_int err; err = clSetKernelArg( slotCompactorKernel.get(), 0, sizeof(cl_mem), &clAssemblyBuffer); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 0: " << err << std::endl; return false; } err = clSetKernelArg( slotCompactorKernel.get(), 1, sizeof(uint32_t), &numSlots); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 1: " << err << std::endl; return false; } err = clSetKernelArg( slotCompactorKernel.get(), 2, sizeof(uint32_t), &slotStride); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 2: " << err << std::endl; return false; } err = clSetKernelArg( slotCompactorKernel.get(), 3, sizeof(uint32_t), &slotSize); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 3: " << err << std::endl; return false; } err = clSetKernelArg( slotCompactorKernel.get(), 4, sizeof(uint32_t), &nSucceededUint); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 4: " << err << std::endl; return false; } return true; } bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs( std::optional> intensityStimFrame, bool anyAmbienceAttached) { // Extract parameters for collateDgrams kernel uint32_t slotStride = static_cast( parent.assemblyBuffer.slotStrideNBytes); // Calculate nPointsPerSlot from device return mode if (!parent.device) { std::cerr << __func__ << ": device not available" << std::endl; return false; } int returnMode = static_cast(parent.device->currentReturnMode); uint32_t nPointsPerSlot = static_cast( livoxProto1::Device::getNPointsPerDgram(returnMode)); uint32_t nDgramsPerFrame = static_cast( frameAssemblyDesc->numSlots); // Set kernel arguments for collateDgrams cl_int err; err = clSetKernelArg( collateKernel.get(), 0, sizeof(cl_mem), &clAssemblyBuffer); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 0: " << err << std::endl; return false; } err = clSetKernelArg( collateKernel.get(), 1, sizeof(cl_mem), &clCollationBuffer); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 1: " << err << std::endl; return false; } // Set intensity buffer argument (arg 2) cl_mem intensityClBuffer = nullptr; if (intensityStimFrame.has_value()) { intensityClBuffer = intensityStimFrame->get().clBuffer ->getAssociatedBufferHandleForDevice(computeDevice); } err = clSetKernelArg( collateKernel.get(), 2, sizeof(cl_mem), &intensityClBuffer); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 2: " << err << std::endl; return false; } // Set ambience buffer argument (arg 3): per-slot average intensity // staging buffer. Set when any ambience stimbuff is attached. cl_mem averageIntensityClBufferArg = anyAmbienceAttached ? clAverageIntensityBuffer : nullptr; const size_t needBytes = static_cast(nDgramsPerFrame) * sizeof(float); if (anyAmbienceAttached && averageIntensityBufferSize < needBytes) { std::cerr << __func__ << ": average intensity buffer too small: " << averageIntensityBufferSize << " < " << needBytes << std::endl; return false; } err = clSetKernelArg( collateKernel.get(), 3, sizeof(cl_mem), &averageIntensityClBufferArg); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 3: " << err << std::endl; return false; } err = clSetKernelArg(collateKernel.get(), 4, sizeof(uint32_t), &slotStride); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 4: " << err << std::endl; return false; } err = clSetKernelArg( collateKernel.get(), 5, sizeof(uint32_t), &nPointsPerSlot); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 5: " << err << std::endl; return false; } err = clSetKernelArg( collateKernel.get(), 6, sizeof(uint32_t), &nDgramsPerFrame); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to set kernel arg 6: " << err << std::endl; return false; } return true; } bool OpenClCollatingAndMeshingEngine::stop() { return openClCollMeshEngnCanceler.requestStop(); } void OpenClCollatingAndMeshingEngine::compactKernelComplete(bool isFinalizing) { cl_map_flags mapFlags; /** EXPLANATION: * Technically we should only need to do this if we plan to read the * compacted slots for debugging purposes. Otherwise this is unnecessary. */ if (isFinalizing) { mapFlags = CL_MAP_WRITE_INVALIDATE_REGION; } else { mapFlags = CL_MAP_READ; } if (mapAssemblyBuffer(mapFlags)) { unmapAssemblyBuffer(); } clFlush(computeDevice->commandQueue); // Stop only compact kernel if (compactIsRunning && currentCompactKernelEvent) { clWaitForEvents(1, ¤tCompactKernelEvent); clReleaseEvent(currentCompactKernelEvent); currentCompactKernelEvent = nullptr; } clFinish(computeDevice->commandQueue); compactKernelCb = [](cl_int){}; compactIsRunning = false; } void OpenClCollatingAndMeshingEngine::collateKernelComplete( std::optional> intensityStimFrame, bool anyAmbienceAttached, bool isFinalizing) { cl_map_flags mapFlags; /** EXPLANATION: * Technically we should only need to do this if we plan to read the * collated dgrams for debugging purposes. Otherwise this is unnecessary. */ if (isFinalizing) { mapFlags = CL_MAP_WRITE_INVALIDATE_REGION; } else { mapFlags = CL_MAP_READ; } if (mapCollationBuffer(mapFlags)) { unmapCollationBuffer(); } // Map/unmap intensity buffer if it exists if (intensityStimFrame.has_value()) { StimulusFrame& intensityFrame = intensityStimFrame->get(); cl_mem intensityClBuffer = intensityFrame.clBuffer ->getAssociatedBufferHandleForDevice(computeDevice); if (intensityClBuffer) { void* mappedIntensityBuffer = nullptr; if (mapBuffer( intensityClBuffer, intensityFrame.slotDesc.nBytes, CL_MAP_READ, mappedIntensityBuffer)) { unmapBuffer(intensityClBuffer, mappedIntensityBuffer); } } } // Sync GPU writes into average intensity staging buffer host backing // store so attached ambience stimbuffs can read the per-slot averages. if (anyAmbienceAttached) { if (mapAverageIntensityBuffer(mapFlags)) { unmapAverageIntensityBuffer(); } } clFlush(computeDevice->commandQueue); // Stop only collate kernel if (collateIsRunning && currentCollateKernelEvent) { clWaitForEvents(1, ¤tCollateKernelEvent); clReleaseEvent(currentCollateKernelEvent); currentCollateKernelEvent = nullptr; } clFinish(computeDevice->commandQueue); collateKernelCb = [](cl_int){}; collateIsRunning = false; } bool OpenClCollatingAndMeshingEngine::mapBuffer( cl_mem buffer, size_t size, cl_map_flags mapFlags, void*& mappedPtr) { if (!computeDevice->commandQueue || !buffer) { std::cerr << __func__ << ": engine not set up or invalid buffer" << std::endl; return false; } // If already mapped, return early with success. if (mappedPtr != nullptr) { return true; } cl_int err; mappedPtr = clEnqueueMapBuffer( computeDevice->commandQueue, buffer, CL_TRUE, mapFlags, 0, size, 0, nullptr, nullptr, &err); if (err != CL_SUCCESS || !mappedPtr) { std::cerr << __func__ << ": failed to map buffer: " << err << std::endl; goto cleanup; } return true; cleanup: mappedPtr = nullptr; return false; } bool OpenClCollatingAndMeshingEngine::unmapBuffer( cl_mem buffer, void*& mappedPtr ) { if (mappedPtr == nullptr) { // Already unmapped return true; } if (!computeDevice->commandQueue || !buffer) { std::cerr << __func__ << ": engine not set up or invalid buffer.\n"; return false; } cl_int err; cl_event rawUnmapEvent = nullptr; err = clEnqueueUnmapMemObject( computeDevice->commandQueue, buffer, mappedPtr, 0, nullptr, &rawUnmapEvent); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to unmap buffer: " << err << std::endl; return false; } // Wrap event in unique_ptr for automatic cleanup ClEventPtr unmapEvent(rawUnmapEvent); // Wait for unmap to complete and release the event cl_event eventPtr = unmapEvent.get(); err = clWaitForEvents(1, &eventPtr); if (err != CL_SUCCESS) { std::cerr << __func__ << ": failed to wait for unmap event: " << err << std::endl; return false; } mappedPtr = nullptr; return true; } bool OpenClCollatingAndMeshingEngine::mapAssemblyBuffer(cl_map_flags mapFlags) { return mapBuffer( clAssemblyBuffer, assemblyBufferSize, mapFlags, mappedAssemblyBuffer); } bool OpenClCollatingAndMeshingEngine::unmapAssemblyBuffer() { unmapBuffer(clAssemblyBuffer, mappedAssemblyBuffer); mappedAssemblyBuffer = nullptr; return true; } bool OpenClCollatingAndMeshingEngine::mapCollationBuffer(cl_map_flags mapFlags) { return mapBuffer( clCollationBuffer, collationBufferSize, mapFlags, mappedCollationBuffer); } bool OpenClCollatingAndMeshingEngine::unmapCollationBuffer() { unmapBuffer(clCollationBuffer, mappedCollationBuffer); mappedCollationBuffer = nullptr; return true; } bool OpenClCollatingAndMeshingEngine::mapAverageIntensityBuffer( cl_map_flags mapFlags) { return mapBuffer( clAverageIntensityBuffer, averageIntensityBufferSize, mapFlags, mappedAverageIntensityBuffer); } bool OpenClCollatingAndMeshingEngine::unmapAverageIntensityBuffer() { unmapBuffer(clAverageIntensityBuffer, mappedAverageIntensityBuffer); mappedAverageIntensityBuffer = nullptr; return true; } void OpenClCollatingAndMeshingEngine::produceAmbienceStimulusFrame( StimulusFrame& ambienceFrame, const ParamComparator& comparator, uint32_t nSucceeded) { const float* averages = static_cast(averageIntensityBufferPtr); uint32_t passbandCount = 0; for (uint32_t i = 0; i < nSucceeded; ++i) { const float& average = averages[i]; if (comparator(average)) { ++passbandCount; } } uint32_t& passbandCountOut = *reinterpret_cast(ambienceFrame.slotDesc.vaddr); passbandCountOut = passbandCount; } sscl::co::ViralNonPostingInvoker OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameCReq( sscl::AsynchronousLoop& frameAssemblyResult, StimulusFrame& /*stimulusFrame*/, std::optional> intensityStimFrame, std::optional lightAmbienceProductionDesc, std::optional darkAmbienceProductionDesc) { { sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { co_return false; } } auto& resumeIoContext = parent.device->componentThread->getIoContext(); bool needsCompaction = IoUringAssemblyEngine::compactionIsNeeded( frameAssemblyResult.nSucceeded.load(), frameAssemblyResult.nTotal); bool anyAmbienceAttached = lightAmbienceProductionDesc.has_value() || darkAmbienceProductionDesc.has_value(); if (needsCompaction) { // compactCollateAndMeshFrameReq1_doCompact_posted { sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { co_return false; } // Record compact kernel start time compactKernelStartTime = std::chrono::high_resolution_clock::now(); } cl_int compactStatus = co_await openclBoundary::getClKernelCompletionAReqAwaiter( resumeIoContext, [this, &frameAssemblyResult](std::function completionCb) { bool success = startCompactKernel( parent.assemblyBuffer, static_cast( frameAssemblyResult.nSucceeded.load()), std::move(completionCb)); if (!success) { compactKernelComplete(); completionCb(CL_INVALID_OPERATION); } }); if (compactStatus == CL_INVALID_OPERATION) { co_return false; } // compactCollateAndMeshFrameReq2_compactDone_posted { sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { /** EXPLANATION: * We intentionally don't call compactKernelComplete() here because * if shouldAcceptRequests is false, then the caller that called * finalize() will also be forced to call compactKernelComplete() * inside of finalize(). */ co_return false; } compactKernelComplete(); // Record compact kernel end time compactKernelEndTime = std::chrono::high_resolution_clock::now(); // If compact failed, call callback directly with failure if (compactStatus != CL_SUCCESS) { co_return false; } #if 0 // Print first 4 bytes of each slot if (frameAssemblyDesc) { for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i) { parent.ioUringAssemblyEngine.printSlotBytes(i, 4); } } #endif } } // compactCollateAndMeshFrameReq3_doCollate_posted { sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { co_return false; } // Record collate kernel start time collateKernelStartTime = std::chrono::high_resolution_clock::now(); } cl_int collateStatus = co_await openclBoundary::getClKernelCompletionAReqAwaiter( resumeIoContext, [this, intensityStimFrame, anyAmbienceAttached]( std::function completionCb) { bool success = startCollateKernel( intensityStimFrame, anyAmbienceAttached, std::move(completionCb)); if (!success) { collateKernelComplete( intensityStimFrame, anyAmbienceAttached); completionCb(CL_INVALID_OPERATION); } }); if (collateStatus == CL_INVALID_OPERATION) { co_return false; } // compactCollateAndMeshFrameReq4_collateDone_maybePosted { sscl::SpinLock::Guard guard(openClCollMeshEngnCanceler.s.lock); if (openClCollMeshEngnCanceler.isCancellationRequestedUnlocked()) { /* We intentionally don't call collateKernelComplete() here for the * same reason as above. */ co_return false; } /** EXPLANATION: * The reason we don't call collateKernelComplete before checking * shouldAcceptRequests is because if shouldAcceptRequests is false, then * we shouldn't touch any of the collate cycle state...or any state within * the engine at all, since finalize() may well have been called. * * Therefore it's finalize()'s responsibility to ensure that it properly * completes/cleans up any in-flight operations. */ collateKernelComplete( intensityStimFrame, anyAmbienceAttached); // Produce each attached ambience stimbuff's passband count from // the per-slot averages the collate kernel staged. uint32_t nSucceededForAmbience = frameAssemblyResult.nSucceeded.load(); if (lightAmbienceProductionDesc.has_value()) { produceAmbienceStimulusFrame( lightAmbienceProductionDesc->frame.get(), lightAmbienceProductionDesc->comparator, nSucceededForAmbience); } if (darkAmbienceProductionDesc.has_value()) { produceAmbienceStimulusFrame( darkAmbienceProductionDesc->frame.get(), darkAmbienceProductionDesc->comparator, nSucceededForAmbience); } // Record collate kernel end time collateKernelEndTime = std::chrono::high_resolution_clock::now(); // Early callback + return pattern bool success = (collateStatus == CL_SUCCESS); if (!success) { co_return false; } uint32_t nSucceeded = frameAssemblyResult.nSucceeded.load(); int returnMode = static_cast(parent.device->currentReturnMode); size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram( returnMode); size_t totalPoints = nSucceeded * pointsPerDgram; // Count points with intensity greater than 116 size_t highIntensityCount = 0; if (intensityStimFrame.has_value()) { StimulusFrame& intensityFrame = intensityStimFrame->get(); float* intensityFloats = reinterpret_cast( intensityFrame.slotDesc.vaddr); for (size_t i = 0; i < totalPoints; ++i) { float intensity = intensityFloats[i]; if (intensity >= 116.0f) { ++highIntensityCount; } } } (void)highIntensityCount; #if SMO_PRINT_PCLOUD_COLLATE_RESULTS std::cout << __func__ << ": collate done intensityRingBufferIndex=" << (intensityStimFrame.has_value() ? intensityStimFrame->get().ringBufferIndex : SIZE_MAX) << " pointsPerDgram=" << pointsPerDgram << " nSucceeded=" << nSucceeded << " totalPoints=" << totalPoints << " highIntensityCount=" << highIntensityCount << std::endl; #endif co_return success; } } std::chrono::milliseconds OpenClCollatingAndMeshingEngine::getCompactKernelDuration() const { auto duration = compactKernelEndTime - compactKernelStartTime; if (duration.count() < 0) { return std::chrono::milliseconds(0); } return std::chrono::duration_cast(duration); } std::chrono::milliseconds OpenClCollatingAndMeshingEngine::getCollateKernelDuration() const { auto duration = collateKernelEndTime - collateKernelStartTime; if (duration.count() < 0) { return std::chrono::milliseconds(0); } return std::chrono::duration_cast(duration); } } // namespace stim_buff } // namespace smo