Files
salmanoff/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp
T

1022 lines
27 KiB
C++

#include <boostAsioLinkageFix.h>
#include <stdexcept>
#include <iostream>
#include <cstring>
#include <vector>
#include <string>
#include <string_view>
#include <boost/system/error_code.hpp>
#include <asynchronousContinuation.h>
#include <callback.h>
#include <asynchronousLoop.h>
#include <componentThread.h>
#include <user/stimulusFrame.h>
#include "livoxGen1.h"
#include "openClCollatingAndMeshingEngine.h"
#include "pcloudStimulusBuffer.h"
#include "openClKernels.h"
#include "frameAssemblyDesc.h"
#include "ioUringAssemblyEngine.h"
namespace smo {
namespace stim_buff {
/**
 * @brief Extracts the numeric version from an OpenCL version string.
 *
 * Expected format: "OpenCL <major>.<minor> <vendor info>". Everything up to
 * and including the first space is skipped; the text before the next '.' is
 * read as the major number and the text after it as the minor number
 * (std::stoi stops at the first non-digit, so trailing vendor info is
 * ignored).
 *
 * @param versionStr The OpenCL version string to parse.
 * @return A pair of (major, minor) version numbers, or (-1, -1) when the
 *         string has no space, no '.', or a number cannot be converted.
 */
static std::pair<int, int> parseOpenClVersion(const std::string& versionStr)
{
    const std::pair<int, int> parseFailure{-1, -1};

    const std::string::size_type firstSpace = versionStr.find(' ');
    if (firstSpace == std::string::npos) { return parseFailure; }

    const std::string afterPrefix = versionStr.substr(firstSpace + 1);
    const std::string::size_type firstDot = afterPrefix.find('.');
    if (firstDot == std::string::npos) { return parseFailure; }

    try
    {
        const int majorVer = std::stoi(afterPrefix.substr(0, firstDot));
        const int minorVer = std::stoi(afterPrefix.substr(firstDot + 1));
        return {majorVer, minorVer};
    }
    catch (const std::exception&)
    {
        // std::stoi throws on non-numeric or out-of-range input.
        return parseFailure;
    }
}
/**
 * @brief Parses an OpenCL version string and checks it against a minimum.
 *
 * Logs the outcome: parse failures and too-old versions go to std::cerr, an
 * accepted version is echoed to std::cout.
 *
 * @param versionStr The OpenCL version string to validate.
 * @param versionType Label used in log messages (e.g. "platform", "device").
 * @param minMajor Minimum major version required.
 * @param minMinor Minimum minor version required when the major version
 *                 equals minMajor.
 * @return true if the version parses and is at least minMajor.minMinor,
 *         false otherwise.
 */
static bool validateOpenClVersion(
    std::string_view versionStr, std::string_view versionType,
    int minMajor, int minMinor)
{
    const std::pair<int, int> parsed =
        parseOpenClVersion(std::string(versionStr));
    const int foundMajor = parsed.first;
    const int foundMinor = parsed.second;

    // (-1, -1) is the parser's failure sentinel.
    const bool unparseable = (foundMajor == -1) && (foundMinor == -1);
    if (unparseable)
    {
        std::cerr << __func__ << ": failed to parse OpenCL " << versionType
            << " version: " << versionStr << std::endl;
        return false;
    }

    const bool olderMajor = foundMajor < minMajor;
    const bool sameMajorOlderMinor =
        (foundMajor == minMajor) && (foundMinor < minMinor);
    if (olderMajor || sameMajorOlderMinor)
    {
        std::cerr << __func__ << ": OpenCL " << versionType << " version "
            << foundMajor << "." << foundMinor << " found, but " << minMajor << "."
            << minMinor << " or higher is required" << std::endl;
        return false;
    }

    std::cout << __func__ << ": OpenCL " << versionType << " version: "
        << versionStr << std::endl;
    return true;
}
/**
 * @brief Constructs an engine bound to its owning stimulus buffer.
 *
 * No OpenCL work happens here: every handle and pointer starts out null,
 * every size zero and every state flag false. setup() performs the actual
 * initialisation.
 *
 * @param parent_ The PcloudStimulusBuffer this engine belongs to; kept by
 *                reference.
 */
OpenClCollatingAndMeshingEngine::OpenClCollatingAndMeshingEngine(
    PcloudStimulusBuffer& parent_)
    : parent(parent_),
      // OpenCL handles
      platform(nullptr),
      device(nullptr),
      context(nullptr),
      commandQueue(nullptr),
      slotCompactorProgram(nullptr),
      collateProgram(nullptr),
      slotCompactorKernel(nullptr),
      collateKernel(nullptr),
      clAssemblyBuffer(nullptr),
      clCollationBuffer(nullptr),
      // Per-kernel state flags
      compactIsSetup(false),
      compactIsRunning(false),
      collateIsSetup(false),
      collateIsRunning(false),
      currentCompactKernelEvent(nullptr),
      currentCollateKernelEvent(nullptr),
      // Host-side views of the staging buffers
      assemblyBufferPtr(nullptr),
      assemblyBufferSize(0),
      collationBufferPtr(nullptr),
      collationBufferSize(0),
      mappedAssemblyBuffer(nullptr),
      mappedCollationBuffer(nullptr),
      frameAssemblyDesc(nullptr)
{
}
/**
 * @brief Tears the engine down by delegating to finalize(), which stops the
 *        kernels and releases every OpenCL object still held.
 */
OpenClCollatingAndMeshingEngine::~OpenClCollatingAndMeshingEngine()
{
    this->finalize();
}
bool OpenClCollatingAndMeshingEngine::setup()
{
if (compactIsSetup && collateIsSetup) {
return true;
}
cl_int err;
cl_command_queue_properties queueProps = 0;
// Get platform
cl_uint numPlatforms;
err = clGetPlatformIDs(1, &platform, &numPlatforms);
if (err != CL_SUCCESS || numPlatforms == 0)
{
std::cerr << __func__ << ": failed to get OpenCL platform: "
<< err << std::endl;
return false;
}
// Get device
err = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, 1, &device, nullptr);
if (err != CL_SUCCESS)
{
std::cerr << __func__ << ": failed to get GPU device: "
<< err << std::endl;
return false;
}
// Check OpenCL version - require 1.2 or higher
char platformVersion[128];
err = clGetPlatformInfo(platform, CL_PLATFORM_VERSION,
sizeof(platformVersion), platformVersion, nullptr);
if (err == CL_SUCCESS)
{
if (!validateOpenClVersion(platformVersion, "platform", 1, 2)) {
return false;
}
}
// Also check device version
char deviceVersion[128];
err = clGetDeviceInfo(device, CL_DEVICE_VERSION,
sizeof(deviceVersion), deviceVersion, nullptr);
if (err == CL_SUCCESS)
{
if (!validateOpenClVersion(deviceVersion, "device", 1, 2)) {
return false;
}
}
// Create context
context = clCreateContext(nullptr, 1, &device, nullptr, nullptr, &err);
if (err != CL_SUCCESS || !context)
{
std::cerr << __func__ << ": failed to create OpenCL context: "
<< err << std::endl;
goto cleanup;
}
// Create command queue (OpenCL 1.2 API)
commandQueue = clCreateCommandQueue(
context, device, queueProps, &err);
if (err != CL_SUCCESS || !commandQueue)
{
std::cerr << __func__ << ": failed to create command queue: "
<< err << std::endl;
goto cleanup;
}
// Declare variables early to avoid goto crossing initialization
struct iovec assemblyIov;
struct iovec collationIov;
// Get StagingBuffer memory pointers from parent
assemblyIov = parent.assemblyBuffer.getClEngineIovec();
collationIov = parent.collationBuffer.getClEngineIovec();
assemblyBufferPtr = assemblyIov.iov_base;
assemblyBufferSize = assemblyIov.iov_len;
collationBufferPtr = collationIov.iov_base;
collationBufferSize = collationIov.iov_len;
// Get FrameAssemblyDesc from assembly buffer
frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
parent.assemblyBuffer);
if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty())
{
std::cerr << __func__ << ": invalid frame descriptor" << std::endl;
goto cleanup;
}
// Create OpenCL buffers using CL_MEM_USE_HOST_PTR for zero-copy
clAssemblyBuffer = clCreateBuffer(
context,
CL_MEM_USE_HOST_PTR | CL_MEM_READ_WRITE,
assemblyBufferSize, assemblyBufferPtr,
&err);
if (err != CL_SUCCESS || !clAssemblyBuffer)
{
std::cerr << __func__ << ": failed to create assembly buffer: "
<< err << std::endl;
goto cleanup;
}
clCollationBuffer = clCreateBuffer(
context,
CL_MEM_USE_HOST_PTR | CL_MEM_WRITE_ONLY,
collationBufferSize, collationBufferPtr,
&err);
if (err != CL_SUCCESS || !clCollationBuffer)
{
std::cerr << __func__ << ": failed to create collation buffer: "
<< err << std::endl;
goto cleanup;
}
// Compile and prepare both kernels
if (!compileAndPrepareKernels()) {
goto cleanup;
}
compactIsSetup = true;
collateIsSetup = true;
return true;
cleanup:
finalize();
return false;
}
/**
 * @brief Stops both kernels and releases every OpenCL object the engine
 *        holds, returning it to its freshly-constructed state.
 *
 * Release order: buffers, kernels, programs, command queue, context. Handles
 * that are already null are skipped, so the call is harmless on an engine
 * that was never (or only partially) set up.
 */
void OpenClCollatingAndMeshingEngine::finalize()
{
    // Let any in-flight kernel finish and drop the stored callbacks first.
    stop();

    // Releases a handle if it is set, then clears it.
    const auto releaseHandle = [](auto& handle, auto releaseFn) {
        if (handle)
        {
            releaseFn(handle);
            handle = nullptr;
        }
    };

    // Buffers, in reverse order of creation.
    releaseHandle(clCollationBuffer, clReleaseMemObject);
    releaseHandle(clAssemblyBuffer, clReleaseMemObject);

    // Kernels.
    releaseHandle(slotCompactorKernel, clReleaseKernel);
    releaseHandle(collateKernel, clReleaseKernel);

    // Programs.
    releaseHandle(slotCompactorProgram, clReleaseProgram);
    releaseHandle(collateProgram, clReleaseProgram);

    // Queue, then the context it belongs to.
    releaseHandle(commandQueue, clReleaseCommandQueue);
    releaseHandle(context, clReleaseContext);

    // Platform and device IDs are not reference-counted here; just forget them.
    device = nullptr;
    platform = nullptr;

    // Per-kernel state.
    compactIsSetup = false;
    compactIsRunning = false;
    collateIsSetup = false;
    collateIsRunning = false;
    currentCompactKernelEvent = nullptr;
    currentCollateKernelEvent = nullptr;

    // Host-side buffer bookkeeping.
    assemblyBufferPtr = nullptr;
    assemblyBufferSize = 0;
    collationBufferPtr = nullptr;
    collationBufferSize = 0;
    frameAssemblyDesc = nullptr;
}
/**
 * @brief Static OpenCL event callback for the compact kernel.
 *
 * Forwards the execution status to the engine's stored compactKernelCb by
 * posting it to the io_service of the parent device's component thread, so
 * the callback runs there instead of on the thread that invoked this
 * function. Does nothing if the engine pointer, the stored callback, the
 * parent device or its component thread is missing.
 *
 * @param event_command_exec_status Status reported for the kernel command.
 * @param user_data The OpenClCollatingAndMeshingEngine instance.
 */
void CL_CALLBACK OpenClCollatingAndMeshingEngine::compactKernelEventCallback(
    cl_event /*event*/, cl_int event_command_exec_status, void* user_data)
{
    auto* const self = static_cast<OpenClCollatingAndMeshingEngine*>(user_data);
    if (!self) { return; }
    if (!self->compactKernelCb) { return; }

    // Without a device component thread there is nowhere to post to.
    if (!self->parent.device) { return; }
    if (!self->parent.device->componentThread) { return; }

    self->parent.device->componentThread->getIoService().post(
        std::bind(self->compactKernelCb, event_command_exec_status));
}
/**
 * @brief Static OpenCL event callback for the collate kernel.
 *
 * Forwards the execution status to the engine's stored collateKernelCb by
 * posting it to the io_service of the parent device's component thread, so
 * the callback runs there instead of on the thread that invoked this
 * function. Does nothing if the engine pointer, the stored callback, the
 * parent device or its component thread is missing.
 *
 * @param event_command_exec_status Status reported for the kernel command.
 * @param user_data The OpenClCollatingAndMeshingEngine instance.
 */
void CL_CALLBACK OpenClCollatingAndMeshingEngine::collateKernelEventCallback(
    cl_event /*event*/, cl_int event_command_exec_status, void* user_data)
{
    auto* const self = static_cast<OpenClCollatingAndMeshingEngine*>(user_data);
    if (!self) { return; }
    if (!self->collateKernelCb) { return; }

    // Without a device component thread there is nowhere to post to.
    if (!self->parent.device) { return; }
    if (!self->parent.device->componentThread) { return; }

    self->parent.device->componentThread->getIoService().post(
        std::bind(self->collateKernelCb, event_command_exec_status));
}
/**
 * @brief Launches the slotCompactor kernel over the assembly buffer.
 *
 * Stores the completion callback, publishes the host's view of the assembly
 * buffer to OpenCL with a map/unmap pair, then hands the launch to
 * startKernel() with a single work item.
 *
 * @param assemblyBuff Staging buffer expected to be the one captured by
 *                     setup(); a different base pointer or length makes the
 *                     validation callable throw std::runtime_error.
 * @param nSucceeded Count forwarded to the kernel as its last argument.
 * @param callback Stored in compactKernelCb; compactKernelEventCallback later
 *                 posts it with the kernel's execution status.
 * @return true if the kernel was started; false if mapping the assembly
 *         buffer or startKernel() failed.
 */
bool OpenClCollatingAndMeshingEngine::startCompactKernel(
    StagingBuffer& assemblyBuff, uint32_t nSucceeded,
    compactKernelCbFn callback)
{
    // Store the caller's callback
    compactKernelCb = std::move(callback);

    // Inside a lambda __func__ names the closure's operator(), so take the
    // enclosing function's name here for use in the error message below.
    const char* const functionName = __func__;

    // Validate buffers callable
    auto validateBuffers = [this, &assemblyBuff, functionName]() {
        struct iovec assemblyIov = assemblyBuff.getClEngineIovec();
        if (assemblyIov.iov_base != assemblyBufferPtr
            || assemblyIov.iov_len != assemblyBufferSize)
        {
            throw std::runtime_error(
                std::string(functionName) + ": buffer mismatch - buffers have "
                "changed");
        }
    };

    // Setup args callable
    auto setupArgs = [this, &assemblyBuff, nSucceeded]() {
        return setupSlotCompactorsArgs(assemblyBuff, nSucceeded);
    };

    /** EXPLANATION:
     * Map assembly buffer as WRITE_INVALIDATE_REGION to inform OpenCL that
     * the host (io_uring) has written data, and that the host doesn't care
     * what the prior contents of the device's cache are. The device must
     * invalidate its own view of the HOST_PTR and accept our view.
     *
     * Then immediately unmap to let OpenCL make the changes visible to the
     * GPU.
     */
    if (!mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION))
    {
        std::cerr << __func__ << ": failed to map assembly buffer" << std::endl;
        return false;
    }
    // Unmap immediately to sync host writes to GPU
    unmapAssemblyBuffer();

    return startKernel(
        slotCompactorKernel,
        &currentCompactKernelEvent,
        setupArgs,
        validateBuffers,
        1, // globalWorkSize
        compactKernelEventCallback,
        "slotCompactor",
        compactIsSetup,
        compactIsRunning);
}
/**
 * @brief Launches the collate kernel with one work item per frame slot.
 *
 * @param assemblyBuff Staging buffer expected to be the assembly buffer
 *                     captured by setup().
 * @param collationBuff Staging buffer expected to be the collation buffer
 *                      captured by setup(). A different base pointer or
 *                      length on either buffer makes the validation callable
 *                      throw std::runtime_error.
 * @param callback Stored in collateKernelCb; collateKernelEventCallback later
 *                 posts it with the kernel's execution status.
 * @return true if the kernel was started; false if the engine has no frame
 *         descriptor (not set up) or startKernel() failed.
 */
bool OpenClCollatingAndMeshingEngine::startCollateKernel(
    StagingBuffer& assemblyBuff, StagingBuffer& collationBuff,
    collateKernelCbFn callback)
{
    // Store the caller's callback
    collateKernelCb = std::move(callback);

    // The work size below is read from the frame descriptor, which only
    // exists after a successful setup(); bail out instead of dereferencing
    // a null pointer.
    if (!frameAssemblyDesc)
    {
        std::cerr << __func__ << ": engine not set up (no frame descriptor)"
            << std::endl;
        return false;
    }

    /** EXPLANATION:
     * It shouldn't be necessary to map the assembly/collation buffers here
     * since we don't need to read/write them on the host CPUs (unless we're
     * intervening to debug; in which case we should map them as CL_MAP_READ).
     *
     * Otherwise, the foreign GPU's view of the data in the assembly buffer
     * is currently up to date; and the collation buffer's state is
     * undefined... and also irrelevant since it's only going to be used for
     * output anyway.
     *
     * NOTE(review): the map/unmap pairs below are performed anyway and their
     * results are ignored (mapBuffer logs its own failures) - confirm whether
     * they are still wanted.
     */
    mapAssemblyBuffer(CL_MAP_WRITE_INVALIDATE_REGION);
    unmapAssemblyBuffer();
    mapCollationBuffer(CL_MAP_WRITE);
    unmapCollationBuffer();

    // Inside a lambda __func__ names the closure's operator(), so take the
    // enclosing function's name here for use in the error message below.
    const char* const functionName = __func__;

    // Validate buffers callable
    auto validateBuffers =
        [this, &assemblyBuff, &collationBuff, functionName]() {
        struct iovec assemblyIov = assemblyBuff.getClEngineIovec();
        struct iovec collationIov = collationBuff.getClEngineIovec();
        if (assemblyIov.iov_base != assemblyBufferPtr
            || assemblyIov.iov_len != assemblyBufferSize
            || collationIov.iov_base != collationBufferPtr
            || collationIov.iov_len != collationBufferSize)
        {
            throw std::runtime_error(
                std::string(functionName)
                + ": buffer mismatch - buffers have changed");
        }
    };

    // Setup args callable
    auto setupArgs = [this, &assemblyBuff]() {
        return setupCollateDgramsArgs(assemblyBuff);
    };

    // Calculate global work size (just num slots in the frame)
    size_t globalWorkSize = static_cast<uint32_t>(frameAssemblyDesc->numSlots);

    return startKernel(
        collateKernel,
        &currentCollateKernelEvent,
        setupArgs,
        validateBuffers,
        globalWorkSize,
        collateKernelEventCallback,
        "collateDgrams",
        collateIsSetup,
        collateIsRunning);
}
bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernel(
const char* kernelSource, size_t kernelSourceLen,
const char* kernelName, cl_program& program, cl_kernel& kernel)
{
cl_int err;
// Create program from source
program = clCreateProgramWithSource(
context, 1, &kernelSource, &kernelSourceLen, &err);
if (err != CL_SUCCESS || !program)
{
std::cerr << __func__ << ": failed to create " << kernelName
<< " program: " << err << std::endl;
return false;
}
// Build program
err = clBuildProgram(program, 1, &device, nullptr, nullptr, nullptr);
if (err != CL_SUCCESS)
{
std::cerr << __func__ << ": failed to build " << kernelName
<< " program: " << err << std::endl;
// Print build log if available
size_t logSize = 0;
clGetProgramBuildInfo(program, device, CL_PROGRAM_BUILD_LOG,
0, nullptr, &logSize);
if (logSize > 0)
{
std::vector<char> log(logSize);
clGetProgramBuildInfo(program, device, CL_PROGRAM_BUILD_LOG,
logSize, log.data(), nullptr);
std::cerr << kernelName << " build log: " << log.data()
<< std::endl;
}
return false;
}
// Create kernel
kernel = clCreateKernel(program, kernelName, &err);
if (err != CL_SUCCESS || !kernel)
{
std::cerr << __func__ << ": failed to create " << kernelName
<< " kernel: " << err << std::endl;
return false;
}
return true;
}
/**
 * @brief Compiles the slotCompactor program and then the collate program,
 *        creating one kernel from each.
 *
 * The collate program is not attempted when the slotCompactor one fails.
 *
 * @return true only if both kernels were prepared.
 */
bool OpenClCollatingAndMeshingEngine::compileAndPrepareKernels()
{
    const bool compactorReady = compileAndPrepareKernel(
        slotCompactorKernelStart, slotCompactorKernelNBytes,
        "slotCompactor", slotCompactorProgram, slotCompactorKernel);
    if (!compactorReady) { return false; }

    const bool collateReady = compileAndPrepareKernel(
        collateKernelStart, collateKernelNBytes,
        "collate", collateProgram, collateKernel);
    return collateReady;
}
/**
 * @brief Binds the six arguments of the slotCompactor kernel.
 *
 * Argument order: assembly buffer, number of slots, slot stride, slot size,
 * first slot offset, nSucceeded. All scalar arguments are passed as 32-bit
 * unsigned values.
 *
 * @param assemblyBuff Supplies the slot stride and first-slot offset.
 * @param nSucceeded Forwarded to the kernel as argument 5.
 * @return true if every argument was set; false (after logging the failing
 *         argument index and error code) on the first failure.
 */
bool OpenClCollatingAndMeshingEngine::setupSlotCompactorsArgs(
    StagingBuffer& assemblyBuff, uint32_t nSucceeded)
{
    // Extract parameters for slotCompactor kernel
    const uint32_t numSlots =
        static_cast<uint32_t>(frameAssemblyDesc->numSlots);
    const uint32_t slotStride =
        static_cast<uint32_t>(assemblyBuff.slotStrideNBytes);
    const uint32_t slotSize =
        static_cast<uint32_t>(frameAssemblyDesc->slotSizeBytes);
    const uint32_t firstSlotOffset =
        static_cast<uint32_t>(assemblyBuff.firstSlotOffsetNBytes);

    // One entry per kernel argument; the array position is the arg index.
    struct KernelArg
    {
        size_t size;
        const void* value;
    };
    const KernelArg kernelArgs[] = {
        {sizeof(cl_mem), &clAssemblyBuffer},
        {sizeof(uint32_t), &numSlots},
        {sizeof(uint32_t), &slotStride},
        {sizeof(uint32_t), &slotSize},
        {sizeof(uint32_t), &firstSlotOffset},
        {sizeof(uint32_t), &nSucceeded},
    };

    cl_uint argIndex = 0;
    for (const KernelArg& arg : kernelArgs)
    {
        const cl_int err = clSetKernelArg(
            slotCompactorKernel, argIndex, arg.size, arg.value);
        if (err != CL_SUCCESS)
        {
            std::cerr << __func__ << ": failed to set kernel arg "
                << argIndex << ": " << err << std::endl;
            return false;
        }
        ++argIndex;
    }
    return true;
}
/**
 * @brief Binds the six arguments of the collate kernel.
 *
 * Argument order: assembly buffer, collation buffer, slot stride, first slot
 * offset, points per slot, datagrams per frame. Points per slot is derived
 * from the parent device's current return mode; datagrams per frame is the
 * frame descriptor's slot count.
 *
 * @param assemblyBuff Supplies the slot stride and first-slot offset.
 * @return true if every argument was set; false if the parent device is
 *         missing or (after logging the failing argument index and error
 *         code) on the first argument that cannot be set.
 */
bool OpenClCollatingAndMeshingEngine::setupCollateDgramsArgs(
    StagingBuffer& assemblyBuff)
{
    // Extract parameters for collateDgrams kernel
    const uint32_t slotStride =
        static_cast<uint32_t>(assemblyBuff.slotStrideNBytes);
    const uint32_t firstSlotOffset =
        static_cast<uint32_t>(assemblyBuff.firstSlotOffsetNBytes);

    // Calculate nPointsPerSlot from device return mode
    if (!parent.device)
    {
        std::cerr << __func__ << ": device not available" << std::endl;
        return false;
    }
    const int returnMode = static_cast<int>(parent.device->currentReturnMode);
    const uint32_t nPointsPerSlot = static_cast<uint32_t>(
        IoUringAssemblyEngine::computePointsPerDgram(returnMode));
    const uint32_t nDgramsPerFrame =
        static_cast<uint32_t>(frameAssemblyDesc->numSlots);

    // One entry per kernel argument; the array position is the arg index.
    struct KernelArg
    {
        size_t size;
        const void* value;
    };
    const KernelArg kernelArgs[] = {
        {sizeof(cl_mem), &clAssemblyBuffer},
        {sizeof(cl_mem), &clCollationBuffer},
        {sizeof(uint32_t), &slotStride},
        {sizeof(uint32_t), &firstSlotOffset},
        {sizeof(uint32_t), &nPointsPerSlot},
        {sizeof(uint32_t), &nDgramsPerFrame},
    };

    cl_uint argIndex = 0;
    for (const KernelArg& arg : kernelArgs)
    {
        const cl_int err = clSetKernelArg(
            collateKernel, argIndex, arg.size, arg.value);
        if (err != CL_SUCCESS)
        {
            std::cerr << __func__ << ": failed to set kernel arg "
                << argIndex << ": " << err << std::endl;
            return false;
        }
        ++argIndex;
    }
    return true;
}
void OpenClCollatingAndMeshingEngine::stop()
{
stopCompactKernel();
stopCollateKernel();
}
/**
 * @brief Waits for the compact kernel to finish and drops its callback.
 *
 * When a command queue exists, the assembly buffer is mapped for reading and
 * unmapped again and the queue is flushed and drained. If a compact kernel
 * is marked running, its event is waited on and released. Finally the stored
 * callback is replaced with a no-op.
 *
 * Safe to call on an engine that was never set up (or already finalized):
 * the queue-dependent steps are skipped instead of issuing OpenCL calls on a
 * null queue and logging misleading "engine not set up" errors.
 */
void OpenClCollatingAndMeshingEngine::stopCompactKernel()
{
    if (commandQueue)
    {
        /** EXPLANATION:
         * Technically we should only need to do this if we plan to read the
         * compacted slots for debugging purposes. Otherwise this is
         * unnecessary.
         */
        mapAssemblyBuffer(CL_MAP_READ);
        unmapAssemblyBuffer();
        clFlush(commandQueue);
        clFinish(commandQueue);
    }

    // Stop only compact kernel
    if (compactIsRunning && currentCompactKernelEvent)
    {
        clWaitForEvents(1, &currentCompactKernelEvent);
        clReleaseEvent(currentCompactKernelEvent);
        currentCompactKernelEvent = nullptr;
        compactIsRunning = false;
    }

    // A late event callback now finds a harmless no-op instead of the
    // caller's continuation.
    compactKernelCb = [](cl_int){};
}
/**
 * @brief Waits for the collate kernel to finish and drops its callback.
 *
 * When a command queue exists, the collation buffer is mapped for reading and
 * unmapped again and the queue is flushed and drained. If a collate kernel
 * is marked running, its event is waited on and released. Finally the stored
 * callback is replaced with a no-op.
 *
 * Safe to call on an engine that was never set up (or already finalized):
 * the queue-dependent steps are skipped instead of issuing OpenCL calls on a
 * null queue and logging misleading "engine not set up" errors.
 */
void OpenClCollatingAndMeshingEngine::stopCollateKernel()
{
    if (commandQueue)
    {
        /** EXPLANATION:
         * Technically we should only need to do this if we plan to read the
         * collated dgrams for debugging purposes. Otherwise this is
         * unnecessary.
         */
        mapCollationBuffer(CL_MAP_READ);
        unmapCollationBuffer();
        clFlush(commandQueue);
        clFinish(commandQueue);
    }

    // Stop only collate kernel
    if (collateIsRunning && currentCollateKernelEvent)
    {
        clWaitForEvents(1, &currentCollateKernelEvent);
        clReleaseEvent(currentCollateKernelEvent);
        currentCollateKernelEvent = nullptr;
        collateIsRunning = false;
    }

    // A late event callback now finds a harmless no-op instead of the
    // caller's continuation.
    collateKernelCb = [](cl_int){};
}
/**
 * @brief Maps an OpenCL buffer into host address space with a blocking map.
 *
 * @param buffer The buffer object to map.
 * @param size Number of bytes to map, starting at offset 0.
 * @param mapFlags Map flags passed through to clEnqueueMapBuffer.
 * @param mappedPtr In/out: if already non-null the buffer is considered
 *                  mapped and nothing is done; otherwise receives the mapped
 *                  pointer, or stays nullptr on failure.
 * @return true if the buffer is mapped on return; false if the engine has no
 *         command queue, the buffer is null, or the map failed.
 */
bool OpenClCollatingAndMeshingEngine::mapBuffer(
    cl_mem buffer, size_t size, cl_map_flags mapFlags, void*& mappedPtr)
{
    if (!commandQueue || !buffer)
    {
        std::cerr << __func__ << ": engine not set up or invalid buffer"
            << std::endl;
        return false;
    }
    // If already mapped, return early with success.
    if (mappedPtr != nullptr) { return true; }

    // The map is blocking (CL_TRUE), so no completion event is needed.
    // Requesting one would return an event object that has to be released;
    // passing nullptr avoids leaking one per call.
    cl_int err = CL_SUCCESS;
    mappedPtr = clEnqueueMapBuffer(
        commandQueue, buffer, CL_TRUE, mapFlags,
        0, size, 0, nullptr, nullptr, &err);
    if (err != CL_SUCCESS || !mappedPtr)
    {
        std::cerr << __func__ << ": failed to map buffer: " << err
            << std::endl;
        mappedPtr = nullptr;
        return false;
    }
    return true;
}
/**
 * @brief Enqueues an unmap for a previously mapped OpenCL buffer.
 *
 * @param buffer The buffer object that was mapped.
 * @param mappedPtr In/out: the pointer returned by the map. nullptr means
 *                  "not mapped" and is treated as success; on a successful
 *                  unmap it is reset to nullptr.
 * @return true if the buffer was not mapped or the unmap was enqueued; false
 *         if the engine has no command queue, the buffer is null, or the
 *         enqueue failed (mappedPtr is then left unchanged).
 */
bool OpenClCollatingAndMeshingEngine::unmapBuffer(cl_mem buffer, void*& mappedPtr)
{
    if (mappedPtr == nullptr)
    {
        // Already unmapped
        return true;
    }
    if (!commandQueue || !buffer)
    {
        std::cerr << __func__ << ": engine not set up or invalid buffer" << std::endl;
        return false;
    }

    // Nobody waits on the unmap's completion event, so none is requested.
    // Requesting one would return an event object that has to be released;
    // passing nullptr avoids leaking one per call.
    const cl_int err = clEnqueueUnmapMemObject(
        commandQueue, buffer, mappedPtr,
        0, nullptr, nullptr);
    if (err != CL_SUCCESS)
    {
        std::cerr << __func__ << ": failed to unmap buffer: " << err
            << std::endl;
        return false;
    }
    mappedPtr = nullptr;
    return true;
}
/**
 * @brief Maps the whole assembly buffer via mapBuffer(), tracking the mapping
 *        in mappedAssemblyBuffer.
 *
 * @param mapFlags Map flags forwarded to mapBuffer().
 * @return Whatever mapBuffer() returns.
 */
bool OpenClCollatingAndMeshingEngine::mapAssemblyBuffer(cl_map_flags mapFlags)
{
    const bool mapped = mapBuffer(
        clAssemblyBuffer, assemblyBufferSize, mapFlags, mappedAssemblyBuffer);
    return mapped;
}
/**
 * @brief Unmaps the assembly buffer and clears the tracked mapping.
 *
 * mappedAssemblyBuffer is reset to nullptr even when the unmap fails, so a
 * later map attempt starts from a clean state.
 *
 * @return true if the buffer was not mapped or the unmap was enqueued; false
 *         if unmapBuffer() reported a failure.
 */
bool OpenClCollatingAndMeshingEngine::unmapAssemblyBuffer()
{
    // Report the real outcome instead of unconditionally claiming success.
    const bool unmapped = unmapBuffer(clAssemblyBuffer, mappedAssemblyBuffer);
    mappedAssemblyBuffer = nullptr;
    return unmapped;
}
/**
 * @brief Maps the whole collation buffer via mapBuffer(), tracking the
 *        mapping in mappedCollationBuffer.
 *
 * @param mapFlags Map flags forwarded to mapBuffer().
 * @return Whatever mapBuffer() returns.
 */
bool OpenClCollatingAndMeshingEngine::mapCollationBuffer(cl_map_flags mapFlags)
{
    const bool mapped = mapBuffer(
        clCollationBuffer, collationBufferSize, mapFlags,
        mappedCollationBuffer);
    return mapped;
}
/**
 * @brief Unmaps the collation buffer and clears the tracked mapping.
 *
 * mappedCollationBuffer is reset to nullptr even when the unmap fails, so a
 * later map attempt starts from a clean state.
 *
 * @return true if the buffer was not mapped or the unmap was enqueued; false
 *         if unmapBuffer() reported a failure.
 */
bool OpenClCollatingAndMeshingEngine::unmapCollationBuffer()
{
    // Report the real outcome instead of unconditionally claiming success.
    const bool unmapped = unmapBuffer(clCollationBuffer, mappedCollationBuffer);
    mappedCollationBuffer = nullptr;
    return unmapped;
}
/**
 * @brief Continuation object for one compact -> collate request.
 *
 * The numbered member functions form a chain:
 *   1. doCompact    - start the compact kernel
 *   2. compactDone  - compact kernel finished; continue to 3 on success
 *   3. doCollate    - start the collate kernel
 *   4. collateDone  - collate kernel finished; report the result
 * The entry point posts either step 1 or step 3. Each step receives the
 * shared_ptr that owns this object and binds it into the callback it hands
 * to the engine, which keeps the request alive while a kernel is pending.
 * Every failure path ends in callOriginalCallback(false).
 */
class OpenClCollatingAndMeshingEngine::CompactCollateAndMeshFrameReq
: public PostedAsynchronousContinuation<compactCollateAndMeshFrameReqCbFn>
{
private:
// Engine whose kernels and buffers this request drives.
OpenClCollatingAndMeshingEngine& engine;
// Stored by value; initialised from the caller's loop in the constructor.
// Only its nSucceeded counter is read in this class.
AsynchronousLoop frameAssemblyResult;
// Frame handed back (by reference) to the original callback.
StimulusFrame& stimulusFrame;
public:
/**
 * @param engine_ Engine to run the kernels on.
 * @param asyncLoop Frame assembly result; copied into frameAssemblyResult.
 * @param stimulusFrame_ Frame passed back to the original callback.
 * @param caller Forwarded to the PostedAsynchronousContinuation base.
 * @param cb Original callback; forwarded to the base.
 */
CompactCollateAndMeshFrameReq(
OpenClCollatingAndMeshingEngine& engine_,
AsynchronousLoop& asyncLoop,
StimulusFrame& stimulusFrame_,
const std::shared_ptr<ComponentThread>& caller,
Callback<compactCollateAndMeshFrameReqCbFn> cb)
: PostedAsynchronousContinuation<compactCollateAndMeshFrameReqCbFn>(
caller, cb),
engine(engine_),
frameAssemblyResult(asyncLoop), stimulusFrame(stimulusFrame_)
{}
public:
// Reports the final outcome through callOriginalCb (not defined in this
// class; presumably provided by the base), passing the stimulus frame by
// reference.
void callOriginalCallback(bool success)
{ callOriginalCb(success, std::ref(stimulusFrame)); }
public:
/**
 * @brief Step 1: start the compact kernel.
 *
 * Step 2 is bound as the kernel's completion callback. If the kernel cannot
 * be started, the compact kernel is stopped and failure is reported.
 */
void compactCollateAndMeshFrameReq1_doCompact_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{
// Record compact kernel start time
engine.compactKernelStartTime = std::chrono::high_resolution_clock::now();
bool success = engine.startCompactKernel(
engine.parent.assemblyBuffer,
static_cast<uint32_t>(context->frameAssemblyResult.nSucceeded.load()),
std::bind(
&CompactCollateAndMeshFrameReq
::compactCollateAndMeshFrameReq2_compactDone_posted,
context.get(), context,
std::placeholders::_1));
if (!success)
{
engine.stopCompactKernel();
callOriginalCallback(false);
return;
}
}
/**
 * @brief Step 2: the compact kernel has completed.
 *
 * @param compactStatus Execution status delivered by the engine's compact
 *                      event callback; anything other than CL_SUCCESS
 *                      reports failure, otherwise step 3 runs directly.
 */
void compactCollateAndMeshFrameReq2_compactDone_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int compactStatus)
{
// Waits for the kernel event and resets the engine's stored compact
// callback to a no-op.
engine.stopCompactKernel();
// Record compact kernel end time
engine.compactKernelEndTime = std::chrono::high_resolution_clock::now();
// If compact failed, call callback directly with failure
if (compactStatus != CL_SUCCESS)
{
callOriginalCallback(false);
return;
}
#if 0
// Print first 4 bytes of each slot
if (engine.frameAssemblyDesc)
{
for (size_t i = 0; i < engine.frameAssemblyDesc->numSlots; ++i) {
engine.parent.ioUringAssemblyEngine.printSlotBytes(i, 4);
}
}
#endif
context->compactCollateAndMeshFrameReq3_doCollate_posted(context);
}
/**
 * @brief Step 3: start the collate kernel.
 *
 * Step 4 is bound as the kernel's completion callback. If the kernel cannot
 * be started, the collate kernel is stopped and failure is reported.
 */
void compactCollateAndMeshFrameReq3_doCollate_posted(
std::shared_ptr<CompactCollateAndMeshFrameReq> context)
{
// Record collate kernel start time
engine.collateKernelStartTime = std::chrono::high_resolution_clock::now();
bool success = engine.startCollateKernel(
engine.parent.assemblyBuffer, engine.parent.collationBuffer,
std::bind(
&CompactCollateAndMeshFrameReq
::compactCollateAndMeshFrameReq4_collateDone_maybePosted,
context.get(), context,
std::placeholders::_1));
if (!success)
{
engine.stopCollateKernel();
callOriginalCallback(false);
return;
}
}
/**
 * @brief Step 4: the collate kernel has completed.
 *
 * On success, scans the collation buffer on the host, prints a summary line
 * to std::cout and reports success; otherwise reports failure.
 *
 * @param context Owning pointer; despite [[maybe_unused]] it is read below
 *                for nSucceeded.
 * @param collateStatus Execution status delivered by the engine's collate
 *                      event callback.
 */
void compactCollateAndMeshFrameReq4_collateDone_maybePosted(
[[maybe_unused]] std::shared_ptr<CompactCollateAndMeshFrameReq> context,
cl_int collateStatus)
{
// Waits for the kernel event and resets the engine's stored collate
// callback to a no-op.
engine.stopCollateKernel();
// Record collate kernel end time
engine.collateKernelEndTime = std::chrono::high_resolution_clock::now();
bool success = (collateStatus == CL_SUCCESS);
// Early callback + return pattern
if (!success)
{
callOriginalCallback(false);
return;
}
// NOTE(review): parent.device is dereferenced without a null check here.
int returnMode = static_cast<int>(engine.parent.device->currentReturnMode);
size_t pointsPerDgram = IoUringAssemblyEngine::computePointsPerDgram(returnMode);
uint32_t nSucceeded = context->frameAssemblyResult.nSucceeded.load();
size_t totalPoints = nSucceeded * pointsPerDgram;
// Count points with intensity greater than 116
// The indexing below reads four floats per point and treats the fourth as
// the intensity.
// NOTE(review): nothing checks totalPoints * 4 floats against
// collationBufferSize - confirm the buffer is always large enough.
float* collationFloats = static_cast<float*>(engine.collationBufferPtr);
size_t highIntensityCount = 0;
for (size_t i = 0; i < totalPoints; ++i)
{
float intensity = collationFloats[i * 4 + 3];
if (intensity > 116.0f)
{
++highIntensityCount;
}
}
std::cout << __func__ << ": pointsPerDgram=" << pointsPerDgram
<< ", nSucceeded=" << nSucceeded
<< ", totalPoints=" << totalPoints
<< ", highIntensityCount=" << highIntensityCount << std::endl;
callOriginalCallback(success);
}
};
/**
 * @brief Entry point: compacts (when needed) and collates one assembled frame
 *        asynchronously.
 *
 * Builds a CompactCollateAndMeshFrameReq continuation and posts its first
 * step to the io_service of the parent device's component thread. The
 * compaction step is skipped when IoUringAssemblyEngine::compactionIsNeeded()
 * says it is unnecessary for the given succeeded/total counts.
 *
 * @param asyncLoop Result of frame assembly (nSucceeded / nTotal are read).
 * @param stimulusFrame Frame handed back to the callback on completion.
 * @param callback Invoked by the continuation with the final outcome.
 */
void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq(
    AsynchronousLoop& asyncLoop, StimulusFrame& stimulusFrame,
    Callback<compactCollateAndMeshFrameReqCbFn> callback)
{
    auto caller = smoHooksPtr->ComponentThread_getSelf();
    auto request = std::make_shared<CompactCollateAndMeshFrameReq>(
        *this, asyncLoop, stimulusFrame,
        caller,
        std::move(callback));

    const bool skipCompaction = !IoUringAssemblyEngine::compactionIsNeeded(
        asyncLoop.nSucceeded.load(), asyncLoop.nTotal);

    if (skipCompaction)
    {
        // Nothing to compact: enter the chain at the collate step.
        parent.device->componentThread->getIoService().post(
            STC(std::bind(
                &CompactCollateAndMeshFrameReq
                    ::compactCollateAndMeshFrameReq3_doCollate_posted,
                request.get(), request)));
        return;
    }

    // Compaction first; its completion handler chains on to collation.
    parent.device->componentThread->getIoService().post(
        STC(std::bind(
            &CompactCollateAndMeshFrameReq
                ::compactCollateAndMeshFrameReq1_doCompact_posted,
            request.get(), request)));
}
/**
 * @brief Time between the recorded compact kernel start and end timestamps.
 *
 * @return The elapsed time truncated to whole milliseconds, or zero when the
 *         end timestamp precedes the start timestamp.
 */
std::chrono::milliseconds OpenClCollatingAndMeshingEngine::getCompactKernelDuration() const
{
    using std::chrono::milliseconds;

    const auto elapsed = compactKernelEndTime - compactKernelStartTime;
    return (elapsed.count() < 0)
        ? milliseconds(0)
        : std::chrono::duration_cast<milliseconds>(elapsed);
}
/**
 * @brief Time between the recorded collate kernel start and end timestamps.
 *
 * @return The elapsed time truncated to whole milliseconds, or zero when the
 *         end timestamp precedes the start timestamp.
 */
std::chrono::milliseconds OpenClCollatingAndMeshingEngine::getCollateKernelDuration() const
{
    using std::chrono::milliseconds;

    const auto elapsed = collateKernelEndTime - collateKernelStartTime;
    return (elapsed.count() < 0)
        ? milliseconds(0)
        : std::chrono::duration_cast<milliseconds>(elapsed);
}
} // namespace stim_buff
} // namespace smo