91ccd16b33
This makes the initialization sequence much cleaner and conceptually well encapsulated. We also now allocate the Mind objects dynamically: Mrntt allocates them inside of initializeReq. This means that we no longer have to worry about jolting and cleaning up the running threads of a global Mind object even when we never explicitly called Mind.initializeReq. Along with other conceptual improvements to our abstractions, this patch also gets us to a real "end of program initialization" point for the first time.
407 lines
10 KiB
C++
407 lines
10 KiB
C++
#include <iostream>
|
|
#include <opts.h>
|
|
#include <asynchronousContinuation.h>
|
|
#include <asynchronousLoop.h>
|
|
#include <mind.h>
|
|
#include <componentThread.h>
|
|
#include <director/director.h>
|
|
#include <simulator/simulator.h>
|
|
#include <senseApis/senseApiManager.h>
|
|
|
|
namespace smo {
|
|
|
|
Mind::Mind(void)
|
|
: componentThreads{
|
|
std::make_shared<MindThread>(ComponentThread::DIRECTOR, *this),
|
|
std::make_shared<MindThread>(ComponentThread::SIMULATOR, *this),
|
|
std::make_shared<MindThread>(ComponentThread::SUBCONSCIOUS, *this),
|
|
std::make_shared<MindThread>(ComponentThread::BODY, *this)
|
|
#ifndef WORLD_USE_BODY_THREAD
|
|
, std::make_shared<MindThread>(ComponentThread::WORLD, *this)
|
|
#endif
|
|
},
|
|
director(*this, componentThreads[0]),
|
|
canvas(*this, componentThreads[1]),
|
|
subconscious(*this, componentThreads[2]),
|
|
body(*this, componentThreads[3]),
|
|
world(*this,
|
|
#ifndef WORLD_USE_BODY_THREAD
|
|
componentThreads[4]
|
|
#else
|
|
componentThreads[3]
|
|
#endif
|
|
)
|
|
{
|
|
}
|
|
|
|
std::shared_ptr<MindThread>
|
|
Mind::getComponentThread(ComponentThread::ThreadId id) const
|
|
{
|
|
if (id == ComponentThread::MRNTT)
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__) +
|
|
": MRNTT is not a MindThread and cannot be returned by "
|
|
"getComponentThread");
|
|
}
|
|
|
|
// Search through the vector for the thread with matching id
|
|
for (auto& thread : componentThreads) {
|
|
if (thread->id == id) { return thread; }
|
|
}
|
|
|
|
// Throw exception if no thread found
|
|
throw std::runtime_error(std::string(__func__) +
|
|
": No MindThread found with ID "
|
|
+ std::to_string(static_cast<int>(id)));
|
|
}
|
|
|
|
std::shared_ptr<MindThread>
|
|
Mind::getComponentThread(const std::string& name) const
|
|
{
|
|
if (name == "mrntt")
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__) +
|
|
": MRNTT is not a MindThread and cannot be returned by "
|
|
"getComponentThread");
|
|
}
|
|
|
|
for (auto& thread : componentThreads) {
|
|
if (thread->name == name) { return thread; }
|
|
}
|
|
|
|
// Throw exception if no thread found
|
|
throw std::runtime_error(std::string(__func__) +
|
|
": No MindThread found with name '" + name + "'");
|
|
}
|
|
|
|
std::vector<std::shared_ptr<MindThread>>
|
|
Mind::getMindThreads() const
|
|
{
|
|
return componentThreads;
|
|
}
|
|
|
|
class Mind::MindLifetimeMgmtOp
|
|
: public AsynchronousContinuation<mindLifetimeMgmtOpCbFn>
|
|
{
|
|
public:
|
|
MindLifetimeMgmtOp(
|
|
Mind &parent, mindLifetimeMgmtOpCbFn callback)
|
|
: AsynchronousContinuation<mindLifetimeMgmtOpCbFn>(callback),
|
|
parent(parent)
|
|
{}
|
|
|
|
void callOriginalCbFn(bool success)
|
|
{
|
|
if (originalCbFn) {
|
|
originalCbFn(success);
|
|
}
|
|
}
|
|
|
|
public:
|
|
Mind &parent;
|
|
|
|
public:
|
|
void initializeReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
|
|
|
|
parent.startAllMindThreadsReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::initializeReq2,
|
|
context.get(), context));
|
|
}
|
|
|
|
void initializeReq2(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads started." << "\n";
|
|
|
|
parent.body.initializeReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::initializeReq3,
|
|
context.get(), context, std::placeholders::_1));
|
|
}
|
|
|
|
void initializeReq3(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
|
|
bool success
|
|
)
|
|
{
|
|
std::cout << "Mrntt: Body component initialized." << "\n";
|
|
callOriginalCbFn(success);
|
|
}
|
|
|
|
void finalizeReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
|
|
bool success
|
|
)
|
|
{
|
|
if (!success) {
|
|
std::cerr << "Mrntt: Body component failed to finalize." << "\n";
|
|
} else {
|
|
std::cout << "Mrntt: Body component finalized." << "\n";
|
|
}
|
|
|
|
/* If the threads haven't been jolted, we need to do that first, because
|
|
* otherwise they'll just enter their main loops and wait for control
|
|
* messages from mrntt after processing the exit request.
|
|
*/
|
|
parent.joltAllMindThreadsReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::finalizeReq2,
|
|
context.get(), context));
|
|
}
|
|
|
|
void finalizeReq2(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n";
|
|
|
|
parent.exitAllMindThreadsReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::finalizeReq3,
|
|
context.get(), context));
|
|
}
|
|
|
|
void finalizeReq3(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads exited." << "\n";
|
|
callOriginalCbFn(true);
|
|
}
|
|
};
|
|
|
|
/**
 * Starts asynchronous initialization of the Mind.
 *
 * First tries to pin the mind threads to CPUs; a failure there is only
 * logged and does not stop initialization. Then issues the JOLT request for
 * all threads, with MindLifetimeMgmtOp::initializeReq1 as the continuation
 * that drives the rest of the sequence.
 *
 * @param callback  Handed to the MindLifetimeMgmtOp, which invokes it with
 *                  the outcome at the end of the sequence.
 */
void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback)
{
    /* CPU pinning is best-effort: report the problem and carry on. */
    try {
        distributeAndPinThreadsAcrossCpus();
    } catch (const std::exception &pinError) {
        std::cerr
            << "Salmanoff couldn't distribute the mind threads across "
               "the CPUs, so performance may be suboptimal.\n"
               "Error: "
            << pinError.what() << "\n";
    }

    /* The op is bound into its own continuation, which keeps it alive for
     * as long as that continuation exists.
     */
    auto initOp = std::make_shared<MindLifetimeMgmtOp>(*this, callback);

    /* Jolt the threads; initializeReq1 then starts them. */
    joltAllMindThreadsReq(
        std::bind(
            &MindLifetimeMgmtOp::initializeReq1,
            initOp.get(), initOp));
}
|
|
|
|
/**
 * Starts asynchronous finalization of the Mind.
 *
 * Asks the body component to finalize, with
 * MindLifetimeMgmtOp::finalizeReq1 as the continuation that receives the
 * body's result and drives the rest of the sequence.
 *
 * @param callback  Handed to the MindLifetimeMgmtOp, which invokes it at
 *                  the end of the sequence.
 */
void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback)
{
    /* The op is bound into its own continuation, which keeps it alive for
     * as long as that continuation exists.
     */
    auto finalizeOp = std::make_shared<MindLifetimeMgmtOp>(*this, callback);

    body.finalizeReq(
        std::bind(
            &MindLifetimeMgmtOp::finalizeReq1,
            finalizeOp.get(), finalizeOp, std::placeholders::_1));
}
|
|
|
|
void Mind::distributeAndPinThreadsAcrossCpus()
|
|
{
|
|
int cpuCount = ComponentThread::getAvailableCpuCount();
|
|
|
|
if (OptionParser::getOptions().verbose) {
|
|
std::cout << __func__ << ": Available CPUs: " << cpuCount << "\n";
|
|
}
|
|
|
|
// Distribute and pin threads across CPUs
|
|
int threadIndex = 0;
|
|
for (auto& thread : componentThreads)
|
|
{
|
|
int targetCpu = threadIndex % cpuCount;
|
|
thread->pinToCpu(targetCpu);
|
|
++threadIndex;
|
|
}
|
|
|
|
std::cout << __func__ << ": Distributed " << threadIndex << " threads "
|
|
<< "across " << cpuCount << " CPUs\n";
|
|
}
|
|
|
|
class Mind::MindThreadLifetimeMgmtOp
|
|
: public AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
|
|
{
|
|
public:
|
|
MindThreadLifetimeMgmtOp(
|
|
Mind &parent,unsigned int nThreads,
|
|
mindThreadLifetimeMgmtOpCbFn callback)
|
|
: AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
|
|
loop(nThreads),
|
|
parent(parent)
|
|
{}
|
|
|
|
void callOriginalCbFn(void)
|
|
{
|
|
if (originalCbFn) {
|
|
originalCbFn();
|
|
}
|
|
}
|
|
|
|
public:
|
|
AsynchronousLoop loop;
|
|
Mind &parent;
|
|
|
|
public:
|
|
void joltAllMindThreadsReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
loop.incrementSuccessOrFailureDueTo(true);
|
|
if (!loop.isComplete()) {
|
|
return;
|
|
}
|
|
|
|
parent.threadsHaveBeenJolted = true;
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void executeGenericOpOnAllMindThreadsReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
loop.incrementSuccessOrFailureDueTo(true);
|
|
if (!loop.isComplete()) {
|
|
return;
|
|
}
|
|
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void exitAllMindThreadsReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
loop.incrementSuccessOrFailureDueTo(true);
|
|
if (!loop.isComplete()) {
|
|
return;
|
|
}
|
|
|
|
for (auto& thread : parent.componentThreads) {
|
|
thread->thread.join();
|
|
}
|
|
|
|
callOriginalCbFn();
|
|
}
|
|
};
|
|
|
|
/**
 * Sends a JOLT request to every mind thread and calls @p callback once all
 * of them have reported back.
 *
 * If the threads were already jolted (threadsHaveBeenJolted is set) nothing
 * is sent and @p callback is invoked immediately. If there are no threads
 * at all, the flag is set and @p callback is invoked immediately as well.
 *
 * @param callback  Completion callback; may be empty, in which case it is
 *                  simply not invoked.
 */
void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    if (threadsHaveBeenJolted)
    {
        std::cout << "Mrntt: All mind threads already JOLTed. "
            << "Skipping JOLT request." << "\n";

        // Same emptiness check as the zero-thread path below.
        if (callback) { callback(); }
        return;
    }

    // Tracks how many threads have reported back. joltAllMindThreadsReq1
    // sets threadsHaveBeenJolted and calls the callback after the last one.
    auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto& thread : componentThreads)
    {
        thread->joltThreadReq(
            std::bind(
                &MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1,
                request.get(), request));
    }

    // With no threads, no completion will ever arrive, so finish here.
    // The flag is set whether or not a callback was supplied, matching
    // what joltAllMindThreadsReq1 does in the non-empty case.
    if (request->loop.nTotalIsZero())
    {
        threadsHaveBeenJolted = true;
        if (callback) { callback(); }
    }
}
|
|
|
|
// Thread management methods (moved from ComponentThread)
|
|
/**
 * Sends a start request to every mind thread and calls @p callback once all
 * of them have reported back.
 *
 * @param callback  Completion callback. With no threads it is invoked
 *                  immediately, provided it is non-empty.
 */
void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Shared by all per-thread completions; counts them until all are in.
    auto startOp = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads) {
        mindThread->startThreadReq(
            std::bind(
                &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
                startOp.get(), startOp));
    }

    // Only the zero-thread case is completed from here.
    if (!startOp->loop.nTotalIsZero()) {
        return;
    }
    if (callback) {
        callback();
    }
}
|
|
|
|
/**
 * Sends a pause request to every mind thread and calls @p callback once all
 * of them have reported back.
 *
 * @param callback  Completion callback. With no threads it is invoked
 *                  immediately, provided it is non-empty.
 */
void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Shared by all per-thread completions; counts them until all are in.
    auto pauseOp = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads) {
        mindThread->pauseThreadReq(
            std::bind(
                &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
                pauseOp.get(), pauseOp));
    }

    // Only the zero-thread case is completed from here.
    if (!pauseOp->loop.nTotalIsZero()) {
        return;
    }
    if (callback) {
        callback();
    }
}
|
|
|
|
/**
 * Sends a resume request to every mind thread and calls @p callback once
 * all of them have reported back.
 *
 * @param callback  Completion callback. With no threads it is invoked
 *                  immediately, provided it is non-empty.
 */
void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Shared by all per-thread completions; counts them until all are in.
    auto resumeOp = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads) {
        mindThread->resumeThreadReq(
            std::bind(
                &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
                resumeOp.get(), resumeOp));
    }

    // Only the zero-thread case is completed from here.
    if (!resumeOp->loop.nTotalIsZero()) {
        return;
    }
    if (callback) {
        callback();
    }
}
|
|
|
|
/**
 * Sends an exit request to every mind thread and calls @p callback once all
 * of them have reported back.
 *
 * NOTE(review): the completion bound here is the generic
 * executeGenericOpOnAllMindThreadsReq1, so this function does not join the
 * threads. MindThreadLifetimeMgmtOp also defines exitAllMindThreadsReq1,
 * which joins every thread before calling the callback, but nothing in this
 * file binds it -- confirm whether it was meant to be used here or whether
 * the threads are joined elsewhere.
 *
 * @param callback  Completion callback. With no threads it is invoked
 *                  immediately, provided it is non-empty.
 */
void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Shared by all per-thread completions; counts them until all are in.
    auto exitOp = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads) {
        mindThread->exitThreadReq(
            std::bind(
                &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
                exitOp.get(), exitOp));
    }

    // Only the zero-thread case is completed from here.
    if (!exitOp->loop.nTotalIsZero()) {
        return;
    }
    if (callback) {
        callback();
    }
}
|
|
|
|
} // namespace smo
|