PuppetApp: Now use coros instead of CPS

This commit is contained in:
2026-05-19 10:46:52 -04:00
parent 525530b567
commit abdb857e55
2 changed files with 145 additions and 177 deletions
+11 -16
View File
@@ -2,10 +2,10 @@
#define PUPPET_APPLICATION_H #define PUPPET_APPLICATION_H
#include <config.h> #include <config.h>
#include <exception>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <spinscale/cps/callback.h>
#include <spinscale/co/invokers.h> #include <spinscale/co/invokers.h>
#include <spinscale/componentThread.h> #include <spinscale/componentThread.h>
@@ -19,18 +19,16 @@ public:
const std::vector<std::shared_ptr<PuppetThread>> &threads); const std::vector<std::shared_ptr<PuppetThread>> &threads);
~PuppetApplication() = default; ~PuppetApplication() = default;
// Thread management methods co::NonViralNonPostingInvoker joltAllPuppetThreadsCReq(
typedef std::function<void()> puppetThreadLifetimeMgmtOpCbFn; std::exception_ptr &exceptionPtr, std::function<void()> callback);
NonViralNonPostingInvoker joltAllPuppetThreadsCReq( co::NonViralNonPostingInvoker startAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
NonViralNonPostingInvoker startAllPuppetThreadsCReq( co::NonViralNonPostingInvoker pauseAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
NonViralNonPostingInvoker pauseAllPuppetThreadsCReq( co::NonViralNonPostingInvoker resumeAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
NonViralNonPostingInvoker resumeAllPuppetThreadsCReq( co::NonViralNonPostingInvoker exitAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
NonViralNonPostingInvoker exitAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
// CPU distribution method // CPU distribution method
void distributeAndPinThreadsAcrossCpus(); void distributeAndPinThreadsAcrossCpus();
@@ -59,9 +57,6 @@ protected:
* a synchronization point for the entire system initialization. * a synchronization point for the entire system initialization.
*/ */
bool threadsHaveBeenJolted = false; bool threadsHaveBeenJolted = false;
private:
class PuppetThreadLifetimeMgmtOp;
}; };
} // namespace sscl } // namespace sscl
+134 -161
View File
@@ -1,205 +1,178 @@
#include <iostream> #include <iostream>
#include <spinscale/cps/asynchronousContinuation.h> #include <string_view>
#include <spinscale/asynchronousLoop.h> #include <vector>
#include <spinscale/cps/callback.h>
#include <spinscale/co/group.h>
#include <spinscale/puppetApplication.h> #include <spinscale/puppetApplication.h>
#include <spinscale/componentThread.h> #include <spinscale/componentThread.h>
namespace sscl { namespace sscl {
namespace puppet_application_detail {
constexpr std::string_view noPuppetThreadsToStartLogMessage =
"Mrntt: No puppet threads to start";
constexpr std::string_view noPuppetThreadsToPauseLogMessage =
"Mrntt: No puppet threads to pause";
constexpr std::string_view noPuppetThreadsToResumeLogMessage =
"Mrntt: No puppet threads to resume";
constexpr std::string_view noPuppetThreadsToExitLogMessage =
"Mrntt: No puppet threads to exit";
using PuppetLifetimeInvoker = PuppetThread::ViralThreadLifetimeMgmtInvoker;
using PuppetLifetimeGroup = co::Group<PuppetLifetimeInvoker>;
void addAllPuppetLifetimeInvokersToGroup(
PuppetLifetimeGroup &group,
std::vector<PuppetLifetimeInvoker> &invokers,
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
PuppetThread::ThreadOp threadOp)
{
invokers.reserve(componentThreads.size());
for (const auto &thread : componentThreads)
{
switch (threadOp)
{
case PuppetThread::ThreadOp::START:
invokers.emplace_back(thread->startThreadAReq());
break;
case PuppetThread::ThreadOp::PAUSE:
invokers.emplace_back(thread->pauseThreadAReq());
break;
case PuppetThread::ThreadOp::RESUME:
invokers.emplace_back(thread->resumeThreadAReq());
break;
case PuppetThread::ThreadOp::EXIT:
invokers.emplace_back(thread->exitThreadAReq());
break;
case PuppetThread::ThreadOp::JOLT:
invokers.emplace_back(thread->joltThreadAReq(thread));
break;
default:
throw std::runtime_error(
std::string(__func__) + ": Invalid thread operation");
}
group.add(invokers.back());
}
}
co::NonViralNonPostingInvoker genericAllPuppetThreadsLifetimeOpCReq(
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage,
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback)
{
if (componentThreads.empty())
{
std::cout << emptyThreadsLogMessage << "\n";
co_return;
}
PuppetLifetimeGroup group;
std::vector<PuppetLifetimeInvoker> invokers;
addAllPuppetLifetimeInvokersToGroup(
group, invokers, componentThreads, threadOp);
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
co_return;
}
} // namespace puppet_application_detail
PuppetApplication::PuppetApplication( PuppetApplication::PuppetApplication(
const std::vector<std::shared_ptr<PuppetThread>> &threads) const std::vector<std::shared_ptr<PuppetThread>> &threads)
: componentThreads(threads) : componentThreads(threads)
{ {
} }
class PuppetApplication::PuppetThreadLifetimeMgmtOp co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
: public cps::NonPostedAsynchronousContinuation<puppetThreadLifetimeMgmtOpCbFn> [[maybe_unused]] std::exception_ptr &exceptionPtr,
{ [[maybe_unused]] std::function<void()> callback)
public:
PuppetThreadLifetimeMgmtOp(
PuppetApplication &parent, unsigned int nThreads,
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback)
: cps::NonPostedAsynchronousContinuation<puppetThreadLifetimeMgmtOpCbFn>(callback),
loop(nThreads),
parent(parent)
{}
public:
AsynchronousLoop loop;
PuppetApplication &parent;
public:
void joltAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
parent.threadsHaveBeenJolted = true;
callOriginalCb();
}
void executeGenericOpOnAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
callOriginalCb();
}
void exitAllPuppetThreadsReq1(
[[maybe_unused]] std::shared_ptr<PuppetThreadLifetimeMgmtOp> context
)
{
loop.incrementSuccessOrFailureDueTo(true);
if (!loop.isComplete()) {
return;
}
for (auto& thread : parent.componentThreads) {
thread->thread.join();
}
callOriginalCb();
}
};
void PuppetApplication::joltAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback
)
{ {
if (threadsHaveBeenJolted) if (threadsHaveBeenJolted)
{ {
std::cout << "Mrntt: All puppet threads already JOLTed. " std::cout << "Mrntt: All puppet threads already JOLTed. "
<< "Skipping JOLT request." << "\n"; << "Skipping JOLT request." << "\n";
callback.callbackFn(); co_return;
return;
} }
// If no threads, set flag and call callback immediately if (componentThreads.empty())
if (componentThreads.size() == 0 && callback.callbackFn)
{ {
threadsHaveBeenJolted = true; threadsHaveBeenJolted = true;
callback.callbackFn(); co_return;
return;
} }
// Create a counter to track when all threads have been jolted puppet_application_detail::PuppetLifetimeGroup group;
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>( std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads) puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
{ group, invokers, componentThreads, PuppetThread::ThreadOp::JOLT);
thread->joltThreadReq(
thread, co_await group.getAwaitAllSettlementsInvoker();
{request, std::bind( group.checkForAndReThrowGroupExceptions();
&PuppetThreadLifetimeMgmtOp::joltAllPuppetThreadsReq1,
request.get(), request)}); threadsHaveBeenJolted = true;
} co_return;
} }
void PuppetApplication::startAllPuppetThreadsCReq( co::NonViralNonPostingInvoker PuppetApplication::startAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback std::exception_ptr &exceptionPtr, std::function<void()> callback)
)
{ {
// If no threads, call callback immediately return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
if (componentThreads.size() == 0 && callback.callbackFn) componentThreads, PuppetThread::ThreadOp::START,
{ puppet_application_detail::noPuppetThreadsToStartLogMessage,
callback.callbackFn(); exceptionPtr, callback);
return;
}
// Create a counter to track when all threads have started
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->startThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
} }
void PuppetApplication::pauseAllPuppetThreadsCReq( co::NonViralNonPostingInvoker PuppetApplication::pauseAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback std::exception_ptr &exceptionPtr, std::function<void()> callback)
)
{ {
// If no threads, call callback immediately return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
if (componentThreads.size() == 0 && callback.callbackFn) componentThreads, PuppetThread::ThreadOp::PAUSE,
{ puppet_application_detail::noPuppetThreadsToPauseLogMessage,
callback.callbackFn(); exceptionPtr, callback);
return;
}
// Create a counter to track when all threads have paused
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->pauseThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
} }
void PuppetApplication::resumeAllPuppetThreadsCReq( co::NonViralNonPostingInvoker PuppetApplication::resumeAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback std::exception_ptr &exceptionPtr, std::function<void()> callback)
)
{ {
// If no threads, call callback immediately return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
if (componentThreads.size() == 0 && callback.callbackFn) componentThreads, PuppetThread::ThreadOp::RESUME,
{ puppet_application_detail::noPuppetThreadsToResumeLogMessage,
callback.callbackFn(); exceptionPtr, callback);
return;
}
// Create a counter to track when all threads have resumed
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>(
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads)
{
thread->resumeThreadReq(
{request, std::bind(
&PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1,
request.get(), request)});
}
} }
void PuppetApplication::exitAllPuppetThreadsCReq( co::NonViralNonPostingInvoker PuppetApplication::exitAllPuppetThreadsCReq(
cps::Callback<puppetThreadLifetimeMgmtOpCbFn> callback [[maybe_unused]] std::exception_ptr &exceptionPtr,
) [[maybe_unused]] std::function<void()> callback)
{ {
// If no threads, call callback immediately if (componentThreads.empty())
if (componentThreads.size() == 0 && callback.callbackFn)
{ {
callback.callbackFn(); std::cout << puppet_application_detail::noPuppetThreadsToExitLogMessage
return; << "\n";
co_return;
} }
// Create a counter to track when all threads have exited puppet_application_detail::PuppetLifetimeGroup group;
auto request = std::make_shared<PuppetThreadLifetimeMgmtOp>( std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
*this, componentThreads.size(), callback);
for (auto& thread : componentThreads) puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
{ group, invokers, componentThreads, PuppetThread::ThreadOp::EXIT);
thread->exitThreadReq(
{request, std::bind( co_await group.getAwaitAllSettlementsInvoker();
&PuppetThreadLifetimeMgmtOp::exitAllPuppetThreadsReq1, group.checkForAndReThrowGroupExceptions();
request.get(), request)});
for (auto &thread : componentThreads) {
thread->thread.join();
} }
co_return;
} }
void PuppetApplication::distributeAndPinThreadsAcrossCpus() void PuppetApplication::distributeAndPinThreadsAcrossCpus()