From 1deb92a4168fe08857ac11e8b24e07234b5c0d4f Mon Sep 17 00:00:00 2001 From: Hayodea Hakol Date: Sun, 3 Aug 2025 08:22:45 -0400 Subject: [PATCH] CompThreads: create execOpOnAllMindThreads common helper This allows us to execute an op on all mind threads without having to repeatedly write loops. We've implemented wrappers to handle start, pause, resume, exit and JOLT sequences. --- smocore/componentThread.cpp | 190 ++++++++++++++++++++++++------ smocore/include/componentThread.h | 24 +++- smocore/mind.cpp | 29 +++-- 3 files changed, 192 insertions(+), 51 deletions(-) diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index 46a3586..d684675 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -214,11 +214,152 @@ void ComponentThread::resumeThreadReq(std::function callback) }); } -static int threadsKilledCount; +void ComponentThread::joltThreadReq(std::function callback) +{ + this->getIoService().post([this, caller = getSelf(), callback]() + { + std::cout << "Thread '" << name << "': handling JOLT request." << "\n"; + // Stop the main io_service to jolt the thread + io_service.stop(); + + if (callback) { + caller->getIoService().post(callback); + } + }); +} + +struct AllMindThreadsOpReqContext { + AllMindThreadsOpReqContext() : nThreadsProcessed(0) {} + + int nThreadsProcessed; +}; + +static const std::string getOpName(ComponentThread::ThreadOp op) +{ + if (op < (ComponentThread::ThreadOp)0 + || op > ComponentThread::ThreadOp::JOLT) + { + throw std::runtime_error(std::string(__func__) + + ": Invalid operation"); + } + + switch (op) + { + case ComponentThread::ThreadOp::START: return "starting"; + case ComponentThread::ThreadOp::PAUSE: return "pausing"; + case ComponentThread::ThreadOp::RESUME: return "resuming"; + case ComponentThread::ThreadOp::EXIT: return "exiting"; + case ComponentThread::ThreadOp::JOLT: return "jolting"; + default: return "unknown"; + } +} + +void ComponentThread::execOpOnAllMindThreadsReq( + ThreadOp op, std::function callback + ) +{ + std::shared_ptr self = getSelf(); + // Check that we're being called from the marionette thread + if (self->id != MRNTT) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on non-mrntt thread " + self->name); + } + + std::cout << "Mrntt: " << getOpName(op) << " all mind threads." << "\n"; + + auto context = std::make_shared(); + const int N_THREADS_EXCEPT_MRNTT = ComponentThread::N_ITEMS - 1; + + for (auto &currThread : ComponentThread::componentThreads) + { + if (currThread->id == ComponentThread::MRNTT) + { continue; } + + auto threadCallback = [context, callback, N_THREADS_EXCEPT_MRNTT, op]() + { + ++context->nThreadsProcessed; + if (context->nThreadsProcessed < N_THREADS_EXCEPT_MRNTT) + { return; } + + if (op == ThreadOp::EXIT) + { + // Special cleanup for exit operations + for (auto &currThreadJ : ComponentThread::componentThreads) + { + if (currThreadJ->id == ComponentThread::MRNTT) + { continue; } + + currThreadJ->thread.join(); + } + } + + std::cout << "Mrntt: all mind threads done " << getOpName(op) << "." + << "\n"; + + if (callback) { callback(); } + }; + + switch (op) { + case ThreadOp::START: + currThread->startThreadReq(threadCallback); + break; + case ThreadOp::PAUSE: + currThread->pauseThreadReq(threadCallback); + break; + case ThreadOp::RESUME: + currThread->resumeThreadReq(threadCallback); + break; + case ThreadOp::EXIT: + currThread->exitThreadReq(threadCallback); + break; + case ThreadOp::JOLT: + currThread->joltThreadReq(threadCallback); + break; + default: + throw std::runtime_error("Invalid operation"); + } + } +} + +void ComponentThread::startAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::START, callback); +} + +void ComponentThread::pauseAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::PAUSE, callback); +} + +void ComponentThread::resumeAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::RESUME, callback); +} + +void ComponentThread::exitAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::EXIT, callback); +} + +void ComponentThread::joltAllMindThreadsReq(std::function callback) +{ + execOpOnAllMindThreadsReq(ThreadOp::JOLT, callback); +} + +/* This shouldn't take a callback because the caller shouldn't expect to + * Mrntt to send a reply signal to it. Sending this Indication means that + * Mrntt will send the calling thread an exitThreadReq. When the caller + * processes that exitThreadReq(), the caller will exit its event loop and then + * terminate. + * + * Even if Mrntt sent a RDY response, the caller shouldn't actually be executing + * any longer to receive it anyway. + */ void ComponentThread::exceptionInd(ComponentThread& thread) { - if (this->id != MRNTT) + if (this->id != ComponentThread::MRNTT) { throw std::runtime_error(std::string(__func__) + ": invoked on non-mrntt thread " + thread.name); @@ -226,40 +367,19 @@ void ComponentThread::exceptionInd(ComponentThread& thread) // Post the exception to the mrntt thread. this->getIoService().post( - [&thread]() + [&thread]() + { + std::cerr << "Mrntt: Exception occurred: in thread " + << thread.name << ". Killing Salmanoff." << "\n"; + + ComponentThread::exitAllMindThreadsReq( + []() { - std::cerr << "Mrntt: Exception occurred: in thread " - << thread.name << ". Killing Salmanoff." << "\n"; - - threadsKilledCount = 0; - for (auto &currThread : ComponentThread::componentThreads) - { - if (currThread->id == MRNTT) - { continue; } - - currThread->exitThreadReq( - []() - { - ++threadsKilledCount; - if (threadsKilledCount < ComponentThread::N_ITEMS - 1) - { return; } - - for (auto &currThreadJ - : ComponentThread::componentThreads) - { - if (currThreadJ->id == MRNTT) - { continue; } - - currThreadJ->thread.join(); - } - - mrntt::mrntt->keepLooping = false; - mrntt::mrntt->getIoService().stop(); - } - ); - } - } - ); + mrntt::mrntt->keepLooping = false; + mrntt::mrntt->getIoService().stop(); + std::cout << "Mrntt: Signaled main loop to exit." << "\n"; + }); + }); } } // namespace smo diff --git a/smocore/include/componentThread.h b/smocore/include/componentThread.h index 8bcf43d..e676334 100644 --- a/smocore/include/componentThread.h +++ b/smocore/include/componentThread.h @@ -39,7 +39,7 @@ public: boost::asio::io_service& getIoService(void) { return io_service; } void initializeTls(void); - const std::shared_ptr getSelf(void); + static const std::shared_ptr getSelf(void); static std::shared_ptr getComponentThread( ThreadId id = N_ITEMS) @@ -78,6 +78,28 @@ public: void exitThreadReq(std::function callback = nullptr); void pauseThreadReq(std::function callback = nullptr); void resumeThreadReq(std::function callback = nullptr); + void joltThreadReq(std::function callback = nullptr); + + // Convenience wrappers + static void startAllMindThreadsReq(std::function callback = nullptr); + static void pauseAllMindThreadsReq(std::function callback = nullptr); + static void resumeAllMindThreadsReq(std::function callback = nullptr); + static void exitAllMindThreadsReq(std::function callback = nullptr); + static void joltAllMindThreadsReq(std::function callback = nullptr); + + enum class ThreadOp + { + START, + PAUSE, + RESUME, + EXIT, + JOLT, + N_ITEMS + }; + static void execOpOnAllMindThreadsReq( + ThreadOp op, std::function callback = nullptr); + + // Intentionally doesn't take a callback. void exceptionInd(ComponentThread& thread); public: diff --git a/smocore/mind.cpp b/smocore/mind.cpp index bd1bfe6..da1eca9 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -1,4 +1,4 @@ - +#include #include #include @@ -6,20 +6,19 @@ namespace smo { void Mind::initialize() { - /* Start the threads */ - for (auto& componentThread : smo::ComponentThread::componentThreads) - { - // Post startThread() to the event loop of all threads except MRNTT. - if (componentThread->id == ComponentThread::MRNTT) { continue; } - - // JOLT the thread. - componentThread->getIoService().post([componentThread]() - { componentThread->getIoService().stop(); } - ); - - // Now tell it to execute its initialization sequence. - componentThread->startThreadReq(); - } + /* Jolt the threads, then start them */ + ComponentThread::joltAllMindThreadsReq( + []() + { + std::cout << "Mrntt: All mind threads JOLTed." << "\n"; + ComponentThread::startAllMindThreadsReq( + []() + { + std::cout << "Mrntt: All mind threads started." << "\n"; + } + ); + } + ); } void Mind::finalize(void)