856 lines
25 KiB
C++
856 lines
25 KiB
C++
#include <boostAsioLinkageFix.h>
|
|
#include <config.h>
|
|
#include <opts.h>
|
|
#include <algorithm>
|
|
#include <iostream>
|
|
#include <iomanip>
|
|
#include <cstring>
|
|
#include <stdexcept>
|
|
#include <functional>
|
|
#include <random>
|
|
#include <sys/socket.h>
|
|
#include <sys/eventfd.h>
|
|
#include <sys/uio.h>
|
|
#include <sys/poll.h>
|
|
#include <unistd.h>
|
|
#include <errno.h>
|
|
#include <boost/system/error_code.hpp>
|
|
#include <livoxProto1/device.h>
|
|
#include <livoxProto1/livoxProto1.h>
|
|
#include <asynchronousContinuation.h>
|
|
#include <asynchronousLoop.h>
|
|
#include <asynchronousBridge.h>
|
|
#include <callback.h>
|
|
#include <callableTracer.h>
|
|
#include <spinLock.h>
|
|
#include "ioUringAssemblyEngine.h"
|
|
#include "pcloudStimulusProducer.h"
|
|
#include "livoxGen1.h"
|
|
|
|
// #define REGISTER_IOURING_BUFFERS
|
|
|
|
namespace smo {
|
|
namespace stim_buff {
|
|
|
|
/**
 * Accessor for the `livoxProto1` DLL-state object (declared elsewhere,
 * presumably in livoxProto1/livoxProto1.h). The engine reaches the
 * livoxProto1 hook functions, e.g. livoxProto1_getPcloudDataFdDesc, through
 * the returned reference.
 */
inline LivoxProto1DllState&
getLivoxProto1State()
{
    LivoxProto1DllState& dllState = livoxProto1;
    return dllState;
}
|
|
|
|
/**
 * Header image placement-constructed into frame slots for which no datagram
 * was received (see fillUnAssembledSlotsWithDummyDgrams()).
 *
 * The first four bytes are set to 0xFF as the "dummy" marker. Every other
 * field is zeroed so that a dummy slot never carries stale bytes from a
 * datagram assembled into that slot during an earlier frame.
 */
struct DummyLivoxEthHeader
{
    DummyLivoxEthHeader()
    : version(0xFF), slot(0xFF), id(0xFF), rsvd(0xFF),
    err_code(0), timestamp_type(0), data_type(0), timestamp{}
    {}

    /**
     * @return true if ANY of version/slot/id/rsvd equals 0xFF.
     *
     * NOTE(review): a single 0xFF byte is enough to classify a header as a
     * dummy, so a real header carrying 0xFF in any of these fields would be
     * misclassified. Confirm that real Livox headers never do, or tighten
     * this to require all four.
     */
    static bool isDummy(const DummyLivoxEthHeader& hdr)
    {
        return hdr.version == 0xFF || hdr.slot == 0xFF || hdr.id == 0xFF
            || hdr.rsvd == 0xFF;
    }

    /** @return the negation of isDummy(). */
    static bool isValid(const DummyLivoxEthHeader& hdr)
    { return !isDummy(hdr); }

    uint8_t version, slot, id, rsvd;
    uint32_t err_code;
    uint8_t timestamp_type, data_type;
    uint8_t timestamp[8];
};
|
|
|
|
IoUringAssemblyEngine::IoUringAssemblyEngine(
|
|
PcloudStimulusProducer& parent_, size_t nDgramsPerStagingBufferFrame_)
|
|
: parent(parent_),
|
|
frameAssemblyDesc(nullptr), ring{},
|
|
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
|
|
stallTimer(parent_.device->componentThread->getIoService()),
|
|
shouldAcceptRequests(false),
|
|
nDgramsPerStagingBufferFrame(nDgramsPerStagingBufferFrame_),
|
|
assembledSlotsTracker(nDgramsPerStagingBufferFrame_),
|
|
randomDevice(), randomGenerator(randomDevice())
|
|
{}
|
|
|
|
bool IoUringAssemblyEngine::setup()
|
|
{
|
|
// Defensive check to prevent double-calling
|
|
{
|
|
SpinLock::Guard lock(shouldAcceptRequestsLock);
|
|
if (shouldAcceptRequests)
|
|
{
|
|
throw std::runtime_error(std::string(__func__) + ": setup() called "
|
|
"while already set up");
|
|
}
|
|
}
|
|
|
|
// Get FrameAssemblyDesc from staging buffer
|
|
frameAssemblyDesc = static_cast<std::shared_ptr<FrameAssemblyDesc>>(
|
|
parent.assemblyBuffer);
|
|
|
|
if (!frameAssemblyDesc || frameAssemblyDesc->slots.empty())
|
|
{ return false; }
|
|
|
|
// Get point cloud data socket descriptor from UdpCommandDemuxer
|
|
auto& livoxState = getLivoxProto1State();
|
|
if (!livoxState.livoxProto1_getPcloudDataFdDesc)
|
|
{ return false; }
|
|
pcloudDataFdDesc = (*livoxState.livoxProto1_getPcloudDataFdDesc)();
|
|
if (!pcloudDataFdDesc)
|
|
{ return false; }
|
|
|
|
// Get UDP socket file descriptor
|
|
int udpFd = pcloudDataFdDesc->native_handle();
|
|
if (udpFd < 0)
|
|
{ return false; }
|
|
|
|
// Set up iovecs for each slot
|
|
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
|
|
{
|
|
assembledSlotsTracker[i].assembled = false;
|
|
assembledSlotsTracker[i].msgHdr = {};
|
|
assembledSlotsTracker[i].msgHdr.msg_iov =
|
|
&assembledSlotsTracker[i].ioVec;
|
|
assembledSlotsTracker[i].msgHdr.msg_iovlen = 1;
|
|
}
|
|
|
|
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
|
|
{
|
|
const auto& slot = frameAssemblyDesc->slots[i];
|
|
assembledSlotsTracker[i].ioVec.iov_base = slot.vaddr;
|
|
assembledSlotsTracker[i].ioVec.iov_len = slot.nBytes;
|
|
}
|
|
|
|
// Declare iovec early to avoid goto crossing initialization
|
|
#ifdef REGISTER_IOURING_BUFFERS
|
|
struct iovec iov;
|
|
#endif
|
|
int ret;
|
|
|
|
/** EXPLANATION:
|
|
* Initialize io_uring ring - allocate SQEs and CQEs for one frame assembly
|
|
* One SQE per slot (one datagram per slot), plus one extra for cancel
|
|
* operations, since io_uring_prep_cancel() requires a valid SQE. So we
|
|
* alloc 1 extra SQE to guarantee that we will always have an available SQE
|
|
* for cancel operations.
|
|
*/
|
|
ret = io_uring_queue_init(
|
|
static_cast<unsigned int>(frameAssemblyDesc->numSlots + 1), &ring, 0);
|
|
if (ret < 0)
|
|
{ goto cleanup; }
|
|
|
|
#ifdef REGISTER_IOURING_BUFFERS
|
|
// Register staging buffer with io_uring for DMA-apt I/O
|
|
iov = parent.assemblyBuffer.getIoUringRegisterIoVec();
|
|
ret = io_uring_register_buffers(&ring, &iov, 1);
|
|
if (ret < 0)
|
|
{ goto cleanup_ring; }
|
|
#endif
|
|
|
|
// Create eventfd for CQE notifications (used with boost's unified loop)
|
|
eventfdFd = eventfd(0, EFD_NONBLOCK);
|
|
if (eventfdFd < 0)
|
|
{
|
|
#ifdef REGISTER_IOURING_BUFFERS
|
|
goto cleanup_buffers;
|
|
#else
|
|
goto cleanup_ring;
|
|
#endif
|
|
}
|
|
|
|
// Register eventfd with io_uring
|
|
ret = io_uring_register_eventfd(&ring, eventfdFd);
|
|
if (ret < 0)
|
|
{ goto cleanup_eventfd; }
|
|
|
|
shouldAcceptRequests = true;
|
|
return true;
|
|
|
|
cleanup_eventfd:
|
|
close(eventfdFd);
|
|
eventfdFd = -1;
|
|
#ifdef REGISTER_IOURING_BUFFERS
|
|
cleanup_buffers:
|
|
io_uring_unregister_buffers(&ring);
|
|
#endif
|
|
cleanup_ring:
|
|
io_uring_queue_exit(&ring);
|
|
cleanup:
|
|
return false;
|
|
}
|
|
|
|
void IoUringAssemblyEngine::finalize()
|
|
{
|
|
bool wasAcceptingRequests = stop();
|
|
|
|
if (eventfdFd >= 0)
|
|
{
|
|
io_uring_unregister_eventfd(&ring);
|
|
close(eventfdFd);
|
|
eventfdFd = -1;
|
|
}
|
|
|
|
if (wasAcceptingRequests)
|
|
{
|
|
#ifdef REGISTER_IOURING_BUFFERS
|
|
io_uring_unregister_buffers(&ring);
|
|
#endif
|
|
io_uring_queue_exit(&ring);
|
|
}
|
|
|
|
// Reset state to allow setup() to be called again
|
|
frameAssemblyDesc = nullptr;
|
|
}
|
|
|
|
/**
 * Start one assembly cycle.
 *
 * The cycle:
 *  - stores onCqeReady as the per-CQE callback;
 *  - clears the per-slot "assembled" marks;
 *  - flushes any stale eventfd count;
 *  - wraps the eventfd in a fresh asio stream_descriptor;
 *  - queues one recvmsg SQE per slot (user_data = slot index) and submits
 *    them;
 *  - arms an async read on the eventfd, whose handler is onEventfdRead().
 *
 * @param onCqeReady Invoked once per CQE with (user_data, cqe->res).
 * @throws std::runtime_error if onCqeReady is empty, the engine is not
 *         accepting requests, a cycle is already in progress (eventfdDesc
 *         set), the UDP socket is invalid, no SQE is available, or
 *         io_uring_submit() fails. Failures after the descriptor is created
 *         undo it, so a later call is not permanently rejected with
 *         "eventfdDesc is already set".
 */
void IoUringAssemblyEngine::resetAndAssembleFrame(
    resetAndAssembleFrameCbFn onCqeReady)
{
    if (!onCqeReady)
    {
        throw std::runtime_error(std::string(__func__)
            + ": onCqeReady callback is invalid");
    }

    if (!shouldAcceptRequests)
    {
        throw std::runtime_error(std::string(__func__)
            + ": engine is not accepting requests");
    }

    // eventfdDesc should not be valid when resetAndAssembleFrame is called
    if (eventfdDesc)
    {
        throw std::runtime_error(std::string(__func__)
            + ": eventfdDesc is already set");
    }

    // Validate the socket before mutating any engine state.
    const int udpFd = pcloudDataFdDesc ? pcloudDataFdDesc->native_handle() : -1;
    if (udpFd < 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": invalid UDP socket file descriptor");
    }

    // Store the callback for re-arming
    onCqeReadyCallback = std::move(onCqeReady);

    // Reset all assembled slots tracker to false
    for (auto& slotDesc : assembledSlotsTracker) {
        slotDesc.assembled = false;
    }

    /* Flush eventfd state with a non-blocking poll(): if a count is pending,
     * read it so the new descriptor does not fire on a previous cycle's
     * signal.
     */
    struct pollfd pfd;
    pfd.fd = eventfdFd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int poll_ret = poll(&pfd, 1, 0); // Timeout 0 = non-blocking
    if (poll_ret > 0 && (pfd.revents & POLLIN))
    {
        uint64_t discard;
        ssize_t rd = read(eventfdFd, &discard, sizeof(discard));
        (void)rd; // Ignore errors - just trying to flush
    }

    // std::make_unique throws on failure rather than returning null.
    eventfdDesc = std::make_unique<boost::asio::posix::stream_descriptor>(
        parent.device->componentThread->getIoService(), eventfdFd);

    /* Undo this call's engine-state changes and throw. release() detaches the
     * fd from the io_service without closing it; the eventfd itself is owned
     * by setup()/finalize().
     */
    auto abandonCycle = [this](const std::string& reason)
    {
        eventfdDesc->release();
        eventfdDesc.reset();
        onCqeReadyCallback = [](void *, int) {};
        throw std::runtime_error(reason);
    };

    // Prepare SQEs for each slot in the frame
    for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
    {
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (!sqe)
        {
            abandonCycle(std::string(__func__)
                + ": failed to get SQE for slot " + std::to_string(i));
        }

        io_uring_prep_recvmsg(sqe, udpFd, &assembledSlotsTracker[i].msgHdr, 0);
        // Set user_data to slot index for tracking
        io_uring_sqe_set_data(sqe, reinterpret_cast<void*>(i));
    }

    /* Submit all SQEs. liburing reports failure as a negative errno return
     * value; it does not set errno.
     */
    const int submitted = io_uring_submit(&ring);
    if (submitted < 0)
    {
        abandonCycle(std::string(__func__)
            + ": io_uring_submit failed: " + std::strerror(-submitted)
            + " (errno=" + std::to_string(-submitted) + ")");
    }

    // Start listening for CQE notifications on eventfd
    eventfdDesc->async_read_some(
        boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
        std::bind(
            &IoUringAssemblyEngine::onEventfdRead, this,
            std::placeholders::_1,
            std::placeholders::_2));
}
|
|
|
|
bool IoUringAssemblyEngine::stop()
|
|
{
|
|
// Acquire and release lock tightly around setting the flag
|
|
SpinLock::Guard lock(shouldAcceptRequestsLock);
|
|
bool wasAcceptingRequests = shouldAcceptRequests;
|
|
shouldAcceptRequests = false;
|
|
return wasAcceptingRequests;
|
|
}
|
|
|
|
void IoUringAssemblyEngine::assemblyCycleComplete()
|
|
{
|
|
// Cancel in-flight stall timeout timer
|
|
stallTimer.cancel();
|
|
onCqeReadyCallback = std::move([](void *, int){});
|
|
|
|
if (frameAssemblyDesc)
|
|
{
|
|
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
|
|
if (!sqe)
|
|
{
|
|
std::cerr << __func__ << ": failed to get SQE for cancel op. "
|
|
<< "Continuing cleanup without cancelling.\n";
|
|
|
|
goto cleanup_eventfd;
|
|
}
|
|
|
|
/* Cancel all in-flight operations on our ring
|
|
* using IORING_ASYNC_CANCEL_ANY. Identify the CQE for the cancel
|
|
* op as numSlots since numSlots is an invalid slot index for a
|
|
* real slot.
|
|
*/
|
|
io_uring_prep_cancel(
|
|
sqe, reinterpret_cast<void*>(frameAssemblyDesc->numSlots),
|
|
IORING_ASYNC_CANCEL_ANY);
|
|
|
|
io_uring_submit(&ring);
|
|
|
|
/* Wait for cancellation to complete. According to the man page,
|
|
* cancellation is synchronous and a CQE is guaranteed to be
|
|
* generated by the time submission returns.
|
|
*/
|
|
struct io_uring_cqe *cqe;
|
|
bool sawCancelCqe = false;
|
|
while (io_uring_peek_cqe(&ring, &cqe) == 0)
|
|
{
|
|
// Call seen() on all CQEs for completeness/correctness.
|
|
io_uring_cqe_seen(&ring, cqe);
|
|
void *user_data = io_uring_cqe_get_data(cqe);
|
|
if (user_data == reinterpret_cast<void*>(
|
|
frameAssemblyDesc->numSlots))
|
|
{
|
|
sawCancelCqe = true;
|
|
}
|
|
}
|
|
|
|
if (!sawCancelCqe && smoHooksPtr->OptionParser_getOptions().verbose) {
|
|
std::cerr << __func__ << ": no CQE seen for cancel operation\n";
|
|
}
|
|
}
|
|
|
|
cleanup_eventfd:
|
|
if (eventfdDesc)
|
|
{
|
|
/** EXPLANATION:
|
|
* The goal here is to ensure that our io_service's event loop will not
|
|
* get any events from the eventfd after we've called
|
|
* assemblyCycleComplete(). So we completely deinitialize the eventfd
|
|
* descriptor.
|
|
*
|
|
* But we still want to reuse the underlying eventfd file descriptor,
|
|
* itself in the next resetAndAssembleFrame() cycle, so we call
|
|
* release() instead of reset() to ensure that the underlying fd
|
|
* is not closed.
|
|
*
|
|
* However, we need to close the descriptor's association with the
|
|
* io_service before releasing it, otherwise Boost.Asio will complain
|
|
* when we try to create a new descriptor with the same fd.
|
|
*/
|
|
/** CAVEAT:
|
|
* There's a rare but real race condition here where the eventfd gets an
|
|
* event signaled on it, and while boost is internally processing that
|
|
* event to enqeue our handler, we call cancel() and release() here.
|
|
* If boost internally has locking on the stream_descriptor object,
|
|
* this should be fine. But just in case it doesn't, I'm just
|
|
* documenting that possibility here.
|
|
*
|
|
* There's nothing we can really do about it except know that it would
|
|
* be very rarely happen; and that we can't do anything about it short
|
|
* of modifying the boost.Asio code.
|
|
*/
|
|
eventfdDesc->cancel();
|
|
eventfdDesc->release();
|
|
/* Destroy the descriptor object (now that it's unregistered, destroying
|
|
* it won't close the fd since release() transferred ownership back)
|
|
*/
|
|
eventfdDesc.reset();
|
|
}
|
|
}
|
|
|
|
// Continuation class for assembleFrameReq
|
|
class IoUringAssemblyEngine::AssembleFrameReq
|
|
: public PostedAsynchronousContinuation<
|
|
IoUringAssemblyEngine::assembleFrameReqCbFn>
|
|
{
|
|
public:
|
|
AssembleFrameReq(
|
|
IoUringAssemblyEngine& engine_,
|
|
const std::shared_ptr<ComponentThread>& caller,
|
|
Callback<IoUringAssemblyEngine::assembleFrameReqCbFn> cb)
|
|
: PostedAsynchronousContinuation<
|
|
IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb),
|
|
engine(engine_),
|
|
loop(engine_.frameAssemblyDesc->numSlots),
|
|
timerFired(false), handlerExecuted(false)
|
|
{}
|
|
|
|
void callOriginalCallback(bool success, AsynchronousLoop loop)
|
|
{
|
|
callOriginalCb(success, loop);
|
|
}
|
|
|
|
public:
|
|
void assembleFrameReq1_posted(
|
|
std::shared_ptr<AssembleFrameReq> context)
|
|
{
|
|
SpinLock::Guard lock(engine.shouldAcceptRequestsLock);
|
|
|
|
if (!engine.shouldAcceptRequests)
|
|
{
|
|
context->callOriginalCallback(false, AsynchronousLoop(0));
|
|
return;
|
|
}
|
|
|
|
// Initialize loop with number of slots
|
|
context->loop = AsynchronousLoop(engine.frameAssemblyDesc->numSlots);
|
|
|
|
/** FIXME:
|
|
* I'm suspicious of this std::bind return object here. What if us
|
|
* setting it to null inside of stop() doesn't actually cause the
|
|
* object to be destroyed? This would cause this contin's sh_ptr's
|
|
* reference count to never reach 0, causing a memory leak.
|
|
*/
|
|
engine.resetAndAssembleFrame(
|
|
std::bind(&AssembleFrameReq::assembleFrameReq2_2,
|
|
context.get(), context,
|
|
std::placeholders::_1, std::placeholders::_2));
|
|
|
|
// Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms
|
|
engine.stallTimer.expires_from_now(
|
|
boost::posix_time::milliseconds(
|
|
CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2));
|
|
engine.stallTimer.async_wait(
|
|
std::bind(&AssembleFrameReq::assembleFrameReq2_1,
|
|
context.get(), context,
|
|
std::placeholders::_1));
|
|
}
|
|
|
|
void assembleFrameReq2_1(
|
|
std::shared_ptr<AssembleFrameReq> context,
|
|
const boost::system::error_code& error)
|
|
{
|
|
// Check if timer was cancelled (ignore if operation_aborted)
|
|
if (error == boost::asio::error::operation_aborted) { return; }
|
|
|
|
/** EXPLANATION:
|
|
* This lock acquisition here will also cover the call to
|
|
* assembleFrameReq3 below. Because of that, it means that the
|
|
* requirement that the lock be held while accessing
|
|
* the metadata that's destroyed in stop() is satisfied.
|
|
*
|
|
* In theory though, we shouldn't need to hold the lock into
|
|
* assembleFrameReq3 below because that function doesn't really access
|
|
* any state that's destroyed in stop()? But I'm not sure, and we have
|
|
* indeed seen a SEGFAULT even in the current code with locking, so
|
|
* I'm going to hold the lock here for now.
|
|
*/
|
|
SpinLock::Guard lock(context->engine.shouldAcceptRequestsLock);
|
|
|
|
if (!context->engine.shouldAcceptRequests)
|
|
{
|
|
context->engine.assemblyCycleComplete();
|
|
context->loop.setRemainingIterationsToFailure();
|
|
context->callOriginalCallback(false, context->loop);
|
|
return;
|
|
}
|
|
|
|
// Set timer fired flag
|
|
context->timerFired.store(true);
|
|
context->assembleFrameReq3(context);
|
|
}
|
|
|
|
void assembleFrameReq2_2(
|
|
std::shared_ptr<AssembleFrameReq> context,
|
|
void *user_data, int cqe_result)
|
|
{
|
|
// NB: The lock was acquired by onEventFdRead before calling this func
|
|
if (!context->engine.shouldAcceptRequests)
|
|
{
|
|
context->engine.assemblyCycleComplete();
|
|
context->loop.setRemainingIterationsToFailure();
|
|
context->callOriginalCallback(false, context->loop);
|
|
return;
|
|
}
|
|
|
|
// Extract index from user_data and mark slot as assembled if successful
|
|
size_t index = reinterpret_cast<size_t>(user_data);
|
|
bool success = (cqe_result >= 0);
|
|
|
|
if (success && index < context->engine.assembledSlotsTracker.size()) {
|
|
context->engine.assembledSlotsTracker[index].assembled = true;
|
|
}
|
|
|
|
// Caller decides success: result >= 0 means success
|
|
if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(
|
|
success))
|
|
{
|
|
// Loop is complete, call oracle function
|
|
context->assembleFrameReq3(context);
|
|
}
|
|
}
|
|
|
|
void assembleFrameReq3(
|
|
std::shared_ptr<AssembleFrameReq> context
|
|
)
|
|
{
|
|
/** EXPLANATION:
|
|
* All branch paths that invoke this unifyig oracle function are
|
|
* expected to already hold the shouldAcceptRequestsLock before calling
|
|
* it.
|
|
*/
|
|
// Ensure we only execute once using atomic exchange
|
|
if (context->handlerExecuted.exchange(true)) { return; }
|
|
// Cancel the timer, stop the engine and process frame, if any.
|
|
context->engine.assemblyCycleComplete();
|
|
|
|
/** EXPLANATION:
|
|
* Timeout doesn't necessarily mean error.
|
|
*
|
|
* If we received zero dgrams from the device, that is indeed an error.
|
|
* But if we received some dgrams, but not all, that is not an error:
|
|
* it just means we didn't receive as much data as we would have liked.
|
|
*/
|
|
|
|
// Error: no slots succeeded - no data received successfully.
|
|
if (context->loop.nSucceeded.load() == 0)
|
|
{
|
|
context->callOriginalCallback(false, context->loop);
|
|
return;
|
|
}
|
|
|
|
#if 0
|
|
// Artificially create random dummy slots for testing
|
|
context->engine.randomDummySlotFiller(context->loop);
|
|
#endif
|
|
|
|
// Fill un-assembled slots with dummy datagrams
|
|
context->engine.fillUnAssembledSlotsWithDummyDgrams();
|
|
|
|
#if 0
|
|
// Print first 4 bytes of each slot (whether assembled or not)
|
|
if (context->engine.frameAssemblyDesc)
|
|
{
|
|
for (size_t i = 0; i < context->engine.frameAssemblyDesc->numSlots; ++i) {
|
|
context->engine.printSlotBytes(i, 4);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
if (context->loop.nSucceeded.load() >= context->loop.nTotal)
|
|
{
|
|
// Success: all or more slots succeeded
|
|
if (context->loop.nSucceeded.load() > context->loop.nTotal)
|
|
{
|
|
std::cerr << __func__ << ": nSucceeded > nTotal: succ ("
|
|
<< context->loop.nSucceeded.load()
|
|
<< ") > nTotal (" << context->loop.nTotal << ")\n";
|
|
}
|
|
|
|
context->callOriginalCallback(true, context->loop);
|
|
return;
|
|
}
|
|
|
|
if (context->loop.nSucceeded.load() < context->loop.nTotal)
|
|
{
|
|
// Success: some slots succeeded (less than total)
|
|
// Note: dummy fill for un-assembled slots will be implemented later
|
|
context->callOriginalCallback(true, context->loop);
|
|
return;
|
|
}
|
|
|
|
if (smoHooksPtr->OptionParser_getOptions().verbose)
|
|
{
|
|
std::cerr << __func__ << ": Invalid state: nSucceeded ("
|
|
<< context->loop.nSucceeded.load()
|
|
<< ") < nTotal (" << context->loop.nTotal << ")" << std::endl;
|
|
}
|
|
|
|
context->callOriginalCallback(false, context->loop);
|
|
return;
|
|
}
|
|
|
|
public:
|
|
IoUringAssemblyEngine& engine;
|
|
AsynchronousLoop loop;
|
|
std::atomic<bool> timerFired;
|
|
std::atomic<bool> handlerExecuted;
|
|
};
|
|
|
|
/**
 * Request assembly of one frame.
 *
 * If the engine is not accepting requests, cb is invoked immediately on the
 * calling thread with (false, AsynchronousLoop(0)). Otherwise an
 * AssembleFrameReq continuation is created for the calling ComponentThread
 * and its first step is posted to the device's io_service.
 *
 * @param cb Completion callback receiving (success, loop counters).
 */
void IoUringAssemblyEngine::assembleFrameReq(
    Callback<assembleFrameReqCbFn> cb)
{
    bool accepting;
    {
        SpinLock::Guard lock(shouldAcceptRequestsLock);
        accepting = shouldAcceptRequests;
    }

    /* Invoke the rejection callback only after the spinlock has been released.
     * The callback is caller code. If it re-enters the engine (e.g.
     * assembleFrameReq() or stop()), it would otherwise spin forever on the
     * lock it is already holding.
     */
    if (!accepting)
    {
        cb.callbackFn(false, AsynchronousLoop(0));
        return;
    }

    const auto& caller = smoHooksPtr->ComponentThread_getSelf();
    auto request = std::make_shared<AssembleFrameReq>(
        *this, caller, std::move(cb));

    parent.device->componentThread->getIoService().post(
        STC(std::bind(
            &AssembleFrameReq::assembleFrameReq1_posted,
            request.get(), request)));
}
|
|
|
|
void IoUringAssemblyEngine::onEventfdRead(
|
|
const boost::system::error_code& error,
|
|
std::size_t bytes_transferred)
|
|
{
|
|
(void)bytes_transferred;
|
|
|
|
// Ignore cancellation errors
|
|
if (error == boost::asio::error::operation_aborted) { return; }
|
|
|
|
/** EXPLANATION:
|
|
* This lock should be held throughout this method to ensure that the
|
|
* IoUringAssemblyEngine's per-assembly state isn't destroyed while this
|
|
* handler is running.
|
|
*/
|
|
SpinLock::Guard lock(shouldAcceptRequestsLock);
|
|
/** EXPLANATION:
|
|
* You'd think we should put check for shouldAcceptRequests here and
|
|
* `return` here if !shouldAcceptRequests, but we shouldn't because
|
|
* that would mean that we can't invoke the caller's callback. This would
|
|
* make the caller freeze forever.
|
|
*
|
|
* Instead we just let the onCqeReadyCallback check for
|
|
* shouldAcceptRequests. That way the onCqeReadyCallback can actually
|
|
* invoke the caller's callback, as it should. We have no knowledge of the
|
|
* caller's callback because we don't have access to the caller's
|
|
* continuation object. The onCqeReadyCallback does have access to it,
|
|
* so we leave that up to it.
|
|
*/
|
|
|
|
/** FIXME:
|
|
* It may be necessary to specifically check for and handle the cancel op
|
|
* CQE here. I'm not sure as yet though, but I'll highlight it here for now.
|
|
*/
|
|
|
|
// Process all available CQEs and call callback for each one
|
|
struct io_uring_cqe *cqe;
|
|
while (io_uring_peek_cqe(&ring, &cqe) == 0)
|
|
{
|
|
// Get user_data from the CQE
|
|
void* user_data = io_uring_cqe_get_data(cqe);
|
|
// Get result from the CQE
|
|
int cqe_result = cqe->res;
|
|
// Mark the CQE as seen
|
|
io_uring_cqe_seen(&ring, cqe);
|
|
|
|
/** EXPLANATION:
|
|
* Call the user-provided callback for this CQE with its user_data and
|
|
* result.
|
|
*
|
|
* 1. Notice that we call the caller's cb *after* marking the CQE as
|
|
* seen. We may later need to change this if the caller needs
|
|
* information from the CQE before it is marked as seen.
|
|
*
|
|
* 2. Notice that we do not check for or filter out the cancel op CQE
|
|
* here. The caller's handler will be able to see the cancel op CQE
|
|
* because of this.
|
|
*/
|
|
if (onCqeReadyCallback) {
|
|
onCqeReadyCallback(user_data, cqe_result);
|
|
}
|
|
}
|
|
|
|
/** EXPLANATION:
|
|
* But we do put a `return` here because we know that at this point, the
|
|
* caller's callback has already been invoked.
|
|
*/
|
|
if (!shouldAcceptRequests
|
|
|| eventfdDesc == nullptr || !eventfdDesc->is_open())
|
|
{
|
|
return;
|
|
}
|
|
|
|
// Re-arm the eventfd read for next CQE notification
|
|
eventfdDesc->async_read_some(
|
|
boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
|
|
std::bind(
|
|
&IoUringAssemblyEngine::onEventfdRead, this,
|
|
std::placeholders::_1,
|
|
std::placeholders::_2));
|
|
}
|
|
|
|
void IoUringAssemblyEngine::fillUnAssembledSlotsWithDummyDgrams()
|
|
{
|
|
if (!frameAssemblyDesc)
|
|
{ return; }
|
|
|
|
for (size_t i = 0; i < frameAssemblyDesc->numSlots; ++i)
|
|
{
|
|
// Only fill slots that were not successfully assembled
|
|
if (i >= assembledSlotsTracker.size()
|
|
|| assembledSlotsTracker[i].assembled)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
// Placement construct DummyLivoxEthHeader in the slot
|
|
new (frameAssemblyDesc->slots[i].vaddr) DummyLivoxEthHeader();
|
|
}
|
|
}
|
|
|
|
/**
 * Test aid: on a fully assembled frame, randomly un-mark between 1 and
 * nTotal - 1 slots as assembled. It rewrites loop's counters to match, so
 * that the dummy-fill path can be exercised without real packet loss.
 *
 * It does nothing when frameAssemblyDesc is null, when the frame already has
 * failures or missing slots ("natural" dummies), or when nTotal < 2 (there
 * is no "some but not all" choice to make).
 *
 * @param loop Counters for the current cycle; nSucceeded and nFailed are
 *             overwritten.
 */
void IoUringAssemblyEngine::randomDummySlotFiller(AsynchronousLoop& loop)
{
    if (!frameAssemblyDesc)
    { return; }

    // Check if there are already dummies (natural dummy instance)
    uint32_t nSucceeded = loop.nSucceeded.load();
    uint32_t nTotal = loop.nTotal;
    uint32_t nFailed = loop.nFailed.load();

    if (nFailed > 0 || nSucceeded < nTotal)
    {
        std::cout << __func__ << ": Natural dummy instance detected (nSucceeded="
            << nSucceeded << ", nTotal=" << nTotal << ", nFailed=" << nFailed
            << "), skipping artificial dummy creation" << std::endl;
        return;
    }

    /* numDummiesDist below is [1, nTotal - 1]. For nTotal < 2 that range is
     * empty (or wraps), and constructing std::uniform_int_distribution with
     * a > b is undefined behaviour.
     */
    if (nTotal < 2)
    {
        std::cout << __func__ << ": nTotal=" << nTotal
            << " is too small for artificial dummy creation, skipping"
            << std::endl;
        return;
    }

    // Randomly select a number of slots to make into dummies (less than total)
    std::uniform_int_distribution<size_t> numDummiesDist(1, nTotal - 1);
    size_t numDummiesToCreate = numDummiesDist(randomGenerator);

    std::uniform_int_distribution<size_t> slotIndexDist(0, nTotal - 1);
    size_t dummiesCreated = 0;
    size_t maxAttempts = nTotal * 10; // Safety limit to prevent infinite loop
    size_t attempts = 0;

    // Mark random slots as unassembled
    while (dummiesCreated < numDummiesToCreate && attempts < maxAttempts)
    {
        ++attempts;
        size_t randomIndex = slotIndexDist(randomGenerator);

        // Skip if already unassembled, re-roll
        if (randomIndex >= assembledSlotsTracker.size()
            || !assembledSlotsTracker[randomIndex].assembled)
        {
            continue;
        }

        // Mark as unassembled
        assembledSlotsTracker[randomIndex].assembled = false;
        ++dummiesCreated;
    }

    if (dummiesCreated < numDummiesToCreate)
    {
        std::cerr << __func__ << ": Warning: Could only create " << dummiesCreated
            << " dummy slots out of " << numDummiesToCreate
            << " requested (max attempts reached)" << std::endl;
        numDummiesToCreate = dummiesCreated;
    }

    // Only reachable when nSucceeded == nTotal and nFailed == 0, so the new
    // counters follow directly from the number of dummies created.
    uint32_t newSucceeded = nTotal - static_cast<uint32_t>(numDummiesToCreate);
    uint32_t newFailed = static_cast<uint32_t>(numDummiesToCreate);

    loop.nSucceeded.store(newSucceeded);
    loop.nFailed.store(newFailed);

    std::cout << __func__ << ": Artificially created " << numDummiesToCreate
        << " dummy slots (nSucceeded: " << nTotal << " -> "
        << newSucceeded << ", nFailed: 0 -> " << newFailed << ")" << std::endl;
}
|
|
|
|
void IoUringAssemblyEngine::printSlotBytes(size_t slotIndex, size_t nBytes)
|
|
{
|
|
if (!frameAssemblyDesc)
|
|
{
|
|
std::cerr << __func__ << ": frameAssemblyDesc is null" << std::endl;
|
|
return;
|
|
}
|
|
|
|
if (slotIndex >= frameAssemblyDesc->numSlots)
|
|
{
|
|
std::cerr << __func__ << ": slotIndex " << slotIndex
|
|
<< " out of range (numSlots=" << frameAssemblyDesc->numSlots
|
|
<< ")" << std::endl;
|
|
return;
|
|
}
|
|
|
|
const auto& slot = frameAssemblyDesc->slots[slotIndex];
|
|
size_t bytesToPrint = std::min(nBytes, static_cast<size_t>(slot.nBytes));
|
|
const uint8_t* data = reinterpret_cast<const uint8_t*>(slot.vaddr);
|
|
|
|
std::cout << __func__ << ": Slot " << slotIndex << " vaddr=" << (void*)slot.vaddr
|
|
<< " (" << bytesToPrint
|
|
<< " bytes):" << std::endl;
|
|
|
|
// Print hex dump format: offset | hex bytes | ASCII
|
|
const size_t bytesPerLine = 16;
|
|
for (size_t offset = 0; offset < bytesToPrint; offset += bytesPerLine)
|
|
{
|
|
// Print offset
|
|
std::cout << std::hex << std::setfill('0') << std::setw(4)
|
|
<< offset << ": ";
|
|
|
|
// Print hex bytes
|
|
for (size_t i = 0; i < bytesPerLine; ++i)
|
|
{
|
|
if (offset + i < bytesToPrint)
|
|
{
|
|
std::cout << std::setw(2) << static_cast<unsigned>(data[offset + i])
|
|
<< " ";
|
|
}
|
|
else
|
|
{
|
|
std::cout << " ";
|
|
}
|
|
}
|
|
|
|
// Print ASCII representation
|
|
std::cout << " |";
|
|
for (size_t i = 0; i < bytesPerLine && offset + i < bytesToPrint; ++i)
|
|
{
|
|
uint8_t byte = data[offset + i];
|
|
char c = (byte >= 32 && byte < 127) ? static_cast<char>(byte) : '.';
|
|
std::cout << c;
|
|
}
|
|
std::cout << "|" << std::dec << std::endl;
|
|
}
|
|
}
|
|
|
|
} // namespace stim_buff
|
|
} // namespace smo
|