spinscale: Move thread init/jolt/exit logic into PuppetApplication

This commit is contained in:
2025-12-27 14:01:15 -04:00
parent cd77f4b02d
commit f862db922e
5 changed files with 308 additions and 251 deletions
+1
View File
@@ -3,6 +3,7 @@ add_library(spinscale SHARED
src/lockerAndInvokerBase.cpp
src/componentThread.cpp
src/component.cpp
src/puppetApplication.cpp
)
# Conditionally add qutexAcquisitionHistoryTracker.cpp only when debug locks
@@ -0,0 +1,65 @@
#ifndef PUPPET_APPLICATION_H
#define PUPPET_APPLICATION_H
#include <config.h>
#include <functional>
#include <memory>
#include <vector>
#include <spinscale/callback.h>
#include <spinscale/componentThread.h>
namespace smo {
class PuppetApplication
: public std::enable_shared_from_this<PuppetApplication>
{
public:
PuppetApplication(
const std::vector<std::shared_ptr<PuppetThread>> &threads);
~PuppetApplication() = default;
// Thread management methods
typedef std::function<void()> puppetThreadLifetimeMgmtOpCbFn;
void joltAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void startAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void pauseAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void resumeAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void exitAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
protected:
// Collection of PuppetThread instances
std::vector<std::shared_ptr<PuppetThread>> componentThreads;
/**
* Indicates whether all puppet threads have been JOLTed at least once.
*
* JOLTing serves two critical purposes:
*
* 1. **Global Constructor Sequencing**: Since pthreads begin executing while
* global constructors are still being executed, globally defined pthreads
* cannot depend on global objects having been constructed. JOLTing is done
* by the CRT's main thread within main(), which provides a sequencing
* guarantee that global constructors have been called.
*
* 2. **shared_from_this Safety**: shared_from_this() requires a prior
* shared_ptr handle to be established. The global list of
* shared_ptr<ComponentThread> guarantees that at least one shared_ptr to
* each ComponentThread has been initialized before JOLTing occurs.
*
* This flag ensures that JOLTing happens exactly once and provides
* a synchronization point for the entire system initialization.
*/
bool threadsHaveBeenJolted = false;
private:
class PuppetThreadLifetimeMgmtOp;
};
} // namespace smo
#endif // PUPPET_APPLICATION_H
+204
View File
@@ -0,0 +1,204 @@
#include <iostream>
#include <spinscale/asynchronousContinuation.h>
#include <spinscale/asynchronousLoop.h>
#include <spinscale/callback.h>
#include <spinscale/puppetApplication.h>
#include <spinscale/componentThread.h>
namespace smo {
PuppetApplication::PuppetApplication(
const std::vector<std::shared_ptr<PuppetThread>> &threads)
: componentThreads(threads)
{
}
class PuppetApplication::PuppetThreadLifetimeMgmtOp
: public NonPostedAsynchronousContinuation<puppetThreadLifetimeMgmtOpCbFn>
{
public:
PuppetThreadLifetimeMgmtOp(
PuppetApplication &parent, unsigned int nThreads,
Callback<puppetThreadLifetimeMgmtOpCbFn> callback)
: NonPostedAsynchronousContinuation<puppetThreadLifetimeMgmtOpCbFn>(callback),
loop(nThreads),
parent(parent)
{}
public:
AsynchronousLoop loop;
PuppetApplication &parent;
public:
void joltAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
parent.threadsHaveBeenJolted = true;
callOriginalCb();
}
void executeGenericOpOnAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
callOriginalCb();
}
void exitAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
for (auto& thread : parent.componentThreads) {
thread->thread.join();
}
callOriginalCb();
}
};
void PuppetApplication::joltAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
{
if (threadsHaveBeenJolted)
{
std::cout << "Mrntt: All puppet threads already JOLTed. "
<< "Skipping JOLT request." << "\n";
callback.callbackFn();
return;
}
// If no threads, set flag and call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
threadsHaveBeenJolted = true;
callback.callbackFn();
return;
}
// Create a counter to track when all threads have been jolted
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->joltThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::joltAllPuppetThreadsReq1,
request.get(), request)});
}
}
void PuppetApplication::startAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have started
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->startThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
}
void PuppetApplication::pauseAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have paused
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->pauseThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
}
void PuppetApplication::resumeAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have resumed
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->resumeThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
}
void PuppetApplication::exitAllPuppetThreadsReq(
Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have exited
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->exitThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
}
} // namespace smo
+4 -40
View File
@@ -2,14 +2,13 @@
#define _MIND_H
#include <config.h>
#include <thread>
#include <functional>
#include <memory>
#include <unordered_map>
#include <string>
#include <spinscale/callback.h>
#include <spinscale/component.h>
#include <spinscale/puppetApplication.h>
#include <componentThread.h>
#include <mindThread.h>
#include <director/director.h>
@@ -18,7 +17,8 @@
namespace smo {
class Mind : public std::enable_shared_from_this<Mind>
class Mind
: public PuppetApplication
{
public:
Mind(void);
@@ -35,24 +35,9 @@ public:
// Get all this Mind's component threads.
std::vector<std::shared_ptr<MindThread>> getMindThreads() const;
// Thread management methods (moved from ComponentThread)
typedef std::function<void()> mindThreadLifetimeMgmtOpCbFn;
void joltAllMindThreadsReq(Callback<mindThreadLifetimeMgmtOpCbFn> callback);
void startAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback);
void pauseAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback);
void resumeAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback);
void exitAllMindThreadsReq(Callback<mindThreadLifetimeMgmtOpCbFn> callback);
// CPU distribution method
void distributeAndPinThreadsAcrossCpus();
private:
// Collection of ComponentThread instances (excluding marionette)
std::vector<std::shared_ptr<MindThread>> componentThreads;
public:
director::Director director;
simulator::Simulator canvas;
@@ -62,31 +47,10 @@ public:
private:
friend class body::Body;
/**
* Indicates whether all mind threads have been JOLTed at least once.
*
* JOLTing serves two critical purposes:
*
* 1. **Global Constructor Sequencing**: Since pthreads begin executing while
* global constructors are still being executed, globally defined pthreads
* cannot depend on global objects having been constructed. JOLTing is done
* by the CRT's main thread within main(), which provides a sequencing
* guarantee that global constructors have been called.
*
* 2. **shared_from_this Safety**: shared_from_this() requires a prior
* shared_ptr handle to be established. The global list of
* shared_ptr<ComponentThread> guarantees that at least one shared_ptr to
* each ComponentThread has been initialized before JOLTing occurs.
*
* This flag ensures that JOLTing happens exactly once and provides
* a synchronization point for the entire system initialization.
*/
bool threadsHaveBeenJolted = false,
bodyComponentInitialized = false;
bool bodyComponentInitialized = false;
private:
class MindLifetimeMgmtOp;
class MindThreadLifetimeMgmtOp;
};
namespace mind {
+29 -206
View File
@@ -14,7 +14,8 @@
namespace smo {
Mind::Mind(void)
: componentThreads{
: PuppetApplication(
std::vector<std::shared_ptr<PuppetThread>>{
std::make_shared<MindThread>(SmoThreadId::DIRECTOR, *this),
std::make_shared<MindThread>(SmoThreadId::SIMULATOR, *this),
std::make_shared<MindThread>(SmoThreadId::SUBCONSCIOUS, *this),
@@ -22,16 +23,16 @@ Mind::Mind(void)
#ifndef CONFIG_WORLD_USE_BODY_THREAD
, std::make_shared<MindThread>(SmoThreadId::WORLD, *this)
#endif
},
director(*this, componentThreads[0]),
canvas(*this, componentThreads[1]),
subconscious(*this, componentThreads[2]),
body(*this, componentThreads[3]),
}),
director(*this, std::static_pointer_cast<MindThread>(componentThreads[0])),
canvas(*this, std::static_pointer_cast<MindThread>(componentThreads[1])),
subconscious(*this, std::static_pointer_cast<MindThread>(componentThreads[2])),
body(*this, std::static_pointer_cast<MindThread>(componentThreads[3])),
world(*this,
#ifndef CONFIG_WORLD_USE_BODY_THREAD
componentThreads[4]
std::static_pointer_cast<MindThread>(componentThreads[4])
#else
componentThreads[3]
std::static_pointer_cast<MindThread>(componentThreads[3])
#endif
)
{
@@ -49,8 +50,11 @@ Mind::getComponentThread(ThreadId id) const
}
// Search through the vector for the thread with matching id
for (auto& thread : componentThreads) {
if (thread->id == id) { return thread; }
for (auto& thread : componentThreads)
{
if (thread->id == id) {
return std::static_pointer_cast<MindThread>(thread);
}
}
// Throw exception if no thread found
@@ -70,8 +74,11 @@ Mind::getComponentThread(const std::string& name) const
"getComponentThread");
}
for (auto& thread : componentThreads) {
if (thread->name == name) { return thread; }
for (auto& thread : componentThreads)
{
if (thread->name == name) {
return std::static_pointer_cast<MindThread>(thread);
}
}
// Throw exception if no thread found
@@ -82,7 +89,12 @@ Mind::getComponentThread(const std::string& name) const
std::vector<std::shared_ptr<MindThread>>
Mind::getMindThreads() const
{
return componentThreads;
std::vector<std::shared_ptr<MindThread>> mindThreads;
mindThreads.reserve(componentThreads.size());
for (auto& thread : componentThreads) {
mindThreads.push_back(std::static_pointer_cast<MindThread>(thread));
}
return mindThreads;
}
class Mind::MindLifetimeMgmtOp
@@ -106,7 +118,7 @@ public:
)
{
/* Jolt the threads, then start them */
parent.joltAllMindThreadsReq(
parent.joltAllPuppetThreadsReq(
{context, std::bind(
&MindLifetimeMgmtOp::initializeReq2,
context.get(), context)});
@@ -118,7 +130,7 @@ public:
{
std::cout << "Mrntt: All mind threads JOLTed." << "\n";
parent.startAllMindThreadsReq(
parent.startAllPuppetThreadsReq(
{context, std::bind(
&MindLifetimeMgmtOp::initializeReq3,
context.get(), context)});
@@ -170,7 +182,7 @@ public:
* otherwise they'll just enter their main loops and wait for control
* messages from mrntt after processing the exit request.
*/
parent.joltAllMindThreadsReq(
parent.joltAllPuppetThreadsReq(
{context, std::bind(
&MindLifetimeMgmtOp::finalizeReq3,
context.get(), context)});
@@ -182,7 +194,7 @@ public:
{
std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n";
parent.exitAllMindThreadsReq(
parent.exitAllPuppetThreadsReq(
{context, std::bind(
&MindLifetimeMgmtOp::finalizeReq4,
context.get(), context)});
@@ -254,193 +266,4 @@ void Mind::distributeAndPinThreadsAcrossCpus()
<< "across " << cpuCount << " CPUs\n";
}
class Mind::MindThreadLifetimeMgmtOp
: public NonPostedAsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
{
public:
MindThreadLifetimeMgmtOp(
Mind &parent,unsigned int nThreads,
Callback<mindThreadLifetimeMgmtOpCbFn> callback)
: NonPostedAsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
loop(nThreads),
parent(parent)
{}
public:
AsynchronousLoop loop;
Mind &parent;
public:
void joltAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
parent.threadsHaveBeenJolted = true;
callOriginalCb();
}
void executeGenericOpOnAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
callOriginalCb();
}
void exitAllMindThreadsReq1(
[[maybe_unused]] std::shared_ptr<MindThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
for (auto& thread : parent.componentThreads) {
thread->thread.join();
}
callOriginalCb();
}
};
void Mind::joltAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback
)
{
if (threadsHaveBeenJolted)
{
std::cout << "Mrntt: All mind threads already JOLTed. "
<< "Skipping JOLT request." << "\n";
callback.callbackFn();
return;
}
// If no threads, set flag and call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
threadsHaveBeenJolted = true;
callback.callbackFn();
return;
}
// Create a counter to track when all threads have been jolted
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->joltThreadReq(
{request, std::bind(
&MindThreadLifetimeMgmtOp::joltAllMindThreadsReq1,
request.get(), request)});
}
}
// Thread management methods (moved from ComponentThread)
void Mind::startAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have started
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->startThreadReq(
{request, std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request)});
}
}
void Mind::pauseAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have paused
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->pauseThreadReq(
{request, std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request)});
}
}
void Mind::resumeAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have resumed
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->resumeThreadReq(
{request, std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request)});
}
}
void Mind::exitAllMindThreadsReq(
Callback<mindThreadLifetimeMgmtOpCbFn> callback
)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have exited
auto request = std::make_shared<MindThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->exitThreadReq(
{request, std::bind(
&MindThreadLifetimeMgmtOp::executeGenericOpOnAllMindThreadsReq1,
request.get(), request)});
}
}
} // namespace smo