diff --git a/smocore/include/mind.h b/smocore/include/mind.h index dab5f85..cc56e6a 100644 --- a/smocore/include/mind.h +++ b/smocore/include/mind.h @@ -20,9 +20,9 @@ public: Mind(void); ~Mind(void) = default; - void initialize(void); - void execute(void); - void finalizeReq(std::function callback); + typedef std::function mindLifetimeMgmtOpCbFn; + void initializeReq(mindLifetimeMgmtOpCbFn callback); + void finalizeReq(mindLifetimeMgmtOpCbFn callback); // ComponentThread access methods std::shared_ptr getComponentThread( @@ -33,11 +33,12 @@ public: std::vector> getMindThreads() const; // Thread management methods (moved from ComponentThread) - void startAllMindThreadsReq(std::function callback = nullptr); - void pauseAllMindThreadsReq(std::function callback = nullptr); - void resumeAllMindThreadsReq(std::function callback = nullptr); - void exitAllMindThreadsReq(std::function callback = nullptr); - void joltAllMindThreadsReq(std::function callback = nullptr); + typedef std::function mindThreadLifetimeMgmtOpCbFn; + void joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback); + void startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback); + void pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback); + void resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback); + void exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback); // CPU distribution method void distributeAndPinThreadsAcrossCpus(); @@ -73,6 +74,9 @@ private: bool threadsHaveBeenJolted = false; // Collection of ComponentThread instances (excluding marionette) std::vector> componentThreads; + + class MindLifetimeMgmtOp; + class MindThreadLifetimeMgmtOp; }; // Global Mind instance will be defined in marionette.cpp diff --git a/smocore/marionette/marionette.cpp b/smocore/marionette/marionette.cpp index 6337331..443814e 100644 --- a/smocore/marionette/marionette.cpp +++ b/smocore/marionette/marionette.cpp @@ -90,16 +90,24 @@ void ComponentThread::marionetteMain(ComponentThread& self) self.getIoService().post([]() { - // Initialize Salmanoff first - initializeSalmanoff([](bool success) - { - if (success) { - // Then initialize the global Mind object - globalMind->initialize(); - } else { - std::cerr << "Failed to initialize Salmanoff" << std::endl; + // Initialize Mind object (threads) first + globalMind->initializeReq( + [](bool success) + { + if (success) { + initializeSalmanoff([](bool success) { + if (!success) { + std::cerr << "Failed to initialize " + "Salmanoff" << '\n'; + } + }); + } + else { + std::cerr << "Failed to initialize Mind object " + "(threads)" << '\n'; + } } - }); + ); }); std::cout << __func__ << ": Entering event loop" << "\n"; @@ -199,7 +207,10 @@ void ComponentThread::marionetteMain(ComponentThread& self) if (callFinalizeReq) { - globalMind->finalizeReq([]{ + globalMind->finalizeReq([](bool success) { + if (!success) { + std::cerr << "Failed to finalize Mind object (threads)" << '\n'; + } mrntt::mrntt->getIoService().stop(); }); self.getIoService().reset(); diff --git a/smocore/mind.cpp b/smocore/mind.cpp index fbb2a99..331ca9d 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include #include @@ -16,94 +18,6 @@ Mind::Mind(void) { } -void Mind::initialize() -{ - /* Distribute threads across available CPUs */ - try - { - distributeAndPinThreadsAcrossCpus(); - } - catch (const std::exception& e) - { - std::cerr << "Salmanoff couldn't distribute the mind threads across " - "the CPUs, so performance may be suboptimal.\n" - "Error: " << e.what() << "\n"; - } - - /* Jolt the threads, then start them */ - joltAllMindThreadsReq( - [this]() - { - std::cout << "Mrntt: All mind threads JOLTed." << "\n"; - // Start all threads after JOLTing - startAllMindThreadsReq( - []() - { - std::cout << "Mrntt: All mind threads started." << "\n"; - } - ); - } - ); -} - -void Mind::finalizeReq(std::function callback) -{ - /* If the threads haven't been jolted, we need to do that first, because - * otherwise they'll just enter their main loops and wait for control - * messages from mrntt after processing the exit request. - */ - if (!threadsHaveBeenJolted) - { - joltAllMindThreadsReq( - [this, callback]() - { - exitAllMindThreadsReq( - [callback]() - { - std::cout << "Mrntt: All mind threads exited." << "\n"; - if (callback) { callback(); } - } - ); - } - ); - } - else - { - exitAllMindThreadsReq( - [callback]() - { - std::cout << "Mrntt: All mind threads exited." << "\n"; - if (callback) { callback(); } - } - ); - } -} - -void Mind::joltAllMindThreadsReq(std::function callback) -{ - // Create a counter to track when all threads have been jolted - auto counter = std::make_shared>(componentThreads.size()); - - for (auto& thread : componentThreads) - { - thread->joltThreadReq([counter, callback, this]() { - if (--(*counter) == 0) - { - // Set the flag only after all threads have ACKed their JOLT - threadsHaveBeenJolted = true; - if (callback) { callback(); } - } - }); - } - - // If no threads, set flag and call callback immediately - if (componentThreads.empty()) - { - threadsHaveBeenJolted = true; - if (callback) { callback(); } - } -} - std::shared_ptr Mind::getComponentThread(ComponentThread::ThreadId id) const { @@ -141,6 +55,117 @@ Mind::getMindThreads() const return componentThreads; } +class Mind::MindLifetimeMgmtOp +: public AsynchronousContinuation +{ +public: + MindLifetimeMgmtOp( + Mind &parent, mindLifetimeMgmtOpCbFn callback) + : AsynchronousContinuation(callback), + parent(parent) + {} + + void callOriginalCbFn(void) + { + if (originalCbFn) { + originalCbFn(true); + } + } + +public: + Mind &parent; + +public: + void initializeReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Mrntt: All mind threads JOLTed." << "\n"; + + parent.startAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::initializeReq2, + context.get(), context)); + } + + void initializeReq2( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Mrntt: All mind threads started." << "\n"; + callOriginalCbFn(); + } + + void finalizeReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Mrntt: All mind threads JOLTed." << "\n"; + + parent.exitAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::finalizeReq2, + context.get(), context)); + } + + void finalizeReq2( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << "Mrntt: All mind threads exited." << "\n"; + callOriginalCbFn(); + } +}; + +void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback) +{ + /* Distribute threads across available CPUs */ + try + { + distributeAndPinThreadsAcrossCpus(); + } + catch (const std::exception& e) + { + std::cerr << "Salmanoff couldn't distribute the mind threads across " + "the CPUs, so performance may be suboptimal.\n" + "Error: " << e.what() << "\n"; + } + + auto request = std::make_shared( + *this, callback); + + /* Jolt the threads, then start them */ + joltAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::initializeReq1, + request.get(), request)); +} + +void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback) +{ + auto request = std::make_shared( + *this, callback); + + /* If the threads haven't been jolted, we need to do that first, because + * otherwise they'll just enter their main loops and wait for control + * messages from mrntt after processing the exit request. + */ + if (!threadsHaveBeenJolted) + { + joltAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::finalizeReq1, + request.get(), request)); + } + else + { + exitAllMindThreadsReq( + std::bind( + &MindLifetimeMgmtOp::finalizeReq1, + request.get(), request)); + } +} + void Mind::distributeAndPinThreadsAcrossCpus() { int cpuCount = ComponentThread::getAvailableCpuCount(); @@ -162,82 +187,165 @@ void Mind::distributeAndPinThreadsAcrossCpus() << "across " << cpuCount << " CPUs\n"; } +class Mind::MindThreadLifetimeMgmtOp +: public AsynchronousContinuation +{ +public: + MindThreadLifetimeMgmtOp( + Mind &parent,unsigned int nThreads, + mindThreadLifetimeMgmtOpCbFn callback) + : AsynchronousContinuation(callback), + loop(nThreads), + parent(parent) + {} + + void callOriginalCbFn(void) + { + if (originalCbFn) { + originalCbFn(); + } + } + +public: + AsynchronousLoop loop; + Mind &parent; + +public: + void joltAllMindThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + parent.threadsHaveBeenJolted = true; + callOriginalCbFn(); + } + + void executeGenericOpOnAllMindThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + callOriginalCbFn(); + } + + void exitAllMindThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + for (auto& thread : parent.componentThreads) { + thread->thread.join(); + } + + callOriginalCbFn(); + } +}; + +void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) +{ + // Create a counter to track when all threads have been jolted + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->joltThreadReq( + std::bind( + &MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1, + request.get(), request)); + } + + // If no threads, set flag and call callback immediately + if (request->loop.nTotalIsZero() && callback) + { + threadsHaveBeenJolted = true; + callback(); + } +} + // Thread management methods (moved from ComponentThread) -void Mind::startAllMindThreadsReq(std::function callback) +void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have started - auto counter = std::make_shared>(componentThreads.size()); + auto request = std::make_shared( + *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { - thread->startThreadReq([counter, callback]() { - if (--(*counter) == 0 && callback) { callback(); } - }); + thread->startThreadReq( + std::bind( + &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, + request.get(), request)); } // If no threads, call callback immediately - if (componentThreads.empty() && callback) { callback(); } + if (request->loop.nTotalIsZero() && callback) { callback(); } } -void Mind::pauseAllMindThreadsReq(std::function callback) +void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have paused - auto counter = std::make_shared>(componentThreads.size()); + auto request = std::make_shared( + *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { - thread->pauseThreadReq([counter, callback]() { - if (--(*counter) == 0 && callback) { callback(); } - }); + thread->pauseThreadReq( + std::bind( + &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, + request.get(), request)); } // If no threads, call callback immediately - if (componentThreads.empty() && callback) { - callback(); - } + if (request->loop.nTotalIsZero() && callback) { callback(); } } -void Mind::resumeAllMindThreadsReq(std::function callback) +void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have resumed - auto counter = std::make_shared>(componentThreads.size()); + auto request = std::make_shared( + *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { - thread->resumeThreadReq([counter, callback]() { - if (--(*counter) == 0 && callback) { callback(); } - }); + thread->resumeThreadReq( + std::bind( + &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, + request.get(), request)); } // If no threads, call callback immediately - if (componentThreads.empty() && callback) { - callback(); - } + if (request->loop.nTotalIsZero() && callback) { callback(); } } -void Mind::exitAllMindThreadsReq(std::function callback) +void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback) { // Create a counter to track when all threads have exited - auto counter = std::make_shared>(componentThreads.size()); + auto request = std::make_shared( + *this, componentThreads.size(), callback); for (auto& thread : componentThreads) { - thread->exitThreadReq([counter, callback, this]() { - if (--(*counter) == 0) - { - // All threads have exited their loops, now join them - for (auto& t : componentThreads) { - t->thread.join(); - } - if (callback) { callback(); } - } - }); + thread->exitThreadReq( + std::bind( + &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1, + request.get(), request)); } // If no threads, call callback immediately - if (componentThreads.empty() && callback) { - callback(); - } + if (request->loop.nTotalIsZero() && callback) { callback(); } } } // namespace smo