// File: salmanoff/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp
// Last commit: hayodea b331af4f03, 2025-11-09 16:12:10 -04:00
//   ClCollMeshEngn: Split start into start[Collate|Compact]Kernel()
//   These prepare each kernel separately. We'll unify them further.
// 652 lines, 16 KiB, C++

#include <boostAsioLinkageFix.h>
#include <stdexcept>
#include <iostream>
#include <cstring>
#include <vector>
#include <boost/system/error_code.hpp>
#include "openClCollatingAndMeshingEngine.h"
#include "pcloudStimulusBuffer.h"
#include "openClKernels.h"
#include "frameAssemblyDesc.h"
#include "ioUringAssemblyEngine.h"
namespace smo {
namespace stim_buff {
// Constructs an engine bound to the PcloudStimulusBuffer that owns it.
//
// Only the reference to the parent is captured here. Every OpenCL handle and
// cached buffer pointer starts out null, both cached sizes start at zero and
// the isSetup / isRunning flags start false. No OpenCL call is made in the
// constructor; the handles are populated later by setup().
OpenClCollatingAndMeshingEngine::OpenClCollatingAndMeshingEngine(
PcloudStimulusBuffer& parent_)
: parent(parent_),
platform(nullptr),
device(nullptr),
context(nullptr),
commandQueue(nullptr),
slotCompactorProgram(nullptr), collateProgram(nullptr),
slotCompactorKernel(nullptr), collateKernel(nullptr),
isSetup(false),
clAssemblyBuffer(nullptr),
clCollationBuffer(nullptr),
isRunning(false),
currentKernelEvent(nullptr),
assemblyBufferPtr(nullptr),
assemblyBufferSize(0),
collationBufferPtr(nullptr),
collationBufferSize(0),
frameAssemblyDesc(nullptr)
{
}
// Destructor: hands all teardown to finalize(), which first stops any running
// kernel and then releases whatever OpenCL objects this engine still holds.
OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine()
{
    this->finalize();
}
bool OpenClCollatingAndMeshingEngine::setup()
{
if (isSetup) {
return true;
}
cl_int err;
cl_queue_properties queueProps[] = {CL_QUEUE_PROPERTIES, 0, 0};
// Get platform
cl_uint numPlatforms;
err = clGetPlatformIDs(1, &platform, &numPlatforms);
if (err != CL_SUCCESS || numPlatforms == 0)
{
std::cerr << __func__ << ": failed to get OpenCL platform: "
<< err << std::endl;
return false;
}
// Get device
err = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, 1, &device, nullptr);
if (err != CL_SUCCESS)
{
std::cerr << __func__ << ": failed to get GPU device: "
<< err << std::endl;
return false;
}
// Create context
context = clCreateContext(nullptr, 1, &device, nullptr, nullptr, &err);
if (err != CL_SUCCESS || !context)
{
std::cerr << __func__ << ": failed to create OpenCL context: "
<< err << std::endl;
goto cleanup;
}
// Create command queue
commandQueue = clCreateCommandQueueWithProperties(
context, device, queueProps, &err);
if (err != CL_SUCCESS || !commandQueue)
{
std::cerr << __func__ << ": failed to create command queue: "
<< err << std::endl;
goto cleanup;
}
// Declare variables early to avoid goto crossing initialization
struct iovec assemblyIov;
struct iovec collationIov;
// Get StagingBuffer memory pointers from parent
assemblyIov = parent.assemblyBuffer.getClEngineIovec();
collationIov = parent.collationBuffer.getClEngineIovec();
assemblyBufferPtr = assemblyIov.iov_base;
assemblyBufferSize = assemblyIov.iov_len;
collationBufferPtr = collationIov.iov_base;
collationBufferSize = collationIov.iov_len;
// Get FrameAssemblyDesc from assembly buffer
frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
parent.assemblyBuffer);
if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty())
{
std::cerr << __func__ << ": invalid frame descriptor" << std::endl;
goto cleanup;
}
// Create OpenCL buffers using CL_MEM_USE_HOST_PTR for zero-copy
clAssemblyBuffer = clCreateBuffer(
context,
CL_MEM_USE_HOST_PTR | CL_MEM_READ_WRITE,
assemblyBufferSize, assemblyBufferPtr,
&err);
if (err != CL_SUCCESS || !clAssemblyBuffer)
{
std::cerr << __func__ << ": failed to create assembly buffer: "
<< err << std::endl;
goto cleanup;
}
clCollationBuffer = clCreateBuffer(
context,
CL_MEM_USE_HOST_PTR | CL_MEM_WRITE_ONLY,
collationBufferSize, collationBufferPtr,
&err);
if (err != CL_SUCCESS || !clCollationBuffer)
{
std::cerr << __func__ << ": failed to create collation buffer: "
<< err << std::endl;
goto cleanup;
}
// Compile and prepare both kernels
if (!compileAndPrepareKernels())
{
goto cleanup;
}
isSetup = true;
return true;
cleanup:
finalize();
return false;
}
// Releases everything setup() acquired and returns the engine to its
// freshly-constructed state.
//
// stop() runs first so an in-flight kernel is waited for before its buffers,
// kernels, programs, queue and context are released. Each handle is released
// only if it is non-null and is nulled afterwards.
void OpenClCollatingAndMeshingEngine::finalize()
{
    stop();

    // Release a handle through its matching clRelease* function, then clear it.
    const auto releaseAndClear = [](auto& handle, auto releaseFn) {
        if (handle)
        {
            releaseFn(handle);
            handle = nullptr;
        }
    };

    // Memory objects, newest first.
    releaseAndClear(clCollationBuffer, clReleaseMemObject);
    releaseAndClear(clAssemblyBuffer, clReleaseMemObject);

    // Kernels, then the programs they were created from.
    releaseAndClear(slotCompactorKernel, clReleaseKernel);
    releaseAndClear(collateKernel, clReleaseKernel);
    releaseAndClear(slotCompactorProgram, clReleaseProgram);
    releaseAndClear(collateProgram, clReleaseProgram);

    // Queue, then the context.
    releaseAndClear(commandQueue, clReleaseCommandQueue);
    releaseAndClear(context, clReleaseContext);

    // Platform and device ids are simply forgotten, as is all cached state.
    device = nullptr;
    platform = nullptr;
    isSetup = false;
    isRunning = false;
    currentKernelEvent = nullptr;
    assemblyBufferPtr = nullptr;
    assemblyBufferSize = 0;
    collationBufferPtr = nullptr;
    collationBufferSize = 0;
    frameAssemblyDesc = nullptr;
}
// OpenCL event callback registered for the slotCompactor kernel.
//
// user_data is the engine that enqueued the kernel. The notification is
// dropped when that pointer is null, when the engine is not marked running,
// or when no compactKernelCb is stored. Otherwise compactKernelCb is posted
// to the io_service of the parent's device's component thread, provided both
// the device and its component thread exist, rather than being invoked
// directly from this callback.
void CL_CALLBACK OpenClCollatingAndMeshingEngine::compactKernelEventCallback(
    cl_event /*event*/, cl_int /*event_command_exec_status*/, void* user_data)
{
    auto* const self = static_cast<OpenClCollatingAndMeshingEngine*>(user_data);
    if (self == nullptr) {
        return;
    }
    if (!self->isRunning) {
        return;
    }
    if (!self->compactKernelCb) {
        return;
    }

    const auto& owningDevice = self->parent.device;
    if (!owningDevice || !owningDevice->componentThread) {
        return;
    }
    owningDevice->componentThread->getIoService().post(self->compactKernelCb);
}
// OpenCL event callback registered for the collate kernel.
//
// user_data is the engine that enqueued the kernel. The notification is
// dropped when that pointer is null, when the engine is not marked running,
// or when no collateKernelCb is stored. Otherwise collateKernelCb is posted
// to the io_service of the parent's device's component thread, provided both
// the device and its component thread exist, rather than being invoked
// directly from this callback.
void CL_CALLBACK OpenClCollatingAndMeshingEngine::collateKernelEventCallback(
    cl_event /*event*/, cl_int /*event_command_exec_status*/, void* user_data)
{
    auto* const self = static_cast<OpenClCollatingAndMeshingEngine*>(user_data);
    if (self == nullptr) {
        return;
    }
    if (!self->isRunning) {
        return;
    }
    if (!self->collateKernelCb) {
        return;
    }

    const auto& owningDevice = self->parent.device;
    if (!owningDevice || !owningDevice->componentThread) {
        return;
    }
    owningDevice->componentThread->getIoService().post(self->collateKernelCb);
}
// Enqueues the slotCompactor kernel and returns without waiting for it.
//
// Parameters:
//   assemblyBuff - must expose the same iovec (base pointer and length) that
//                  setup() cached; also supplies the slot stride and first
//                  slot offset passed to the kernel.
//   nSucceeded   - forwarded to the kernel by setupSlotCompactorsArgs().
//   callback     - stored in compactKernelCb. compactKernelEventCallback()
//                  posts it to the parent's device's component thread once
//                  the kernel's event reaches CL_COMPLETE.
//
// Returns true when the kernel has been enqueued, its completion callback
// registered and the queue flushed. Returns false (after logging) when the
// engine is not set up, is already running, or any OpenCL call fails.
//
// Throws std::runtime_error when assemblyBuff no longer matches the buffer
// seen by setup().
bool OpenClCollatingAndMeshingEngine::startCompactKernel(
    StagingBuffer& assemblyBuff, uint32_t nSucceeded,
    compactKernelCbFn callback)
{
    if (!isSetup)
    {
        std::cerr << __func__ << ": engine not set up" << std::endl;
        return false;
    }
    if (isRunning)
    {
        std::cerr << __func__ << ": already running, call stop() first"
            << std::endl;
        return false;
    }

    // The cl_mem objects wrap the host memory captured in setup(); refuse to
    // run if the caller's buffer has moved or been resized since then.
    struct iovec assemblyIov = assemblyBuff.getClEngineIovec();
    if (assemblyIov.iov_base != assemblyBufferPtr
        || assemblyIov.iov_len != assemblyBufferSize)
    {
        throw std::runtime_error(
            std::string(__func__) + ": buffer mismatch - buffers have changed");
    }

    // Store the caller's callback
    compactKernelCb = callback;

    // Set up kernel arguments for slotCompactor
    if (!setupSlotCompactorsArgs(assemblyBuff, nSucceeded)) {
        return false;
    }

    // Single work item: the compactor processes the slots sequentially.
    size_t globalWorkSize = 1;
    cl_int err = clEnqueueNDRangeKernel(
        commandQueue, slotCompactorKernel, 1, nullptr, &globalWorkSize, nullptr,
        0, nullptr, &currentKernelEvent);
    if (err != CL_SUCCESS)
    {
        std::cerr << __func__ << ": failed to enqueue slotCompactor kernel: "
            << err << std::endl;
        return false;
    }

    // Mark the engine running BEFORE registering the event callback.
    // compactKernelEventCallback() ignores notifications while isRunning is
    // false, and the event may already be complete by the time the callback
    // is registered, in which case the callback can run right away. Setting
    // the flag afterwards would silently lose that completion.
    isRunning = true;
    err = clSetEventCallback(
        currentKernelEvent, CL_COMPLETE, compactKernelEventCallback, this);
    if (err != CL_SUCCESS)
    {
        isRunning = false;
        std::cerr << __func__ << ": failed to set event callback: " << err
            << std::endl;
        clReleaseEvent(currentKernelEvent);
        currentKernelEvent = nullptr;
        return false;
    }

    // Enqueuing alone does not guarantee submission to the device. Nothing
    // blocks on this queue afterwards, so flush explicitly; otherwise the
    // kernel may never start and the completion callback may never fire.
    err = clFlush(commandQueue);
    if (err != CL_SUCCESS)
    {
        // Roll back exactly like a callback-registration failure. With
        // isRunning false the event callback ignores any late notification.
        isRunning = false;
        std::cerr << __func__ << ": failed to flush command queue: " << err
            << std::endl;
        clReleaseEvent(currentKernelEvent);
        currentKernelEvent = nullptr;
        return false;
    }

    // Non-blocking: the kernel runs asynchronously and the stored callback is
    // delivered once it completes.
    return true;
}
// Enqueues the collate kernel and returns without waiting for it.
//
// Parameters:
//   assemblyBuff  - must expose the same iovec (base pointer and length) that
//                   setup() cached for the assembly buffer; also supplies the
//                   slot stride and first slot offset passed to the kernel.
//   collationBuff - must expose the same iovec that setup() cached for the
//                   collation buffer.
//   callback      - stored in collateKernelCb. collateKernelEventCallback()
//                   posts it to the parent's device's component thread once
//                   the kernel's event reaches CL_COMPLETE.
//
// The kernel is launched with one work item per slot
// (frameAssemblyDesc->numSlots).
//
// Returns true when the kernel has been enqueued, its completion callback
// registered and the queue flushed. Returns false (after logging) when the
// engine is not set up, is already running, or any OpenCL call fails.
//
// Throws std::runtime_error when either buffer no longer matches what setup()
// saw.
bool OpenClCollatingAndMeshingEngine::startCollateKernel(
    StagingBuffer& assemblyBuff, StagingBuffer& collationBuff,
    collateKernelCbFn callback)
{
    if (!isSetup)
    {
        std::cerr << __func__ << ": engine not set up" << std::endl;
        return false;
    }
    if (isRunning)
    {
        std::cerr << __func__ << ": already running, call stop() first"
            << std::endl;
        return false;
    }

    // The cl_mem objects wrap the host memory captured in setup(); refuse to
    // run if either of the caller's buffers has moved or been resized.
    struct iovec assemblyIov = assemblyBuff.getClEngineIovec();
    struct iovec collationIov = collationBuff.getClEngineIovec();
    if (assemblyIov.iov_base != assemblyBufferPtr
        || assemblyIov.iov_len != assemblyBufferSize
        || collationIov.iov_base != collationBufferPtr
        || collationIov.iov_len != collationBufferSize)
    {
        throw std::runtime_error(
            std::string(__func__) + ": buffer mismatch - buffers have changed");
    }

    // Store the caller's callback
    collateKernelCb = callback;

    // Set up kernel arguments for the collate kernel
    if (!setupCollateDgramsArgs(assemblyBuff)) {
        return false;
    }

    // One work item per slot.
    uint32_t nDgramsPerFrame = static_cast<uint32_t>(
        frameAssemblyDesc->numSlots);
    size_t globalWorkSize = nDgramsPerFrame;
    cl_int err = clEnqueueNDRangeKernel(
        commandQueue, collateKernel, 1, nullptr, &globalWorkSize, nullptr,
        0, nullptr, &currentKernelEvent);
    if (err != CL_SUCCESS)
    {
        std::cerr << __func__ << ": failed to enqueue collateDgrams kernel: "
            << err << std::endl;
        return false;
    }

    // Mark the engine running BEFORE registering the event callback.
    // collateKernelEventCallback() ignores notifications while isRunning is
    // false, and the event may already be complete by the time the callback
    // is registered, in which case the callback can run right away. Setting
    // the flag afterwards would silently lose that completion.
    isRunning = true;
    err = clSetEventCallback(
        currentKernelEvent, CL_COMPLETE, collateKernelEventCallback, this);
    if (err != CL_SUCCESS)
    {
        isRunning = false;
        std::cerr << __func__ << ": failed to set event callback: " << err
            << std::endl;
        clReleaseEvent(currentKernelEvent);
        currentKernelEvent = nullptr;
        return false;
    }

    // Enqueuing alone does not guarantee submission to the device. Nothing
    // blocks on this queue afterwards, so flush explicitly; otherwise the
    // kernel may never start and the completion callback may never fire.
    err = clFlush(commandQueue);
    if (err != CL_SUCCESS)
    {
        // Roll back exactly like a callback-registration failure. With
        // isRunning false the event callback ignores any late notification.
        isRunning = false;
        std::cerr << __func__ << ": failed to flush command queue: " << err
            << std::endl;
        clReleaseEvent(currentKernelEvent);
        currentKernelEvent = nullptr;
        return false;
    }

    // Non-blocking: the kernel runs asynchronously and the stored callback is
    // delivered once it completes.
    return true;
}
bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernel(
const char* kernelSource, size_t kernelSourceLen,
const char* kernelName, cl_program& program, cl_kernel& kernel)
{
cl_int err;
// Create program from source
program = clCreateProgramWithSource(
context, 1, &kernelSource, &kernelSourceLen, &err);
if (err != CL_SUCCESS || !program)
{
std::cerr << __func__ << ": failed to create " << kernelName
<< " program: " << err << std::endl;
return false;
}
// Build program
err = clBuildProgram(program, 1, &device, nullptr, nullptr, nullptr);
if (err != CL_SUCCESS)
{
std::cerr << __func__ << ": failed to build " << kernelName
<< " program: " << err << std::endl;
// Print build log if available
size_t logSize = 0;
clGetProgramBuildInfo(program, device, CL_PROGRAM_BUILD_LOG,
0, nullptr, &logSize);
if (logSize > 0)
{
std::vector<char> log(logSize);
clGetProgramBuildInfo(program, device, CL_PROGRAM_BUILD_LOG,
logSize, log.data(), nullptr);
std::cerr << kernelName << " build log: " << log.data()
<< std::endl;
}
return false;
}
// Create kernel
kernel = clCreateKernel(program, kernelName, &err);
if (err != CL_SUCCESS || !kernel)
{
std::cerr << __func__ << ": failed to create " << kernelName
<< " kernel: " << err << std::endl;
return false;
}
return true;
}
// Builds both kernels used by the engine: "slotCompactor" first, then
// "collate". Stops at the first failure (the second kernel is not attempted
// if the first one fails) and returns whether both succeeded.
bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernels()
{
    const bool compactorReady = compileAndPrepareKernel(
        slotCompactorKernelStart, slotCompactorKernelNBytes,
        "slotCompactor", slotCompactorProgram, slotCompactorKernel);

    return compactorReady
        && compileAndPrepareKernel(
            collateKernelStart, collateKernelNBytes,
            "collate", collateProgram, collateKernel);
}
// Binds the six arguments of the slotCompactor kernel:
//   0: the assembly cl_mem buffer
//   1: number of slots            (frameAssemblyDesc->numSlots)
//   2: slot stride in bytes       (assemblyBuff.slotStrideNBytes)
//   3: slot size in bytes         (frameAssemblyDesc->slotSizeBytes)
//   4: offset of the first slot   (assemblyBuff.firstSlotOffsetNBytes)
//   5: nSucceeded, as supplied by the caller
// All scalar values are narrowed to uint32_t before being passed.
//
// Returns true when every argument was accepted. Stops at the first rejected
// argument, logs its index and the OpenCL error code, and returns false.
bool OpenClCollatingAndMeshingEngine::setupSlotCompactorsArgs(
    StagingBuffer& assemblyBuff, uint32_t nSucceeded)
{
    const uint32_t slotCount =
        static_cast<uint32_t>(frameAssemblyDesc->numSlots);
    const uint32_t strideBytes =
        static_cast<uint32_t>(assemblyBuff.slotStrideNBytes);
    const uint32_t slotBytes =
        static_cast<uint32_t>(frameAssemblyDesc->slotSizeBytes);
    const uint32_t firstSlotAt =
        static_cast<uint32_t>(assemblyBuff.firstSlotOffsetNBytes);
    const uint32_t succeededCount = static_cast<uint32_t>(nSucceeded);

    // __func__ inside the lambda would name the lambda, not this method, so
    // capture the method name here to keep the log text unchanged.
    const char* const fnName = __func__;
    const auto bindArg =
        [this, fnName](cl_uint index, size_t nBytes, const void* value) -> bool
    {
        const cl_int rc =
            clSetKernelArg(slotCompactorKernel, index, nBytes, value);
        if (rc == CL_SUCCESS) {
            return true;
        }
        std::cerr << fnName << ": failed to set kernel arg " << index << ": "
            << rc << std::endl;
        return false;
    };

    // Short-circuit evaluation keeps the original "stop at first failure".
    return bindArg(0, sizeof(cl_mem), &clAssemblyBuffer)
        && bindArg(1, sizeof(uint32_t), &slotCount)
        && bindArg(2, sizeof(uint32_t), &strideBytes)
        && bindArg(3, sizeof(uint32_t), &slotBytes)
        && bindArg(4, sizeof(uint32_t), &firstSlotAt)
        && bindArg(5, sizeof(uint32_t), &succeededCount);
}
// Binds the six arguments of the collate kernel:
//   0: the assembly cl_mem buffer
//   1: the collation cl_mem buffer
//   2: slot stride in bytes       (assemblyBuff.slotStrideNBytes)
//   3: offset of the first slot   (assemblyBuff.firstSlotOffsetNBytes)
//   4: points per slot, computed by
//      IoUringAssemblyEngine::computePointsPerDgram() from the parent
//      device's current return mode
//   5: datagrams per frame        (frameAssemblyDesc->numSlots)
// All scalar values are narrowed to uint32_t before being passed.
//
// Returns false (after logging) when the parent has no device, or when an
// argument is rejected; binding stops at the first rejected argument.
// Returns true when every argument was accepted.
bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs(
    StagingBuffer& assemblyBuff)
{
    const uint32_t strideBytes =
        static_cast<uint32_t>(assemblyBuff.slotStrideNBytes);
    const uint32_t firstSlotAt =
        static_cast<uint32_t>(assemblyBuff.firstSlotOffsetNBytes);

    // The points-per-slot value depends on the device's return mode, so a
    // device is required.
    if (!parent.device)
    {
        std::cerr << __func__ << ": device not available" << std::endl;
        return false;
    }
    int mode = static_cast<int>(parent.device->currentReturnMode);
    const uint32_t pointsPerSlot = static_cast<uint32_t>(
        IoUringAssemblyEngine::computePointsPerDgram(mode));
    const uint32_t dgramsPerFrame =
        static_cast<uint32_t>(frameAssemblyDesc->numSlots);

    // __func__ inside the lambda would name the lambda, not this method, so
    // capture the method name here to keep the log text unchanged.
    const char* const fnName = __func__;
    const auto bindArg =
        [this, fnName](cl_uint index, size_t nBytes, const void* value) -> bool
    {
        const cl_int rc = clSetKernelArg(collateKernel, index, nBytes, value);
        if (rc == CL_SUCCESS) {
            return true;
        }
        std::cerr << fnName << ": failed to set kernel arg " << index << ": "
            << rc << std::endl;
        return false;
    };

    // Short-circuit evaluation keeps the original "stop at first failure".
    return bindArg(0, sizeof(cl_mem), &clAssemblyBuffer)
        && bindArg(1, sizeof(cl_mem), &clCollationBuffer)
        && bindArg(2, sizeof(uint32_t), &strideBytes)
        && bindArg(3, sizeof(uint32_t), &firstSlotAt)
        && bindArg(4, sizeof(uint32_t), &pointsPerSlot)
        && bindArg(5, sizeof(uint32_t), &dgramsPerFrame);
}
// Brings a running kernel launch to rest. Does nothing when the engine is not
// marked running.
//
// The enqueued kernel is not cancelled; instead this call blocks in
// clWaitForEvents() until the outstanding event resolves, then releases that
// event and clears the running flag.
void OpenClCollatingAndMeshingEngine::stop()
{
    if (isRunning)
    {
        if (currentKernelEvent != nullptr)
        {
            // Wait for the kernel to finish, then drop our event reference.
            clWaitForEvents(1, &currentKernelEvent);
            clReleaseEvent(currentKernelEvent);
            currentKernelEvent = nullptr;
        }
        isRunning = false;
    }
}
void OpenClCollatingAndMeshingEngine::stopCompactKernel()
{
stop();
compactKernelCb = [](){};
}
void OpenClCollatingAndMeshingEngine::stopCollateKernel()
{
stop();
collateKernelCb = [](){};
}
} // namespace stim_buff
} // namespace smo