mirror of
https://github.com/latentPrion/libspinscale.git
synced 2026-06-23 19:48:32 +00:00
Compare commits
2 Commits
abdb857e55
...
e29bee52cf
| Author | SHA1 | Date | |
|---|---|---|---|
| e29bee52cf | |||
| daad2a8c95 |
@@ -1,6 +1,7 @@
|
|||||||
#ifndef GROUP_H
|
#ifndef GROUP_H
|
||||||
#define GROUP_H
|
#define GROUP_H
|
||||||
|
|
||||||
|
#include <any>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <coroutine>
|
#include <coroutine>
|
||||||
#include <cstddef>
|
#include <cstddef>
|
||||||
@@ -60,6 +61,19 @@ concept AwaitableIface = requires(T &t) {
|
|||||||
{ get_operator_co_await(t) };
|
{ get_operator_co_await(t) };
|
||||||
} && AwaiterIface<decltype(get_operator_co_await(std::declval<T &>()))>;
|
} && AwaiterIface<decltype(get_operator_co_await(std::declval<T &>()))>;
|
||||||
|
|
||||||
|
template<AwaiterIface T>
|
||||||
|
T &asAwaiter(T &t) noexcept
|
||||||
|
{
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<AwaitableIface T>
|
||||||
|
auto asAwaiter(T &t) noexcept(noexcept(get_operator_co_await(t)))
|
||||||
|
-> decltype(get_operator_co_await(t))
|
||||||
|
{
|
||||||
|
return get_operator_co_await(t);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@@ -71,8 +85,27 @@ concept AwaiterIface = detail::AwaiterIface<T>;
|
|||||||
template <typename T>
|
template <typename T>
|
||||||
concept AwaitableOrAwaiterIface = AwaiterIface<T> || AwaitableIface<T>;
|
concept AwaitableOrAwaiterIface = AwaiterIface<T> || AwaitableIface<T>;
|
||||||
|
|
||||||
template <typename Invoker>
|
/** Typical usage — parallel members, then gather:
|
||||||
requires AwaitableOrAwaiterIface<Invoker>
|
*
|
||||||
|
* co::Group group;
|
||||||
|
*
|
||||||
|
* auto bodyInit = body.initializeCReq(exceptionPtr, noopCallback);
|
||||||
|
* auto legInit = leg.initializeCReq(exceptionPtr, noopCallback);
|
||||||
|
* ViralNonPostingInvoker<void> batch = app.joltAllPuppetThreadsCReq(...);
|
||||||
|
*
|
||||||
|
* group.add(bodyInit);
|
||||||
|
* group.add(legInit);
|
||||||
|
* group.add(batch);
|
||||||
|
*
|
||||||
|
* co_await group.getAwaitAllSettlementsInvoker();
|
||||||
|
* group.checkForAndReThrowGroupExceptions();
|
||||||
|
*
|
||||||
|
* (void)bodyInit.completedReturnValues();
|
||||||
|
*
|
||||||
|
* // When walking settlement slots by index:
|
||||||
|
* settlements[i].invokerAs<BodyViralPostingInvoker<void>>()
|
||||||
|
* .completedReturnValues();
|
||||||
|
*/
|
||||||
struct Group
|
struct Group
|
||||||
{
|
{
|
||||||
enum class AwaitingCondition {
|
enum class AwaitingCondition {
|
||||||
@@ -91,9 +124,23 @@ struct Group
|
|||||||
UNSETTLED, COMPLETED, EXCEPTION_THROWN
|
UNSETTLED, COMPLETED, EXCEPTION_THROWN
|
||||||
};
|
};
|
||||||
|
|
||||||
SettlementDescriptor(Invoker &_invoker)
|
template<typename Member>
|
||||||
: invoker(std::ref(_invoker))
|
void bindMemberRef(Member &member)
|
||||||
{}
|
{
|
||||||
|
memberInvokerRef = std::ref(member);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename Member>
|
||||||
|
Member &invokerAs() const
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return std::any_cast<std::reference_wrapper<Member>>(
|
||||||
|
memberInvokerRef).get();
|
||||||
|
} catch (const std::bad_any_cast &) {
|
||||||
|
throw std::runtime_error(
|
||||||
|
"Group settlement invoker type mismatch");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void setSettlementStatus() noexcept
|
void setSettlementStatus() noexcept
|
||||||
{
|
{
|
||||||
@@ -109,7 +156,7 @@ struct Group
|
|||||||
TypeE type = TypeE::UNSETTLED;
|
TypeE type = TypeE::UNSETTLED;
|
||||||
std::exception_ptr calleeException = nullptr;
|
std::exception_ptr calleeException = nullptr;
|
||||||
std::exception_ptr adapterException = nullptr;
|
std::exception_ptr adapterException = nullptr;
|
||||||
std::reference_wrapper<Invoker> invoker;
|
std::any memberInvokerRef;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SettlementAwaitingInvoker;
|
struct SettlementAwaitingInvoker;
|
||||||
@@ -439,7 +486,7 @@ struct Group
|
|||||||
* would be impossible.
|
* would be impossible.
|
||||||
*
|
*
|
||||||
* So we should be able to call resume() directly here without
|
* So we should be able to call resume() directly here without
|
||||||
* post()ing to current_io_context().
|
* post()ing to ComponentThread::getSelf()->getIoService().
|
||||||
*
|
*
|
||||||
* EXPLANATION:
|
* EXPLANATION:
|
||||||
* However, in order to ensure that we keep this adapter coro
|
* However, in order to ensure that we keep this adapter coro
|
||||||
@@ -466,28 +513,17 @@ struct Group
|
|||||||
* target async fn, and also to convey its results back to the Group class.
|
* target async fn, and also to convey its results back to the Group class.
|
||||||
* It's effectively a go-between coro that provides the outcomes that Invokers
|
* It's effectively a go-between coro that provides the outcomes that Invokers
|
||||||
* normally provide, without needing, itself, to be co_awaited.
|
* normally provide, without needing, itself, to be co_awaited.
|
||||||
|
*
|
||||||
|
* settlementIndex is captured by value (not a vector iterator) so adapter
|
||||||
|
* coros remain valid if settlements reallocate during concurrent add().
|
||||||
*/
|
*/
|
||||||
NonAwaitableNonPostingAdapterCoro nonAwaitableAdapterCoro(
|
template<AwaitableOrAwaiterIface Member>
|
||||||
|
NonAwaitableNonPostingAdapterCoro memberAdapterCoro(
|
||||||
|
Member &memberInvoker,
|
||||||
std::size_t settlementIndex) noexcept
|
std::size_t settlementIndex) noexcept
|
||||||
{
|
{
|
||||||
/** EXPLANATION:
|
|
||||||
* It's very convenient that our design for the NonViralPostingInvoker
|
|
||||||
* coincidentally allows us to supply a lambda that can be used to test
|
|
||||||
* for the settlement conditions that are being waited on by the Group's
|
|
||||||
* co_awaiter.
|
|
||||||
*
|
|
||||||
* settlementIndex is captured by value (not a vector iterator) so adapter
|
|
||||||
* coros remain valid if settlements reallocate during concurrent add().
|
|
||||||
*/
|
|
||||||
try {
|
try {
|
||||||
/* Return values remain in the callee promise until the caller-owned
|
co_await detail::asAwaiter(memberInvoker);
|
||||||
* invoker is destroyed (~PostingInvoker). The group co_awaiter reads
|
|
||||||
* results via settlements[settlementIndex].invoker after awaiting.
|
|
||||||
*
|
|
||||||
* Index settlements[] each time; do not cache a reference across
|
|
||||||
* co_await because concurrent add() may reallocate the vector.
|
|
||||||
*/
|
|
||||||
co_await s.rsrc.settlements[settlementIndex].invoker.get();
|
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
@@ -505,12 +541,8 @@ struct Group
|
|||||||
co_return;
|
co_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** EXPLANATION:
|
template<AwaitableOrAwaiterIface Member>
|
||||||
* Each invoker passed to add() must outlive this Group and the callee frame
|
void add(Member &memberInvoker)
|
||||||
* (see ~PostingInvoker). The group co_awaiter reads return values from those
|
|
||||||
* invokers after awaiting; do not destroy an invoker until reads are done.
|
|
||||||
*/
|
|
||||||
void add(Invoker &invoker)
|
|
||||||
{
|
{
|
||||||
std::size_t settlementIndex = 0;
|
std::size_t settlementIndex = 0;
|
||||||
|
|
||||||
@@ -525,10 +557,11 @@ struct Group
|
|||||||
}
|
}
|
||||||
|
|
||||||
settlementIndex = s.rsrc.settlements.size();
|
settlementIndex = s.rsrc.settlements.size();
|
||||||
s.rsrc.settlements.emplace_back(invoker);
|
s.rsrc.settlements.emplace_back();
|
||||||
|
s.rsrc.settlements[settlementIndex].bindMemberRef(memberInvoker);
|
||||||
}
|
}
|
||||||
|
|
||||||
nonAwaitableAdapterCoro(settlementIndex);
|
memberAdapterCoro(memberInvoker, settlementIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void checkForAndReThrowGroupExceptions() const
|
void checkForAndReThrowGroupExceptions() const
|
||||||
|
|||||||
@@ -47,7 +47,10 @@ struct NonViralPostingInvoker
|
|||||||
*/
|
*/
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
oss << std::this_thread::get_id()
|
oss << std::this_thread::get_id()
|
||||||
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
|
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."
|
||||||
|
<< " Promise type=" << typeid(*this).name()
|
||||||
|
<< ". This usually means promise construction did not bind the"
|
||||||
|
<< " (exception_ptr&, function<void()>, ...) constructor.";
|
||||||
throw std::runtime_error(oss.str());
|
throw std::runtime_error(oss.str());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,12 +156,12 @@ struct ViralPostingInvoker
|
|||||||
* from get_return_object(). ~NonPostingInvoker destroys the callee frame.
|
* from get_return_object(). ~NonPostingInvoker destroys the callee frame.
|
||||||
*/
|
*/
|
||||||
struct NonViralNonPostingInvoker
|
struct NonViralNonPostingInvoker
|
||||||
: public NonPostingInvoker<NonPostingPromise>
|
: public NonPostingInvoker<NonPostingPromise<void>, void>
|
||||||
{
|
{
|
||||||
struct promise_type
|
struct promise_type
|
||||||
: public NonPostingPromise
|
: public NonPostingPromise<void>
|
||||||
{
|
{
|
||||||
using NonPostingPromise::NonPostingPromise;
|
using NonPostingPromise<void>::NonPostingPromise;
|
||||||
|
|
||||||
NonViralNonPostingInvoker get_return_object()
|
NonViralNonPostingInvoker get_return_object()
|
||||||
{
|
{
|
||||||
@@ -169,7 +172,10 @@ struct NonViralNonPostingInvoker
|
|||||||
{
|
{
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
oss << std::this_thread::get_id()
|
oss << std::this_thread::get_id()
|
||||||
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
|
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."
|
||||||
|
<< " Promise type=" << typeid(*this).name()
|
||||||
|
<< ". This usually means promise construction did not bind the"
|
||||||
|
<< " (exception_ptr&, function<void()>, ...) constructor.";
|
||||||
throw std::runtime_error(oss.str());
|
throw std::runtime_error(oss.str());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,7 +186,7 @@ struct NonViralNonPostingInvoker
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
using NonPostingInvoker<NonPostingPromise>::NonPostingInvoker;
|
using NonPostingInvoker<NonPostingPromise<void>, void>::NonPostingInvoker;
|
||||||
|
|
||||||
bool await_ready() const noexcept
|
bool await_ready() const noexcept
|
||||||
{ std::terminate(); }
|
{ std::terminate(); }
|
||||||
@@ -192,6 +198,66 @@ struct NonViralNonPostingInvoker
|
|||||||
{ std::terminate(); }
|
{ std::terminate(); }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** Viral awaitable non-posting coroutine: runs eagerly on the caller thread
|
||||||
|
* (initial_suspend is never). Caller resume uses symmetric transfer when the
|
||||||
|
* caller has registered before callee completion; otherwise PostBackStatus
|
||||||
|
* fast-paths await_resume on co_await.
|
||||||
|
*/
|
||||||
|
template <typename T = void>
|
||||||
|
struct ViralNonPostingInvoker
|
||||||
|
: public NonPostingInvoker<NonPostingPromise<T>, T>
|
||||||
|
{
|
||||||
|
struct promise_type
|
||||||
|
: public NonPostingPromise<T>
|
||||||
|
{
|
||||||
|
using NonPostingPromise<T>::NonPostingPromise;
|
||||||
|
|
||||||
|
ViralNonPostingInvoker<T> get_return_object() noexcept
|
||||||
|
{
|
||||||
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
|
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||||
|
<< " Returning ViralNonPostingInvoker.\n";
|
||||||
|
#endif
|
||||||
|
this->setSelfSchedHandle(
|
||||||
|
std::coroutine_handle<promise_type>::from_promise(*this));
|
||||||
|
|
||||||
|
return ViralNonPostingInvoker<T>(*this);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
using NonPostingInvoker<NonPostingPromise<T>, T>::NonPostingInvoker;
|
||||||
|
|
||||||
|
bool await_ready() const noexcept
|
||||||
|
{ return false; }
|
||||||
|
|
||||||
|
template <typename CallerPromise>
|
||||||
|
bool await_suspend(
|
||||||
|
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
|
||||||
|
{
|
||||||
|
static_assert(
|
||||||
|
std::is_base_of_v<PromiseChainLink, CallerPromise>,
|
||||||
|
"ViralNonPostingInvoker caller promise must derive from "
|
||||||
|
"PromiseChainLink");
|
||||||
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
|
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||||
|
<< " Setting callerSchedHandle.\n";
|
||||||
|
#endif
|
||||||
|
const bool suspendCaller =
|
||||||
|
this->setCallerSchedHandle(callerSchedHandle);
|
||||||
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
|
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||||
|
<< " CallerFlowExecutor returned suspend=" << suspendCaller
|
||||||
|
<< ".\n";
|
||||||
|
#endif
|
||||||
|
return suspendCaller;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto await_resume()
|
||||||
|
{
|
||||||
|
return NonPostingInvoker<NonPostingPromise<T>, T>::await_resume();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace sscl::co
|
} // namespace sscl::co
|
||||||
|
|
||||||
#endif // INVOKERS_H
|
#endif // INVOKERS_H
|
||||||
|
|||||||
@@ -5,13 +5,14 @@
|
|||||||
#include <coroutine>
|
#include <coroutine>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <type_traits>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
#include <spinscale/co/nonPostingPromise.h>
|
#include <spinscale/co/nonPostingPromise.h>
|
||||||
|
|
||||||
namespace sscl::co {
|
namespace sscl::co {
|
||||||
|
|
||||||
template <typename PromiseType>
|
template <typename PromiseType, typename T>
|
||||||
class NonPostingInvoker
|
class NonPostingInvoker
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -39,15 +40,55 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReturnValues<void> &completedReturnValues() noexcept
|
template <typename CallerPromise>
|
||||||
|
bool setCallerSchedHandle(
|
||||||
|
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
|
||||||
|
{
|
||||||
|
static_assert(
|
||||||
|
std::is_base_of_v<PromiseChainLink, CallerPromise>,
|
||||||
|
"NonPostingInvoker caller promise must derive from PromiseChainLink");
|
||||||
|
|
||||||
|
calleePromise.callerSchedHandle = callerSchedHandle;
|
||||||
|
calleePromise.setCallerPromiseChainLink(
|
||||||
|
&callerSchedHandle.promise());
|
||||||
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
|
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||||
|
<< " Done setting callerSchedHandle; running CallerFlowExecutor.\n";
|
||||||
|
#endif
|
||||||
|
return calleePromise.postBackStatus.getCallerFlowExecutor()();
|
||||||
|
}
|
||||||
|
|
||||||
|
ReturnValues<T> &completedReturnValues() noexcept
|
||||||
{ return calleePromise.returnValues; }
|
{ return calleePromise.returnValues; }
|
||||||
|
|
||||||
const ReturnValues<void> &completedReturnValues() const noexcept
|
const ReturnValues<T> &completedReturnValues() const noexcept
|
||||||
{ return calleePromise.returnValues; }
|
{ return calleePromise.returnValues; }
|
||||||
|
|
||||||
private:
|
auto await_resume()
|
||||||
|
{
|
||||||
|
calleePromise.postBackStatus.reset();
|
||||||
|
|
||||||
|
ReturnValues<T> &returnValues = calleePromise.returnValues;
|
||||||
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
|
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||||
|
<< " About to check for and rethrow any exception.\n";
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (returnValues.myExceptionPtr) {
|
||||||
|
std::exception_ptr const captured = returnValues.myExceptionPtr;
|
||||||
|
std::rethrow_exception(captured);
|
||||||
|
}
|
||||||
|
if constexpr (!std::is_void_v<T>)
|
||||||
|
{
|
||||||
|
T result = std::move(returnValues.myReturnValue);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
PromiseType &calleePromise;
|
PromiseType &calleePromise;
|
||||||
|
|
||||||
|
private:
|
||||||
/** Every live invoker owns destruction of its callee coroutine frame in
|
/** Every live invoker owns destruction of its callee coroutine frame in
|
||||||
* ~NonPostingInvoker (via calleePromise.selfSchedHandle).
|
* ~NonPostingInvoker (via calleePromise.selfSchedHandle).
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -9,15 +9,99 @@
|
|||||||
#include <thread>
|
#include <thread>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
#include <spinscale/spinLock.h>
|
||||||
#include <spinscale/co/coQutex.h>
|
#include <spinscale/co/coQutex.h>
|
||||||
#include <spinscale/co/promiseChainLink.h>
|
#include <spinscale/co/promiseChainLink.h>
|
||||||
#include <spinscale/co/promises.h>
|
#include <spinscale/co/promises.h>
|
||||||
|
|
||||||
namespace sscl::co {
|
namespace sscl::co {
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
struct NonPostingPromise
|
struct NonPostingPromise
|
||||||
: public PromiseChainLink
|
: public PromiseChainLink,
|
||||||
|
public PostingPromiseReturnOps<NonPostingPromise<T>, T>
|
||||||
{
|
{
|
||||||
|
struct PostBackStatus
|
||||||
|
{
|
||||||
|
struct CalleeFlowExecutor;
|
||||||
|
struct CallerFlowExecutor;
|
||||||
|
friend struct CalleeFlowExecutor;
|
||||||
|
friend struct CallerFlowExecutor;
|
||||||
|
|
||||||
|
explicit PostBackStatus(NonPostingPromise &calleePromiseIn) noexcept
|
||||||
|
: calleePromise(calleePromiseIn)
|
||||||
|
{}
|
||||||
|
|
||||||
|
void reset() noexcept
|
||||||
|
{
|
||||||
|
sscl::SpinLock::Guard guard(lock);
|
||||||
|
callerHasSetCallerSchedHandle = false;
|
||||||
|
calleeIsReadyToPostBack = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct FlowExecutor
|
||||||
|
{
|
||||||
|
explicit FlowExecutor(PostBackStatus &parentIn) noexcept
|
||||||
|
: parent(parentIn)
|
||||||
|
{}
|
||||||
|
|
||||||
|
PostBackStatus &parent;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct CalleeFlowExecutor
|
||||||
|
: public FlowExecutor
|
||||||
|
{
|
||||||
|
explicit CalleeFlowExecutor(PostBackStatus &parentIn) noexcept
|
||||||
|
: FlowExecutor(parentIn)
|
||||||
|
{}
|
||||||
|
|
||||||
|
bool operator()() noexcept
|
||||||
|
{
|
||||||
|
sscl::SpinLock::Guard guard(this->parent.lock);
|
||||||
|
this->parent.calleeIsReadyToPostBack = true;
|
||||||
|
if (this->parent.callerHasSetCallerSchedHandle) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct CallerFlowExecutor
|
||||||
|
: public FlowExecutor
|
||||||
|
{
|
||||||
|
explicit CallerFlowExecutor(PostBackStatus &parentIn) noexcept
|
||||||
|
: FlowExecutor(parentIn)
|
||||||
|
{}
|
||||||
|
|
||||||
|
bool operator()() noexcept
|
||||||
|
{
|
||||||
|
sscl::SpinLock::Guard guard(this->parent.lock);
|
||||||
|
this->parent.callerHasSetCallerSchedHandle = true;
|
||||||
|
if (this->parent.calleeIsReadyToPostBack) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
CalleeFlowExecutor getCalleeFlowExecutor() noexcept
|
||||||
|
{
|
||||||
|
return CalleeFlowExecutor(*this);
|
||||||
|
}
|
||||||
|
|
||||||
|
CallerFlowExecutor getCallerFlowExecutor() noexcept
|
||||||
|
{
|
||||||
|
return CallerFlowExecutor(*this);
|
||||||
|
}
|
||||||
|
|
||||||
|
NonPostingPromise &calleePromise;
|
||||||
|
|
||||||
|
private:
|
||||||
|
sscl::SpinLock lock;
|
||||||
|
bool callerHasSetCallerSchedHandle = false;
|
||||||
|
bool calleeIsReadyToPostBack = false;
|
||||||
|
};
|
||||||
|
|
||||||
/** Completion work must run from this awaiter's await_suspend, not
|
/** Completion work must run from this awaiter's await_suspend, not
|
||||||
* synchronously inside promise.final_suspend() before it returns: the
|
* synchronously inside promise.final_suspend() before it returns: the
|
||||||
* hidden coroutine segment index in the coroutine state is only advanced
|
* hidden coroutine segment index in the coroutine state is only advanced
|
||||||
@@ -26,16 +110,19 @@ struct NonPostingPromise
|
|||||||
struct FinalSuspendNonPostingInvoker
|
struct FinalSuspendNonPostingInvoker
|
||||||
: public std::suspend_always
|
: public std::suspend_always
|
||||||
{
|
{
|
||||||
explicit FinalSuspendNonPostingInvoker(NonPostingPromise &calleePromiseIn) noexcept
|
explicit FinalSuspendNonPostingInvoker(
|
||||||
: calleePromise(calleePromiseIn)
|
NonPostingPromise &calleePromiseIn) noexcept
|
||||||
|
: calleePromise(calleePromiseIn)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
bool await_suspend(std::coroutine_handle<> const) noexcept
|
std::coroutine_handle<> await_suspend(
|
||||||
|
std::coroutine_handle<> const) noexcept
|
||||||
{
|
{
|
||||||
if (calleePromise.callerLambda)
|
if (calleePromise.callerLambda)
|
||||||
{
|
{
|
||||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
|
std::cout << "final_suspend" << ": "
|
||||||
|
<< std::this_thread::get_id()
|
||||||
<< " Non-viral non-posting: invoking callerLambda directly.\n";
|
<< " Non-viral non-posting: invoking callerLambda directly.\n";
|
||||||
#endif
|
#endif
|
||||||
if (calleePromise.returnValues.myExceptionPtr) {
|
if (calleePromise.returnValues.myExceptionPtr) {
|
||||||
@@ -44,15 +131,29 @@ struct NonPostingPromise
|
|||||||
}
|
}
|
||||||
|
|
||||||
calleePromise.callerLambda();
|
calleePromise.callerLambda();
|
||||||
|
return std::noop_coroutine();
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
|
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
|
||||||
|
<< " Viral non-posting: running CalleeFlowExecutor.\n";
|
||||||
|
#endif
|
||||||
|
const bool symmetricTransferToCaller =
|
||||||
|
calleePromise.postBackStatus.getCalleeFlowExecutor()();
|
||||||
|
|
||||||
|
if (symmetricTransferToCaller && calleePromise.callerSchedHandle) {
|
||||||
|
return calleePromise.callerSchedHandle;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::noop_coroutine();
|
||||||
}
|
}
|
||||||
|
|
||||||
NonPostingPromise &calleePromise;
|
NonPostingPromise &calleePromise;
|
||||||
};
|
};
|
||||||
|
|
||||||
NonPostingPromise() noexcept
|
NonPostingPromise() noexcept
|
||||||
: returnValues()
|
: returnValues(),
|
||||||
|
postBackStatus(*this)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
template <typename... TailArgs>
|
template <typename... TailArgs>
|
||||||
@@ -61,13 +162,27 @@ struct NonPostingPromise
|
|||||||
std::function<void()> callerLambdaIn,
|
std::function<void()> callerLambdaIn,
|
||||||
TailArgs &&...) noexcept
|
TailArgs &&...) noexcept
|
||||||
: returnValues(callerExceptionPtr),
|
: returnValues(callerExceptionPtr),
|
||||||
callerLambda(std::move(callerLambdaIn))
|
callerLambda(std::move(callerLambdaIn)),
|
||||||
|
postBackStatus(*this)
|
||||||
|
{}
|
||||||
|
|
||||||
|
template <typename ObjectArg, typename... TailArgs>
|
||||||
|
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
|
||||||
|
NonPostingPromise(
|
||||||
|
ObjectArg &&,
|
||||||
|
std::exception_ptr &callerExceptionPtr,
|
||||||
|
std::function<void()> callerLambdaIn,
|
||||||
|
TailArgs &&...) noexcept
|
||||||
|
: NonPostingPromise(
|
||||||
|
callerExceptionPtr,
|
||||||
|
std::move(callerLambdaIn))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
~NonPostingPromise() noexcept
|
~NonPostingPromise() noexcept
|
||||||
{
|
{
|
||||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n";
|
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||||
|
<< " Destructing.\n";
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -77,9 +192,6 @@ struct NonPostingPromise
|
|||||||
auto final_suspend() noexcept
|
auto final_suspend() noexcept
|
||||||
{ return FinalSuspendNonPostingInvoker(*this); }
|
{ return FinalSuspendNonPostingInvoker(*this); }
|
||||||
|
|
||||||
void return_void() noexcept
|
|
||||||
{ return; }
|
|
||||||
|
|
||||||
void unhandled_exception() noexcept
|
void unhandled_exception() noexcept
|
||||||
{
|
{
|
||||||
returnValues.myExceptionPtr = std::current_exception();
|
returnValues.myExceptionPtr = std::current_exception();
|
||||||
@@ -90,16 +202,26 @@ struct NonPostingPromise
|
|||||||
eraseFirstMatchingAcquiredLock(coQutex);
|
eraseFirstMatchingAcquiredLock(coQutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const PromiseChainLink *callerPromiseChainLink() const noexcept override
|
||||||
|
{ return callerChainLink; }
|
||||||
|
|
||||||
|
PromiseChainLink *callerPromiseChainLink() noexcept override
|
||||||
|
{ return callerChainLink; }
|
||||||
|
|
||||||
void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept
|
void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept
|
||||||
{
|
{ selfSchedHandle = schedHandle; }
|
||||||
selfSchedHandle = schedHandle;
|
|
||||||
}
|
|
||||||
|
|
||||||
ReturnValues<void> returnValues;
|
void setCallerPromiseChainLink(PromiseChainLink *chainLink) noexcept
|
||||||
|
{ callerChainLink = chainLink; }
|
||||||
|
|
||||||
|
ReturnValues<T> returnValues;
|
||||||
std::function<void()> callerLambda;
|
std::function<void()> callerLambda;
|
||||||
|
PostBackStatus postBackStatus;
|
||||||
std::coroutine_handle<> selfSchedHandle;
|
std::coroutine_handle<> selfSchedHandle;
|
||||||
|
std::coroutine_handle<> callerSchedHandle;
|
||||||
|
PromiseChainLink *callerChainLink = nullptr;
|
||||||
|
|
||||||
template <typename>
|
template <typename, typename>
|
||||||
friend class NonPostingInvoker;
|
friend class NonPostingInvoker;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
#include <exception>
|
#include <exception>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <typeinfo>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
@@ -267,6 +268,22 @@ struct PostingPromise
|
|||||||
postBackStatus(*this)
|
postBackStatus(*this)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
/** Member coroutines pass the implicit object parameter before explicit
|
||||||
|
* (exceptionPtr, callback, ...) args. Discard the object and delegate to
|
||||||
|
* the free-function constructor shape.
|
||||||
|
*/
|
||||||
|
template <typename ObjectArg, typename... TailArgs>
|
||||||
|
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
|
||||||
|
PostingPromise(
|
||||||
|
ObjectArg &&,
|
||||||
|
std::exception_ptr &_callerExceptionPtr,
|
||||||
|
std::function<void()> _callerLambda,
|
||||||
|
TailArgs &&...) noexcept
|
||||||
|
: PostingPromise(
|
||||||
|
_callerExceptionPtr,
|
||||||
|
std::move(_callerLambda))
|
||||||
|
{}
|
||||||
|
|
||||||
~PostingPromise() noexcept
|
~PostingPromise() noexcept
|
||||||
{
|
{
|
||||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
@@ -345,6 +362,19 @@ struct TaggedPostingPromise
|
|||||||
std::forward<TailArgs>(tailArgs)...)
|
std::forward<TailArgs>(tailArgs)...)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
template <typename ObjectArg, typename... TailArgs>
|
||||||
|
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
|
||||||
|
TaggedPostingPromise(
|
||||||
|
ObjectArg &&,
|
||||||
|
std::exception_ptr &_exceptionPtr,
|
||||||
|
std::function<void()> _callerLambda,
|
||||||
|
TailArgs &&... tailArgs) noexcept
|
||||||
|
: PostingPromise<T>(
|
||||||
|
_exceptionPtr,
|
||||||
|
std::move(_callerLambda),
|
||||||
|
std::forward<TailArgs>(tailArgs)...)
|
||||||
|
{}
|
||||||
|
|
||||||
auto initial_suspend() noexcept
|
auto initial_suspend() noexcept
|
||||||
{
|
{
|
||||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
|
|||||||
@@ -5,7 +5,6 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <boost/asio/io_service.hpp>
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
@@ -14,9 +13,11 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <coroutine>
|
#include <coroutine>
|
||||||
#include <spinscale/cps/callback.h>
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <boost/asio/io_service.hpp>
|
||||||
|
#include <boost/asio/post.hpp>
|
||||||
|
#include <spinscale/cps/callback.h>
|
||||||
|
|
||||||
namespace sscl {
|
namespace sscl {
|
||||||
|
|
||||||
@@ -165,21 +166,40 @@ public:
|
|||||||
|
|
||||||
struct ViralThreadLifetimeMgmtInvoker
|
struct ViralThreadLifetimeMgmtInvoker
|
||||||
{
|
{
|
||||||
|
struct AsyncState
|
||||||
|
{
|
||||||
|
std::atomic<bool> settled{false};
|
||||||
|
std::coroutine_handle<> callerSchedHandle;
|
||||||
|
};
|
||||||
|
|
||||||
ViralThreadLifetimeMgmtInvoker(
|
ViralThreadLifetimeMgmtInvoker(
|
||||||
ThreadOp _threadOp,
|
ThreadOp _threadOp,
|
||||||
PuppetThread &_parentThread,
|
PuppetThread &_parentThread,
|
||||||
const std::shared_ptr<PuppetThread> &_selfPtr = nullptr)
|
const std::shared_ptr<PuppetThread> &_selfPtr = nullptr)
|
||||||
: threadOp(_threadOp),
|
: threadOp(_threadOp),
|
||||||
|
asyncState(std::make_shared<AsyncState>()),
|
||||||
parentThread(_parentThread),
|
parentThread(_parentThread),
|
||||||
selfPtr(_selfPtr),
|
selfPtr(_selfPtr),
|
||||||
lifetimeMgmtCallback{
|
lifetimeMgmtCallback{
|
||||||
nullptr,
|
nullptr,
|
||||||
[this]()
|
[asyncState = asyncState]()
|
||||||
{
|
{
|
||||||
settled = true;
|
asyncState->settled.store(true, std::memory_order_release);
|
||||||
if (callerSchedHandle) {
|
|
||||||
callerSchedHandle.resume();
|
std::coroutine_handle<> handle =
|
||||||
|
asyncState->callerSchedHandle;
|
||||||
|
|
||||||
|
if (!handle) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Post resume to the puppeteer queue: direct resume() from
|
||||||
|
* within an asio completion handler can destroy adapter
|
||||||
|
* coroutine state while the handler is still unwinding.
|
||||||
|
*/
|
||||||
|
boost::asio::post(
|
||||||
|
ComponentThread::getPptr()->getIoService(),
|
||||||
|
[handle]() { handle.resume(); });
|
||||||
}}
|
}}
|
||||||
{
|
{
|
||||||
if (threadOp == ThreadOp::JOLT && selfPtr == nullptr)
|
if (threadOp == ThreadOp::JOLT && selfPtr == nullptr)
|
||||||
@@ -212,26 +232,32 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool await_ready() const noexcept { return settled; }
|
bool await_ready() const noexcept
|
||||||
|
{
|
||||||
|
return asyncState->settled.load(std::memory_order_acquire);
|
||||||
|
}
|
||||||
|
|
||||||
bool await_suspend(
|
bool await_suspend(
|
||||||
std::coroutine_handle<> _callerSchedHandle) noexcept
|
std::coroutine_handle<> _callerSchedHandle) noexcept
|
||||||
{
|
{
|
||||||
if (settled) { return false; }
|
if (asyncState->settled.load(std::memory_order_acquire)) {
|
||||||
callerSchedHandle = _callerSchedHandle;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
asyncState->callerSchedHandle = _callerSchedHandle;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void await_resume() noexcept {}
|
void await_resume() noexcept {}
|
||||||
|
|
||||||
ThreadOp threadOp;
|
ThreadOp threadOp;
|
||||||
bool settled = false;
|
std::shared_ptr<AsyncState> asyncState;
|
||||||
std::coroutine_handle<> callerSchedHandle;
|
|
||||||
PuppetThread &parentThread;
|
PuppetThread &parentThread;
|
||||||
const std::shared_ptr<PuppetThread> selfPtr;
|
const std::shared_ptr<PuppetThread> selfPtr;
|
||||||
cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback;
|
cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Thread lifetime management request invokers
|
||||||
ViralThreadLifetimeMgmtInvoker startThreadAReq()
|
ViralThreadLifetimeMgmtInvoker startThreadAReq()
|
||||||
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); }
|
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); }
|
||||||
ViralThreadLifetimeMgmtInvoker pauseThreadAReq()
|
ViralThreadLifetimeMgmtInvoker pauseThreadAReq()
|
||||||
|
|||||||
@@ -5,7 +5,10 @@
|
|||||||
#include <exception>
|
#include <exception>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <string_view>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include <spinscale/co/group.h>
|
||||||
#include <spinscale/co/invokers.h>
|
#include <spinscale/co/invokers.h>
|
||||||
#include <spinscale/componentThread.h>
|
#include <spinscale/componentThread.h>
|
||||||
|
|
||||||
@@ -19,22 +22,30 @@ public:
|
|||||||
const std::vector<std::shared_ptr<PuppetThread>> &threads);
|
const std::vector<std::shared_ptr<PuppetThread>> &threads);
|
||||||
~PuppetApplication() = default;
|
~PuppetApplication() = default;
|
||||||
|
|
||||||
co::NonViralNonPostingInvoker joltAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq(
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||||
co::NonViralNonPostingInvoker startAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq(
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||||
co::NonViralNonPostingInvoker pauseAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq(
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||||
co::NonViralNonPostingInvoker resumeAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq(
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||||
co::NonViralNonPostingInvoker exitAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq(
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||||
|
|
||||||
// CPU distribution method
|
// CPU distribution method
|
||||||
void distributeAndPinThreadsAcrossCpus();
|
void distributeAndPinThreadsAcrossCpus();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// Collection of PuppetThread instances
|
using PuppetLifetimeMgmtInvoker =
|
||||||
|
PuppetThread::ViralThreadLifetimeMgmtInvoker;
|
||||||
|
using PuppetLifetimeMgmtGroup = co::Group;
|
||||||
|
|
||||||
|
void addAllPuppetLifetimeInvokersToGroup(
|
||||||
|
PuppetLifetimeMgmtGroup &group,
|
||||||
|
std::vector<PuppetLifetimeMgmtInvoker> &invokers,
|
||||||
|
PuppetThread::ThreadOp threadOp) const;
|
||||||
|
|
||||||
std::vector<std::shared_ptr<PuppetThread>> componentThreads;
|
std::vector<std::shared_ptr<PuppetThread>> componentThreads;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -57,6 +68,13 @@ protected:
|
|||||||
* a synchronization point for the entire system initialization.
|
* a synchronization point for the entire system initialization.
|
||||||
*/
|
*/
|
||||||
bool threadsHaveBeenJolted = false;
|
bool threadsHaveBeenJolted = false;
|
||||||
|
|
||||||
|
private:
|
||||||
|
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
|
||||||
|
std::exception_ptr &exceptionPtr,
|
||||||
|
std::function<void()> callback,
|
||||||
|
PuppetThread::ThreadOp threadOp,
|
||||||
|
std::string_view emptyThreadsLogMessage);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace sscl
|
} // namespace sscl
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
#include <spinscale/cps/asynchronousContinuation.h>
|
#include <spinscale/cps/asynchronousContinuation.h>
|
||||||
#include <spinscale/cps/callback.h>
|
#include <spinscale/cps/callback.h>
|
||||||
#include <spinscale/cps/callableTracer.h>
|
#include <spinscale/cps/callableTracer.h>
|
||||||
|
#include <spinscale/co/invokers.h>
|
||||||
#include <spinscale/component.h>
|
#include <spinscale/component.h>
|
||||||
#include <spinscale/componentThread.h>
|
#include <spinscale/componentThread.h>
|
||||||
|
|
||||||
|
|||||||
+76
-84
@@ -8,7 +8,7 @@
|
|||||||
|
|
||||||
namespace sscl {
|
namespace sscl {
|
||||||
|
|
||||||
namespace puppet_application_detail {
|
namespace {
|
||||||
|
|
||||||
constexpr std::string_view noPuppetThreadsToStartLogMessage =
|
constexpr std::string_view noPuppetThreadsToStartLogMessage =
|
||||||
"Mrntt: No puppet threads to start";
|
"Mrntt: No puppet threads to start";
|
||||||
@@ -19,14 +19,18 @@ constexpr std::string_view noPuppetThreadsToResumeLogMessage =
|
|||||||
constexpr std::string_view noPuppetThreadsToExitLogMessage =
|
constexpr std::string_view noPuppetThreadsToExitLogMessage =
|
||||||
"Mrntt: No puppet threads to exit";
|
"Mrntt: No puppet threads to exit";
|
||||||
|
|
||||||
using PuppetLifetimeInvoker = PuppetThread::ViralThreadLifetimeMgmtInvoker;
|
} // namespace
|
||||||
using PuppetLifetimeGroup = co::Group<PuppetLifetimeInvoker>;
|
|
||||||
|
|
||||||
void addAllPuppetLifetimeInvokersToGroup(
|
PuppetApplication::PuppetApplication(
|
||||||
PuppetLifetimeGroup &group,
|
const std::vector<std::shared_ptr<PuppetThread>> &threads)
|
||||||
std::vector<PuppetLifetimeInvoker> &invokers,
|
: componentThreads(threads)
|
||||||
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
|
{
|
||||||
PuppetThread::ThreadOp threadOp)
|
}
|
||||||
|
|
||||||
|
void PuppetApplication::addAllPuppetLifetimeInvokersToGroup(
|
||||||
|
PuppetLifetimeMgmtGroup &group,
|
||||||
|
std::vector<PuppetLifetimeMgmtInvoker> &invokers,
|
||||||
|
PuppetThread::ThreadOp threadOp) const
|
||||||
{
|
{
|
||||||
invokers.reserve(componentThreads.size());
|
invokers.reserve(componentThreads.size());
|
||||||
|
|
||||||
@@ -58,40 +62,8 @@ void addAllPuppetLifetimeInvokersToGroup(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
co::NonViralNonPostingInvoker genericAllPuppetThreadsLifetimeOpCReq(
|
co::ViralNonPostingInvoker<void>
|
||||||
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
|
PuppetApplication::joltAllPuppetThreadsCReq(
|
||||||
PuppetThread::ThreadOp threadOp,
|
|
||||||
std::string_view emptyThreadsLogMessage,
|
|
||||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
|
||||||
[[maybe_unused]] std::function<void()> callback)
|
|
||||||
{
|
|
||||||
if (componentThreads.empty())
|
|
||||||
{
|
|
||||||
std::cout << emptyThreadsLogMessage << "\n";
|
|
||||||
co_return;
|
|
||||||
}
|
|
||||||
|
|
||||||
PuppetLifetimeGroup group;
|
|
||||||
std::vector<PuppetLifetimeInvoker> invokers;
|
|
||||||
|
|
||||||
addAllPuppetLifetimeInvokersToGroup(
|
|
||||||
group, invokers, componentThreads, threadOp);
|
|
||||||
|
|
||||||
co_await group.getAwaitAllSettlementsInvoker();
|
|
||||||
group.checkForAndReThrowGroupExceptions();
|
|
||||||
|
|
||||||
co_return;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace puppet_application_detail
|
|
||||||
|
|
||||||
PuppetApplication::PuppetApplication(
|
|
||||||
const std::vector<std::shared_ptr<PuppetThread>> &threads)
|
|
||||||
: componentThreads(threads)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
|
|
||||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
||||||
[[maybe_unused]] std::function<void()> callback)
|
[[maybe_unused]] std::function<void()> callback)
|
||||||
{
|
{
|
||||||
@@ -108,12 +80,11 @@ co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
|
|||||||
co_return;
|
co_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
puppet_application_detail::PuppetLifetimeGroup group;
|
PuppetLifetimeMgmtGroup group;
|
||||||
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
|
std::vector<PuppetLifetimeMgmtInvoker> invokers;
|
||||||
|
|
||||||
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
|
|
||||||
group, invokers, componentThreads, PuppetThread::ThreadOp::JOLT);
|
|
||||||
|
|
||||||
|
addAllPuppetLifetimeInvokersToGroup(
|
||||||
|
group, invokers, PuppetThread::ThreadOp::JOLT);
|
||||||
co_await group.getAwaitAllSettlementsInvoker();
|
co_await group.getAwaitAllSettlementsInvoker();
|
||||||
group.checkForAndReThrowGroupExceptions();
|
group.checkForAndReThrowGroupExceptions();
|
||||||
|
|
||||||
@@ -121,53 +92,75 @@ co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
|
|||||||
co_return;
|
co_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
co::NonViralNonPostingInvoker PuppetApplication::startAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void>
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
PuppetApplication::allPuppetThreadsLifetimeOpCReq(
|
||||||
{
|
|
||||||
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
|
|
||||||
componentThreads, PuppetThread::ThreadOp::START,
|
|
||||||
puppet_application_detail::noPuppetThreadsToStartLogMessage,
|
|
||||||
exceptionPtr, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
co::NonViralNonPostingInvoker PuppetApplication::pauseAllPuppetThreadsCReq(
|
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
|
||||||
{
|
|
||||||
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
|
|
||||||
componentThreads, PuppetThread::ThreadOp::PAUSE,
|
|
||||||
puppet_application_detail::noPuppetThreadsToPauseLogMessage,
|
|
||||||
exceptionPtr, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
co::NonViralNonPostingInvoker PuppetApplication::resumeAllPuppetThreadsCReq(
|
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
|
||||||
{
|
|
||||||
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
|
|
||||||
componentThreads, PuppetThread::ThreadOp::RESUME,
|
|
||||||
puppet_application_detail::noPuppetThreadsToResumeLogMessage,
|
|
||||||
exceptionPtr, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
co::NonViralNonPostingInvoker PuppetApplication::exitAllPuppetThreadsCReq(
|
|
||||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
||||||
[[maybe_unused]] std::function<void()> callback)
|
[[maybe_unused]] std::function<void()> callback,
|
||||||
|
PuppetThread::ThreadOp threadOp,
|
||||||
|
std::string_view emptyThreadsLogMessage)
|
||||||
{
|
{
|
||||||
if (componentThreads.empty())
|
if (componentThreads.empty())
|
||||||
{
|
{
|
||||||
std::cout << puppet_application_detail::noPuppetThreadsToExitLogMessage
|
std::cout << emptyThreadsLogMessage << "\n";
|
||||||
<< "\n";
|
|
||||||
co_return;
|
co_return;
|
||||||
}
|
}
|
||||||
|
|
||||||
puppet_application_detail::PuppetLifetimeGroup group;
|
PuppetLifetimeMgmtGroup group;
|
||||||
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
|
std::vector<PuppetLifetimeMgmtInvoker> invokers;
|
||||||
|
|
||||||
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
|
|
||||||
group, invokers, componentThreads, PuppetThread::ThreadOp::EXIT);
|
|
||||||
|
|
||||||
|
addAllPuppetLifetimeInvokersToGroup(group, invokers, threadOp);
|
||||||
co_await group.getAwaitAllSettlementsInvoker();
|
co_await group.getAwaitAllSettlementsInvoker();
|
||||||
group.checkForAndReThrowGroupExceptions();
|
group.checkForAndReThrowGroupExceptions();
|
||||||
|
|
||||||
|
co_return;
|
||||||
|
}
|
||||||
|
|
||||||
|
co::ViralNonPostingInvoker<void>
|
||||||
|
PuppetApplication::startAllPuppetThreadsCReq(
|
||||||
|
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||||
|
{
|
||||||
|
return allPuppetThreadsLifetimeOpCReq(
|
||||||
|
exceptionPtr, std::move(callback),
|
||||||
|
PuppetThread::ThreadOp::START,
|
||||||
|
noPuppetThreadsToStartLogMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
co::ViralNonPostingInvoker<void>
|
||||||
|
PuppetApplication::pauseAllPuppetThreadsCReq(
|
||||||
|
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||||
|
{
|
||||||
|
return allPuppetThreadsLifetimeOpCReq(
|
||||||
|
exceptionPtr, std::move(callback),
|
||||||
|
PuppetThread::ThreadOp::PAUSE,
|
||||||
|
noPuppetThreadsToPauseLogMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
co::ViralNonPostingInvoker<void>
|
||||||
|
PuppetApplication::resumeAllPuppetThreadsCReq(
|
||||||
|
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||||
|
{
|
||||||
|
return allPuppetThreadsLifetimeOpCReq(
|
||||||
|
exceptionPtr, std::move(callback),
|
||||||
|
PuppetThread::ThreadOp::RESUME,
|
||||||
|
noPuppetThreadsToResumeLogMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
co::ViralNonPostingInvoker<void>
|
||||||
|
PuppetApplication::exitAllPuppetThreadsCReq(
|
||||||
|
std::exception_ptr &exceptionPtr,
|
||||||
|
std::function<void()> callback)
|
||||||
|
{
|
||||||
|
if (componentThreads.empty())
|
||||||
|
{
|
||||||
|
std::cout << noPuppetThreadsToExitLogMessage << "\n";
|
||||||
|
co_return;
|
||||||
|
}
|
||||||
|
|
||||||
|
co_await allPuppetThreadsLifetimeOpCReq(
|
||||||
|
exceptionPtr, std::move(callback),
|
||||||
|
PuppetThread::ThreadOp::EXIT,
|
||||||
|
noPuppetThreadsToExitLogMessage);
|
||||||
|
|
||||||
for (auto &thread : componentThreads) {
|
for (auto &thread : componentThreads) {
|
||||||
thread->thread.join();
|
thread->thread.join();
|
||||||
}
|
}
|
||||||
@@ -179,7 +172,6 @@ void PuppetApplication::distributeAndPinThreadsAcrossCpus()
|
|||||||
{
|
{
|
||||||
int cpuCount = ComponentThread::getAvailableCpuCount();
|
int cpuCount = ComponentThread::getAvailableCpuCount();
|
||||||
|
|
||||||
// Distribute and pin threads across CPUs
|
|
||||||
int threadIndex = 0;
|
int threadIndex = 0;
|
||||||
for (auto& thread : componentThreads)
|
for (auto& thread : componentThreads)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user