Files
salmanoff/smocore/mind.cpp
T

560 lines
14 KiB
C++
Raw Normal View History

#include <iostream>
#include <opts.h>
#include <asynchronousContinuation.h>
#include <asynchronousLoop.h>
2024-09-08 01:04:41 +10:00
#include <mind.h>
#include <componentThread.h>
#include <senseApis/senseApiManager.h>
namespace smo {
2025-09-03 14:43:00 -04:00
Mind::Mind(void)
: componentThreads{
std::make_shared<MindThread>(ComponentThread::DIRECTOR, *this),
std::make_shared<MindThread>(ComponentThread::SIMULATOR, *this),
std::make_shared<MindThread>(ComponentThread::SUBCONSCIOUS, *this),
std::make_shared<MindThread>(ComponentThread::BODY, *this),
std::make_shared<MindThread>(ComponentThread::WORLD, *this)
2025-09-03 14:43:00 -04:00
}
{
}
std::shared_ptr<MindThread>
2025-09-03 14:43:00 -04:00
Mind::getComponentThread(ComponentThread::ThreadId id) const
{
if (id == ComponentThread::MRNTT)
{
throw std::runtime_error(
std::string(__func__) +
": MRNTT is not a MindThread and cannot be returned by "
"getComponentThread");
}
2025-09-03 14:43:00 -04:00
// Search through the vector for the thread with matching id
for (auto& thread : componentThreads) {
if (thread->id == id) { return thread; }
}
// Throw exception if no thread found
throw std::runtime_error(std::string(__func__) +
": No MindThread found with ID "
2025-09-03 14:43:00 -04:00
+ std::to_string(static_cast<int>(id)));
}
std::shared_ptr<MindThread>
2025-09-03 14:43:00 -04:00
Mind::getComponentThread(const std::string& name) const
{
if (name == "mrntt")
{
throw std::runtime_error(
std::string(__func__) +
": MRNTT is not a MindThread and cannot be returned by "
"getComponentThread");
}
2025-09-03 14:43:00 -04:00
for (auto& thread : componentThreads) {
if (thread->name == name) { return thread; }
}
// Throw exception if no thread found
throw std::runtime_error(std::string(__func__) +
": No MindThread found with name '" + name + "'");
2025-09-03 14:43:00 -04:00
}
std::vector<std::shared_ptr<MindThread>>
2025-09-03 14:43:00 -04:00
Mind::getMindThreads() const
{
return componentThreads;
}
class Mind::MindLifetimeMgmtOp
: public AsynchronousContinuation<mindLifetimeMgmtOpCbFn>
{
public:
MindLifetimeMgmtOp(
Mind &parent, mindLifetimeMgmtOpCbFn callback)
: AsynchronousContinuation<mindLifetimeMgmtOpCbFn>(callback),
parent(parent)
{}
void callOriginalCbFn(bool success)
{
if (originalCbFn) {
originalCbFn(success);
}
}
public:
Mind &parent;
public:
void initializeReq1(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
parent.startAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq2,
context.get(), context));
}
void initializeReq2(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads started." << "\n";
parent.initializeBodyReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq3,
context.get(), context, std::placeholders::_1));
}
void initializeReq3(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
bool success
)
{
std::cout << "Mrntt: Body component initialized." << "\n";
callOriginalCbFn(success);
}
void finalizeReq1(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context,
bool success
)
{
if (!success) {
std::cerr << "Mrntt: Body component failed to finalize." << "\n";
} else {
std::cout << "Mrntt: Body component finalized." << "\n";
}
/* If the threads haven't been jolted, we need to do that first, because
* otherwise they'll just enter their main loops and wait for control
* messages from mrntt after processing the exit request.
*/
parent.joltAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq2,
context.get(), context));
}
void finalizeReq2(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n";
parent.exitAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq3,
context.get(), context));
}
void finalizeReq3(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads exited." << "\n";
callOriginalCbFn(true);
}
};
void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback)
{
/* Distribute threads across available CPUs */
try
{
distributeAndPinThreadsAcrossCpus();
}
catch (const std::exception& e)
{
std::cerr << "Salmanoff couldn't distribute the mind threads across "
"the CPUs, so performance may be suboptimal.\n"
"Error: " << e.what() << "\n";
}
auto request = std::make_shared<MindLifetimeMgmtOp>(
*this, callback);
/* Jolt the threads, then start them */
joltAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq1,
request.get(), request));
}
void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback)
{
auto request = std::make_shared<MindLifetimeMgmtOp>(
*this, callback);
finalizeBodyReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq1,
request.get(), request, std::placeholders::_1));
}
class Mind::InitializeBodyReq
: public MindLifetimeMgmtOp, public ContinuationTarget
{
public:
InitializeBodyReq(
Mind &parent, const std::shared_ptr<ComponentThread> &caller,
mindLifetimeMgmtOpCbFn callback)
: MindLifetimeMgmtOp(parent, callback), ContinuationTarget(caller)
{}
void callOriginalCbFn(bool success)
{
if (originalCbFn)
{
caller->getIoService().post(
std::bind(originalCbFn, success));
}
}
public:
void initializeBodyReq1(
[[maybe_unused]] std::shared_ptr<InitializeBodyReq> context
)
{
auto self = ComponentThread::getSelf();
if (self->id != ComponentThread::BODY)
{
throw std::runtime_error(std::string(__func__)
+ ": Must be executed on Body thread");
}
/** EXPLANATION:
* The ComponentThread instance we pass in here is the one that will be
* used by Senseapi libs to perform device-independent background
* operations.
* For example, liblivoxProto1's BroadcastListener will use this thread
* to listen for UDP broadcast dgrams from Livox devices.
*
* Right now we use Marionette, but there's a strong argument for using
* Body instead since it's meant to handle device-management operations.
*/
sense_api::SenseApiManager::getInstance()
.loadAllSenseApiLibsFromOptions(caller);
std::cout << sense_api::SenseApiManager::getInstance().stringifyLibs()
<< std::endl;
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": About to initializeAllSenseApiLibs"
<< '\n';
}
sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs();
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": About to attachAllSenseDevicesFromSpecs"
<< '\n';
}
sense_api::SenseApiManager::getInstance()
.attachAllSenseDevicesFromSpecsReq(
std::bind(
&InitializeBodyReq::initializeBodyReq2,
context.get(), context,
std::placeholders::_1));
}
void initializeBodyReq2(
[[maybe_unused]] std::shared_ptr<InitializeBodyReq> context,
smo::AsynchronousLoop &results
)
{
parent.bodyComponentInitialized = true;
std::cout << "Mrntt: attached "
<< results.nSucceeded << " of " << results.nTotal
<< " sense devices." << "\n";
callOriginalCbFn(results.nSucceeded == results.nTotal);
}
};
class Mind::FinalizeBodyReq
: public InitializeBodyReq
{
public:
using InitializeBodyReq::InitializeBodyReq;
public:
void finalizeBodyReq1(
[[maybe_unused]] std::shared_ptr<FinalizeBodyReq> context
)
{
auto self = ComponentThread::getSelf();
if (self->id != ComponentThread::BODY)
{
throw std::runtime_error(std::string(__func__)
+ ": Must be executed on Body thread");
}
std::cout << "Mrntt: About to detach all sense devices." << "\n";
sense_api::SenseApiManager::getInstance().detachAllSenseDevicesReq(
std::bind(
&FinalizeBodyReq::finalizeBodyReq2,
context.get(), context,
std::placeholders::_1));
}
void finalizeBodyReq2(
[[maybe_unused]] std::shared_ptr<FinalizeBodyReq> context,
smo::AsynchronousLoop &results
)
{
std::cout << "Mrntt: Successfully detached "
<< results.nSucceeded << " of " << results.nTotal
<< " sense devices." << "\n";
std::cout << "Mrntt: About to unload all sense api libs." << "\n";
sense_api::SenseApiManager::getInstance().unloadAllSenseApiLibs();
callOriginalCbFn(results.nSucceeded == results.nTotal);
}
};
void Mind::initializeBodyReq(mindLifetimeMgmtOpCbFn callback)
{
auto mrntt = ComponentThread::getSelf();
if (mrntt->id != ComponentThread::MRNTT)
{
throw std::runtime_error(std::string(__func__)
+ ": Must be invoked by Mrntt thread");
}
auto request = std::make_shared<InitializeBodyReq>(
*this, mrntt, callback);
this->getComponentThread(ComponentThread::BODY)->getIoService().post(
std::bind(
&InitializeBodyReq::initializeBodyReq1,
request.get(), request));
}
void Mind::finalizeBodyReq(mindLifetimeMgmtOpCbFn callback)
{
auto mrntt = ComponentThread::getSelf();
if (mrntt->id != ComponentThread::MRNTT)
{
throw std::runtime_error(std::string(__func__)
+ ": Must be invoked by Mrntt thread");
}
if (!bodyComponentInitialized)
{
std::cout << "Mrntt: Body component not initialized. "
<< "Skipping finalization." << "\n";
callback(true);
return;
}
auto request = std::make_shared<FinalizeBodyReq>(
*this, mrntt, callback);
this->getComponentThread(ComponentThread::BODY)->getIoService().post(
std::bind(
&FinalizeBodyReq::finalizeBodyReq1,
request.get(), request));
}
2025-09-03 14:43:00 -04:00
void Mind::distributeAndPinThreadsAcrossCpus()
{
int cpuCount = ComponentThread::getAvailableCpuCount();
if (OptionParser::getOptions().verbose) {
std::cout << __func__ << ": Available CPUs: " << cpuCount << "\n";
}
2025-09-03 14:43:00 -04:00
// Distribute and pin threads across CPUs
int threadIndex = 0;
for (auto& thread : componentThreads)
{
int targetCpu = threadIndex % cpuCount;
thread->pinToCpu(targetCpu);
++threadIndex;
}
std::cout << __func__ << ": Distributed " << threadIndex << " threads "
<< "across " << cpuCount << " CPUs\n";
2025-09-03 14:43:00 -04:00
}
class Mind::MindThreadLifetimeMgmtOp
: public AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
{
public:
MindThreadLifetimeMgmtOp(
Mind &parent,unsigned int nThreads,
mindThreadLifetimeMgmtOpCbFn callback)
: AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
loop(nThreads),
parent(parent)
{}
void callOriginalCbFn(void)
{
if (originalCbFn) {
originalCbFn();
}
}
public:
AsynchronousLoop loop;
Mind &parent;
public:
void joltAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
parent.threadsHaveBeenJolted = true;
callOriginalCbFn();
}
void executeGenericOpOnAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
callOriginalCbFn();
}
void exitAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
for (auto& thread : parent.componentThreads) {
thread->thread.join();
}
callOriginalCbFn();
}
};
void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
if (threadsHaveBeenJolted)
{
std::cout << "Mrntt: All mind threads already JOLTed. "
<< "Skipping JOLT request." << "\n";
callback();
return;
}
// Create a counter to track when all threads have been jolted
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->joltThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1,
request.get(), request));
}
// If no threads, set flag and call callback immediately
if (request->loop.nTotalIsZero() && callback)
{
threadsHaveBeenJolted = true;
callback();
}
}
2025-09-03 14:43:00 -04:00
// Thread management methods (moved from ComponentThread)
void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
2025-09-03 14:43:00 -04:00
{
// Create a counter to track when all threads have started
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
2025-09-03 14:43:00 -04:00
for (auto& thread : componentThreads)
{
thread->startThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
2025-09-03 14:43:00 -04:00
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
2025-09-03 14:43:00 -04:00
}
void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
2025-09-03 14:43:00 -04:00
{
// Create a counter to track when all threads have paused
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
2025-09-03 14:43:00 -04:00
for (auto& thread : componentThreads)
{
thread->pauseThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
2025-09-03 14:43:00 -04:00
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
2025-09-03 14:43:00 -04:00
}
void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
2025-09-03 14:43:00 -04:00
{
// Create a counter to track when all threads have resumed
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
2025-09-03 14:43:00 -04:00
for (auto& thread : componentThreads)
{
thread->resumeThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
2025-09-03 14:43:00 -04:00
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
2025-09-03 14:43:00 -04:00
}
void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
2025-09-03 14:43:00 -04:00
{
// Create a counter to track when all threads have exited
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
2025-09-03 14:43:00 -04:00
for (auto& thread : componentThreads)
{
thread->exitThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
2025-09-03 14:43:00 -04:00
}
// If no threads, call callback immediately
if (request->loop.nTotalIsZero() && callback) { callback(); }
2025-09-03 14:43:00 -04:00
}
} // namespace smo