91ccd16b33
This makes the initialization sequence much cleaner and conceptually well encapsulated. We also now allocate the Mind objects dynamically: Mrntt allocates them inside of initializeReq. This means that we no longer have to worry about jolting and cleaning up the running threads of a global Mind object in cases where we never explicitly called Mind.initializeReq. Along with other conceptual improvements to our abstractions, this patch also gets us to a real "end of program initialization" point for the first time.
475 lines
12 KiB
C++
475 lines
12 KiB
C++
#include <unistd.h>
|
|
#include <iostream>
|
|
#include <pthread.h>
|
|
#include <sched.h>
|
|
#include <boost/asio.hpp>
|
|
#include <opts.h>
|
|
#include <asynchronousContinuation.h>
|
|
#include <mind.h>
|
|
#include <mindManager/mindManager.h>
|
|
#include <componentThread.h>
|
|
#include <marionette/marionette.h>
|
|
|
|
namespace smo {
|
|
|
|
/* Per-thread handle to the ComponentThread object that represents the calling
 * thread. It stays empty until that thread runs one of the initializeTls()
 * implementations below; ComponentThread::getSelf() throws while it is empty.
 */
thread_local std::shared_ptr<ComponentThread> thisComponentThread;
|
|
|
|
// Implementation of static method
|
|
std::shared_ptr<MarionetteThread> ComponentThread::getMrntt()
|
|
{
|
|
return mrntt::thread;
|
|
}
|
|
|
|
void MarionetteThread::initializeTls(void)
|
|
{
|
|
thisComponentThread = shared_from_this();
|
|
}
|
|
|
|
void MindThread::initializeTls(void)
|
|
{
|
|
thisComponentThread = shared_from_this();
|
|
}
|
|
|
|
const std::shared_ptr<ComponentThread> ComponentThread::getSelf(void)
|
|
{
|
|
if (!thisComponentThread)
|
|
{
|
|
throw std::runtime_error(std::string(__func__)
|
|
+ ": TLS not initialized");
|
|
}
|
|
|
|
return thisComponentThread;
|
|
}
|
|
|
|
/** EXPLANATION:
 * Entry routine for a MindThread. It proceeds in three stages:
 *
 *  1. Run the thread's io_service until that run() call returns. In this
 *     file it is ThreadLifetimeMgmtOp::joltThreadReq1() that stop()s the
 *     io_service, i.e. this stage is the "wait for JOLT" stage.
 *  2. Initialize TLS so that getSelf() works on this thread.
 *  3. Enter the event loop proper and stay in it until keepLooping is
 *     cleared (ComponentThread::cleanup() does that).
 *
 * @param self The MindThread object whose io_service this routine drives.
 */
void MindThread::main(MindThread& self)
{
    if (OptionParser::getOptions().verbose)
    {
        std::cout << self.name << ":" << __func__
            << ": Waiting for JOLT" << "\n";
    }

    // Stage 1: service the queue until the io_service is stopped.
    self.getIoService().run();
    // Stage 2: from here on getSelf() is usable on this thread.
    self.initializeTls();

    std::cout << self.name << ":" << __func__
        << ": Entering event loop" << "\n";

    /* Stage 3.
     *
     * An exception escaping a handler must not terminate the thread on its
     * own authority. Instead we catch it, report it to mrntt, and go around
     * the loop again to await control messages from mrntt. Only once
     * keepLooping has been cleared do we fall out of the loop.
     */
    self.keepLooping = true;
    while (self.keepLooping)
    {
        bool exceptionCaught = false;

        try
        {
            /** EXPLANATION:
             * A stop()ped io_service refuses to process new work until
             * reset() has been called on it, even if nested async
             * operations post to it. The io_service has been stop()ped by
             * the time we get here (e.g. by the JOLT sequence), so without
             * this reset() any async bridging done from this thread's main
             * sequence would never make progress.
             */
            self.getIoService().reset();
            self.getIoService().run();
        }
        catch (const std::exception& err)
        {
            exceptionCaught = true;
            std::cerr << self.name << ":" << __func__
                << ": Exception occurred: " << err.what() << "\n";
        }
        catch (...)
        {
            exceptionCaught = true;
            std::cerr << self.name << ":" << __func__
                << ": Unknown exception occurred" << "\n";
        }

        if (!exceptionCaught) { continue; }

        // Bubble the failure up to mrntt, then keep looping.
        mrntt::thread->exceptionInd(self.shared_from_this());
    }

    std::cout << self.name << ":" << __func__
        << ": Exited event loop" << "\n";
}
|
|
|
|
class MindThread::ThreadLifetimeMgmtOp
|
|
: public TargetedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>
|
|
{
|
|
public:
|
|
ThreadLifetimeMgmtOp(
|
|
const std::shared_ptr<ComponentThread> &caller,
|
|
const std::shared_ptr<MindThread> &target,
|
|
threadLifetimeMgmtOpCbFn callback)
|
|
: TargetedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>(
|
|
caller, callback),
|
|
target(target)
|
|
{}
|
|
|
|
void callOriginalCbFn(void)
|
|
{
|
|
if (originalCbFn) {
|
|
caller->getIoService().post(originalCbFn);
|
|
}
|
|
}
|
|
|
|
public:
|
|
const std::shared_ptr<MindThread> target;
|
|
|
|
public:
|
|
void joltThreadReq1(
|
|
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Thread '" << target->name << "': handling JOLT request."
|
|
<< "\n";
|
|
|
|
target->io_service.stop();
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void startThreadReq1(
|
|
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Thread '" << target->name << "': handling startThread."
|
|
<< "\n";
|
|
|
|
// Execute private setup sequence here
|
|
// This is where each thread would implement its specific initialization
|
|
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void exitThreadReq1_mainQueue(
|
|
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Thread '" << target->name << "': handling exitThread "
|
|
"(main queue)." << std::endl;
|
|
|
|
target->cleanup();
|
|
target->io_service.stop();
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void exitThreadReq1_pauseQueue(
|
|
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Thread '" << target->name << "': handling exitThread "
|
|
"(pause queue)." << std::endl;
|
|
|
|
target->cleanup();
|
|
target->pause_io_service.stop();
|
|
target->io_service.stop();
|
|
callOriginalCbFn();
|
|
}
|
|
|
|
void pauseThreadReq1(
|
|
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Thread '" << target->name << "': handling pauseThread."
|
|
<< std::endl;
|
|
|
|
/* We have to invoke the callback here before moving on because
|
|
* our next operation is going to block the thread, so it won't
|
|
* have a chance to invoke the callback until it's unblocked.
|
|
*/
|
|
callOriginalCbFn();
|
|
target->pause_io_service.reset();
|
|
target->pause_io_service.run();
|
|
}
|
|
|
|
void resumeThreadReq1(
|
|
[[maybe_unused]] std::shared_ptr<ThreadLifetimeMgmtOp> context
|
|
)
|
|
{
|
|
std::cout << "Thread '" << target->name << "': handling resumeThread."
|
|
<< std::endl;
|
|
|
|
target->pause_io_service.stop();
|
|
callOriginalCbFn();
|
|
}
|
|
};
|
|
|
|
void ComponentThread::cleanup(void)
|
|
{
|
|
this->keepLooping = false;
|
|
}
|
|
|
|
/* Posts a JOLT request onto this thread's io_service.
 *
 * @param callback Completion callback; it is posted to the mrntt thread's
 *                 io_service once the JOLT has been handled.
 * @throws std::runtime_error if this object's id is ComponentThread::MRNTT.
 */
void MindThread::joltThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    /** EXPLANATION:
     * Neither shared_from_this() nor getSelf() is used here: JOLTing
     * happens before TLS has been set up, and getSelf() needs TLS.
     *
     * Caller: JOLT is always invoked by the mrntt thread, so the mrntt
     * thread handle is supplied as the caller directly. (The JOLT sequence
     * that the CRT main() function invokes on the mrntt thread itself is a
     * special case, since it supplies cmdline args and envp.)
     *
     * Target: looked up explicitly, by id, in the parent Mind object's
     * collection of component threads.
     */
    if (id == ComponentThread::MRNTT)
    {
        throw std::runtime_error(
            std::string(__func__) + ": invoked on mrntt thread");
    }

    std::shared_ptr<MarionetteThread> mrnttThread = mrntt::thread;
    std::shared_ptr<MindThread> joltTarget =
        getParent().getComponentThread(id);

    auto joltOp = std::make_shared<ThreadLifetimeMgmtOp>(
        mrnttThread, joltTarget, callback);

    // The handler owns a reference to joltOp, keeping it alive while queued.
    this->getIoService().post(
        [joltOp] { joltOp->joltThreadReq1(joltOp); });
}
|
|
|
|
// Thread management method implementations
|
|
/* Posts a start request onto this thread's io_service.
 *
 * @param callback Completion callback; it is posted to the calling
 *                 thread's io_service once the request has been handled.
 * @throws std::runtime_error (from getSelf()) if the calling thread has
 *         not initialized its TLS.
 */
void MindThread::startThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    std::shared_ptr<ComponentThread> requester = getSelf();
    auto startOp = std::make_shared<ThreadLifetimeMgmtOp>(
        requester, shared_from_this(), callback);

    // The handler owns a reference to startOp, keeping it alive while queued.
    this->getIoService().post(
        [startOp] { startOp->startThreadReq1(startOp); });
}
|
|
|
|
/* Posts an exit request for this thread.
 *
 * The same operation object is queued twice: once on the main io_service
 * and once on pause_io_service. ThreadLifetimeMgmtOp::pauseThreadReq1()
 * blocks inside pause_io_service.run(), so a paused thread only sees work
 * that arrives on that second queue.
 *
 * @param callback Completion callback; it is posted to the calling
 *                 thread's io_service by whichever handler runs.
 * @throws std::runtime_error (from getSelf()) if the calling thread has
 *         not initialized its TLS.
 */
void MindThread::exitThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    std::shared_ptr<ComponentThread> requester = getSelf();
    auto exitOp = std::make_shared<ThreadLifetimeMgmtOp>(
        requester, shared_from_this(), callback);

    this->getIoService().post(
        [exitOp] { exitOp->exitThreadReq1_mainQueue(exitOp); });

    pause_io_service.post(
        [exitOp] { exitOp->exitThreadReq1_pauseQueue(exitOp); });
}
|
|
|
|
/* Posts a pause request onto this thread's io_service.
 *
 * @param callback Completion callback; it is posted to the calling
 *                 thread's io_service just before the target blocks.
 * @throws std::runtime_error if this object's id is ComponentThread::MRNTT,
 *         or (from getSelf()) if the calling thread has not initialized
 *         its TLS.
 */
void MindThread::pauseThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    if (id == ComponentThread::MRNTT)
    {
        throw std::runtime_error(
            std::string(__func__) + ": invoked on mrntt thread");
    }

    std::shared_ptr<ComponentThread> requester = getSelf();
    auto pauseOp = std::make_shared<ThreadLifetimeMgmtOp>(
        requester, shared_from_this(), callback);

    // The handler owns a reference to pauseOp, keeping it alive while queued.
    this->getIoService().post(
        [pauseOp] { pauseOp->pauseThreadReq1(pauseOp); });
}
|
|
|
|
/* Posts a resume request for this thread.
 *
 * @param callback Completion callback; it is posted to the calling
 *                 thread's io_service once the request has been handled.
 * @throws std::runtime_error if this object's id is ComponentThread::MRNTT,
 *         or (from getSelf()) if the calling thread has not initialized
 *         its TLS.
 */
void MindThread::resumeThreadReq(threadLifetimeMgmtOpCbFn callback)
{
    if (id == ComponentThread::MRNTT)
    {
        throw std::runtime_error(
            std::string(__func__) + ": invoked on mrntt thread");
    }

    std::shared_ptr<ComponentThread> requester = getSelf();
    auto resumeOp = std::make_shared<ThreadLifetimeMgmtOp>(
        requester, shared_from_this(), callback);

    /* This goes onto pause_io_service rather than the main io_service: a
     * paused thread is blocked inside pause_io_service.run(), so that is
     * the queue it is servicing and the one that can unblock it.
     */
    pause_io_service.post(
        [resumeOp] { resumeOp->resumeThreadReq1(resumeOp); });
}
|
|
|
|
class MindThread::MindShutdownIndOp
|
|
: public TargetedAsynchronousContinuation<mindShutdownIndOpCbFn>
|
|
{
|
|
public:
|
|
MindShutdownIndOp(
|
|
const std::shared_ptr<ComponentThread> &caller,
|
|
mindShutdownIndOpCbFn callback)
|
|
: TargetedAsynchronousContinuation<mindShutdownIndOpCbFn>(
|
|
caller, callback)
|
|
{}
|
|
|
|
public:
|
|
void mindShutdownInd1_exception(
|
|
[[maybe_unused]] std::shared_ptr<MindShutdownIndOp> context
|
|
)
|
|
{
|
|
std::cerr << "Mrntt: Exception occurred: in thread "
|
|
<< context->caller->name << ". Killing Salmanoff." << "\n";
|
|
|
|
/** EXPLANATION:
|
|
* An exception has occurred in one of a mind's threads. We need to
|
|
* shut down all of that particular mind's threads.
|
|
*/
|
|
smo::mind::globalMind->finalizeReq(
|
|
std::bind(
|
|
&MindShutdownIndOp::mindShutdownInd2,
|
|
context.get(), context));
|
|
}
|
|
|
|
void mindShutdownInd1_userShutdown(
|
|
[[maybe_unused]] std::shared_ptr<MindShutdownIndOp> context
|
|
)
|
|
{
|
|
std::cerr << "Mrntt: User requested shutdown (SIGINT)."
|
|
<< " Killing Salmanoff." << "\n";
|
|
|
|
/** EXPLANATION:
|
|
* A user has requested a shutdown. We need to shut down all of the
|
|
* threads in all running Minds.
|
|
*
|
|
* FIXME:
|
|
* So this should ideally be a loop
|
|
* through all running Minds, calling finalizeReq on each one.
|
|
*/
|
|
smo::mind::globalMind->finalizeReq(
|
|
std::bind(
|
|
&MindShutdownIndOp::mindShutdownInd2,
|
|
context.get(), context));
|
|
}
|
|
|
|
void mindShutdownInd2(
|
|
[[maybe_unused]] std::shared_ptr<MindShutdownIndOp> context
|
|
)
|
|
{
|
|
std::cout << "Mrntt: About to exit marionette loop." << "\n";
|
|
/** FIXME:
|
|
* When we eventually support multiple minds, we should remove this
|
|
* since it causes marionette to exit, even if there are other minds
|
|
* that are still running.
|
|
*/
|
|
smo::mrntt::exitMarionetteLoop();
|
|
}
|
|
|
|
};
|
|
|
|
/* This shouldn't take a callback because the caller shouldn't expect to
|
|
* Mrntt to send a reply signal to it. Sending this Indication means that
|
|
* Mrntt will send the calling thread an exitThreadReq. When the caller
|
|
* processes that exitThreadReq(), the caller will exit its event loop and then
|
|
* terminate.
|
|
*
|
|
* Even if Mrntt sent a RDY response, the caller shouldn't actually be executing
|
|
* any longer to receive it anyway.
|
|
*/
|
|
void ComponentThread::exceptionInd(
|
|
const std::shared_ptr<ComponentThread> &faultyThread
|
|
)
|
|
{
|
|
if (this->id != ComponentThread::MRNTT)
|
|
{
|
|
throw std::runtime_error(std::string(__func__)
|
|
+ ": invoked on non-mrntt thread " + faultyThread->name);
|
|
}
|
|
|
|
auto request = std::make_shared<MindThread::MindShutdownIndOp>(
|
|
faultyThread, nullptr);
|
|
|
|
// Post the exception to the mrntt thread.
|
|
this->getIoService().post(
|
|
std::bind(
|
|
&MindThread::MindShutdownIndOp::mindShutdownInd1_exception,
|
|
request.get(), request));
|
|
}
|
|
|
|
void ComponentThread::userShutdownInd()
|
|
{
|
|
if (this->id != ComponentThread::MRNTT)
|
|
{
|
|
throw std::runtime_error(std::string(__func__)
|
|
+ ": invoked on non-mrntt thread " + this->name);
|
|
}
|
|
|
|
auto request = std::make_shared<MindThread::MindShutdownIndOp>(
|
|
ComponentThread::getMrntt(), nullptr);
|
|
|
|
// Post the user shutdown to the mrntt thread.
|
|
this->getIoService().post(
|
|
std::bind(
|
|
&MindThread::MindShutdownIndOp::mindShutdownInd1_userShutdown,
|
|
request.get(), request));
|
|
}
|
|
|
|
// CPU management method implementations
|
|
/* Reports the number of CPUs that are currently online.
 *
 * The figure comes from sysconf(_SC_NPROCESSORS_ONLN). As a sanity check
 * it is compared against std::thread::hardware_concurrency() and a warning
 * is written to std::cerr when the two disagree; the sysconf value is
 * returned either way.
 *
 * @return The online CPU count; always greater than zero.
 * @throws std::runtime_error if sysconf() does not yield a positive count.
 */
int ComponentThread::getAvailableCpuCount()
{
    // sysconf() returns long; validate it before narrowing to int.
    const long onlineCpus = sysconf(_SC_NPROCESSORS_ONLN);
    if (onlineCpus <= 0)
    {
        throw std::runtime_error(std::string(__func__)
            + ": Failed to determine CPU count");
    }

    const int cpuCount = static_cast<int>(onlineCpus);

    /* hardware_concurrency() returns 0 when the value is not computable.
     * That means "unknown", not "zero CPUs", so it must not be reported
     * as a mismatch.
     */
    const unsigned int hwConcurrency = std::thread::hardware_concurrency();
    if (hwConcurrency != 0
        && hwConcurrency != static_cast<unsigned int>(cpuCount))
    {
        std::cerr << "Warning: CPU count mismatch - "
            "std::thread::hardware_concurrency() = "
            << hwConcurrency << ", sysconf(_SC_NPROCESSORS_ONLN) = "
            << cpuCount << "\n";
    }

    return cpuCount;
}
|
|
|
|
void MindThread::pinToCpu(int cpuId)
|
|
{
|
|
if (cpuId < 0)
|
|
{
|
|
throw std::runtime_error(std::string(__func__)
|
|
+ ": Invalid CPU ID: " + std::to_string(cpuId));
|
|
}
|
|
|
|
cpu_set_t cpuset;
|
|
CPU_ZERO(&cpuset);
|
|
CPU_SET(cpuId, &cpuset);
|
|
|
|
int result = pthread_setaffinity_np(
|
|
thread.native_handle(), sizeof(cpu_set_t), &cpuset);
|
|
if (result != 0)
|
|
{
|
|
throw std::runtime_error(std::string(__func__)
|
|
+ ": Failed to pin thread to CPU " + std::to_string(cpuId)
|
|
+ ": " + std::strerror(result));
|
|
}
|
|
|
|
pinnedCpuId = cpuId;
|
|
if (OptionParser::getOptions().verbose)
|
|
{
|
|
std::cout << name << ": Pinned to CPU " << cpuId << "\n";
|
|
}
|
|
}
|
|
|
|
} // namespace smo
|