From f862db922ecf36498d5a5379ffcc5ed047dff31d Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Sat, 27 Dec 2025 14:01:15 -0400 Subject: [PATCH] spinscale: Move thread init/jolt/exit logic into PuppetApplication --- libspinscale/CMakeLists.txt | 1 + .../include/spinscale/puppetApplication.h | 65 +++++ libspinscale/src/puppetApplication.cpp | 204 +++++++++++++++ smocore/include/mind.h | 44 +--- smocore/mind.cpp | 245 +++--------------- 5 files changed, 308 insertions(+), 251 deletions(-) create mode 100644 libspinscale/include/spinscale/puppetApplication.h create mode 100644 libspinscale/src/puppetApplication.cpp diff --git a/libspinscale/CMakeLists.txt b/libspinscale/CMakeLists.txt index a72c60c..df4bb75 100644 --- a/libspinscale/CMakeLists.txt +++ b/libspinscale/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(spinscale SHARED src/lockerAndInvokerBase.cpp src/componentThread.cpp src/component.cpp + src/puppetApplication.cpp ) # Conditionally add qutexAcquisitionHistoryTracker.cpp only when debug locks diff --git a/libspinscale/include/spinscale/puppetApplication.h b/libspinscale/include/spinscale/puppetApplication.h new file mode 100644 index 0000000..c13152e --- /dev/null +++ b/libspinscale/include/spinscale/puppetApplication.h @@ -0,0 +1,65 @@ +#ifndef PUPPET_APPLICATION_H +#define PUPPET_APPLICATION_H + +#include +#include +#include +#include +#include +#include + +namespace smo { + +class PuppetApplication +: public std::enable_shared_from_this +{ +public: + PuppetApplication( + const std::vector> &threads); + ~PuppetApplication() = default; + + // Thread management methods + typedef std::function puppetThreadLifetimeMgmtOpCbFn; + void joltAllPuppetThreadsReq( + Callback callback); + void startAllPuppetThreadsReq( + Callback callback); + void pauseAllPuppetThreadsReq( + Callback callback); + void resumeAllPuppetThreadsReq( + Callback callback); + void exitAllPuppetThreadsReq( + Callback callback); + +protected: + // Collection of PuppetThread instances + std::vector> componentThreads; + + /** + * Indicates whether all puppet threads have been JOLTed at least once. + * + * JOLTing serves two critical purposes: + * + * 1. **Global Constructor Sequencing**: Since pthreads begin executing while + * global constructors are still being executed, globally defined pthreads + * cannot depend on global objects having been constructed. JOLTing is done + * by the CRT's main thread within main(), which provides a sequencing + * guarantee that global constructors have been called. + * + * 2. **shared_from_this Safety**: shared_from_this() requires a prior + * shared_ptr handle to be established. The global list of + * shared_ptr guarantees that at least one shared_ptr to + * each ComponentThread has been initialized before JOLTing occurs. + * + * This flag ensures that JOLTing happens exactly once and provides + * a synchronization point for the entire system initialization. + */ + bool threadsHaveBeenJolted = false; + +private: + class PuppetThreadLifetimeMgmtOp; +}; + +} // namespace smo + +#endif // PUPPET_APPLICATION_H diff --git a/libspinscale/src/puppetApplication.cpp b/libspinscale/src/puppetApplication.cpp new file mode 100644 index 0000000..c5e7cad --- /dev/null +++ b/libspinscale/src/puppetApplication.cpp @@ -0,0 +1,204 @@ +#include +#include +#include +#include +#include +#include + +namespace smo { + +PuppetApplication::PuppetApplication( + const std::vector> &threads) +: componentThreads(threads) +{ +} + +class PuppetApplication::PuppetThreadLifetimeMgmtOp +: public NonPostedAsynchronousContinuation +{ +public: + PuppetThreadLifetimeMgmtOp( + PuppetApplication &parent, unsigned int nThreads, + Callback callback) + : NonPostedAsynchronousContinuation(callback), + loop(nThreads), + parent(parent) + {} + +public: + AsynchronousLoop loop; + PuppetApplication &parent; + +public: + void joltAllPuppetThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + parent.threadsHaveBeenJolted = true; + callOriginalCb(); + } + + void executeGenericOpOnAllPuppetThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + callOriginalCb(); + } + + void exitAllPuppetThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + for (auto& thread : parent.componentThreads) { + thread->thread.join(); + } + + callOriginalCb(); + } +}; + +void PuppetApplication::joltAllPuppetThreadsReq( + Callback callback + ) +{ + if (threadsHaveBeenJolted) + { + std::cout << "Mrntt: All puppet threads already JOLTed. " + << "Skipping JOLT request." << "\n"; + callback.callbackFn(); + return; + } + + // If no threads, set flag and call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + threadsHaveBeenJolted = true; + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have been jolted + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->joltThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::joltAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::startAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have started + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->startThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::pauseAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have paused + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->pauseThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::resumeAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have resumed + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->resumeThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::exitAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have exited + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->exitThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +} // namespace smo diff --git a/smocore/include/mind.h b/smocore/include/mind.h index 69de0e3..2838c66 100644 --- a/smocore/include/mind.h +++ b/smocore/include/mind.h @@ -2,14 +2,13 @@ #define _MIND_H #include -#include #include #include -#include #include #include #include +#include #include #include #include @@ -18,7 +17,8 @@ namespace smo { -class Mind : public std::enable_shared_from_this +class Mind +: public PuppetApplication { public: Mind(void); @@ -35,24 +35,9 @@ public: // Get all this Mind's component threads. std::vector> getMindThreads() const; - // Thread management methods (moved from ComponentThread) - typedef std::function mindThreadLifetimeMgmtOpCbFn; - void joltAllMindThreadsReq(Callback callback); - void startAllMindThreadsReq( - Callback callback); - void pauseAllMindThreadsReq( - Callback callback); - void resumeAllMindThreadsReq( - Callback callback); - void exitAllMindThreadsReq(Callback callback); - // CPU distribution method void distributeAndPinThreadsAcrossCpus(); -private: - // Collection of ComponentThread instances (excluding marionette) - std::vector> componentThreads; - public: director::Director director; simulator::Simulator canvas; @@ -62,31 +47,10 @@ public: private: friend class body::Body; - /** - * Indicates whether all mind threads have been JOLTed at least once. - * - * JOLTing serves two critical purposes: - * - * 1. **Global Constructor Sequencing**: Since pthreads begin executing while - * global constructors are still being executed, globally defined pthreads - * cannot depend on global objects having been constructed. JOLTing is done - * by the CRT's main thread within main(), which provides a sequencing - * guarantee that global constructors have been called. - * - * 2. **shared_from_this Safety**: shared_from_this() requires a prior - * shared_ptr handle to be established. The global list of - * shared_ptr guarantees that at least one shared_ptr to - * each ComponentThread has been initialized before JOLTing occurs. - * - * This flag ensures that JOLTing happens exactly once and provides - * a synchronization point for the entire system initialization. - */ - bool threadsHaveBeenJolted = false, - bodyComponentInitialized = false; + bool bodyComponentInitialized = false; private: class MindLifetimeMgmtOp; - class MindThreadLifetimeMgmtOp; }; namespace mind { diff --git a/smocore/mind.cpp b/smocore/mind.cpp index 1fba835..700e2a6 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -14,24 +14,25 @@ namespace smo { Mind::Mind(void) - : componentThreads{ - std::make_shared(SmoThreadId::DIRECTOR, *this), - std::make_shared(SmoThreadId::SIMULATOR, *this), - std::make_shared(SmoThreadId::SUBCONSCIOUS, *this), - std::make_shared(SmoThreadId::BODY, *this) + : PuppetApplication( + std::vector>{ + std::make_shared(SmoThreadId::DIRECTOR, *this), + std::make_shared(SmoThreadId::SIMULATOR, *this), + std::make_shared(SmoThreadId::SUBCONSCIOUS, *this), + std::make_shared(SmoThreadId::BODY, *this) #ifndef CONFIG_WORLD_USE_BODY_THREAD - , std::make_shared(SmoThreadId::WORLD, *this) + , std::make_shared(SmoThreadId::WORLD, *this) #endif - }, - director(*this, componentThreads[0]), - canvas(*this, componentThreads[1]), - subconscious(*this, componentThreads[2]), - body(*this, componentThreads[3]), + }), + director(*this, std::static_pointer_cast(componentThreads[0])), + canvas(*this, std::static_pointer_cast(componentThreads[1])), + subconscious(*this, std::static_pointer_cast(componentThreads[2])), + body(*this, std::static_pointer_cast(componentThreads[3])), world(*this, #ifndef CONFIG_WORLD_USE_BODY_THREAD - componentThreads[4] + std::static_pointer_cast(componentThreads[4]) #else - componentThreads[3] + std::static_pointer_cast(componentThreads[3]) #endif ) { @@ -49,8 +50,11 @@ Mind::getComponentThread(ThreadId id) const } // Search through the vector for the thread with matching id - for (auto& thread : componentThreads) { - if (thread->id == id) { return thread; } + for (auto& thread : componentThreads) + { + if (thread->id == id) { + return std::static_pointer_cast(thread); + } } // Throw exception if no thread found @@ -70,8 +74,11 @@ Mind::getComponentThread(const std::string& name) const "getComponentThread"); } - for (auto& thread : componentThreads) { - if (thread->name == name) { return thread; } + for (auto& thread : componentThreads) + { + if (thread->name == name) { + return std::static_pointer_cast(thread); + } } // Throw exception if no thread found @@ -82,7 +89,12 @@ Mind::getComponentThread(const std::string& name) const std::vector> Mind::getMindThreads() const { - return componentThreads; + std::vector> mindThreads; + mindThreads.reserve(componentThreads.size()); + for (auto& thread : componentThreads) { + mindThreads.push_back(std::static_pointer_cast(thread)); + } + return mindThreads; } class Mind::MindLifetimeMgmtOp @@ -106,7 +118,7 @@ public: ) { /* Jolt the threads, then start them */ - parent.joltAllMindThreadsReq( + parent.joltAllPuppetThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::initializeReq2, context.get(), context)}); @@ -118,7 +130,7 @@ public: { std::cout << "Mrntt: All mind threads JOLTed." << "\n"; - parent.startAllMindThreadsReq( + parent.startAllPuppetThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::initializeReq3, context.get(), context)}); @@ -170,7 +182,7 @@ public: * otherwise they'll just enter their main loops and wait for control * messages from mrntt after processing the exit request. */ - parent.joltAllMindThreadsReq( + parent.joltAllPuppetThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::finalizeReq3, context.get(), context)}); @@ -182,7 +194,7 @@ public: { std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n"; - parent.exitAllMindThreadsReq( + parent.exitAllPuppetThreadsReq( {context, std::bind( &MindLifetimeMgmtOp::finalizeReq4, context.get(), context)}); @@ -254,193 +266,4 @@ void Mind::distributeAndPinThreadsAcrossCpus() << "across " << cpuCount << " CPUs\n"; } -class Mind::MindThreadLifetimeMgmtOp -: public NonPostedAsynchronousContinuation -{ -public: - MindThreadLifetimeMgmtOp( - Mind &parent,unsigned int nThreads, - Callback callback) - : NonPostedAsynchronousContinuation(callback), - loop(nThreads), - parent(parent) - {} - -public: - AsynchronousLoop loop; - Mind &parent; - -public: - void joltAllMindThreadsReq1( - [[maybe_unused]] std::shared_ptr context - ) - { - loop.incrementSuccessOrFailureDueTo(true); - if (!loop.isComplete()) { - return; - } - - parent.threadsHaveBeenJolted = true; - callOriginalCb(); - } - - void executeGenericOpOnAllMindThreadsReq1( - [[maybe_unused]] std::shared_ptr context - ) - { - loop.incrementSuccessOrFailureDueTo(true); - if (!loop.isComplete()) { - return; - } - - callOriginalCb(); - } - - void exitAllMindThreadsReq1( - [[maybe_unused]] std::shared_ptr context - ) - { - loop.incrementSuccessOrFailureDueTo(true); - if (!loop.isComplete()) { - return; - } - - for (auto& thread : parent.componentThreads) { - thread->thread.join(); - } - - callOriginalCb(); - } -}; - -void Mind::joltAllMindThreadsReq( - Callback callback - ) -{ - if (threadsHaveBeenJolted) - { - std::cout << "Mrntt: All mind threads already JOLTed. " - << "Skipping JOLT request." << "\n"; - callback.callbackFn(); - return; - } - - // If no threads, set flag and call callback immediately - if (componentThreads.size() == 0 && callback.callbackFn) - { - threadsHaveBeenJolted = true; - callback.callbackFn(); - return; - } - - // Create a counter to track when all threads have been jolted - auto request = std::make_shared( - *this, componentThreads.size(), callback); - - for (auto& thread : componentThreads) - { - thread->joltThreadReq( - {request, std::bind( - &MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1, - request.get(), request)}); - } -} - -// Thread management methods (moved from ComponentThread) -void Mind::startAllMindThreadsReq( - Callback callback - ) -{ - // If no threads, call callback immediately - if (componentThreads.size() == 0 && callback.callbackFn) - { - callback.callbackFn(); - return; - } - - // Create a counter to track when all threads have started - auto request = std::make_shared( - *this, componentThreads.size(), callback); - - for (auto& thread : componentThreads) - { - thread->startThreadReq( - {request, std::bind( - &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, - request.get(), request)}); - } -} - -void Mind::pauseAllMindThreadsReq( - Callback callback - ) -{ - // If no threads, call callback immediately - if (componentThreads.size() == 0 && callback.callbackFn) - { - callback.callbackFn(); - return; - } - - // Create a counter to track when all threads have paused - auto request = std::make_shared( - *this, componentThreads.size(), callback); - - for (auto& thread : componentThreads) - { - thread->pauseThreadReq( - {request, std::bind( - &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, - request.get(), request)}); - } -} - -void Mind::resumeAllMindThreadsReq( - Callback callback - ) -{ - // If no threads, call callback immediately - if (componentThreads.size() == 0 && callback.callbackFn) - { - callback.callbackFn(); - return; - } - - // Create a counter to track when all threads have resumed - auto request = std::make_shared( - *this, componentThreads.size(), callback); - - for (auto& thread : componentThreads) - { - thread->resumeThreadReq( - {request, std::bind( - &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, - request.get(), request)}); - } -} - -void Mind::exitAllMindThreadsReq( - Callback callback - ) -{ - // If no threads, call callback immediately - if (componentThreads.size() == 0 && callback.callbackFn) - { - callback.callbackFn(); - return; - } - - // Create a counter to track when all threads have exited - auto request = std::make_shared( - *this, componentThreads.size(), callback); - - for (auto& thread : componentThreads) - { - thread->exitThreadReq( - {request, std::bind( - &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, - request.get(), request)}); - } -} - } // namespace smo