Files
libspinscale/include/spinscale/componentThread.h
T

323 lines
8.6 KiB
C++
Raw Normal View History

2025-12-28 03:54:22 -04:00
#ifndef COMPONENT_THREAD_H
#define COMPONENT_THREAD_H
#include <atomic>
#include <thread>
#include <unordered_map>
#include <stdexcept>
#include <queue>
#include <functional>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
#include <memory>
2026-05-19 10:01:15 -04:00
#include <coroutine>
2025-12-28 03:54:22 -04:00
#include <cstdint>
#include <string>
2026-05-30 11:57:57 -04:00
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/cps/callback.h>
2025-12-28 03:54:22 -04:00
namespace sscl {
2026-02-22 17:38:53 -04:00
// Forward declarations, so these types can be named by reference or
// shared_ptr before (or without) their full definitions being visible.
class PuppetThread;
class PuppeteerThread;
class PuppetComponent;

namespace pptr {

class PuppeteerComponent;

} // namespace pptr

// ThreadId is a generic type - application-specific enums should be defined
// elsewhere.
using ThreadId = uint8_t;
class ComponentThread
{
protected:
ComponentThread(ThreadId _id, std::string _name)
2026-05-30 11:57:57 -04:00
: id(_id), name(std::move(_name)),
work(boost::asio::make_work_guard(io_context)), keepLooping(true)
2025-12-28 03:54:22 -04:00
{}
public:
virtual ~ComponentThread() = default;
void cleanup(void);
2026-05-30 11:57:57 -04:00
boost::asio::io_context& getIoContext(void) { return io_context; }
2025-12-28 03:54:22 -04:00
static const std::shared_ptr<ComponentThread> getSelf(void);
static bool tlsInitialized(void);
2026-02-22 17:38:53 -04:00
static void setPuppeteerThread(const std::shared_ptr<PuppeteerThread> &t);
static void setPuppeteerThreadId(ThreadId id);
static std::shared_ptr<PuppeteerThread> getPptr();
static std::shared_ptr<PuppeteerThread> getPuppeteer()
{ return getPptr(); }
2025-12-28 03:54:22 -04:00
// CPU management methods
static int getAvailableCpuCount();
typedef std::function<void()> mindShutdownIndOpCbFn;
// Intentionally doesn't take a callback.
void exceptionInd(const std::shared_ptr<ComponentThread> &faultyThread);
// Intentionally doesn't take a callback.
void userShutdownInd();
public:
ThreadId id;
std::string name;
2026-05-30 11:57:57 -04:00
boost::asio::io_context io_context;
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type> work;
2025-12-28 03:54:22 -04:00
std::atomic<bool> keepLooping;
};
2026-02-18 01:13:02 -04:00
class PuppeteerThread
: public std::enable_shared_from_this<PuppeteerThread>,
2025-12-28 03:54:22 -04:00
public ComponentThread
{
public:
2026-02-22 17:38:53 -04:00
typedef void (*preJoltHookFn)(PuppeteerThread &);
struct EntryFnArguments
2025-12-28 03:54:22 -04:00
{
2026-02-22 17:38:53 -04:00
PuppeteerThread &usableBeforeJolt;
/** EXPLANATION:
* The `Puppet*Component` ref points at the Component object which this
* thread is associated with. However, we have no guarantee that this
* object has been constructed at the point of OS thread entry.
*
* Hence this ref must be dereferenced only after JOLT.
*/
pptr::PuppeteerComponent &useOnlyAfterJolt;
preJoltHookFn preJoltHook;
};
using entryPointFn = std::function<void(const EntryFnArguments &)>;
PuppeteerThread(
ThreadId id, std::string name,
entryPointFn entryPoint,
2026-02-22 17:38:53 -04:00
pptr::PuppeteerComponent &component,
preJoltHookFn preJoltFn)
: ComponentThread(id, std::move(name)),
2026-02-22 17:38:53 -04:00
entryFnArguments(*this, component, preJoltFn),
thread(std::move(entryPoint), std::cref(entryFnArguments))
{}
2025-12-28 03:54:22 -04:00
void initializeTls(void);
2026-02-22 17:38:53 -04:00
void exitLoop(void);
2025-12-28 03:54:22 -04:00
public:
2026-02-22 17:38:53 -04:00
EntryFnArguments entryFnArguments;
/** EXPLANATION:
* Must always be memberwise-initialized last.
* This ensures that the ref to this `ComponentThread` object, which is
* passed to the entry point function, is fully constructed when the OS
* thread begins executing.
*/
2025-12-28 03:54:22 -04:00
std::thread thread;
};
class PuppetThread
: public std::enable_shared_from_this<PuppetThread>,
public ComponentThread
{
public:
2026-02-22 17:38:53 -04:00
typedef void (*preJoltHookFn)(PuppetThread &);
struct EntryFnArguments
{
PuppetThread &usableBeforeJolt;
/** See comment above in:
* PuppeteerThread::EntryFnArguments::useOnlyAfterJolt.
*/
PuppetComponent &useOnlyAfterJolt;
preJoltHookFn preJoltHook;
};
using entryPointFn = std::function<void(const EntryFnArguments &)>;
2025-12-28 03:54:22 -04:00
enum class ThreadOp
{
START,
PAUSE,
RESUME,
EXIT,
JOLT,
N_ITEMS
};
2026-02-22 17:38:53 -04:00
PuppetThread(
ThreadId _id, std::string name,
entryPointFn entryPoint, PuppetComponent &component,
2026-02-22 17:38:53 -04:00
preJoltHookFn preJoltFn)
: ComponentThread(_id, std::move(name)),
2025-12-28 03:54:22 -04:00
pinnedCpuId(-1),
2026-05-30 11:57:57 -04:00
pause_work(boost::asio::make_work_guard(pause_io_context)),
2026-02-22 17:38:53 -04:00
entryFnArguments(*this, component, preJoltFn),
thread(std::move(entryPoint), std::cref(entryFnArguments))
{}
2025-12-28 03:54:22 -04:00
virtual ~PuppetThread() = default;
void initializeTls(void);
2026-05-19 10:01:15 -04:00
typedef std::function<void()> threadLifetimeMgmtOpCbFn;
struct ViralThreadLifetimeMgmtInvoker
{
struct AsyncState
{
std::atomic<bool> settled{false};
std::coroutine_handle<> callerSchedHandle;
};
ViralThreadLifetimeMgmtInvoker(
2026-05-19 10:01:15 -04:00
ThreadOp _threadOp,
PuppetThread &_parentThread,
const std::shared_ptr<PuppetThread> &_selfPtr = nullptr)
2026-05-19 10:06:23 -04:00
: threadOp(_threadOp),
asyncState(std::make_shared<AsyncState>()),
2026-05-19 10:06:23 -04:00
parentThread(_parentThread),
selfPtr(_selfPtr),
lifetimeMgmtCallback{
nullptr,
[asyncState = asyncState]()
2026-05-19 10:06:23 -04:00
{
asyncState->settled.store(true, std::memory_order_release);
std::coroutine_handle<> handle =
asyncState->callerSchedHandle;
if (!handle) {
return;
2026-05-19 10:06:23 -04:00
}
/** Post resume to the puppeteer queue: direct resume() from
* within an asio completion handler can destroy adapter
* coroutine state while the handler is still unwinding.
*/
boost::asio::post(
2026-05-30 11:57:57 -04:00
ComponentThread::getPptr()->getIoContext(),
[handle]() { handle.resume(); });
2026-05-19 10:06:23 -04:00
}}
{
if (threadOp == ThreadOp::JOLT && selfPtr == nullptr)
{
throw std::runtime_error(std::string(__func__)
+ ": JOLT request must be made with a valid selfPtr");
}
switch (threadOp)
{
case ThreadOp::START:
2026-05-19 10:06:23 -04:00
parentThread.startThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::PAUSE:
2026-05-19 10:06:23 -04:00
parentThread.pauseThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::RESUME:
2026-05-19 10:06:23 -04:00
parentThread.resumeThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::EXIT:
2026-05-19 10:06:23 -04:00
parentThread.exitThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::JOLT:
2026-05-19 10:06:23 -04:00
parentThread.joltThreadReq(selfPtr, lifetimeMgmtCallback);
break;
default:
throw std::runtime_error(std::string(__func__)
+ ": Invalid thread operation");
}
}
bool await_ready() const noexcept
{
return asyncState->settled.load(std::memory_order_acquire);
}
bool await_suspend(
2026-05-19 10:01:15 -04:00
std::coroutine_handle<> _callerSchedHandle) noexcept
{
if (asyncState->settled.load(std::memory_order_acquire)) {
return false;
}
asyncState->callerSchedHandle = _callerSchedHandle;
return true;
}
2026-05-19 06:45:59 -04:00
void await_resume() noexcept {}
2026-05-19 10:01:15 -04:00
ThreadOp threadOp;
std::shared_ptr<AsyncState> asyncState;
PuppetThread &parentThread;
const std::shared_ptr<PuppetThread> selfPtr;
2026-05-19 10:06:23 -04:00
cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback;
};
// Thread lifetime management request invokers
ViralThreadLifetimeMgmtInvoker startThreadAReq()
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); }
ViralThreadLifetimeMgmtInvoker pauseThreadAReq()
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::PAUSE, *this); }
ViralThreadLifetimeMgmtInvoker resumeThreadAReq()
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::RESUME, *this); }
ViralThreadLifetimeMgmtInvoker exitThreadAReq()
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::EXIT, *this); }
2026-05-17 17:26:21 -04:00
void startThreadReq(cps::Callback<threadLifetimeMgmtOpCbFn> callback);
void exitThreadReq(cps::Callback<threadLifetimeMgmtOpCbFn> callback);
void pauseThreadReq(cps::Callback<threadLifetimeMgmtOpCbFn> callback);
void resumeThreadReq(cps::Callback<threadLifetimeMgmtOpCbFn> callback);
2025-12-28 03:54:22 -04:00
/**
* JOLTs this thread to begin processing after global initialization.
*
* JOLTing is the mechanism that allows threads to enter their main
* event loops and set up TLS vars after all global constructors have
* completed. This prevents race conditions during system startup.
*
* @param selfPtr Shared pointer to this thread (required because TLS
* isn't set up yet, so shared_from_this() can't be used)
* @param callback Callback to invoke when JOLT completes
*/
ViralThreadLifetimeMgmtInvoker joltThreadAReq(
const std::shared_ptr<PuppetThread> &selfPtr)
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::JOLT, *this, selfPtr); }
2025-12-28 03:54:22 -04:00
void joltThreadReq(
const std::shared_ptr<PuppetThread>& selfPtr,
2026-05-17 17:26:21 -04:00
cps::Callback<threadLifetimeMgmtOpCbFn> callback);
2025-12-28 03:54:22 -04:00
// CPU management methods
void pinToCpu(int cpuId);
public:
int pinnedCpuId;
2026-05-30 11:57:57 -04:00
boost::asio::io_context pause_io_context;
boost::asio::executor_work_guard<
boost::asio::io_context::executor_type> pause_work;
2026-02-22 17:38:53 -04:00
public:
EntryFnArguments entryFnArguments;
/** Must always be memberwise-initialized last.
* See comment on `PuppeteerThread::thread` for explanation.
*/
2025-12-28 03:54:22 -04:00
std::thread thread;
public:
class ThreadLifetimeMgmtOp;
};
2026-02-18 01:13:02 -04:00
namespace pptr {
// Declarations only (`extern`): the definitions are expected to live in a
// single translation unit elsewhere in the library.
// NOTE(review): presumably these back ComponentThread::setPuppeteerThread(),
// setPuppeteerThreadId() and getPptr() - confirm against the .cpp.
extern std::shared_ptr<PuppeteerThread> thread;
extern ThreadId puppeteerThreadId;
} // namespace pptr
} // namespace sscl
2025-12-28 03:54:22 -04:00
#endif // COMPONENT_THREAD_H