Mind,Mrntt: Use async pattern in Mind; init threads before initializeSmo

In Mrntt, we now initialize Mind:: object threads before calling
initializeSalmanoffReq().

We've also propagated the spinscale async pattern into the Mind
class.
This commit is contained in:
2025-09-11 20:11:10 -04:00
parent 89947dfc71
commit 4429135539
3 changed files with 266 additions and 143 deletions
+12 -8
View File
@@ -20,9 +20,9 @@ public:
Mind(void); Mind(void);
~Mind(void) = default; ~Mind(void) = default;
void initialize(void); typedef std::function<void(bool)> mindLifetimeMgmtOpCbFn;
void execute(void); void initializeReq(mindLifetimeMgmtOpCbFn callback);
void finalizeReq(std::function<void()> callback); void finalizeReq(mindLifetimeMgmtOpCbFn callback);
// ComponentThread access methods // ComponentThread access methods
std::shared_ptr<ComponentThread> getComponentThread( std::shared_ptr<ComponentThread> getComponentThread(
@@ -33,11 +33,12 @@ public:
std::vector<std::shared_ptr<ComponentThread>> getMindThreads() const; std::vector<std::shared_ptr<ComponentThread>> getMindThreads() const;
// Thread management methods (moved from ComponentThread) // Thread management methods (moved from ComponentThread)
void startAllMindThreadsReq(std::function<void()> callback = nullptr); typedef std::function<void()> mindThreadLifetimeMgmtOpCbFn;
void pauseAllMindThreadsReq(std::function<void()> callback = nullptr); void joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback);
void resumeAllMindThreadsReq(std::function<void()> callback = nullptr); void startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback);
void exitAllMindThreadsReq(std::function<void()> callback = nullptr); void pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback);
void joltAllMindThreadsReq(std::function<void()> callback = nullptr); void resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback);
void exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback);
// CPU distribution method // CPU distribution method
void distributeAndPinThreadsAcrossCpus(); void distributeAndPinThreadsAcrossCpus();
@@ -73,6 +74,9 @@ private:
bool threadsHaveBeenJolted = false; bool threadsHaveBeenJolted = false;
// Collection of ComponentThread instances (excluding marionette) // Collection of ComponentThread instances (excluding marionette)
std::vector<std::shared_ptr<ComponentThread>> componentThreads; std::vector<std::shared_ptr<ComponentThread>> componentThreads;
class MindLifetimeMgmtOp;
class MindThreadLifetimeMgmtOp;
}; };
// Global Mind instance will be defined in marionette.cpp // Global Mind instance will be defined in marionette.cpp
+21 -10
View File
@@ -90,16 +90,24 @@ void ComponentThread::marionetteMain(ComponentThread& self)
self.getIoService().post([]() self.getIoService().post([]()
{ {
// Initialize Salmanoff first // Initialize Mind object (threads) first
initializeSalmanoff([](bool success) globalMind->initializeReq(
{ [](bool success)
if (success) { {
// Then initialize the global Mind object if (success) {
globalMind->initialize(); initializeSalmanoff([](bool success) {
} else { if (!success) {
std::cerr << "Failed to initialize Salmanoff" << std::endl; std::cerr << "Failed to initialize "
"Salmanoff" << '\n';
}
});
}
else {
std::cerr << "Failed to initialize Mind object "
"(threads)" << '\n';
}
} }
}); );
}); });
std::cout << __func__ << ": Entering event loop" << "\n"; std::cout << __func__ << ": Entering event loop" << "\n";
@@ -199,7 +207,10 @@ void ComponentThread::marionetteMain(ComponentThread& self)
if (callFinalizeReq) if (callFinalizeReq)
{ {
globalMind->finalizeReq([]{ globalMind->finalizeReq([](bool success) {
if (!success) {
std::cerr << "Failed to finalize Mind object (threads)" << '\n';
}
mrntt::mrntt->getIoService().stop(); mrntt::mrntt->getIoService().stop();
}); });
self.getIoService().reset(); self.getIoService().reset();
+233 -125
View File
@@ -1,5 +1,7 @@
#include <iostream> #include <iostream>
#include <opts.h> #include <opts.h>
#include <asynchronousContinuation.h>
#include <asynchronousLoop.h>
#include <mind.h> #include <mind.h>
#include <componentThread.h> #include <componentThread.h>
@@ -16,94 +18,6 @@ Mind::Mind(void)
{ {
} }
void Mind::initialize()
{
/* Distribute threads across available CPUs */
try
{
distributeAndPinThreadsAcrossCpus();
}
catch (const std::exception& e)
{
std::cerr << "Salmanoff couldn't distribute the mind threads across "
"the CPUs, so performance may be suboptimal.\n"
"Error: " << e.what() << "\n";
}
/* Jolt the threads, then start them */
joltAllMindThreadsReq(
[this]()
{
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
// Start all threads after JOLTing
startAllMindThreadsReq(
[]()
{
std::cout << "Mrntt: All mind threads started." << "\n";
}
);
}
);
}
void Mind::finalizeReq(std::function<void()> callback)
{
/* If the threads haven't been jolted, we need to do that first, because
* otherwise they'll just enter their main loops and wait for control
* messages from mrntt after processing the exit request.
*/
if (!threadsHaveBeenJolted)
{
joltAllMindThreadsReq(
[this, callback]()
{
exitAllMindThreadsReq(
[callback]()
{
std::cout << "Mrntt: All mind threads exited." << "\n";
if (callback) { callback(); }
}
);
}
);
}
else
{
exitAllMindThreadsReq(
[callback]()
{
std::cout << "Mrntt: All mind threads exited." << "\n";
if (callback) { callback(); }
}
);
}
}
void Mind::joltAllMindThreadsReq(std::function<void()> callback)
{
// Create a counter to track when all threads have been jolted
auto counter = std::make_shared<std::atomic<int>>(componentThreads.size());
for (auto& thread : componentThreads)
{
thread->joltThreadReq([counter, callback, this]() {
if (--(*counter) == 0)
{
// Set the flag only after all threads have ACKed their JOLT
threadsHaveBeenJolted = true;
if (callback) { callback(); }
}
});
}
// If no threads, set flag and call callback immediately
if (componentThreads.empty())
{
threadsHaveBeenJolted = true;
if (callback) { callback(); }
}
}
std::shared_ptr<ComponentThread> std::shared_ptr<ComponentThread>
Mind::getComponentThread(ComponentThread::ThreadId id) const Mind::getComponentThread(ComponentThread::ThreadId id) const
{ {
@@ -141,6 +55,117 @@ Mind::getMindThreads() const
return componentThreads; return componentThreads;
} }
class Mind::MindLifetimeMgmtOp
: public AsynchronousContinuation<mindLifetimeMgmtOpCbFn>
{
public:
MindLifetimeMgmtOp(
Mind &parent, mindLifetimeMgmtOpCbFn callback)
: AsynchronousContinuation<mindLifetimeMgmtOpCbFn>(callback),
parent(parent)
{}
void callOriginalCbFn(void)
{
if (originalCbFn) {
originalCbFn(true);
}
}
public:
Mind &parent;
public:
void initializeReq1(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
parent.startAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq2,
context.get(), context));
}
void initializeReq2(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads started." << "\n";
callOriginalCbFn();
}
void finalizeReq1(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
parent.exitAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq2,
context.get(), context));
}
void finalizeReq2(
[[maybe_unused]] std::shared_ptr<MindLifetimeMgmtOp> context
)
{
std::cout << "Mrntt: All mind threads exited." << "\n";
callOriginalCbFn();
}
};
void Mind::initializeReq(mindLifetimeMgmtOpCbFn callback)
{
/* Distribute threads across available CPUs */
try
{
distributeAndPinThreadsAcrossCpus();
}
catch (const std::exception& e)
{
std::cerr << "Salmanoff couldn't distribute the mind threads across "
"the CPUs, so performance may be suboptimal.\n"
"Error: " << e.what() << "\n";
}
auto request = std::make_shared<MindLifetimeMgmtOp>(
*this, callback);
/* Jolt the threads, then start them */
joltAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::initializeReq1,
request.get(), request));
}
void Mind::finalizeReq(mindLifetimeMgmtOpCbFn callback)
{
auto request = std::make_shared<MindLifetimeMgmtOp>(
*this, callback);
/* If the threads haven't been jolted, we need to do that first, because
* otherwise they'll just enter their main loops and wait for control
* messages from mrntt after processing the exit request.
*/
if (!threadsHaveBeenJolted)
{
joltAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq1,
request.get(), request));
}
else
{
exitAllMindThreadsReq(
std::bind(
&MindLifetimeMgmtOp::finalizeReq1,
request.get(), request));
}
}
void Mind::distributeAndPinThreadsAcrossCpus() void Mind::distributeAndPinThreadsAcrossCpus()
{ {
int cpuCount = ComponentThread::getAvailableCpuCount(); int cpuCount = ComponentThread::getAvailableCpuCount();
@@ -162,82 +187,165 @@ void Mind::distributeAndPinThreadsAcrossCpus()
<< "across " << cpuCount << " CPUs\n"; << "across " << cpuCount << " CPUs\n";
} }
class Mind::MindThreadLifetimeMgmtOp
: public AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
{
public:
MindThreadLifetimeMgmtOp(
Mind &parent,unsigned int nThreads,
mindThreadLifetimeMgmtOpCbFn callback)
: AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
loop(nThreads),
parent(parent)
{}
void callOriginalCbFn(void)
{
if (originalCbFn) {
originalCbFn();
}
}
public:
AsynchronousLoop loop;
Mind &parent;
public:
void joltAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
parent.threadsHaveBeenJolted = true;
callOriginalCbFn();
}
void executeGenericOpOnAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
callOriginalCbFn();
}
void exitAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
for (auto& thread : parent.componentThreads) {
thread->thread.join();
}
callOriginalCbFn();
}
};
void Mind::joltAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{
// Create a counter to track when all threads have been jolted
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->joltThreadReq(
std::bind(
&MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1,
request.get(), request));
}
// If no threads, set flag and call callback immediately
if (request->loop.nTotalIsZero() && callback)
{
threadsHaveBeenJolted = true;
callback();
}
}
// Thread management methods (moved from ComponentThread) // Thread management methods (moved from ComponentThread)
void Mind::startAllMindThreadsReq(std::function<void()> callback) void Mind::startAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{ {
// Create a counter to track when all threads have started // Create a counter to track when all threads have started
auto counter = std::make_shared<std::atomic<int>>(componentThreads.size()); auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads) for (auto& thread : componentThreads)
{ {
thread->startThreadReq([counter, callback]() { thread->startThreadReq(
if (--(*counter) == 0 && callback) { callback(); } std::bind(
}); &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
} }
// If no threads, call callback immediately // If no threads, call callback immediately
if (componentThreads.empty() && callback) { callback(); } if (request->loop.nTotalIsZero() && callback) { callback(); }
} }
void Mind::pauseAllMindThreadsReq(std::function<void()> callback) void Mind::pauseAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{ {
// Create a counter to track when all threads have paused // Create a counter to track when all threads have paused
auto counter = std::make_shared<std::atomic<int>>(componentThreads.size()); auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads) for (auto& thread : componentThreads)
{ {
thread->pauseThreadReq([counter, callback]() { thread->pauseThreadReq(
if (--(*counter) == 0 && callback) { callback(); } std::bind(
}); &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
} }
// If no threads, call callback immediately // If no threads, call callback immediately
if (componentThreads.empty() && callback) { if (request->loop.nTotalIsZero() && callback) { callback(); }
callback();
}
} }
void Mind::resumeAllMindThreadsReq(std::function<void()> callback) void Mind::resumeAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{ {
// Create a counter to track when all threads have resumed // Create a counter to track when all threads have resumed
auto counter = std::make_shared<std::atomic<int>>(componentThreads.size()); auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads) for (auto& thread : componentThreads)
{ {
thread->resumeThreadReq([counter, callback]() { thread->resumeThreadReq(
if (--(*counter) == 0 && callback) { callback(); } std::bind(
}); &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request));
} }
// If no threads, call callback immediately // If no threads, call callback immediately
if (componentThreads.empty() && callback) { if (request->loop.nTotalIsZero() && callback) { callback(); }
callback();
}
} }
void Mind::exitAllMindThreadsReq(std::function<void()> callback) void Mind::exitAllMindThreadsReq(mindThreadLifetimeMgmtOpCbFn callback)
{ {
// Create a counter to track when all threads have exited // Create a counter to track when all threads have exited
auto counter = std::make_shared<std::atomic<int>>(componentThreads.size()); auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads) for (auto& thread : componentThreads)
{ {
thread->exitThreadReq([counter, callback, this]() { thread->exitThreadReq(
if (--(*counter) == 0) std::bind(
{ &MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
// All threads have exited their loops, now join them request.get(), request));
for (auto& t : componentThreads) {
t->thread.join();
}
if (callback) { callback(); }
}
});
} }
// If no threads, call callback immediately // If no threads, call callback immediately
if (componentThreads.empty() && callback) { if (request->loop.nTotalIsZero() && callback) { callback(); }
callback();
}
} }
} // namespace smo } // namespace smo