#include #include #include #include #include #include #include #include #include #include namespace smo { Mind::Mind(void) : componentThreads{ std::make_shared(ComponentThread::DIRECTOR, *this), std::make_shared(ComponentThread::SIMULATOR, *this), std::make_shared(ComponentThread::SUBCONSCIOUS, *this), std::make_shared(ComponentThread::BODY, *this) #ifndef WORLD_USE_BODY_THREAD , std::make_shared(ComponentThread::WORLD, *this) #endif }, director(*this, componentThreads[0]), canvas(*this, componentThreads[1]), subconscious(*this, componentThreads[2]), body(*this, componentThreads[3]), world(*this, #ifndef WORLD_USE_BODY_THREAD componentThreads[4] #else componentThreads[3] #endif ) { } std::shared_ptr Mind::getComponentThread(ComponentThread::ThreadId id) const { if (id == ComponentThread::MRNTT) { throw std::runtime_error( std::string(__func__) + ": MRNTT is not a MindThread and cannot be returned by " "getComponentThread"); } // Search through the vector for the thread with matching id for (auto& thread : componentThreads) { if (thread->id == id) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No MindThread found with ID " + std::to_string(static_cast(id))); } std::shared_ptr Mind::getComponentThread(const std::string& name) const { if (name == "mrntt") { throw std::runtime_error( std::string(__func__) + ": MRNTT is not a MindThread and cannot be returned by " "getComponentThread"); } for (auto& thread : componentThreads) { if (thread->name == name) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No MindThread found with name '" + name + "'"); } std::vector> Mind::getMindThreads() const { return componentThreads; } class Mind::MindLifetimeMgmtOp : public TargetedAsynchronousContinuation { public: MindLifetimeMgmtOp( Mind &parent, const std::shared_ptr &caller, mindLifetimeMgmtOpCbFn callback) : TargetedAsynchronousContinuation( caller, callback), parent(parent) {} void callOriginalCbFn(bool success) { if (originalCbFn) { caller->getIoService().post(std::bind(originalCbFn, success)); } } public: Mind &parent; public: void initializeReq1_posted( [[maybe_unused]] std::shared_ptr context ) { /* Jolt the threads, then start them */ parent.joltAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::initializeReq2, context.get(), context)); } void initializeReq2( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads JOLTed." << "\n"; parent.startAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::initializeReq3, context.get(), context)); } void initializeReq3( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads started." << "\n"; parent.body.initializeReq( std::bind( &MindLifetimeMgmtOp::initializeReq4, context.get(), context, std::placeholders::_1)); } void initializeReq4( [[maybe_unused]] std::shared_ptr context, bool success ) { std::cout << "Mrntt: Body component initialized." << "\n"; callOriginalCbFn(success); } void finalizeReq1_posted( [[maybe_unused]] std::shared_ptr context ) { parent.body.finalizeReq( std::bind( &MindLifetimeMgmtOp::finalizeReq2, context.get(), context, std::placeholders::_1)); } void finalizeReq2( [[maybe_unused]] std::shared_ptr context, bool success ) { if (!success) { std::cerr << "Mrntt: Body component failed to finalize." << "\n"; } else { std::cout << "Mrntt: Body component finalized." << "\n"; } /* If the threads haven't been jolted, we need to do that first, because * otherwise they'll just enter their main loops and wait for control * messages from mrntt after processing the exit request. */ parent.joltAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::finalizeReq3, context.get(), context)); } void finalizeReq3( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n"; parent.exitAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::finalizeReq4, context.get(), context)); } void finalizeReq4( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads exited." << "\n"; callOriginalCbFn(true); } }; void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback) { /* Distribute threads across available CPUs */ try { distributeAndPinThreadsAcrossCpus(); } catch (const std::exception& e) { std::cerr << "Salmanoff couldn't distribute the mind threads across " "the CPUs, so performance may be suboptimal.\n" "Error: " << e.what() << "\n"; } const auto& caller = ComponentThread::getSelf(); auto request = std::make_shared( *this, caller, callback); mrntt::mrntt.thread->getIoService().post( std::bind( &MindLifetimeMgmtOp::initializeReq1_posted, request.get(), request)); } void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback) { const auto& caller = ComponentThread::getSelf(); auto request = std::make_shared( *this, caller, callback); mrntt::mrntt.thread->getIoService().post( std::bind( &MindLifetimeMgmtOp::finalizeReq1_posted, request.get(), request)); } void Mind::distributeAndPinThreadsAcrossCpus() { int cpuCount = ComponentThread::getAvailableCpuCount(); if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Available CPUs: " << cpuCount << "\n"; } // Distribute and pin threads across CPUs int threadIndex = 0; for (auto& thread : componentThreads) { int targetCpu = threadIndex % cpuCount; thread->pinToCpu(targetCpu); ++threadIndex; } std::cout << __func__ << ": Distributed " << threadIndex << " threads " << "across " << cpuCount << " CPUs\n"; } class Mind::MindThreadLifetimeMgmtOp : public AsynchronousContinuation { public: MindThreadLifetimeMgmtOp( Mind &parent,unsigned int nThreads, mindThreadLifetimeMgmtOpCbFn callback) : AsynchronousContinuation(callback), loop(nThreads), parent(parent) {} void callOriginalCbFn(void) { if (originalCbFn) { originalCbFn(); } } public: AsynchronousLoop loop; Mind &parent; public: void joltAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } parent.threadsHaveBeenJolted = true; callOriginalCbFn(); } void executeGenericOpOnAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } callOriginalCbFn(); } void exitAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } for (auto& thread : parent.componentThreads) { thread->thread.join(); } callOriginalCbFn(); } }; void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { if (threadsHaveBeenJolted) { std::cout << "Mrntt: All mind threads already JOLTed. " << "Skipping JOLT request." << "\n"; callback(); return; } // If no threads, set flag and call callback immediately if (componentThreads.size() == 0 && callback) { threadsHaveBeenJolted = true; callback(); return; } // Create a counter to track when all threads have been jolted auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->joltThreadReq( std::bind( &MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1, request.get(), request)); } } // Thread management methods (moved from ComponentThread) void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback) { callback(); return; } // Create a counter to track when all threads have started auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->startThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } } void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback) { callback(); return; } // Create a counter to track when all threads have paused auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->pauseThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } } void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback) { callback(); return; } // Create a counter to track when all threads have resumed auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->resumeThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } } void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback) { callback(); return; } // Create a counter to track when all threads have exited auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->exitThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } } } // namespace smo