ComponentThread: Remove lambdas; use standard async pattern

We've finally cleaned this code up by removing these dirty lambdas.
Next we do the Mind:: class sequences.
This commit is contained in:
2025-09-11 18:41:45 -04:00
parent b8c931397d
commit fb17c51ef6
3 changed files with 278 additions and 119 deletions
+259 -107
View File
@@ -4,6 +4,7 @@
#include <sched.h> #include <sched.h>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <opts.h> #include <opts.h>
#include <asynchronousContinuation.h>
#include <mind.h> #include <mind.h>
#include <componentThread.h> #include <componentThread.h>
#include <marionette/marionette.h> #include <marionette/marionette.h>
@@ -90,112 +91,285 @@ void ComponentThread::main(ComponentThread& self)
<< ": Unknown exception occurred" << "\n"; << ": Unknown exception occurred" << "\n";
} }
if (sendExceptionInd) { mrntt::mrntt->exceptionInd(self); } if (sendExceptionInd)
{ mrntt::mrntt->exceptionInd(self.shared_from_this()); }
} }
std::cout << self.name << ":" << __func__ << ": Exited event loop" << "\n"; std::cout << self.name << ":" << __func__ << ": Exited event loop" << "\n";
} }
// Thread management method implementations class ComponentThread::ThreadLifetimeMgmtOp
void ComponentThread::startThreadReq(std::function<void()> callback) : public TargetedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>
{ {
this->getIoService().post([this, caller = getSelf(), callback]() public:
ThreadLifetimeMgmtOp(
const std::shared_ptr<ComponentThread> &caller,
const std::shared_ptr<ComponentThread> &target,
threadLifetimeMgmtOpCbFn callback)
: TargetedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>(
caller, callback),
target(target)
{}
void callOriginalCbFn(void)
{ {
std::cout << "Thread '" << name << "': handling startThread." << "\n"; if (originalCbFn) {
caller->getIoService().post(originalCbFn);
}
}
public:
const std::shared_ptr<ComponentThread> target;
public:
void joltThreadReq1(
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
)
{
std::cout << "Thread '" << target->name << "': handling JOLT request."
<< "\n";
target->io_service.stop();
callOriginalCbFn();
}
void startThreadReq1(
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
)
{
std::cout << "Thread '" << target->name << "': handling startThread."
<< "\n";
// Execute private setup sequence here // Execute private setup sequence here
// This is where each thread would implement its specific initialization // This is where each thread would implement its specific initialization
if (callback) { callOriginalCbFn();
caller->getIoService().post(callback); }
}
}); void exitThreadReq1_mainQueue(
} [[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
)
{
std::cout << "Thread '" << target->name << "': handling exitThread "
"(main queue)." << std::endl;
target->cleanup();
target->io_service.stop();
callOriginalCbFn();
}
void exitThreadReq1_pauseQueue(
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
)
{
std::cout << "Thread '" << target->name << "': handling exitThread "
"(pause queue)." << std::endl;
target->cleanup();
target->pause_io_service.stop();
target->io_service.stop();
callOriginalCbFn();
}
void pauseThreadReq1(
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
)
{
std::cout << "Thread '" << target->name << "': handling pauseThread."
<< std::endl;
/* We have to invoke the callback here before moving on because
* our next operation is going to block the thread, so it won't
* have a chance to invoke the callback until it's unblocked.
*/
callOriginalCbFn();
target->pause_io_service.reset();
target->pause_io_service.run();
}
void resumeThreadReq1(
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
)
{
std::cout << "Thread '" << target->name << "': handling resumeThread."
<< std::endl;
target->pause_io_service.stop();
callOriginalCbFn();
}
};
void ComponentThread::cleanup(void) void ComponentThread::cleanup(void)
{ {
this->keepLooping = false; this->keepLooping = false;
} }
void ComponentThread::exitThreadReq(std::function<void()> callback) void ComponentThread::joltThreadReq(threadLifetimeMgmtOpCbFn callback)
{ {
// Post to the main io_service /** EXPLANATION:
this->getIoService().post([this, caller = getSelf(), callback]() * We can't use shared_from_this() here because JOLTing occurs prior to
* TLS being set up.
*
* We also can't use getSelf() as yet for the same reason: getSelf()
* requires TLS to be set up.
*
* To obtain a sh_ptr to the caller, we just supply the mrntt thread since
* JOLT is always invoked by the mrntt thread. The JOLT sequence that the
* CRT main() function invokes on the mrntt thread is special since it
* supplies cmdline args and envp.
*
* To obtain a sh_ptr to the target thread, we explicitly look it up in the
* Mind object's collection of component threads.
*/
if (id == ComponentThread::MRNTT)
{ {
std::cout << "Thread '" << name << "': handling exitThread " throw std::runtime_error(std::string(__func__)
"(main queue)." << std::endl; + ": invoked on mrntt thread");
}
cleanup(); std::shared_ptr<ComponentThread>
mrntt = mrntt::mrntt,
target = parent.getComponentThread(id);
// Stop the main io_service to exit the thread auto request = std::make_shared<ThreadLifetimeMgmtOp>(
io_service.stop(); mrntt, target, callback);
if (callback) { caller->getIoService().post(callback); }
});
// Also post to the pause io_service this->getIoService().post(
this->pause_io_service.post([this, caller = getSelf(), callback]() std::bind(
{ &ThreadLifetimeMgmtOp::joltThreadReq1,
std::cout << "Thread '" << name << "': handling exitThread " request.get(), request));
"(pause queue)." << std::endl;
cleanup();
// Stop both io_services to exit the thread
pause_io_service.stop();
io_service.stop();
if (callback) { caller->getIoService().post(callback); }
});
} }
void ComponentThread::pauseThreadReq(std::function<void()> callback) // Thread management method implementations
void ComponentThread::startThreadReq(threadLifetimeMgmtOpCbFn callback)
{ {
this->getIoService().post([this, caller = getSelf(), callback]() std::shared_ptr<ComponentThread> caller = getSelf();
{ auto request = std::make_shared<ThreadLifetimeMgmtOp>(
std::cout << "Thread '" << name << "': handling pauseThread." caller, shared_from_this(), callback);
<< std::endl;
if (callback) { this->getIoService().post(
caller->getIoService().post(callback); std::bind(
} &ThreadLifetimeMgmtOp::startThreadReq1,
request.get(), request));
// Reset the pause io_service before running to ensure it can run again
pause_io_service.reset();
// Run the pause io_service to block this thread
pause_io_service.run();
});
} }
void ComponentThread::resumeThreadReq(std::function<void()> callback) void ComponentThread::exitThreadReq(threadLifetimeMgmtOpCbFn callback)
{ {
std::shared_ptr<ComponentThread> caller = getSelf();
auto request = std::make_shared<ThreadLifetimeMgmtOp>(
caller, shared_from_this(), callback);
this->getIoService().post(
std::bind(
&ThreadLifetimeMgmtOp::exitThreadReq1_mainQueue,
request.get(), request));
this->pause_io_service.post(
std::bind(
&ThreadLifetimeMgmtOp::exitThreadReq1_pauseQueue,
request.get(), request));
}
void ComponentThread::pauseThreadReq(threadLifetimeMgmtOpCbFn callback)
{
if (id == ComponentThread::MRNTT)
{
throw std::runtime_error(std::string(__func__)
+ ": invoked on mrntt thread");
}
std::shared_ptr<ComponentThread> caller = getSelf();
auto request = std::make_shared<ThreadLifetimeMgmtOp>(
caller, shared_from_this(), callback);
this->getIoService().post(
std::bind(
&ThreadLifetimeMgmtOp::pauseThreadReq1,
request.get(), request));
}
void ComponentThread::resumeThreadReq(threadLifetimeMgmtOpCbFn callback)
{
if (id == ComponentThread::MRNTT)
{
throw std::runtime_error(std::string(__func__)
+ ": invoked on mrntt thread");
}
// Post to the pause_io_service to unblock the paused thread // Post to the pause_io_service to unblock the paused thread
pause_io_service.post([this, caller = getSelf(), callback]() std::shared_ptr<ComponentThread> caller = getSelf();
{ auto request = std::make_shared<ThreadLifetimeMgmtOp>(
std::cout << "Thread '" << name << "': handling resumeThread." caller, shared_from_this(), callback);
<< std::endl;
if (callback) { this->pause_io_service.post(
caller->getIoService().post(callback); std::bind(
} &ThreadLifetimeMgmtOp::resumeThreadReq1,
request.get(), request));
// Stop the pause_io_service to unblock the thread
pause_io_service.stop();
});
} }
void ComponentThread::joltThreadReq(std::function<void()> callback) class ComponentThread::MindShutdownIndOp
: public TargetedAsynchronousContinuation<mindShutdownIndOpCbFn>
{ {
this->getIoService().post([this, caller = getSelf(), callback]() public:
MindShutdownIndOp(
const std::shared_ptr<ComponentThread> &caller,
mindShutdownIndOpCbFn callback)
: TargetedAsynchronousContinuation<mindShutdownIndOpCbFn>(
caller, callback)
{}
public:
void mindShutdownInd1_exception(
[[maybe_unused]] std::shared_ptr<MindShutdownIndOp> context
)
{ {
std::cout << "Thread '" << name << "': handling JOLT request." << "\n"; std::cerr << "Mrntt: Exception occurred: in thread "
<< context->caller->name << ". Killing Salmanoff." << "\n";
// Stop the main io_service to jolt the thread /** EXPLANATION:
io_service.stop(); * An exception has occurred in one of a mind's threads. We need to
* shut down all of that particular mind's threads.
*/
context->caller->parent.finalizeReq(
std::bind(
&MindShutdownIndOp::mindShutdownInd2,
context.get(), context));
}
if (callback) { void mindShutdownInd1_userShutdown(
caller->getIoService().post(callback); [[maybe_unused]] std::shared_ptr<MindShutdownIndOp> context
} )
}); {
} std::cerr << "Mrntt: User requested shutdown (SIGINT)."
<< " Killing Salmanoff." << "\n";
/** EXPLANATION:
* A user has requested a shutdown. We need to shut down all of the
* threads in all running Minds.
*
* FIXME:
* So this should ideally be a loop
* through all running Minds, calling finalizeReq on each one.
*/
context->caller->parent.finalizeReq(
std::bind(
&MindShutdownIndOp::mindShutdownInd2,
context.get(), context));
}
void mindShutdownInd2(
[[maybe_unused]] std::shared_ptr<MindShutdownIndOp> context
)
{
/** FIXME:
* When we eventually support multiple minds, we should remove this
* since it causes marionette to exit, even if there are other minds
* that are still running.
*/
smo::mrntt::exitMarionetteLoop();
}
};
/* This shouldn't take a callback because the caller shouldn't expect to /* This shouldn't take a callback because the caller shouldn't expect to
* Mrntt to send a reply signal to it. Sending this Indication means that * Mrntt to send a reply signal to it. Sending this Indication means that
@@ -206,34 +380,24 @@ void ComponentThread::joltThreadReq(std::function<void()> callback)
* Even if Mrntt sent a RDY response, the caller shouldn't actually be executing * Even if Mrntt sent a RDY response, the caller shouldn't actually be executing
* any longer to receive it anyway. * any longer to receive it anyway.
*/ */
void ComponentThread::exceptionInd(ComponentThread& thread) void ComponentThread::exceptionInd(
const std::shared_ptr<ComponentThread> &faultyThread
)
{ {
if (this->id != ComponentThread::MRNTT) if (this->id != ComponentThread::MRNTT)
{ {
throw std::runtime_error(std::string(__func__) throw std::runtime_error(std::string(__func__)
+ ": invoked on non-mrntt thread " + thread.name); + ": invoked on non-mrntt thread " + faultyThread->name);
} }
auto request = std::make_shared<MindShutdownIndOp>(
faultyThread, nullptr);
// Post the exception to the mrntt thread. // Post the exception to the mrntt thread.
this->getIoService().post( this->getIoService().post(
[&thread]() std::bind(
{ &MindShutdownIndOp::mindShutdownInd1_exception,
std::cerr << "Mrntt: Exception occurred: in thread " request.get(), request));
<< thread.name << ". Killing Salmanoff." << "\n";
/** EXPLANATION:
* An exception has occurred in one of a mind's threads. We need to
* shut down all of that particular mind's threads.
*/
thread.parent.finalizeReq([]() {
/** FIXME:
* When we eventually support multiple minds, we should remove this
* since it causes marionette to exit, even if there are other minds
* that are still running.
*/
smo::mrntt::exitMarionetteLoop();
});
});
} }
void ComponentThread::userShutdownInd() void ComponentThread::userShutdownInd()
@@ -244,26 +408,14 @@ void ComponentThread::userShutdownInd()
+ ": invoked on non-mrntt thread " + this->name); + ": invoked on non-mrntt thread " + this->name);
} }
auto request = std::make_shared<MindShutdownIndOp>(
ComponentThread::getMrntt(), nullptr);
// Post the user shutdown to the mrntt thread. // Post the user shutdown to the mrntt thread.
this->getIoService().post( this->getIoService().post(
[this]() std::bind(
{ &MindShutdownIndOp::mindShutdownInd1_userShutdown,
std::cerr << "Mrntt: User requested shutdown (SIGINT)." request.get(), request));
<< " Killing Salmanoff." << "\n";
/** EXPLANATION:
* A user has requested a shutdown. We need to shut down all of the
* threads in all running Minds.
*/
parent.finalizeReq([]() {
/** FIXME:
* When we eventually support multiple minds, we should remove this
* since it causes marionette to exit, even if there are other minds
* that are still running.
*/
smo::mrntt::exitMarionetteLoop();
});
});
} }
// CPU management method implementations // CPU management method implementations
+13 -6
View File
@@ -54,11 +54,13 @@ public:
typedef void (mainFn)(ComponentThread &self); typedef void (mainFn)(ComponentThread &self);
static mainFn main, marionetteMain; static mainFn main, marionetteMain;
typedef std::function<void()> threadLifetimeMgmtOpCbFn;
// Thread management methods // Thread management methods
void startThreadReq(std::function<void()> callback = nullptr); void startThreadReq(threadLifetimeMgmtOpCbFn callback);
void exitThreadReq(std::function<void()> callback = nullptr); void exitThreadReq(threadLifetimeMgmtOpCbFn callback);
void pauseThreadReq(std::function<void()> callback = nullptr); void pauseThreadReq(threadLifetimeMgmtOpCbFn callback);
void resumeThreadReq(std::function<void()> callback = nullptr); void resumeThreadReq(threadLifetimeMgmtOpCbFn callback);
/** /**
* JOLTs this thread to begin processing after global initialization. * JOLTs this thread to begin processing after global initialization.
* *
@@ -66,7 +68,7 @@ public:
* event loops and set up TLS vars after all global constructors have * event loops and set up TLS vars after all global constructors have
* completed. This prevents race conditions during system startup. * completed. This prevents race conditions during system startup.
*/ */
void joltThreadReq(std::function<void()> callback = nullptr); void joltThreadReq(threadLifetimeMgmtOpCbFn callback);
// CPU management methods // CPU management methods
static int getAvailableCpuCount(); static int getAvailableCpuCount();
@@ -82,8 +84,9 @@ public:
N_ITEMS N_ITEMS
}; };
typedef std::function<void()> mindShutdownIndOpCbFn;
// Intentionally doesn't take a callback. // Intentionally doesn't take a callback.
void exceptionInd(ComponentThread& thread); void exceptionInd(const std::shared_ptr<ComponentThread> &faultyThread);
// Intentionally doesn't take a callback. // Intentionally doesn't take a callback.
void userShutdownInd(); void userShutdownInd();
@@ -124,6 +127,10 @@ public:
return threadNames[id]; return threadNames[id];
} }
private:
class ThreadLifetimeMgmtOp;
class MindShutdownIndOp;
}; };
namespace mrntt { namespace mrntt {
+6 -6
View File
@@ -47,7 +47,7 @@ void ComponentThread::marionetteMain(ComponentThread& self)
self.initializeTls(); self.initializeTls();
mrntt::exitCode = EXIT_SUCCESS; mrntt::exitCode = EXIT_SUCCESS;
static boost::asio::signal_set signals(self.getIoService(), SIGINT); static boost::asio::signal_set signals(self.getIoService(), SIGINT);
bool callFinalizeReq = false, callShutdownSalmanoff = false; bool callFinalizeReq = false, callShutdownSalmanoffReq = false;
try { try {
// Register SIGINT (Ctrl+C) and SIGSEGV handlers // Register SIGINT (Ctrl+C) and SIGSEGV handlers
@@ -144,7 +144,7 @@ void ComponentThread::marionetteMain(ComponentThread& self)
if (sendExceptionInd) if (sendExceptionInd)
{ {
mrntt::exitCode = EXIT_FAILURE; mrntt::exitCode = EXIT_FAILURE;
self.exceptionInd(self); self.exceptionInd(self.shared_from_this());
} }
} }
@@ -163,23 +163,23 @@ void ComponentThread::marionetteMain(ComponentThread& self)
} }
*out << outUsageMsg << e.what() << std::endl; *out << outUsageMsg << e.what() << std::endl;
callShutdownSalmanoff = callFinalizeReq = true; callShutdownSalmanoffReq = callFinalizeReq = true;
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
std::cerr << __func__ << ": Exception occurred: " << e.what() std::cerr << __func__ << ": Exception occurred: " << e.what()
<< std::endl; << std::endl;
mrntt::exitCode = EXIT_FAILURE; mrntt::exitCode = EXIT_FAILURE;
callShutdownSalmanoff = callFinalizeReq = true; callShutdownSalmanoffReq = callFinalizeReq = true;
} }
catch (...) catch (...)
{ {
std::cerr << __func__ << ": Unknown exception occurred" << std::endl; std::cerr << __func__ << ": Unknown exception occurred" << std::endl;
mrntt::exitCode = EXIT_FAILURE; mrntt::exitCode = EXIT_FAILURE;
callShutdownSalmanoff = callFinalizeReq = true; callShutdownSalmanoffReq = callFinalizeReq = true;
} }
if (callShutdownSalmanoff) if (callShutdownSalmanoffReq)
{ {
shutdownSalmanoff( shutdownSalmanoff(
[](bool success) [](bool success)