Files
salmanoff/smocore/componentThread.cpp
T
hayodea 36c79f3a2e Threading: run all code in PThreads, add JOLTing & exception bubbling
This commit significantly restructures the way we set up threading in
SMO. We no longer run any SMO code on the CRT main() thread; it is only
used as a mechanism to ensure that Marionette doesn't execute before
global constructors have been executed.

JOLTing:

This is a simple ASIO post()ed message that makes each thread set up
its thread-local data pointer to point to its own ComponentThread
object, and then enter its main ASIO run() loop to await commands from
Marionette.

Exception bubbling:

We now make mind threads report their exceptions to Marionette, so
that Marionette can shut the mind down in an orderly fashion.

Thread Control messaging API:

A namespace of asynchronous messages to be post()ed to threads to
control them. It enables us to pause and resume threads. This will
be very useful for Marionette when we add the ability for it to
suspend Salmanoff's running mind, inject new goals, inspect current
state, etc; and then resume the mind's execution.
2025-07-28 07:20:44 -04:00

257 lines
6.5 KiB
C++

#include <iostream>
#include <componentThread.h>
#include <boost/asio.hpp>
namespace smo {
// Per-thread handle to the ComponentThread object associated with the
// calling thread. It is empty until initializeTls() assigns it, and
// getSelf() throws while it is still empty.
thread_local std::shared_ptr<ComponentThread> thisComponentThread;
// Human-readable names of the component threads, N_ITEMS slots in total.
// NOTE(review): the order matches componentThreads; presumably both tables
// are indexed by the thread id enum -- confirm against componentThread.h.
const std::string ComponentThread::threadNames[N_ITEMS] = {
    "mrntt", "director", "simulator", "subconscious", "body", "world"
};
namespace mrntt {
// ComponentThread object for the mrntt (Marionette) thread; other threads
// report their exceptions to it via exceptionInd().
std::shared_ptr<ComponentThread> mrntt{
    std::make_shared<ComponentThread>(ComponentThread::MRNTT)};
}
namespace director {
/* Director: the seat of volition in Salmanoff.
 *
 * Sensor events arriving from the body and the world are used by the
 * director to drive its implexors, which implex new menties. Those menties
 * are then loaded into canvas, where they are simulated and correlated with
 * intrins so that new attrimotions and menties can be formed.
 */
std::shared_ptr<ComponentThread> director{
    std::make_shared<ComponentThread>(ComponentThread::DIRECTOR)};
}
namespace simulator {
/* Canvas: Salmanoff's simulation engine.
 *
 * It is handed menties and simulates them as instructed by director, then
 * re-renders the result into perception so that director gets feedback.
 */
std::shared_ptr<ComponentThread> canvas{
    std::make_shared<ComponentThread>(ComponentThread::SIMULATOR)};
}
namespace subconscious {
/* Subconscious: the seat of memory in Salmanoff.
 *
 * Menties sent by director are stored here so they can be recalled later.
 */
std::shared_ptr<ComponentThread> subconscious{
    std::make_shared<ComponentThread>(ComponentThread::SUBCONSCIOUS)};
}
namespace body {
/* Body: polls and processes interoceptive sensor events and sends them to
 * director.
 *
 * Running this on its own thread lets those events occur asynchronously,
 * independent of whatever the other threads are doing.
 */
std::shared_ptr<ComponentThread> body{
    std::make_shared<ComponentThread>(ComponentThread::BODY)};
}
namespace world {
/* World: the counterpart of the body thread for extrospective sensor
 * events; it performs the same functions for that class of events.
 */
std::shared_ptr<ComponentThread> world{
    std::make_shared<ComponentThread>(ComponentThread::WORLD)};
}
// Table of every component thread object, N_ITEMS entries, listed in the
// same order as threadNames. The entries are copies of the shared_ptrs
// defined above, so this definition must stay after them in this file.
std::array<std::shared_ptr<ComponentThread>, ComponentThread::N_ITEMS>
ComponentThread::componentThreads{
    mrntt::mrntt,
    director::director,
    simulator::canvas,
    subconscious::subconscious,
    body::body,
    world::world
};
void ComponentThread::initializeTls(void)
{
thisComponentThread = shared_from_this();
}
/* Returns the ComponentThread handle stored in the calling thread's
 * thread-local slot.
 *
 * Throws std::runtime_error if the slot is still empty, i.e. nothing has
 * assigned thisComponentThread on this thread yet.
 */
const std::shared_ptr<ComponentThread> ComponentThread::getSelf()
{
    if (thisComponentThread)
        { return thisComponentThread; }

    std::string message(__func__);
    message += ": TLS not initialized";
    throw std::runtime_error(message);
}
/* Thread body for a component thread.
 *
 * Sequence:
 *  1. Run the io_service once while waiting for the JOLT.
 *  2. Bind the thread-local handle to `self`.
 *  3. Repeatedly reset and run the io_service until keepLooping is cleared
 *     (cleanup() does that). An exception escaping a handler is logged and
 *     reported to mrntt via exceptionInd(); the loop then carries on.
 *
 * NOTE(review): the first run() is outside the try block, and what makes it
 * return is not visible here -- presumably the JOLT handler stops the
 * io_service; confirm in componentThread.h / the JOLT sender.
 */
void ComponentThread::main(ComponentThread& self)
{
    std::cout << self.name << ":" << __func__ << ": Waiting for JOLT" << "\n";
    self.getIoService().run();
    self.initializeTls();

    std::cout << self.name << ":" << __func__ << ": Entering event loop"
        << "\n";

    self.keepLooping = true;
    while (self.keepLooping)
    {
        try
        {
            // reset() is required before run() can be called again after the
            // io_service was stopped or ran out of work.
            self.getIoService().reset();
            self.getIoService().run();
        }
        catch (const std::exception& e)
        {
            std::cerr << self.name << ":" << __func__
                << ": Exception occurred: " << e.what() << "\n";
            mrntt::mrntt->exceptionInd(self);
        }
        catch (...)
        {
            std::cerr << self.name << ":" << __func__
                << ": Unknown exception occurred" << "\n";
            mrntt::mrntt->exceptionInd(self);
        }
    }

    std::cout << self.name << ":" << __func__ << ": Exiting event loop"
        << "\n";
}
// Thread management method implementations
void ComponentThread::startThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling startThread." << "\n";
// Execute private setup sequence here
// This is where each thread would implement its specific initialization
if (callback) {
caller->getIoService().post(callback);
}
});
}
void ComponentThread::cleanup(void)
{
this->keepLooping = false;
}
void ComponentThread::exitThreadReq(std::function<void()> callback)
{
// Post to the main io_service
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling exitThread "
"(main queue)." << std::endl;
cleanup();
// Stop the main io_service to exit the thread
io_service.stop();
if (callback) { caller->getIoService().post(callback); }
});
// Also post to the pause io_service
this->pause_io_service.post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling exitThread "
"(pause queue)." << std::endl;
cleanup();
// Stop both io_services to exit the thread
pause_io_service.stop();
io_service.stop();
if (callback) { caller->getIoService().post(callback); }
});
}
void ComponentThread::pauseThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling pauseThread."
<< std::endl;
if (callback) {
caller->getIoService().post(callback);
}
// Reset the pause io_service before running to ensure it can run again
pause_io_service.reset();
// Run the pause io_service to block this thread
pause_io_service.run();
});
}
void ComponentThread::resumeThreadReq(std::function<void()> callback)
{
// Post to the pause_io_service to unblock the paused thread
pause_io_service.post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling resumeThread."
<< std::endl;
if (callback) {
caller->getIoService().post(callback);
}
// Stop the pause_io_service to unblock the thread
pause_io_service.stop();
});
}
// Number of non-mrntt threads that have acknowledged their exit request in
// the current shutdown. It is only written by the handler posted to the
// mrntt io_service below and by the completion callbacks, which
// exitThreadReq() posts back to the io_service of the thread that issued
// the requests.
static int threadsKilledCount;
// Set once the first exception indication has started the shutdown, so that
// later indications cannot restart it. Accessed from the same handlers as
// threadsKilledCount.
static bool shutdownInProgress;

/* Reports that `thread` caught an exception, and shuts Salmanoff down.
 *
 * Must be invoked on the mrntt object; throws std::runtime_error otherwise.
 * The work itself is posted to the mrntt io_service: every non-mrntt thread
 * is sent an exitThreadReq(), and once all of them have acknowledged, they
 * are joined and the mrntt io_service is stopped.
 *
 * Only the first indication starts the shutdown. Restarting it would reset
 * threadsKilledCount while acknowledgements are still arriving and re-send
 * exit requests to threads that may already have exited, so the count could
 * never reach N_ITEMS - 1 (or the threads would be joined twice).
 *
 * `thread` is captured by reference; the ComponentThread objects created in
 * this file are owned by namespace-scope shared_ptrs.
 */
void ComponentThread::exceptionInd(ComponentThread& thread)
{
    if (this->id != MRNTT)
    {
        // Report the object this was wrongly invoked on, not the thread
        // that raised the exception.
        throw std::runtime_error(std::string(__func__)
            + ": invoked on non-mrntt thread " + this->name);
    }

    // Post the exception to the mrntt thread.
    this->getIoService().post(
        [&thread]()
        {
            std::cerr << "Mrntt: Exception occurred: in thread "
                << thread.name << ". Killing Salmanoff." << "\n";

            // A shutdown is already under way; do not restart it.
            if (shutdownInProgress)
                { return; }
            shutdownInProgress = true;

            threadsKilledCount = 0;
            for (auto &currThread : ComponentThread::componentThreads)
            {
                if (currThread->id == MRNTT)
                    { continue; }

                currThread->exitThreadReq(
                    []()
                    {
                        // Wait until every non-mrntt thread has acknowledged.
                        ++threadsKilledCount;
                        if (threadsKilledCount < ComponentThread::N_ITEMS - 1)
                            { return; }

                        for (auto &currThreadJ
                            : ComponentThread::componentThreads)
                        {
                            if (currThreadJ->id == MRNTT)
                                { continue; }
                            currThreadJ->thread.join();
                        }

                        // Stopping the mrntt io_service makes its run()
                        // return.
                        mrntt::mrntt->getIoService().stop();
                    }
                );
            }
        }
    );
}
} // namespace smo