From 1d64ce0c7ee82070030e5522c7f2534f18ec9cd5 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Thu, 2 Apr 2026 03:51:22 -0400 Subject: [PATCH] StagingBuff: support both Mlock & IOUring pin; Use in IoUAssmEngn We use io_uring_register_buffers() for IoUringAssemblyEngine instead of using mlock(). This __appears__ to have reduced CPU utilization on the Dell laptop. Could also be that we recently upgraded total RAM from 8GiB to 32GiB. --- commonLibs/attachmentSupport/CMakeLists.txt | 11 ++ .../attachmentSupport/stagingBuffer.cpp | 141 ++++++++++++++++-- include/user/stagingBuffer.h | 83 ++++++++++- include/user/stimulusBuffer.h | 8 +- .../livoxGen1/ioUringAssemblyEngine.cpp | 41 +++-- .../livoxGen1/ioUringAssemblyEngine.h | 3 +- stimBuffApis/livoxGen1/meshStimulusBuffer.h | 7 +- .../livoxGen1/pcloudAmbienceStimulusBuffer.h | 6 +- .../livoxGen1/pcloudIntensityStimulusBuffer.h | 6 +- .../livoxGen1/pcloudStimulusProducer.cpp | 3 + .../livoxGen1/pcloudStimulusProducer.h | 9 +- 11 files changed, 257 insertions(+), 61 deletions(-) diff --git a/commonLibs/attachmentSupport/CMakeLists.txt b/commonLibs/attachmentSupport/CMakeLists.txt index df4a7e0..c585901 100644 --- a/commonLibs/attachmentSupport/CMakeLists.txt +++ b/commonLibs/attachmentSupport/CMakeLists.txt @@ -1,3 +1,5 @@ +pkg_check_modules(ATTACHMENT_SUPPORT_URING REQUIRED liburing) + add_library(attachmentSupport SHARED compute.cpp stimulusProducer.cpp @@ -14,12 +16,21 @@ target_include_directories(attachmentSupport PUBLIC ${CMAKE_SOURCE_DIR}/include ${CMAKE_BINARY_DIR}/include ) +target_include_directories(attachmentSupport PRIVATE + ${ATTACHMENT_SUPPORT_URING_INCLUDE_DIRS} +) target_link_libraries(attachmentSupport PUBLIC Boost::system Boost::log spinscale ) +target_link_libraries(attachmentSupport PRIVATE + ${ATTACHMENT_SUPPORT_URING_LIBRARIES} +) +target_link_directories(attachmentSupport PRIVATE + ${ATTACHMENT_SUPPORT_URING_LIBRARY_DIRS} +) # Verify Boost dynamic dependencies after build add_custom_command(TARGET attachmentSupport POST_BUILD diff --git a/commonLibs/attachmentSupport/stagingBuffer.cpp b/commonLibs/attachmentSupport/stagingBuffer.cpp index b55aedc..9b83b0d 100644 --- a/commonLibs/attachmentSupport/stagingBuffer.cpp +++ b/commonLibs/attachmentSupport/stagingBuffer.cpp @@ -1,15 +1,33 @@ #include +#include #include #include #include #include #include +#include #include namespace smo { namespace stim_buff { +static const char* pinningMechanismToString( + StagingBuffer::PinningMechanism mechanism) +{ + switch (mechanism) + { + case StagingBuffer::PinningMechanism::NONE: + return "NONE"; + case StagingBuffer::PinningMechanism::MLOCK: + return "MLOCK"; + case StagingBuffer::PinningMechanism::IO_URING: + return "IO_URING"; + } + + return "Unknown"; +} + // Static defaults for io_uring const StagingBuffer::IOEngineConstraints StagingBuffer::IOEngineConstraints::ioUringConstraints( @@ -169,11 +187,9 @@ StagingBuffer::StagingBuffer( const IOEngineConstraints& inputEngineConstraints_, const IOEngineConstraints& /*outputEngineConstraints*/, size_t nSlots) -: buffer(nullptr, MmapDeleter(0)), bufferNBytes(0), -nSlots(nSlots), slotStrideNBytes(0), -firstSlotOffsetNBytes(0), -inputConstraints(inputEngineConstraints_), -assemblingFlag(false) +: buffer(nullptr, MmapDeleter(0)), +nSlots(nSlots), +inputConstraints(inputEngineConstraints_) { if (nSlots == 0) { @@ -202,13 +218,6 @@ assemblingFlag(false) static_cast(mmapped), MmapDeleter(bufferNBytes)); currentNBytes.store(0); - // Lock the buffer in memory to prevent swapping - if (mlock(buffer.get(), bufferNBytes) != 0) - { - throw std::runtime_error(std::string(__func__) - + ": StagingBuffer: mlock() failed"); - } - // Calculate offset and validate invariants (helper function in .cpp) firstSlotOffsetNBytes = StagingBuffer::calculateFirstSlotOffsetAndValidate( buffer.get(), bufferNBytes, nSlots, @@ -232,5 +241,113 @@ assemblingFlag(false) std::move(slots)); } +StagingBuffer::~StagingBuffer() +{ + assert(!currentlyPinned); +} + +StagingBuffer::Pinner::Pinner(StagingBuffer& parent_) +: parent(parent_) +{} + +void StagingBuffer::assertUnpinnedAndMarkPinned(PinningMechanism mechanism) +{ + if (currentlyPinned) + { + throw std::runtime_error( + std::string(__func__) + ": StagingBuffer already pinned with " + + pinningMechanismToString(currentPinningMechanism)); + } + + currentlyPinned = true; + currentPinningMechanism = mechanism; +} + +std::unique_ptr StagingBuffer::makeMlockPinner() +{ + return std::make_unique(*this); +} + +std::unique_ptr StagingBuffer::makeIoUringPinner( + struct io_uring* ring) +{ + return std::make_unique(*this, ring); +} + +StagingBuffer::MlockPinner::MlockPinner(StagingBuffer& parent_) +: Pinner(parent_) +{ + if (!parent.buffer || parent.bufferNBytes == 0) + { + throw std::runtime_error(std::string(__func__) + + ": Cannot mlock an uninitialized StagingBuffer"); + } + + parent.assertUnpinnedAndMarkPinned(PinningMechanism::MLOCK); + + if (mlock(parent.buffer.get(), parent.bufferNBytes) != 0) + { + parent.currentlyPinned = false; + parent.currentPinningMechanism = PinningMechanism::NONE; + throw std::runtime_error(std::string(__func__) + + ": mlock() failed"); + } +} + +StagingBuffer::MlockPinner::~MlockPinner() +{ + assert(parent.currentlyPinned); + assert(parent.currentPinningMechanism == PinningMechanism::MLOCK); + + int ret = munlock(parent.buffer.get(), parent.bufferNBytes); + assert(ret == 0); + (void)ret; + + parent.currentlyPinned = false; + parent.currentPinningMechanism = PinningMechanism::NONE; +} + +StagingBuffer::IoUringPinner::IoUringPinner( + StagingBuffer& parent_, struct io_uring* ring_) +: Pinner(parent_), ring(ring_) +{ + if (!ring) + { + throw std::runtime_error(std::string(__func__) + + ": io_uring ring pointer is null"); + } + + if (!parent.buffer || parent.bufferNBytes == 0) + { + throw std::runtime_error(std::string(__func__) + + ": Cannot register an uninitialized StagingBuffer"); + } + + parent.assertUnpinnedAndMarkPinned(PinningMechanism::IO_URING); + + struct iovec iov = parent.getIoUringRegisterIoVec(); + int ret = io_uring_register_buffers(ring, &iov, 1); + if (ret < 0) + { + parent.currentlyPinned = false; + parent.currentPinningMechanism = PinningMechanism::NONE; + throw std::runtime_error(std::string(__func__) + + ": io_uring_register_buffers failed"); + } +} + +StagingBuffer::IoUringPinner::~IoUringPinner() +{ + assert(parent.currentlyPinned); + assert(parent.currentPinningMechanism == PinningMechanism::IO_URING); + + int ret = io_uring_unregister_buffers(ring); + assert(ret == 0); + (void)ret; + + parent.currentlyPinned = false; + parent.currentPinningMechanism = PinningMechanism::NONE; +} + } // namespace stim_buff } // namespace smo diff --git a/include/user/stagingBuffer.h b/include/user/stagingBuffer.h index 05622fa..e5dbfdb 100644 --- a/include/user/stagingBuffer.h +++ b/include/user/stagingBuffer.h @@ -15,6 +15,14 @@ namespace stim_buff { // Forward declaration class FrameAssemblyDesc; +} // namespace stim_buff +} // namespace smo + +struct io_uring; + +namespace smo { +namespace stim_buff { + /** * StagingBuffer manages a large buffer to guide io_uring in assembling some * number of Livox Avia pcloud UDP dgrams into a single stim frame. @@ -28,6 +36,13 @@ class FrameAssemblyDesc; class StagingBuffer { public: + enum class PinningMechanism + { + NONE, + MLOCK, + IO_URING + }; + class IOEngineConstraints { public: @@ -83,7 +98,7 @@ public: const IOEngineConstraints& inputEngineConstraints, const IOEngineConstraints& outputEngineConstraints, size_t nSlots); - ~StagingBuffer() = default; + ~StagingBuffer(); // Non-copyable, movable StagingBuffer(const StagingBuffer&) = delete; @@ -91,6 +106,50 @@ public: StagingBuffer(StagingBuffer&&) = default; StagingBuffer& operator=(StagingBuffer&&) = default; + class Pinner + { + public: + Pinner(const Pinner&) = delete; + Pinner& operator=(const Pinner&) = delete; + Pinner(Pinner&&) = delete; + Pinner& operator=(Pinner&&) = delete; + + protected: + explicit Pinner(StagingBuffer& parent_); + ~Pinner() = default; + + StagingBuffer& parent; + }; + + class MlockPinner + : public Pinner + { + public: + explicit MlockPinner(StagingBuffer& parent); + ~MlockPinner(); + + MlockPinner(const MlockPinner&) = delete; + MlockPinner& operator=(const MlockPinner&) = delete; + MlockPinner(MlockPinner&&) = delete; + MlockPinner& operator=(MlockPinner&&) = delete; + }; + + class IoUringPinner + : public Pinner + { + public: + IoUringPinner(StagingBuffer& parent, struct io_uring* ring); + ~IoUringPinner(); + + IoUringPinner(const IoUringPinner&) = delete; + IoUringPinner& operator=(const IoUringPinner&) = delete; + IoUringPinner(IoUringPinner&&) = delete; + IoUringPinner& operator=(IoUringPinner&&) = delete; + + private: + struct io_uring* ring; + }; + public: /** EXPLANATION: * Returns an input-engine-agnostic descriptor describing per-frame packet @@ -104,6 +163,9 @@ public: void startAssembly() { assemblingFlag.store(true); } void stopAssembly() { assemblingFlag.store(false); } + std::unique_ptr makeMlockPinner(); + std::unique_ptr makeIoUringPinner(struct io_uring* ring); + /** EXPLANATION: * Returns an iovec for io_uring registration. * The buffer is mmap()-allocated and suitable for IORING_REGISTER_BUFFERS. @@ -144,6 +206,7 @@ public: private: void computeSlotStrideAndBufferSize(); + void assertUnpinnedAndMarkPinned(PinningMechanism mechanism); static size_t calculateFirstSlotOffsetAndValidate( uint8_t* buffer, size_t bufferNBytes, @@ -163,7 +226,6 @@ private: { if (ptr != nullptr && size > 0) { - munlock(ptr, size); munmap(ptr, size); } } @@ -173,14 +235,14 @@ private: // Using unique_ptr instead of array syntax // since we have a custom deleter that knows the size std::unique_ptr buffer; - size_t bufferNBytes; + size_t bufferNBytes = 0; // Layout/invariants - size_t nSlots; + size_t nSlots = 0; public: - size_t slotStrideNBytes; - size_t firstSlotOffsetNBytes; // offset from buffer start to first slot + size_t slotStrideNBytes = 0; + size_t firstSlotOffsetNBytes = 0; // offset from buffer start to first slot private: IOEngineConstraints inputConstraints; @@ -189,8 +251,13 @@ private: mutable std::shared_ptr frameDesc; // Current state - std::atomic currentNBytes; - std::atomic assemblingFlag; + std::atomic currentNBytes{0}; + std::atomic assemblingFlag{false}; + bool currentlyPinned = false; + PinningMechanism currentPinningMechanism = PinningMechanism::NONE; + + friend class MlockPinner; + friend class IoUringPinner; }; } // namespace stim_buff diff --git a/include/user/stimulusBuffer.h b/include/user/stimulusBuffer.h index 15750c1..67b0531 100644 --- a/include/user/stimulusBuffer.h +++ b/include/user/stimulusBuffer.h @@ -44,6 +44,7 @@ public: inputEngineConstraints, outputEngineConstraints, static_cast(histbuffMs / CONFIG_STIMBUFF_FRAME_PERIOD_MS)), + stagingBufferPinner(stagingBuffer.makeMlockPinner()), ringBuffer( static_cast>(stagingBuffer), callbacks, flags) @@ -51,17 +52,18 @@ public: virtual ~StimulusBuffer() = default; - // Non-copyable, movable + // Non-copyable, non-movable: pinner lifetime is tied to this instance StimulusBuffer(const StimulusBuffer&) = delete; StimulusBuffer& operator=(const StimulusBuffer&) = delete; - StimulusBuffer(StimulusBuffer&&) = default; - StimulusBuffer& operator=(StimulusBuffer&&) = default; + StimulusBuffer(StimulusBuffer&&) = delete; + StimulusBuffer& operator=(StimulusBuffer&&) = delete; public: StimulusProducer& parent; std::shared_ptr deviceAttachmentSpec; int histbuffMs; StagingBuffer stagingBuffer; + std::unique_ptr stagingBufferPinner; SpMcRingBuffer ringBuffer; }; diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index 08ce1f4..9732fa3 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -27,8 +27,6 @@ #include "pcloudStimulusProducer.h" #include "livoxGen1.h" -// #define REGISTER_IOURING_BUFFERS - namespace smo { namespace stim_buff { @@ -116,10 +114,6 @@ bool IoUringAssemblyEngine::setup() assembledSlotsTracker[i].ioVec.iov_len = slot.nBytes; } - // Declare iovec early to avoid goto crossing initialization -#ifdef REGISTER_IOURING_BUFFERS - struct iovec iov; -#endif int ret; /** EXPLANATION: @@ -134,23 +128,27 @@ bool IoUringAssemblyEngine::setup() if (ret < 0) { goto cleanup; } -#ifdef REGISTER_IOURING_BUFFERS - // Register staging buffer with io_uring for DMA-apt I/O - iov = parent.assemblyBuffer.getIoUringRegisterIoVec(); - ret = io_uring_register_buffers(&ring, &iov, 1); - if (ret < 0) - { goto cleanup_ring; } -#endif + try + { + if (assemblyBufferIoUringPinner) + { + throw std::runtime_error(std::string(__func__) + + ": assemblyBufferIoUringPinner already exists"); + } + + assemblyBufferIoUringPinner = + parent.assemblyBuffer.makeIoUringPinner(&ring); + } + catch (const std::exception&) + { + goto cleanup_ring; + } // Create eventfd for CQE notifications (used with boost's unified loop) eventfdFd = eventfd(0, EFD_NONBLOCK); if (eventfdFd < 0) { -#ifdef REGISTER_IOURING_BUFFERS - goto cleanup_buffers; -#else goto cleanup_ring; -#endif } // Register eventfd with io_uring @@ -164,11 +162,8 @@ bool IoUringAssemblyEngine::setup() cleanup_eventfd: close(eventfdFd); eventfdFd = -1; -#ifdef REGISTER_IOURING_BUFFERS -cleanup_buffers: - io_uring_unregister_buffers(&ring); -#endif cleanup_ring: + assemblyBufferIoUringPinner.reset(); io_uring_queue_exit(&ring); cleanup: return false; @@ -217,9 +212,7 @@ void IoUringAssemblyEngine::finalize() if (wasAcceptingRequests) { -#ifdef REGISTER_IOURING_BUFFERS - io_uring_unregister_buffers(&ring); -#endif + assemblyBufferIoUringPinner.reset(); io_uring_queue_exit(&ring); } diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 6c7696d..2003498 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -21,6 +21,7 @@ #include #include #include +#include #define IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS \ (CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2) @@ -73,6 +74,7 @@ private: uint64_t eventfd_value; // Buffer for async_read_some // Point cloud data socket descriptor std::shared_ptr pcloudDataFdDesc; + std::unique_ptr assemblyBufferIoUringPinner; // Stall detection timer boost::asio::deadline_timer stallTimer; @@ -122,4 +124,3 @@ public: #endif // _LIVOX_GEN1_IOURING_ASSEMBLY_ENGINE_H - diff --git a/stimBuffApis/livoxGen1/meshStimulusBuffer.h b/stimBuffApis/livoxGen1/meshStimulusBuffer.h index b208bd8..81e5608 100644 --- a/stimBuffApis/livoxGen1/meshStimulusBuffer.h +++ b/stimBuffApis/livoxGen1/meshStimulusBuffer.h @@ -34,15 +34,14 @@ public: ~MeshStimulusBuffer() = default; - // Non-copyable, movable + // Non-copyable, non-movable: inherited pinner lifetime is instance-bound MeshStimulusBuffer(const MeshStimulusBuffer&) = delete; MeshStimulusBuffer& operator=(const MeshStimulusBuffer&) = delete; - MeshStimulusBuffer(MeshStimulusBuffer&&) = default; - MeshStimulusBuffer& operator=(MeshStimulusBuffer&&) = default; + MeshStimulusBuffer(MeshStimulusBuffer&&) = delete; + MeshStimulusBuffer& operator=(MeshStimulusBuffer&&) = delete; }; } // namespace stim_buff } // namespace smo #endif // _LIVOX_GEN1_MESH_STIMULUS_BUFFER_H - diff --git a/stimBuffApis/livoxGen1/pcloudAmbienceStimulusBuffer.h b/stimBuffApis/livoxGen1/pcloudAmbienceStimulusBuffer.h index 4154a5b..d333961 100644 --- a/stimBuffApis/livoxGen1/pcloudAmbienceStimulusBuffer.h +++ b/stimBuffApis/livoxGen1/pcloudAmbienceStimulusBuffer.h @@ -73,11 +73,11 @@ public: ~PcloudAmbienceStimulusBuffer() = default; - // Non-copyable, movable + // Non-copyable, non-movable: inherited pinner lifetime is instance-bound PcloudAmbienceStimulusBuffer(const PcloudAmbienceStimulusBuffer&) = delete; PcloudAmbienceStimulusBuffer& operator=(const PcloudAmbienceStimulusBuffer&) = delete; - PcloudAmbienceStimulusBuffer(PcloudAmbienceStimulusBuffer&&) = default; - PcloudAmbienceStimulusBuffer& operator=(PcloudAmbienceStimulusBuffer&&) = default; + PcloudAmbienceStimulusBuffer(PcloudAmbienceStimulusBuffer&&) = delete; + PcloudAmbienceStimulusBuffer& operator=(PcloudAmbienceStimulusBuffer&&) = delete; public: uint32_t postrinInterestPercentage; diff --git a/stimBuffApis/livoxGen1/pcloudIntensityStimulusBuffer.h b/stimBuffApis/livoxGen1/pcloudIntensityStimulusBuffer.h index e978f30..2158bba 100644 --- a/stimBuffApis/livoxGen1/pcloudIntensityStimulusBuffer.h +++ b/stimBuffApis/livoxGen1/pcloudIntensityStimulusBuffer.h @@ -35,15 +35,15 @@ public: ~PcloudIntensityStimulusBuffer() = default; - // Non-copyable, movable + // Non-copyable, non-movable: inherited pinner lifetime is instance-bound PcloudIntensityStimulusBuffer( const PcloudIntensityStimulusBuffer&) = delete; PcloudIntensityStimulusBuffer& operator=( const PcloudIntensityStimulusBuffer&) = delete; PcloudIntensityStimulusBuffer( - PcloudIntensityStimulusBuffer&&) = default; + PcloudIntensityStimulusBuffer&&) = delete; PcloudIntensityStimulusBuffer& operator=( - PcloudIntensityStimulusBuffer&&) = default; + PcloudIntensityStimulusBuffer&&) = delete; }; } // namespace stim_buff diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp index 0ab8dfe..0d0c4d4 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp @@ -95,10 +95,13 @@ collationBuffer( StagingBuffer::IOEngineConstraints::openClInputConstraints, StagingBuffer::IOEngineConstraints::openClInputConstraints, nDgramsPerStagingBufferFrame), +collationBufferMlockPinner(collationBuffer.makeMlockPinner()), averageIntensityBuffer( openClAverageIntensityConstraints, openClAverageIntensityConstraints, nDgramsPerStagingBufferFrame), +averageIntensityBufferMlockPinner( + averageIntensityBuffer.makeMlockPinner()), tempStimulusFrameMem(0), tempStimulusFrame( FrameAssemblyDesc::SlotDesc{ diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.h b/stimBuffApis/livoxGen1/pcloudStimulusProducer.h index 7d2f016..5977b8b 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.h +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.h @@ -52,11 +52,11 @@ public: ~PcloudStimulusProducer() = default; - // Non-copyable, movable + // Non-copyable, non-movable: pinners hold buffer instance references PcloudStimulusProducer(const PcloudStimulusProducer&) = delete; PcloudStimulusProducer& operator=(const PcloudStimulusProducer&) = delete; - PcloudStimulusProducer(PcloudStimulusProducer&&) = default; - PcloudStimulusProducer& operator=(PcloudStimulusProducer&&) = default; + PcloudStimulusProducer(PcloudStimulusProducer&&) = delete; + PcloudStimulusProducer& operator=(PcloudStimulusProducer&&) = delete; // Control methods void start() override; @@ -94,7 +94,10 @@ public: StagingBuffer assemblyBuffer; IoUringAssemblyEngine ioUringAssemblyEngine; StagingBuffer collationBuffer; + std::unique_ptr collationBufferMlockPinner; StagingBuffer averageIntensityBuffer; + std::unique_ptr + averageIntensityBufferMlockPinner; size_t tempStimulusFrameMem; StimulusFrame tempStimulusFrame; std::atomic> meshStimulusBuffer;