Files
salmanoff/smocore/componentThread.cpp
T
hayodea 0dc8abaa28 Rework: Modularize Mind
Now we have modularized the Mind class to contain all of its
ComponentThreads. This enables us to run multiple mind instances
within the same SMO process, at least in theory.

We probably won't actually do this, but we want to ensure that the
design is clean enough to enable it.
2025-09-03 14:56:00 -04:00

301 lines
7.9 KiB
C++

#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <sched.h>
#include <boost/asio.hpp>
#include <mind.h>
#include <componentThread.h>
#include <marionette/marionette.h>
namespace smo {
// Per-thread handle to the ComponentThread object associated with the calling
// OS thread. Within this file it is assigned only by
// ComponentThread::initializeTls() and read only by ComponentThread::getSelf(),
// which treats a null value as "TLS not initialized".
thread_local std::shared_ptr<ComponentThread> thisComponentThread;
namespace mrntt {
// Shared handle to the mrntt (presumably "marionette") thread object. It is
// only declared here; the definition lives in another translation unit.
extern std::shared_ptr<ComponentThread> mrntt;
}
// Implementation of static method
std::shared_ptr<ComponentThread> ComponentThread::getMrntt()
{
return mrntt::mrntt;
}
void ComponentThread::initializeTls(void)
{
thisComponentThread = shared_from_this();
}
/**
 * Returns the ComponentThread registered for the calling thread.
 *
 * @return The shared pointer previously stored by initializeTls() on this
 *         thread.
 * @throws std::runtime_error if the thread-local pointer is still null, i.e.
 *         initializeTls() has not run on the calling thread.
 */
const std::shared_ptr<ComponentThread> ComponentThread::getSelf(void)
{
    if (thisComponentThread) { return thisComponentThread; }

    throw std::runtime_error(std::string(__func__) + ": TLS not initialized");
}
/**
 * Thread body for a ComponentThread.
 *
 * Sequence:
 *  1. Run the io_service once ("waiting for JOLT"). joltThreadReq() posts a
 *     handler that stops the io_service, which makes this first run() return.
 *  2. Initialize the thread-local self pointer.
 *     NOTE(review): TLS is set up only after the JOLT, presumably because
 *     shared_from_this() is not usable until the owning shared_ptr exists --
 *     confirm against the code that creates the thread.
 *  3. Enter the event loop and keep re-running the io_service until a handler
 *     clears keepLooping (see cleanup()).
 *
 * Exceptions escaping a handler inside the loop are logged and reported to
 * mrntt via exceptionInd(); the loop then continues so the thread can still
 * receive control messages. The thread never exits on its own: it waits until
 * it is told to.
 *
 * @param self The ComponentThread whose io_service this thread services.
 */
void ComponentThread::main(ComponentThread& self)
{
    const char* const fn = __func__;

    std::cout << self.name << ":" << fn << ": Waiting for JOLT" << "\n";
    self.getIoService().run();
    self.initializeTls();
    std::cout << self.name << ":" << fn << ": Entering event loop" << "\n";

    self.keepLooping = true;
    while (self.keepLooping)
    {
        // Set by either catch block; the indication is sent after the
        // try/catch statement has been left.
        bool faulted = false;

        try
        {
            // reset() re-arms an io_service that was stopped, so run() can
            // process handlers again.
            self.getIoService().reset();
            self.getIoService().run();
        }
        catch (const std::exception& e)
        {
            faulted = true;
            std::cerr << self.name << ":" << fn
                << ": Exception occurred: " << e.what() << "\n";
        }
        catch (...)
        {
            faulted = true;
            std::cerr << self.name << ":" << fn
                << ": Unknown exception occurred" << "\n";
        }

        if (!faulted) { continue; }

        // Bubble the failure up to mrntt, then loop again to await its
        // control messages.
        mrntt::mrntt->exceptionInd(self);
    }

    std::cout << self.name << ":" << fn << ": Exited event loop" << "\n";
}
// Thread management method implementations
void ComponentThread::startThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling startThread." << "\n";
// Execute private setup sequence here
// This is where each thread would implement its specific initialization
if (callback) {
caller->getIoService().post(callback);
}
});
}
void ComponentThread::cleanup(void)
{
this->keepLooping = false;
}
void ComponentThread::exitThreadReq(std::function<void()> callback)
{
// Post to the main io_service
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling exitThread "
"(main queue)." << std::endl;
cleanup();
// Stop the main io_service to exit the thread
io_service.stop();
if (callback) { caller->getIoService().post(callback); }
});
// Also post to the pause io_service
this->pause_io_service.post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling exitThread "
"(pause queue)." << std::endl;
cleanup();
// Stop both io_services to exit the thread
pause_io_service.stop();
io_service.stop();
if (callback) { caller->getIoService().post(callback); }
});
}
void ComponentThread::pauseThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling pauseThread."
<< std::endl;
if (callback) {
caller->getIoService().post(callback);
}
// Reset the pause io_service before running to ensure it can run again
pause_io_service.reset();
// Run the pause io_service to block this thread
pause_io_service.run();
});
}
/**
 * Asks a paused thread to resume.
 *
 * The handler is queued on pause_io_service, the queue a paused thread is
 * servicing (see pauseThreadReq()). When it runs it acknowledges the request
 * and stops pause_io_service, which ends the pause.
 *
 * @param callback Optional acknowledgement callback, queued on the requesting
 *        thread's io_service; may be empty.
 * @throws std::runtime_error (from getSelf()) if the calling thread has no
 *         initialized thread-local ComponentThread.
 */
void ComponentThread::resumeThreadReq(std::function<void()> callback)
{
    pause_io_service.post([this, caller = getSelf(), callback]()
    {
        std::cout << "Thread '" << name << "': handling resumeThread."
            << std::endl;

        if (callback)
        {
            caller->getIoService().post(callback);
        }

        // Stopping the pause queue makes the blocking run() call return.
        pause_io_service.stop();
    });
}
void ComponentThread::joltThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling JOLT request." << "\n";
// Stop the main io_service to jolt the thread
io_service.stop();
if (callback) {
caller->getIoService().post(callback);
}
});
}
/**
 * Reports to mrntt that `thread` caught an exception in its event loop.
 *
 * This does not take a callback because the caller should not expect Mrntt to
 * send a reply signal to it. Sending this indication means that Mrntt will
 * send the calling thread an exitThreadReq. When the caller processes that
 * exitThreadReq(), the caller will exit its event loop and then terminate.
 *
 * Even if Mrntt sent a RDY response, the caller shouldn't actually be
 * executing any longer to receive it anyway.
 *
 * The handler runs later, on the mrntt thread, so it holds a shared_ptr to
 * the faulting thread object rather than a reference; a bare reference could
 * dangle if the object were destroyed before the handler ran.
 *
 * @param thread The thread that caught the exception. It must be owned by a
 *        std::shared_ptr, because shared_from_this() is called on it. This
 *        holds for threads that reached the event loop in main(), which calls
 *        initializeTls() first.
 * @throws std::runtime_error if invoked on an object that is not the mrntt
 *         thread.
 */
void ComponentThread::exceptionInd(ComponentThread& thread)
{
    if (this->id != ComponentThread::MRNTT)
    {
        // Name the object the call was made on, not the faulting thread.
        throw std::runtime_error(std::string(__func__)
            + ": invoked on non-mrntt thread " + this->name);
    }

    // Post the exception to the mrntt thread.
    this->getIoService().post(
        [faulted = thread.shared_from_this()]()
        {
            std::cerr << "Mrntt: Exception occurred: in thread "
                << faulted->name << ". Killing Salmanoff." << "\n";

            /** EXPLANATION:
             * An exception has occurred in one of a mind's threads. We need
             * to shut down all of that particular mind's threads.
             */
            faulted->parent.finalizeReq([]() {
                /** FIXME:
                 * When we eventually support multiple minds, we should remove
                 * this since it causes marionette to exit, even if there are
                 * other minds that are still running.
                 */
                smo::mrntt::exitMarionetteLoop();
            });
        });
}
void ComponentThread::userShutdownInd()
{
if (this->id != ComponentThread::MRNTT)
{
throw std::runtime_error(std::string(__func__)
+ ": invoked on non-mrntt thread " + this->name);
}
// Post the user shutdown to the mrntt thread.
this->getIoService().post(
[this]()
{
std::cerr << "Mrntt: User requested shutdown (SIGINT)."
<< " Killing Salmanoff." << "\n";
/** EXPLANATION:
* A user has requested a shutdown. We need to shut down all of the
* threads in all running Minds.
*/
parent.finalizeReq([]() {
/** FIXME:
* When we eventually support multiple minds, we should remove this
* since it causes marionette to exit, even if there are other minds
* that are still running.
*/
smo::mrntt::exitMarionetteLoop();
});
});
}
// CPU management method implementations
/**
 * Returns the number of processors currently online.
 *
 * The value comes from sysconf(_SC_NPROCESSORS_ONLN). As a sanity check it is
 * compared with std::thread::hardware_concurrency(), and a warning is written
 * to std::cerr when the two disagree. hardware_concurrency() is allowed to
 * return 0 when the value is not computable, so 0 is treated as "unknown" and
 * does not trigger the warning.
 *
 * @return The online CPU count (always > 0).
 * @throws std::runtime_error if sysconf() fails or reports a non-positive
 *         count.
 */
int ComponentThread::getAvailableCpuCount()
{
    // sysconf() returns long; keep the full width until it has been validated.
    const long onlineCpus = sysconf(_SC_NPROCESSORS_ONLN);
    if (onlineCpus <= 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Failed to determine CPU count");
    }

    // Check if std::thread::hardware_concurrency() matches the sysconf result.
    const unsigned int hwConcurrency = std::thread::hardware_concurrency();
    if (hwConcurrency != 0
        && static_cast<unsigned long>(onlineCpus) != hwConcurrency)
    {
        std::cerr << "Warning: CPU count mismatch - "
            "std::thread::hardware_concurrency() = "
            << hwConcurrency << ", sysconf(_SC_NPROCESSORS_ONLN) = "
            << onlineCpus << "\n";
    }

    return static_cast<int>(onlineCpus);
}
/**
 * Restricts this object's thread to a single CPU.
 *
 * Builds a cpu_set_t containing only `cpuId` and applies it to the thread
 * with pthread_setaffinity_np(). On success the chosen CPU is recorded in
 * pinnedCpuId and a message is written to std::cout.
 *
 * @param cpuId Zero-based CPU index. It must lie in [0, CPU_SETSIZE): a
 *        cpu_set_t cannot represent larger indices, so they are rejected here
 *        instead of being handed to CPU_SET().
 * @throws std::runtime_error if `cpuId` is out of range or if
 *         pthread_setaffinity_np() fails.
 */
void ComponentThread::pinToCpu(int cpuId)
{
    if (cpuId < 0 || cpuId >= CPU_SETSIZE)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Invalid CPU ID: " + std::to_string(cpuId));
    }

    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpuId, &cpuset);

    // pthread_setaffinity_np() returns the error number directly instead of
    // setting errno.
    const int result = pthread_setaffinity_np(
        thread.native_handle(), sizeof(cpu_set_t), &cpuset);
    if (result != 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Failed to pin thread to CPU " + std::to_string(cpuId)
            + ": " + std::strerror(result));
    }

    pinnedCpuId = cpuId;
    std::cout << name << ": Pinned to CPU " << cpuId << "\n";
}
} // namespace smo