From 25a9721f925ea4fd8e882f26ded0b2d0dfad43d4 Mon Sep 17 00:00:00 2001 From: Hayodea Hakol Date: Fri, 12 Sep 2025 16:09:26 -0400 Subject: [PATCH] Mind: Implement initialize/finalizeBodyReq() We've done a lot of general work on the init sequencing. --- smocore/componentThread.cpp | 1 + smocore/deviceManager/deviceManager.cpp | 14 +- smocore/include/deviceManager/deviceManager.h | 5 + smocore/include/mind.h | 4 + smocore/include/salmanoff.h | 6 +- smocore/include/senseApis/senseApiManager.h | 9 +- smocore/marionette/marionette.cpp | 60 ++--- smocore/marionette/salmanoff.cpp | 90 +------ smocore/mind.cpp | 244 ++++++++++++++++-- smocore/senseApis/senseApiManager.cpp | 24 +- 10 files changed, 294 insertions(+), 163 deletions(-) diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index 0be353e..aaab457 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -361,6 +361,7 @@ public: [[maybe_unused]] std::shared_ptr context ) { + std::cout << "Mrntt: About to exit marionette loop." << "\n"; /** FIXME: * When we eventually support multiple minds, we should remove this * since it causes marionette to exit, even if there are other minds diff --git a/smocore/deviceManager/deviceManager.cpp b/smocore/deviceManager/deviceManager.cpp index e22bbd6..5d5c4e2 100644 --- a/smocore/deviceManager/deviceManager.cpp +++ b/smocore/deviceManager/deviceManager.cpp @@ -37,6 +37,14 @@ struct DeviceAttachmentContinuation { > cb) : spec(s), callback(cb) {} + + void callOriginalCallback( + bool success, + std::shared_ptr device, + std::shared_ptr deviceSpec) + { + callback(success, device, deviceSpec); + } }; const std::string DeviceManager::stringifyDeviceSpecs(void) @@ -71,7 +79,7 @@ void DeviceManager::newDeviceAttachmentSpecInd( { if (!(*existingSpec == *spec)) { continue; } // Already exists, callback with error - callback(false, nullptr, nullptr); + continuation->callOriginalCallback(false, nullptr, nullptr); return; } @@ -104,10 +112,10 @@ void DeviceManager::newDeviceAttachmentSpecInd( deviceAttachmentSpecs.push_back(spec); // Callback with success - callback(true, device, spec); + continuation->callOriginalCallback(true, device, spec); } catch (const std::exception& e) { // Attach failed, callback with error - callback(false, nullptr, nullptr); + continuation->callOriginalCallback(false, nullptr, nullptr); } } #endif diff --git a/smocore/include/deviceManager/deviceManager.h b/smocore/include/deviceManager/deviceManager.h index 4c33e6b..ac0a550 100644 --- a/smocore/include/deviceManager/deviceManager.h +++ b/smocore/include/deviceManager/deviceManager.h @@ -23,6 +23,11 @@ public: return instance; } + void initialize(void) + {}; + void finalize(void) + {}; + std::string readDapSpecFile(const std::string& filename); void collateAllDapSpecs(void); void parseAllDapSpecs(void); diff --git a/smocore/include/mind.h b/smocore/include/mind.h index cc56e6a..12ebccf 100644 --- a/smocore/include/mind.h +++ b/smocore/include/mind.h @@ -23,6 +23,8 @@ public: typedef std::function mindLifetimeMgmtOpCbFn; void initializeReq(mindLifetimeMgmtOpCbFn callback); void finalizeReq(mindLifetimeMgmtOpCbFn callback); + void initializeBodyReq(mindLifetimeMgmtOpCbFn callback); + void finalizeBodyReq(mindLifetimeMgmtOpCbFn callback); // ComponentThread access methods std::shared_ptr getComponentThread( @@ -77,6 +79,8 @@ private: class MindLifetimeMgmtOp; class MindThreadLifetimeMgmtOp; + class InitializeBodyReq; + class FinalizeBodyReq; }; // Global Mind instance will be defined in marionette.cpp diff --git a/smocore/include/salmanoff.h b/smocore/include/salmanoff.h index 1143d2b..6259850 100644 --- a/smocore/include/salmanoff.h +++ b/smocore/include/salmanoff.h @@ -7,10 +7,8 @@ namespace smo { -typedef std::function initializeSalmanoffCbFn; -typedef initializeSalmanoffCbFn shutdownSalmanoffCbFn; -void initializeSalmanoff(initializeSalmanoffCbFn callback); -void shutdownSalmanoff(shutdownSalmanoffCbFn callback); +void initializeSalmanoff(void); +void shutdownSalmanoff(void); } // namespace smo diff --git a/smocore/include/senseApis/senseApiManager.h b/smocore/include/senseApis/senseApiManager.h index 3973eb8..78a621c 100644 --- a/smocore/include/senseApis/senseApiManager.h +++ b/smocore/include/senseApis/senseApiManager.h @@ -24,9 +24,14 @@ public: return instance; } + void initialize(void) + {}; + void finalize(void) + {}; + SenseApiLib& loadSenseApiLib( const std::string& libraryPath, - std::shared_ptr& componentThread); + const std::shared_ptr& componentThread); std::optional> getSenseApiLib( const std::string& libraryPath); @@ -38,7 +43,7 @@ public: void finalizeSenseApiLib(SenseApiLib& lib); void loadAllSenseApiLibsFromOptions( - std::shared_ptr& componentThread); + const std::shared_ptr& componentThread); void unloadAllSenseApiLibs(void); void initializeAllSenseApiLibs(void); diff --git a/smocore/marionette/marionette.cpp b/smocore/marionette/marionette.cpp index 0bfa24b..a0e7fa6 100644 --- a/smocore/marionette/marionette.cpp +++ b/smocore/marionette/marionette.cpp @@ -47,7 +47,7 @@ void ComponentThread::marionetteMain(ComponentThread& self) self.initializeTls(); mrntt::exitCode = EXIT_SUCCESS; static boost::asio::signal_set signals(self.getIoService(), SIGINT); - bool callFinalizeReq = false, callShutdownSalmanoffReq = false; + bool callFinalizeReq = false, callShutdownSalmanoff = false; try { // Register SIGINT (Ctrl+C) and SIGSEGV handlers @@ -88,6 +88,9 @@ void ComponentThread::marionetteMain(ComponentThread& self) throw JustPrintUsageNoError(options); } + initializeSalmanoff(); + callShutdownSalmanoff = true; + self.getIoService().post([]() { /** EXPLANATION: @@ -109,22 +112,17 @@ void ComponentThread::marionetteMain(ComponentThread& self) * easier to implement. */ globalMind->initializeReq( - [](bool success) + [](bool success) + { + if (!success) { - if (success) { - initializeSalmanoff([](bool success) { - if (!success) { - std::cerr << "Failed to initialize " - "Salmanoff" << '\n'; - } - }); - } - else { - std::cerr << "Failed to initialize Mind object " - "(threads)" << '\n'; - } + std::cerr << "Failed to initialize Mind object " + "(threads)" << '\n'; } - ); + + std::cout << "Mrntt: Mind object (threads) initialized." + << '\n'; + }); }); std::cout << __func__ << ": Entering event loop" << "\n"; @@ -188,43 +186,26 @@ void ComponentThread::marionetteMain(ComponentThread& self) } *out << outUsageMsg << e.what() << std::endl; - callShutdownSalmanoffReq = callFinalizeReq = true; + callFinalizeReq = true; } catch (const std::exception& e) { std::cerr << __func__ << ": Exception occurred: " << e.what() << std::endl; mrntt::exitCode = EXIT_FAILURE; - callShutdownSalmanoffReq = callFinalizeReq = true; + callShutdownSalmanoff = callFinalizeReq = true; } catch (...) { std::cerr << __func__ << ": Unknown exception occurred" << std::endl; mrntt::exitCode = EXIT_FAILURE; - callShutdownSalmanoffReq = callFinalizeReq = true; - } - - if (callShutdownSalmanoffReq) - { - shutdownSalmanoff( - [](bool success) - { - if (success) { - std::cout << "Salmanoff shutdown completed successfully" - << std::endl; - } else { - std::cerr << "Salmanoff shutdown failed" << std::endl; - } - mrntt::mrntt->getIoService().stop(); - } - ); - self.getIoService().reset(); - self.getIoService().run(); + callShutdownSalmanoff = callFinalizeReq = true; } if (callFinalizeReq) { - globalMind->finalizeReq([](bool success) { + globalMind->finalizeReq([](bool success) + { if (!success) { std::cerr << "Failed to finalize Mind object (threads)" << '\n'; } @@ -233,6 +214,11 @@ void ComponentThread::marionetteMain(ComponentThread& self) self.getIoService().reset(); self.getIoService().run(); } + + if (callShutdownSalmanoff) { + shutdownSalmanoff(); + } + } } // namespace smo diff --git a/smocore/marionette/salmanoff.cpp b/smocore/marionette/salmanoff.cpp index c3da11a..14da021 100644 --- a/smocore/marionette/salmanoff.cpp +++ b/smocore/marionette/salmanoff.cpp @@ -7,100 +7,22 @@ namespace smo { -class InitializeSalmanoffReq -: public AsynchronousContinuation -{ -public: - InitializeSalmanoffReq(initializeSalmanoffCbFn cb) - : AsynchronousContinuation(std::move(cb)) - {} - - // Callback methods for the initialization sequence - void initializeSalmanoffReq1( - std::shared_ptr context, - smo::AsynchronousLoop &results - ) - { - std::cout << __func__ << ": Done. " << results.nSucceeded - << " succeeded, " << results.nFailed << " failed." << std::endl; - context->originalCbFn(true); - } -}; - -void initializeSalmanoff( - initializeSalmanoffCbFn callback) +void initializeSalmanoff(void) { std::cout << __func__ << ": Entered." << std::endl; - std::shared_ptr mrntt = ComponentThread::getMrntt(); - auto request = std::make_shared( - std::move(callback)); - + sense_api::SenseApiManager::getInstance().initialize(); + device::DeviceManager::getInstance().initialize(); device::DeviceManager::getInstance().collateAllDapSpecs(); device::DeviceManager::getInstance().parseAllDapSpecs(); std::cout << device::DeviceManager::stringifyDeviceSpecs() << std::endl; - - /** EXPLANATION: - * The ComponentThread instance we pass in here is the one that will be used - * by Senseapi libs to perform device-independent background operations. - * For example, liblivoxProto1's BroadcastListener will use this thread to - * listen for UDP broadcast dgrams from Livox devices. - * - * Right now we use Marionette, but there's a strong argument for using - * Body instead since it's meant to handle device-management operations. - */ - sense_api::SenseApiManager::getInstance().loadAllSenseApiLibsFromOptions( - mrntt); - - std::cout << sense_api::SenseApiManager::getInstance().stringifyLibs() - << std::endl; - - if (OptionParser::getOptions().verbose) { - std::cout << __func__ << ": About to initializeAllSenseApiLibs" << '\n'; - } - sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs(); - - if (OptionParser::getOptions().verbose) { - std::cout << __func__ << ": About to attachAllSenseDevicesFromSpecs" << '\n'; - } - sense_api::SenseApiManager::getInstance().attachAllSenseDevicesFromSpecsReq( - std::bind( - &InitializeSalmanoffReq::initializeSalmanoffReq1, - request.get(), request, - std::placeholders::_1)); } -class ShutdownSalmanoffReq -: public InitializeSalmanoffReq -{ -public: - using InitializeSalmanoffReq::InitializeSalmanoffReq; - - // Callback methods for the shutdown sequence - void shutdownSalmanoffReq1( - std::shared_ptr context, - smo::AsynchronousLoop &results - ) - { - sense_api::SenseApiManager::getInstance().finalizeAllSenseApiLibs(); - std::cout << __func__ << ": Done. " << results.nSucceeded - << " succeeded, " << results.nFailed << " failed." << std::endl; - context->originalCbFn(true); - } -}; - -void shutdownSalmanoff(shutdownSalmanoffCbFn callback) +void shutdownSalmanoff(void) { std::cout << __func__ << ": Entered." << std::endl; - - // Create the shutdown request object to hold state and callbacks - auto request = std::make_shared(std::move(callback)); - - sense_api::SenseApiManager::getInstance().detachAllSenseDevicesReq( - std::bind( - &ShutdownSalmanoffReq::shutdownSalmanoffReq1, - request.get(), request, - std::placeholders::_1)); + device::DeviceManager::getInstance().finalize(); + sense_api::SenseApiManager::getInstance().finalize(); } } // namespace smo diff --git a/smocore/mind.cpp b/smocore/mind.cpp index 331ca9d..c3731b7 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace smo { @@ -65,10 +66,10 @@ public: parent(parent) {} - void callOriginalCbFn(void) + void callOriginalCbFn(bool success) { if (originalCbFn) { - originalCbFn(true); + originalCbFn(success); } } @@ -93,27 +94,71 @@ public: ) { std::cout << "Mrntt: All mind threads started." << "\n"; - callOriginalCbFn(); + + parent.initializeBodyReq( + std::bind( + &MindLifetimeMgmtOp::initializeReq3, + context.get(), context, std::placeholders::_1)); + } + + void initializeReq3( + [[maybe_unused]] std::shared_ptr context, + bool success + ) + { + std::cout << "Mrntt: Body component initialized." << "\n"; + callOriginalCbFn(success); } void finalizeReq1( - [[maybe_unused]] std::shared_ptr context + [[maybe_unused]] std::shared_ptr context, + bool success ) { - std::cout << "Mrntt: All mind threads JOLTed." << "\n"; + if (!success) { + std::cerr << "Mrntt: Body component failed to finalize." << "\n"; + } else { + std::cout << "Mrntt: Body component finalized." << "\n"; + } - parent.exitAllMindThreadsReq( - std::bind( - &MindLifetimeMgmtOp::finalizeReq2, - context.get(), context)); + /* If the threads haven't been jolted, we need to do that first, because + * otherwise they'll just enter their main loops and wait for control + * messages from mrntt after processing the exit request. + */ + if (!parent.threadsHaveBeenJolted) + { + parent.joltAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::finalizeReq2, + context.get(), context)); + } + else + { + parent.exitAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::finalizeReq3, + context.get(), context)); + } } void finalizeReq2( [[maybe_unused]] std::shared_ptr context ) + { + std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n"; + + parent.exitAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::finalizeReq3, + context.get(), context)); + } + + void finalizeReq3( + [[maybe_unused]] std::shared_ptr context + ) { std::cout << "Mrntt: All mind threads exited." << "\n"; - callOriginalCbFn(); + callOriginalCbFn(true); } }; @@ -146,24 +191,169 @@ void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback) auto request = std::make_shared( *this, callback); - /* If the threads haven't been jolted, we need to do that first, because - * otherwise they'll just enter their main loops and wait for control - * messages from mrntt after processing the exit request. - */ - if (!threadsHaveBeenJolted) - { - joltAllMindThreadsReq( + finalizeBodyReq( + std::bind( + &MindLifetimeMgmtOp::finalizeReq1, + request.get(), request, std::placeholders::_1)); +} + +class Mind::InitializeBodyReq +: public MindLifetimeMgmtOp, public ContinuationTarget +{ +public: + InitializeBodyReq( + Mind &parent, const std::shared_ptr &caller, + mindLifetimeMgmtOpCbFn callback) + : MindLifetimeMgmtOp(parent, callback), ContinuationTarget(caller) + {} + + void callOriginalCbFn(bool success) + { + if (originalCbFn) + { + caller->getIoService().post( + std::bind(originalCbFn, success)); + } + } + +public: + void initializeBodyReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + auto self = ComponentThread::getSelf(); + if (self->id != ComponentThread::BODY) + { + throw std::runtime_error(std::string(__func__) + + ": Must be executed on Body thread"); + } + + /** EXPLANATION: + * The ComponentThread instance we pass in here is the one that will be + * used by Senseapi libs to perform device-independent background + * operations. + * For example, liblivoxProto1's BroadcastListener will use this thread + * to listen for UDP broadcast dgrams from Livox devices. + * + * Right now we use Marionette, but there's a strong argument for using + * Body instead since it's meant to handle device-management operations. + */ + sense_api::SenseApiManager::getInstance() + .loadAllSenseApiLibsFromOptions(caller); + + std::cout << sense_api::SenseApiManager::getInstance().stringifyLibs() + << std::endl; + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": About to initializeAllSenseApiLibs" + << '\n'; + } + sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs(); + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": About to attachAllSenseDevicesFromSpecs" + << '\n'; + } + sense_api::SenseApiManager::getInstance() + .attachAllSenseDevicesFromSpecsReq( + std::bind( + &InitializeBodyReq::initializeBodyReq2, + context.get(), context, + std::placeholders::_1)); + } + + void initializeBodyReq2( + [[maybe_unused]] std::shared_ptr context, + smo::AsynchronousLoop &results + ) + { + std::cout << "Mrntt: attached " + << results.nSucceeded << " of " << results.nTotal + << " sense devices." << "\n"; + + callOriginalCbFn(results.nSucceeded == results.nTotal); + } +}; + +class Mind::FinalizeBodyReq +: public InitializeBodyReq +{ +public: + using InitializeBodyReq::InitializeBodyReq; + +public: + void finalizeBodyReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + auto self = ComponentThread::getSelf(); + if (self->id != ComponentThread::BODY) + { + throw std::runtime_error(std::string(__func__) + + ": Must be executed on Body thread"); + } + + std::cout << "Mrntt: About to detach all sense devices." << "\n"; + sense_api::SenseApiManager::getInstance().detachAllSenseDevicesReq( std::bind( - &MindLifetimeMgmtOp::finalizeReq1, - request.get(), request)); - } - else - { - exitAllMindThreadsReq( - std::bind( - &MindLifetimeMgmtOp::finalizeReq1, - request.get(), request)); - } + &FinalizeBodyReq::finalizeBodyReq2, + context.get(), context, + std::placeholders::_1)); + } + + void finalizeBodyReq2( + [[maybe_unused]] std::shared_ptr context, + smo::AsynchronousLoop &results + ) + { + std::cout << "Mrntt: Successfully detached " + << results.nSucceeded << " of " << results.nTotal + << " sense devices." << "\n"; + + std::cout << "Mrntt: About to unload all sense api libs." << "\n"; + sense_api::SenseApiManager::getInstance().unloadAllSenseApiLibs(); + callOriginalCbFn(results.nSucceeded == results.nTotal); + } +}; + +void Mind::initializeBodyReq(mindLifetimeMgmtOpCbFn callback) +{ + auto mrntt = ComponentThread::getSelf(); + + if (mrntt->id != ComponentThread::MRNTT) + { + throw std::runtime_error(std::string(__func__) + + ": Must be invoked by Mrntt thread"); + } + + auto request = std::make_shared( + *this, mrntt, callback); + + this->getComponentThread(ComponentThread::BODY)->getIoService().post( + std::bind( + &InitializeBodyReq::initializeBodyReq1, + request.get(), request)); +} + +void Mind::finalizeBodyReq(mindLifetimeMgmtOpCbFn callback) +{ + auto mrntt = ComponentThread::getSelf(); + + if (mrntt->id != ComponentThread::MRNTT) + { + throw std::runtime_error(std::string(__func__) + + ": Must be invoked by Mrntt thread"); + } + + auto request = std::make_shared( + *this, mrntt, callback); + + this->getComponentThread(ComponentThread::BODY)->getIoService().post( + std::bind( + &FinalizeBodyReq::finalizeBodyReq1, + request.get(), request)); } void Mind::distributeAndPinThreadsAcrossCpus() diff --git a/smocore/senseApis/senseApiManager.cpp b/smocore/senseApis/senseApiManager.cpp index a62ce8a..44d43b7 100644 --- a/smocore/senseApis/senseApiManager.cpp +++ b/smocore/senseApis/senseApiManager.cpp @@ -97,7 +97,7 @@ std::optional SenseApiManager::searchForLibInSmoSearchPaths( SenseApiLib& SenseApiManager::loadSenseApiLib( const std::string& libraryPath, - std::shared_ptr& componentThread + const std::shared_ptr& componentThread ) { std::optional fullPath = searchForLibInSmoSearchPaths( @@ -204,7 +204,7 @@ void SenseApiManager::unloadAllSenseApiLibs(void) } void SenseApiManager::loadAllSenseApiLibsFromOptions( - std::shared_ptr& componentThread + const std::shared_ptr& componentThread ) { const auto& options = OptionParser::getOptions(); @@ -351,6 +351,11 @@ public: context->originalCbFn(context->loop); } + void callOriginalCallback() + { + originalCbFn(loop); + } + public: AsynchronousLoop loop; }; @@ -365,7 +370,7 @@ void SenseApiManager::attachAllSenseDevicesFromSpecsReq( if (request->loop.nTotalIsZero()) { - cb(request->loop); + request->callOriginalCallback(); return; } @@ -381,7 +386,7 @@ void SenseApiManager::attachAllSenseDevicesFromSpecsReq( } catch (const std::exception& e) { std::cerr << __func__ << ": Exception: " << e.what() << "\n"; if (request->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false)) - { cb(request->loop); } + { request->callOriginalCallback(); } } } } @@ -418,6 +423,11 @@ public: context->originalCbFn(context->loop); } + + void callOriginalCallback() + { + originalCbFn(loop); + } }; void SenseApiManager::detachAllSenseDevicesReq( @@ -429,7 +439,7 @@ void SenseApiManager::detachAllSenseDevicesReq( if (request->loop.nTotalIsZero()) { - cb(request->loop); + request->callOriginalCallback(); return; } @@ -445,7 +455,9 @@ void SenseApiManager::detachAllSenseDevicesReq( } catch (const std::exception& e) { std::cerr << __func__ << ": Exception: " << e.what() << "\n"; if (request->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false)) - { cb(request->loop); } + { + request->callOriginalCallback(); + } } } }