da0ef64f62
We now allocate globalMind locally inside of marionetteMain. Why? Until now, we had an asymmetric threading situation where the globalMind's threads were initialized during global constructor invocation rather than on demand. This meant that we had to shut down those threads even if we had never gotten to the point of calling Mind::initializeReq. This significantly complicated our shutdown sequence, since we had to factor in the lifetime of the std::thread objects inside of the ComponentThreads which were inside of the globalMind object. Now, if we haven't called Mind::initializeReq, we don't have to perform any Mind::finalizeReq or adjacent operations. Shutdown symmetrically mirrors the operations we actually performed during execution. We introduced some complexity by splitting ComponentThreads into two derivative types (MindThread and MarionetteThread), but I think in the long term we'll be able to massage this split into a much cleaner situation overall.
560 lines
14 KiB
C++
560 lines
14 KiB
C++
#include <iostream>
|
|
#include <opts.h>
|
|
#include <asynchronousContinuation.h>
|
|
#include <asynchronousLoop.h>
|
|
#include <mind.h>
|
|
#include <componentThread.h>
|
|
#include <senseApis/senseApiManager.h>
|
|
|
|
namespace smo {
|
|
|
|
Mind::Mind(void)
|
|
: componentThreads{
|
|
std::make_shared<MindThread>(ComponentThread::DIRECTOR, *this),
|
|
std::make_shared<MindThread>(ComponentThread::SIMULATOR, *this),
|
|
std::make_shared<MindThread>(ComponentThread::SUBCONSCIOUS, *this),
|
|
std::make_shared<MindThread>(ComponentThread::BODY, *this),
|
|
std::make_shared<MindThread>(ComponentThread::WORLD, *this)
|
|
}
|
|
{
|
|
}
|
|
|
|
std::shared_ptr<MindThread>
|
|
Mind::getComponentThread(ComponentThread::ThreadId id) const
|
|
{
|
|
if (id == ComponentThread::MRNTT)
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__) +
|
|
": MRNTT is not a MindThread and cannot be returned by "
|
|
"getComponentThread");
|
|
}
|
|
|
|
// Search through the vector for the thread with matching id
|
|
for (auto& thread : componentThreads) {
|
|
if (thread->id == id) { return thread; }
|
|
}
|
|
|
|
// Throw exception if no thread found
|
|
throw std::runtime_error(std::string(__func__) +
|
|
": No MindThread found with ID "
|
|
+ std::to_string(static_cast<int>(id)));
|
|
}
|
|
|
|
std::shared_ptr<MindThread>
|
|
Mind::getComponentThread(const std::string& name) const
|
|
{
|
|
if (name == "mrntt")
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__) +
|
|
": MRNTT is not a MindThread and cannot be returned by "
|
|
"getComponentThread");
|
|
}
|
|
|
|
for (auto& thread : componentThreads) {
|
|
if (thread->name == name) { return thread; }
|
|
}
|
|
|
|
// Throw exception if no thread found
|
|
throw std::runtime_error(std::string(__func__) +
|
|
": No MindThread found with name '" + name + "'");
|
|
}
|
|
|
|
std::vector<std::shared_ptr<MindThread>>
|
|
Mind::getMindThreads() const
|
|
{
|
|
return componentThreads;
|
|
}
|
|
|
|
class Mind::MindLifetimeMgmtOp
|
|
: public AsynchronousContinuation<mindLifetimeMgmtOpCbFn>
|
|
{
|
|
public:
|
|
MindLifetimeMgmtOp(
|
|
Mind &parent, mindLifetimeMgmtOpCbFn callback)
|
|
: AsynchronousContinuation<mindLifetimeMgmtOpCbFn>(callback),
|
|
parent(parent)
|
|
{}
|
|
|
|
void callOriginalCbFn(bool success)
|
|
{
|
|
if (originalCbFn) {
|
|
originalCbFn(success);
|
|
}
|
|
}
|
|
|
|
public:
|
|
Mind &parent;
|
|
|
|
public:
|
|
void initializeReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
|
|
|
|
parent.startAllMindThreadsReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::initializeReq2,
|
|
context.get(), context));
|
|
}
|
|
|
|
void initializeReq2(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads started." << "\n";
|
|
|
|
parent.initializeBodyReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::initializeReq3,
|
|
context.get(), context, std::placeholders::_1));
|
|
}
|
|
|
|
void initializeReq3(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
|
|
bool success
|
|
)
|
|
{
|
|
std::cout << "Mrntt: Body component initialized." << "\n";
|
|
callOriginalCbFn(success);
|
|
}
|
|
|
|
void finalizeReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
|
|
bool success
|
|
)
|
|
{
|
|
if (!success) {
|
|
std::cerr << "Mrntt: Body component failed to finalize." << "\n";
|
|
} else {
|
|
std::cout << "Mrntt: Body component finalized." << "\n";
|
|
}
|
|
|
|
/* If the threads haven't been jolted, we need to do that first, because
|
|
* otherwise they'll just enter their main loops and wait for control
|
|
* messages from mrntt after processing the exit request.
|
|
*/
|
|
parent.joltAllMindThreadsReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::finalizeReq2,
|
|
context.get(), context));
|
|
}
|
|
|
|
void finalizeReq2(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n";
|
|
|
|
parent.exitAllMindThreadsReq(
|
|
std::bind(
|
|
&MindLifetimeMgmtOp::finalizeReq3,
|
|
context.get(), context));
|
|
}
|
|
|
|
void finalizeReq3(
|
|
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: All mind threads exited." << "\n";
|
|
callOriginalCbFn(true);
|
|
}
|
|
};
|
|
|
|
/**
 * Begin asynchronous startup of the Mind.
 *
 * Pins the mind threads to CPUs (best effort), then jolts all mind threads
 * and continues with MindLifetimeMgmtOp::initializeReq1.
 *
 * @param callback  Receives the final result of the startup chain.
 */
void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback)
{
    // CPU pinning is an optimisation only: a failure is reported and
    // startup carries on.
    try
    {
        distributeAndPinThreadsAcrossCpus();
    }
    catch (const std::exception &pinError)
    {
        std::cerr << "Salmanoff couldn't distribute the mind threads across "
            "the CPUs, so performance may be suboptimal.\n"
            "Error: " << pinError.what() << "\n";
    }

    auto op = std::make_shared<MindLifetimeMgmtOp>(*this, callback);

    // Jolt first; the continuation then starts the threads. The captured
    // shared_ptr keeps the operation alive until the continuation runs.
    joltAllMindThreadsReq([op] { op->initializeReq1(op); });
}
|
|
|
|
/**
 * Begin asynchronous shutdown of the Mind.
 *
 * Finalizes the Body component first and continues with
 * MindLifetimeMgmtOp::finalizeReq1, which tears the threads down.
 *
 * @param callback  Receives the final result of the shutdown chain.
 */
void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback)
{
    auto op = std::make_shared<MindLifetimeMgmtOp>(*this, callback);

    // The captured shared_ptr keeps the operation alive until the Body
    // finalization reports back.
    finalizeBodyReq(
        [op](bool success) { op->finalizeReq1(op, success); });
}
|
|
|
|
class Mind::InitializeBodyReq
|
|
: public MindLifetimeMgmtOp, public ContinuationTarget
|
|
{
|
|
public:
|
|
InitializeBodyReq(
|
|
Mind &parent, const std::shared_ptr<ComponentThread> &caller,
|
|
mindLifetimeMgmtOpCbFn callback)
|
|
: MindLifetimeMgmtOp(parent, callback), ContinuationTarget(caller)
|
|
{}
|
|
|
|
void callOriginalCbFn(bool success)
|
|
{
|
|
if (originalCbFn)
|
|
{
|
|
caller->getIoService().post(
|
|
std::bind(originalCbFn, success));
|
|
}
|
|
}
|
|
|
|
public:
|
|
void initializeBodyReq1(
|
|
[[maybe_unused]] std::shared_ptr<InitializeBodyReq> context
|
|
)
|
|
{
|
|
auto self = ComponentThread::getSelf();
|
|
if (self->id != ComponentThread::BODY)
|
|
{
|
|
throw std::runtime_error(std::string(__func__)
|
|
+ ": Must be executed on Body thread");
|
|
}
|
|
|
|
/** EXPLANATION:
|
|
* The ComponentThread instance we pass in here is the one that will be
|
|
* used by Senseapi libs to perform device-independent background
|
|
* operations.
|
|
* For example, liblivoxProto1's BroadcastListener will use this thread
|
|
* to listen for UDP broadcast dgrams from Livox devices.
|
|
*
|
|
* Right now we use Marionette, but there's a strong argument for using
|
|
* Body instead since it's meant to handle device-management operations.
|
|
*/
|
|
sense_api::SenseApiManager::getInstance()
|
|
.loadAllSenseApiLibsFromOptions(caller);
|
|
|
|
std::cout << sense_api::SenseApiManager::getInstance().stringifyLibs()
|
|
<< std::endl;
|
|
|
|
if (OptionParser::getOptions().verbose)
|
|
{
|
|
std::cout << __func__ << ": About to initializeAllSenseApiLibs"
|
|
<< '\n';
|
|
}
|
|
sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs();
|
|
|
|
if (OptionParser::getOptions().verbose)
|
|
{
|
|
std::cout << __func__ << ": About to attachAllSenseDevicesFromSpecs"
|
|
<< '\n';
|
|
}
|
|
sense_api::SenseApiManager::getInstance()
|
|
.attachAllSenseDevicesFromSpecsReq(
|
|
std::bind(
|
|
&InitializeBodyReq::initializeBodyReq2,
|
|
context.get(), context,
|
|
std::placeholders::_1));
|
|
}
|
|
|
|
void initializeBodyReq2(
|
|
[[maybe_unused]] std::shared_ptr<InitializeBodyReq> context,
|
|
smo::AsynchronousLoop &results
|
|
)
|
|
{
|
|
parent.bodyComponentInitialized = true;
|
|
std::cout << "Mrntt: attached "
|
|
<< results.nSucceeded << " of " << results.nTotal
|
|
<< " sense devices." << "\n";
|
|
|
|
callOriginalCbFn(results.nSucceeded == results.nTotal);
|
|
}
|
|
};
|
|
|
|
class Mind::FinalizeBodyReq
|
|
: public InitializeBodyReq
|
|
{
|
|
public:
|
|
using InitializeBodyReq::InitializeBodyReq;
|
|
|
|
public:
|
|
void finalizeBodyReq1(
|
|
[[maybe_unused]] std::shared_ptr<FinalizeBodyReq> context
|
|
)
|
|
{
|
|
auto self = ComponentThread::getSelf();
|
|
if (self->id != ComponentThread::BODY)
|
|
{
|
|
throw std::runtime_error(std::string(__func__)
|
|
+ ": Must be executed on Body thread");
|
|
}
|
|
|
|
std::cout << "Mrntt: About to detach all sense devices." << "\n";
|
|
sense_api::SenseApiManager::getInstance().detachAllSenseDevicesReq(
|
|
std::bind(
|
|
&FinalizeBodyReq::finalizeBodyReq2,
|
|
context.get(), context,
|
|
std::placeholders::_1));
|
|
}
|
|
|
|
void finalizeBodyReq2(
|
|
[[maybe_unused]] std::shared_ptr<FinalizeBodyReq> context,
|
|
smo::AsynchronousLoop &results
|
|
)
|
|
{
|
|
std::cout << "Mrntt: Successfully detached "
|
|
<< results.nSucceeded << " of " << results.nTotal
|
|
<< " sense devices." << "\n";
|
|
|
|
std::cout << "Mrntt: About to unload all sense api libs." << "\n";
|
|
sense_api::SenseApiManager::getInstance().unloadAllSenseApiLibs();
|
|
callOriginalCbFn(results.nSucceeded == results.nTotal);
|
|
}
|
|
};
|
|
|
|
/**
 * Ask the Body thread to initialize the Body component.
 *
 * Must be called on the Mrntt thread. The work is posted to the Body
 * thread's io service; the outcome is posted back to the calling thread.
 *
 * @param callback  Receives the outcome of the Body initialization.
 * @throws std::runtime_error if the current thread is not MRNTT, or if
 *         getComponentThread cannot find the BODY thread.
 */
void Mind::initializeBodyReq(mindLifetimeMgmtOpCbFn callback)
{
    auto invoker = ComponentThread::getSelf();
    if (invoker->id != ComponentThread::MRNTT)
    {
        std::string why(__func__);
        why += ": Must be invoked by Mrntt thread";
        throw std::runtime_error(why);
    }

    auto op = std::make_shared<InitializeBodyReq>(*this, invoker, callback);

    // The captured shared_ptr keeps the operation alive until Body runs it.
    auto bodyThread = this->getComponentThread(ComponentThread::BODY);
    bodyThread->getIoService().post(
        [op] { op->initializeBodyReq1(op); });
}
|
|
|
|
/**
 * Ask the Body thread to finalize the Body component.
 *
 * Must be called on the Mrntt thread. If the Body component was never
 * initialized there is nothing to undo, so the callback (if any) is invoked
 * immediately with `true`. Otherwise the work is posted to the Body
 * thread's io service and the outcome is posted back to the calling thread.
 *
 * @param callback  Receives the outcome of the Body finalization; may be
 *                  empty.
 * @throws std::runtime_error if the current thread is not MRNTT, or if
 *         getComponentThread cannot find the BODY thread.
 */
void Mind::finalizeBodyReq(mindLifetimeMgmtOpCbFn callback)
{
    auto mrntt = ComponentThread::getSelf();

    if (mrntt->id != ComponentThread::MRNTT)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Must be invoked by Mrntt thread");
    }

    if (!bodyComponentInitialized)
    {
        std::cout << "Mrntt: Body component not initialized. "
            << "Skipping finalization." << "\n";

        /* Guard against an empty callback, as every other completion path
         * in this file does; invoking it unchecked would throw.
         */
        if (callback) { callback(true); }
        return;
    }

    auto request = std::make_shared<FinalizeBodyReq>(
        *this, mrntt, callback);

    // Binding `request` into the handler keeps it alive until Body runs it.
    this->getComponentThread(ComponentThread::BODY)->getIoService().post(
        std::bind(
            &FinalizeBodyReq::finalizeBodyReq1,
            request.get(), request));
}
|
|
|
|
void Mind::distributeAndPinThreadsAcrossCpus()
|
|
{
|
|
int cpuCount = ComponentThread::getAvailableCpuCount();
|
|
|
|
if (OptionParser::getOptions().verbose) {
|
|
std::cout << __func__ << ": Available CPUs: " << cpuCount << "\n";
|
|
}
|
|
|
|
// Distribute and pin threads across CPUs
|
|
int threadIndex = 0;
|
|
for (auto& thread : componentThreads)
|
|
{
|
|
int targetCpu = threadIndex % cpuCount;
|
|
thread->pinToCpu(targetCpu);
|
|
++threadIndex;
|
|
}
|
|
|
|
std::cout << __func__ << ": Distributed " << threadIndex << " threads "
|
|
<< "across " << cpuCount << " CPUs\n";
|
|
}
|
|
|
|
class Mind::MindThreadLifetimeMgmtOp
|
|
: public AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
|
|
{
|
|
public:
|
|
MindThreadLifetimeMgmtOp(
|
|
Mind &parent,unsigned int nThreads,
|
|
mindThreadLifetimeMgmtOpCbFn callback)
|
|
: AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
|
|
loop(nThreads),
|
|
parent(parent)
|
|
{}
|
|
|
|
void callOriginalCbFn(void)
|
|
{
|
|
if (originalCbFn) {
|
|
originalCbFn();
|
|
}
|
|
}
|
|
|
|
public:
|
|
AsynchronousLoop loop;
|
|
Mind &parent;
|
|
|
|
public:
|
|
void joltAllMindThreadsReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
loop.incrementSuccessOrFailureDueTo(true);
|
|
if (!loop.isComplete()) {
|
|
return;
|
|
}
|
|
|
|
parent.threadsHaveBeenJolted = true;
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void executeGenericOpOnAllMindThreadsReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
loop.incrementSuccessOrFailureDueTo(true);
|
|
if (!loop.isComplete()) {
|
|
return;
|
|
}
|
|
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void exitAllMindThreadsReq1(
|
|
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
loop.incrementSuccessOrFailureDueTo(true);
|
|
if (!loop.isComplete()) {
|
|
return;
|
|
}
|
|
|
|
for (auto& thread : parent.componentThreads) {
|
|
thread->thread.join();
|
|
}
|
|
|
|
callOriginalCbFn();
|
|
}
|
|
};
|
|
|
|
/**
 * Send a JOLT request to every mind thread and invoke @p callback once all
 * of them have acknowledged.
 *
 * If the threads were already jolted (threadsHaveBeenJolted is set), no
 * request is sent and the callback is invoked immediately.
 *
 * @param callback  Invoked on completion; may be empty.
 */
void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    if (threadsHaveBeenJolted)
    {
        std::cout << "Mrntt: All mind threads already JOLTed. "
            << "Skipping JOLT request." << "\n";

        // An empty callback must not be invoked.
        if (callback) { callback(); }
        return;
    }

    // Tracks how many threads have acknowledged the JOLT.
    auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto& thread : componentThreads)
    {
        // Binding `request` into each handler keeps it alive until the
        // last acknowledgement has been processed.
        thread->joltThreadReq(
            std::bind(
                &MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1,
                request.get(), request));
    }

    /* With no threads there will be no acknowledgement to complete the
     * request, so finish here. The flag is recorded whether or not a
     * callback was supplied.
     */
    if (request->loop.nTotalIsZero())
    {
        threadsHaveBeenJolted = true;
        if (callback) { callback(); }
    }
}
|
|
|
|
// Thread management methods (moved from ComponentThread)
|
|
/**
 * Send a start request to every mind thread and invoke @p callback once all
 * of them have acknowledged.
 *
 * @param callback  Invoked on completion; may be empty.
 */
void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Tracks how many threads have acknowledged the request.
    auto op = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads)
    {
        // Each handler holds its own reference to the operation.
        mindThread->startThreadReq(
            [op] { op->executeGenericOpOnAllMindThreadsReq1(op); });
    }

    // With no threads, nothing will ever acknowledge: finish right away.
    if (op->loop.nTotalIsZero())
    {
        if (callback) { callback(); }
    }
}
|
|
|
|
/**
 * Send a pause request to every mind thread and invoke @p callback once all
 * of them have acknowledged.
 *
 * @param callback  Invoked on completion; may be empty.
 */
void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Tracks how many threads have acknowledged the request.
    auto op = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads)
    {
        // Each handler holds its own reference to the operation.
        mindThread->pauseThreadReq(
            [op] { op->executeGenericOpOnAllMindThreadsReq1(op); });
    }

    // With no threads, nothing will ever acknowledge: finish right away.
    if (op->loop.nTotalIsZero())
    {
        if (callback) { callback(); }
    }
}
|
|
|
|
/**
 * Send a resume request to every mind thread and invoke @p callback once
 * all of them have acknowledged.
 *
 * @param callback  Invoked on completion; may be empty.
 */
void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Tracks how many threads have acknowledged the request.
    auto op = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads)
    {
        // Each handler holds its own reference to the operation.
        mindThread->resumeThreadReq(
            [op] { op->executeGenericOpOnAllMindThreadsReq1(op); });
    }

    // With no threads, nothing will ever acknowledge: finish right away.
    if (op->loop.nTotalIsZero())
    {
        if (callback) { callback(); }
    }
}
|
|
|
|
/**
 * Send an exit request to every mind thread and invoke @p callback once all
 * of them have acknowledged.
 *
 * NOTE(review): completion is handled by the generic
 * executeGenericOpOnAllMindThreadsReq1, not by
 * MindThreadLifetimeMgmtOp::exitAllMindThreadsReq1 (the variant that joins
 * the threads), so no join happens on this path -- confirm that the threads
 * are joined elsewhere.
 *
 * @param callback  Invoked on completion; may be empty.
 */
void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
    // Tracks how many threads have acknowledged the request.
    auto op = std::make_shared<MindThreadLifetimeMgmtOp>(
        *this, componentThreads.size(), callback);

    for (auto &mindThread : componentThreads)
    {
        // Each handler holds its own reference to the operation.
        mindThread->exitThreadReq(
            [op] { op->executeGenericOpOnAllMindThreadsReq1(op); });
    }

    // With no threads, nothing will ever acknowledge: finish right away.
    if (op->loop.nTotalIsZero())
    {
        if (callback) { callback(); }
    }
}
|
|
|
|
} // namespace smo
|