/* NOTE(review): in this copy of the file the operands of the #include
 * directives and the template argument lists (e.g. on std::shared_ptr,
 * std::make_shared, static_cast and the PostedAsynchronousContinuation base)
 * are absent. They are left exactly as found; restore them from the
 * authoritative copy before building.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace smo {

/* Per-thread handle to the component thread object that owns the calling OS
 * thread. It is assigned by the initializeTls() overrides below and read by
 * ComponentThread::getSelf().
 */
thread_local std::shared_ptr thisComponentThread;

// Implementation of static method
/** Returns the handle stored in mrntt::thread. */
std::shared_ptr ComponentThread::getMrntt()
{
    return mrntt::thread;
}

/** Stores shared_from_this() into this thread's thisComponentThread slot. */
void MarionetteThread::initializeTls(void)
{
    thisComponentThread = shared_from_this();
}

/** Stores shared_from_this() into this thread's thisComponentThread slot. */
void MindThread::initializeTls(void)
{
    thisComponentThread = shared_from_this();
}

/** Returns the calling thread's thisComponentThread handle.
 *
 * @throws std::runtime_error if thisComponentThread is empty, i.e. no
 *         initializeTls() has assigned it on the calling thread yet.
 */
const std::shared_ptr ComponentThread::getSelf(void)
{
    if (!thisComponentThread)
    {
        throw std::runtime_error(std::string(__func__)
            + ": TLS not initialized");
    }

    return thisComponentThread;
}

/** Thread body for a MindThread.
 *
 * Sequence, as written below:
 *  1. Run the thread's io_service once. The log text calls this "Waiting for
 *     JOLT"; joltThreadReq1_posted() stops target->io_service, which is
 *     presumably the same object getIoService() returns -- confirm.
 *  2. Initialize TLS for this thread.
 *  3. Loop on reset()+run() of the io_service until keepLooping is false
 *     (ComponentThread::cleanup() is what clears it).
 *
 * Exceptions escaping run() are logged to std::cerr and then reported by
 * calling mrntt::mrntt.finalizeReq(); they do not terminate the loop.
 *
 * NOTE(review): the "Entering event loop" and "Exited event loop" messages
 * are printed regardless of the verbose option, unlike the "Waiting for
 * JOLT" message -- confirm this is intended.
 *
 * @param self The MindThread object whose event loop is being run.
 */
void MindThread::main(MindThread& self)
{
    if (OptionParser::getOptions().verbose)
    {
        std::cout << self.name << ":" << __func__
            << ": Waiting for JOLT" <<"\n";
    }

    // First run: returns once the io_service is stopped or runs out of work.
    self.getIoService().run();
    self.initializeTls();

    std::cout << self.name << ":" << __func__
        << ": Entering event loop" <<"\n";

    /* We loop here because when an exception is caught, we need to first catch
     * it in the catch blocks. We bubble the exception to mrntt in the catch
     * blocks, and then we loop here to await control messages from mrntt.
     *
     * We can't just exit on our own. Rather, we must wait for mrntt to tell us
     * to exit. When we wish to finally exit, we set keepLooping to false.
     */
    for (self.keepLooping = true; self.keepLooping;)
    {
        // Set by either catch block; triggers the finalizeReq() call below.
        bool sendExceptionInd = false;

        try {
            /** EXPLANATION:
             * This reset() call is crucial for async bridging patterns
             * to work.
             * When the outermost thread's io_service is stop()ped (e.g.,
             * from JOLT sequence), it won't process any new work until
             * reset() is called, even if nested async operations try to
             * post work to it. This means async bridges invoked from
             * the outermost thread main sequence won't work until this
             * reset() call.
             */
            self.getIoService().reset();
            self.getIoService().run();
        }
        catch (const std::exception& e)
        {
            sendExceptionInd = true;
            std::cerr << self.name << ":" << __func__
                << ": Exception occurred: " << e.what() << "\n";
        }
        catch (...)
        {
            sendExceptionInd = true;
            std::cerr << self.name << ":" << __func__
                << ": Unknown exception occurred" << "\n";
        }

        if (sendExceptionInd)
        {
            // Report the failure, then keep looping (see comment above).
            mrntt::mrntt.finalizeReq(
                std::bind(
                    &mrntt::marionetteFinalizeReqCb,
                    std::placeholders::_1));
        }
    }

    std::cout << self.name << ":" << __func__
        << ": Exited event loop" << "\n";
}

/** Continuation object for one thread-lifetime request (jolt, start, exit,
 * pause or resume) against a target thread.
 *
 * Each *_posted member is the handler that the matching MindThread::*Req()
 * method posts to one of the target's io_service queues. Every handler logs
 * to std::cout and calls callOriginalCb(), which is not defined in this file
 * (presumably inherited from PostedAsynchronousContinuation and presumably
 * delivers the callback passed to the constructor -- confirm).
 *
 * The `context` parameter is unused by every handler; the *Req() methods bind
 * the owning shared_ptr to it when posting.
 */
class MindThread::ThreadLifetimeMgmtOp
: public PostedAsynchronousContinuation
{
public:
    /**
     * @param caller   Passed through to the base class constructor.
     * @param target   Thread the request operates on; stored in `target`.
     * @param callback Passed through to the base class constructor.
     */
    ThreadLifetimeMgmtOp(
        const std::shared_ptr &caller,
        const std::shared_ptr &target,
        threadLifetimeMgmtOpCbFn callback)
    : PostedAsynchronousContinuation( caller, callback),
    target(target)
    {}

public:
    // Thread whose io_service / pause_io_service the handlers manipulate.
    const std::shared_ptr target;

public:
    /** JOLT handler: stops target->io_service, then calls the callback. */
    void joltThreadReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        std::cout << __func__ << ": Thread '" << target->name << "': handling "
            "JOLT request." << "\n";

        target->io_service.stop();
        callOriginalCb();
    }

    /** startThread handler: currently only logs and calls the callback. */
    void startThreadReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        std::cout << __func__ << ": Thread '" << target->name << "': handling "
            "startThread." << "\n";

        // Execute private setup sequence here
        // This is where each thread would implement its specific initialization
        callOriginalCb();
    }

    /** exitThread handler posted to the main queue: calls target->cleanup(),
     * stops target->io_service, then calls the callback.
     */
    void exitThreadReq1_mainQueue_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        std::cout << __func__ << ": Thread '" << target->name << "': handling "
            "exitThread (main queue)." << "\n";

        target->cleanup();
        target->io_service.stop();
        callOriginalCb();
    }

    /** exitThread handler posted to the pause queue: calls target->cleanup(),
     * stops both target->pause_io_service and target->io_service, then calls
     * the callback.
     */
    void exitThreadReq1_pauseQueue_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        std::cout << __func__ << ": Thread '" << target->name << "': handling "
            "exitThread (pause queue)."<< "\n";

        target->cleanup();
        target->pause_io_service.stop();
        target->io_service.stop();
        callOriginalCb();
    }

    /** pauseThread handler: calls the callback first, then reset()s and
     * run()s target->pause_io_service from inside this handler.
     */
    void pauseThreadReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        std::cout << __func__ << ": Thread '" << target->name << "': handling "
            "pauseThread." << "\n";

        /* We have to invoke the callback here before moving on because
         * our next operation is going to block the thread, so it won't
         * have a chance to invoke the callback until it's unblocked.
         */
        callOriginalCb();
        target->pause_io_service.reset();
        target->pause_io_service.run();
    }

    /** resumeThread handler: stops target->pause_io_service, then calls the
     * callback.
     */
    void resumeThreadReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        std::cout << __func__ << ": Thread '" << target->name << "': handling "
            "resumeThread." << "\n";

        target->pause_io_service.stop();
        callOriginalCb();
    }
};

/** Clears keepLooping. MindThread::main() uses that flag as its loop
 * condition.
 */
void ComponentThread::cleanup(void)
{
    this->keepLooping = false;
}

/** Posts a JOLT request to this thread's io_service.
 *
 * NOTE(review): `target` is not checked for emptiness here, and the posted
 * handler dereferences it -- confirm getComponentThread(id) cannot return an
 * empty pointer.
 *
 * @param callback Handed to the ThreadLifetimeMgmtOp created for this request.
 * @throws std::runtime_error if this thread's id is ComponentThread::MRNTT.
 */
void MindThread::joltThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    /** EXPLANATION:
     * We can't use shared_from_this() here because JOLTing occurs prior to
     * TLS being set up.
     *
     * We also can't use getSelf() as yet for the same reason: getSelf()
     * requires TLS to be set up.
     *
     * To obtain a sh_ptr to the caller, we just supply the mrntt thread since
     * JOLT is always invoked by the mrntt thread. The JOLT sequence that the
     * CRT main() function invokes on the mrntt thread is special since it
     * supplies cmdline args and envp.
     *
     * To obtain a sh_ptr to the target thread, we explicitly look it up in the
     * Mind object's collection of component threads.
     */
    if (id == ComponentThread::MRNTT)
    {
        throw std::runtime_error(std::string(__func__)
            + ": invoked on mrntt thread");
    }

    std::shared_ptr mrntt = mrntt::thread;
    std::shared_ptr target = getParent().getComponentThread(id);
    auto request = std::make_shared(
        mrntt, target, callback);

    /* request.get() is the object the member function is invoked on; the
     * shared_ptr itself is bound as the handler's `context` argument, so the
     * bound handler carries its own reference to the request.
     */
    this->getIoService().post(
        std::bind(
            &ThreadLifetimeMgmtOp::joltThreadReq1_posted,
            request.get(), request));
}

// Thread management method implementations

/** Posts a startThread request to this thread's io_service.
 *
 * The caller is taken from getSelf(), so this throws std::runtime_error if
 * TLS has not been initialized on the calling thread.
 *
 * @param callback Handed to the ThreadLifetimeMgmtOp created for this request.
 */
void MindThread::startThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    std::shared_ptr caller = getSelf();
    auto request = std::make_shared(
        caller, shared_from_this(), callback);

    this->getIoService().post(
        std::bind(
            &ThreadLifetimeMgmtOp::startThreadReq1_posted,
            request.get(), request));
}

/** Posts an exitThread request to BOTH this thread's io_service and its
 * pause_io_service, using the same request object for both handlers.
 *
 * The caller is taken from getSelf(), so this throws std::runtime_error if
 * TLS has not been initialized on the calling thread.
 *
 * NOTE(review): both handlers call callOriginalCb() on the shared request;
 * presumably only one of the two queues is ever serviced for a given exit --
 * confirm the callback cannot be delivered twice.
 *
 * @param callback Handed to the ThreadLifetimeMgmtOp created for this request.
 */
void MindThread::exitThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    std::shared_ptr caller = getSelf();
    auto request = std::make_shared(
        caller, shared_from_this(), callback);

    this->getIoService().post(
        std::bind(
            &ThreadLifetimeMgmtOp::exitThreadReq1_mainQueue_posted,
            request.get(), request));

    pause_io_service.post(
        std::bind(
            &ThreadLifetimeMgmtOp::exitThreadReq1_pauseQueue_posted,
            request.get(), request));
}

/** Posts a pauseThread request to this thread's io_service.
 *
 * @param callback Handed to the ThreadLifetimeMgmtOp created for this request.
 * @throws std::runtime_error if this thread's id is ComponentThread::MRNTT,
 *         or (from getSelf()) if TLS is not initialized on the calling thread.
 */
void MindThread::pauseThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    if (id == ComponentThread::MRNTT)
    {
        throw std::runtime_error(std::string(__func__)
            + ": invoked on mrntt thread");
    }

    std::shared_ptr caller = getSelf();
    auto request = std::make_shared(
        caller, shared_from_this(), callback);

    this->getIoService().post(
        std::bind(
            &ThreadLifetimeMgmtOp::pauseThreadReq1_posted,
            request.get(), request));
}

/** Posts a resumeThread request to this thread's pause_io_service.
 *
 * @param callback Handed to the ThreadLifetimeMgmtOp created for this request.
 * @throws std::runtime_error if this thread's id is ComponentThread::MRNTT,
 *         or (from getSelf()) if TLS is not initialized on the calling thread.
 */
void MindThread::resumeThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    if (id == ComponentThread::MRNTT)
    {
        throw std::runtime_error(std::string(__func__)
            + ": invoked on mrntt thread");
    }

    // Post to the pause_io_service to unblock the paused thread
    std::shared_ptr caller = getSelf();
    auto request = std::make_shared(
        caller, shared_from_this(), callback);

    pause_io_service.post(
        std::bind(
            &ThreadLifetimeMgmtOp::resumeThreadReq1_posted,
            request.get(), request));
}

// CPU management method implementations

/** Returns the value of sysconf(_SC_NPROCESSORS_ONLN).
 *
 * Also compares that value with std::thread::hardware_concurrency() and
 * prints a warning to std::cerr when the two differ; the sysconf value is
 * returned either way.
 *
 * NOTE(review): hardware_concurrency() is allowed to return 0 when the value
 * is not computable, which would trigger the mismatch warning -- confirm that
 * is acceptable. sysconf() returns long; the result is stored in an int here.
 *
 * @return The sysconf CPU count (always > 0).
 * @throws std::runtime_error if sysconf reports a value <= 0.
 */
int ComponentThread::getAvailableCpuCount()
{
    int cpuCount = sysconf(_SC_NPROCESSORS_ONLN);
    if (cpuCount <= 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Failed to determine CPU count");
    }

    // Check if std::thread::hardware_concurrency() matches sysconf result
    unsigned int hwConcurrency = std::thread::hardware_concurrency();
    if (hwConcurrency != static_cast(cpuCount))
    {
        std::cerr << "Warning: CPU count mismatch - "
            "std::thread::hardware_concurrency() = " << hwConcurrency
            << ", sysconf(_SC_NPROCESSORS_ONLN) = " << cpuCount << "\n";
    }

    return cpuCount;
}

/** Restricts this object's `thread` to the single CPU `cpuId` via
 * pthread_setaffinity_np(), then records the id in pinnedCpuId.
 *
 * NOTE(review): only the lower bound of cpuId is validated here; there is no
 * check against CPU_SETSIZE or the available CPU count -- out-of-range ids
 * presumably surface as a pthread_setaffinity_np() failure; confirm.
 *
 * @param cpuId Zero-based CPU index to pin to.
 * @throws std::runtime_error if cpuId is negative, or if
 *         pthread_setaffinity_np() returns non-zero (the message includes
 *         std::strerror of that return value).
 */
void MindThread::pinToCpu(int cpuId)
{
    if (cpuId < 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Invalid CPU ID: " + std::to_string(cpuId));
    }

    // Build an affinity mask containing only cpuId.
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpuId, &cpuset);

    int result = pthread_setaffinity_np(
        thread.native_handle(), sizeof(cpu_set_t), &cpuset);
    if (result != 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Failed to pin thread to CPU " + std::to_string(cpuId)
            + ": " + std::strerror(result));
    }

    // Only recorded after the affinity call has succeeded.
    pinnedCpuId = cpuId;

    if (OptionParser::getOptions().verbose)
    {
        std::cout << name << ": Pinned to CPU " << cpuId << "\n";
    }
}

} // namespace smo