diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index 081bdcf..0be353e 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -90,112 +91,285 @@ void ComponentThread::main(ComponentThread& self) << ": Unknown exception occurred" << "\n"; } - if (sendExceptionInd) { mrntt::mrntt->exceptionInd(self); } + if (sendExceptionInd) + { mrntt::mrntt->exceptionInd(self.shared_from_this()); } } std::cout << self.name << ":" << __func__ << ": Exited event loop" << "\n"; } -// Thread management method implementations -void ComponentThread::startThreadReq(std::function callback) +class ComponentThread::ThreadLifetimeMgmtOp +: public TargetedAsynchronousContinuation { - this->getIoService().post([this, caller = getSelf(), callback]() +public: + ThreadLifetimeMgmtOp( + const std::shared_ptr &caller, + const std::shared_ptr &target, + threadLifetimeMgmtOpCbFn callback) + : TargetedAsynchronousContinuation( + caller, callback), + target(target) + {} + + void callOriginalCbFn(void) { - std::cout << "Thread '" << name << "': handling startThread." << "\n"; + if (originalCbFn) { + caller->getIoService().post(originalCbFn); + } + } + +public: + const std::shared_ptr target; + +public: + void joltThreadReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Thread '" << target->name << "': handling JOLT request." + << "\n"; + + target->io_service.stop(); + callOriginalCbFn(); + } + + void startThreadReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Thread '" << target->name << "': handling startThread." + << "\n"; // Execute private setup sequence here // This is where each thread would implement its specific initialization - if (callback) { - caller->getIoService().post(callback); - } - }); -} + callOriginalCbFn(); + } + + void exitThreadReq1_mainQueue( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Thread '" << target->name << "': handling exitThread " + "(main queue)." << std::endl; + + target->cleanup(); + target->io_service.stop(); + callOriginalCbFn(); + } + + void exitThreadReq1_pauseQueue( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Thread '" << target->name << "': handling exitThread " + "(pause queue)." << std::endl; + + target->cleanup(); + target->pause_io_service.stop(); + target->io_service.stop(); + callOriginalCbFn(); + } + + void pauseThreadReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Thread '" << target->name << "': handling pauseThread." + << std::endl; + + /* We have to invoke the callback here before moving on because + * our next operation is going to block the thread, so it won't + * have a chance to invoke the callback until it's unblocked. + */ + callOriginalCbFn(); + target->pause_io_service.reset(); + target->pause_io_service.run(); + } + + void resumeThreadReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Thread '" << target->name << "': handling resumeThread." + << std::endl; + + target->pause_io_service.stop(); + callOriginalCbFn(); + } +}; void ComponentThread::cleanup(void) { this->keepLooping = false; } -void ComponentThread::exitThreadReq(std::function callback) +void ComponentThread::joltThreadReq(threadLifetimeMgmtOpCbFn callback) { - // Post to the main io_service - this->getIoService().post([this, caller = getSelf(), callback]() + /** EXPLANATION: + * We can't use shared_from_this() here because JOLTing occurs prior to + * TLS being set up. + * + * We also can't use getSelf() as yet for the same reason: getSelf() + * requires TLS to be set up. + * + * To obtain a sh_ptr to the caller, we just supply the mrntt thread since + * JOLT is always invoked by the mrntt thread. The JOLT sequence that the + * CRT main() function invokes on the mrntt thread is special since it + * supplies cmdline args and envp. + * + * To obtain a sh_ptr to the target thread, we explicitly look it up in the + * Mind object's collection of component threads. + */ + if (id == ComponentThread::MRNTT) { - std::cout << "Thread '" << name << "': handling exitThread " - "(main queue)." << std::endl; + throw std::runtime_error(std::string(__func__) + + ": invoked on mrntt thread"); + } - cleanup(); + std::shared_ptr + mrntt = mrntt::mrntt, + target = parent.getComponentThread(id); - // Stop the main io_service to exit the thread - io_service.stop(); - if (callback) { caller->getIoService().post(callback); } - }); + auto request = std::make_shared( + mrntt, target, callback); - // Also post to the pause io_service - this->pause_io_service.post([this, caller = getSelf(), callback]() - { - std::cout << "Thread '" << name << "': handling exitThread " - "(pause queue)." << std::endl; - - cleanup(); - - // Stop both io_services to exit the thread - pause_io_service.stop(); - io_service.stop(); - if (callback) { caller->getIoService().post(callback); } - }); + this->getIoService().post( + std::bind( + &ThreadLifetimeMgmtOp::joltThreadReq1, + request.get(), request)); } -void ComponentThread::pauseThreadReq(std::function callback) +// Thread management method implementations +void ComponentThread::startThreadReq(threadLifetimeMgmtOpCbFn callback) { - this->getIoService().post([this, caller = getSelf(), callback]() - { - std::cout << "Thread '" << name << "': handling pauseThread." - << std::endl; + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, shared_from_this(), callback); - if (callback) { - caller->getIoService().post(callback); - } - - // Reset the pause io_service before running to ensure it can run again - pause_io_service.reset(); - // Run the pause io_service to block this thread - pause_io_service.run(); - }); + this->getIoService().post( + std::bind( + &ThreadLifetimeMgmtOp::startThreadReq1, + request.get(), request)); } -void ComponentThread::resumeThreadReq(std::function callback) +void ComponentThread::exitThreadReq(threadLifetimeMgmtOpCbFn callback) { + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, shared_from_this(), callback); + + this->getIoService().post( + std::bind( + &ThreadLifetimeMgmtOp::exitThreadReq1_mainQueue, + request.get(), request)); + + this->pause_io_service.post( + std::bind( + &ThreadLifetimeMgmtOp::exitThreadReq1_pauseQueue, + request.get(), request)); +} + +void ComponentThread::pauseThreadReq(threadLifetimeMgmtOpCbFn callback) +{ + if (id == ComponentThread::MRNTT) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on mrntt thread"); + } + + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, shared_from_this(), callback); + + this->getIoService().post( + std::bind( + &ThreadLifetimeMgmtOp::pauseThreadReq1, + request.get(), request)); +} + +void ComponentThread::resumeThreadReq(threadLifetimeMgmtOpCbFn callback) +{ + if (id == ComponentThread::MRNTT) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on mrntt thread"); + } + // Post to the pause_io_service to unblock the paused thread - pause_io_service.post([this, caller = getSelf(), callback]() - { - std::cout << "Thread '" << name << "': handling resumeThread." - << std::endl; + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, shared_from_this(), callback); - if (callback) { - caller->getIoService().post(callback); - } - - // Stop the pause_io_service to unblock the thread - pause_io_service.stop(); - }); + this->pause_io_service.post( + std::bind( + &ThreadLifetimeMgmtOp::resumeThreadReq1, + request.get(), request)); } -void ComponentThread::joltThreadReq(std::function callback) +class ComponentThread::MindShutdownIndOp +: public TargetedAsynchronousContinuation { - this->getIoService().post([this, caller = getSelf(), callback]() +public: + MindShutdownIndOp( + const std::shared_ptr &caller, + mindShutdownIndOpCbFn callback) + : TargetedAsynchronousContinuation( + caller, callback) + {} + +public: + void mindShutdownInd1_exception( + [[maybe_unused]] std::shared_ptr context + ) { - std::cout << "Thread '" << name << "': handling JOLT request." << "\n"; + std::cerr << "Mrntt: Exception occurred: in thread " + << context->caller->name << ". Killing Salmanoff." << "\n"; - // Stop the main io_service to jolt the thread - io_service.stop(); + /** EXPLANATION: + * An exception has occurred in one of a mind's threads. We need to + * shut down all of that particular mind's threads. + */ + context->caller->parent.finalizeReq( + std::bind( + &MindShutdownIndOp::mindShutdownInd2, + context.get(), context)); + } - if (callback) { - caller->getIoService().post(callback); - } - }); -} + void mindShutdownInd1_userShutdown( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cerr << "Mrntt: User requested shutdown (SIGINT)." + << " Killing Salmanoff." << "\n"; + + /** EXPLANATION: + * A user has requested a shutdown. We need to shut down all of the + * threads in all running Minds. + * + * FIXME: + * So this should ideally be a loop + * through all running Minds, calling finalizeReq on each one. + */ + context->caller->parent.finalizeReq( + std::bind( + &MindShutdownIndOp::mindShutdownInd2, + context.get(), context)); + } + + void mindShutdownInd2( + [[maybe_unused]] std::shared_ptr context + ) + { + /** FIXME: + * When we eventually support multiple minds, we should remove this + * since it causes marionette to exit, even if there are other minds + * that are still running. + */ + smo::mrntt::exitMarionetteLoop(); + } + +}; /* This shouldn't take a callback because the caller shouldn't expect to * Mrntt to send a reply signal to it. Sending this Indication means that @@ -206,34 +380,24 @@ void ComponentThread::joltThreadReq(std::function callback) * Even if Mrntt sent a RDY response, the caller shouldn't actually be executing * any longer to receive it anyway. */ -void ComponentThread::exceptionInd(ComponentThread& thread) +void ComponentThread::exceptionInd( + const std::shared_ptr &faultyThread +) { if (this->id != ComponentThread::MRNTT) { throw std::runtime_error(std::string(__func__) - + ": invoked on non-mrntt thread " + thread.name); + + ": invoked on non-mrntt thread " + faultyThread->name); } + auto request = std::make_shared( + faultyThread, nullptr); + // Post the exception to the mrntt thread. this->getIoService().post( - [&thread]() - { - std::cerr << "Mrntt: Exception occurred: in thread " - << thread.name << ". Killing Salmanoff." << "\n"; - - /** EXPLANATION: - * An exception has occurred in one of a mind's threads. We need to - * shut down all of that particular mind's threads. - */ - thread.parent.finalizeReq([]() { - /** FIXME: - * When we eventually support multiple minds, we should remove this - * since it causes marionette to exit, even if there are other minds - * that are still running. - */ - smo::mrntt::exitMarionetteLoop(); - }); - }); + std::bind( + &MindShutdownIndOp::mindShutdownInd1_exception, + request.get(), request)); } void ComponentThread::userShutdownInd() @@ -244,26 +408,14 @@ void ComponentThread::userShutdownInd() + ": invoked on non-mrntt thread " + this->name); } + auto request = std::make_shared( + ComponentThread::getMrntt(), nullptr); + // Post the user shutdown to the mrntt thread. this->getIoService().post( - [this]() - { - std::cerr << "Mrntt: User requested shutdown (SIGINT)." - << " Killing Salmanoff." << "\n"; - - /** EXPLANATION: - * A user has requested a shutdown. We need to shut down all of the - * threads in all running Minds. - */ - parent.finalizeReq([]() { - /** FIXME: - * When we eventually support multiple minds, we should remove this - * since it causes marionette to exit, even if there are other minds - * that are still running. - */ - smo::mrntt::exitMarionetteLoop(); - }); - }); + std::bind( + &MindShutdownIndOp::mindShutdownInd1_userShutdown, + request.get(), request)); } // CPU management method implementations diff --git a/smocore/include/componentThread.h b/smocore/include/componentThread.h index ff58906..bb9126f 100644 --- a/smocore/include/componentThread.h +++ b/smocore/include/componentThread.h @@ -54,11 +54,13 @@ public: typedef void (mainFn)(ComponentThread &self); static mainFn main, marionetteMain; + typedef std::function threadLifetimeMgmtOpCbFn; // Thread management methods - void startThreadReq(std::function callback = nullptr); - void exitThreadReq(std::function callback = nullptr); - void pauseThreadReq(std::function callback = nullptr); - void resumeThreadReq(std::function callback = nullptr); + void startThreadReq(threadLifetimeMgmtOpCbFn callback); + void exitThreadReq(threadLifetimeMgmtOpCbFn callback); + void pauseThreadReq(threadLifetimeMgmtOpCbFn callback); + void resumeThreadReq(threadLifetimeMgmtOpCbFn callback); + /** * JOLTs this thread to begin processing after global initialization. * @@ -66,7 +68,7 @@ public: * event loops and set up TLS vars after all global constructors have * completed. This prevents race conditions during system startup. */ - void joltThreadReq(std::function callback = nullptr); + void joltThreadReq(threadLifetimeMgmtOpCbFn callback); // CPU management methods static int getAvailableCpuCount(); @@ -82,8 +84,9 @@ public: N_ITEMS }; + typedef std::function mindShutdownIndOpCbFn; // Intentionally doesn't take a callback. - void exceptionInd(ComponentThread& thread); + void exceptionInd(const std::shared_ptr &faultyThread); // Intentionally doesn't take a callback. void userShutdownInd(); @@ -124,6 +127,10 @@ public: return threadNames[id]; } + +private: + class ThreadLifetimeMgmtOp; + class MindShutdownIndOp; }; namespace mrntt { diff --git a/smocore/marionette/marionette.cpp b/smocore/marionette/marionette.cpp index 5b46ae3..6337331 100644 --- a/smocore/marionette/marionette.cpp +++ b/smocore/marionette/marionette.cpp @@ -47,7 +47,7 @@ void ComponentThread::marionetteMain(ComponentThread& self) self.initializeTls(); mrntt::exitCode = EXIT_SUCCESS; static boost::asio::signal_set signals(self.getIoService(), SIGINT); - bool callFinalizeReq = false, callShutdownSalmanoff = false; + bool callFinalizeReq = false, callShutdownSalmanoffReq = false; try { // Register SIGINT (Ctrl+C) and SIGSEGV handlers @@ -144,7 +144,7 @@ void ComponentThread::marionetteMain(ComponentThread& self) if (sendExceptionInd) { mrntt::exitCode = EXIT_FAILURE; - self.exceptionInd(self); + self.exceptionInd(self.shared_from_this()); } } @@ -163,23 +163,23 @@ void ComponentThread::marionetteMain(ComponentThread& self) } *out << outUsageMsg << e.what() << std::endl; - callShutdownSalmanoff = callFinalizeReq = true; + callShutdownSalmanoffReq = callFinalizeReq = true; } catch (const std::exception& e) { std::cerr << __func__ << ": Exception occurred: " << e.what() << std::endl; mrntt::exitCode = EXIT_FAILURE; - callShutdownSalmanoff = callFinalizeReq = true; + callShutdownSalmanoffReq = callFinalizeReq = true; } catch (...) { std::cerr << __func__ << ": Unknown exception occurred" << std::endl; mrntt::exitCode = EXIT_FAILURE; - callShutdownSalmanoff = callFinalizeReq = true; + callShutdownSalmanoffReq = callFinalizeReq = true; } - if (callShutdownSalmanoff) + if (callShutdownSalmanoffReq) { shutdownSalmanoff( [](bool success)