IoUringAssmEngn: map StagingBuff w/mmap; reg w/io_uring; add eventFd

StagingBuffer:
We now allocate memory with mmap(MAP_ANONYMOUS) so that we can be
sure it can be pinned with io_uring_register_buffers(). This
ensures that if DMA is possible, it should be usable.

	IoUringAssemblyEngine:
We now register an eventfd with io_uring so that we can listen
for CQEs with boost::asio.
This commit is contained in:
2025-11-05 15:34:23 -04:00
parent 0503705a13
commit 94982d50b9
4 changed files with 103 additions and 8 deletions
@@ -1,6 +1,9 @@
#include <boostAsioLinkageFix.h>
#include <cstring>
#include <sys/socket.h>
#include <sys/eventfd.h>
#include <sys/uio.h>
#include <unistd.h>
#include <boost/system/error_code.hpp>
#include <livoxProto1/device.h>
#include <livoxProto1/livoxProto1.h>
@@ -33,7 +36,7 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_)
: parent(parent_),
frameAssemblyDesc(nullptr), ring{},
isSetup(false),
stallTimer(parent_.device->componentThread->getIoService())
eventfdFd(-1), stallTimer(parent_.device->componentThread->getIoService())
{}
bool IoUringAssemblyEngine::setup()
@@ -61,17 +64,47 @@ bool IoUringAssemblyEngine::setup()
if (udpFd < 0)
{ return false; }
// Declare iovec early to avoid goto crossing initialization
struct iovec iov;
int ret;
/** EXPLANATION:
* Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly
* One SQE per slot (one datagram per slot)
*/
int ret = io_uring_queue_init(
ret = io_uring_queue_init(
static_cast<unsigned int>(frameAssemblyDesc->numSlots), &ring, 0);
if (ret < 0)
{ return false; }
{ goto cleanup; }
// Register staging buffer with io_uring for DMA-apt I/O
iov = parent.stagingBuffer.getIoUringRegisterIoVec();
ret = io_uring_register_buffers(&ring, &iov, 1);
if (ret < 0)
{ goto cleanup_ring; }
// Create eventfd for CQE notifications (used with boost's unified loop)
eventfdFd = eventfd(0, EFD_NONBLOCK);
if (eventfdFd < 0)
{ goto cleanup_buffers; }
// Register eventfd with io_uring
ret = io_uring_register_eventfd(&ring, eventfdFd);
if (ret < 0)
{ goto cleanup_eventfd; }
isSetup = true;
return true;
cleanup_eventfd:
close(eventfdFd);
eventfdFd = -1;
cleanup_buffers:
io_uring_unregister_buffers(&ring);
cleanup_ring:
io_uring_queue_exit(&ring);
cleanup:
return false;
}
void IoUringAssemblyEngine::finalize()
@@ -79,9 +112,16 @@ void IoUringAssemblyEngine::finalize()
// Call stop() to cancel in-flight operations (stop() already cancels the timer)
stop();
// Clean up io_uring ring if it was initialized
if (eventfdFd >= 0)
{
io_uring_unregister_eventfd(&ring);
close(eventfdFd);
eventfdFd = -1;
}
if (isSetup)
{
io_uring_unregister_buffers(&ring);
io_uring_queue_exit(&ring);
isSetup = false;
}
@@ -109,6 +149,9 @@ void IoUringAssemblyEngine::stop()
// 1. Cancel all pending SQEs using io_uring cancellation mechanisms
// 2. Cancel in-flight stall timeout timer via stallTimer.cancel()
// 3. Set appropriate state flags
// Cancel in-flight stall timeout timer
stallTimer.cancel();
}
void IoUringAssemblyEngine::cancelIncompleteAndFillDummies()
@@ -46,6 +46,8 @@ private:
struct io_uring ring;
bool isSetup;
// Eventfd for CQE notifications (used with boost's unified loop)
int eventfdFd;
// Point cloud data socket descriptor
std::shared_ptr<boost::asio::posix::stream_descriptor> pcloudDataFdDesc;
+52 -4
View File
@@ -9,6 +9,9 @@
#include <string>
#include <sstream>
#include <algorithm>
#include <sys/mman.h>
#include <sys/uio.h>
#include <unistd.h>
#include "frameAssemblyDesc.h"
@@ -96,6 +99,18 @@ public:
void startAssembly() { assemblingFlag.store(true); }
void stopAssembly() { assemblingFlag.store(false); }
/** EXPLANATION:
* Returns an iovec for io_uring registration.
* The buffer is mmap()-allocated and suitable for IORING_REGISTER_BUFFERS.
*/
struct iovec getIoUringRegisterIoVec() const
{
struct iovec iov;
iov.iov_base = buffer.get();
iov.iov_len = bufferNBytes;
return iov;
}
inline std::string stringify() const
{
std::ostringstream oss;
@@ -111,8 +126,25 @@ public:
private:
void computeSlotStrideAndBufferSize();
// Buffer data
std::unique_ptr<uint8_t[]> buffer;
// Custom deleter for mmap-allocated buffer
struct MmapDeleter
{
size_t size;
MmapDeleter(size_t s) : size(s) {}
void operator()(uint8_t* ptr) const
{
if (ptr != nullptr && size > 0)
{
munmap(ptr, size);
}
}
};
// Buffer data - mmap-allocated for io_uring registration
// Using unique_ptr<uint8_t, MmapDeleter> instead of array syntax
// since we have a custom deleter that knows the size
std::unique_ptr<uint8_t, MmapDeleter> buffer;
size_t bufferNBytes;
// Layout/invariants
@@ -135,7 +167,7 @@ inline StagingBuffer::StagingBuffer(
const InputEngineConstraints& inputEngineConstraints_,
const OutputEngineConstraints& /*outputEngineConstraints*/,
size_t nDgramsPerFrame)
: buffer(nullptr), bufferNBytes(0),
: buffer(nullptr, MmapDeleter(0)), bufferNBytes(0),
nDgramsPerFrame(nDgramsPerFrame), slotStrideNBytes(0),
inputConstraints(inputEngineConstraints_),
assemblingFlag(false)
@@ -148,7 +180,23 @@ assemblingFlag(false)
computeSlotStrideAndBufferSize();
buffer = std::make_unique<uint8_t[]>(bufferNBytes);
/* Allocate buffer using mmap() for io_uring registration
* MAP_ANONYMOUS | MAP_PRIVATE creates anonymous, non-file-backed memory
*/
void* mmapped = mmap(
nullptr, bufferNBytes,
PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE,
-1, 0);
if (mmapped == MAP_FAILED)
{
throw std::runtime_error(std::string(__func__)
+ ": StagingBuffer: mmap() failed");
}
buffer = std::unique_ptr<uint8_t, MmapDeleter>(
static_cast<uint8_t*>(mmapped), MmapDeleter(bufferNBytes));
currentNBytes.store(0);
// Build FrameAssemblyDesc once