diff --git a/main.cpp b/main.cpp index f8f404f..c88c8b8 100644 --- a/main.cpp +++ b/main.cpp @@ -2,6 +2,7 @@ #include #include + int main(int argc, char *argv[], char *envp[]) { /* We don't do anything inside of main() diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index 7009d1b..7f79552 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -11,74 +11,15 @@ namespace smo { thread_local std::shared_ptr thisComponentThread; -const std::string ComponentThread::threadNames[N_ITEMS] = -{ - "mrntt", - "director", - "simulator", - "subconscious", - "body", - "world" -}; - namespace mrntt { -std::shared_ptr mrntt = - std::make_shared(ComponentThread::MRNTT); -} -namespace director { -/* The director is the seat of volition in Salmanoff. It receives sensor -* events from the body and world, and uses them to direct its implexors -* to implex new menties. It then loads the menties into canvas for simulation -* and correlation with intrins, in order to form new attrimotions and -* menties. -*/ -std::shared_ptr director = - std::make_shared(ComponentThread::DIRECTOR); -} -namespace simulator { -/* The canvas is the simulation engine in Salmanoff. It receives menties and - * simulates them in accordance with the instructions from director. It then - * re-renders them into perception for director to get feedback. - */ -std::shared_ptr canvas = - std::make_shared(ComponentThread::SIMULATOR); -} -namespace subconscious { -/* The subconscious is the seat of memory in Salmanoff. It receives menties - * from director and stores them in memory for later recall. - */ -std::shared_ptr subconscious = - std::make_shared(ComponentThread::SUBCONSCIOUS); -} -namespace body { -/* The body is a thread that polls, processes, and sends interoceptive sensor - * events to director. It enables these events to occur asynchronously, - * indepdendent any actions that the other threads are taking. - */ -std::shared_ptr body = - std::make_shared(ComponentThread::BODY); -} -namespace world { -/* The world performs the same functions as the body, but for extrospective - * sensor events. - */ -std::shared_ptr world = - std::make_shared(ComponentThread::WORLD); +extern std::shared_ptr mrntt; } -// Initialize static state -std::atomic ComponentThread::threadsHaveBeenJolted{false}; - -std::array, ComponentThread::N_ITEMS> - ComponentThread::componentThreads = +// Implementation of static method +std::shared_ptr ComponentThread::getMrntt() { - mrntt::mrntt, - director::director, - simulator::canvas, - subconscious::subconscious, - body::body, - world::world -}; + return mrntt::mrntt; +} void ComponentThread::initializeTls(void) { @@ -239,125 +180,6 @@ void ComponentThread::joltThreadReq(std::function callback) }); } -struct AllMindThreadsOpReqContext { - AllMindThreadsOpReqContext() : nThreadsProcessed(0) {} - - int nThreadsProcessed; -}; - -static const std::string getOpName(ComponentThread::ThreadOp op) -{ - if (op < (ComponentThread::ThreadOp)0 - || op > ComponentThread::ThreadOp::JOLT) - { - throw std::runtime_error(std::string(__func__) - + ": Invalid operation"); - } - - switch (op) - { - case ComponentThread::ThreadOp::START: return "starting"; - case ComponentThread::ThreadOp::PAUSE: return "pausing"; - case ComponentThread::ThreadOp::RESUME: return "resuming"; - case ComponentThread::ThreadOp::EXIT: return "exiting"; - case ComponentThread::ThreadOp::JOLT: return "jolting"; - default: return "unknown"; - } -} - -void ComponentThread::execOpOnAllMindThreadsReq( - ThreadOp op, std::function callback - ) -{ - std::shared_ptr self = getSelf(); - // Check that we're being called from the marionette thread - if (self->id != MRNTT) - { - throw std::runtime_error(std::string(__func__) - + ": invoked on non-mrntt thread " + self->name); - } - - std::cout << "Mrntt: " << getOpName(op) << " all mind threads." << "\n"; - - auto context = std::make_shared(); - const int N_THREADS_EXCEPT_MRNTT = ComponentThread::N_ITEMS - 1; - - for (auto &currThread : ComponentThread::componentThreads) - { - if (currThread->id == ComponentThread::MRNTT) - { continue; } - - auto threadCallback = [context, callback, N_THREADS_EXCEPT_MRNTT, op]() - { - ++context->nThreadsProcessed; - if (context->nThreadsProcessed < N_THREADS_EXCEPT_MRNTT) - { return; } - - if (op == ThreadOp::EXIT) - { - // Special cleanup for exit operations - for (auto &currThreadJ : ComponentThread::componentThreads) - { - if (currThreadJ->id == ComponentThread::MRNTT) - { continue; } - - currThreadJ->thread.join(); - } - } - - std::cout << "Mrntt: all mind threads done " << getOpName(op) << "." - << "\n"; - - if (callback) { callback(); } - }; - - switch (op) { - case ThreadOp::START: - currThread->startThreadReq(threadCallback); - break; - case ThreadOp::PAUSE: - currThread->pauseThreadReq(threadCallback); - break; - case ThreadOp::RESUME: - currThread->resumeThreadReq(threadCallback); - break; - case ThreadOp::EXIT: - currThread->exitThreadReq(threadCallback); - break; - case ThreadOp::JOLT: - currThread->joltThreadReq(threadCallback); - break; - default: - throw std::runtime_error("Invalid operation"); - } - } -} - -void ComponentThread::startAllMindThreadsReq(std::function callback) -{ - execOpOnAllMindThreadsReq(ThreadOp::START, callback); -} - -void ComponentThread::pauseAllMindThreadsReq(std::function callback) -{ - execOpOnAllMindThreadsReq(ThreadOp::PAUSE, callback); -} - -void ComponentThread::resumeAllMindThreadsReq(std::function callback) -{ - execOpOnAllMindThreadsReq(ThreadOp::RESUME, callback); -} - -void ComponentThread::exitAllMindThreadsReq(std::function callback) -{ - execOpOnAllMindThreadsReq(ThreadOp::EXIT, callback); -} - -void ComponentThread::joltAllMindThreadsReq(std::function callback) -{ - execOpOnAllMindThreadsReq(ThreadOp::JOLT, callback); -} - /* This shouldn't take a callback because the caller shouldn't expect to * Mrntt to send a reply signal to it. Sending this Indication means that * Mrntt will send the calling thread an exitThreadReq. When the caller @@ -382,8 +204,18 @@ void ComponentThread::exceptionInd(ComponentThread& thread) std::cerr << "Mrntt: Exception occurred: in thread " << thread.name << ". Killing Salmanoff." << "\n"; - // Delegate to common shutdown request - mind.finalizeReq(smo::mrntt::exitMarionetteLoop); + /** EXPLANATION: + * An exception has occurred in one of a mind's threads. We need to + * shut down all of that particular mind's threads. + */ + thread.parent.finalizeReq([]() { + /** FIXME: + * When we eventually support multiple minds, we should remove this + * since it causes marionette to exit, even if there are other minds + * that are still running. + */ + smo::mrntt::exitMarionetteLoop(); + }); }); } @@ -397,13 +229,23 @@ void ComponentThread::userShutdownInd() // Post the user shutdown to the mrntt thread. this->getIoService().post( - []() + [this]() { std::cerr << "Mrntt: User requested shutdown (SIGINT)." << " Killing Salmanoff." << "\n"; - // Delegate to common shutdown request - mind.finalizeReq(smo::mrntt::exitMarionetteLoop); + /** EXPLANATION: + * A user has requested a shutdown. We need to shut down all of the + * threads in all running Minds. + */ + parent.finalizeReq([]() { + /** FIXME: + * When we eventually support multiple minds, we should remove this + * since it causes marionette to exit, even if there are other minds + * that are still running. + */ + smo::mrntt::exitMarionetteLoop(); + }); }); } @@ -455,24 +297,4 @@ void ComponentThread::pinToCpu(int cpuId) std::cout << name << ": Pinned to CPU " << cpuId << "\n"; } -void ComponentThread::distributeAndPinThreadsAcrossCpus() -{ - int cpuCount = getAvailableCpuCount(); - std::cout << "Available CPUs: " << cpuCount << "\n"; - - // Skip the marionette thread (MRNTT) as it's the control thread - int threadIndex = 0; - for (auto& thread : componentThreads) - { - if (thread->id == MRNTT) { continue; } - - int targetCpu = threadIndex % cpuCount; - thread->pinToCpu(targetCpu); - ++threadIndex; - } - - std::cout << "Distributed " << (threadIndex) << " threads across " - << cpuCount << " CPUs\n"; -} - } // namespace smo diff --git a/smocore/include/componentThread.h b/smocore/include/componentThread.h index d46f6ca..ff58906 100644 --- a/smocore/include/componentThread.h +++ b/smocore/include/componentThread.h @@ -11,9 +11,12 @@ #include #include #include +#include namespace smo { +class Mind; // Forward declaration + class ComponentThread : public std::enable_shared_from_this { @@ -29,8 +32,8 @@ public: N_ITEMS }; - ComponentThread(ThreadId id) - : id(id), name(getThreadName(id)), + ComponentThread(ThreadId _id, Mind& parent) + : id(_id), name(getThreadName(_id)), parent(parent), work(io_service), pause_work(pause_io_service), pinnedCpuId(-1), thread( @@ -45,34 +48,8 @@ public: void initializeTls(void); static const std::shared_ptr getSelf(void); - static std::shared_ptr getComponentThread( - ThreadId id = N_ITEMS) - { - if (id < 0 || id > N_ITEMS) - { - throw std::runtime_error(std::string(__func__) - + ": Invalid thread ID"); - } - return componentThreads[id]; - } - - // Overload: search by name - static std::shared_ptr getComponentThread( - const std::string& name) - { - for (auto& thread : componentThreads) { - if (thread->name == name) { return thread; } - } - throw std::runtime_error(std::string(__func__) - + ": Thread name not found in componentThreads map"); - } - - static boost::asio::io_service& getEventLoop( - ThreadId id = MRNTT) - { - return getComponentThread(id)->getIoService(); - } - + static std::shared_ptr getMrntt(); + Mind& getParent() const { return parent; } typedef void (mainFn)(ComponentThread &self); static mainFn main, marionetteMain; @@ -91,17 +68,9 @@ public: */ void joltThreadReq(std::function callback = nullptr); - // Convenience wrappers - static void startAllMindThreadsReq(std::function callback = nullptr); - static void pauseAllMindThreadsReq(std::function callback = nullptr); - static void resumeAllMindThreadsReq(std::function callback = nullptr); - static void exitAllMindThreadsReq(std::function callback = nullptr); - static void joltAllMindThreadsReq(std::function callback = nullptr); - // CPU management methods static int getAvailableCpuCount(); void pinToCpu(int cpuId); - static void distributeAndPinThreadsAcrossCpus(); enum class ThreadOp { @@ -112,8 +81,6 @@ public: JOLT, N_ITEMS }; - static void execOpOnAllMindThreadsReq( - ThreadOp op, std::function callback = nullptr); // Intentionally doesn't take a callback. void exceptionInd(ComponentThread& thread); @@ -123,41 +90,20 @@ public: public: ThreadId id; std::string name; + Mind &parent; boost::asio::io_service io_service; boost::asio::io_service::work work; boost::asio::io_service pause_io_service; boost::asio::io_service::work pause_work; std::atomic keepLooping; int pinnedCpuId; - /** - * Indicates whether all mind threads have been JOLTed at least once. - * - * JOLTing serves two critical purposes: - * - * 1. **Global Constructor Sequencing**: Since pthreads begin executing while - * global constructors are still being executed, globally defined pthreads - * cannot depend on global objects having been constructed. JOLTing is done - * by the CRT's main thread within main(), which provides a sequencing - * guarantee that global constructors have been called. - * - * 2. **shared_from_this Safety**: shared_from_this() requires a prior - * shared_ptr handle to be established. The global list of - * shared_ptr guarantees that at least one shared_ptr to - * each ComponentThread has been initialized before JOLTing occurs. - * - * This atomic flag ensures that JOLTing happens exactly once and provides - * a synchronization point for the entire system initialization. - */ - static std::atomic threadsHaveBeenJolted; + /* Always ensure that this is last so that the thread is spawned after * everything else is constructed. */ std::thread thread; - static std::array, ComponentThread::N_ITEMS> - componentThreads; - static const std::string threadNames[ComponentThread::N_ITEMS]; static const std::string getThreadName(ThreadId id) { if (id < 0 || id >= ComponentThread::N_ITEMS) @@ -165,6 +111,17 @@ public: throw std::runtime_error(std::string(__func__) + ": Invalid thread ID"); } + + // Use function-local static to ensure proper initialization order + static const std::string threadNames[N_ITEMS] = { + "mrntt", + "director", + "simulator", + "subconscious", + "body", + "world" + }; + return threadNames[id]; } }; @@ -172,21 +129,6 @@ public: namespace mrntt { extern std::shared_ptr mrntt; } -namespace director { -extern std::shared_ptr director; -} -namespace simulator { -extern std::shared_ptr canvas; -} -namespace subconscious { -extern std::shared_ptr subconscious; -} -namespace body { -extern std::shared_ptr body; -} -namespace world { -extern std::shared_ptr world; -} } // namespace smo diff --git a/smocore/include/mind.h b/smocore/include/mind.h index 2ddc429..dab5f85 100644 --- a/smocore/include/mind.h +++ b/smocore/include/mind.h @@ -4,21 +4,44 @@ #include #include #include +#include +#include +#include #include #include +#include namespace smo { -class Mind +class Mind : public std::enable_shared_from_this { public: - Mind(void) {} + Mind(void); + ~Mind(void) = default; void initialize(void); void execute(void); void finalizeReq(std::function callback); + // ComponentThread access methods + std::shared_ptr getComponentThread( + ComponentThread::ThreadId id) const; + std::shared_ptr getComponentThread( + const std::string& name) const; + // Get all this Mind's component threads. + std::vector> getMindThreads() const; + + // Thread management methods (moved from ComponentThread) + void startAllMindThreadsReq(std::function callback = nullptr); + void pauseAllMindThreadsReq(std::function callback = nullptr); + void resumeAllMindThreadsReq(std::function callback = nullptr); + void exitAllMindThreadsReq(std::function callback = nullptr); + void joltAllMindThreadsReq(std::function callback = nullptr); + + // CPU distribution method + void distributeAndPinThreadsAcrossCpus(); + public: std::thread directorThread; std::thread simulatorThread; @@ -26,9 +49,34 @@ public: director::Director director; simulator::Simulator canvas; + +private: + /** + * Indicates whether all mind threads have been JOLTed at least once. + * + * JOLTing serves two critical purposes: + * + * 1. **Global Constructor Sequencing**: Since pthreads begin executing while + * global constructors are still being executed, globally defined pthreads + * cannot depend on global objects having been constructed. JOLTing is done + * by the CRT's main thread within main(), which provides a sequencing + * guarantee that global constructors have been called. + * + * 2. **shared_from_this Safety**: shared_from_this() requires a prior + * shared_ptr handle to be established. The global list of + * shared_ptr guarantees that at least one shared_ptr to + * each ComponentThread has been initialized before JOLTing occurs. + * + * This flag ensures that JOLTing happens exactly once and provides + * a synchronization point for the entire system initialization. + */ + bool threadsHaveBeenJolted = false; + // Collection of ComponentThread instances (excluding marionette) + std::vector> componentThreads; }; -extern Mind mind; +// Global Mind instance will be defined in marionette.cpp +extern std::shared_ptr globalMind; } // namespace smo diff --git a/smocore/marionette/marionette.cpp b/smocore/marionette/marionette.cpp index 699988b..2ff50ca 100644 --- a/smocore/marionette/marionette.cpp +++ b/smocore/marionette/marionette.cpp @@ -3,15 +3,22 @@ #include #include #include +#include +#include #include #include #include #include -#include -#include namespace smo { +// Global Mind instance +std::shared_ptr globalMind = std::make_shared(); + +// Global marionette thread instance +std::shared_ptr mrntt::mrntt = + std::make_shared(ComponentThread::MRNTT, *globalMind); + CrtCommandLineArgs crtCommandLineArgs(0, nullptr, nullptr); void CrtCommandLineArgs::set(int argc, char *argv[], char *envp[]) @@ -71,7 +78,8 @@ void ComponentThread::marionetteMain(ComponentThread& self) initializeSalmanoff(); self.getIoService().post([]() { - mind.initialize(); + // Initialize the global Mind object + globalMind->initialize(); }); std::cout << __func__ << ": Entering event loop" << "\n"; @@ -143,7 +151,7 @@ void ComponentThread::marionetteMain(ComponentThread& self) if (callFinalizeReq) { - mind.finalizeReq([]{ + globalMind->finalizeReq([]{ mrntt::mrntt->getIoService().stop(); }); self.getIoService().reset(); diff --git a/smocore/mind.cpp b/smocore/mind.cpp index 7361127..d14a4e4 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -4,14 +4,23 @@ namespace smo { -Mind mind; +Mind::Mind(void) + : componentThreads{ + std::make_shared(ComponentThread::DIRECTOR, *this), + std::make_shared(ComponentThread::SIMULATOR, *this), + std::make_shared(ComponentThread::SUBCONSCIOUS, *this), + std::make_shared(ComponentThread::BODY, *this), + std::make_shared(ComponentThread::WORLD, *this) + } +{ +} void Mind::initialize() { /* Distribute threads across available CPUs */ try { - ComponentThread::distributeAndPinThreadsAcrossCpus(); + distributeAndPinThreadsAcrossCpus(); } catch (const std::exception& e) { @@ -21,12 +30,12 @@ void Mind::initialize() } /* Jolt the threads, then start them */ - ComponentThread::joltAllMindThreadsReq( - []() + joltAllMindThreadsReq( + [this]() { - ComponentThread::threadsHaveBeenJolted.store(true); std::cout << "Mrntt: All mind threads JOLTed." << "\n"; - ComponentThread::startAllMindThreadsReq( + // Start all threads after JOLTing + startAllMindThreadsReq( []() { std::cout << "Mrntt: All mind threads started." << "\n"; @@ -42,13 +51,12 @@ void Mind::finalizeReq(std::function callback) * otherwise they'll just enter their main loops and wait for control * messages from mrntt after processing the exit request. */ - if (!ComponentThread::threadsHaveBeenJolted.load()) + if (!threadsHaveBeenJolted) { - ComponentThread::joltAllMindThreadsReq( - [callback]() + joltAllMindThreadsReq( + [this, callback]() { - ComponentThread::threadsHaveBeenJolted.store(true); - ComponentThread::exitAllMindThreadsReq( + exitAllMindThreadsReq( [callback]() { std::cout << "Mrntt: All mind threads exited." << "\n"; @@ -60,7 +68,7 @@ void Mind::finalizeReq(std::function callback) } else { - ComponentThread::exitAllMindThreadsReq( + exitAllMindThreadsReq( [callback]() { std::cout << "Mrntt: All mind threads exited." << "\n"; @@ -70,4 +78,162 @@ void Mind::finalizeReq(std::function callback) } } +void Mind::joltAllMindThreadsReq(std::function callback) +{ + // Create a counter to track when all threads have been jolted + auto counter = std::make_shared>(componentThreads.size()); + + for (auto& thread : componentThreads) + { + thread->joltThreadReq([counter, callback, this]() { + if (--(*counter) == 0) + { + // Set the flag only after all threads have ACKed their JOLT + threadsHaveBeenJolted = true; + if (callback) { callback(); } + } + }); + } + + // If no threads, set flag and call callback immediately + if (componentThreads.empty()) + { + threadsHaveBeenJolted = true; + if (callback) { callback(); } + } +} + +std::shared_ptr +Mind::getComponentThread(ComponentThread::ThreadId id) const +{ + // Access the global marionette thread using ComponentThread::getMrntt() + if (id == ComponentThread::MRNTT) { return ComponentThread::getMrntt(); } + + // Search through the vector for the thread with matching id + for (auto& thread : componentThreads) { + if (thread->id == id) { return thread; } + } + + // Throw exception if no thread found + throw std::runtime_error(std::string(__func__) + + ": No ComponentThread found with ID " + + std::to_string(static_cast(id))); +} + +std::shared_ptr +Mind::getComponentThread(const std::string& name) const +{ + if (name == "mrntt") { return ComponentThread::getMrntt(); } + + for (auto& thread : componentThreads) { + if (thread->name == name) { return thread; } + } + + // Throw exception if no thread found + throw std::runtime_error(std::string(__func__) + + ": No ComponentThread found with name '" + name + "'"); +} + +std::vector> +Mind::getMindThreads() const +{ + return componentThreads; +} + +void Mind::distributeAndPinThreadsAcrossCpus() +{ + int cpuCount = ComponentThread::getAvailableCpuCount(); + std::cout << "Available CPUs: " << cpuCount << "\n"; + + // Distribute and pin threads across CPUs + int threadIndex = 0; + for (auto& thread : componentThreads) + { + int targetCpu = threadIndex % cpuCount; + thread->pinToCpu(targetCpu); + ++threadIndex; + } + + std::cout << "Distributed " << threadIndex << " threads across " + << cpuCount << " CPUs\n"; +} + +// Thread management methods (moved from ComponentThread) +void Mind::startAllMindThreadsReq(std::function callback) +{ + // Create a counter to track when all threads have started + auto counter = std::make_shared>(componentThreads.size()); + + for (auto& thread : componentThreads) + { + thread->startThreadReq([counter, callback]() { + if (--(*counter) == 0 && callback) { callback(); } + }); + } + + // If no threads, call callback immediately + if (componentThreads.empty() && callback) { callback(); } +} + +void Mind::pauseAllMindThreadsReq(std::function callback) +{ + // Create a counter to track when all threads have paused + auto counter = std::make_shared>(componentThreads.size()); + + for (auto& thread : componentThreads) + { + thread->pauseThreadReq([counter, callback]() { + if (--(*counter) == 0 && callback) { callback(); } + }); + } + + // If no threads, call callback immediately + if (componentThreads.empty() && callback) { + callback(); + } +} + +void Mind::resumeAllMindThreadsReq(std::function callback) +{ + // Create a counter to track when all threads have resumed + auto counter = std::make_shared>(componentThreads.size()); + + for (auto& thread : componentThreads) + { + thread->resumeThreadReq([counter, callback]() { + if (--(*counter) == 0 && callback) { callback(); } + }); + } + + // If no threads, call callback immediately + if (componentThreads.empty() && callback) { + callback(); + } +} + +void Mind::exitAllMindThreadsReq(std::function callback) +{ + // Create a counter to track when all threads have exited + auto counter = std::make_shared>(componentThreads.size()); + + for (auto& thread : componentThreads) + { + thread->exitThreadReq([counter, callback, this]() { + if (--(*counter) == 0) + { + // All threads have exited their loops, now join them + for (auto& t : componentThreads) { + t->thread.join(); + } + if (callback) { callback(); } + } + }); + } + + // If no threads, call callback immediately + if (componentThreads.empty() && callback) { + callback(); + } +} + } // namespace smo