diff --git a/include/spinscale/co/group.h b/include/spinscale/co/group.h index ff6f895..07a71b2 100644 --- a/include/spinscale/co/group.h +++ b/include/spinscale/co/group.h @@ -439,7 +439,7 @@ struct Group * would be impossible. * * So we should be able to call resume() directly here without - * post()ing to current_io_context(). + * post()ing to ComponentThread::getSelf()->getIoService(). * * EXPLANATION: * However, in order to ensure that we keep this adapter coro diff --git a/include/spinscale/co/invokers.h b/include/spinscale/co/invokers.h index dc70399..5ef65a3 100644 --- a/include/spinscale/co/invokers.h +++ b/include/spinscale/co/invokers.h @@ -47,7 +47,10 @@ struct NonViralPostingInvoker */ std::ostringstream oss; oss << std::this_thread::get_id() - << ": Missing completion lambda: non-viral coroutines require a completion lambda."; + << ": Missing completion lambda: non-viral coroutines require a completion lambda." + << " Promise type=" << typeid(*this).name() + << ". This usually means promise construction did not bind the" + << " (exception_ptr&, function, ...) constructor."; throw std::runtime_error(oss.str()); } @@ -153,12 +156,12 @@ struct ViralPostingInvoker * from get_return_object(). ~NonPostingInvoker destroys the callee frame. */ struct NonViralNonPostingInvoker -: public NonPostingInvoker +: public NonPostingInvoker, void> { struct promise_type - : public NonPostingPromise + : public NonPostingPromise { - using NonPostingPromise::NonPostingPromise; + using NonPostingPromise::NonPostingPromise; NonViralNonPostingInvoker get_return_object() { @@ -169,7 +172,10 @@ struct NonViralNonPostingInvoker { std::ostringstream oss; oss << std::this_thread::get_id() - << ": Missing completion lambda: non-viral coroutines require a completion lambda."; + << ": Missing completion lambda: non-viral coroutines require a completion lambda." + << " Promise type=" << typeid(*this).name() + << ". This usually means promise construction did not bind the" + << " (exception_ptr&, function, ...) constructor."; throw std::runtime_error(oss.str()); } @@ -180,7 +186,7 @@ struct NonViralNonPostingInvoker } }; - using NonPostingInvoker::NonPostingInvoker; + using NonPostingInvoker, void>::NonPostingInvoker; bool await_ready() const noexcept { std::terminate(); } @@ -192,6 +198,66 @@ struct NonViralNonPostingInvoker { std::terminate(); } }; +/** Viral awaitable non-posting coroutine: runs eagerly on the caller thread + * (initial_suspend is never). Caller resume uses symmetric transfer when the + * caller has registered before callee completion; otherwise PostBackStatus + * fast-paths await_resume on co_await. + */ +template +struct ViralNonPostingInvoker +: public NonPostingInvoker, T> +{ + struct promise_type + : public NonPostingPromise + { + using NonPostingPromise::NonPostingPromise; + + ViralNonPostingInvoker get_return_object() noexcept + { +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " Returning ViralNonPostingInvoker.\n"; +#endif + this->setSelfSchedHandle( + std::coroutine_handle::from_promise(*this)); + + return ViralNonPostingInvoker(*this); + } + }; + + using NonPostingInvoker, T>::NonPostingInvoker; + + bool await_ready() const noexcept + { return false; } + + template + bool await_suspend( + std::coroutine_handle callerSchedHandle) noexcept + { + static_assert( + std::is_base_of_v, + "ViralNonPostingInvoker caller promise must derive from " + "PromiseChainLink"); +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " Setting callerSchedHandle.\n"; +#endif + const bool suspendCaller = + this->setCallerSchedHandle(callerSchedHandle); +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " CallerFlowExecutor returned suspend=" << suspendCaller + << ".\n"; +#endif + return suspendCaller; + } + + auto await_resume() + { + return NonPostingInvoker, T>::await_resume(); + } +}; + } // namespace sscl::co #endif // INVOKERS_H diff --git a/include/spinscale/co/nonPostingInvoker.h b/include/spinscale/co/nonPostingInvoker.h index 659e64c..8f37f20 100644 --- a/include/spinscale/co/nonPostingInvoker.h +++ b/include/spinscale/co/nonPostingInvoker.h @@ -5,13 +5,14 @@ #include #include #include +#include #include #include namespace sscl::co { -template +template class NonPostingInvoker { public: @@ -39,15 +40,55 @@ public: } } - ReturnValues &completedReturnValues() noexcept + template + bool setCallerSchedHandle( + std::coroutine_handle callerSchedHandle) noexcept + { + static_assert( + std::is_base_of_v, + "NonPostingInvoker caller promise must derive from PromiseChainLink"); + + calleePromise.callerSchedHandle = callerSchedHandle; + calleePromise.setCallerPromiseChainLink( + &callerSchedHandle.promise()); +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " Done setting callerSchedHandle; running CallerFlowExecutor.\n"; +#endif + return calleePromise.postBackStatus.getCallerFlowExecutor()(); + } + + ReturnValues &completedReturnValues() noexcept { return calleePromise.returnValues; } - const ReturnValues &completedReturnValues() const noexcept + const ReturnValues &completedReturnValues() const noexcept { return calleePromise.returnValues; } -private: + auto await_resume() + { + calleePromise.postBackStatus.reset(); + + ReturnValues &returnValues = calleePromise.returnValues; +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " About to check for and rethrow any exception.\n"; +#endif + + if (returnValues.myExceptionPtr) { + std::exception_ptr const captured = returnValues.myExceptionPtr; + std::rethrow_exception(captured); + } + if constexpr (!std::is_void_v) + { + T result = std::move(returnValues.myReturnValue); + return result; + } + } + +protected: PromiseType &calleePromise; +private: /** Every live invoker owns destruction of its callee coroutine frame in * ~NonPostingInvoker (via calleePromise.selfSchedHandle). * diff --git a/include/spinscale/co/nonPostingPromise.h b/include/spinscale/co/nonPostingPromise.h index b7c5a7c..c24d818 100644 --- a/include/spinscale/co/nonPostingPromise.h +++ b/include/spinscale/co/nonPostingPromise.h @@ -9,15 +9,99 @@ #include #include +#include #include #include #include namespace sscl::co { +template struct NonPostingPromise -: public PromiseChainLink +: public PromiseChainLink, + public PostingPromiseReturnOps, T> { + struct PostBackStatus + { + struct CalleeFlowExecutor; + struct CallerFlowExecutor; + friend struct CalleeFlowExecutor; + friend struct CallerFlowExecutor; + + explicit PostBackStatus(NonPostingPromise &calleePromiseIn) noexcept + : calleePromise(calleePromiseIn) + {} + + void reset() noexcept + { + sscl::SpinLock::Guard guard(lock); + callerHasSetCallerSchedHandle = false; + calleeIsReadyToPostBack = false; + } + + struct FlowExecutor + { + explicit FlowExecutor(PostBackStatus &parentIn) noexcept + : parent(parentIn) + {} + + PostBackStatus &parent; + }; + + struct CalleeFlowExecutor + : public FlowExecutor + { + explicit CalleeFlowExecutor(PostBackStatus &parentIn) noexcept + : FlowExecutor(parentIn) + {} + + bool operator()() noexcept + { + sscl::SpinLock::Guard guard(this->parent.lock); + this->parent.calleeIsReadyToPostBack = true; + if (this->parent.callerHasSetCallerSchedHandle) { + return true; + } + return false; + } + }; + + struct CallerFlowExecutor + : public FlowExecutor + { + explicit CallerFlowExecutor(PostBackStatus &parentIn) noexcept + : FlowExecutor(parentIn) + {} + + bool operator()() noexcept + { + sscl::SpinLock::Guard guard(this->parent.lock); + this->parent.callerHasSetCallerSchedHandle = true; + if (this->parent.calleeIsReadyToPostBack) { + return false; + } + return true; + } + }; + + CalleeFlowExecutor getCalleeFlowExecutor() noexcept + { + return CalleeFlowExecutor(*this); + } + + CallerFlowExecutor getCallerFlowExecutor() noexcept + { + return CallerFlowExecutor(*this); + } + + NonPostingPromise &calleePromise; + + private: + sscl::SpinLock lock; + bool callerHasSetCallerSchedHandle = false; + bool calleeIsReadyToPostBack = false; + }; + /** Completion work must run from this awaiter's await_suspend, not * synchronously inside promise.final_suspend() before it returns: the * hidden coroutine segment index in the coroutine state is only advanced @@ -26,16 +110,19 @@ struct NonPostingPromise struct FinalSuspendNonPostingInvoker : public std::suspend_always { - explicit FinalSuspendNonPostingInvoker(NonPostingPromise &calleePromiseIn) noexcept - : calleePromise(calleePromiseIn) + explicit FinalSuspendNonPostingInvoker( + NonPostingPromise &calleePromiseIn) noexcept + : calleePromise(calleePromiseIn) {} - bool await_suspend(std::coroutine_handle<> const) noexcept + std::coroutine_handle<> await_suspend( + std::coroutine_handle<> const) noexcept { if (calleePromise.callerLambda) { #ifdef CONFIG_LIBSSCL_DEBUG_CO - std::cout << "final_suspend" << ": " << std::this_thread::get_id() + std::cout << "final_suspend" << ": " + << std::this_thread::get_id() << " Non-viral non-posting: invoking callerLambda directly.\n"; #endif if (calleePromise.returnValues.myExceptionPtr) { @@ -44,15 +131,29 @@ struct NonPostingPromise } calleePromise.callerLambda(); + return std::noop_coroutine(); } - return true; + +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << "final_suspend" << ": " << std::this_thread::get_id() + << " Viral non-posting: running CalleeFlowExecutor.\n"; +#endif + const bool symmetricTransferToCaller = + calleePromise.postBackStatus.getCalleeFlowExecutor()(); + + if (symmetricTransferToCaller && calleePromise.callerSchedHandle) { + return calleePromise.callerSchedHandle; + } + + return std::noop_coroutine(); } NonPostingPromise &calleePromise; }; NonPostingPromise() noexcept - : returnValues() + : returnValues(), + postBackStatus(*this) {} template @@ -61,13 +162,27 @@ struct NonPostingPromise std::function callerLambdaIn, TailArgs &&...) noexcept : returnValues(callerExceptionPtr), - callerLambda(std::move(callerLambdaIn)) + callerLambda(std::move(callerLambdaIn)), + postBackStatus(*this) + {} + + template + requires (!std::same_as, std::exception_ptr>) + NonPostingPromise( + ObjectArg &&, + std::exception_ptr &callerExceptionPtr, + std::function callerLambdaIn, + TailArgs &&...) noexcept + : NonPostingPromise( + callerExceptionPtr, + std::move(callerLambdaIn)) {} ~NonPostingPromise() noexcept { #ifdef CONFIG_LIBSSCL_DEBUG_CO - std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n"; + std::cout << __func__ << ": " << std::this_thread::get_id() + << " Destructing.\n"; #endif } @@ -77,9 +192,6 @@ struct NonPostingPromise auto final_suspend() noexcept { return FinalSuspendNonPostingInvoker(*this); } - void return_void() noexcept - { return; } - void unhandled_exception() noexcept { returnValues.myExceptionPtr = std::current_exception(); @@ -90,16 +202,26 @@ struct NonPostingPromise eraseFirstMatchingAcquiredLock(coQutex); } + const PromiseChainLink *callerPromiseChainLink() const noexcept override + { return callerChainLink; } + + PromiseChainLink *callerPromiseChainLink() noexcept override + { return callerChainLink; } + void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept - { - selfSchedHandle = schedHandle; - } + { selfSchedHandle = schedHandle; } - ReturnValues returnValues; + void setCallerPromiseChainLink(PromiseChainLink *chainLink) noexcept + { callerChainLink = chainLink; } + + ReturnValues returnValues; std::function callerLambda; + PostBackStatus postBackStatus; std::coroutine_handle<> selfSchedHandle; + std::coroutine_handle<> callerSchedHandle; + PromiseChainLink *callerChainLink = nullptr; - template + template friend class NonPostingInvoker; }; diff --git a/include/spinscale/co/promises.h b/include/spinscale/co/promises.h index c05f42e..3013ca4 100644 --- a/include/spinscale/co/promises.h +++ b/include/spinscale/co/promises.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -267,6 +268,22 @@ struct PostingPromise postBackStatus(*this) {} + /** Member coroutines pass the implicit object parameter before explicit + * (exceptionPtr, callback, ...) args. Discard the object and delegate to + * the free-function constructor shape. + */ + template + requires (!std::same_as, std::exception_ptr>) + PostingPromise( + ObjectArg &&, + std::exception_ptr &_callerExceptionPtr, + std::function _callerLambda, + TailArgs &&...) noexcept + : PostingPromise( + _callerExceptionPtr, + std::move(_callerLambda)) + {} + ~PostingPromise() noexcept { #ifdef CONFIG_LIBSSCL_DEBUG_CO @@ -345,6 +362,19 @@ struct TaggedPostingPromise std::forward(tailArgs)...) {} + template + requires (!std::same_as, std::exception_ptr>) + TaggedPostingPromise( + ObjectArg &&, + std::exception_ptr &_exceptionPtr, + std::function _callerLambda, + TailArgs &&... tailArgs) noexcept + : PostingPromise( + _exceptionPtr, + std::move(_callerLambda), + std::forward(tailArgs)...) + {} + auto initial_suspend() noexcept { #ifdef CONFIG_LIBSSCL_DEBUG_CO diff --git a/include/spinscale/componentThread.h b/include/spinscale/componentThread.h index 9ac8bea..05ba620 100644 --- a/include/spinscale/componentThread.h +++ b/include/spinscale/componentThread.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -14,9 +13,11 @@ #include #include #include -#include #include #include +#include +#include +#include namespace sscl { @@ -165,21 +166,40 @@ public: struct ViralThreadLifetimeMgmtInvoker { + struct AsyncState + { + std::atomic settled{false}; + std::coroutine_handle<> callerSchedHandle; + }; + ViralThreadLifetimeMgmtInvoker( ThreadOp _threadOp, PuppetThread &_parentThread, const std::shared_ptr &_selfPtr = nullptr) : threadOp(_threadOp), + asyncState(std::make_shared()), parentThread(_parentThread), selfPtr(_selfPtr), lifetimeMgmtCallback{ nullptr, - [this]() + [asyncState = asyncState]() { - settled = true; - if (callerSchedHandle) { - callerSchedHandle.resume(); + asyncState->settled.store(true, std::memory_order_release); + + std::coroutine_handle<> handle = + asyncState->callerSchedHandle; + + if (!handle) { + return; } + + /** Post resume to the puppeteer queue: direct resume() from + * within an asio completion handler can destroy adapter + * coroutine state while the handler is still unwinding. + */ + boost::asio::post( + ComponentThread::getPptr()->getIoService(), + [handle]() { handle.resume(); }); }} { if (threadOp == ThreadOp::JOLT && selfPtr == nullptr) @@ -212,26 +232,32 @@ public: } } - bool await_ready() const noexcept { return settled; } + bool await_ready() const noexcept + { + return asyncState->settled.load(std::memory_order_acquire); + } bool await_suspend( std::coroutine_handle<> _callerSchedHandle) noexcept { - if (settled) { return false; } - callerSchedHandle = _callerSchedHandle; + if (asyncState->settled.load(std::memory_order_acquire)) { + return false; + } + + asyncState->callerSchedHandle = _callerSchedHandle; return true; } void await_resume() noexcept {} ThreadOp threadOp; - bool settled = false; - std::coroutine_handle<> callerSchedHandle; + std::shared_ptr asyncState; PuppetThread &parentThread; const std::shared_ptr selfPtr; cps::Callback lifetimeMgmtCallback; }; + // Thread lifetime management request invokers ViralThreadLifetimeMgmtInvoker startThreadAReq() { return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); } ViralThreadLifetimeMgmtInvoker pauseThreadAReq() diff --git a/include/spinscale/puppetApplication.h b/include/spinscale/puppetApplication.h index ac09fad..7a45fae 100644 --- a/include/spinscale/puppetApplication.h +++ b/include/spinscale/puppetApplication.h @@ -5,7 +5,10 @@ #include #include #include +#include #include + +#include #include #include @@ -19,22 +22,30 @@ public: const std::vector> &threads); ~PuppetApplication() = default; - co::NonViralNonPostingInvoker joltAllPuppetThreadsCReq( + co::ViralNonPostingInvoker joltAllPuppetThreadsCReq( std::exception_ptr &exceptionPtr, std::function callback); - co::NonViralNonPostingInvoker startAllPuppetThreadsCReq( + co::ViralNonPostingInvoker startAllPuppetThreadsCReq( std::exception_ptr &exceptionPtr, std::function callback); - co::NonViralNonPostingInvoker pauseAllPuppetThreadsCReq( + co::ViralNonPostingInvoker pauseAllPuppetThreadsCReq( std::exception_ptr &exceptionPtr, std::function callback); - co::NonViralNonPostingInvoker resumeAllPuppetThreadsCReq( + co::ViralNonPostingInvoker resumeAllPuppetThreadsCReq( std::exception_ptr &exceptionPtr, std::function callback); - co::NonViralNonPostingInvoker exitAllPuppetThreadsCReq( + co::ViralNonPostingInvoker exitAllPuppetThreadsCReq( std::exception_ptr &exceptionPtr, std::function callback); // CPU distribution method void distributeAndPinThreadsAcrossCpus(); protected: - // Collection of PuppetThread instances + using PuppetLifetimeMgmtInvoker = + PuppetThread::ViralThreadLifetimeMgmtInvoker; + using PuppetLifetimeMgmtGroup = co::Group; + + void addAllPuppetLifetimeInvokersToGroup( + PuppetLifetimeMgmtGroup &group, + std::vector &invokers, + PuppetThread::ThreadOp threadOp) const; + std::vector> componentThreads; /** @@ -57,6 +68,13 @@ protected: * a synchronization point for the entire system initialization. */ bool threadsHaveBeenJolted = false; + +private: + co::ViralNonPostingInvoker allPuppetThreadsLifetimeOpCReq( + std::exception_ptr &exceptionPtr, + std::function callback, + PuppetThread::ThreadOp threadOp, + std::string_view emptyThreadsLogMessage); }; } // namespace sscl diff --git a/src/componentThread.cpp b/src/componentThread.cpp index cd897bc..a3ac71c 100644 --- a/src/componentThread.cpp +++ b/src/componentThread.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include diff --git a/src/puppetApplication.cpp b/src/puppetApplication.cpp index 6dac615..9c58640 100644 --- a/src/puppetApplication.cpp +++ b/src/puppetApplication.cpp @@ -8,7 +8,7 @@ namespace sscl { -namespace puppet_application_detail { +namespace { constexpr std::string_view noPuppetThreadsToStartLogMessage = "Mrntt: No puppet threads to start"; @@ -19,14 +19,18 @@ constexpr std::string_view noPuppetThreadsToResumeLogMessage = constexpr std::string_view noPuppetThreadsToExitLogMessage = "Mrntt: No puppet threads to exit"; -using PuppetLifetimeInvoker = PuppetThread::ViralThreadLifetimeMgmtInvoker; -using PuppetLifetimeGroup = co::Group; +} // namespace -void addAllPuppetLifetimeInvokersToGroup( - PuppetLifetimeGroup &group, - std::vector &invokers, - const std::vector> &componentThreads, - PuppetThread::ThreadOp threadOp) +PuppetApplication::PuppetApplication( + const std::vector> &threads) +: componentThreads(threads) +{ +} + +void PuppetApplication::addAllPuppetLifetimeInvokersToGroup( + PuppetLifetimeMgmtGroup &group, + std::vector &invokers, + PuppetThread::ThreadOp threadOp) const { invokers.reserve(componentThreads.size()); @@ -58,40 +62,8 @@ void addAllPuppetLifetimeInvokersToGroup( } } -co::NonViralNonPostingInvoker genericAllPuppetThreadsLifetimeOpCReq( - const std::vector> &componentThreads, - PuppetThread::ThreadOp threadOp, - std::string_view emptyThreadsLogMessage, - [[maybe_unused]] std::exception_ptr &exceptionPtr, - [[maybe_unused]] std::function callback) -{ - if (componentThreads.empty()) - { - std::cout << emptyThreadsLogMessage << "\n"; - co_return; - } - - PuppetLifetimeGroup group; - std::vector invokers; - - addAllPuppetLifetimeInvokersToGroup( - group, invokers, componentThreads, threadOp); - - co_await group.getAwaitAllSettlementsInvoker(); - group.checkForAndReThrowGroupExceptions(); - - co_return; -} - -} // namespace puppet_application_detail - -PuppetApplication::PuppetApplication( - const std::vector> &threads) -: componentThreads(threads) -{ -} - -co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq( +co::ViralNonPostingInvoker +PuppetApplication::joltAllPuppetThreadsCReq( [[maybe_unused]] std::exception_ptr &exceptionPtr, [[maybe_unused]] std::function callback) { @@ -108,64 +80,94 @@ co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq( co_return; } - puppet_application_detail::PuppetLifetimeGroup group; - std::vector invokers; + PuppetLifetimeMgmtGroup group; + std::vector invokers; - puppet_application_detail::addAllPuppetLifetimeInvokersToGroup( - group, invokers, componentThreads, PuppetThread::ThreadOp::JOLT); - - co_await group.getAwaitAllSettlementsInvoker(); + addAllPuppetLifetimeInvokersToGroup( + group, invokers, PuppetThread::ThreadOp::JOLT); + PuppetLifetimeMgmtGroup::AwaitAllSettlementsInvoker groupAwaitAll( + group); + co_await groupAwaitAll; group.checkForAndReThrowGroupExceptions(); threadsHaveBeenJolted = true; co_return; } -co::NonViralNonPostingInvoker PuppetApplication::startAllPuppetThreadsCReq( - std::exception_ptr &exceptionPtr, std::function callback) +co::ViralNonPostingInvoker +PuppetApplication::allPuppetThreadsLifetimeOpCReq( + [[maybe_unused]] std::exception_ptr &exceptionPtr, + [[maybe_unused]] std::function callback, + PuppetThread::ThreadOp threadOp, + std::string_view emptyThreadsLogMessage) { - return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq( - componentThreads, PuppetThread::ThreadOp::START, - puppet_application_detail::noPuppetThreadsToStartLogMessage, - exceptionPtr, callback); + if (componentThreads.empty()) + { + std::cout << emptyThreadsLogMessage << "\n"; + co_return; + } + + PuppetLifetimeMgmtGroup group; + std::vector invokers; + + addAllPuppetLifetimeInvokersToGroup(group, invokers, threadOp); + PuppetLifetimeMgmtGroup::AwaitAllSettlementsInvoker groupAwaitAll( + group); + co_await groupAwaitAll; + group.checkForAndReThrowGroupExceptions(); + + co_return; } -co::NonViralNonPostingInvoker PuppetApplication::pauseAllPuppetThreadsCReq( +co::ViralNonPostingInvoker +PuppetApplication::startAllPuppetThreadsCReq( std::exception_ptr &exceptionPtr, std::function callback) { - return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq( - componentThreads, PuppetThread::ThreadOp::PAUSE, - puppet_application_detail::noPuppetThreadsToPauseLogMessage, - exceptionPtr, callback); + return allPuppetThreadsLifetimeOpCReq( + exceptionPtr, std::move(callback), + PuppetThread::ThreadOp::START, + noPuppetThreadsToStartLogMessage); } -co::NonViralNonPostingInvoker PuppetApplication::resumeAllPuppetThreadsCReq( +co::ViralNonPostingInvoker +PuppetApplication::pauseAllPuppetThreadsCReq( std::exception_ptr &exceptionPtr, std::function callback) { - return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq( - componentThreads, PuppetThread::ThreadOp::RESUME, - puppet_application_detail::noPuppetThreadsToResumeLogMessage, - exceptionPtr, callback); + return allPuppetThreadsLifetimeOpCReq( + exceptionPtr, std::move(callback), + PuppetThread::ThreadOp::PAUSE, + noPuppetThreadsToPauseLogMessage); } -co::NonViralNonPostingInvoker PuppetApplication::exitAllPuppetThreadsCReq( +co::ViralNonPostingInvoker +PuppetApplication::resumeAllPuppetThreadsCReq( + std::exception_ptr &exceptionPtr, std::function callback) +{ + return allPuppetThreadsLifetimeOpCReq( + exceptionPtr, std::move(callback), + PuppetThread::ThreadOp::RESUME, + noPuppetThreadsToResumeLogMessage); +} + +co::ViralNonPostingInvoker +PuppetApplication::exitAllPuppetThreadsCReq( [[maybe_unused]] std::exception_ptr &exceptionPtr, [[maybe_unused]] std::function callback) { if (componentThreads.empty()) { - std::cout << puppet_application_detail::noPuppetThreadsToExitLogMessage - << "\n"; + std::cout << noPuppetThreadsToExitLogMessage << "\n"; co_return; } - puppet_application_detail::PuppetLifetimeGroup group; - std::vector invokers; + PuppetLifetimeMgmtGroup group; + std::vector invokers; - puppet_application_detail::addAllPuppetLifetimeInvokersToGroup( - group, invokers, componentThreads, PuppetThread::ThreadOp::EXIT); - - co_await group.getAwaitAllSettlementsInvoker(); + addAllPuppetLifetimeInvokersToGroup( + group, invokers, PuppetThread::ThreadOp::EXIT); + PuppetLifetimeMgmtGroup::AwaitAllSettlementsInvoker groupAwaitAll( + group); + co_await groupAwaitAll; group.checkForAndReThrowGroupExceptions(); for (auto &thread : componentThreads) { @@ -179,7 +181,6 @@ void PuppetApplication::distributeAndPinThreadsAcrossCpus() { int cpuCount = ComponentThread::getAvailableCpuCount(); - // Distribute and pin threads across CPUs int threadIndex = 0; for (auto& thread : componentThreads) {