#ifndef COMPONENT_THREAD_H #define COMPONENT_THREAD_H #include #include #include #include #include #include #include #include #include #include namespace smo { class ComponentThread : public std::enable_shared_from_this { public: enum ThreadId { MRNTT = 0, DIRECTOR, SIMULATOR, SUBCONSCIOUS, BODY, WORLD, N_ITEMS }; ComponentThread(ThreadId id) : id(id), name(getThreadName(id)), work(io_service), pause_work(pause_io_service), pinnedCpuId(-1), thread( ((id == MRNTT) ? marionetteMain : main), std::ref(*this)) {} void cleanup(void); boost::asio::io_service& getIoService(void) { return io_service; } void initializeTls(void); static const std::shared_ptr getSelf(void); static std::shared_ptr getComponentThread( ThreadId id = N_ITEMS) { if (id < 0 || id > N_ITEMS) { throw std::runtime_error(std::string(__func__) + ": Invalid thread ID"); } return componentThreads[id]; } // Overload: search by name static std::shared_ptr getComponentThread( const std::string& name) { for (auto& thread : componentThreads) { if (thread->name == name) { return thread; } } throw std::runtime_error(std::string(__func__) + ": Thread name not found in componentThreads map"); } static boost::asio::io_service& getEventLoop( ThreadId id = MRNTT) { return getComponentThread(id)->getIoService(); } typedef void (mainFn)(ComponentThread &self); static mainFn main, marionetteMain; // Thread management methods void startThreadReq(std::function callback = nullptr); void exitThreadReq(std::function callback = nullptr); void pauseThreadReq(std::function callback = nullptr); void resumeThreadReq(std::function callback = nullptr); void joltThreadReq(std::function callback = nullptr); // Convenience wrappers static void startAllMindThreadsReq(std::function callback = nullptr); static void pauseAllMindThreadsReq(std::function callback = nullptr); static void resumeAllMindThreadsReq(std::function callback = nullptr); static void exitAllMindThreadsReq(std::function callback = nullptr); static void joltAllMindThreadsReq(std::function callback = nullptr); // CPU management methods static int getAvailableCpuCount(); void pinToCpu(int cpuId); static void distributeAndPinThreadsAcrossCpus(); enum class ThreadOp { START, PAUSE, RESUME, EXIT, JOLT, N_ITEMS }; static void execOpOnAllMindThreadsReq( ThreadOp op, std::function callback = nullptr); // Intentionally doesn't take a callback. void exceptionInd(ComponentThread& thread); public: ThreadId id; std::string name; boost::asio::io_service io_service; boost::asio::io_service::work work; boost::asio::io_service pause_io_service; boost::asio::io_service::work pause_work; std::atomic keepLooping; int pinnedCpuId; /* Always ensure that this is last so that the thread is spawned after * everything else is constructed. */ std::thread thread; static std::array, ComponentThread::N_ITEMS> componentThreads; static const std::string threadNames[ComponentThread::N_ITEMS]; static const std::string getThreadName(ThreadId id) { if (id < 0 || id >= ComponentThread::N_ITEMS) { throw std::runtime_error(std::string(__func__) + ": Invalid thread ID"); } return threadNames[id]; } }; namespace mrntt { extern std::shared_ptr mrntt; } namespace director { extern std::shared_ptr director; } namespace simulator { extern std::shared_ptr canvas; } namespace subconscious { extern std::shared_ptr subconscious; } namespace body { extern std::shared_ptr body; } namespace world { extern std::shared_ptr world; } } // namespace smo #endif // COMPONENT_THREAD_H