From 94982d50b901c58e328ef862072044d61f560164 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Wed, 5 Nov 2025 15:34:23 -0400 Subject: [PATCH] IoUringAssmEngn: map StagingBuff w/mmap; reg w/io_uring; add eventFd StagingBuffer: We now allocate memory with mmap(MAP_ANONYMOUS) so that we can be sure it can be pinned with io_uring_register_buffers(). This ensures that if DMA is possible, it should be usable. IoUringAssemblyEngine: We now register an eventfd with io_uring so that we can listen for CQEs with boost::asio. --- .../livoxGen1/ioUringAssemblyEngine.cpp | 51 +++++++++++++++-- .../livoxGen1/ioUringAssemblyEngine.h | 2 + stimBuffApis/livoxGen1/stagingBuffer.h | 56 +++++++++++++++++-- todo | 2 + 4 files changed, 103 insertions(+), 8 deletions(-) diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index f345ad8..8882094 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -33,7 +36,7 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_) : parent(parent_), frameAssemblyDesc(nullptr), ring{}, isSetup(false), -stallTimer(parent_.device->componentThread->getIoService()) +eventfdFd(-1), stallTimer(parent_.device->componentThread->getIoService()) {} bool IoUringAssemblyEngine::setup() @@ -61,17 +64,47 @@ bool IoUringAssemblyEngine::setup() if (udpFd < 0) { return false; } + // Declare iovec early to avoid goto crossing initialization + struct iovec iov; + int ret; + /** EXPLANATION: * Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly * One SQE per slot (one datagram per slot) */ - int ret = io_uring_queue_init( + ret = io_uring_queue_init( static_cast(frameAssemblyDesc->numSlots), &ring, 0); if (ret < 0) - { return false; } + { goto cleanup; } + + // Register staging buffer with io_uring for 
DMA-apt I/O + iov = parent.stagingBuffer.getIoUringRegisterIoVec(); + ret = io_uring_register_buffers(&ring, &iov, 1); + if (ret < 0) + { goto cleanup_ring; } + + // Create eventfd for CQE notifications (used with boost's unified loop) + eventfdFd = eventfd(0, EFD_NONBLOCK); + if (eventfdFd < 0) + { goto cleanup_buffers; } + + // Register eventfd with io_uring + ret = io_uring_register_eventfd(&ring, eventfdFd); + if (ret < 0) + { goto cleanup_eventfd; } isSetup = true; return true; + +cleanup_eventfd: + close(eventfdFd); + eventfdFd = -1; +cleanup_buffers: + io_uring_unregister_buffers(&ring); +cleanup_ring: + io_uring_queue_exit(&ring); +cleanup: + return false; } void IoUringAssemblyEngine::finalize() @@ -79,9 +112,16 @@ void IoUringAssemblyEngine::finalize() // Call stop() to cancel in-flight operations (stop() already cancels the timer) stop(); - // Clean up io_uring ring if it was initialized + if (eventfdFd >= 0) + { + io_uring_unregister_eventfd(&ring); + close(eventfdFd); + eventfdFd = -1; + } + if (isSetup) { + io_uring_unregister_buffers(&ring); io_uring_queue_exit(&ring); isSetup = false; } @@ -109,6 +149,9 @@ void IoUringAssemblyEngine::stop() // 1. Cancel all pending SQEs using io_uring cancellation mechanisms // 2. Cancel in-flight stall timeout timer via stallTimer.cancel() // 3. 
Set appropriate state flags + + // Cancel in-flight stall timeout timer + stallTimer.cancel(); } void IoUringAssemblyEngine::cancelIncompleteAndFillDummies() diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 32ecb20..8dc9f92 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -46,6 +46,8 @@ private: struct io_uring ring; bool isSetup; + // Eventfd for CQE notifications (used with boost's unified loop) + int eventfdFd; // Point cloud data socket descriptor std::shared_ptr pcloudDataFdDesc; diff --git a/stimBuffApis/livoxGen1/stagingBuffer.h b/stimBuffApis/livoxGen1/stagingBuffer.h index 9c0f4a1..6914251 100644 --- a/stimBuffApis/livoxGen1/stagingBuffer.h +++ b/stimBuffApis/livoxGen1/stagingBuffer.h @@ -9,6 +9,9 @@ #include #include #include +#include +#include +#include #include "frameAssemblyDesc.h" @@ -96,6 +99,18 @@ public: void startAssembly() { assemblingFlag.store(true); } void stopAssembly() { assemblingFlag.store(false); } + /** EXPLANATION: + * Returns an iovec for io_uring registration. + * The buffer is mmap()-allocated and suitable for IORING_REGISTER_BUFFERS. 
+ */ + struct iovec getIoUringRegisterIoVec() const + { + struct iovec iov; + iov.iov_base = buffer.get(); + iov.iov_len = bufferNBytes; + return iov; + } + inline std::string stringify() const { std::ostringstream oss; @@ -111,8 +126,25 @@ public: private: void computeSlotStrideAndBufferSize(); - // Buffer data - std::unique_ptr buffer; + // Custom deleter for mmap-allocated buffer + struct MmapDeleter + { + size_t size; + MmapDeleter(size_t s) : size(s) {} + + void operator()(uint8_t* ptr) const + { + if (ptr != nullptr && size > 0) + { + munmap(ptr, size); + } + } + }; + + // Buffer data - mmap-allocated for io_uring registration + // Using unique_ptr instead of array syntax + // since we have a custom deleter that knows the size + std::unique_ptr buffer; size_t bufferNBytes; // Layout/invariants @@ -135,7 +167,7 @@ inline StagingBuffer::StagingBuffer( const InputEngineConstraints& inputEngineConstraints_, const OutputEngineConstraints& /*outputEngineConstraints*/, size_t nDgramsPerFrame) -: buffer(nullptr), bufferNBytes(0), +: buffer(nullptr, MmapDeleter(0)), bufferNBytes(0), nDgramsPerFrame(nDgramsPerFrame), slotStrideNBytes(0), inputConstraints(inputEngineConstraints_), assemblingFlag(false) @@ -148,7 +180,23 @@ assemblingFlag(false) computeSlotStrideAndBufferSize(); - buffer = std::make_unique(bufferNBytes); + /* Allocate buffer using mmap() for io_uring registration + * MAP_ANONYMOUS | MAP_PRIVATE creates anonymous, non-file-backed memory + */ + void* mmapped = mmap( + nullptr, bufferNBytes, + PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, + -1, 0); + + if (mmapped == MAP_FAILED) + { + throw std::runtime_error(std::string(__func__) + + ": StagingBuffer: mmap() failed"); + } + + buffer = std::unique_ptr( + static_cast(mmapped), MmapDeleter(bufferNBytes)); currentNBytes.store(0); // Build FrameAssemblyDesc once diff --git a/todo b/todo index fb670c4..3545ba2 100644 --- a/todo +++ b/todo @@ -39,3 +39,5 @@ to the io_context of the thread it should post 
its callbacks to, and then post callbacks to those io_contexts when UDP cmd responses come in. +* Consider using huge pages (mmap MAP_HUGETLB, or madvise MADV_HUGEPAGE) for both + PcloudStimBuff::StagingBuffer and the PcloudStimulusBuffer's ringbuff.