Files
libspinscale/src/puppetApplication.cpp
T

188 lines
4.7 KiB
C++
Raw Normal View History

2025-12-28 03:54:22 -04:00
#include <iostream>
2026-05-19 10:46:52 -04:00
#include <string_view>
#include <vector>
#include <spinscale/co/group.h>
2025-12-28 03:54:22 -04:00
#include <spinscale/puppetApplication.h>
#include <spinscale/componentThread.h>
namespace sscl {
namespace {

// File-local log messages, one per lifetime operation, printed when that
// operation is requested while the application holds no puppet threads.

constexpr std::string_view noPuppetThreadsToStartLogMessage{
    "Mrntt: No puppet threads to start"};

constexpr std::string_view noPuppetThreadsToPauseLogMessage{
    "Mrntt: No puppet threads to pause"};

constexpr std::string_view noPuppetThreadsToResumeLogMessage{
    "Mrntt: No puppet threads to resume"};

constexpr std::string_view noPuppetThreadsToExitLogMessage{
    "Mrntt: No puppet threads to exit"};

} // namespace
2026-05-19 10:46:52 -04:00
/**
 * Constructs the application around a set of puppet threads.
 *
 * @param threads  Puppet threads used to initialise componentThreads; the
 *                 member is initialised directly from this argument.
 */
PuppetApplication::PuppetApplication(
    const std::vector<std::shared_ptr<PuppetThread>> &threads)
:   componentThreads(threads)
{}
/**
 * Creates one lifetime-management invoker per puppet thread for the requested
 * operation, appends it to `invokers`, and adds it to `group`.
 *
 * @param group     Group that each newly created invoker is added to.
 * @param invokers  Storage receiving the new invokers, appended in
 *                  componentThreads order.
 * @param threadOp  Lifetime request to issue on every thread.
 * @throws std::runtime_error if `threadOp` is not one of the handled
 *         operations (only reachable when there is at least one thread,
 *         because the check happens inside the per-thread loop).
 */
void PuppetApplication::addAllPuppetLifetimeInvokersToGroup(
    PuppetLifetimeMgmtGroup &group,
    std::vector<PuppetLifetimeMgmtInvoker> &invokers,
    PuppetThread::ThreadOp threadOp) const
{
    // Reserve space for what is already stored plus one invoker per thread,
    // so none of the emplace_back() calls below reallocates the vector.
    // Reserving only componentThreads.size() would not guarantee that when
    // the caller passes in a non-empty vector.
    // NOTE(review): group.add() is handed a reference to the vector element;
    // presumably the group keeps referring to it, which is why element
    // addresses must stay stable -- confirm against PuppetLifetimeMgmtGroup.
    invokers.reserve(invokers.size() + componentThreads.size());

    for (const auto &thread : componentThreads)
    {
        switch (threadOp)
        {
        case PuppetThread::ThreadOp::START:
            invokers.emplace_back(thread->startThreadAReq());
            break;
        case PuppetThread::ThreadOp::PAUSE:
            invokers.emplace_back(thread->pauseThreadAReq());
            break;
        case PuppetThread::ThreadOp::RESUME:
            invokers.emplace_back(thread->resumeThreadAReq());
            break;
        case PuppetThread::ThreadOp::EXIT:
            invokers.emplace_back(thread->exitThreadAReq());
            break;
        case PuppetThread::ThreadOp::JOLT:
            // Unlike the other requests, JOLT is passed the thread's own
            // shared pointer.
            invokers.emplace_back(thread->joltThreadAReq(thread));
            break;
        default:
            throw std::runtime_error(
                std::string(__func__) + ": Invalid thread operation");
        }

        // Register the invoker that was just appended.
        group.add(invokers.back());
    }
}
2025-12-28 03:54:22 -04:00
/**
 * Coroutine that issues a JOLT request to every puppet thread, at most once.
 *
 * If threadsHaveBeenJolted is already set, a message is logged and nothing
 * else happens. With no puppet threads the flag is simply set. Otherwise a
 * JOLT invoker is created for each thread, the group's await-all-settlements
 * invoker is awaited, group exceptions are checked for and rethrown, and only
 * then is the flag set.
 *
 * @param exceptionPtr  Not used by this body.
 * @param callback      Not used by this body.
 */
co::ViralNonPostingInvoker<void>
PuppetApplication::joltAllPuppetThreadsCReq(
    [[maybe_unused]] std::exception_ptr &exceptionPtr,
    [[maybe_unused]] std::function<void()> callback)
{
    // A previous call already completed the JOLT; do not repeat it.
    if (threadsHaveBeenJolted)
    {
        std::cout << "Mrntt: All puppet threads already JOLTed. "
            << "Skipping JOLT request." << "\n";
        co_return;
    }

    // Nothing to jolt: record completion straight away.
    if (componentThreads.empty())
    {
        threadsHaveBeenJolted = true;
        co_return;
    }

    PuppetLifetimeMgmtGroup joltGroup;
    std::vector<PuppetLifetimeMgmtInvoker> joltInvokers;

    addAllPuppetLifetimeInvokersToGroup(
        joltGroup, joltInvokers, PuppetThread::ThreadOp::JOLT);

    co_await joltGroup.getAwaitAllSettlementsInvoker();
    joltGroup.checkForAndReThrowGroupExceptions();

    // Only reached when the rethrow check above did not throw.
    threadsHaveBeenJolted = true;
}
/**
 * Coroutine shared by the start/pause/resume/exit entry points: applies one
 * lifetime operation to every puppet thread.
 *
 * With no puppet threads, `emptyThreadsLogMessage` is printed and the
 * coroutine returns. Otherwise one invoker per thread is created for
 * `threadOp`, the group's await-all-settlements invoker is awaited, and group
 * exceptions are checked for and rethrown.
 *
 * @param exceptionPtr            Not used by this body.
 * @param callback                Not used by this body.
 * @param threadOp                Operation to request on each thread.
 * @param emptyThreadsLogMessage  Text logged when there are no threads.
 */
co::ViralNonPostingInvoker<void>
PuppetApplication::allPuppetThreadsLifetimeOpCReq(
    [[maybe_unused]] std::exception_ptr &exceptionPtr,
    [[maybe_unused]] std::function<void()> callback,
    PuppetThread::ThreadOp threadOp,
    std::string_view emptyThreadsLogMessage)
{
    if (componentThreads.empty())
    {
        std::cout << emptyThreadsLogMessage << "\n";
        co_return;
    }

    PuppetLifetimeMgmtGroup opGroup;
    std::vector<PuppetLifetimeMgmtInvoker> opInvokers;
    addAllPuppetLifetimeInvokersToGroup(opGroup, opInvokers, threadOp);

    co_await opGroup.getAwaitAllSettlementsInvoker();
    opGroup.checkForAndReThrowGroupExceptions();
}
/**
 * Requests the START operation on every puppet thread.
 *
 * Forwards to allPuppetThreadsLifetimeOpCReq() and returns its invoker
 * unchanged; this function is not itself a coroutine.
 *
 * @param exceptionPtr  Passed through to the shared implementation.
 * @param callback      Moved into the shared implementation.
 */
co::ViralNonPostingInvoker<void>
PuppetApplication::startAllPuppetThreadsCReq(
    std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
    const auto requestedOp = PuppetThread::ThreadOp::START;

    return allPuppetThreadsLifetimeOpCReq(
        exceptionPtr, std::move(callback),
        requestedOp, noPuppetThreadsToStartLogMessage);
}
/**
 * Requests the PAUSE operation on every puppet thread.
 *
 * Forwards to allPuppetThreadsLifetimeOpCReq() and returns its invoker
 * unchanged; this function is not itself a coroutine.
 *
 * @param exceptionPtr  Passed through to the shared implementation.
 * @param callback      Moved into the shared implementation.
 */
co::ViralNonPostingInvoker<void>
PuppetApplication::pauseAllPuppetThreadsCReq(
    std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
    const auto requestedOp = PuppetThread::ThreadOp::PAUSE;

    return allPuppetThreadsLifetimeOpCReq(
        exceptionPtr, std::move(callback),
        requestedOp, noPuppetThreadsToPauseLogMessage);
}
2025-12-28 03:54:22 -04:00
/**
 * Requests the RESUME operation on every puppet thread.
 *
 * Forwards to allPuppetThreadsLifetimeOpCReq() and returns its invoker
 * unchanged; this function is not itself a coroutine.
 *
 * @param exceptionPtr  Passed through to the shared implementation.
 * @param callback      Moved into the shared implementation.
 */
co::ViralNonPostingInvoker<void>
PuppetApplication::resumeAllPuppetThreadsCReq(
    std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
    const auto requestedOp = PuppetThread::ThreadOp::RESUME;

    return allPuppetThreadsLifetimeOpCReq(
        exceptionPtr, std::move(callback),
        requestedOp, noPuppetThreadsToResumeLogMessage);
}
/**
 * Coroutine that requests the EXIT operation on every puppet thread and then
 * joins each thread's `thread` member.
 *
 * With no puppet threads, the "no threads to exit" message is logged and the
 * coroutine returns without doing anything else.
 *
 * @param exceptionPtr  Passed through to allPuppetThreadsLifetimeOpCReq().
 * @param callback      Moved into allPuppetThreadsLifetimeOpCReq().
 */
co::ViralNonPostingInvoker<void>
PuppetApplication::exitAllPuppetThreadsCReq(
    std::exception_ptr &exceptionPtr,
    std::function<void()> callback)
{
    if (componentThreads.empty())
    {
        std::cout << noPuppetThreadsToExitLogMessage << "\n";
        co_return;
    }

    // Await the EXIT request across all threads before joining any of them.
    co_await allPuppetThreadsLifetimeOpCReq(
        exceptionPtr, std::move(callback),
        PuppetThread::ThreadOp::EXIT,
        noPuppetThreadsToExitLogMessage);

    for (auto &puppet : componentThreads)
    {
        puppet->thread.join();
    }
}
/**
 * Pins every puppet thread to a CPU, assigning CPUs round-robin: the thread
 * at position i in componentThreads is pinned to CPU (i mod cpuCount), where
 * cpuCount comes from ComponentThread::getAvailableCpuCount().
 *
 * If the reported CPU count is not positive, no thread is pinned and an error
 * line is written to std::cerr instead. On success a summary line is written
 * to std::cout.
 */
void PuppetApplication::distributeAndPinThreadsAcrossCpus()
{
    const int cpuCount = ComponentThread::getAvailableCpuCount();

    // A zero count would make the modulo below undefined behaviour, and a
    // negative one would produce negative CPU indices, so skip pinning.
    // NOTE(review): whether getAvailableCpuCount() can actually return a
    // non-positive value is not visible from this file -- confirm.
    if (cpuCount <= 0)
    {
        std::cerr << __func__ << ": Invalid available CPU count ("
            << cpuCount << "); leaving threads unpinned\n";
        return;
    }

    int threadIndex = 0;
    for (auto &thread : componentThreads)
    {
        // Wrap around once every CPU has been handed a thread.
        thread->pinToCpu(threadIndex % cpuCount);
        ++threadIndex;
    }

    std::cout << __func__ << ": Distributed " << threadIndex << " threads "
        << "across " << cpuCount << " CPUs\n";
}
} // namespace sscl