From e08dc0678b9f55c8c83ab9cd74c3822d90acfffd Mon Sep 17 00:00:00 2001 From: Hayodea Hakol Date: Wed, 10 Sep 2025 18:12:08 -0400 Subject: [PATCH] Make [at|de]tachAllSenseDevices[FromSpecs] and initializeSalmanoff async This is the culmination of a lot of changes over the last week. We're making SMO basically fully async in many areas, and then preparing to implement the spinqueueing mechanism for locking. --- smocore/include/salmanoff.h | 7 +- smocore/include/senseApis/senseApiManager.h | 16 +- smocore/marionette/marionette.cpp | 41 +++- smocore/marionette/salmanoff.cpp | 93 +++++++-- smocore/senseApis/senseApiManager.cpp | 199 +++++++++++--------- 5 files changed, 236 insertions(+), 120 deletions(-) diff --git a/smocore/include/salmanoff.h b/smocore/include/salmanoff.h index b3b1878..1143d2b 100644 --- a/smocore/include/salmanoff.h +++ b/smocore/include/salmanoff.h @@ -2,12 +2,15 @@ #define _SALMANOFF_H #include +#include #include namespace smo { -void initializeSalmanoff(std::shared_ptr& componentThread); -void shutdownSalmanoff(void); +typedef std::function initializeSalmanoffCbFn; +typedef initializeSalmanoffCbFn shutdownSalmanoffCbFn; +void initializeSalmanoff(initializeSalmanoffCbFn callback); +void shutdownSalmanoff(shutdownSalmanoffCbFn callback); } // namespace smo diff --git a/smocore/include/senseApis/senseApiManager.h b/smocore/include/senseApis/senseApiManager.h index b412589..3973eb8 100644 --- a/smocore/include/senseApis/senseApiManager.h +++ b/smocore/include/senseApis/senseApiManager.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -52,9 +53,16 @@ public: void detachSenseDeviceReq( const std::shared_ptr& spec, detachSenseDeviceReqCbFn cb); - void attachAllSenseDevicesFromSpecs(void); - void detachAllSenseDevices(void); - void detachAllSenseDevicesReq(void); + + typedef std::function + attachAllSenseDevicesFromSpecsReqCbFn; + typedef std::function + detachAllSenseDevicesReqCbFn; + + void attachAllSenseDevicesFromSpecsReq( + attachAllSenseDevicesFromSpecsReqCbFn cb); + void detachAllSenseDevicesReq( + detachAllSenseDevicesReqCbFn cb); std::string stringifyLibs() const; @@ -69,6 +77,8 @@ private: class AttachSenseDeviceReq; class DetachSenseDeviceReq; + class AttachAllSenseDevicesFromSpecsReq; + class DetachAllSenseDevicesReq; public: static std::optional searchForLibInSmoSearchPaths( diff --git a/smocore/marionette/marionette.cpp b/smocore/marionette/marionette.cpp index 54cd1a5..003be5d 100644 --- a/smocore/marionette/marionette.cpp +++ b/smocore/marionette/marionette.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -46,7 +47,7 @@ void ComponentThread::marionetteMain(ComponentThread& self) self.initializeTls(); mrntt::exitCode = EXIT_SUCCESS; static boost::asio::signal_set signals(self.getIoService(), SIGINT); - bool callFinalizeReq = false; + bool callFinalizeReq = false, callShutdownSalmanoff = false; try { // Register SIGINT (Ctrl+C) and SIGSEGV handlers @@ -88,9 +89,15 @@ void ComponentThread::marionetteMain(ComponentThread& self) self.getIoService().post([]() { // Initialize Salmanoff first - initializeSalmanoff(mrntt::mrntt); - // Then initialize the global Mind object - globalMind->initialize(); + initializeSalmanoff([](bool success) + { + if (success) { + // Then initialize the global Mind object + globalMind->initialize(); + } else { + std::cerr << "Failed to initialize Salmanoff" << std::endl; + } + }); }); std::cout << __func__ << ": Entering event loop" << "\n"; @@ -154,20 +161,38 @@ void ComponentThread::marionetteMain(ComponentThread& self) } *out << outUsageMsg << e.what() << std::endl; - callFinalizeReq = true; + callShutdownSalmanoff = callFinalizeReq = true; } catch (const std::exception& e) { std::cerr << __func__ << ": Exception occurred: " << e.what() << std::endl; mrntt::exitCode = EXIT_FAILURE; - callFinalizeReq = true; + callShutdownSalmanoff = callFinalizeReq = true; } catch (...) { std::cerr << __func__ << ": Unknown exception occurred" << std::endl; mrntt::exitCode = EXIT_FAILURE; - callFinalizeReq = true; + callShutdownSalmanoff = callFinalizeReq = true; + } + + if (callShutdownSalmanoff) + { + shutdownSalmanoff( + [](bool success) + { + if (success) { + std::cout << "Salmanoff shutdown completed successfully" + << std::endl; + } else { + std::cerr << "Salmanoff shutdown failed" << std::endl; + } + mrntt::mrntt->getIoService().stop(); + } + ); + self.getIoService().reset(); + self.getIoService().run(); } if (callFinalizeReq) @@ -178,8 +203,6 @@ void ComponentThread::marionetteMain(ComponentThread& self) self.getIoService().reset(); self.getIoService().run(); } - - shutdownSalmanoff(); } } // namespace smo diff --git a/smocore/marionette/salmanoff.cpp b/smocore/marionette/salmanoff.cpp index b47f385..c3da11a 100644 --- a/smocore/marionette/salmanoff.cpp +++ b/smocore/marionette/salmanoff.cpp @@ -1,37 +1,106 @@ #include #include #include +#include +#include + namespace smo { -void initializeSalmanoff(std::shared_ptr& componentThread) +class InitializeSalmanoffReq +: public AsynchronousContinuation +{ +public: + InitializeSalmanoffReq(initializeSalmanoffCbFn cb) + : AsynchronousContinuation(std::move(cb)) + {} + + // Callback methods for the initialization sequence + void initializeSalmanoffReq1( + std::shared_ptr context, + smo::AsynchronousLoop &results + ) + { + std::cout << __func__ << ": Done. " << results.nSucceeded + << " succeeded, " << results.nFailed << " failed." << std::endl; + context->originalCbFn(true); + } +}; + +void initializeSalmanoff( + initializeSalmanoffCbFn callback) { std::cout << __func__ << ": Entered." << std::endl; + std::shared_ptr mrntt = ComponentThread::getMrntt(); + auto request = std::make_shared( + std::move(callback)); + device::DeviceManager::getInstance().collateAllDapSpecs(); device::DeviceManager::getInstance().parseAllDapSpecs(); std::cout << device::DeviceManager::stringifyDeviceSpecs() << std::endl; + + /** EXPLANATION: + * The ComponentThread instance we pass in here is the one that will be used + * by Senseapi libs to perform device-independent background operations. + * For example, liblivoxProto1's BroadcastListener will use this thread to + * listen for UDP broadcast dgrams from Livox devices. + * + * Right now we use Marionette, but there's a strong argument for using + * Body instead since it's meant to handle device-management operations. + */ sense_api::SenseApiManager::getInstance().loadAllSenseApiLibsFromOptions( - componentThread); + mrntt); + std::cout << sense_api::SenseApiManager::getInstance().stringifyLibs() << std::endl; -std::cerr << "About to initializeAllSenseApiLibs" << std::endl; - sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs(); -std::cerr << "About to attachAllSenseDevicesFromSpecs" << std::endl; - sense_api::SenseApiManager::getInstance().attachAllSenseDevicesFromSpecs(); -std::cerr << "Done attachAllSenseDevicesFromSpecs" << std::endl; - std::cout << __func__ << ": Done." << std::endl; + if (OptionParser::getOptions().verbose) { + std::cout << __func__ << ": About to initializeAllSenseApiLibs" << '\n'; + } + sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs(); + + if (OptionParser::getOptions().verbose) { + std::cout << __func__ << ": About to attachAllSenseDevicesFromSpecs" << '\n'; + } + sense_api::SenseApiManager::getInstance().attachAllSenseDevicesFromSpecsReq( + std::bind( + &InitializeSalmanoffReq::initializeSalmanoffReq1, + request.get(), request, + std::placeholders::_1)); } -void shutdownSalmanoff(void) +class ShutdownSalmanoffReq +: public InitializeSalmanoffReq +{ +public: + using InitializeSalmanoffReq::InitializeSalmanoffReq; + + // Callback methods for the shutdown sequence + void shutdownSalmanoffReq1( + std::shared_ptr context, + smo::AsynchronousLoop &results + ) + { + sense_api::SenseApiManager::getInstance().finalizeAllSenseApiLibs(); + std::cout << __func__ << ": Done. " << results.nSucceeded + << " succeeded, " << results.nFailed << " failed." << std::endl; + context->originalCbFn(true); + } +}; + +void shutdownSalmanoff(shutdownSalmanoffCbFn callback) { std::cout << __func__ << ": Entered." << std::endl; - sense_api::SenseApiManager::getInstance().detachAllSenseDevicesReq(); - sense_api::SenseApiManager::getInstance().finalizeAllSenseApiLibs(); + // Create the shutdown request object to hold state and callbacks + auto request = std::make_shared(std::move(callback)); - std::cout << __func__ << ": Done." << std::endl; + sense_api::SenseApiManager::getInstance().detachAllSenseDevicesReq( + std::bind( + &ShutdownSalmanoffReq::shutdownSalmanoffReq1, + request.get(), request, + std::placeholders::_1)); } } // namespace smo diff --git a/smocore/senseApis/senseApiManager.cpp b/smocore/senseApis/senseApiManager.cpp index 68e14c8..3c91208 100644 --- a/smocore/senseApis/senseApiManager.cpp +++ b/smocore/senseApis/senseApiManager.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -257,8 +258,6 @@ void SenseApiManager::finalizeAllSenseApiLibs(void) } } - - void SenseApiManager::attachSenseDeviceReq( const std::shared_ptr& spec, attachSenseDeviceReqCbFn cb @@ -313,118 +312,130 @@ void SenseApiManager::detachSenseDeviceReq( lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq(spec, cb); } -void SenseApiManager::attachAllSenseDevicesFromSpecs(void) +class SenseApiManager::AttachAllSenseDevicesFromSpecsReq +: public AsynchronousContinuation { - auto self = ComponentThread::getSelf(); - AsynchronousBridge bridge(self->getIoService()); - AsynchronousLoop loop(device::DeviceManager::deviceAttachmentSpecs.size()); +public: + AttachAllSenseDevicesFromSpecsReq( + const unsigned int totalNSpecs, + attachAllSenseDevicesFromSpecsReqCbFn cb) + : AsynchronousContinuation(std::move(cb)), + loop(totalNSpecs) + {} + + // Callback methods for the attachment sequence + void attachAllSenseDevicesFromSpecsReq1( + std::shared_ptr context, + bool success, std::shared_ptr spec + ) + { + if (!success) + { + std::cerr << __func__ << ": Failed to attach device: " + << spec->deviceIdentifier << "\n"; + } + + if (!context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( + success)) + { + return; + } + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": " << context->loop.nSucceeded.load() + << " devices attached, " + << context->loop.nFailed.load() << " devices failed\n"; + } + + context->originalCbFn(context->loop); + } + +public: + AsynchronousLoop loop; +}; + +void SenseApiManager::attachAllSenseDevicesFromSpecsReq( + attachAllSenseDevicesFromSpecsReqCbFn cb + ) +{ + // Create the attachment request object to hold state and callbacks + auto request = std::make_shared( + device::DeviceManager::deviceAttachmentSpecs.size(), std::move(cb)); for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) { try { - attachSenseDeviceReq(spec, - [spec, &loop, &bridge](bool success) -> void - { - if (!success) - { - std::cerr << __func__ << ": Failed to attach device: " - << spec->deviceIdentifier << "\n"; - } - - if (!loop.incrementSuccessOrFailureAndTestForCompletionDueTo( - success)) - { - return; - } - - std::cout << __func__ << ": " << loop.nSucceeded.load() - << " devices attached, " - << loop.nFailed.load() << " devices failed\n"; - - bridge.setAsyncOperationComplete(); - }); + attachSenseDeviceReq( + spec, + std::bind( + &AttachAllSenseDevicesFromSpecsReq::attachAllSenseDevicesFromSpecsReq1, + request.get(), request, + std::placeholders::_1, std::placeholders::_2)); } catch (const std::exception& e) { std::cerr << __func__ << ": Exception: " << e.what() << "\n"; - if (loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false)) - { bridge.setAsyncOperationComplete(); } + if (request->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false)) + { cb(request->loop); } } } - - /* Bridge the async op here. */ - bridge.waitForAsyncOperationCompleteOrIoServiceStopped(); - if (bridge.exitedBecauseIoServiceStopped()) - { - /* Return early because the io_service is stopped. */ - return; - } - - if (!loop.isComplete()) - { - throw std::runtime_error( - std::string(__func__) + ": Failed to get through all devices"); - } - - std::cout << __func__ << ": " << loop.nSucceeded.load() << "/" - << loop.nTotal << " devices attached, " - << loop.nFailed.load() << "/" << loop.nTotal - << " devices failed\n"; } -void SenseApiManager::detachAllSenseDevicesReq(void) +class SenseApiManager::DetachAllSenseDevicesReq +: public AttachAllSenseDevicesFromSpecsReq { - auto self = ComponentThread::getSelf(); - AsynchronousBridge bridge(self->getIoService()); - AsynchronousLoop loop(device::DeviceManager::deviceAttachmentSpecs.size()); +public: + using AttachAllSenseDevicesFromSpecsReq::AttachAllSenseDevicesFromSpecsReq; + + void detachAllSenseDevicesReq1( + std::shared_ptr context, + bool success, std::shared_ptr spec + ) + { + if (!success) + { + std::cerr << __func__ << ": Failed to detach device: " + << spec->deviceIdentifier << "\n"; + } + + if (!context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( + success)) + { + return; + } + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": " << context->loop.nSucceeded.load() + << " devices detached, " + << context->loop.nFailed.load() << " devices failed\n"; + } + + context->originalCbFn(context->loop); + } +}; + +void SenseApiManager::detachAllSenseDevicesReq( + detachAllSenseDevicesReqCbFn cb + ) +{ + auto request = std::make_shared( + device::DeviceManager::deviceAttachmentSpecs.size(), std::move(cb)); for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) { try { - detachSenseDeviceReq(spec, - [spec, &loop, &bridge](bool success) -> void - { - if (!success) - { - std::cerr << __func__ << ": Failed to detach device: " - << spec->deviceIdentifier << "\n"; - } - - if (!loop.incrementSuccessOrFailureAndTestForCompletionDueTo( - success)) - { - return; - } - - std::cout << __func__ << ": " << loop.nSucceeded.load() - << " devices detached, " - << loop.nFailed.load() << " devices failed\n"; - - bridge.setAsyncOperationComplete(); - }); + detachSenseDeviceReq( + spec, + std::bind( + &DetachAllSenseDevicesReq::detachAllSenseDevicesReq1, + request.get(), request, + std::placeholders::_1, std::placeholders::_2)); } catch (const std::exception& e) { std::cerr << __func__ << ": Exception: " << e.what() << "\n"; - if (loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false)) - { bridge.setAsyncOperationComplete(); } + if (request->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false)) + { cb(request->loop); } } } - - /* Bridge the async op here. */ - bridge.waitForAsyncOperationCompleteOrIoServiceStopped(); - if (bridge.exitedBecauseIoServiceStopped()) - { - /* Return early because the io_service is stopped. */ - return; - } - - if (!loop.isComplete()) - { - throw std::runtime_error( - std::string(__func__) + ": Failed to get through all devices"); - } - - std::cout << __func__ << ": " << loop.nSucceeded.load() << "/" - << loop.nTotal << " devices detached, " - << loop.nFailed.load() << "/" << loop.nTotal - << " devices failed\n"; } } // namespace sense_api