Compare commits

...

2 Commits

Author SHA1 Message Date
hayodea abdb857e55 PuppetApp: Now use coros instead of CPS 2026-05-19 10:46:52 -04:00
hayodea 525530b567 Compilation fixups 2026-05-19 10:06:23 -04:00
3 changed files with 164 additions and 193 deletions
+18 -16
View File
@@ -169,18 +169,19 @@ public:
ThreadOp _threadOp,
PuppetThread &_parentThread,
const std::shared_ptr<PuppetThread> &_selfPtr = nullptr)
: threadOp(_threadOp), parentThread(_parentThread), selfPtr(_selfPtr)
: threadOp(_threadOp),
parentThread(_parentThread),
selfPtr(_selfPtr),
lifetimeMgmtCallback{
nullptr,
[this]()
{
settled = true;
if (callerSchedHandle) {
callerSchedHandle.resume();
}
}}
{
cps::Callback<threadLifetimeMgmtOpCbFn> callback{
nullptr,
[this]()
{
settled = true;
if (callerSchedHandle) {
callerSchedHandle.resume();
}
}};
if (threadOp == ThreadOp::JOLT && selfPtr == nullptr)
{
throw std::runtime_error(std::string(__func__)
@@ -190,19 +191,19 @@ public:
switch (threadOp)
{
case ThreadOp::START:
parentThread.startThreadReq(callback);
parentThread.startThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::PAUSE:
parentThread.pauseThreadReq(callback);
parentThread.pauseThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::RESUME:
parentThread.resumeThreadReq(callback);
parentThread.resumeThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::EXIT:
parentThread.exitThreadReq(callback);
parentThread.exitThreadReq(lifetimeMgmtCallback);
break;
case ThreadOp::JOLT:
parentThread.joltThreadReq(selfPtr, callback);
parentThread.joltThreadReq(selfPtr, lifetimeMgmtCallback);
break;
default:
@@ -228,6 +229,7 @@ public:
std::coroutine_handle<> callerSchedHandle;
PuppetThread &parentThread;
const std::shared_ptr<PuppetThread> selfPtr;
cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback;
};
ViralThreadLifetimeMgmtInvoker startThreadAReq()
+12 -16
View File
@@ -2,10 +2,11 @@
#define PUPPET_APPLICATION_H
#include <config.h>
#include <exception>
#include <functional>
#include <memory>
#include <vector>
#include <spinscale/cps/callback.h>
#include <spinscale/co/invokers.h>
#include <spinscale/componentThread.h>
namespace sscl {
@@ -18,18 +19,16 @@ public:
const std::vector<std::shared_ptr<PuppetThread>> &threads);
~PuppetApplication() = default;
// Thread management methods
typedef std::function<void()> puppetThreadLifetimeMgmtOpCbFn;
void joltAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void startAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void pauseAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void resumeAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
void exitAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
co::NonViralNonPostingInvoker joltAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker startAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker pauseAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker resumeAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker exitAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
// CPU distribution method
void distributeAndPinThreadsAcrossCpus();
@@ -58,9 +57,6 @@ protected:
* a synchronization point for the entire system initialization.
*/
bool threadsHaveBeenJolted = false;
private:
class PuppetThreadLifetimeMgmtOp;
};
} // namespace sscl
+134 -161
View File
@@ -1,205 +1,178 @@
#include <iostream>
#include <spinscale/cps/asynchronousContinuation.h>
#include <spinscale/asynchronousLoop.h>
#include <spinscale/cps/callback.h>
#include <string_view>
#include <vector>
#include <spinscale/co/group.h>
#include <spinscale/puppetApplication.h>
#include <spinscale/componentThread.h>
namespace sscl {
namespace puppet_application_detail {
constexpr std::string_view noPuppetThreadsToStartLogMessage =
"Mrntt: No puppet threads to start";
constexpr std::string_view noPuppetThreadsToPauseLogMessage =
"Mrntt: No puppet threads to pause";
constexpr std::string_view noPuppetThreadsToResumeLogMessage =
"Mrntt: No puppet threads to resume";
constexpr std::string_view noPuppetThreadsToExitLogMessage =
"Mrntt: No puppet threads to exit";
using PuppetLifetimeInvoker = PuppetThread::ViralThreadLifetimeMgmtInvoker;
using PuppetLifetimeGroup = co::Group<PuppetLifetimeInvoker>;
void addAllPuppetLifetimeInvokersToGroup(
PuppetLifetimeGroup &group,
std::vector<PuppetLifetimeInvoker> &invokers,
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
PuppetThread::ThreadOp threadOp)
{
invokers.reserve(componentThreads.size());
for (const auto &thread : componentThreads)
{
switch (threadOp)
{
case PuppetThread::ThreadOp::START:
invokers.emplace_back(thread->startThreadAReq());
break;
case PuppetThread::ThreadOp::PAUSE:
invokers.emplace_back(thread->pauseThreadAReq());
break;
case PuppetThread::ThreadOp::RESUME:
invokers.emplace_back(thread->resumeThreadAReq());
break;
case PuppetThread::ThreadOp::EXIT:
invokers.emplace_back(thread->exitThreadAReq());
break;
case PuppetThread::ThreadOp::JOLT:
invokers.emplace_back(thread->joltThreadAReq(thread));
break;
default:
throw std::runtime_error(
std::string(__func__) + ": Invalid thread operation");
}
group.add(invokers.back());
}
}
co::NonViralNonPostingInvoker genericAllPuppetThreadsLifetimeOpCReq(
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage,
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback)
{
if (componentThreads.empty())
{
std::cout << emptyThreadsLogMessage << "\n";
co_return;
}
PuppetLifetimeGroup group;
std::vector<PuppetLifetimeInvoker> invokers;
addAllPuppetLifetimeInvokersToGroup(
group, invokers, componentThreads, threadOp);
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
co_return;
}
} // namespace puppet_application_detail
PuppetApplication::PuppetApplication(
const std::vector<std::shared_ptr<PuppetThread>> &threads)
: componentThreads(threads)
{
}
class PuppetApplication::PuppetThreadLifetimeMgmtOp
: public cps::NonPostedAsynchronousContinuation<puppetThreadLifetimeMgmtOpCbFn>
{
public:
PuppetThreadLifetimeMgmtOp(
PuppetApplication &parent, unsigned int nThreads,
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback)
: cps::NonPostedAsynchronousContinuation<puppetThreadLifetimeMgmtOpCbFn>(callback),
loop(nThreads),
parent(parent)
{}
public:
AsynchronousLoop loop;
PuppetApplication &parent;
public:
void joltAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
parent.threadsHaveBeenJolted = true;
callOriginalCb();
}
void executeGenericOpOnAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
callOriginalCb();
}
void exitAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
for (auto& thread : parent.componentThreads) {
thread->thread.join();
}
callOriginalCb();
}
};
void PuppetApplication::joltAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback)
{
if (threadsHaveBeenJolted)
{
std::cout << "Mrntt: All puppet threads already JOLTed. "
<< "Skipping JOLT request." << "\n";
callback.callbackFn();
return;
co_return;
}
// If no threads, set flag and call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
if (componentThreads.empty())
{
threadsHaveBeenJolted = true;
callback.callbackFn();
return;
co_return;
}
// Create a counter to track when all threads have been jolted
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
puppet_application_detail::PuppetLifetimeGroup group;
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
for (auto& thread : componentThreads)
{
thread->joltThreadReq(
thread,
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::joltAllPuppetThreadsReq1,
request.get(), request)});
}
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
group, invokers, componentThreads, PuppetThread::ThreadOp::JOLT);
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
threadsHaveBeenJolted = true;
co_return;
}
void PuppetApplication::startAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
co::NonViralNonPostingInvoker PuppetApplication::startAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have started
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->startThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
componentThreads, PuppetThread::ThreadOp::START,
puppet_application_detail::noPuppetThreadsToStartLogMessage,
exceptionPtr, callback);
}
void PuppetApplication::pauseAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
co::NonViralNonPostingInvoker PuppetApplication::pauseAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have paused
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->pauseThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
componentThreads, PuppetThread::ThreadOp::PAUSE,
puppet_application_detail::noPuppetThreadsToPauseLogMessage,
exceptionPtr, callback);
}
void PuppetApplication::resumeAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
co::NonViralNonPostingInvoker PuppetApplication::resumeAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
{
callback.callbackFn();
return;
}
// Create a counter to track when all threads have resumed
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->resumeThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
componentThreads, PuppetThread::ThreadOp::RESUME,
puppet_application_detail::noPuppetThreadsToResumeLogMessage,
exceptionPtr, callback);
}
void PuppetApplication::exitAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
co::NonViralNonPostingInvoker PuppetApplication::exitAllPuppetThreadsCReq(
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback)
{
// If no threads, call callback immediately
if (componentThreads.size() == 0 && callback.callbackFn)
if (componentThreads.empty())
{
callback.callbackFn();
return;
std::cout << puppet_application_detail::noPuppetThreadsToExitLogMessage
<< "\n";
co_return;
}
// Create a counter to track when all threads have exited
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
puppet_application_detail::PuppetLifetimeGroup group;
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
for (auto& thread : componentThreads)
{
thread->exitThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::exitAllPuppetThreadsReq1,
request.get(), request)});
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
group, invokers, componentThreads, PuppetThread::ThreadOp::EXIT);
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
for (auto &thread : componentThreads) {
thread->thread.join();
}
co_return;
}
void PuppetApplication::distributeAndPinThreadsAcrossCpus()