#include #include #include #include #include #include #include #include namespace smo { thread_local std::shared_ptr thisComponentThread; namespace mrntt { extern std::shared_ptr mrntt; } // Implementation of static method std::shared_ptr ComponentThread::getMrntt() { return mrntt::mrntt; } void ComponentThread::initializeTls(void) { thisComponentThread = shared_from_this(); } const std::shared_ptr ComponentThread::getSelf(void) { if (!thisComponentThread) { throw std::runtime_error(std::string(__func__) + ": TLS not initialized"); } return thisComponentThread; } void ComponentThread::main(ComponentThread& self) { std::cout << self.name << ":" << __func__ << ": Waiting for JOLT" <<"\n"; self.getIoService().run(); self.initializeTls(); std::cout << self.name << ":" << __func__ << ": Entering event loop" <<"\n"; /* We loop here because when an exception is caught, we need to first catch * it in the catch blocks. We bubble the exception to mrntt in the catch * blocks, and then we loop here to await control messages from mrntt. * * We can't just exit on our own. Rather, we must wait for mrntt to tell us * to exit. When we wish to finally exit, we set keepLooping to false. */ for (self.keepLooping = true; self.keepLooping;) { bool sendExceptionInd = false; try { /** EXPLANATION: * This reset() call is crucial for async bridging patterns * to work. * When the outermost thread's io_service is stop()ped (e.g., * from JOLT sequence), it won't process any new work until * reset() is called, even if nested async operations try to * post work to it. This means async bridges invoked from * the outermost thread main sequence won't work until this * reset() call. */ self.getIoService().reset(); self.getIoService().run(); } catch (const std::exception& e) { sendExceptionInd = true; std::cerr << self.name << ":" << __func__ << ": Exception occurred: " << e.what() << "\n"; } catch (...) { sendExceptionInd = true; std::cerr << self.name << ":" << __func__ << ": Unknown exception occurred" << "\n"; } if (sendExceptionInd) { mrntt::mrntt->exceptionInd(self); } } std::cout << self.name << ":" << __func__ << ": Exited event loop" << "\n"; } // Thread management method implementations void ComponentThread::startThreadReq(std::function callback) { this->getIoService().post([this, caller = getSelf(), callback]() { std::cout << "Thread '" << name << "': handling startThread." << "\n"; // Execute private setup sequence here // This is where each thread would implement its specific initialization if (callback) { caller->getIoService().post(callback); } }); } void ComponentThread::cleanup(void) { this->keepLooping = false; } void ComponentThread::exitThreadReq(std::function callback) { // Post to the main io_service this->getIoService().post([this, caller = getSelf(), callback]() { std::cout << "Thread '" << name << "': handling exitThread " "(main queue)." << std::endl; cleanup(); // Stop the main io_service to exit the thread io_service.stop(); if (callback) { caller->getIoService().post(callback); } }); // Also post to the pause io_service this->pause_io_service.post([this, caller = getSelf(), callback]() { std::cout << "Thread '" << name << "': handling exitThread " "(pause queue)." << std::endl; cleanup(); // Stop both io_services to exit the thread pause_io_service.stop(); io_service.stop(); if (callback) { caller->getIoService().post(callback); } }); } void ComponentThread::pauseThreadReq(std::function callback) { this->getIoService().post([this, caller = getSelf(), callback]() { std::cout << "Thread '" << name << "': handling pauseThread." << std::endl; if (callback) { caller->getIoService().post(callback); } // Reset the pause io_service before running to ensure it can run again pause_io_service.reset(); // Run the pause io_service to block this thread pause_io_service.run(); }); } void ComponentThread::resumeThreadReq(std::function callback) { // Post to the pause_io_service to unblock the paused thread pause_io_service.post([this, caller = getSelf(), callback]() { std::cout << "Thread '" << name << "': handling resumeThread." << std::endl; if (callback) { caller->getIoService().post(callback); } // Stop the pause_io_service to unblock the thread pause_io_service.stop(); }); } void ComponentThread::joltThreadReq(std::function callback) { this->getIoService().post([this, caller = getSelf(), callback]() { std::cout << "Thread '" << name << "': handling JOLT request." << "\n"; // Stop the main io_service to jolt the thread io_service.stop(); if (callback) { caller->getIoService().post(callback); } }); } /* This shouldn't take a callback because the caller shouldn't expect to * Mrntt to send a reply signal to it. Sending this Indication means that * Mrntt will send the calling thread an exitThreadReq. When the caller * processes that exitThreadReq(), the caller will exit its event loop and then * terminate. * * Even if Mrntt sent a RDY response, the caller shouldn't actually be executing * any longer to receive it anyway. */ void ComponentThread::exceptionInd(ComponentThread& thread) { if (this->id != ComponentThread::MRNTT) { throw std::runtime_error(std::string(__func__) + ": invoked on non-mrntt thread " + thread.name); } // Post the exception to the mrntt thread. this->getIoService().post( [&thread]() { std::cerr << "Mrntt: Exception occurred: in thread " << thread.name << ". Killing Salmanoff." << "\n"; /** EXPLANATION: * An exception has occurred in one of a mind's threads. We need to * shut down all of that particular mind's threads. */ thread.parent.finalizeReq([]() { /** FIXME: * When we eventually support multiple minds, we should remove this * since it causes marionette to exit, even if there are other minds * that are still running. */ smo::mrntt::exitMarionetteLoop(); }); }); } void ComponentThread::userShutdownInd() { if (this->id != ComponentThread::MRNTT) { throw std::runtime_error(std::string(__func__) + ": invoked on non-mrntt thread " + this->name); } // Post the user shutdown to the mrntt thread. this->getIoService().post( [this]() { std::cerr << "Mrntt: User requested shutdown (SIGINT)." << " Killing Salmanoff." << "\n"; /** EXPLANATION: * A user has requested a shutdown. We need to shut down all of the * threads in all running Minds. */ parent.finalizeReq([]() { /** FIXME: * When we eventually support multiple minds, we should remove this * since it causes marionette to exit, even if there are other minds * that are still running. */ smo::mrntt::exitMarionetteLoop(); }); }); } // CPU management method implementations int ComponentThread::getAvailableCpuCount() { int cpuCount = sysconf(_SC_NPROCESSORS_ONLN); if (cpuCount <= 0) { throw std::runtime_error(std::string(__func__) + ": Failed to determine CPU count"); } // Check if std::thread::hardware_concurrency() matches sysconf result unsigned int hwConcurrency = std::thread::hardware_concurrency(); if (hwConcurrency != static_cast(cpuCount)) { std::cerr << "Warning: CPU count mismatch - " "std::thread::hardware_concurrency() = " << hwConcurrency << ", sysconf(_SC_NPROCESSORS_ONLN) = " << cpuCount << "\n"; } return cpuCount; } void ComponentThread::pinToCpu(int cpuId) { if (cpuId < 0) { throw std::runtime_error(std::string(__func__) + ": Invalid CPU ID: " + std::to_string(cpuId)); } cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(cpuId, &cpuset); int result = pthread_setaffinity_np( thread.native_handle(), sizeof(cpu_set_t), &cpuset); if (result != 0) { throw std::runtime_error(std::string(__func__) + ": Failed to pin thread to CPU " + std::to_string(cpuId) + ": " + std::strerror(result)); } pinnedCpuId = cpuId; std::cout << name << ": Pinned to CPU " << cpuId << "\n"; } } // namespace smo