diff --git a/commonLibs/livoxProto1/core.cpp b/commonLibs/livoxProto1/core.cpp index c439623..9f09143 100644 --- a/commonLibs/livoxProto1/core.cpp +++ b/commonLibs/livoxProto1/core.cpp @@ -72,7 +72,8 @@ std::optional> DeviceManager::getDevice( // GetOrCreateDeviceReq nested class implementation class DeviceManager::GetOrCreateDeviceReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + livoxProto1_getOrCreateDeviceReqCbFn> { public: DeviceManager& deviceManager; @@ -84,14 +85,14 @@ public: DeviceManager& mgr, std::shared_ptr device, livoxProto1_getOrCreateDeviceReqCbFn cb) - : smo::AsynchronousContinuation< + : smo::NonPostedAsynchronousContinuation< livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)), deviceManager(mgr), pendingDevice(device) {} // Public accessor for the original callback void callOriginalCallback(bool success, std::shared_ptr device) - { originalCbFn(success, device); } + { callOriginalCb(success, device); } void callOriginalCallbackWithFailure() { callOriginalCallback(false, nullptr); } @@ -176,7 +177,8 @@ void DeviceManager::getOrCreateDeviceReq( } class DeviceManager::DestroyDeviceReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + livoxProto1_destroyDeviceReqCbFn> { public: DeviceManager& deviceManager; @@ -187,14 +189,14 @@ public: DeviceManager& mgr, std::shared_ptr device, livoxProto1_destroyDeviceReqCbFn cb) - : smo::AsynchronousContinuation< + : smo::NonPostedAsynchronousContinuation< livoxProto1_destroyDeviceReqCbFn>(std::move(cb)), deviceManager(mgr), pendingDevice(device) {} // Public accessor for the original callback void callOriginalCallback(bool success) - { originalCbFn(success); } + { callOriginalCb(success); } void callOriginalCallbackWithFailure() { callOriginalCallback(false); } diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index cb49666..a69657c 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -120,7 +120,7 @@ Device::~Device() * This class manages the overall device connection process including handshake and heartbeat setup */ class Device::ConnectReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation { private: Device& device; @@ -128,7 +128,7 @@ private: public: ConnectReq(Device& dev, Device::connectReqCbFn cb) - : smo::AsynchronousContinuation( + : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev) {} @@ -148,7 +148,7 @@ public: // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); - context->originalCbFn(true); + context->callOriginalCb(true); return; } @@ -177,7 +177,7 @@ public: // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); - context->originalCbFn(true); + context->callOriginalCb(true); return; } @@ -208,7 +208,7 @@ public: { if (error) { - context->originalCbFn(false); + context->callOriginalCb(false); return; } @@ -234,12 +234,12 @@ public: context->device.discoveredDevice.ipAddr = ipAddr; context->device.heartbeatFd = fd; context->device.startHeartbeat(); - context->originalCbFn(true); + context->callOriginalCb(true); return; } // All connection attempts failed - context->originalCbFn(false); + context->callOriginalCb(false); } }; @@ -261,7 +261,8 @@ void Device::connectReq(Device::connectReqCbFn callback) } class Device::ConnectToKnownDeviceReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + Device::connectToKnownDeviceReqCbFn> { public: Device& device; @@ -269,13 +270,13 @@ public: std::shared_ptr deviceInfo; ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb) - : smo::AsynchronousContinuation( - std::move(cb)), device(dev) + : smo::NonPostedAsynchronousContinuation< + Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) - { originalCbFn(success, ipAddr, fd); } + { callOriginalCb(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() @@ -363,7 +364,7 @@ void Device::connectToKnownDeviceReq( } class Device::ConnectByDeviceIdentifierReq -: public smo::AsynchronousContinuation< +: public smo::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn> { public: @@ -372,13 +373,14 @@ public: ConnectByDeviceIdentifierReq( Device& dev, Device::connectByDeviceIdentifierReqCbFn cb) - : smo::AsynchronousContinuation( - std::move(cb)), device(dev) + : smo::NonPostedAsynchronousContinuation< + Device::connectByDeviceIdentifierReqCbFn>( + std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) - { originalCbFn(success, ipAddr, fd); } + { callOriginalCb(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() @@ -445,7 +447,8 @@ void Device::connectByDeviceIdentifierReq( } class Device::ExecuteHandshakeReq -: public smo::AsynchronousContinuation +: public smo::NonPostedAsynchronousContinuation< + Device::executeHandshakeReqCbFn> { public: friend void Device::executeHandshakeReq( @@ -482,7 +485,7 @@ public: ExecuteHandshakeReq( Device& dev, const std::string& deviceIP, Device::executeHandshakeReqCbFn cb) - : smo::AsynchronousContinuation( + : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), deviceIP(deviceIP), handshakeFdDesc(device.componentThread->getIoService()), @@ -497,7 +500,7 @@ public: // Public accessor for the original callback void callOriginalCallback(bool success, int fd) - { originalCbFn(success, fd); } + { callOriginalCb(success, fd); } void callOriginalCallbackWithFailure() { diff --git a/include/asynchronousContinuation.h b/include/asynchronousContinuation.h index 3259390..b147280 100644 --- a/include/asynchronousContinuation.h +++ b/include/asynchronousContinuation.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace smo { @@ -15,20 +16,6 @@ namespace smo { * * The template parameter OriginalCbFnT represents the signature of the original * callback that will be invoked when the async sequence completes. - * - * Usage: - * class MyAsyncReq - * : public AsynchronousContinuation> - * { - * public: - * MyAsyncReq(std::function originalCbFn) - * : AsynchronousContinuation(originalCbFn) - * {} - * - * // Segment methods take only the shared_ptr for lifetime management - * void myAsyncReq1(std::shared_ptr context); - * void myAsyncReq2(std::shared_ptr context); - * }; */ template class AsynchronousContinuation @@ -46,33 +33,101 @@ public: std::shared_ptr> lifetimePreservingConveyance); -protected: + /** + * @brief Call the original callback with perfect forwarding + * + * IMPORTANT: This method cannot be virtual because templates cannot be + * virtual in C++. Therefore, this method MUST be called from the + * most-derived class reference/pointer context. Never call this method + * through a polymorphic base class reference/pointer, as it will not + * dispatch to the correct derived class implementation. + * + * @param args Arguments to forward to the original callback + */ + template + void callOriginalCb(Args&&... args) + { + if (originalCbFn) { originalCbFn(std::forward(args)...); } + } + +public: OriginalCbFnT originalCbFn; }; -class ContinuationTarget +/** + * NonPostedAsynchronousContinuation - For continuations that don't post + * callbacks + * + * Note: We intentionally do not create a + * LockedNonPostedAsynchronousContinuation because the only way to implement + * non-posted locking would be via busy-spinning or sleeplocks. This would + * eliminate the throughput advantage from our Qspinning mechanism, which + * relies on re-posting to the io_service queue when locks are unavailable. + */ +template +class NonPostedAsynchronousContinuation +: public AsynchronousContinuation { public: - ContinuationTarget( - const std::shared_ptr &caller) - : caller(caller) + explicit NonPostedAsynchronousContinuation(OriginalCbFnT originalCbFn) + : AsynchronousContinuation(originalCbFn) {} public: - const std::shared_ptr caller; + using AsynchronousContinuation::callOriginalCb; }; template -class TargetedAsynchronousContinuation -: public AsynchronousContinuation, public ContinuationTarget +class PostedAsynchronousContinuation +: public AsynchronousContinuation { public: - TargetedAsynchronousContinuation( + PostedAsynchronousContinuation( const std::shared_ptr &caller, OriginalCbFnT originalCbFn) : AsynchronousContinuation(originalCbFn), - ContinuationTarget(caller) + caller(caller) {} + + template + void callOriginalCb(Args&&... args) + { + if (AsynchronousContinuation::originalCbFn) { + caller->getIoService().post( + std::bind( + AsynchronousContinuation::originalCbFn, + std::forward(args)...)); + } + } + +public: + std::shared_ptr caller; + using AsynchronousContinuation::callOriginalCb; +}; + +template +class SerializedAsynchronousContinuation +: public PostedAsynchronousContinuation, public LockSpec +{ +public: + SerializedAsynchronousContinuation( + const std::shared_ptr &caller, + OriginalCbFnT originalCbFn, + std::vector> requiredLocks = {}) + : PostedAsynchronousContinuation(caller, originalCbFn), + LockSpec(std::move(requiredLocks)) + {} + + template + void callOriginalCb(Args&&... args) + { + LockSpec::release(); + PostedAsynchronousContinuation::callOriginalCb( + std::forward(args)...); + } + +public: + using AsynchronousContinuation::callOriginalCb; }; } // namespace smo diff --git a/smocore/body/body.cpp b/smocore/body/body.cpp index 87d5899..57f0c12 100644 --- a/smocore/body/body.cpp +++ b/smocore/body/body.cpp @@ -16,27 +16,16 @@ Body::Body(Mind &parent, const std::shared_ptr &thread) } class Body::InitializeReq -: public AsynchronousContinuation, - public ContinuationTarget +: public PostedAsynchronousContinuation { public: InitializeReq( Mind &parent, const std::shared_ptr &caller, bodyLifetimeMgmtOpCbFn callback) - : AsynchronousContinuation(callback), - ContinuationTarget(caller), + : PostedAsynchronousContinuation(caller, callback), parent(parent) {} - void callOriginalCbFn(bool success) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind(originalCbFn, success)); - } - } - private: Mind &parent; @@ -104,7 +93,7 @@ public: << results.nSucceeded << " of " << results.nTotal << " sense devices." << "\n"; - callOriginalCbFn(results.nSucceeded > 0); + callOriginalCb(results.nSucceeded > 0); } }; @@ -148,7 +137,7 @@ public: std::cout << "Mrntt: About to unload all sense api libs." << "\n"; sense_api::SenseApiManager::getInstance().unloadAllSenseApiLibs(); - callOriginalCbFn(results.nSucceeded == results.nTotal); + callOriginalCb(results.nSucceeded == results.nTotal); } }; diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index aa059e2..5d1bde2 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -105,32 +105,18 @@ void MindThread::main(MindThread& self) } class MindThread::ThreadLifetimeMgmtOp -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: ThreadLifetimeMgmtOp( const std::shared_ptr &caller, const std::shared_ptr &target, threadLifetimeMgmtOpCbFn callback) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, callback), target(target) {} - void callOriginalCbFn(void) - { - if (originalCbFn) - { - /** EXPLANATION: - * This is only permissible because originalCbFn doesn't take any - * further arguments. If we had to bind argument to it before - * posting it, we'd have to std::bind them and then post the - * resulting function object. - */ - caller->getIoService().post(originalCbFn); - } - } - public: const std::shared_ptr target; @@ -144,7 +130,7 @@ public: << "\n"; target->io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } void startThreadReq1_posted( @@ -158,7 +144,7 @@ public: // Execute private setup sequence here // This is where each thread would implement its specific initialization - callOriginalCbFn(); + callOriginalCb(); } void exitThreadReq1_mainQueue_posted( @@ -170,7 +156,7 @@ public: target->cleanup(); target->io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } void exitThreadReq1_pauseQueue_posted( @@ -183,7 +169,7 @@ public: target->cleanup(); target->pause_io_service.stop(); target->io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } void pauseThreadReq1_posted( @@ -197,7 +183,7 @@ public: * our next operation is going to block the thread, so it won't * have a chance to invoke the callback until it's unblocked. */ - callOriginalCbFn(); + callOriginalCb(); target->pause_io_service.reset(); target->pause_io_service.run(); } @@ -210,7 +196,7 @@ public: "resumeThread." << "\n"; target->pause_io_service.stop(); - callOriginalCbFn(); + callOriginalCb(); } }; diff --git a/smocore/deviceManager/deviceManager.cpp b/smocore/deviceManager/deviceManager.cpp index f3687e7..5afe19a 100644 --- a/smocore/deviceManager/deviceManager.cpp +++ b/smocore/deviceManager/deviceManager.cpp @@ -39,31 +39,18 @@ const std::string DeviceManager::stringifyDeviceSpecs(void) } class DeviceManager::NewDeviceAttachmentSpecInd -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: NewDeviceAttachmentSpecInd( std::shared_ptr s, const std::shared_ptr &caller, newDeviceAttachmentSpecIndCbFn cb) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, cb), spec(s) {} - void callOriginalCb( - bool success, - std::shared_ptr device, - std::shared_ptr deviceSpec) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind( - originalCbFn, success, device, deviceSpec)); - } - } - public: std::shared_ptr spec; diff --git a/smocore/marionette/lifetime.cpp b/smocore/marionette/lifetime.cpp index f8df6d5..faac941 100644 --- a/smocore/marionette/lifetime.cpp +++ b/smocore/marionette/lifetime.cpp @@ -10,27 +10,17 @@ namespace smo { namespace mrntt { class MarionetteComponent::MrnttLifetimeMgmtOp -: public AsynchronousContinuation, - public ContinuationTarget +: public PostedAsynchronousContinuation { public: MrnttLifetimeMgmtOp( MarionetteComponent &parent, const std::shared_ptr &caller, mrnttLifetimeMgmtOpCbFn callback) - : AsynchronousContinuation(callback), - ContinuationTarget(caller), - parent(parent) + : PostedAsynchronousContinuation( + caller, callback), + parent(parent) {} - void callOriginalCbFn(bool success) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind(originalCbFn, success)); - } - } - private: MarionetteComponent &parent; @@ -62,11 +52,11 @@ public: { std::cerr << __func__ << ": Failed to initialize globalMind" << std::endl; - context->callOriginalCbFn(false); + context->callOriginalCb(false); return; } - context->callOriginalCbFn(success); + context->callOriginalCb(success); } void finalizeReq1_posted( @@ -95,11 +85,11 @@ public: { std::cerr << __func__ << ": globalMind finalization failed" << std::endl; - context->callOriginalCbFn(false); + context->callOriginalCb(false); return; } - context->callOriginalCbFn(success); + context->callOriginalCb(success); } }; diff --git a/smocore/mind.cpp b/smocore/mind.cpp index aacec77..118d5eb 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -84,24 +84,17 @@ Mind::getMindThreads() const } class Mind::MindLifetimeMgmtOp -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: MindLifetimeMgmtOp( Mind &parent, const std::shared_ptr &caller, mindLifetimeMgmtOpCbFn callback) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, callback), parent(parent) {} - void callOriginalCbFn(bool success) - { - if (originalCbFn) { - caller->getIoService().post(std::bind(originalCbFn, success)); - } - } - public: Mind &parent; @@ -147,7 +140,7 @@ public: ) { std::cout << "Mrntt: Body component initialized." << "\n"; - callOriginalCbFn(success); + callOriginalCb(success); } void finalizeReq1_posted( @@ -198,7 +191,7 @@ public: ) { std::cout << "Mrntt: All mind threads exited." << "\n"; - callOriginalCbFn(true); + callOriginalCb(true); } }; @@ -260,24 +253,17 @@ void Mind::distributeAndPinThreadsAcrossCpus() } class Mind::MindThreadLifetimeMgmtOp -: public AsynchronousContinuation +: public NonPostedAsynchronousContinuation { public: MindThreadLifetimeMgmtOp( Mind &parent,unsigned int nThreads, mindThreadLifetimeMgmtOpCbFn callback) - : AsynchronousContinuation(callback), + : NonPostedAsynchronousContinuation(callback), loop(nThreads), parent(parent) {} - void callOriginalCbFn(void) - { - if (originalCbFn) { - originalCbFn(); - } - } - public: AsynchronousLoop loop; Mind &parent; @@ -293,7 +279,7 @@ public: } parent.threadsHaveBeenJolted = true; - callOriginalCbFn(); + callOriginalCb(); } void executeGenericOpOnAllMindThreadsReq1( @@ -305,7 +291,7 @@ public: return; } - callOriginalCbFn(); + callOriginalCb(); } void exitAllMindThreadsReq1( @@ -321,7 +307,7 @@ public: thread->thread.join(); } - callOriginalCbFn(); + callOriginalCb(); } }; diff --git a/smocore/senseApis/senseApiManager.cpp b/smocore/senseApis/senseApiManager.cpp index a8c632a..52893d5 100644 --- a/smocore/senseApis/senseApiManager.cpp +++ b/smocore/senseApis/senseApiManager.cpp @@ -262,30 +262,18 @@ void SenseApiManager::finalizeAllSenseApiLibs(void) } class SenseApiManager::AttachSenseDeviceReq -: public TargetedAsynchronousContinuation +: public PostedAsynchronousContinuation { public: AttachSenseDeviceReq( const std::shared_ptr& spec, const std::shared_ptr &caller, attachSenseDeviceReqCbFn cb) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, cb), spec(spec) {} - void callOriginalCb( - bool success, std::shared_ptr deviceSpec - ) - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind( - originalCbFn, success, deviceSpec)); - } - } - public: void attachSenseDeviceReq1_posted( [[maybe_unused]] std::shared_ptr context @@ -448,7 +436,7 @@ void SenseApiManager::detachSenseDeviceReq( } class SenseApiManager::AttachAllSenseDevicesFromSpecsReq -: public TargetedAsynchronousContinuation< +: public PostedAsynchronousContinuation< attachAllSenseDevicesFromSpecsReqCbFn> { public: @@ -456,21 +444,11 @@ public: const unsigned int totalNSpecs, const std::shared_ptr& caller, attachAllSenseDevicesFromSpecsReqCbFn cb) - : TargetedAsynchronousContinuation( + : PostedAsynchronousContinuation( caller, cb), loop(totalNSpecs) {} - void callOriginalCallback() - { - if (originalCbFn) - { - caller->getIoService().post( - std::bind( - originalCbFn, loop)); - } - } - public: void attachAllSenseDevicesFromSpecsReq1_posted( [[maybe_unused]] std::shared_ptr context @@ -513,7 +491,7 @@ public: << context->loop.nFailed.load() << " devices failed\n"; } - context->callOriginalCallback(); + context->callOriginalCb(loop); } public: @@ -588,7 +566,7 @@ public: << context->loop.nFailed.load() << " devices failed\n"; } - context->callOriginalCallback(); + context->callOriginalCb(loop); } };