#include #include #include namespace smo { Mind::Mind(void) : componentThreads{ std::make_shared(ComponentThread::DIRECTOR, *this), std::make_shared(ComponentThread::SIMULATOR, *this), std::make_shared(ComponentThread::SUBCONSCIOUS, *this), std::make_shared(ComponentThread::BODY, *this), std::make_shared(ComponentThread::WORLD, *this) } { } void Mind::initialize() { /* Distribute threads across available CPUs */ try { distributeAndPinThreadsAcrossCpus(); } catch (const std::exception& e) { std::cerr << "Salmanoff couldn't distribute the mind threads across " "the CPUs, so performance may be suboptimal.\n" "Error: " << e.what() << "\n"; } /* Jolt the threads, then start them */ joltAllMindThreadsReq( [this]() { std::cout << "Mrntt: All mind threads JOLTed." << "\n"; // Start all threads after JOLTing startAllMindThreadsReq( []() { std::cout << "Mrntt: All mind threads started." << "\n"; } ); } ); } void Mind::finalizeReq(std::function callback) { /* If the threads haven't been jolted, we need to do that first, because * otherwise they'll just enter their main loops and wait for control * messages from mrntt after processing the exit request. */ if (!threadsHaveBeenJolted) { joltAllMindThreadsReq( [this, callback]() { exitAllMindThreadsReq( [callback]() { std::cout << "Mrntt: All mind threads exited." << "\n"; if (callback) { callback(); } } ); } ); } else { exitAllMindThreadsReq( [callback]() { std::cout << "Mrntt: All mind threads exited." << "\n"; if (callback) { callback(); } } ); } } void Mind::joltAllMindThreadsReq(std::function callback) { // Create a counter to track when all threads have been jolted auto counter = std::make_shared>(componentThreads.size()); for (auto& thread : componentThreads) { thread->joltThreadReq([counter, callback, this]() { if (--(*counter) == 0) { // Set the flag only after all threads have ACKed their JOLT threadsHaveBeenJolted = true; if (callback) { callback(); } } }); } // If no threads, set flag and call callback immediately if (componentThreads.empty()) { threadsHaveBeenJolted = true; if (callback) { callback(); } } } std::shared_ptr Mind::getComponentThread(ComponentThread::ThreadId id) const { // Access the global marionette thread using ComponentThread::getMrntt() if (id == ComponentThread::MRNTT) { return ComponentThread::getMrntt(); } // Search through the vector for the thread with matching id for (auto& thread : componentThreads) { if (thread->id == id) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No ComponentThread found with ID " + std::to_string(static_cast(id))); } std::shared_ptr Mind::getComponentThread(const std::string& name) const { if (name == "mrntt") { return ComponentThread::getMrntt(); } for (auto& thread : componentThreads) { if (thread->name == name) { return thread; } } // Throw exception if no thread found throw std::runtime_error(std::string(__func__) + ": No ComponentThread found with name '" + name + "'"); } std::vector> Mind::getMindThreads() const { return componentThreads; } void Mind::distributeAndPinThreadsAcrossCpus() { int cpuCount = ComponentThread::getAvailableCpuCount(); std::cout << "Available CPUs: " << cpuCount << "\n"; // Distribute and pin threads across CPUs int threadIndex = 0; for (auto& thread : componentThreads) { int targetCpu = threadIndex % cpuCount; thread->pinToCpu(targetCpu); ++threadIndex; } std::cout << "Distributed " << threadIndex << " threads across " << cpuCount << " CPUs\n"; } // Thread management methods (moved from ComponentThread) void Mind::startAllMindThreadsReq(std::function callback) { // Create a counter to track when all threads have started auto counter = std::make_shared>(componentThreads.size()); for (auto& thread : componentThreads) { thread->startThreadReq([counter, callback]() { if (--(*counter) == 0 && callback) { callback(); } }); } // If no threads, call callback immediately if (componentThreads.empty() && callback) { callback(); } } void Mind::pauseAllMindThreadsReq(std::function callback) { // Create a counter to track when all threads have paused auto counter = std::make_shared>(componentThreads.size()); for (auto& thread : componentThreads) { thread->pauseThreadReq([counter, callback]() { if (--(*counter) == 0 && callback) { callback(); } }); } // If no threads, call callback immediately if (componentThreads.empty() && callback) { callback(); } } void Mind::resumeAllMindThreadsReq(std::function callback) { // Create a counter to track when all threads have resumed auto counter = std::make_shared>(componentThreads.size()); for (auto& thread : componentThreads) { thread->resumeThreadReq([counter, callback]() { if (--(*counter) == 0 && callback) { callback(); } }); } // If no threads, call callback immediately if (componentThreads.empty() && callback) { callback(); } } void Mind::exitAllMindThreadsReq(std::function callback) { // Create a counter to track when all threads have exited auto counter = std::make_shared>(componentThreads.size()); for (auto& thread : componentThreads) { thread->exitThreadReq([counter, callback, this]() { if (--(*counter) == 0) { // All threads have exited their loops, now join them for (auto& t : componentThreads) { t->thread.join(); } if (callback) { callback(); } } }); } // If no threads, call callback immediately if (componentThreads.empty() && callback) { callback(); } } } // namespace smo