IoUringAssmEngn: add assembleFrameReq
Invoke it instimFrameProductionTimesliceInd. Also, we discovered: * stream_descriptor::release() doesn't fully release all metadata from the fd it was assigned. This suggests that we should go through the codebase and do: release()=>reset() whenever we wish to release(). * We've confirmed that spinlocks can be used to prevent race conditions between stop() and handler methods.
This commit is contained in:
@@ -1,3 +1,5 @@
|
|||||||
|
#include <config.h>
|
||||||
|
#include <opts.h>
|
||||||
#include <boostAsioLinkageFix.h>
|
#include <boostAsioLinkageFix.h>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
@@ -11,6 +13,11 @@
|
|||||||
#include <boost/system/error_code.hpp>
|
#include <boost/system/error_code.hpp>
|
||||||
#include <livoxProto1/device.h>
|
#include <livoxProto1/device.h>
|
||||||
#include <livoxProto1/livoxProto1.h>
|
#include <livoxProto1/livoxProto1.h>
|
||||||
|
#include <asynchronousContinuation.h>
|
||||||
|
#include <asynchronousLoop.h>
|
||||||
|
#include <asynchronousBridge.h>
|
||||||
|
#include <callback.h>
|
||||||
|
#include <spinLock.h>
|
||||||
#include "ioUringAssemblyEngine.h"
|
#include "ioUringAssemblyEngine.h"
|
||||||
#include "pcloudStimulusBuffer.h"
|
#include "pcloudStimulusBuffer.h"
|
||||||
#include "livoxGen1.h"
|
#include "livoxGen1.h"
|
||||||
@@ -41,7 +48,8 @@ IoUringAssemblyEngine::IoUringAssemblyEngine(PcloudStimulusBuffer& parent_)
|
|||||||
frameAssemblyDesc(nullptr), ring{},
|
frameAssemblyDesc(nullptr), ring{},
|
||||||
isSetup(false),
|
isSetup(false),
|
||||||
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
|
eventfdFd(-1), eventfdDesc(nullptr), eventfd_value(0),
|
||||||
stallTimer(parent_.device->componentThread->getIoService())
|
stallTimer(parent_.device->componentThread->getIoService()),
|
||||||
|
isAssembling(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
bool IoUringAssemblyEngine::setup()
|
bool IoUringAssemblyEngine::setup()
|
||||||
@@ -139,7 +147,7 @@ void IoUringAssemblyEngine::finalize()
|
|||||||
}
|
}
|
||||||
|
|
||||||
void IoUringAssemblyEngine::resetAndAssembleFrame(
|
void IoUringAssemblyEngine::resetAndAssembleFrame(
|
||||||
std::function<void(void*)> onCqeReady)
|
resetAndAssembleFrameCbFn onCqeReady)
|
||||||
{
|
{
|
||||||
if (!onCqeReady)
|
if (!onCqeReady)
|
||||||
{
|
{
|
||||||
@@ -236,6 +244,8 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
|
|||||||
+ " (errno=" + std::to_string(errno) + ")");
|
+ " (errno=" + std::to_string(errno) + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set assembly flag
|
||||||
|
isAssembling = true;
|
||||||
// Start listening for CQE notifications on eventfd
|
// Start listening for CQE notifications on eventfd
|
||||||
eventfdDesc->async_read_some(
|
eventfdDesc->async_read_some(
|
||||||
boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
|
boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
|
||||||
@@ -245,8 +255,35 @@ void IoUringAssemblyEngine::resetAndAssembleFrame(
|
|||||||
std::placeholders::_2));
|
std::placeholders::_2));
|
||||||
}
|
}
|
||||||
|
|
||||||
void IoUringAssemblyEngine::stop()
|
void IoUringAssemblyEngine::stop(bool doAcquireLock)
|
||||||
{
|
{
|
||||||
|
// Clear assembly flag first to signal onEventfdRead to stop re-arming
|
||||||
|
// Acquire and release lock tightly around setting the flag
|
||||||
|
if (doAcquireLock)
|
||||||
|
{
|
||||||
|
SpinLock::Guard lock(isAssemblingLock);
|
||||||
|
isAssembling = false;
|
||||||
|
} else {
|
||||||
|
isAssembling = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** FIXME:
|
||||||
|
* There's a problem with this bridge here.
|
||||||
|
*
|
||||||
|
* We can't delay during every call to stop because under normal operating
|
||||||
|
* conditions, this whole assembly process should be able to move as fast
|
||||||
|
* as possible and to receive as much data as possible without maximum
|
||||||
|
* throughput.
|
||||||
|
*
|
||||||
|
* Yet we need to delay briefly here to ensure that the onEventfdRead loop
|
||||||
|
* has a chance to see the flag and halt.
|
||||||
|
*
|
||||||
|
* We need to analyze this carefully and figure out what the correct
|
||||||
|
* conditions are for being certain that we aren't destroying state while
|
||||||
|
* the eventfdRead loop is still running; and we need to figure out how to
|
||||||
|
* ensure that we only delay when absolutely necessary.
|
||||||
|
*/
|
||||||
|
|
||||||
// Cancel in-flight stall timeout timer
|
// Cancel in-flight stall timeout timer
|
||||||
stallTimer.cancel();
|
stallTimer.cancel();
|
||||||
|
|
||||||
@@ -290,7 +327,7 @@ void IoUringAssemblyEngine::stop()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sawCancelCqe) {
|
if (!sawCancelCqe && OptionParser::getOptions().verbose) {
|
||||||
std::cerr << __func__ << ": no CQE seen for cancel operation\n";
|
std::cerr << __func__ << ": no CQE seen for cancel operation\n";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -307,12 +344,178 @@ cleanup_eventfd:
|
|||||||
* itself in the next resetAndAssembleFrame() cycle, so we call
|
* itself in the next resetAndAssembleFrame() cycle, so we call
|
||||||
* release() instead of reset() to ensure that the underlying fd
|
* release() instead of reset() to ensure that the underlying fd
|
||||||
* is not closed.
|
* is not closed.
|
||||||
|
*
|
||||||
|
* However, we need to close the descriptor's association with the
|
||||||
|
* io_service before releasing it, otherwise Boost.Asio will complain
|
||||||
|
* when we try to create a new descriptor with the same fd.
|
||||||
*/
|
*/
|
||||||
eventfdDesc->cancel();
|
eventfdDesc->cancel();
|
||||||
eventfdDesc.release();
|
eventfdDesc->release();
|
||||||
|
/* Destroy the descriptor object (now that it's unregistered, destroying
|
||||||
|
* it won't close the fd since release() transferred ownership back)
|
||||||
|
*/
|
||||||
|
eventfdDesc.reset();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
onCqeReadyCallback = nullptr;
|
// Continuation class for assembleFrameReq
|
||||||
|
class IoUringAssemblyEngine::AssembleFrameReq
|
||||||
|
: public PostedAsynchronousContinuation<
|
||||||
|
IoUringAssemblyEngine::assembleFrameReqCbFn>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
AssembleFrameReq(
|
||||||
|
IoUringAssemblyEngine& engine_,
|
||||||
|
const std::shared_ptr<ComponentThread>& caller,
|
||||||
|
Callback<IoUringAssemblyEngine::assembleFrameReqCbFn> cb)
|
||||||
|
: PostedAsynchronousContinuation<
|
||||||
|
IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb),
|
||||||
|
engine(engine_),
|
||||||
|
loop(engine_.frameAssemblyDesc->numSlots),
|
||||||
|
timerFired(false), handlerExecuted(false)
|
||||||
|
{}
|
||||||
|
|
||||||
|
public:
|
||||||
|
void assembleFrameReq1_posted(
|
||||||
|
std::shared_ptr<AssembleFrameReq> context)
|
||||||
|
{
|
||||||
|
if (!engine.frameAssemblyDesc)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string(__func__)
|
||||||
|
+ ": frameAssemblyDesc is null");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize loop with number of slots
|
||||||
|
context->loop = AsynchronousLoop(engine.frameAssemblyDesc->numSlots);
|
||||||
|
|
||||||
|
/** FIXME:
|
||||||
|
* I'm suspicious of this std::bind return object here. What if us
|
||||||
|
* setting it to null inside of stop() doesn't actually cause the
|
||||||
|
* object to be destroyed? This would cause this contin's sh_ptr's
|
||||||
|
* reference count to never reach 0, causing a memory leak.
|
||||||
|
*/
|
||||||
|
engine.resetAndAssembleFrame(
|
||||||
|
std::bind(&AssembleFrameReq::assembleFrameReq2_2,
|
||||||
|
context.get(), context,
|
||||||
|
std::placeholders::_1, std::placeholders::_2));
|
||||||
|
|
||||||
|
// Set up timeout timer for CONFIG_STIMBUFF_FRAME_PERIOD_MS/2 ms
|
||||||
|
engine.stallTimer.expires_from_now(
|
||||||
|
boost::posix_time::milliseconds(
|
||||||
|
CONFIG_STIMBUFF_FRAME_PERIOD_MS / 2));
|
||||||
|
engine.stallTimer.async_wait(
|
||||||
|
std::bind(&AssembleFrameReq::assembleFrameReq2_1,
|
||||||
|
context.get(), context,
|
||||||
|
std::placeholders::_1));
|
||||||
|
}
|
||||||
|
|
||||||
|
void assembleFrameReq2_1(
|
||||||
|
std::shared_ptr<AssembleFrameReq> context,
|
||||||
|
const boost::system::error_code& error)
|
||||||
|
{
|
||||||
|
// Check if timer was cancelled (ignore if operation_aborted)
|
||||||
|
if (error == boost::asio::error::operation_aborted) { return; }
|
||||||
|
|
||||||
|
// Set timer fired flag
|
||||||
|
context->timerFired.store(true);
|
||||||
|
context->assembleFrameReq3(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
void assembleFrameReq2_2(
|
||||||
|
std::shared_ptr<AssembleFrameReq> context,
|
||||||
|
void *user_data, int cqe_result)
|
||||||
|
{
|
||||||
|
(void)user_data; // Not used - we just track success/failure counts
|
||||||
|
|
||||||
|
// Caller decides success: result >= 0 means success
|
||||||
|
bool success = (cqe_result >= 0);
|
||||||
|
if (context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(
|
||||||
|
success))
|
||||||
|
{
|
||||||
|
// Loop is complete, call oracle function
|
||||||
|
context->assembleFrameReq3(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void assembleFrameReq3(std::shared_ptr<AssembleFrameReq> context)
|
||||||
|
{
|
||||||
|
// Ensure we only execute once using atomic exchange
|
||||||
|
if (context->handlerExecuted.exchange(true)) { return; }
|
||||||
|
// Cancel the timer, stop the engine and process frame, if any.
|
||||||
|
context->engine.stop(false);
|
||||||
|
|
||||||
|
/** EXPLANATION:
|
||||||
|
* Timeout doesn't necessarily mean error.
|
||||||
|
*
|
||||||
|
* If we received zero dgrams from the device, that is indeed an error.
|
||||||
|
* But if we received some dgrams, but not all, that is not an error:
|
||||||
|
* it just means we didn't receive as much data as we would have liked.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Error: no slots succeeded - no data received successfully.
|
||||||
|
if (context->loop.nSucceeded.load() == 0)
|
||||||
|
{
|
||||||
|
context->callOriginalCb(false, context->loop);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (context->loop.nSucceeded.load() >= context->loop.nTotal)
|
||||||
|
{
|
||||||
|
// Success: all or more slots succeeded
|
||||||
|
if (context->loop.nSucceeded.load() > context->loop.nTotal)
|
||||||
|
{
|
||||||
|
std::cerr << __func__ << ": nSucceeded > nTotal: succ ("
|
||||||
|
<< context->loop.nSucceeded.load()
|
||||||
|
<< ") > nTotal (" << context->loop.nTotal << ")\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
context->callOriginalCb(true, context->loop);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (context->loop.nSucceeded.load() < context->loop.nTotal)
|
||||||
|
{
|
||||||
|
// Success: some slots succeeded (less than total)
|
||||||
|
// Note: dummy fill for un-assembled slots will be implemented later
|
||||||
|
context->callOriginalCb(true, context->loop);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OptionParser::getOptions().verbose)
|
||||||
|
{
|
||||||
|
std::cerr << __func__ << ": Invalid state: nSucceeded ("
|
||||||
|
<< context->loop.nSucceeded.load()
|
||||||
|
<< ") < nTotal (" << context->loop.nTotal << ")" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
context->callOriginalCb(false, context->loop);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
IoUringAssemblyEngine& engine;
|
||||||
|
AsynchronousLoop loop;
|
||||||
|
std::atomic<bool> timerFired;
|
||||||
|
std::atomic<bool> handlerExecuted;
|
||||||
|
};
|
||||||
|
|
||||||
|
void IoUringAssemblyEngine::assembleFrameReq(
|
||||||
|
Callback<assembleFrameReqCbFn> cb)
|
||||||
|
{
|
||||||
|
if (!frameAssemblyDesc)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string(__func__)
|
||||||
|
+ ": frameAssemblyDesc is null");
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto& caller = smoHooksPtr->ComponentThread_getSelf();
|
||||||
|
auto request = std::make_shared<AssembleFrameReq>(
|
||||||
|
*this, caller, std::move(cb));
|
||||||
|
|
||||||
|
parent.device->componentThread->getIoService().post(
|
||||||
|
std::bind(
|
||||||
|
&AssembleFrameReq::assembleFrameReq1_posted,
|
||||||
|
request.get(), request));
|
||||||
}
|
}
|
||||||
|
|
||||||
void IoUringAssemblyEngine::onEventfdRead(
|
void IoUringAssemblyEngine::onEventfdRead(
|
||||||
@@ -322,8 +525,15 @@ void IoUringAssemblyEngine::onEventfdRead(
|
|||||||
(void)bytes_transferred;
|
(void)bytes_transferred;
|
||||||
|
|
||||||
// Ignore cancellation errors
|
// Ignore cancellation errors
|
||||||
if (error == boost::asio::error::operation_aborted)
|
if (error == boost::asio::error::operation_aborted) { return; }
|
||||||
{ return; }
|
|
||||||
|
/** EXPLANATION:
|
||||||
|
* This lock should be held throughout this method to ensure that the
|
||||||
|
* IoUringAssemblyEngine's per-assembly state isn't destroyed while this
|
||||||
|
* handler is running.
|
||||||
|
*/
|
||||||
|
SpinLock::Guard lock(isAssemblingLock);
|
||||||
|
if (!isAssembling) { return; }
|
||||||
|
|
||||||
/** FIXME:
|
/** FIXME:
|
||||||
* It may be necessary to specifically check for and handle the cancel op
|
* It may be necessary to specifically check for and handle the cancel op
|
||||||
@@ -336,11 +546,14 @@ void IoUringAssemblyEngine::onEventfdRead(
|
|||||||
{
|
{
|
||||||
// Get user_data from the CQE
|
// Get user_data from the CQE
|
||||||
void* user_data = io_uring_cqe_get_data(cqe);
|
void* user_data = io_uring_cqe_get_data(cqe);
|
||||||
|
// Get result from the CQE
|
||||||
|
int cqe_result = cqe->res;
|
||||||
// Mark the CQE as seen
|
// Mark the CQE as seen
|
||||||
io_uring_cqe_seen(&ring, cqe);
|
io_uring_cqe_seen(&ring, cqe);
|
||||||
|
|
||||||
/** EXPLANATION:
|
/** EXPLANATION:
|
||||||
* Call the user-provided callback for this CQE with its user_data.
|
* Call the user-provided callback for this CQE with its user_data and
|
||||||
|
* result.
|
||||||
*
|
*
|
||||||
* 1. Notice that we call the caller's cb *after* marking the CQE as
|
* 1. Notice that we call the caller's cb *after* marking the CQE as
|
||||||
* seen. We may later need to change this if the caller needs
|
* seen. We may later need to change this if the caller needs
|
||||||
@@ -351,16 +564,18 @@ void IoUringAssemblyEngine::onEventfdRead(
|
|||||||
* because of this.
|
* because of this.
|
||||||
*/
|
*/
|
||||||
if (onCqeReadyCallback) {
|
if (onCqeReadyCallback) {
|
||||||
onCqeReadyCallback(user_data);
|
onCqeReadyCallback(user_data, cqe_result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-arm the eventfd read for next CQE notification
|
// Re-arm the eventfd read for next CQE notification
|
||||||
|
// Only re-arm if assembly is still active (stop() hasn't been called)
|
||||||
if (eventfdDesc && eventfdFd >= 0)
|
if (eventfdDesc && eventfdFd >= 0)
|
||||||
{
|
{
|
||||||
eventfdDesc->async_read_some(
|
eventfdDesc->async_read_some(
|
||||||
boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
|
boost::asio::buffer(&eventfd_value, sizeof(eventfd_value)),
|
||||||
std::bind(&IoUringAssemblyEngine::onEventfdRead, this,
|
std::bind(
|
||||||
|
&IoUringAssemblyEngine::onEventfdRead, this,
|
||||||
std::placeholders::_1,
|
std::placeholders::_1,
|
||||||
std::placeholders::_2));
|
std::placeholders::_2));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,10 @@
|
|||||||
#include <boost/asio/deadline_timer.hpp>
|
#include <boost/asio/deadline_timer.hpp>
|
||||||
#include <boost/asio/posix/stream_descriptor.hpp>
|
#include <boost/asio/posix/stream_descriptor.hpp>
|
||||||
#include <livoxProto1/device.h>
|
#include <livoxProto1/device.h>
|
||||||
|
#include <asynchronousContinuation.h>
|
||||||
|
#include <asynchronousLoop.h>
|
||||||
|
#include <callback.h>
|
||||||
|
#include <spinLock.h>
|
||||||
#include "frameAssemblyDesc.h"
|
#include "frameAssemblyDesc.h"
|
||||||
|
|
||||||
namespace smo {
|
namespace smo {
|
||||||
@@ -29,9 +33,13 @@ public:
|
|||||||
|
|
||||||
bool setup();
|
bool setup();
|
||||||
void finalize();
|
void finalize();
|
||||||
void resetAndAssembleFrame(
|
|
||||||
std::function<void(void*)> onCqeReady);
|
typedef std::function<void(void*, int)> resetAndAssembleFrameCbFn;
|
||||||
void stop();
|
void resetAndAssembleFrame(resetAndAssembleFrameCbFn onCqeReady);
|
||||||
|
void stop(bool doAcquireLock = true);
|
||||||
|
|
||||||
|
typedef std::function<void(bool, AsynchronousLoop)> assembleFrameReqCbFn;
|
||||||
|
void assembleFrameReq(Callback<assembleFrameReqCbFn> cb);
|
||||||
|
|
||||||
// Telemetry helpers
|
// Telemetry helpers
|
||||||
static size_t computePointsPerDgram(int returnMode);
|
static size_t computePointsPerDgram(int returnMode);
|
||||||
@@ -57,12 +65,19 @@ private:
|
|||||||
|
|
||||||
// Stall detection timer
|
// Stall detection timer
|
||||||
boost::asio::deadline_timer stallTimer;
|
boost::asio::deadline_timer stallTimer;
|
||||||
// Callback for CQE notifications (called with user_data from each CQE)
|
// Callback for CQE ntfns (called with user_data+result from each CQE)
|
||||||
std::function<void(void*)> onCqeReadyCallback;
|
resetAndAssembleFrameCbFn onCqeReadyCallback;
|
||||||
|
// Flag to indicate assembly is in progress (cleared by stop())
|
||||||
|
// Protected by isAssemblingLock
|
||||||
|
SpinLock isAssemblingLock;
|
||||||
|
bool isAssembling;
|
||||||
|
|
||||||
void cancelIncompleteAndFillDummies();
|
void cancelIncompleteAndFillDummies();
|
||||||
void onEventfdRead(
|
void onEventfdRead(
|
||||||
const boost::system::error_code& error, std::size_t bytes_transferred);
|
const boost::system::error_code& error, std::size_t bytes_transferred);
|
||||||
|
|
||||||
|
class AssembleFrameReq;
|
||||||
|
friend class AssembleFrameReq;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace stim_buff
|
} // namespace stim_buff
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ struct LivoxProto1DllState
|
|||||||
|
|
||||||
extern LivoxProto1DllState livoxProto1;
|
extern LivoxProto1DllState livoxProto1;
|
||||||
|
|
||||||
|
extern const SmoCallbacks* smoHooksPtr;
|
||||||
|
|
||||||
} // namespace stim_buff
|
} // namespace stim_buff
|
||||||
} // namespace smo
|
} // namespace smo
|
||||||
|
|
||||||
|
|||||||
@@ -68,6 +68,16 @@ void PcloudStimulusBuffer::stop()
|
|||||||
|
|
||||||
void PcloudStimulusBuffer::stimFrameProductionTimesliceInd()
|
void PcloudStimulusBuffer::stimFrameProductionTimesliceInd()
|
||||||
{
|
{
|
||||||
|
ioUringAssemblyEngine.assembleFrameReq(
|
||||||
|
{nullptr, [this](bool success, AsynchronousLoop loop) {
|
||||||
|
if (!success) {
|
||||||
|
std::cerr << __func__ << ": Failed to assemble frame" << std::endl;
|
||||||
|
} else {
|
||||||
|
std::cout << __func__ << ": Successfully assembled frame "
|
||||||
|
<< loop.nSucceeded.load() << " slots succeeded "
|
||||||
|
<< "out of " << loop.nTotal << " total slots" << std::endl;
|
||||||
|
}
|
||||||
|
}});
|
||||||
// Release the spinlock for now
|
// Release the spinlock for now
|
||||||
frameAssemblyRateLimiter.release();
|
frameAssemblyRateLimiter.release();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user