Files
salmanoff/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp
T
hayodea e689063a8c StimFrame: Store ringbuff index as member var
Now each StimFrame knows its index within its parent SpMcRingbuff
object.
2025-11-23 06:15:54 -04:00

1095 lines
30 KiB
C++

#include <boostAsioLinkageFix.h>
#include <stdexcept>
#include <iostream>
#include <cstring>
#include <vector>
#include <string>
#include <string_view>
#include <boost/system/error_code.hpp>
#include <asynchronousContinuation.h>
#include <callback.h>
#include <asynchronousLoop.h>
#include <componentThread.h>
#include <user/stimulusFrame.h>
#include <livoxProto1/device.h>
#include "livoxGen1.h"
#include "openClCollatingAndMeshingEngine.h"
#include "pcloudStimulusProducer.h"
#include "openClKernels.h"
#include <user/frameAssemblyDesc.h>
#include "ioUringAssemblyEngine.h"
#include <user/senseApiDesc.h>
extern const smo::stim_buff::SmoCallbacks* smoHooksPtr;
namespace smo {
namespace stim_buff {
/**
 * Builds an engine bound to its owning producer.
 *
 * Every handle, pointer and size starts out null/zero and every state flag
 * starts out false. The constructor body is empty, so no OpenCL or smo-hook
 * call is made here; that work is done by setup().
 *
 * @param parent_ Producer that owns this engine; stored by reference.
 */
OpenClCollatingAndMeshingEngine::OpenClCollatingAndMeshingEngine(
    PcloudStimulusProducer& parent_)
    : parent(parent_),
      computeDevice(nullptr),
      slotCompactorProgram(nullptr),
      collateProgram(nullptr),
      slotCompactorKernel(nullptr),
      collateKernel(nullptr),
      clAssemblyBufferClBuffer(nullptr),
      clCollationBufferClBuffer(nullptr),
      clAssemblyBuffer(nullptr),
      clCollationBuffer(nullptr),
      shouldAcceptRequests(false),
      compactIsRunning(false),
      collateIsRunning(false),
      currentCompactKernelEvent(nullptr),
      currentCollateKernelEvent(nullptr),
      assemblyBufferPtr(nullptr),
      assemblyBufferSize(0),
      collationBufferPtr(nullptr),
      collationBufferSize(0),
      mappedAssemblyBuffer(nullptr),
      mappedCollationBuffer(nullptr),
      frameAssemblyDesc(nullptr)
{}
/** Tears the engine down by delegating all release work to finalize(). */
OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine() { finalize(); }
bool OpenClCollatingAndMeshingEngine::setup()
{
// Defensive check to prevent double-calling
{
SpinLock::Guard lock(shouldAcceptRequestsLock);
if (shouldAcceptRequests)
{
throw std::runtime_error(std::string(__func__) + ": setup() called "
"while already set up");
}
}
if (!smoHooksPtr || !smoHooksPtr->ComputeManager_getDevice)
{
std::cerr << __func__ << ": smo hooks not available" << std::endl;
return false;
}
// Get ComputeDevice from smo hooks
computeDevice = smoHooksPtr->ComputeManager_getDevice();
if (!computeDevice)
{
std::cerr << __func__ << ": failed to get compute device" << std::endl;
return false;
}
// Get StagingBuffer memory pointers from parent
struct iovec assemblyIov = parent.assemblyBuffer.getClEngineIovec();
struct iovec collationIov = parent.collationBuffer.getClEngineIovec();
assemblyBufferPtr = assemblyIov.iov_base;
assemblyBufferSize = assemblyIov.iov_len;
collationBufferPtr = collationIov.iov_base;
collationBufferSize = collationIov.iov_len;
// Get FrameAssemblyDesc from assembly buffer
frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
parent.assemblyBuffer);
if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty())
{
std::cerr << __func__ << ": invalid frame descriptor" << std::endl;
goto cleanup;
}
// Create OpenCL buffers using smo hooks
if (!smoHooksPtr->ComputeManager_createUseHostPtrBuffer)
{
std::cerr << __func__ << ": createUseHostPtrBuffer hook not available"
<< std::endl;
goto cleanup;
}
clAssemblyBufferClBuffer = smoHooksPtr
->ComputeManager_createUseHostPtrBuffer(
assemblyBufferPtr, assemblyBufferSize, CL_MEM_READ_WRITE);
if (!clAssemblyBufferClBuffer)
{
std::cerr << __func__ << ": failed to create assembly buffer"
<< std::endl;
goto cleanup;
}
clCollationBufferClBuffer = smoHooksPtr
->ComputeManager_createUseHostPtrBuffer(
collationBufferPtr, collationBufferSize, CL_MEM_WRITE_ONLY);
if (!clCollationBufferClBuffer)
{
std::cerr << __func__ << ": failed to create collation buffer"
<< std::endl;
goto cleanup;
}
// Cache cl_mem handles for the device we're using
clAssemblyBuffer = clAssemblyBufferClBuffer
->getAssociatedBufferHandleForDevice(computeDevice);
clCollationBuffer = clCollationBufferClBuffer
->getAssociatedBufferHandleForDevice(computeDevice);
if (!clAssemblyBuffer || !clCollationBuffer)
{
std::cerr << __func__ << ": failed to get buffer handles for device"
<< std::endl;
goto cleanup;
}
// Compile and prepare both kernels
if (!compileAndPrepareKernels()) {
goto cleanup;
}
clFlush(computeDevice->commandQueue);
clFinish(computeDevice->commandQueue);
shouldAcceptRequests = true;
return true;
cleanup:
finalize();
return false;
}
void OpenClCollatingAndMeshingEngine::finalize()
{
// Call stop() to set shouldAcceptRequests to false and get previous state
bool wasAcceptingRequests = stop();
(void)wasAcceptingRequests;
// Complete any running kernels
if (compactIsRunning) { compactKernelComplete(true); }
if (collateIsRunning) { collateKernelComplete(std::nullopt, true); }
// Release OpenCL buffers via smo hooks
if (smoHooksPtr && smoHooksPtr->ComputeManager_releaseUseHostPtrBuffer)
{
if (clCollationBufferClBuffer)
{
smoHooksPtr->ComputeManager_releaseUseHostPtrBuffer(
clCollationBufferClBuffer);
clCollationBufferClBuffer.reset();
}
if (clAssemblyBufferClBuffer)
{
smoHooksPtr->ComputeManager_releaseUseHostPtrBuffer(
clAssemblyBufferClBuffer);
clAssemblyBufferClBuffer.reset();
}
}
// Reset cached cl_mem handles
clCollationBuffer = nullptr;
clAssemblyBuffer = nullptr;
// Release kernels
if (slotCompactorKernel)
{
clReleaseKernel(slotCompactorKernel);
slotCompactorKernel = nullptr;
}
if (collateKernel)
{
clReleaseKernel(collateKernel);
collateKernel = nullptr;
}
// Release programs
if (slotCompactorProgram)
{
clReleaseProgram(slotCompactorProgram);
slotCompactorProgram = nullptr;
}
if (collateProgram)
{
clReleaseProgram(collateProgram);
collateProgram = nullptr;
}
// Release compute device via smo hooks
if (smoHooksPtr && smoHooksPtr->ComputeManager_releaseDevice
&& computeDevice)
{
smoHooksPtr->ComputeManager_releaseDevice(computeDevice);
computeDevice.reset();
}
// Reset state variables
compactIsRunning = false;
collateIsRunning = false;
currentCompactKernelEvent = nullptr;
currentCollateKernelEvent = nullptr;
assemblyBufferPtr = nullptr;
assemblyBufferSize = 0;
collationBufferPtr = nullptr;
collationBufferSize = 0;
frameAssemblyDesc = nullptr;
}
/**
 * OpenCL event callback for the compact kernel.
 *
 * Does nothing unless user_data points at an engine that has a compact
 * callback stored and whose parent device has a component thread. In that
 * case a copy of the stored callback, bound to the execution status, is
 * posted to that component thread's io_service.
 *
 * @param event_command_exec_status Status OpenCL reports for the event.
 * @param user_data                 The engine, passed as an opaque pointer.
 */
void CL_CALLBACK OpenClCollatingAndMeshingEngine::compactKernelEventCallback(
    cl_event /*event*/, cl_int event_command_exec_status, void* user_data)
{
    auto* const self = static_cast<OpenClCollatingAndMeshingEngine*>(user_data);
    if (!self) { return; }
    if (!self->compactKernelCb) { return; }

    // Without a device and its component thread there is nowhere to post to.
    if (!self->parent.device) { return; }
    if (!self->parent.device->componentThread) { return; }

    self->parent.device->componentThread->getIoService().post(
        std::bind(self->compactKernelCb, event_command_exec_status));
}
/**
 * OpenCL event callback for the collate kernel.
 *
 * Does nothing unless user_data points at an engine that has a collate
 * callback stored and whose parent device has a component thread. In that
 * case a copy of the stored callback, bound to the execution status, is
 * posted to that component thread's io_service.
 *
 * @param event_command_exec_status Status OpenCL reports for the event.
 * @param user_data                 The engine, passed as an opaque pointer.
 */
void CL_CALLBACK OpenClCollatingAndMeshingEngine::collateKernelEventCallback(
    cl_event /*event*/, cl_int event_command_exec_status, void* user_data)
{
    auto* const self = static_cast<OpenClCollatingAndMeshingEngine*>(user_data);
    if (!self) { return; }
    if (!self->collateKernelCb) { return; }

    // Without a device and its component thread there is nowhere to post to.
    if (!self->parent.device) { return; }
    if (!self->parent.device->componentThread) { return; }

    self->parent.device->componentThread->getIoService().post(
        std::bind(self->collateKernelCb, event_command_exec_status));
}
/**
 * Launches the slot-compactor kernel over the assembly buffer.
 *
 * The caller's callback is stored in compactKernelCb first; it is what
 * compactKernelEventCallback later posts. The assembly buffer is then mapped
 * and immediately unmapped before the launch is delegated to startKernel()
 * with a global work size of 1.
 *
 * @param assemblyBuff Staging buffer to compact; must still describe the
 *                     same memory that setup() cached.
 * @param nSucceeded   Forwarded to the kernel as its last argument.
 * @param callback     Invoked with the kernel's execution status.
 * @return false if the assembly buffer could not be mapped; otherwise
 *         whatever startKernel() returns.
 * @throws std::runtime_error from the buffer-validation callable when the
 *         staging buffer's iovec no longer matches the cached one
 *         (presumably invoked by startKernel() — not visible here).
 */
bool OpenClCollatingAndMeshingEngine::startCompactKernel(
    StagingBuffer& assemblyBuff, uint32_t nSucceeded,
    compactKernelCbFn callback)
{
    // Store the caller's callback
    compactKernelCb = std::move(callback);

    // Inside a lambda __func__ names the closure's operator(), so the
    // enclosing function's name is captured explicitly for the message.
    const char* const fnName = __func__;

    // Validate buffers callable
    auto validateBuffers = [this, fnName, &assemblyBuff]() {
        struct iovec assemblyIov = assemblyBuff.getClEngineIovec();
        if (assemblyIov.iov_base != assemblyBufferPtr
            || assemblyIov.iov_len != assemblyBufferSize)
        {
            throw std::runtime_error(
                std::string(fnName) + ": buffer mismatch - buffers have "
                "changed");
        }
    };

    // Setup args callable
    auto setupArgs = [this, &assemblyBuff, nSucceeded]() {
        return setupSlotCompactorsArgs(assemblyBuff, nSucceeded);
    };

    /** EXPLANATION:
     * Map assembly buffer as WRITE_INVALIDATE_REGION to inform OpenCL that host
     * (io_uring) has written data, and that the host doesn't care what the
     * prior contents of the device's cache are. The device must invalidate
     * its own view of the HOST_PTR and accept our view.
     *
     * Then immediately unmap to let OpenCL make the changes visible to the GPU
     */
    if (!mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION))
    {
        std::cerr << __func__ << ": failed to map assembly buffer" << std::endl;
        return false;
    }
    // Unmap immediately to sync host writes to GPU
    unmapAssemblyBuffer();

    return startKernel(
        slotCompactorKernel,
        &currentCompactKernelEvent,
        setupArgs,
        validateBuffers,
        1, // globalWorkSize
        compactKernelEventCallback,
        "slotCompactor",
        compactIsRunning);
}
/**
 * Launches the collate kernel, one work item per slot in the frame.
 *
 * The caller's callback is stored in collateKernelCb first; it is what
 * collateKernelEventCallback later posts. The assembly buffer, the collation
 * buffer and (when supplied) the intensity frame's buffer are each mapped and
 * immediately unmapped before the launch is delegated to startKernel().
 *
 * @param assemblyBuff       Input staging buffer; must match the memory
 *                           cached by setup().
 * @param collationBuff      Output staging buffer; must match the memory
 *                           cached by setup().
 * @param intensityStimFrame Optional frame whose buffer is passed to the
 *                           kernel as its intensity argument.
 * @param callback           Invoked with the kernel's execution status.
 * @return false if any buffer could not be mapped; otherwise whatever
 *         startKernel() returns.
 * @throws std::runtime_error from the buffer-validation callable when either
 *         staging buffer's iovec no longer matches the cached one
 *         (presumably invoked by startKernel() — not visible here).
 */
bool OpenClCollatingAndMeshingEngine::startCollateKernel(
    StagingBuffer& assemblyBuff, StagingBuffer& collationBuff,
    std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame,
    collateKernelCbFn callback)
{
    // Store the caller's callback
    collateKernelCb = std::move(callback);

    // Inside a lambda __func__ names the closure's operator(), so the
    // enclosing function's name is captured explicitly for the message.
    const char* const fnName = __func__;

    // Validate buffers callable
    auto validateBuffers = [this, fnName, &assemblyBuff, &collationBuff]() {
        struct iovec assemblyIov = assemblyBuff.getClEngineIovec();
        struct iovec collationIov = collationBuff.getClEngineIovec();
        if (assemblyIov.iov_base != assemblyBufferPtr
            || assemblyIov.iov_len != assemblyBufferSize
            || collationIov.iov_base != collationBufferPtr
            || collationIov.iov_len != collationBufferSize)
        {
            throw std::runtime_error(
                std::string(fnName) + ": buffer mismatch - buffers have changed");
        }
    };

    // Setup args callable
    auto setupArgs = [this, &assemblyBuff, intensityStimFrame]() {
        return setupCollateDgramsArgs(assemblyBuff, intensityStimFrame);
    };

    /** EXPLANATION:
     * It shouldn't strictly be necessary to map the assembly/collation
     * buffers here since we don't need to read/write them on the host CPUs
     * (unless we're intervening to debug; in which case we should map them as
     * CL_MAP_READ): the foreign GPU's view of the assembly buffer is already
     * up to date, and the collation buffer's state is undefined but only
     * used for output.
     *
     * NOTE(review): the code below maps and unmaps all of them anyway;
     * confirm whether that is still required before removing it.
     */
    if (!mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION))
    {
        std::cerr << __func__ << ": failed to map assembly buffer" << std::endl;
        return false;
    }
    unmapAssemblyBuffer();

    if (!mapCollationBuffer(CL_MAP_WRITE))
    {
        std::cerr << __func__ << ": failed to map collation buffer"
            << std::endl;
        return false;
    }
    unmapCollationBuffer();

    // Map/unmap intensity buffer if it exists
    if (intensityStimFrame.has_value())
    {
        StimulusFrame& intensityFrame = intensityStimFrame->get();
        cl_mem intensityClBuffer = intensityFrame.clBuffer
            ->getAssociatedBufferHandleForDevice(computeDevice);
        if (intensityClBuffer)
        {
            void* mappedIntensityBuffer = nullptr;
            if (!mapBuffer(intensityClBuffer, intensityFrame.slotDesc.nBytes,
                CL_MAP_WRITE_INVALIDATE_REGION, mappedIntensityBuffer))
            {
                std::cerr << __func__ << ": failed to map intensity buffer"
                    << std::endl;
                return false;
            }
            unmapBuffer(intensityClBuffer, mappedIntensityBuffer);
        }
    }

    // Calculate global work size (just num slots in the frame)
    size_t globalWorkSize = static_cast<uint32_t>(frameAssemblyDesc->numSlots);

    return startKernel(
        collateKernel,
        &currentCollateKernelEvent,
        setupArgs,
        validateBuffers,
        globalWorkSize,
        collateKernelEventCallback,
        "collateDgrams",
        collateIsRunning);
}
bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernel(
const char* kernelSource, size_t kernelSourceLen,
const char* kernelName, cl_program& program, cl_kernel& kernel)
{
cl_int err;
// Create program from source
program = clCreateProgramWithSource(
computeDevice->context, 1, &kernelSource, &kernelSourceLen, &err);
if (err != CL_SUCCESS || !program)
{
std::cerr << __func__ << ": failed to create " << kernelName
<< " program: " << err << std::endl;
return false;
}
// Build program
err = clBuildProgram(program, 1, &computeDevice->device,
nullptr, nullptr, nullptr);
if (err != CL_SUCCESS)
{
std::cerr << __func__ << ": failed to build " << kernelName
<< " program: " << err << std::endl;
// Print build log if available
size_t logSize = 0;
clGetProgramBuildInfo(
program, computeDevice->device, CL_PROGRAM_BUILD_LOG,
0, nullptr, &logSize);
if (logSize > 0)
{
std::vector<char> log(logSize);
clGetProgramBuildInfo(
program, computeDevice->device, CL_PROGRAM_BUILD_LOG,
logSize, log.data(), nullptr);
std::cerr << kernelName << " build log: " << log.data()
<< std::endl;
}
return false;
}
// Create kernel
kernel = clCreateKernel(program, kernelName, &err);
if (err != CL_SUCCESS || !kernel)
{
std::cerr << __func__ << ": failed to create " << kernelName
<< " kernel: " << err << std::endl;
return false;
}
return true;
}
/**
 * Compiles the "slotCompactor" kernel and then the "collate" kernel from the
 * embedded sources. The second compile is not attempted when the first one
 * fails.
 *
 * @return true only when both kernels were prepared.
 */
bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernels()
{
    // && short-circuits, so a slotCompactor failure skips the collate build.
    return compileAndPrepareKernel(
               slotCompactorKernelStart, slotCompactorKernelNBytes,
               "slotCompactor", slotCompactorProgram, slotCompactorKernel)
        && compileAndPrepareKernel(
               collateKernelStart, collateKernelNBytes,
               "collate", collateProgram, collateKernel);
}
/**
 * Binds the five arguments of the slot-compactor kernel:
 *   0 = assembly cl_mem, 1 = slot count, 2 = slot stride in bytes,
 *   3 = slot size in bytes, 4 = nSucceeded.
 *
 * Binding stops at the first failure, which is logged to stderr together
 * with the argument index and the OpenCL error code.
 *
 * @param assemblyBuff Supplies the slot stride.
 * @param nSucceeded   Value passed to the kernel as argument 4.
 * @return true when every argument was set.
 */
bool OpenClCollatingAndMeshingEngine::setupSlotCompactorsArgs(
    StagingBuffer& assemblyBuff, uint32_t nSucceeded)
{
    // Scalars are narrowed to 32 bits because that is the size handed to
    // clSetKernelArg below.
    uint32_t slotCount = static_cast<uint32_t>(frameAssemblyDesc->numSlots);
    uint32_t strideBytes = static_cast<uint32_t>(assemblyBuff.slotStrideNBytes);
    uint32_t slotBytes = static_cast<uint32_t>(frameAssemblyDesc->slotSizeBytes);
    uint32_t succeeded = static_cast<uint32_t>(nSucceeded);

    // __func__ would name the lambda inside it, so take the name out here.
    const char* const fnName = __func__;
    auto bindArg = [this, fnName](
        cl_uint index, size_t nBytes, const void* value)
    {
        const cl_int status = clSetKernelArg(
            slotCompactorKernel, index, nBytes, value);
        if (status == CL_SUCCESS) { return true; }
        std::cerr << fnName << ": failed to set kernel arg " << index << ": "
            << status << std::endl;
        return false;
    };

    return bindArg(0, sizeof(cl_mem), &clAssemblyBuffer)
        && bindArg(1, sizeof(uint32_t), &slotCount)
        && bindArg(2, sizeof(uint32_t), &strideBytes)
        && bindArg(3, sizeof(uint32_t), &slotBytes)
        && bindArg(4, sizeof(uint32_t), &succeeded);
}
/**
 * Binds the six arguments of the collate kernel:
 *   0 = assembly cl_mem, 1 = collation cl_mem, 2 = intensity cl_mem (null
 *   when no intensity frame was supplied), 3 = slot stride in bytes,
 *   4 = points per slot, 5 = datagrams per frame.
 *
 * Points per slot comes from the parent device's current return mode.
 * Binding stops at the first failure, which is logged to stderr together
 * with the argument index and the OpenCL error code.
 *
 * @param assemblyBuff       Supplies the slot stride.
 * @param intensityStimFrame Optional frame providing the intensity buffer.
 * @return true when every argument was set; false on any failure or when
 *         the parent has no device.
 */
bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs(
    StagingBuffer& assemblyBuff,
    std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame)
{
    uint32_t strideBytes = static_cast<uint32_t>(assemblyBuff.slotStrideNBytes);

    // The return mode lives on the device, so there is nothing to bind
    // without one.
    if (!parent.device)
    {
        std::cerr << __func__ << ": device not available" << std::endl;
        return false;
    }
    int mode = static_cast<int>(parent.device->currentReturnMode);
    uint32_t pointsPerSlot = static_cast<uint32_t>(
        livoxProto1::Device::getNPointsPerDgram(mode));
    uint32_t dgramsPerFrame = static_cast<uint32_t>(
        frameAssemblyDesc->numSlots);

    // __func__ would name the lambda inside it, so take the name out here.
    const char* const fnName = __func__;
    auto bindArg = [this, fnName](
        cl_uint index, size_t nBytes, const void* value)
    {
        const cl_int status = clSetKernelArg(
            collateKernel, index, nBytes, value);
        if (status == CL_SUCCESS) { return true; }
        std::cerr << fnName << ": failed to set kernel arg " << index << ": "
            << status << std::endl;
        return false;
    };

    if (!bindArg(0, sizeof(cl_mem), &clAssemblyBuffer)) { return false; }
    if (!bindArg(1, sizeof(cl_mem), &clCollationBuffer)) { return false; }

    // Argument 2 stays a null cl_mem when no intensity frame was given.
    cl_mem intensityHandle = nullptr;
    if (intensityStimFrame.has_value())
    {
        intensityHandle = intensityStimFrame->get().clBuffer
            ->getAssociatedBufferHandleForDevice(computeDevice);
    }

    return bindArg(2, sizeof(cl_mem), &intensityHandle)
        && bindArg(3, sizeof(uint32_t), &strideBytes)
        && bindArg(4, sizeof(uint32_t), &pointsPerSlot)
        && bindArg(5, sizeof(uint32_t), &dgramsPerFrame);
}
/**
 * Closes the request gate.
 *
 * Clears shouldAcceptRequests while holding shouldAcceptRequestsLock.
 *
 * @return the value the flag had immediately before it was cleared.
 */
bool OpenClCollatingAndMeshingEngine::stop()
{
    SpinLock::Guard gate(shouldAcceptRequestsLock);
    const bool previouslyOpen = shouldAcceptRequests;
    shouldAcceptRequests = false;
    return previouslyOpen;
}
void OpenClCollatingAndMeshingEngine::compactKernelComplete(bool isFinalizing)
{
cl_map_flags mapFlags;
/** EXPLANATION:
* Technically we should only need to do this if we plan to read the
* compacted slots for debugging purposes. Otherwise this is unnecessary.
*/
if (isFinalizing) { mapFlags = CL_MAP_WRITE_INVALIDATE_REGION; }
else { mapFlags = CL_MAP_READ; }
if (mapAssemblyBuffer(mapFlags)) {
unmapAssemblyBuffer();
}
clFlush(computeDevice->commandQueue);
// Stop only compact kernel
if (compactIsRunning && currentCompactKernelEvent)
{
clWaitForEvents(1, &currentCompactKernelEvent);
clReleaseEvent(currentCompactKernelEvent);
currentCompactKernelEvent = nullptr;
}
clFinish(computeDevice->commandQueue);
compactKernelCb = [](cl_int){};
compactIsRunning = false;
}
void OpenClCollatingAndMeshingEngine::collateKernelComplete(
std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame,
bool isFinalizing)
{
cl_map_flags mapFlags;
/** EXPLANATION:
* Technically we should only need to do this if we plan to read the
* collated dgrams for debugging purposes. Otherwise this is unnecessary.
*/
if (isFinalizing) { mapFlags = CL_MAP_WRITE_INVALIDATE_REGION; }
else { mapFlags = CL_MAP_READ; }
if (mapCollationBuffer(mapFlags)) {
unmapCollationBuffer();
}
// Map/unmap intensity buffer if it exists
if (intensityStimFrame.has_value())
{
StimulusFrame& intensityFrame = intensityStimFrame->get();
cl_mem intensityClBuffer = intensityFrame.clBuffer
->getAssociatedBufferHandleForDevice(computeDevice);
if (intensityClBuffer)
{
void* mappedIntensityBuffer = nullptr;
if (mapBuffer(intensityClBuffer, intensityFrame.slotDesc.nBytes,
CL_MAP_READ, mappedIntensityBuffer))
{
unmapBuffer(intensityClBuffer, mappedIntensityBuffer);
}
}
}
clFlush(computeDevice->commandQueue);
// Stop only collate kernel
if (collateIsRunning && currentCollateKernelEvent)
{
clWaitForEvents(1, &currentCollateKernelEvent);
clReleaseEvent(currentCollateKernelEvent);
currentCollateKernelEvent = nullptr;
}
clFinish(computeDevice->commandQueue);
collateKernelCb = [](cl_int){};
collateIsRunning = false;
}
/**
 * Maps `size` bytes of `buffer`, starting at offset 0, with a blocking
 * clEnqueueMapBuffer call on the engine's command queue.
 *
 * @param buffer    cl_mem to map.
 * @param size      Number of bytes to map.
 * @param mapFlags  OpenCL map flags (CL_MAP_READ, CL_MAP_WRITE, ...).
 * @param mappedPtr In/out. If already non-null the buffer is treated as
 *                  mapped and nothing is done. On success it receives the
 *                  mapped host pointer; on failure it is reset to nullptr.
 * @return true if the buffer is mapped on return; false if the engine has
 *         no device/queue, `buffer` is null, or the map call fails.
 */
bool OpenClCollatingAndMeshingEngine::mapBuffer(
    cl_mem buffer, size_t size, cl_map_flags mapFlags, void*& mappedPtr)
{
    // computeDevice is null until setup() succeeds and again after
    // finalize(), so it has to be checked before the queue is read from it.
    if (!computeDevice || !computeDevice->commandQueue || !buffer)
    {
        std::cerr << __func__ << ": engine not set up or invalid buffer"
            << std::endl;
        return false;
    }

    // If already mapped, return early with success.
    if (mappedPtr != nullptr) { return true; }

    cl_int err = CL_SUCCESS;
    mappedPtr = clEnqueueMapBuffer(
        computeDevice->commandQueue, buffer, CL_TRUE, mapFlags,
        0, size, 0, nullptr, nullptr, &err);
    if (err != CL_SUCCESS || !mappedPtr)
    {
        std::cerr << __func__ << ": failed to map buffer: " << err
            << std::endl;
        // Never leave a pointer behind that callers could mistake for a
        // live mapping.
        mappedPtr = nullptr;
        return false;
    }
    return true;
}
/**
 * Unmaps a pointer previously obtained from mapBuffer() and waits for the
 * unmap command to complete.
 *
 * @param buffer    cl_mem the pointer was mapped from.
 * @param mappedPtr In/out. A null pointer is treated as "already unmapped".
 *                  It is reset to nullptr only after the unmap has been
 *                  enqueued and waited on successfully; on failure it is
 *                  left unchanged.
 * @return true if nothing was mapped or the unmap completed; false if the
 *         engine has no device/queue, `buffer` is null, or an OpenCL call
 *         fails.
 */
bool OpenClCollatingAndMeshingEngine::unmapBuffer(
    cl_mem buffer, void*& mappedPtr
    )
{
    if (mappedPtr == nullptr)
    {
        // Already unmapped
        return true;
    }

    // computeDevice is null until setup() succeeds and again after
    // finalize(), so it has to be checked before the queue is read from it.
    if (!computeDevice || !computeDevice->commandQueue || !buffer)
    {
        std::cerr << __func__ << ": engine not set up or invalid buffer.\n";
        return false;
    }

    cl_event unmapEvent = nullptr;
    cl_int err = clEnqueueUnmapMemObject(
        computeDevice->commandQueue, buffer, mappedPtr,
        0, nullptr, &unmapEvent);
    if (err != CL_SUCCESS)
    {
        std::cerr << __func__ << ": failed to unmap buffer: " << err
            << std::endl;
        return false;
    }

    // Wait for unmap to complete; the event is released on both outcomes.
    err = clWaitForEvents(1, &unmapEvent);
    clReleaseEvent(unmapEvent);
    if (err != CL_SUCCESS)
    {
        std::cerr << __func__ << ": failed to wait for unmap event: " << err
            << std::endl;
        return false;
    }

    mappedPtr = nullptr;
    return true;
}
/**
 * Maps the whole assembly buffer with the given flags, tracking the mapping
 * in mappedAssemblyBuffer.
 *
 * @return the result of mapBuffer().
 */
bool OpenClCollatingAndMeshingEngine::mapAssemblyBuffer(cl_map_flags mapFlags)
{
    const bool mapped = mapBuffer(
        clAssemblyBuffer, assemblyBufferSize, mapFlags, mappedAssemblyBuffer);
    return mapped;
}
/**
 * Unmaps the assembly buffer and forgets the mapping.
 *
 * The status of unmapBuffer() is discarded: mappedAssemblyBuffer is cleared
 * and true is returned whether or not the unmap succeeded.
 *
 * @return always true.
 */
bool OpenClCollatingAndMeshingEngine::unmapAssemblyBuffer()
{
    static_cast<void>(unmapBuffer(clAssemblyBuffer, mappedAssemblyBuffer));
    // Cleared unconditionally, i.e. also when the unmap above failed.
    mappedAssemblyBuffer = nullptr;
    return true;
}
/**
 * Maps the whole collation buffer with the given flags, tracking the mapping
 * in mappedCollationBuffer.
 *
 * @return the result of mapBuffer().
 */
bool OpenClCollatingAndMeshingEngine::mapCollationBuffer(cl_map_flags mapFlags)
{
    const bool mapped = mapBuffer(
        clCollationBuffer, collationBufferSize, mapFlags,
        mappedCollationBuffer);
    return mapped;
}
/**
 * Unmaps the collation buffer and forgets the mapping.
 *
 * The status of unmapBuffer() is discarded: mappedCollationBuffer is cleared
 * and true is returned whether or not the unmap succeeded.
 *
 * @return always true.
 */
bool OpenClCollatingAndMeshingEngine::unmapCollationBuffer()
{
    static_cast<void>(unmapBuffer(clCollationBuffer, mappedCollationBuffer));
    // Cleared unconditionally, i.e. also when the unmap above failed.
    mappedCollationBuffer = nullptr;
    return true;
}
/**
 * Continuation object that carries one compact -> collate request through
 * its four stages:
 *
 *   1. doCompact    - start the compact kernel
 *   2. compactDone  - finish the compact cycle, then chain to stage 3
 *   3. doCollate    - start the collate kernel
 *   4. collateDone  - finish the collate cycle, log statistics, report
 *
 * Every stage first takes engine.shouldAcceptRequestsLock and reports
 * failure to the original caller if the engine no longer accepts requests.
 * Each stage receives a shared_ptr to this object so the request stays alive
 * while a stage is pending.
 */
class OpenClCollatingAndMeshingEngine::CompactCollateAndMeshFrameReq
: public PostedAsynchronousContinuation<compactCollateAndMeshFrameReqCbFn>
{
private:
    OpenClCollatingAndMeshingEngine& engine;
    // Copy of the caller's loop state; nSucceeded is read from it.
    AsynchronousLoop frameAssemblyResult;
    StimulusFrame& stimulusFrame;
    std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame;

public:
    /**
     * @param engine_             Engine the request runs against.
     * @param asyncLoop           Frame assembly result; copied.
     * @param stimulusFrame_      Frame handed back to the caller's callback.
     * @param intensityStimFrame_ Optional intensity frame.
     * @param caller              Thread passed on to the continuation base.
     * @param cb                  Callback invoked with the final result.
     */
    CompactCollateAndMeshFrameReq(
        OpenClCollatingAndMeshingEngine& engine_,
        AsynchronousLoop& asyncLoop,
        StimulusFrame& stimulusFrame_,
        std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame_,
        const std::shared_ptr<ComponentThread>& caller,
        Callback<compactCollateAndMeshFrameReqCbFn> cb)
    : PostedAsynchronousContinuation<compactCollateAndMeshFrameReqCbFn>(
        caller, cb),
    engine(engine_),
    frameAssemblyResult(asyncLoop), stimulusFrame(stimulusFrame_),
    intensityStimFrame(intensityStimFrame_)
    {}

public:
    /** Reports `success` and the stimulus frame to the original callback. */
    void callOriginalCallback(bool success)
        { callOriginalCb(success, std::ref(stimulusFrame)); }

public:
    /** Stage 1: record the start time and launch the compact kernel. */
    void compactCollateAndMeshFrameReq1_doCompact_posted(
        std::shared_ptr<CompactCollateAndMeshFrameReq> context)
    {
        SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
        if (!engine.shouldAcceptRequests)
        {
            callOriginalCallback(false);
            return;
        }

        // Record compact kernel start time
        engine.compactKernelStartTime = std::chrono::high_resolution_clock::now();

        bool success = engine.startCompactKernel(
            engine.parent.assemblyBuffer,
            static_cast<uint32_t>(context->frameAssemblyResult.nSucceeded.load()),
            std::bind(
                &CompactCollateAndMeshFrameReq
                    ::compactCollateAndMeshFrameReq2_compactDone_posted,
                context.get(), context,
                std::placeholders::_1));
        if (!success)
        {
            // Reset the compact cycle state before reporting the failure.
            engine.compactKernelComplete();
            callOriginalCallback(false);
            return;
        }
    }

    /** Stage 2: finish the compact cycle and, on success, start stage 3. */
    void compactCollateAndMeshFrameReq2_compactDone_posted(
        std::shared_ptr<CompactCollateAndMeshFrameReq> context,
        cl_int compactStatus)
    {
        SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
        if (!engine.shouldAcceptRequests)
        {
            /** EXPLANATION:
             * We intentionally don't call compactKernelComplete() here because
             * if shouldAcceptRequests is false, then the caller that called
             * finalize() will also be forced to call compactKernelComplete()
             * inside of finalize().
             */
            callOriginalCallback(false);
            return;
        }

        engine.compactKernelComplete();
        // Record compact kernel end time
        engine.compactKernelEndTime = std::chrono::high_resolution_clock::now();

        // If compact failed, call callback directly with failure
        if (compactStatus != CL_SUCCESS)
        {
            callOriginalCallback(false);
            return;
        }

#if 0
        // Print first 4 bytes of each slot
        if (engine.frameAssemblyDesc)
        {
            for (size_t i = 0; i < engine.frameAssemblyDesc->numSlots; ++i) {
                engine.parent.ioUringAssemblyEngine.printSlotBytes(i, 4);
            }
        }
#endif

        // Stage 3 takes the same lock itself, so it must be dropped first.
        lock.unlockPrematurely();
        context->compactCollateAndMeshFrameReq3_doCollate_posted(context);
    }

    /** Stage 3: record the start time and launch the collate kernel. */
    void compactCollateAndMeshFrameReq3_doCollate_posted(
        std::shared_ptr<CompactCollateAndMeshFrameReq> context)
    {
        SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
        if (!engine.shouldAcceptRequests)
        {
            callOriginalCallback(false);
            return;
        }

        // Record collate kernel start time
        engine.collateKernelStartTime = std::chrono::high_resolution_clock::now();

        bool success = engine.startCollateKernel(
            engine.parent.assemblyBuffer, engine.parent.collationBuffer,
            context->intensityStimFrame,
            std::bind(
                &CompactCollateAndMeshFrameReq
                    ::compactCollateAndMeshFrameReq4_collateDone_maybePosted,
                context.get(), context,
                std::placeholders::_1));
        if (!success)
        {
            // Reset the collate cycle state before reporting the failure.
            engine.collateKernelComplete(context->intensityStimFrame);
            callOriginalCallback(false);
            return;
        }
    }

    /**
     * Stage 4: finish the collate cycle, log per-frame statistics and report
     * the result to the original caller.
     */
    void compactCollateAndMeshFrameReq4_collateDone_maybePosted(
        [[maybe_unused]] std::shared_ptr<CompactCollateAndMeshFrameReq> context,
        cl_int collateStatus)
    {
        SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
        if (!engine.shouldAcceptRequests)
        {
            /* We intentionally don't call collateKernelComplete() here for the
             * same reason as above.
             */
            callOriginalCallback(false);
            return;
        }

        /** EXPLANATION:
         * The reason we don't call collateKernelComplete before checking
         * shouldAcceptRequests is because if shouldAcceptRequests is false, then
         * we shouldn't touch any of the collate cycle state...or any state within
         * the engine at all, since finalize() may well have been called.
         *
         * Therefore it's finalize()'s responsibility to ensure that it properly
         * completes/cleans up any in-flight operations.
         */
        engine.collateKernelComplete(context->intensityStimFrame);
        // Record collate kernel end time
        engine.collateKernelEndTime = std::chrono::high_resolution_clock::now();

        bool success = (collateStatus == CL_SUCCESS);
        // Early callback + return pattern
        if (!success)
        {
            callOriginalCallback(false);
            return;
        }

        int returnMode = static_cast<int>(engine.parent.device->currentReturnMode);
        size_t pointsPerDgram = livoxProto1::Device::getNPointsPerDgram(
            returnMode);
        uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load();
        size_t totalPoints = nSucceeded * pointsPerDgram;

        // Count points with intensity greater than 116
        size_t highIntensityCount = 0;
        if (context->intensityStimFrame.has_value())
        {
            StimulusFrame& intensityFrame = context->intensityStimFrame->get();
            // NOTE(review): assumes the frame holds at least totalPoints
            // floats at slotDesc.vaddr - confirm against the frame layout.
            float* intensityFloats = reinterpret_cast<float*>(intensityFrame.slotDesc.vaddr);
            for (size_t i = 0; i < totalPoints; ++i)
            {
                float intensity = intensityFloats[i];
                if (intensity > 116.0f)
                {
                    ++highIntensityCount;
                }
            }
        }

        // The intensity frame is optional; only dereference it when present.
        std::cout << __func__ << ": ringBufferIndex=";
        if (context->intensityStimFrame.has_value())
        {
            std::cout << context->intensityStimFrame->get().ringBufferIndex;
        }
        else
        {
            std::cout << "n/a";
        }
        std::cout << ", pointsPerDgram=" << pointsPerDgram
            << ", nSucceeded=" << nSucceeded
            << ", totalPoints=" << totalPoints
            << ", highIntensityCount=" << highIntensityCount << std::endl;

        callOriginalCallback(success);
    }
};
/**
 * Entry point for processing one assembled frame.
 *
 * If the engine is not accepting requests the callback is invoked
 * immediately with `false`, while the gate lock is still held. Otherwise a
 * request object is created and its first stage is posted to the parent
 * device's component-thread io_service: the compact stage when
 * IoUringAssemblyEngine::compactionIsNeeded() says so, the collate stage
 * otherwise.
 *
 * @param asyncLoop          Frame assembly result (nSucceeded / nTotal).
 * @param stimulusFrame      Frame handed back through the callback.
 * @param intensityStimFrame Optional intensity frame.
 * @param callback           Receives the final success flag and the frame.
 */
void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq(
    AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame,
    std::optional<std::reference_wrapper<StimulusFrame>> intensityStimFrame,
    Callback<compactCollateAndMeshFrameReqCbFn> callback)
{
    // Gate check; the rejection callback runs inside the guarded scope.
    {
        SpinLock::Guard gate(shouldAcceptRequestsLock);
        if (!shouldAcceptRequests)
        {
            callback.callbackFn(false, stimulusFrame);
            return;
        }
    }

    auto callerThread = smoHooksPtr->ComponentThread_getSelf();
    auto request = std::make_shared<CompactCollateAndMeshFrameReq>(
        *this, asyncLoop, stimulusFrame, intensityStimFrame,
        callerThread,
        std::move(callback));

    // Compaction is an optional first stage; collation always follows it.
    if (IoUringAssemblyEngine::compactionIsNeeded(
            asyncLoop.nSucceeded.load(), asyncLoop.nTotal))
    {
        parent.device->componentThread->getIoService().post(
            STC(std::bind(
                &CompactCollateAndMeshFrameReq
                    ::compactCollateAndMeshFrameReq1_doCompact_posted,
                request.get(), request)));
        return;
    }

    // Skip compaction, go straight to collation
    parent.device->componentThread->getIoService().post(
        STC(std::bind(
            &CompactCollateAndMeshFrameReq
                ::compactCollateAndMeshFrameReq3_doCollate_posted,
            request.get(), request)));
}
/**
 * Duration between the recorded compact-kernel start and end timestamps.
 *
 * @return the difference converted to whole milliseconds, or zero when the
 *         end timestamp precedes the start timestamp.
 */
std::chrono::milliseconds OpenClCollatingAndMeshingEngine::getCompactKernelDuration() const
{
    const auto elapsed = compactKernelEndTime - compactKernelStartTime;
    return (elapsed.count() < 0)
        ? std::chrono::milliseconds(0)
        : std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}
/**
 * Duration between the recorded collate-kernel start and end timestamps.
 *
 * @return the difference converted to whole milliseconds, or zero when the
 *         end timestamp precedes the start timestamp.
 */
std::chrono::milliseconds OpenClCollatingAndMeshingEngine::getCollateKernelDuration() const
{
    const auto elapsed = collateKernelEndTime - collateKernelStartTime;
    return (elapsed.count() < 0)
        ? std::chrono::milliseconds(0)
        : std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}
} // namespace stim_buff
} // namespace smo