diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index 46a3586..d684675 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -214,11 +214,152 @@ void ComponentThread::resumeThreadReq(std::function callback) }); } -static int threadsKilledCount; +void ComponentThread::joltThreadReq(std::function callback) +{ + this->getIoService().post([this, caller = getSelf(), callback]() + { + std::cout << "Thread '" << name << "': handling JOLT request." << "\n"; + // Stop the main io_service to jolt the thread + io_service.stop(); + + if (callback) { + caller->getIoService().post(callback); + } + }); +} + +struct AllMindThreadsOpReqContext { + AllMindThreadsOpReqContext() : nThreadsProcessed(0) {} + + int nThreadsProcessed; +}; + +static const std::string getOpName(ComponentThread::ThreadOp op) +{ + if (op < (ComponentThread::ThreadOp)0 + || op > ComponentThread::ThreadOp::JOLT) + { + throw std::runtime_error(std::string(__func__) + + ": Invalid operation"); + } + + switch (op) + { + case ComponentThread::ThreadOp::START: return "starting"; + case ComponentThread::ThreadOp::PAUSE: return "pausing"; + case ComponentThread::ThreadOp::RESUME: return "resuming"; + case ComponentThread::ThreadOp::EXIT: return "exiting"; + case ComponentThread::ThreadOp::JOLT: return "jolting"; + default: return "unknown"; + } +} + +void ComponentThread::execOpOnAllMindThreadsReq( + ThreadOp op, std::function callback + ) +{ + std::shared_ptr self = getSelf(); + // Check that we're being called from the marionette thread + if (self->id != MRNTT) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on non-mrntt thread " + self->name); + } + + std::cout << "Mrntt: " << getOpName(op) << " all mind threads." << "\n"; + + auto context = std::make_shared(); + const int N_THREADS_EXCEPT_MRNTT = ComponentThread::N_ITEMS - 1; + + for (auto &currThread : ComponentThread::componentThreads) + { + if (currThread->id == ComponentThread::MRNTT) + { continue; } + + auto threadCallback = [context, callback, N_THREADS_EXCEPT_MRNTT, op]() + { + ++context->nThreadsProcessed; + if (context->nThreadsProcessed < N_THREADS_EXCEPT_MRNTT) + { return; } + + if (op == ThreadOp::EXIT) + { + // Special cleanup for exit operations + for (auto &currThreadJ : ComponentThread::componentThreads) + { + if (currThreadJ->id == ComponentThread::MRNTT) + { continue; } + + currThreadJ->thread.join(); + } + } + + std::cout << "Mrntt: all mind threads done " << getOpName(op) << "." + << "\n"; + + if (callback) { callback(); } + }; + + switch (op) { + case ThreadOp::START: + currThread->startThreadReq(threadCallback); + break; + case ThreadOp::PAUSE: + currThread->pauseThreadReq(threadCallback); + break; + case ThreadOp::RESUME: + currThread->resumeThreadReq(threadCallback); + break; + case ThreadOp::EXIT: + currThread->exitThreadReq(threadCallback); + break; + case ThreadOp::JOLT: + currThread->joltThreadReq(threadCallback); + break; + default: + throw std::runtime_error("Invalid operation"); + } + } +} + +void ComponentThread::startAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::START, callback); +} + +void ComponentThread::pauseAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::PAUSE, callback); +} + +void ComponentThread::resumeAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::RESUME, callback); +} + +void ComponentThread::exitAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::EXIT, callback); +} + +void ComponentThread::joltAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::JOLT, callback); +} + +/* This shouldn't take a callback because the caller shouldn't expect to + * Mrntt to send a reply signal to it. Sending this Indication means that + * Mrntt will send the calling thread an exitThreadReq. When the caller + * processes that exitThreadReq(), the caller will exit its event loop and then + * terminate. + * + * Even if Mrntt sent a RDY response, the caller shouldn't actually be executing + * any longer to receive it anyway. + */ void ComponentThread::exceptionInd(ComponentThread& thread) { - if (this->id != MRNTT) + if (this->id != ComponentThread::MRNTT) { throw std::runtime_error(std::string(__func__) + ": invoked on non-mrntt thread " + thread.name); @@ -226,40 +367,19 @@ void ComponentThread::exceptionInd(ComponentThread& thread) // Post the exception to the mrntt thread. this->getIoService().post( - [&thread]() + [&thread]() + { + std::cerr << "Mrntt: Exception occurred: in thread " + << thread.name << ". Killing Salmanoff." << "\n"; + + ComponentThread::exitAllMindThreadsReq( + []() { - std::cerr << "Mrntt: Exception occurred: in thread " - << thread.name << ". Killing Salmanoff." << "\n"; - - threadsKilledCount = 0; - for (auto &currThread : ComponentThread::componentThreads) - { - if (currThread->id == MRNTT) - { continue; } - - currThread->exitThreadReq( - []() - { - ++threadsKilledCount; - if (threadsKilledCount < ComponentThread::N_ITEMS - 1) - { return; } - - for (auto &currThreadJ - : ComponentThread::componentThreads) - { - if (currThreadJ->id == MRNTT) - { continue; } - - currThreadJ->thread.join(); - } - - mrntt::mrntt->keepLooping = false; - mrntt::mrntt->getIoService().stop(); - } - ); - } - } - ); + mrntt::mrntt->keepLooping = false; + mrntt::mrntt->getIoService().stop(); + std::cout << "Mrntt: Signaled main loop to exit." << "\n"; + }); + }); } } // namespace smo diff --git a/smocore/include/componentThread.h b/smocore/include/componentThread.h index 8bcf43d..e676334 100644 --- a/smocore/include/componentThread.h +++ b/smocore/include/componentThread.h @@ -39,7 +39,7 @@ public: boost::asio::io_service& getIoService(void) { return io_service; } void initializeTls(void); - const std::shared_ptr getSelf(void); + static const std::shared_ptr getSelf(void); static std::shared_ptr getComponentThread( ThreadId id = N_ITEMS) @@ -78,6 +78,28 @@ public: void exitThreadReq(std::function callback = nullptr); void pauseThreadReq(std::function callback = nullptr); void resumeThreadReq(std::function callback = nullptr); + void joltThreadReq(std::function callback = nullptr); + + // Convenience wrappers + static void startAllMindThreadsReq(std::function callback = nullptr); + static void pauseAllMindThreadsReq(std::function callback = nullptr); + static void resumeAllMindThreadsReq(std::function callback = nullptr); + static void exitAllMindThreadsReq(std::function callback = nullptr); + static void joltAllMindThreadsReq(std::function callback = nullptr); + + enum class ThreadOp + { + START, + PAUSE, + RESUME, + EXIT, + JOLT, + N_ITEMS + }; + static void execOpOnAllMindThreadsReq( + ThreadOp op, std::function callback = nullptr); + + // Intentionally doesn't take a callback. void exceptionInd(ComponentThread& thread); public: diff --git a/smocore/mind.cpp b/smocore/mind.cpp index bd1bfe6..da1eca9 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -1,4 +1,4 @@ - +#include #include #include @@ -6,20 +6,19 @@ namespace smo { void Mind::initialize() { - /* Start the threads */ - for (auto& componentThread : smo::ComponentThread::componentThreads) - { - // Post startThread() to the event loop of all threads except MRNTT. - if (componentThread->id == ComponentThread::MRNTT) { continue; } - - // JOLT the thread. - componentThread->getIoService().post([componentThread]() - { componentThread->getIoService().stop(); } - ); - - // Now tell it to execute its initialization sequence. - componentThread->startThreadReq(); - } + /* Jolt the threads, then start them */ + ComponentThread::joltAllMindThreadsReq( + []() + { + std::cout << "Mrntt: All mind threads JOLTed." << "\n"; + ComponentThread::startAllMindThreadsReq( + []() + { + std::cout << "Mrntt: All mind threads started." << "\n"; + } + ); + } + ); } void Mind::finalize(void)