From 816a047920d543e0d2968792d7cc54b7854a86dc Mon Sep 17 00:00:00 2001 From: Hayodea Hakol Date: Wed, 17 Sep 2025 16:32:20 -0400 Subject: [PATCH] Async: new hierachy; manages reply posting and unlocking Async: Use new [Non]PostedAsyncCont and callOriginalCb This new hierarchy of classes gives us a central mechanism for managing both reply-posting and lockSpec unlocking. * callOriginalCb: Now uses a modern C++ variadic template design enabling it to handle both direct calling and std::bind() re-binding of an arbitrary number of arguments from the caller. This enables us to mostly eliminate the repeated, bespoke definitions of callOriginalCb littered throughout the codebase. We've also propagated these changes throughout the codebase in this patch. --- commonLibs/livoxProto1/core.cpp | 14 ++-- commonLibs/livoxProto1/device.cpp | 39 ++++----- include/asynchronousContinuation.h | 103 ++++++++++++++++++------ smocore/body/body.cpp | 19 +---- smocore/componentThread.cpp | 30 ++----- smocore/deviceManager/deviceManager.cpp | 17 +--- smocore/marionette/lifetime.cpp | 26 ++---- smocore/mind.cpp | 32 +++----- smocore/senseApis/senseApiManager.cpp | 34 ++------ 9 files changed, 145 insertions(+), 169 deletions(-) diff --git a/commonLibs/livoxProto1/core.cpp b/commonLibs/livoxProto1/core.cpp index c439623..9f09143 100644 --- a/commonLibs/livoxProto1/core.cpp +++ b/commonLibs/livoxProto1/core.cpp @@ -72,7 +72,8 @@ std::optional> DeviceManager::getDevice( // GetOrCreateDeviceReq nested class implementation class DeviceManager::GetOrCreateDeviceReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + livoxProto1_getOrCreateDeviceReqCbFn> { public: DeviceManager& deviceManager; @@ -84,14 +85,14 @@ public: DeviceManager& mgr, std::shared_ptr device, livoxProto1_getOrCreateDeviceReqCbFn cb) - : smo::AsynchronousContinuation< + : smo::NonPostedAsynchronousContinuation< livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)), deviceManager(mgr), pendingDevice(device) {} // Public accessor for the original callback void callOriginalCallback(bool success, std::shared_ptr device) - { originalCbFn(success, device); } + { callOriginalCb(success, device); } void callOriginalCallbackWithFailure() { callOriginalCallback(false, nullptr); } @@ -176,7 +177,8 @@ void DeviceManager::getOrCreateDeviceReq( } class DeviceManager::DestroyDeviceReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + livoxProto1_destroyDeviceReqCbFn> { public: DeviceManager& deviceManager; @@ -187,14 +189,14 @@ public: DeviceManager& mgr, std::shared_ptr device, livoxProto1_destroyDeviceReqCbFn cb) - : smo::AsynchronousContinuation< + : smo::NonPostedAsynchronousContinuation< livoxProto1_destroyDeviceReqCbFn>(std::move(cb)), deviceManager(mgr), pendingDevice(device) {} // Public accessor for the original callback void callOriginalCallback(bool success) - { originalCbFn(success); } + { callOriginalCb(success); } void callOriginalCallbackWithFailure() { callOriginalCallback(false); } diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index cb49666..a69657c 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -120,7 +120,7 @@ Device::~Device() * This class manages the overall device connection process including handshake and heartbeat setup */ class Device::ConnectReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation { private: Device& device; @@ -128,7 +128,7 @@ private: public: ConnectReq(Device& dev, Device::connectReqCbFn cb) - : smo::AsynchronousContinuation( + : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev) {} @@ -148,7 +148,7 @@ public: // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); - context->originalCbFn(true); + context->callOriginalCb(true); return; } @@ -177,7 +177,7 @@ public: // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); - context->originalCbFn(true); + context->callOriginalCb(true); return; } @@ -208,7 +208,7 @@ public: { if (error) { - context->originalCbFn(false); + context->callOriginalCb(false); return; } @@ -234,12 +234,12 @@ public: context->device.discoveredDevice.ipAddr = ipAddr; context->device.heartbeatFd = fd; context->device.startHeartbeat(); - context->originalCbFn(true); + context->callOriginalCb(true); return; } // All connection attempts failed - context->originalCbFn(false); + context->callOriginalCb(false); } }; @@ -261,7 +261,8 @@ void Device::connectReq(Device::connectReqCbFn callback) } class Device::ConnectToKnownDeviceReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + Device::connectToKnownDeviceReqCbFn> { public: Device& device; @@ -269,13 +270,13 @@ public: std::shared_ptr deviceInfo; ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb) - : smo::AsynchronousContinuation( - std::move(cb)), device(dev) + : smo::NonPostedAsynchronousContinuation< + Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) - { originalCbFn(success, ipAddr, fd); } + { callOriginalCb(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() @@ -363,7 +364,7 @@ void Device::connectToKnownDeviceReq( } class Device::ConnectByDeviceIdentifierReq -: public smo::AsynchronousContinuation< +: public smo::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn> { public: @@ -372,13 +373,14 @@ public: ConnectByDeviceIdentifierReq( Device& dev, Device::connectByDeviceIdentifierReqCbFn cb) - : smo::AsynchronousContinuation( - std::move(cb)), device(dev) + : smo::NonPostedAsynchronousContinuation< + Device::connectByDeviceIdentifierReqCbFn>( + std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) - { originalCbFn(success, ipAddr, fd); } + { callOriginalCb(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() @@ -445,7 +447,8 @@ void Device::connectByDeviceIdentifierReq( } class Device::ExecuteHandshakeReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + Device::executeHandshakeReqCbFn> { public: friend void Device::executeHandshakeReq( @@ -482,7 +485,7 @@ public: ExecuteHandshakeReq( Device& dev, const std::string& deviceIP, Device::executeHandshakeReqCbFn cb) - : smo::AsynchronousContinuation( + : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), deviceIP(deviceIP), handshakeFdDesc(device.componentThread->getIoService()), @@ -497,7 +500,7 @@ public: // Public accessor for the original callback void callOriginalCallback(bool success, int fd) - { originalCbFn(success, fd); } + { callOriginalCb(success, fd); } void callOriginalCallbackWithFailure() { diff --git a/include/asynchronousContinuation.h b/include/asynchronousContinuation.h index 3259390..b147280 100644 --- a/include/asynchronousContinuation.h +++ b/include/asynchronousContinuation.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace smo { @@ -15,20 +16,6 @@ namespace smo { * * The template parameter OriginalCbFnT represents the signature of the original * callback that will be invoked when the async sequence completes. - * - * Usage: - * class MyAsyncReq - * : public AsynchronousContinuation> - * { - * public: - * MyAsyncReq(std::function originalCbFn) - * : AsynchronousContinuation(originalCbFn) - * {} - * - * // Segment methods take only the shared_ptr for lifetime management - * void myAsyncReq1(std::shared_ptr context); - * void myAsyncReq2(std::shared_ptr context); - * }; */ template class AsynchronousContinuation @@ -46,33 +33,101 @@ public: std::shared_ptr> lifetimePreservingConveyance); -protected: + /** + * @brief Call the original callback with perfect forwarding + * + * IMPORTANT: This method cannot be virtual because templates cannot be + * virtual in C++. Therefore, this method MUST be called from the + * most-derived class reference/pointer context. Never call this method + * through a polymorphic base class reference/pointer, as it will not + * dispatch to the correct derived class implementation. + * + * @param args Arguments to forward to the original callback + */ + template + void callOriginalCb(Args&&... args) + { + if (originalCbFn) { originalCbFn(std::forward(args)...); } + } + +public: OriginalCbFnT originalCbFn; }; -class ContinuationTarget +/** + * NonPostedAsynchronousContinuation - For continuations that don't post + * callbacks + * + * Note: We intentionally do not create a + * LockedNonPostedAsynchronousContinuation because the only way to implement + * non-posted locking would be via busy-spinning or sleeplocks. This would + * eliminate the throughput advantage from our Qspinning mechanism, which + * relies on re-posting to the io_service queue when locks are unavailable. + */ +template +class NonPostedAsynchronousContinuation +: public AsynchronousContinuation { public: - ContinuationTarget( - const std::shared_ptr &caller) - : caller(caller) + explicit NonPostedAsynchronousContinuation(OriginalCbFnT originalCbFn) + : AsynchronousContinuation(originalCbFn) {} public: - const std::shared_ptr caller; + using AsynchronousContinuation::callOriginalCb; }; template -class TargetedAsynchronousContinuation -: public AsynchronousContinuation, public ContinuationTarget +class PostedAsynchronousContinuation +: public AsynchronousContinuation { public: - TargetedAsynchronousContinuation( + PostedAsynchronousContinuation( const std::shared_ptr &caller, OriginalCbFnT originalCbFn) : AsynchronousContinuation(originalCbFn), - ContinuationTarget(caller) + caller(caller) {} + + template + void callOriginalCb(Args&&... args) + { + if (AsynchronousContinuation::originalCbFn) { + caller->getIoService().post( + std::bind( + AsynchronousContinuation::originalCbFn, + std::forward(args)...)); + } + } + +public: + std::shared_ptr caller; + using AsynchronousContinuation::callOriginalCb; +}; + +template +class SerializedAsynchronousContinuation +: public PostedAsynchronousContinuation, public LockSpec +{ +public: + SerializedAsynchronousContinuation( + const std::shared_ptr &caller, + OriginalCbFnT originalCbFn, + std::vector> requiredLocks = {}) + : PostedAsynchronousContinuation(caller, originalCbFn), + LockSpec(std::move(requiredLocks)) + {} + + template + void callOriginalCb(Args&&... args) + { + LockSpec::release(); + PostedAsynchronousContinuation::callOriginalCb( + std::forward(args)...); + } + +public: + using AsynchronousContinuation::callOriginalCb; }; } // namespace smo diff --git a/smocore/body/body.cpp b/smocore/body/body.cpp index 87d5899..57f0c12 100644 --- a/smocore/body/body.cpp +++ b/smocore/body/body.cpp @@ -16,27 +16,16 @@ Body::Body(Mind &parent, const std::shared_ptr &thread) } class Body::InitializeReq -: public AsynchronousContinuation, - public ContinuationTarget +: public PostedAsynchronousContinuation { public: InitializeReq( Mind &parent, const std::shared_ptr &caller, bodyLifetimeMgmtOpCbFn callback) - : AsynchronousContinuation(callback), - ContinuationTarget(caller), + : PostedAsynchronousContinuation(caller, callback), parent(parent) {} - void callOriginalCbFn(bool success) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind(originalCbFn, success)); - } - } - private: Mind &parent; @@ -104,7 +93,7 @@ public: << results.nSucceeded << " of " << results.nTotal << " sense devices." << "\n"; - callOriginalCbFn(results.nSucceeded > 0); + callOriginalCb(results.nSucceeded > 0); } }; @@ -148,7 +137,7 @@ public: std::cout << "Mrntt: About to unload all sense api libs." << "\n"; sense_api::SenseApiManager::getInstance().unloadAllSenseApiLibs(); - callOriginalCbFn(results.nSucceeded == results.nTotal); + callOriginalCb(results.nSucceeded == results.nTotal); } }; diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index aa059e2..5d1bde2 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -105,32 +105,18 @@ void MindThread::main(MindThread& self) } class MindThread::ThreadLifetimeMgmtOp -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: ThreadLifetimeMgmtOp( const std::shared_ptr &caller, const std::shared_ptr &target, threadLifetimeMgmtOpCbFn callback) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, callback), target(target) {} - void callOriginalCbFn(void) - { - if (originalCbFn) - { - /** EXPLANATION: - * This is only permissible because originalCbFn doesn't take any - * further arguments. If we had to bind argument to it before - * posting it, we'd have to std::bind them and then post the - * resulting function object. - */ - caller->getIoService().post(originalCbFn); - } - } - public: const std::shared_ptr target; @@ -144,7 +130,7 @@ public: << "\n"; target->io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } void startThreadReq1_posted( @@ -158,7 +144,7 @@ public: // Execute private setup sequence here // This is where each thread would implement its specific initialization - callOriginalCbFn(); + callOriginalCb(); } void exitThreadReq1_mainQueue_posted( @@ -170,7 +156,7 @@ public: target->cleanup(); target->io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } void exitThreadReq1_pauseQueue_posted( @@ -183,7 +169,7 @@ public: target->cleanup(); target->pause_io_service.stop(); target->io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } void pauseThreadReq1_posted( @@ -197,7 +183,7 @@ public: * our next operation is going to block the thread, so it won't * have a chance to invoke the callback until it's unblocked. */ - callOriginalCbFn(); + callOriginalCb(); target->pause_io_service.reset(); target->pause_io_service.run(); } @@ -210,7 +196,7 @@ public: "resumeThread." << "\n"; target->pause_io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } }; diff --git a/smocore/deviceManager/deviceManager.cpp b/smocore/deviceManager/deviceManager.cpp index f3687e7..5afe19a 100644 --- a/smocore/deviceManager/deviceManager.cpp +++ b/smocore/deviceManager/deviceManager.cpp @@ -39,31 +39,18 @@ const std::string DeviceManager::stringifyDeviceSpecs(void) } class DeviceManager::NewDeviceAttachmentSpecInd -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: NewDeviceAttachmentSpecInd( std::shared_ptr s, const std::shared_ptr &caller, newDeviceAttachmentSpecIndCbFn cb) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, cb), spec(s) {} - void callOriginalCb( - bool success, - std::shared_ptr device, - std::shared_ptr deviceSpec) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind( - originalCbFn, success, device, deviceSpec)); - } - } - public: std::shared_ptr spec; diff --git a/smocore/marionette/lifetime.cpp b/smocore/marionette/lifetime.cpp index f8df6d5..faac941 100644 --- a/smocore/marionette/lifetime.cpp +++ b/smocore/marionette/lifetime.cpp @@ -10,27 +10,17 @@ namespace smo { namespace mrntt { class MarionetteComponent::MrnttLifetimeMgmtOp -: public AsynchronousContinuation, - public ContinuationTarget +: public PostedAsynchronousContinuation { public: MrnttLifetimeMgmtOp( MarionetteComponent &parent, const std::shared_ptr &caller, mrnttLifetimeMgmtOpCbFn callback) - : AsynchronousContinuation(callback), - ContinuationTarget(caller), - parent(parent) + : PostedAsynchronousContinuation( + caller, callback), + parent(parent) {} - void callOriginalCbFn(bool success) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind(originalCbFn, success)); - } - } - private: MarionetteComponent &parent; @@ -62,11 +52,11 @@ public: { std::cerr << __func__ << ": Failed to initialize globalMind" << std::endl; - context->callOriginalCbFn(false); + context->callOriginalCb(false); return; } - context->callOriginalCbFn(success); + context->callOriginalCb(success); } void finalizeReq1_posted( @@ -95,11 +85,11 @@ public: { std::cerr << __func__ << ": globalMind finalization failed" << std::endl; - context->callOriginalCbFn(false); + context->callOriginalCb(false); return; } - context->callOriginalCbFn(success); + context->callOriginalCb(success); } }; diff --git a/smocore/mind.cpp b/smocore/mind.cpp index aacec77..118d5eb 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -84,24 +84,17 @@ Mind::getMindThreads() const } class Mind::MindLifetimeMgmtOp -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: MindLifetimeMgmtOp( Mind &parent, const std::shared_ptr &caller, mindLifetimeMgmtOpCbFn callback) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, callback), parent(parent) {} - void callOriginalCbFn(bool success) - { - if (originalCbFn) { - caller->getIoService().post(std::bind(originalCbFn, success)); - } - } - public: Mind &parent; @@ -147,7 +140,7 @@ public: ) { std::cout << "Mrntt: Body component initialized." << "\n"; - callOriginalCbFn(success); + callOriginalCb(success); } void finalizeReq1_posted( @@ -198,7 +191,7 @@ public: ) { std::cout << "Mrntt: All mind threads exited." << "\n"; - callOriginalCbFn(true); + callOriginalCb(true); } }; @@ -260,24 +253,17 @@ void Mind::distributeAndPinThreadsAcrossCpus() } class Mind::MindThreadLifetimeMgmtOp -: public AsynchronousContinuation +: public NonPostedAsynchronousContinuation { public: MindThreadLifetimeMgmtOp( Mind &parent,unsigned int nThreads, mindThreadLifetimeMgmtOpCbFn callback) - : AsynchronousContinuation(callback), + : NonPostedAsynchronousContinuation(callback), loop(nThreads), parent(parent) {} - void callOriginalCbFn(void) - { - if (originalCbFn) { - originalCbFn(); - } - } - public: AsynchronousLoop loop; Mind &parent; @@ -293,7 +279,7 @@ public: } parent.threadsHaveBeenJolted = true; - callOriginalCbFn(); + callOriginalCb(); } void executeGenericOpOnAllMindThreadsReq1( @@ -305,7 +291,7 @@ public: return; } - callOriginalCbFn(); + callOriginalCb(); } void exitAllMindThreadsReq1( @@ -321,7 +307,7 @@ public: thread->thread.join(); } - callOriginalCbFn(); + callOriginalCb(); } }; diff --git a/smocore/senseApis/senseApiManager.cpp b/smocore/senseApis/senseApiManager.cpp index a8c632a..52893d5 100644 --- a/smocore/senseApis/senseApiManager.cpp +++ b/smocore/senseApis/senseApiManager.cpp @@ -262,30 +262,18 @@ void SenseApiManager::finalizeAllSenseApiLibs(void) } class SenseApiManager::AttachSenseDeviceReq -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: AttachSenseDeviceReq( const std::shared_ptr& spec, const std::shared_ptr &caller, attachSenseDeviceReqCbFn cb) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, cb), spec(spec) {} - void callOriginalCb( - bool success, std::shared_ptr deviceSpec - ) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind( - originalCbFn, success, deviceSpec)); - } - } - public: void attachSenseDeviceReq1_posted( [[maybe_unused]] std::shared_ptr context @@ -448,7 +436,7 @@ void SenseApiManager::detachSenseDeviceReq( } class SenseApiManager::AttachAllSenseDevicesFromSpecsReq -: public TargetedAsynchronousContinuation< +: public PostedAsynchronousContinuation< attachAllSenseDevicesFromSpecsReqCbFn> { public: @@ -456,21 +444,11 @@ public: const unsigned int totalNSpecs, const std::shared_ptr& caller, attachAllSenseDevicesFromSpecsReqCbFn cb) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, cb), loop(totalNSpecs) {} - void callOriginalCallback() - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind( - originalCbFn, loop)); - } - } - public: void attachAllSenseDevicesFromSpecsReq1_posted( [[maybe_unused]] std::shared_ptr context @@ -513,7 +491,7 @@ public: << context->loop.nFailed.load() << " devices failed\n"; } - context->callOriginalCallback(); + context->callOriginalCb(loop); } public: @@ -588,7 +566,7 @@ public: << context->loop.nFailed.load() << " devices failed\n"; } - context->callOriginalCallback(); + context->callOriginalCb(loop); } };