Files
salmanoff/smocore/mind.cpp
T
hayodea 91ccd16b33 Add Mrntt component; init globalMind in mrntt.initializeReq
This makes the initialization sequence much cleaner and conceptually
well encapsulated.

We also now dynamically allocate the Mind objects. They're allocated
dynamically by Mrntt inside of initializeReq. This means that we no
longer have to worry about jolting and cleaning up the running threads
of global mind object even when we never explicitly called
Mind.initializeReq.

Along with other conceptual improvements to our abstractions, this
patch also gets us to a real "end of program initialization" point
for the first time.
2025-09-14 22:17:19 -04:00

407 lines
10 KiB
C++

#include <iostream>
#include <opts.h>
#include <asynchronousContinuation.h>
#include <asynchronousLoop.h>
#include <mind.h>
#include <componentThread.h>
#include <director/director.h>
#include <simulator/simulator.h>
#include <senseApis/senseApiManager.h>
namespace smo {
Mind::Mind(void)
: componentThreads{
std::make_shared<MindThread>(ComponentThread::DIRECTOR, *this),
std::make_shared<MindThread>(ComponentThread::SIMULATOR, *this),
std::make_shared<MindThread>(ComponentThread::SUBCONSCIOUS, *this),
std::make_shared<MindThread>(ComponentThread::BODY, *this)
#ifndef WORLD_USE_BODY_THREAD
, std::make_shared<MindThread>(ComponentThread::WORLD, *this)
#endif
},
director(*this, componentThreads[0]),
canvas(*this, componentThreads[1]),
subconscious(*this, componentThreads[2]),
body(*this, componentThreads[3]),
world(*this,
#ifndef WORLD_USE_BODY_THREAD
componentThreads[4]
#else
componentThreads[3]
#endif
)
{
}
std::shared_ptr<MindThread>
Mind::getComponentThread(ComponentThread::ThreadId id) const
{
if (id == ComponentThread::MRNTT)
{
throw std::runtime_error(
std::string(__func__) +
": MRNTT is not a MindThread and cannot be returned by "
"getComponentThread");
}
// Search through the vector for the thread with matching id
for (auto& thread : componentThreads) {
if (thread->id == id) { return thread; }
}
// Throw exception if no thread found
throw std::runtime_error(std::string(__func__) +
": No MindThread found with ID "
+ std::to_string(static_cast<int>(id)));
}
std::shared_ptr<MindThread>
Mind::getComponentThread(const std::string& name) const
{
if (name == "mrntt")
{
throw std::runtime_error(
std::string(__func__) +
": MRNTT is not a MindThread and cannot be returned by "
"getComponentThread");
}
for (auto& thread : componentThreads) {
if (thread->name == name) { return thread; }
}
// Throw exception if no thread found
throw std::runtime_error(std::string(__func__) +
": No MindThread found with name '" + name + "'");
}
std::vector<std::shared_ptr<MindThread>>
Mind::getMindThreads() const
{
return componentThreads;
}
class Mind::MindLifetimeMgmtOp
: public AsynchronousContinuation<mindLifetimeMgmtOpCbFn>
{
public:
MindLifetimeMgmtOp(
Mind &parent, mindLifetimeMgmtOpCbFn callback)
: AsynchronousContinuation<mindLifetimeMgmtOpCbFn>(callback),
parent(parent)
{}
void callOriginalCbFn(bool success)
{
if (originalCbFn) {
originalCbFn(success);
}
}
public:
Mind &parent;
public:
void initializeReq1(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
parent.startAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq2,
context.get(), context));
}
void initializeReq2(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads started." << "\n";
parent.body.initializeReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq3,
context.get(), context, std::placeholders::_1));
}
void initializeReq3(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
bool success
)
{
std::cout << "Mrntt: Body component initialized." << "\n";
callOriginalCbFn(success);
}
void finalizeReq1(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
bool success
)
{
if (!success) {
std::cerr << "Mrntt: Body component failed to finalize." << "\n";
} else {
std::cout << "Mrntt: Body component finalized." << "\n";
}
/* If the threads haven't been jolted, we need to do that first, because
* otherwise they'll just enter their main loops and wait for control
* messages from mrntt after processing the exit request.
*/
parent.joltAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq2,
context.get(), context));
}
void finalizeReq2(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n";
parent.exitAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq3,
context.get(), context));
}
void finalizeReq3(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads exited." << "\n";
callOriginalCbFn(true);
}
};
void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback)
{
/* Distribute threads across available CPUs */
try
{
distributeAndPinThreadsAcrossCpus();
}
catch (const std::exception& e)
{
std::cerr << "Salmanoff couldn't distribute the mind threads across "
"the CPUs, so performance may be suboptimal.\n"
"Error: " << e.what() << "\n";
}
auto request = std::make_shared<MindLifetimeMgmtOp>(
*this, callback);
/* Jolt the threads, then start them */
joltAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq1,
request.get(), request));
}
void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback)
{
auto request = std::make_shared<MindLifetimeMgmtOp>(
*this, callback);
body.finalizeReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq1,
request.get(), request, std::placeholders::_1));
}
void Mind::distributeAndPinThreadsAcrossCpus()
{
int cpuCount = ComponentThread::getAvailableCpuCount();
if (OptionParser::getOptions().verbose) {
std::cout << __func__ << ": Available CPUs: " << cpuCount << "\n";
}
// Distribute and pin threads across CPUs
int threadIndex = 0;
for (auto& thread : componentThreads)
{
int targetCpu = threadIndex % cpuCount;
thread->pinToCpu(targetCpu);
++threadIndex;
}
std::cout << __func__ << ": Distributed " << threadIndex << " threads "
<< "across " << cpuCount << " CPUs\n";
}
class Mind::MindThreadLifetimeMgmtOp
: public AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
{
public:
MindThreadLifetimeMgmtOp(
Mind &parent,unsigned int nThreads,
mindThreadLifetimeMgmtOpCbFn callback)
: AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
loop(nThreads),
parent(parent)
{}
void callOriginalCbFn(void)
{
if (originalCbFn) {
originalCbFn();
}
}
public:
AsynchronousLoop loop;
Mind &parent;
public:
void joltAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
parent.threadsHaveBeenJolted = true;
callOriginalCbFn();
}
void executeGenericOpOnAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
callOriginalCbFn();
}
void exitAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
for (auto& thread : parent.componentThreads) {
thread->thread.join();
}
callOriginalCbFn();
}
};
void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
if (threadsHaveBeenJolted)
{
std::cout << "Mrntt: All mind threads already JOLTed. "
<< "Skipping JOLT request." << "\n";
callback();
return;
}
// Create a counter to track when all threads have been jolted
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->joltThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1,
request.get(), request));
}
// If no threads, set flag and call callback immediately
if (request->loop.nTotalIsZero() && callback)
{
threadsHaveBeenJolted = true;
callback();
}
}
// Thread management methods (moved from ComponentThread)
void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
// Create a counter to track when all threads have started
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->startThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
}
void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
// Create a counter to track when all threads have paused
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->pauseThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
}
void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
// Create a counter to track when all threads have resumed
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->resumeThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
}
void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
// Create a counter to track when all threads have exited
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->exitThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
}
} // namespace smo