CompThreads: create execOpOnAllMindThreads common helper

This allows us to execute an op on all mind threads without having
to repeatedly write loops. We've implemented wrappers to handle
start, pause, resume, exit and JOLT sequences.
This commit is contained in:
2025-08-03 08:22:45 -04:00
parent 6f6fa77498
commit 1deb92a416
3 changed files with 192 additions and 51 deletions
+155 -35
View File
@@ -214,11 +214,152 @@ void ComponentThread::resumeThreadReq(std::function<void()> callback)
});
}
static int threadsKilledCount;
void ComponentThread::joltThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling JOLT request." << "\n";
// Stop the main io_service to jolt the thread
io_service.stop();
if (callback) {
caller->getIoService().post(callback);
}
});
}
struct AllMindThreadsOpReqContext {
AllMindThreadsOpReqContext() : nThreadsProcessed(0) {}
int nThreadsProcessed;
};
static const std::string getOpName(ComponentThread::ThreadOp op)
{
if (op < (ComponentThread::ThreadOp)0
|| op > ComponentThread::ThreadOp::JOLT)
{
throw std::runtime_error(std::string(__func__)
+ ": Invalid operation");
}
switch (op)
{
case ComponentThread::ThreadOp::START: return "starting";
case ComponentThread::ThreadOp::PAUSE: return "pausing";
case ComponentThread::ThreadOp::RESUME: return "resuming";
case ComponentThread::ThreadOp::EXIT: return "exiting";
case ComponentThread::ThreadOp::JOLT: return "jolting";
default: return "unknown";
}
}
void ComponentThread::execOpOnAllMindThreadsReq(
ThreadOp op, std::function<void()> callback
)
{
std::shared_ptr<ComponentThread> self = getSelf();
// Check that we're being called from the marionette thread
if (self->id != MRNTT)
{
throw std::runtime_error(std::string(__func__)
+ ": invoked on non-mrntt thread " + self->name);
}
std::cout << "Mrntt: " << getOpName(op) << " all mind threads." << "\n";
auto context = std::make_shared<AllMindThreadsOpReqContext>();
const int N_THREADS_EXCEPT_MRNTT = ComponentThread::N_ITEMS - 1;
for (auto &currThread : ComponentThread::componentThreads)
{
if (currThread->id == ComponentThread::MRNTT)
{ continue; }
auto threadCallback = [context, callback, N_THREADS_EXCEPT_MRNTT, op]()
{
++context->nThreadsProcessed;
if (context->nThreadsProcessed < N_THREADS_EXCEPT_MRNTT)
{ return; }
if (op == ThreadOp::EXIT)
{
// Special cleanup for exit operations
for (auto &currThreadJ : ComponentThread::componentThreads)
{
if (currThreadJ->id == ComponentThread::MRNTT)
{ continue; }
currThreadJ->thread.join();
}
}
std::cout << "Mrntt: all mind threads done " << getOpName(op) << "."
<< "\n";
if (callback) { callback(); }
};
switch (op) {
case ThreadOp::START:
currThread->startThreadReq(threadCallback);
break;
case ThreadOp::PAUSE:
currThread->pauseThreadReq(threadCallback);
break;
case ThreadOp::RESUME:
currThread->resumeThreadReq(threadCallback);
break;
case ThreadOp::EXIT:
currThread->exitThreadReq(threadCallback);
break;
case ThreadOp::JOLT:
currThread->joltThreadReq(threadCallback);
break;
default:
throw std::runtime_error("Invalid operation");
}
}
}
void ComponentThread::startAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::START, callback);
}
void ComponentThread::pauseAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::PAUSE, callback);
}
void ComponentThread::resumeAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::RESUME, callback);
}
void ComponentThread::exitAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::EXIT, callback);
}
void ComponentThread::joltAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::JOLT, callback);
}
/* This shouldn't take a callback because the caller shouldn't expect to
* Mrntt to send a reply signal to it. Sending this Indication means that
* Mrntt will send the calling thread an exitThreadReq. When the caller
* processes that exitThreadReq(), the caller will exit its event loop and then
* terminate.
*
* Even if Mrntt sent a RDY response, the caller shouldn't actually be executing
* any longer to receive it anyway.
*/
void ComponentThread::exceptionInd(ComponentThread& thread)
{
if (this->id != MRNTT)
if (this->id != ComponentThread::MRNTT)
{
throw std::runtime_error(std::string(__func__)
+ ": invoked on non-mrntt thread " + thread.name);
@@ -226,40 +367,19 @@ void ComponentThread::exceptionInd(ComponentThread& thread)
// Post the exception to the mrntt thread.
this->getIoService().post(
[&thread]()
[&thread]()
{
std::cerr << "Mrntt: Exception occurred: in thread "
<< thread.name << ". Killing Salmanoff." << "\n";
ComponentThread::exitAllMindThreadsReq(
[]()
{
std::cerr << "Mrntt: Exception occurred: in thread "
<< thread.name << ". Killing Salmanoff." << "\n";
threadsKilledCount = 0;
for (auto &currThread : ComponentThread::componentThreads)
{
if (currThread->id == MRNTT)
{ continue; }
currThread->exitThreadReq(
[]()
{
++threadsKilledCount;
if (threadsKilledCount < ComponentThread::N_ITEMS - 1)
{ return; }
for (auto &currThreadJ
: ComponentThread::componentThreads)
{
if (currThreadJ->id == MRNTT)
{ continue; }
currThreadJ->thread.join();
}
mrntt::mrntt->keepLooping = false;
mrntt::mrntt->getIoService().stop();
}
);
}
}
);
mrntt::mrntt->keepLooping = false;
mrntt::mrntt->getIoService().stop();
std::cout << "Mrntt: Signaled main loop to exit." << "\n";
});
});
}
} // namespace smo
+23 -1
View File
@@ -39,7 +39,7 @@ public:
boost::asio::io_service& getIoService(void) { return io_service; }
void initializeTls(void);
const std::shared_ptr<ComponentThread> getSelf(void);
static const std::shared_ptr<ComponentThread> getSelf(void);
static std::shared_ptr<ComponentThread> getComponentThread(
ThreadId id = N_ITEMS)
@@ -78,6 +78,28 @@ public:
void exitThreadReq(std::function<void()> callback = nullptr);
void pauseThreadReq(std::function<void()> callback = nullptr);
void resumeThreadReq(std::function<void()> callback = nullptr);
void joltThreadReq(std::function<void()> callback = nullptr);
// Convenience wrappers
static void startAllMindThreadsReq(std::function<void()> callback = nullptr);
static void pauseAllMindThreadsReq(std::function<void()> callback = nullptr);
static void resumeAllMindThreadsReq(std::function<void()> callback = nullptr);
static void exitAllMindThreadsReq(std::function<void()> callback = nullptr);
static void joltAllMindThreadsReq(std::function<void()> callback = nullptr);
enum class ThreadOp
{
START,
PAUSE,
RESUME,
EXIT,
JOLT,
N_ITEMS
};
static void execOpOnAllMindThreadsReq(
ThreadOp op, std::function<void()> callback = nullptr);
// Intentionally doesn't take a callback.
void exceptionInd(ComponentThread& thread);
public:
+14 -15
View File
@@ -1,4 +1,4 @@
#include <iostream>
#include <mind.h>
#include <componentThread.h>
@@ -6,20 +6,19 @@ namespace smo {
void Mind::initialize()
{
/* Start the threads */
for (auto& componentThread : smo::ComponentThread::componentThreads)
{
// Post startThread() to the event loop of all threads except MRNTT.
if (componentThread->id == ComponentThread::MRNTT) { continue; }
// JOLT the thread.
componentThread->getIoService().post([componentThread]()
{ componentThread->getIoService().stop(); }
);
// Now tell it to execute its initialization sequence.
componentThread->startThreadReq();
}
/* Jolt the threads, then start them */
ComponentThread::joltAllMindThreadsReq(
[]()
{
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
ComponentThread::startAllMindThreadsReq(
[]()
{
std::cout << "Mrntt: All mind threads started." << "\n";
}
);
}
);
}
void Mind::finalize(void)