Files
salmanoff/smocore/componentThread.cpp
T

463 lines
12 KiB
C++

#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <sched.h>
#include <boost/asio.hpp>
#include <mind.h>
#include <componentThread.h>
namespace smo {
/* Per-thread handle to the ComponentThread object associated with the calling
 * thread. It is empty until initializeTls() has run on that thread, and it is
 * read back through ComponentThread::getSelf().
 */
thread_local std::shared_ptr<ComponentThread> thisComponentThread;
/* Human-readable names for the component threads, one entry per slot up to
 * N_ITEMS.
 * NOTE(review): the order presumably mirrors the thread-id enumeration
 * (MRNTT first, WORLD last), matching the order used for componentThreads
 * in this file -- confirm against componentThread.h before reordering.
 */
const std::string ComponentThread::threadNames[N_ITEMS] =
{
"mrntt",
"director",
"simulator",
"subconscious",
"body",
"world"
};
namespace mrntt {
/* The marionette (mrntt) is the control thread. The other component threads
 * report exceptions to it, and the all-thread start/pause/resume/exit/jolt
 * requests in this file may only be issued from it.
 */
std::shared_ptr<ComponentThread> mrntt{
    std::make_shared<ComponentThread>(ComponentThread::MRNTT)};
}
namespace director {
/* Salmanoff's seat of volition. Sensor events arriving from the body and the
 * world are used by the director to drive its implexors, which implex new
 * menties. Those menties are then loaded into canvas, where they are simulated
 * and correlated with intrins so that new attrimotions and menties can form.
 */
std::shared_ptr<ComponentThread> director{
    std::make_shared<ComponentThread>(ComponentThread::DIRECTOR)};
}
namespace simulator {
/* Salmanoff's simulation engine, exposed as `canvas`. It is handed menties,
 * simulates them as instructed by director, and re-renders the result into
 * perception so that director receives feedback.
 */
std::shared_ptr<ComponentThread> canvas{
    std::make_shared<ComponentThread>(ComponentThread::SIMULATOR)};
}
namespace subconscious {
/* Salmanoff's seat of memory. Menties sent by director are stored here so
 * that they can be recalled later.
 */
std::shared_ptr<ComponentThread> subconscious{
    std::make_shared<ComponentThread>(ComponentThread::SUBCONSCIOUS)};
}
namespace body {
/* Thread that polls and processes interoceptive sensor events and forwards
 * them to director. Running this on its own thread lets those events occur
 * asynchronously, independently of whatever the other threads are doing.
 */
std::shared_ptr<ComponentThread> body{
    std::make_shared<ComponentThread>(ComponentThread::BODY)};
}
namespace world {
/* Counterpart of the body thread: it plays the same role, but for
 * extrospective sensor events.
 */
std::shared_ptr<ComponentThread> world{
    std::make_shared<ComponentThread>(ComponentThread::WORLD)};
}
// Initialize static state

/* Flag that starts out false.
 * NOTE(review): presumably records whether the JOLT step has been issued to
 * the threads; nothing in this file reads or writes it -- confirm its users.
 */
std::atomic<bool> ComponentThread::threadsHaveBeenJolted{false};
/* Registry of every component thread, mrntt included. Code in this file
 * iterates over it and skips the MRNTT entry where only the "mind" threads
 * are wanted.
 *
 * The initializers are the namespace-scope shared_ptr globals defined earlier
 * in this translation unit; this definition must stay after them so that they
 * are already initialized when they are copied here.
 */
std::array<std::shared_ptr<ComponentThread>, ComponentThread::N_ITEMS>
ComponentThread::componentThreads =
{
mrntt::mrntt,
director::director,
simulator::canvas,
subconscious::subconscious,
body::body,
world::world
};
/* Records this object in the calling thread's thread-local slot
 * (thisComponentThread) so that getSelf() can later return it on this thread.
 * main() calls it once the initial JOLT wait has finished.
 * NOTE(review): shared_from_this() requires that this object is already owned
 * by a std::shared_ptr -- assumes ComponentThread derives from
 * std::enable_shared_from_this; confirm in componentThread.h.
 */
void ComponentThread::initializeTls(void)
{
thisComponentThread = shared_from_this();
}
/* Returns the ComponentThread registered for the calling thread.
 *
 * Throws std::runtime_error when the calling thread's thread-local slot is
 * still empty, i.e. initializeTls() has not run on it.
 */
const std::shared_ptr<ComponentThread> ComponentThread::getSelf(void)
{
    if (thisComponentThread) { return thisComponentThread; }

    throw std::runtime_error(std::string(__func__)
        + ": TLS not initialized");
}
/* Thread body for a component thread.
 *
 * Phase 1: run the io_service until it returns (the JOLT handler stops it),
 *          then register `self` in this thread's TLS slot.
 * Phase 2: service the io_service repeatedly until keepLooping is cleared.
 *
 * NOTE(review): an exception thrown during the phase-1 run() is not caught
 * here -- confirm that this is intended.
 */
void ComponentThread::main(ComponentThread& self)
{
    std::cout << self.name << ":" << __func__ << ": Waiting for JOLT" <<"\n";
    self.getIoService().run();
    self.initializeTls();

    std::cout << self.name << ":" << __func__ << ": Entering event loop" <<"\n";

    /* An exception escaping a handler must not end the thread by itself: it
     * is caught below and reported to mrntt, after which we go around the
     * loop again and keep servicing control messages. Only when mrntt tells
     * us to exit (which clears keepLooping) do we fall out of the loop.
     */
    self.keepLooping = true;
    while (self.keepLooping)
    {
        bool handlerFaulted = false;

        try
        {
            self.getIoService().reset();
            self.getIoService().run();
        }
        catch (const std::exception& e)
        {
            std::cerr << self.name << ":" << __func__
                << ": Exception occurred: " << e.what() << "\n";
            handlerFaulted = true;
        }
        catch (...)
        {
            std::cerr << self.name << ":" << __func__
                << ": Unknown exception occurred" << "\n";
            handlerFaulted = true;
        }

        if (!handlerFaulted) { continue; }

        // Bubble the failure up to the marionette thread.
        mrntt::mrntt->exceptionInd(self);
    }

    std::cout << self.name << ":" << __func__ << ": Exited event loop" << "\n";
}
// Thread management method implementations
void ComponentThread::startThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling startThread." << "\n";
// Execute private setup sequence here
// This is where each thread would implement its specific initialization
if (callback) {
caller->getIoService().post(callback);
}
});
}
/* Clears keepLooping, the flag tested by the event loop in main(); the loop
 * ends once the io_service run() in progress returns. Called from the
 * exitThreadReq() handlers.
 */
void ComponentThread::cleanup(void)
{
this->keepLooping = false;
}
void ComponentThread::exitThreadReq(std::function<void()> callback)
{
// Post to the main io_service
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling exitThread "
"(main queue)." << std::endl;
cleanup();
// Stop the main io_service to exit the thread
io_service.stop();
if (callback) { caller->getIoService().post(callback); }
});
// Also post to the pause io_service
this->pause_io_service.post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling exitThread "
"(pause queue)." << std::endl;
cleanup();
// Stop both io_services to exit the thread
pause_io_service.stop();
io_service.stop();
if (callback) { caller->getIoService().post(callback); }
});
}
void ComponentThread::pauseThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling pauseThread."
<< std::endl;
if (callback) {
caller->getIoService().post(callback);
}
// Reset the pause io_service before running to ensure it can run again
pause_io_service.reset();
// Run the pause io_service to block this thread
pause_io_service.run();
});
}
void ComponentThread::resumeThreadReq(std::function<void()> callback)
{
// Post to the pause_io_service to unblock the paused thread
pause_io_service.post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling resumeThread."
<< std::endl;
if (callback) {
caller->getIoService().post(callback);
}
// Stop the pause_io_service to unblock the thread
pause_io_service.stop();
});
}
void ComponentThread::joltThreadReq(std::function<void()> callback)
{
this->getIoService().post([this, caller = getSelf(), callback]()
{
std::cout << "Thread '" << name << "': handling JOLT request." << "\n";
// Stop the main io_service to jolt the thread
io_service.stop();
if (callback) {
caller->getIoService().post(callback);
}
});
}
/* Bookkeeping shared by the per-thread completion callbacks of one
 * "operate on all mind threads" request.
 */
struct AllMindThreadsOpReqContext {
    // Number of threads that have reported completion so far; starts at 0.
    int nThreadsProcessed{0};

    AllMindThreadsOpReqContext() = default;
};
/* Maps a thread operation to the present-participle word used in log output
 * ("starting", "pausing", ...).
 *
 * Throws std::runtime_error for values below 0 or above ThreadOp::JOLT. A
 * value inside that range that matches none of the known operations yields
 * "unknown".
 */
static const std::string getOpName(ComponentThread::ThreadOp op)
{
    using Op = ComponentThread::ThreadOp;

    const bool belowFirst = op < static_cast<Op>(0);
    const bool aboveLast = op > Op::JOLT;
    if (belowFirst || aboveLast)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Invalid operation");
    }

    switch (op)
    {
    case Op::START:  return "starting";
    case Op::PAUSE:  return "pausing";
    case Op::RESUME: return "resuming";
    case Op::EXIT:   return "exiting";
    case Op::JOLT:   return "jolting";
    default:         break;
    }
    return "unknown";
}
/* Issues `op` to every component thread except mrntt and invokes `callback`
 * (if non-empty) once all of them have reported completion.
 *
 * Must be called on the marionette thread; otherwise std::runtime_error is
 * thrown. getSelf() and getOpName() may also throw std::runtime_error (no TLS
 * entry / invalid `op`), in both cases before any request has been sent.
 *
 * Each per-thread request posts its completion callback to the requester's
 * io_service, so all completions are delivered through mrntt's queue and the
 * shared counter needs no extra synchronization.
 * NOTE(review): this relies on mrntt's io_service being run by one thread
 * only -- confirm.
 *
 * For ThreadOp::EXIT the final completion also joins the mind threads. Only
 * joinable threads are joined: join() on a thread that was never started or
 * has already been joined throws std::system_error, which would otherwise
 * escape from the completion callback.
 */
void ComponentThread::execOpOnAllMindThreadsReq(
    ThreadOp op, std::function<void()> callback
    )
{
    std::shared_ptr<ComponentThread> self = getSelf();

    // Check that we're being called from the marionette thread
    if (self->id != MRNTT)
    {
        throw std::runtime_error(std::string(__func__)
            + ": invoked on non-mrntt thread " + self->name);
    }

    std::cout << "Mrntt: " << getOpName(op) << " all mind threads." << "\n";

    auto context = std::make_shared<AllMindThreadsOpReqContext>();
    const int N_THREADS_EXCEPT_MRNTT = ComponentThread::N_ITEMS - 1;

    // The completion callback is identical for every target thread, so it is
    // built once and handed to each request.
    auto threadCallback = [context, callback, N_THREADS_EXCEPT_MRNTT, op]()
    {
        ++context->nThreadsProcessed;
        if (context->nThreadsProcessed < N_THREADS_EXCEPT_MRNTT)
            { return; }

        if (op == ThreadOp::EXIT)
        {
            // Special cleanup for exit operations
            for (auto &currThreadJ : ComponentThread::componentThreads)
            {
                if (currThreadJ->id == ComponentThread::MRNTT)
                    { continue; }
                // Skip threads that cannot be joined instead of throwing.
                if (!currThreadJ->thread.joinable())
                    { continue; }

                currThreadJ->thread.join();
            }
        }

        std::cout << "Mrntt: all mind threads done " << getOpName(op) << "."
            << "\n";
        if (callback) { callback(); }
    };

    for (auto &currThread : ComponentThread::componentThreads)
    {
        if (currThread->id == ComponentThread::MRNTT)
            { continue; }

        switch (op) {
        case ThreadOp::START:
            currThread->startThreadReq(threadCallback);
            break;
        case ThreadOp::PAUSE:
            currThread->pauseThreadReq(threadCallback);
            break;
        case ThreadOp::RESUME:
            currThread->resumeThreadReq(threadCallback);
            break;
        case ThreadOp::EXIT:
            currThread->exitThreadReq(threadCallback);
            break;
        case ThreadOp::JOLT:
            currThread->joltThreadReq(threadCallback);
            break;
        default:
            throw std::runtime_error("Invalid operation");
        }
    }
}
/* Convenience wrapper: issues ThreadOp::START to every non-mrntt component
 * thread through execOpOnAllMindThreadsReq(), forwarding `callback` as the
 * completion callback. Throws std::runtime_error if not called on mrntt.
 */
void ComponentThread::startAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::START, callback);
}
/* Convenience wrapper: issues ThreadOp::PAUSE to every non-mrntt component
 * thread through execOpOnAllMindThreadsReq(), forwarding `callback` as the
 * completion callback. Throws std::runtime_error if not called on mrntt.
 */
void ComponentThread::pauseAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::PAUSE, callback);
}
/* Convenience wrapper: issues ThreadOp::RESUME to every non-mrntt component
 * thread through execOpOnAllMindThreadsReq(), forwarding `callback` as the
 * completion callback. Throws std::runtime_error if not called on mrntt.
 */
void ComponentThread::resumeAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::RESUME, callback);
}
/* Convenience wrapper: issues ThreadOp::EXIT to every non-mrntt component
 * thread through execOpOnAllMindThreadsReq(), forwarding `callback` as the
 * completion callback. For EXIT, the mind threads are also joined before
 * `callback` runs. Throws std::runtime_error if not called on mrntt.
 */
void ComponentThread::exitAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::EXIT, callback);
}
/* Convenience wrapper: issues ThreadOp::JOLT to every non-mrntt component
 * thread through execOpOnAllMindThreadsReq(), forwarding `callback` as the
 * completion callback. Throws std::runtime_error if not called on mrntt.
 */
void ComponentThread::joltAllMindThreadsReq(std::function<void()> callback)
{
execOpOnAllMindThreadsReq(ThreadOp::JOLT, callback);
}
/* Indication sent to mrntt by a thread whose event loop caught an exception.
 *
 * This doesn't take a callback because the caller shouldn't expect Mrntt to
 * send a reply signal to it. Sending this indication means that Mrntt will
 * send the calling thread an exitThreadReq. When the caller processes that
 * exitThreadReq(), the caller will exit its event loop and then terminate.
 *
 * Even if Mrntt sent a RDY response, the caller shouldn't actually be
 * executing any longer to receive it anyway.
 *
 * `thread` is the component thread in which the exception occurred.
 *
 * Throws std::runtime_error when invoked on an object other than the mrntt
 * ComponentThread. The message names the object the call was made on (the
 * one that failed the check) as well as the reporting thread.
 */
void ComponentThread::exceptionInd(ComponentThread& thread)
{
    if (this->id != ComponentThread::MRNTT)
    {
        throw std::runtime_error(std::string(__func__)
            + ": invoked on non-mrntt thread " + this->name
            + " (exception reported by " + thread.name + ")");
    }

    // Post the exception to the mrntt thread.
    // NOTE(review): `thread` is captured by reference and used later, when
    // the handler runs; this assumes the ComponentThread object outlives the
    // queued handler (the ones defined in this file are globals) -- confirm.
    this->getIoService().post(
        [&thread]()
        {
            std::cerr << "Mrntt: Exception occurred: in thread "
                << thread.name << ". Killing Salmanoff." << "\n";

            // Once finalization has completed, make mrntt's own event loop
            // exit as well.
            smo::mind.finalizeReq(
                []()
                {
                    mrntt::mrntt->keepLooping = false;
                    mrntt::mrntt->getIoService().stop();
                    std::cout << "Mrntt: Signaled main loop to exit." << "\n";
                });
        });
}
// CPU management method implementations
/* Returns the number of processors currently online, as reported by
 * sysconf(_SC_NPROCESSORS_ONLN).
 *
 * Throws std::runtime_error when sysconf() fails or reports a non-positive
 * count.
 *
 * As a sanity check the value is compared with
 * std::thread::hardware_concurrency() and a warning is printed to std::cerr
 * on disagreement. hardware_concurrency() returns 0 when it cannot determine
 * a value; that is not a disagreement and is therefore not reported.
 */
int ComponentThread::getAvailableCpuCount()
{
    // sysconf() returns long; validate before narrowing to int.
    const long onlineCpus = sysconf(_SC_NPROCESSORS_ONLN);
    if (onlineCpus <= 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Failed to determine CPU count");
    }
    const int cpuCount = static_cast<int>(onlineCpus);

    // Check if std::thread::hardware_concurrency() matches sysconf result
    const unsigned int hwConcurrency = std::thread::hardware_concurrency();
    if (hwConcurrency != 0
        && hwConcurrency != static_cast<unsigned int>(cpuCount))
    {
        std::cerr << "Warning: CPU count mismatch - "
            "std::thread::hardware_concurrency() = "
            << hwConcurrency << ", sysconf(_SC_NPROCESSORS_ONLN) = "
            << cpuCount << "\n";
    }

    return cpuCount;
}
/* Restricts this component's thread to run on the single CPU `cpuId` and
 * records the choice in pinnedCpuId.
 *
 * Throws std::runtime_error when `cpuId` is negative or too large to be
 * represented in a cpu_set_t (>= CPU_SETSIZE), or when
 * pthread_setaffinity_np() reports an error.
 */
void ComponentThread::pinToCpu(int cpuId)
{
    // cpu_set_t can only describe CPUs 0 .. CPU_SETSIZE-1; reject anything
    // else here rather than handing an out-of-range index to CPU_SET.
    if (cpuId < 0 || cpuId >= CPU_SETSIZE)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Invalid CPU ID: " + std::to_string(cpuId));
    }

    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpuId, &cpuset);

    // pthread_setaffinity_np() returns the error number directly (it does
    // not set errno), hence strerror(result) below.
    int result = pthread_setaffinity_np(
        thread.native_handle(), sizeof(cpu_set_t), &cpuset);
    if (result != 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Failed to pin thread to CPU " + std::to_string(cpuId)
            + ": " + std::strerror(result));
    }

    pinnedCpuId = cpuId;
    std::cout << name << ": Pinned to CPU " << cpuId << "\n";
}
/* Pins every component thread except mrntt to a CPU, assigning CPUs
 * round-robin (n-th pinned thread -> CPU n modulo the CPU count).
 *
 * Exceptions from getAvailableCpuCount() and pinToCpu() propagate to the
 * caller.
 */
void ComponentThread::distributeAndPinThreadsAcrossCpus()
{
    const int nCpus = getAvailableCpuCount();
    std::cout << "Available CPUs: " << nCpus << "\n";

    int nPinned = 0;
    for (auto& component : componentThreads)
    {
        // The marionette thread (MRNTT) is the control thread and is left
        // unpinned.
        if (component->id != MRNTT)
        {
            component->pinToCpu(nPinned % nCpus);
            ++nPinned;
        }
    }

    std::cout << "Distributed " << (nPinned) << " threads across "
        << nCpus << " CPUs\n";
}
} // namespace smo