diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp index 22d0007..a9c3eb5 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp @@ -37,6 +37,8 @@ assemblyBufferPtr(nullptr), assemblyBufferSize(0), collationBufferPtr(nullptr), collationBufferSize(0), +mappedAssemblyBuffer(nullptr), +mappedCollationBuffer(nullptr), frameAssemblyDesc(nullptr) { } @@ -239,9 +241,6 @@ void CL_CALLBACK OpenClCollatingAndMeshingEngine::compactKernelEventCallback( if (!engine || !engine->compactKernelCb) { return; } - // Stop the compact kernel -// engine->stopCompactKernel(); - // Post to io_service to call callback on the correct thread if (engine->parent.device && engine->parent.device->componentThread) { @@ -260,9 +259,6 @@ void CL_CALLBACK OpenClCollatingAndMeshingEngine::collateKernelEventCallback( if (!engine || !engine->collateKernelCb) { return; } - // Stop the collate kernel -// engine->stopCollateKernel(); - // Post to io_service to call callback on the correct thread if (engine->parent.device && engine->parent.device->componentThread) { @@ -285,7 +281,8 @@ bool OpenClCollatingAndMeshingEngine::startCompactKernel( || assemblyIov.iov_len != assemblyBufferSize) { throw std::runtime_error( - std::string(__func__) + ": buffer mismatch - buffers have changed"); + std::string(__func__) + ": buffer mismatch - buffers have " + "changed"); } }; @@ -294,7 +291,24 @@ bool OpenClCollatingAndMeshingEngine::startCompactKernel( return setupSlotCompactorsArgs(assemblyBuff, nSucceeded); }; - return startKernel( + /** EXPLANAITON: + * Map assembly buffer as WRITE_INVALIDATE_REGION to inform OpenCL that host + * (io_uring) has written data, and that the host doesn't care what the + * prior contents of the device's cache see. The device must invalidate + * its own view of the HOST_PTR and accept our view. + * + * Then immediately unmap to let OpenCL make the changes visible to the GPU + */ + if (!mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION)) + { + std::cerr << __func__ << ": failed to map assembly buffer" << std::endl; + return false; + } + + // Unmap immediately to sync host writes to GPU + unmapAssemblyBuffer(); + + bool success = startKernel( slotCompactorKernel, ¤tCompactKernelEvent, setupArgs, @@ -304,6 +318,9 @@ bool OpenClCollatingAndMeshingEngine::startCompactKernel( "slotCompactor", compactIsSetup, compactIsRunning); + + if (!success) { return false; } + return true; } bool OpenClCollatingAndMeshingEngine::startCollateKernel( @@ -313,6 +330,21 @@ bool OpenClCollatingAndMeshingEngine::startCollateKernel( // Store the caller's callback collateKernelCb = std::move(callback); + /** EXPLANATION: + * It shouldn't be necessary to map the assembly/collation buffers here + * since we don't need to read/write them on the host CPUs (unless we're + * intervening to debug; in which case we should map them as CL_MAP_READ). + * + * Otherwise, the foreign GPU's view of the data in the assembly buffer + * is currently up to date; and the collation buffer's state is undefined... + * and also irrelevant since it's only going to be used for output anyway. + */ + + mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION); + unmapAssemblyBuffer(); + mapCollationBuffer(CL_MAP_WRITE_INVALIDATE_REGION); + unmapCollationBuffer(); + // Validate buffers callable auto validateBuffers = [this, &assemblyBuff, &collationBuff]() { struct iovec assemblyIov = assemblyBuff.getClEngineIovec(); @@ -332,12 +364,10 @@ bool OpenClCollatingAndMeshingEngine::startCollateKernel( return setupCollateDgramsArgs(assemblyBuff); }; - // Calculate global work size - uint32_t nDgramsPerFrame = static_cast( - frameAssemblyDesc->numSlots); - size_t globalWorkSize = nDgramsPerFrame; + // Calculate global work size (just num slots in the frame) + size_t globalWorkSize = static_cast(frameAssemblyDesc->numSlots); - return startKernel( + bool success = startKernel( collateKernel, ¤tCollateKernelEvent, setupArgs, @@ -347,6 +377,9 @@ bool OpenClCollatingAndMeshingEngine::startCollateKernel( "collateDgrams", collateIsSetup, collateIsRunning); + + if (!success) { return false; } + return true; } bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernel( @@ -574,6 +607,15 @@ void OpenClCollatingAndMeshingEngine::stop() void OpenClCollatingAndMeshingEngine::stopCompactKernel() { + /** EXPLANATION: + * Technically we should only need to do this if we plan to read the + * compacted slots for debugging purposes. Otherwise this is unnecessary. + */ + mapAssemblyBuffer(CL_MAP_READ); + unmapAssemblyBuffer(); + clFlush(commandQueue); + clFinish(commandQueue); + // Stop only compact kernel if (compactIsRunning && currentCompactKernelEvent) { @@ -587,6 +629,14 @@ void OpenClCollatingAndMeshingEngine::stopCompactKernel() void OpenClCollatingAndMeshingEngine::stopCollateKernel() { + /** EXPLANATION: + * Technically we should only need to do this if we plan to read the + * collated dgrams for debugging purposes. Otherwise this is unnecessary. + */ + mapCollationBuffer(CL_MAP_READ); + unmapCollationBuffer(); + clFlush(commandQueue); + clFinish(commandQueue); // Stop only collate kernel if (collateIsRunning && currentCollateKernelEvent) { @@ -598,6 +648,95 @@ void OpenClCollatingAndMeshingEngine::stopCollateKernel() collateKernelCb = [](cl_int){}; } +bool OpenClCollatingAndMeshingEngine::mapBuffer( + cl_mem buffer, size_t size, cl_map_flags mapFlags, void*& mappedPtr) +{ + if (!commandQueue || !buffer) + { + std::cerr << __func__ << ": engine not set up or invalid buffer" + << std::endl; + + return false; + } + + // If already mapped, return early with success. + if (mappedPtr != nullptr) { return true; } + + cl_int err; + cl_event mapEvent; + mappedPtr = clEnqueueMapBuffer( + commandQueue, buffer, CL_TRUE, mapFlags, + 0, size, 0, nullptr, &mapEvent, &err); + + if (err != CL_SUCCESS || !mappedPtr) + { + std::cerr << __func__ << ": failed to map buffer: " << err + << std::endl; + mappedPtr = nullptr; + return false; + } + + return true; +} + +bool OpenClCollatingAndMeshingEngine::unmapBuffer(cl_mem buffer, void*& mappedPtr) +{ + if (mappedPtr == nullptr) + { + // Already unmapped + return true; + } + + if (!commandQueue || !buffer) + { + std::cerr << __func__ << ": engine not set up or invalid buffer" << std::endl; + return false; + } + + cl_int err; + cl_event unmapEvent; + err = clEnqueueUnmapMemObject( + commandQueue, buffer, mappedPtr, + 0, nullptr, &unmapEvent); + + if (err != CL_SUCCESS) + { + std::cerr << __func__ << ": failed to unmap buffer: " << err + << std::endl; + return false; + } + + mappedPtr = nullptr; + return true; +} + +bool OpenClCollatingAndMeshingEngine::mapAssemblyBuffer(cl_map_flags mapFlags) +{ + return mapBuffer( + clAssemblyBuffer, assemblyBufferSize, mapFlags, mappedAssemblyBuffer); +} + +bool OpenClCollatingAndMeshingEngine::unmapAssemblyBuffer() +{ + unmapBuffer(clAssemblyBuffer, mappedAssemblyBuffer); + mappedAssemblyBuffer = nullptr; + return true; +} + +bool OpenClCollatingAndMeshingEngine::mapCollationBuffer(cl_map_flags mapFlags) +{ + return mapBuffer( + clCollationBuffer, collationBufferSize, mapFlags, + mappedCollationBuffer); +} + +bool OpenClCollatingAndMeshingEngine::unmapCollationBuffer() +{ + unmapBuffer(clCollationBuffer, mappedCollationBuffer); + mappedCollationBuffer = nullptr; + return true; +} + class OpenClCollatingAndMeshingEngine::CompactCollateAndMeshFrameReq : public PostedAsynchronousContinuation { @@ -638,6 +777,7 @@ public: if (!success) { + engine.stopCompactKernel(); callOriginalCallback(false); return; } @@ -648,7 +788,6 @@ public: cl_int compactStatus) { engine.stopCompactKernel(); -std::cout << __func__ << ": Compact done, compact status: " << compactStatus << std::endl; // If compact failed, call callback directly with failure if (compactStatus != CL_SUCCESS) { @@ -656,6 +795,7 @@ std::cout << __func__ << ": Compact done, compact status: " << compactStatus << return; } +#if 0 // Print first 4 bytes of each slot if (engine.frameAssemblyDesc) { @@ -663,6 +803,7 @@ std::cout << __func__ << ": Compact done, compact status: " << compactStatus << engine.parent.ioUringAssemblyEngine.printSlotBytes(i, 4); } } +#endif context->compactCollateAndMeshFrameReq3_doCollate_posted(context); } @@ -680,10 +821,10 @@ std::cout << __func__ << ": Compact done, compact status: " << compactStatus << if (!success) { + engine.stopCollateKernel(); callOriginalCallback(false); return; } -std::cout << __func__ << ": Started collate kernel" << std::endl; } void compactCollateAndMeshFrameReq4_collateDone_maybePosted( @@ -691,8 +832,37 @@ std::cout << __func__ << ": Started collate kernel" << std::endl; cl_int collateStatus) { engine.stopCollateKernel(); -std::cout << __func__ << ": Collate done, collate status: " << collateStatus << std::endl; bool success = (collateStatus == CL_SUCCESS); + + // Early callback + return pattern + if (!success) + { + callOriginalCallback(false); + return; + } + + int returnMode = static_cast(engine.parent.device->currentReturnMode); + size_t pointsPerDgram = IoUringAssemblyEngine::computePointsPerDgram(returnMode); + uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load(); + size_t totalPoints = nSucceeded * pointsPerDgram; + + // Count points with intensity greater than 116 + float* collationFloats = static_cast(engine.collationBufferPtr); + size_t highIntensityCount = 0; + for (size_t i = 0; i < totalPoints; ++i) + { + float intensity = collationFloats[i * 4 + 3]; + if (intensity > 116.0f) + { + ++highIntensityCount; + } + } + + std::cout << __func__ << ": pointsPerDgram=" << pointsPerDgram + << ", nSucceeded=" << nSucceeded + << ", totalPoints=" << totalPoints + << ", highIntensityCount=" << highIntensityCount << std::endl; + callOriginalCallback(success); } }; @@ -719,7 +889,6 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq( &CompactCollateAndMeshFrameReq ::compactCollateAndMeshFrameReq1_doCompact_posted, request.get(), request))); -std::cout << __func__ << ": Started compact kernel" << std::endl; } else { @@ -729,7 +898,6 @@ std::cout << __func__ << ": Started compact kernel" << std::endl; &CompactCollateAndMeshFrameReq ::compactCollateAndMeshFrameReq3_doCollate_posted, request.get(), request))); -std::cout << __func__ << ": Skipped compaction, started collate kernel" << std::endl; } } diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h index ecf4825..f6b6b7a 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h @@ -92,6 +92,9 @@ private: size_t assemblyBufferSize; void* collationBufferPtr; size_t collationBufferSize; + // Mapped buffer pointers (for zero-copy synchronization) + void* mappedAssemblyBuffer; + void* mappedCollationBuffer; // Frame descriptor (cached from setup) std::shared_ptr frameAssemblyDesc; @@ -115,6 +118,17 @@ private: StagingBuffer& assemblyBuff, uint32_t nSucceeded); bool setupCollateDgramsArgs(StagingBuffer& assemblyBuff); + // Generic buffer mapping/unmapping for zero-copy synchronization + bool mapBuffer( + cl_mem buffer, size_t size, cl_map_flags mapFlags, void*& mappedPtr); + bool unmapBuffer(cl_mem buffer, void*& mappedPtr); + + // Wrapper functions for specific buffers + bool mapAssemblyBuffer(cl_map_flags mapFlags = CL_MAP_READ); + bool unmapAssemblyBuffer(); + bool mapCollationBuffer(cl_map_flags mapFlags = CL_MAP_READ); + bool unmapCollationBuffer(); + // Forward declaration for continuation class class CompactCollateAndMeshFrameReq; @@ -177,6 +191,17 @@ private: return false; } + // Force queue flush to ensure event processing and callback invocation + err = clFlush(commandQueue); + if (err != CL_SUCCESS) + { + std::cerr << __func__ << ": failed to flush queue: " << err + << std::endl; + clReleaseEvent(*eventPtr); + *eventPtr = nullptr; + return false; + } + isRunning = true; return true; }