#include #include #include #include #include #include namespace smo { Mind::Mind(void) : componentThreads{ std::make_shared(ComponentThread::DIRECTOR, *this), std::make_shared(ComponentThread::SIMULATOR, *this), std::make_shared(ComponentThread::SUBCONSCIOUS, *this), std::make_shared(ComponentThread::BODY, *this), std::make_shared(ComponentThread::WORLD, *this) } { } std::shared_ptr Mind::getComponentThread(ComponentThread::ThreadId id) const { // Access the global marionette thread using ComponentThread::getMrntt() if (id == ComponentThread::MRNTT) { return ComponentThread::getMrntt(); } // Search through the vector for the thread with matching id for (auto& thread : componentThreads) { if (thread->id == id) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No ComponentThread found with ID " + std::to_string(static_cast(id))); } std::shared_ptr Mind::getComponentThread(const std::string& name) const { if (name == "mrntt") { return ComponentThread::getMrntt(); } for (auto& thread : componentThreads) { if (thread->name == name) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No ComponentThread found with name '" + name + "'"); } std::vector> Mind::getMindThreads() const { return componentThreads; } class Mind::MindLifetimeMgmtOp : public AsynchronousContinuation { public: MindLifetimeMgmtOp( Mind &parent, mindLifetimeMgmtOpCbFn callback) : AsynchronousContinuation(callback), parent(parent) {} void callOriginalCbFn(void) { if (originalCbFn) { originalCbFn(true); } } public: Mind &parent; public: void initializeReq1( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads JOLTed." << "\n"; parent.startAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::initializeReq2, context.get(), context)); } void initializeReq2( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads started." << "\n"; callOriginalCbFn(); } void finalizeReq1( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads JOLTed." << "\n"; parent.exitAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::finalizeReq2, context.get(), context)); } void finalizeReq2( [[maybe_unused]] std::shared_ptr context ) { std::cout << "Mrntt: All mind threads exited." << "\n"; callOriginalCbFn(); } }; void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback) { /* Distribute threads across available CPUs */ try { distributeAndPinThreadsAcrossCpus(); } catch (const std::exception& e) { std::cerr << "Salmanoff couldn't distribute the mind threads across " "the CPUs, so performance may be suboptimal.\n" "Error: " << e.what() << "\n"; } auto request = std::make_shared( *this, callback); /* Jolt the threads, then start them */ joltAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::initializeReq1, request.get(), request)); } void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback) { auto request = std::make_shared( *this, callback); /* If the threads haven't been jolted, we need to do that first, because * otherwise they'll just enter their main loops and wait for control * messages from mrntt after processing the exit request. */ if (!threadsHaveBeenJolted) { joltAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::finalizeReq1, request.get(), request)); } else { exitAllMindThreadsReq( std::bind( &MindLifetimeMgmtOp::finalizeReq1, request.get(), request)); } } void Mind::distributeAndPinThreadsAcrossCpus() { int cpuCount = ComponentThread::getAvailableCpuCount(); if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Available CPUs: " << cpuCount << "\n"; } // Distribute and pin threads across CPUs int threadIndex = 0; for (auto& thread : componentThreads) { int targetCpu = threadIndex % cpuCount; thread->pinToCpu(targetCpu); ++threadIndex; } std::cout << __func__ << ": Distributed " << threadIndex << " threads " << "across " << cpuCount << " CPUs\n"; } class Mind::MindThreadLifetimeMgmtOp : public AsynchronousContinuation { public: MindThreadLifetimeMgmtOp( Mind &parent,unsigned int nThreads, mindThreadLifetimeMgmtOpCbFn callback) : AsynchronousContinuation(callback), loop(nThreads), parent(parent) {} void callOriginalCbFn(void) { if (originalCbFn) { originalCbFn(); } } public: AsynchronousLoop loop; Mind &parent; public: void joltAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } parent.threadsHaveBeenJolted = true; callOriginalCbFn(); } void executeGenericOpOnAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } callOriginalCbFn(); } void exitAllMindThreadsReq1( [[maybe_unused]] std::shared_ptr context ) { loop.incrementSuccessOrFailureDueTo(true); if (!loop.isComplete()) { return; } for (auto& thread : parent.componentThreads) { thread->thread.join(); } callOriginalCbFn(); } }; void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have been jolted auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->joltThreadReq( std::bind( &MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1, request.get(), request)); } // If no threads, set flag and call callback immediately if (request->loop.nTotalIsZero() && callback) { threadsHaveBeenJolted = true; callback(); } } // Thread management methods (moved from ComponentThread) void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have started auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->startThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } // If no threads, call callback immediately if (request->loop.nTotalIsZero() && callback) { callback(); } } void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have paused auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->pauseThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } // If no threads, call callback immediately if (request->loop.nTotalIsZero() && callback) { callback(); } } void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have resumed auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->resumeThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } // If no threads, call callback immediately if (request->loop.nTotalIsZero() && callback) { callback(); } } void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have exited auto request = std::make_shared( *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { thread->exitThreadReq( std::bind( &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, request.get(), request)); } // If no threads, call callback immediately if (request->loop.nTotalIsZero() && callback) { callback(); } } } // namespace smo