#include #include #include #include #include #include #include #include #include #include #include #include namespace smo { Mind::Mind(void) : componentThreads{ std::make_shared(ComponentThread::DIRECTOR, *this), std::make_shared(ComponentThread::SIMULATOR, *this), std::make_shared(ComponentThread::SUBCONSCIOUS, *this), std::make_shared(ComponentThread::BODY, *this) #ifndef CONFIG_WORLD_USE_BODY_THREAD , std::make_shared(ComponentThread::WORLD, *this) #endif }, director(*this, componentThreads[0]), canvas(*this, componentThreads[1]), subconscious(*this, componentThreads[2]), body(*this, componentThreads[3]), world(*this, #ifndef CONFIG_WORLD_USE_BODY_THREAD componentThreads[4] #else componentThreads[3] #endif ) { } std::shared_ptr Mind::getComponentThread(ComponentThread::ThreadId id) const { if (id == ComponentThread::MRNTT) { throw std::runtime_error( std::string(__func__) + ": MRNTT is not a MindThread and cannot be returned by " "getComponentThread"); } // Search through the vector for the thread with matching id for (auto& thread : componentThreads) { if (thread->id == id) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No MindThread found with ID " + std::to_string(static_cast(id))); } std::shared_ptr Mind::getComponentThread(const std::string& name) const { if (name == "mrntt") { throw std::runtime_error( std::string(__func__) + ": MRNTT is not a MindThread and cannot be returned by " "getComponentThread"); } for (auto& thread : componentThreads) { if (thread->name == name) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No MindThread found with name '" + name + "'"); } std::vector> Mind::getMindThreads() const { return componentThreads; } class Mind::MindLifetimeMgmtOp : public PostedAsynchronousContinuation { public: MindLifetimeMgmtOp( Mind &parent, const std::shared_ptr &caller, Callback callback) : PostedAsynchronousContinuation( caller, callback), parent(parent) {} public: Mind &parent; public: void initializeReq1_posted( [[maybe_unused]] std::shared_ptr context ) { /* Jolt the threads, then start them */ parent.joltAllMindThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::initializeReq2, context.get(), context)}); } void initializeReq2( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads JOLTed." << "\n"; parent.startAllMindThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::initializeReq3, context.get(), context)}); } void initializeReq3( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads started." << "\n"; parent.body.initializeReq( {context, std::bind( &MindLifetimeMgmtOp::initializeReq4, context.get(), context, std::placeholders::_1)}); } void initializeReq4( [[maybe_unused]] std::shared_ptr context, bool success ) { std::cout << "Mrntt: Body component initialized." << "\n"; callOriginalCb(success); } void finalizeReq1_posted( [[maybe_unused]] std::shared_ptr context ) { parent.body.finalizeReq( {context, std::bind( &MindLifetimeMgmtOp::finalizeReq2, context.get(), context, std::placeholders::_1)}); } void finalizeReq2( [[maybe_unused]] std::shared_ptr context, bool success ) { if (!success) { std::cerr << "Mrntt: Body component failed to finalize." << "\n"; } else { std::cout << "Mrntt: Body component finalized." << "\n"; } /* If the threads haven't been jolted, we need to do that first, because * otherwise they'll just enter their main loops and wait for control * messages from mrntt after processing the exit request. */ parent.joltAllMindThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::finalizeReq3, context.get(), context)}); } void finalizeReq3( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n"; parent.exitAllMindThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::finalizeReq4, context.get(), context)}); } void finalizeReq4( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads exited." << "\n"; callOriginalCb(true); } }; void Mind::initializeReq(Callback callback) { /* Distribute threads across available CPUs */ try { distributeAndPinThreadsAcrossCpus(); } catch (const std::exception& e) { std::cerr << "Salmanoff couldn't distribute the mind threads across " "the CPUs, so performance may be suboptimal.\n" "Error: " << e.what() << "\n"; } const auto& caller = ComponentThread::getSelf(); auto request = std::make_shared( *this, caller, callback); mrntt::mrntt.thread->getIoService().post( STC(std::bind( &MindLifetimeMgmtOp::initializeReq1_posted, request.get(), request))); } void Mind::finalizeReq(Callback callback) { const auto& caller = ComponentThread::getSelf(); auto request = std::make_shared( *this, caller, callback); mrntt::mrntt.thread->getIoService().post( STC(std::bind( &MindLifetimeMgmtOp::finalizeReq1_posted, request.get(), request))); } void Mind::distributeAndPinThreadsAcrossCpus() { int cpuCount = ComponentThread::getAvailableCpuCount(); if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Available CPUs: " << cpuCount << "\n"; } // Distribute and pin threads across CPUs int threadIndex = 0; for (auto& thread : componentThreads) { int targetCpu = threadIndex % cpuCount; thread->pinToCpu(targetCpu); ++threadIndex; } std::cout << __func__ << ": Distributed " << threadIndex << " threads " << "across " << cpuCount << " CPUs\n"; } class Mind::MindThreadLifetimeMgmtOp : public NonPostedAsynchronousContinuation { public: MindThreadLifetimeMgmtOp( Mind &parent,unsigned int nThreads, Callback callback) : NonPostedAsynchronousContinuation(callback), loop(nThreads), parent(parent) {} public: AsynchronousLoop loop; Mind &parent; public: void joltAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } parent.threadsHaveBeenJolted = true; callOriginalCb(); } void executeGenericOpOnAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } callOriginalCb(); } void exitAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } for (auto& thread : parent.componentThreads) { thread->thread.join(); } callOriginalCb(); } }; void Mind::joltAllMindThreadsReq( Callback callback ) { if (threadsHaveBeenJolted) { std::cout << "Mrntt: All mind threads already JOLTed. " << "Skipping JOLT request." << "\n"; callback.callbackFn(); return; } // If no threads, set flag and call callback immediately if (componentThreads.size() == 0 && callback.callbackFn) { threadsHaveBeenJolted = true; callback.callbackFn(); return; } // Create a counter to track when all threads have been jolted auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->joltThreadReq( {request, std::bind( &MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1, request.get(), request)}); } } // Thread management methods (moved from ComponentThread) void Mind::startAllMindThreadsReq( Callback callback ) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback.callbackFn) { callback.callbackFn(); return; } // Create a counter to track when all threads have started auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->startThreadReq( {request, std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)}); } } void Mind::pauseAllMindThreadsReq( Callback callback ) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback.callbackFn) { callback.callbackFn(); return; } // Create a counter to track when all threads have paused auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->pauseThreadReq( {request, std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)}); } } void Mind::resumeAllMindThreadsReq( Callback callback ) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback.callbackFn) { callback.callbackFn(); return; } // Create a counter to track when all threads have resumed auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->resumeThreadReq( {request, std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)}); } } void Mind::exitAllMindThreadsReq( Callback callback ) { // If no threads, call callback immediately if (componentThreads.size() == 0 && callback.callbackFn) { callback.callbackFn(); return; } // Create a counter to track when all threads have exited auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->exitThreadReq( {request, std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)}); } } } // namespace smo