StagingBuff: support both Mlock & IOUring pin; Use in IoUAssmEngn

We use io_uring_register_buffers() for IoUringAssemblyEngine instead
of using mlock(). This __appears__ to have reduced CPU utilization on
the Dell laptop. Could also be that we recently upgraded total RAM
from 8GiB to 32GiB.
This commit is contained in:
2026-04-02 03:51:22 -04:00
parent 26dd686ebf
commit 1d64ce0c7e
11 changed files with 257 additions and 61 deletions
@@ -1,3 +1,5 @@
pkg_check_modules(ATTACHMENT_SUPPORT_URING REQUIRED liburing)
add_library(attachmentSupport SHARED
compute.cpp
stimulusProducer.cpp
@@ -14,12 +16,21 @@ target_include_directories(attachmentSupport PUBLIC
${CMAKE_SOURCE_DIR}/include
${CMAKE_BINARY_DIR}/include
)
target_include_directories(attachmentSupport PRIVATE
${ATTACHMENT_SUPPORT_URING_INCLUDE_DIRS}
)
target_link_libraries(attachmentSupport PUBLIC
Boost::system
Boost::log
spinscale
)
target_link_libraries(attachmentSupport PRIVATE
${ATTACHMENT_SUPPORT_URING_LIBRARIES}
)
target_link_directories(attachmentSupport PRIVATE
${ATTACHMENT_SUPPORT_URING_LIBRARY_DIRS}
)
# Verify Boost dynamic dependencies after build
add_custom_command(TARGET attachmentSupport POST_BUILD
+129 -12
View File
@@ -1,15 +1,33 @@
#include <user/stagingBuffer.h>
#include <cassert>
#include <unistd.h>
#include <cstdint>
#include <stdexcept>
#include <sys/mman.h>
#include <vector>
#include <liburing.h>
#include <user/frameAssemblyDesc.h>
namespace smo {
namespace stim_buff {
static const char* pinningMechanismToString(
StagingBuffer::PinningMechanism mechanism)
{
switch (mechanism)
{
case StagingBuffer::PinningMechanism::NONE:
return "NONE";
case StagingBuffer::PinningMechanism::MLOCK:
return "MLOCK";
case StagingBuffer::PinningMechanism::IO_URING:
return "IO_URING";
}
return "Unknown";
}
// Static defaults for io_uring
const StagingBuffer::IOEngineConstraints
StagingBuffer::IOEngineConstraints::ioUringConstraints(
@@ -169,11 +187,9 @@ StagingBuffer::StagingBuffer(
const IOEngineConstraints& inputEngineConstraints_,
const IOEngineConstraints& /*outputEngineConstraints*/,
size_t nSlots)
: buffer(nullptr, MmapDeleter(0)), bufferNBytes(0),
nSlots(nSlots), slotStrideNBytes(0),
firstSlotOffsetNBytes(0),
inputConstraints(inputEngineConstraints_),
assemblingFlag(false)
: buffer(nullptr, MmapDeleter(0)),
nSlots(nSlots),
inputConstraints(inputEngineConstraints_)
{
if (nSlots == 0)
{
@@ -202,13 +218,6 @@ assemblingFlag(false)
static_cast<uint8_t*>(mmapped), MmapDeleter(bufferNBytes));
currentNBytes.store(0);
// Lock the buffer in memory to prevent swapping
if (mlock(buffer.get(), bufferNBytes) != 0)
{
throw std::runtime_error(std::string(__func__)
+ ": StagingBuffer: mlock() failed");
}
// Calculate offset and validate invariants (helper function in .cpp)
firstSlotOffsetNBytes = StagingBuffer::calculateFirstSlotOffsetAndValidate(
buffer.get(), bufferNBytes, nSlots,
@@ -232,5 +241,113 @@ assemblingFlag(false)
std::move(slots));
}
StagingBuffer::~StagingBuffer()
{
assert(!currentlyPinned);
}
StagingBuffer::Pinner::Pinner(StagingBuffer& parent_)
: parent(parent_)
{}
void StagingBuffer::assertUnpinnedAndMarkPinned(PinningMechanism mechanism)
{
if (currentlyPinned)
{
throw std::runtime_error(
std::string(__func__) + ": StagingBuffer already pinned with "
+ pinningMechanismToString(currentPinningMechanism));
}
currentlyPinned = true;
currentPinningMechanism = mechanism;
}
std::unique_ptr<StagingBuffer::MlockPinner> StagingBuffer::makeMlockPinner()
{
return std::make_unique<MlockPinner>(*this);
}
std::unique_ptr<StagingBuffer::IoUringPinner> StagingBuffer::makeIoUringPinner(
struct io_uring* ring)
{
return std::make_unique<IoUringPinner>(*this, ring);
}
StagingBuffer::MlockPinner::MlockPinner(StagingBuffer& parent_)
: Pinner(parent_)
{
if (!parent.buffer || parent.bufferNBytes == 0)
{
throw std::runtime_error(std::string(__func__)
+ ": Cannot mlock an uninitialized StagingBuffer");
}
parent.assertUnpinnedAndMarkPinned(PinningMechanism::MLOCK);
if (mlock(parent.buffer.get(), parent.bufferNBytes) != 0)
{
parent.currentlyPinned = false;
parent.currentPinningMechanism = PinningMechanism::NONE;
throw std::runtime_error(std::string(__func__)
+ ": mlock() failed");
}
}
StagingBuffer::MlockPinner::~MlockPinner()
{
assert(parent.currentlyPinned);
assert(parent.currentPinningMechanism == PinningMechanism::MLOCK);
int ret = munlock(parent.buffer.get(), parent.bufferNBytes);
assert(ret == 0);
(void)ret;
parent.currentlyPinned = false;
parent.currentPinningMechanism = PinningMechanism::NONE;
}
StagingBuffer::IoUringPinner::IoUringPinner(
StagingBuffer& parent_, struct io_uring* ring_)
: Pinner(parent_), ring(ring_)
{
if (!ring)
{
throw std::runtime_error(std::string(__func__)
+ ": io_uring ring pointer is null");
}
if (!parent.buffer || parent.bufferNBytes == 0)
{
throw std::runtime_error(std::string(__func__)
+ ": Cannot register an uninitialized StagingBuffer");
}
parent.assertUnpinnedAndMarkPinned(PinningMechanism::IO_URING);
struct iovec iov = parent.getIoUringRegisterIoVec();
int ret = io_uring_register_buffers(ring, &iov, 1);
if (ret < 0)
{
parent.currentlyPinned = false;
parent.currentPinningMechanism = PinningMechanism::NONE;
throw std::runtime_error(std::string(__func__)
+ ": io_uring_register_buffers failed");
}
}
StagingBuffer::IoUringPinner::~IoUringPinner()
{
assert(parent.currentlyPinned);
assert(parent.currentPinningMechanism == PinningMechanism::IO_URING);
int ret = io_uring_unregister_buffers(ring);
assert(ret == 0);
(void)ret;
parent.currentlyPinned = false;
parent.currentPinningMechanism = PinningMechanism::NONE;
}
} // namespace stim_buff
} // namespace smo
+75 -8
View File
@@ -15,6 +15,14 @@ namespace stim_buff {
// Forward declaration
class FrameAssemblyDesc;
} // namespace stim_buff
} // namespace smo
struct io_uring;
namespace smo {
namespace stim_buff {
/**
* StagingBuffer manages a large buffer to guide io_uring in assembling some
* number of Livox Avia pcloud UDP dgrams into a single stim frame.
@@ -28,6 +36,13 @@ class FrameAssemblyDesc;
class StagingBuffer
{
public:
enum class PinningMechanism
{
NONE,
MLOCK,
IO_URING
};
class IOEngineConstraints
{
public:
@@ -83,7 +98,7 @@ public:
const IOEngineConstraints& inputEngineConstraints,
const IOEngineConstraints& outputEngineConstraints,
size_t nSlots);
~StagingBuffer() = default;
~StagingBuffer();
// Non-copyable, movable
StagingBuffer(const StagingBuffer&) = delete;
@@ -91,6 +106,50 @@ public:
StagingBuffer(StagingBuffer&&) = default;
StagingBuffer& operator=(StagingBuffer&&) = default;
class Pinner
{
public:
Pinner(const Pinner&) = delete;
Pinner& operator=(const Pinner&) = delete;
Pinner(Pinner&&) = delete;
Pinner& operator=(Pinner&&) = delete;
protected:
explicit Pinner(StagingBuffer& parent_);
~Pinner() = default;
StagingBuffer& parent;
};
class MlockPinner
: public Pinner
{
public:
explicit MlockPinner(StagingBuffer& parent);
~MlockPinner();
MlockPinner(const MlockPinner&) = delete;
MlockPinner& operator=(const MlockPinner&) = delete;
MlockPinner(MlockPinner&&) = delete;
MlockPinner& operator=(MlockPinner&&) = delete;
};
class IoUringPinner
: public Pinner
{
public:
IoUringPinner(StagingBuffer& parent, struct io_uring* ring);
~IoUringPinner();
IoUringPinner(const IoUringPinner&) = delete;
IoUringPinner& operator=(const IoUringPinner&) = delete;
IoUringPinner(IoUringPinner&&) = delete;
IoUringPinner& operator=(IoUringPinner&&) = delete;
private:
struct io_uring* ring;
};
public:
/** EXPLANATION:
* Returns an input-engine-agnostic descriptor describing per-frame packet
@@ -104,6 +163,9 @@ public:
void startAssembly() { assemblingFlag.store(true); }
void stopAssembly() { assemblingFlag.store(false); }
std::unique_ptr<MlockPinner> makeMlockPinner();
std::unique_ptr<IoUringPinner> makeIoUringPinner(struct io_uring* ring);
/** EXPLANATION:
* Returns an iovec for io_uring registration.
* The buffer is mmap()-allocated and suitable for IORING_REGISTER_BUFFERS.
@@ -144,6 +206,7 @@ public:
private:
void computeSlotStrideAndBufferSize();
void assertUnpinnedAndMarkPinned(PinningMechanism mechanism);
static size_t calculateFirstSlotOffsetAndValidate(
uint8_t* buffer,
size_t bufferNBytes,
@@ -163,7 +226,6 @@ private:
{
if (ptr != nullptr && size > 0)
{
munlock(ptr, size);
munmap(ptr, size);
}
}
@@ -173,14 +235,14 @@ private:
// Using unique_ptr<uint8_t, MmapDeleter> instead of array syntax
// since we have a custom deleter that knows the size
std::unique_ptr<uint8_t, MmapDeleter> buffer;
size_t bufferNBytes;
size_t bufferNBytes = 0;
// Layout/invariants
size_t nSlots;
size_t nSlots = 0;
public:
size_t slotStrideNBytes;
size_t firstSlotOffsetNBytes; // offset from buffer start to first slot
size_t slotStrideNBytes = 0;
size_t firstSlotOffsetNBytes = 0; // offset from buffer start to first slot
private:
IOEngineConstraints inputConstraints;
@@ -189,8 +251,13 @@ private:
mutable std::shared_ptr<FrameAssemblyDesc> frameDesc;
// Current state
std::atomic<size_t> currentNBytes;
std::atomic<bool> assemblingFlag;
std::atomic<size_t> currentNBytes{0};
std::atomic<bool> assemblingFlag{false};
bool currentlyPinned = false;
PinningMechanism currentPinningMechanism = PinningMechanism::NONE;
friend class MlockPinner;
friend class IoUringPinner;
};
} // namespace stim_buff
+5 -3
View File
@@ -44,6 +44,7 @@ public:
inputEngineConstraints,
outputEngineConstraints,
static_cast<size_t>(histbuffMs / CONFIG_STIMBUFF_FRAME_PERIOD_MS)),
stagingBufferPinner(stagingBuffer.makeMlockPinner()),
ringBuffer(
static_cast<std::shared_ptr<FrameAssemblyDesc>>(stagingBuffer),
callbacks, flags)
@@ -51,17 +52,18 @@ public:
virtual ~StimulusBuffer() = default;
// Non-copyable, movable
// Non-copyable, non-movable: pinner lifetime is tied to this instance
StimulusBuffer(const StimulusBuffer&) = delete;
StimulusBuffer& operator=(const StimulusBuffer&) = delete;
StimulusBuffer(StimulusBuffer&&) = default;
StimulusBuffer& operator=(StimulusBuffer&&) = default;
StimulusBuffer(StimulusBuffer&&) = delete;
StimulusBuffer& operator=(StimulusBuffer&&) = delete;
public:
StimulusProducer& parent;
std::shared_ptr<device::DeviceAttachmentSpec> deviceAttachmentSpec;
int histbuffMs;
StagingBuffer stagingBuffer;
std::unique_ptr<StagingBuffer::MlockPinner> stagingBufferPinner;
SpMcRingBuffer ringBuffer;
};
@@ -27,8 +27,6 @@
#include "pcloudStimulusProducer.h"
#include "livoxGen1.h"
// #define REGISTER_IOURING_BUFFERS
namespace smo {
namespace stim_buff {
@@ -116,10 +114,6 @@ bool IoUringAssemblyEngine::setup()
assembledSlotsTracker[i].ioVec.iov_len = slot.nBytes;
}
// Declare iovec early to avoid goto crossing initialization
#ifdef REGISTER_IOURING_BUFFERS
struct iovec iov;
#endif
int ret;
/** EXPLANATION:
@@ -134,23 +128,27 @@ bool IoUringAssemblyEngine::setup()
if (ret < 0)
{ goto cleanup; }
#ifdef REGISTER_IOURING_BUFFERS
// Register staging buffer with io_uring for DMA-apt I/O
iov = parent.assemblyBuffer.getIoUringRegisterIoVec();
ret = io_uring_register_buffers(&ring, &iov, 1);
if (ret < 0)
{ goto cleanup_ring; }
#endif
try
{
if (assemblyBufferIoUringPinner)
{
throw std::runtime_error(std::string(__func__)
+ ": assemblyBufferIoUringPinner already exists");
}
assemblyBufferIoUringPinner =
parent.assemblyBuffer.makeIoUringPinner(&ring);
}
catch (const std::exception&)
{
goto cleanup_ring;
}
// Create eventfd for CQE notifications (used with boost's unified loop)
eventfdFd = eventfd(0, EFD_NONBLOCK);
if (eventfdFd < 0)
{
#ifdef REGISTER_IOURING_BUFFERS
goto cleanup_buffers;
#else
goto cleanup_ring;
#endif
}
// Register eventfd with io_uring
@@ -164,11 +162,8 @@ bool IoUringAssemblyEngine::setup()
cleanup_eventfd:
close(eventfdFd);
eventfdFd = -1;
#ifdef REGISTER_IOURING_BUFFERS
cleanup_buffers:
io_uring_unregister_buffers(&ring);
#endif
cleanup_ring:
assemblyBufferIoUringPinner.reset();
io_uring_queue_exit(&ring);
cleanup:
return false;
@@ -217,9 +212,7 @@ void IoUringAssemblyEngine::finalize()
if (wasAcceptingRequests)
{
#ifdef REGISTER_IOURING_BUFFERS
io_uring_unregister_buffers(&ring);
#endif
assemblyBufferIoUringPinner.reset();
io_uring_queue_exit(&ring);
}
@@ -21,6 +21,7 @@
#include <spinscale/callback.h>
#include <spinscale/spinLock.h>
#include <user/frameAssemblyDesc.h>
#include <user/stagingBuffer.h>
#define IOURINGASSM_ENGN_FRAME_ASSEM_TIMEOUT_MS \
(CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2)
@@ -73,6 +74,7 @@ private:
uint64_t eventfd_value; // Buffer for async_read_some
// Point cloud data socket descriptor
std::shared_ptr<boost::asio::posix::stream_descriptor> pcloudDataFdDesc;
std::unique_ptr<StagingBuffer::IoUringPinner> assemblyBufferIoUringPinner;
// Stall detection timer
boost::asio::deadline_timer stallTimer;
@@ -122,4 +124,3 @@ public:
#endif // _LIVOX_GEN1_IOURING_ASSEMBLY_ENGINE_H
+3 -4
View File
@@ -34,15 +34,14 @@ public:
~MeshStimulusBuffer() = default;
// Non-copyable, movable
// Non-copyable, non-movable: inherited pinner lifetime is instance-bound
MeshStimulusBuffer(const MeshStimulusBuffer&) = delete;
MeshStimulusBuffer& operator=(const MeshStimulusBuffer&) = delete;
MeshStimulusBuffer(MeshStimulusBuffer&&) = default;
MeshStimulusBuffer& operator=(MeshStimulusBuffer&&) = default;
MeshStimulusBuffer(MeshStimulusBuffer&&) = delete;
MeshStimulusBuffer& operator=(MeshStimulusBuffer&&) = delete;
};
} // namespace stim_buff
} // namespace smo
#endif // _LIVOX_GEN1_MESH_STIMULUS_BUFFER_H
@@ -73,11 +73,11 @@ public:
~PcloudAmbienceStimulusBuffer() = default;
// Non-copyable, movable
// Non-copyable, non-movable: inherited pinner lifetime is instance-bound
PcloudAmbienceStimulusBuffer(const PcloudAmbienceStimulusBuffer&) = delete;
PcloudAmbienceStimulusBuffer& operator=(const PcloudAmbienceStimulusBuffer&) = delete;
PcloudAmbienceStimulusBuffer(PcloudAmbienceStimulusBuffer&&) = default;
PcloudAmbienceStimulusBuffer& operator=(PcloudAmbienceStimulusBuffer&&) = default;
PcloudAmbienceStimulusBuffer(PcloudAmbienceStimulusBuffer&&) = delete;
PcloudAmbienceStimulusBuffer& operator=(PcloudAmbienceStimulusBuffer&&) = delete;
public:
uint32_t postrinInterestPercentage;
@@ -35,15 +35,15 @@ public:
~PcloudIntensityStimulusBuffer() = default;
// Non-copyable, movable
// Non-copyable, non-movable: inherited pinner lifetime is instance-bound
PcloudIntensityStimulusBuffer(
const PcloudIntensityStimulusBuffer&) = delete;
PcloudIntensityStimulusBuffer& operator=(
const PcloudIntensityStimulusBuffer&) = delete;
PcloudIntensityStimulusBuffer(
PcloudIntensityStimulusBuffer&&) = default;
PcloudIntensityStimulusBuffer&&) = delete;
PcloudIntensityStimulusBuffer& operator=(
PcloudIntensityStimulusBuffer&&) = default;
PcloudIntensityStimulusBuffer&&) = delete;
};
} // namespace stim_buff
@@ -95,10 +95,13 @@ collationBuffer(
StagingBuffer::IOEngineConstraints::openClInputConstraints,
StagingBuffer::IOEngineConstraints::openClInputConstraints,
nDgramsPerStagingBufferFrame),
collationBufferMlockPinner(collationBuffer.makeMlockPinner()),
averageIntensityBuffer(
openClAverageIntensityConstraints,
openClAverageIntensityConstraints,
nDgramsPerStagingBufferFrame),
averageIntensityBufferMlockPinner(
averageIntensityBuffer.makeMlockPinner()),
tempStimulusFrameMem(0),
tempStimulusFrame(
FrameAssemblyDesc::SlotDesc{
@@ -52,11 +52,11 @@ public:
~PcloudStimulusProducer() = default;
// Non-copyable, movable
// Non-copyable, non-movable: pinners hold buffer instance references
PcloudStimulusProducer(const PcloudStimulusProducer&) = delete;
PcloudStimulusProducer& operator=(const PcloudStimulusProducer&) = delete;
PcloudStimulusProducer(PcloudStimulusProducer&&) = default;
PcloudStimulusProducer& operator=(PcloudStimulusProducer&&) = default;
PcloudStimulusProducer(PcloudStimulusProducer&&) = delete;
PcloudStimulusProducer& operator=(PcloudStimulusProducer&&) = delete;
// Control methods
void start() override;
@@ -94,7 +94,10 @@ public:
StagingBuffer assemblyBuffer;
IoUringAssemblyEngine ioUringAssemblyEngine;
StagingBuffer collationBuffer;
std::unique_ptr<StagingBuffer::MlockPinner> collationBufferMlockPinner;
StagingBuffer averageIntensityBuffer;
std::unique_ptr<StagingBuffer::MlockPinner>
averageIntensityBufferMlockPinner;
size_t tempStimulusFrameMem;
StimulusFrame tempStimulusFrame;
std::atomic<std::shared_ptr<MeshStimulusBuffer>> meshStimulusBuffer;