mirror of
https://github.com/latentPrion/libspinscale.git
synced 2026-06-23 19:48:32 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e29bee52cf | |||
| daad2a8c95 |
@@ -1,6 +1,7 @@
|
||||
#ifndef GROUP_H
|
||||
#define GROUP_H
|
||||
|
||||
#include <any>
|
||||
#include <cassert>
|
||||
#include <coroutine>
|
||||
#include <cstddef>
|
||||
@@ -60,6 +61,19 @@ concept AwaitableIface = requires(T &t) {
|
||||
{ get_operator_co_await(t) };
|
||||
} && AwaiterIface<decltype(get_operator_co_await(std::declval<T &>()))>;
|
||||
|
||||
template<AwaiterIface T>
|
||||
T &asAwaiter(T &t) noexcept
|
||||
{
|
||||
return t;
|
||||
}
|
||||
|
||||
template<AwaitableIface T>
|
||||
auto asAwaiter(T &t) noexcept(noexcept(get_operator_co_await(t)))
|
||||
-> decltype(get_operator_co_await(t))
|
||||
{
|
||||
return get_operator_co_await(t);
|
||||
}
|
||||
|
||||
} // namespace detail
|
||||
|
||||
template <typename T>
|
||||
@@ -71,8 +85,27 @@ concept AwaiterIface = detail::AwaiterIface<T>;
|
||||
template <typename T>
|
||||
concept AwaitableOrAwaiterIface = AwaiterIface<T> || AwaitableIface<T>;
|
||||
|
||||
template <typename Invoker>
|
||||
requires AwaitableOrAwaiterIface<Invoker>
|
||||
/** Typical usage — parallel members, then gather:
|
||||
*
|
||||
* co::Group group;
|
||||
*
|
||||
* auto bodyInit = body.initializeCReq(exceptionPtr, noopCallback);
|
||||
* auto legInit = leg.initializeCReq(exceptionPtr, noopCallback);
|
||||
* ViralNonPostingInvoker<void> batch = app.joltAllPuppetThreadsCReq(...);
|
||||
*
|
||||
* group.add(bodyInit);
|
||||
* group.add(legInit);
|
||||
* group.add(batch);
|
||||
*
|
||||
* co_await group.getAwaitAllSettlementsInvoker();
|
||||
* group.checkForAndReThrowGroupExceptions();
|
||||
*
|
||||
* (void)bodyInit.completedReturnValues();
|
||||
*
|
||||
* // When walking settlement slots by index:
|
||||
* settlements[i].invokerAs<BodyViralPostingInvoker<void>>()
|
||||
* .completedReturnValues();
|
||||
*/
|
||||
struct Group
|
||||
{
|
||||
enum class AwaitingCondition {
|
||||
@@ -91,9 +124,23 @@ struct Group
|
||||
UNSETTLED, COMPLETED, EXCEPTION_THROWN
|
||||
};
|
||||
|
||||
SettlementDescriptor(Invoker &_invoker)
|
||||
: invoker(std::ref(_invoker))
|
||||
{}
|
||||
template<typename Member>
|
||||
void bindMemberRef(Member &member)
|
||||
{
|
||||
memberInvokerRef = std::ref(member);
|
||||
}
|
||||
|
||||
template<typename Member>
|
||||
Member &invokerAs() const
|
||||
{
|
||||
try {
|
||||
return std::any_cast<std::reference_wrapper<Member>>(
|
||||
memberInvokerRef).get();
|
||||
} catch (const std::bad_any_cast &) {
|
||||
throw std::runtime_error(
|
||||
"Group settlement invoker type mismatch");
|
||||
}
|
||||
}
|
||||
|
||||
void setSettlementStatus() noexcept
|
||||
{
|
||||
@@ -109,7 +156,7 @@ struct Group
|
||||
TypeE type = TypeE::UNSETTLED;
|
||||
std::exception_ptr calleeException = nullptr;
|
||||
std::exception_ptr adapterException = nullptr;
|
||||
std::reference_wrapper<Invoker> invoker;
|
||||
std::any memberInvokerRef;
|
||||
};
|
||||
|
||||
struct SettlementAwaitingInvoker;
|
||||
@@ -439,7 +486,7 @@ struct Group
|
||||
* would be impossible.
|
||||
*
|
||||
* So we should be able to call resume() directly here without
|
||||
* post()ing to current_io_context().
|
||||
* post()ing to ComponentThread::getSelf()->getIoService().
|
||||
*
|
||||
* EXPLANATION:
|
||||
* However, in order to ensure that we keep this adapter coro
|
||||
@@ -466,28 +513,17 @@ struct Group
|
||||
* target async fn, and also to convey its results back to the Group class.
|
||||
* It's effectively a go-between coro that provides the outcomes that Invokers
|
||||
* normally provide, without needing, itself, to be co_awaited.
|
||||
*
|
||||
* settlementIndex is captured by value (not a vector iterator) so adapter
|
||||
* coros remain valid if settlements reallocate during concurrent add().
|
||||
*/
|
||||
NonAwaitableNonPostingAdapterCoro nonAwaitableAdapterCoro(
|
||||
template<AwaitableOrAwaiterIface Member>
|
||||
NonAwaitableNonPostingAdapterCoro memberAdapterCoro(
|
||||
Member &memberInvoker,
|
||||
std::size_t settlementIndex) noexcept
|
||||
{
|
||||
/** EXPLANATION:
|
||||
* It's very convenient that our design for the NonViralPostingInvoker
|
||||
* coincidentally allows us to supply a lambda that can be used to test
|
||||
* for the settlement conditions that are being waited on by the Group's
|
||||
* co_awaiter.
|
||||
*
|
||||
* settlementIndex is captured by value (not a vector iterator) so adapter
|
||||
* coros remain valid if settlements reallocate during concurrent add().
|
||||
*/
|
||||
try {
|
||||
/* Return values remain in the callee promise until the caller-owned
|
||||
* invoker is destroyed (~PostingInvoker). The group co_awaiter reads
|
||||
* results via settlements[settlementIndex].invoker after awaiting.
|
||||
*
|
||||
* Index settlements[] each time; do not cache a reference across
|
||||
* co_await because concurrent add() may reallocate the vector.
|
||||
*/
|
||||
co_await s.rsrc.settlements[settlementIndex].invoker.get();
|
||||
co_await detail::asAwaiter(memberInvoker);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@@ -505,12 +541,8 @@ struct Group
|
||||
co_return;
|
||||
}
|
||||
|
||||
/** EXPLANATION:
|
||||
* Each invoker passed to add() must outlive this Group and the callee frame
|
||||
* (see ~PostingInvoker). The group co_awaiter reads return values from those
|
||||
* invokers after awaiting; do not destroy an invoker until reads are done.
|
||||
*/
|
||||
void add(Invoker &invoker)
|
||||
template<AwaitableOrAwaiterIface Member>
|
||||
void add(Member &memberInvoker)
|
||||
{
|
||||
std::size_t settlementIndex = 0;
|
||||
|
||||
@@ -525,10 +557,11 @@ struct Group
|
||||
}
|
||||
|
||||
settlementIndex = s.rsrc.settlements.size();
|
||||
s.rsrc.settlements.emplace_back(invoker);
|
||||
s.rsrc.settlements.emplace_back();
|
||||
s.rsrc.settlements[settlementIndex].bindMemberRef(memberInvoker);
|
||||
}
|
||||
|
||||
nonAwaitableAdapterCoro(settlementIndex);
|
||||
memberAdapterCoro(memberInvoker, settlementIndex);
|
||||
}
|
||||
|
||||
void checkForAndReThrowGroupExceptions() const
|
||||
|
||||
@@ -47,7 +47,10 @@ struct NonViralPostingInvoker
|
||||
*/
|
||||
std::ostringstream oss;
|
||||
oss << std::this_thread::get_id()
|
||||
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
|
||||
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."
|
||||
<< " Promise type=" << typeid(*this).name()
|
||||
<< ". This usually means promise construction did not bind the"
|
||||
<< " (exception_ptr&, function<void()>, ...) constructor.";
|
||||
throw std::runtime_error(oss.str());
|
||||
}
|
||||
|
||||
@@ -153,12 +156,12 @@ struct ViralPostingInvoker
|
||||
* from get_return_object(). ~NonPostingInvoker destroys the callee frame.
|
||||
*/
|
||||
struct NonViralNonPostingInvoker
|
||||
: public NonPostingInvoker<NonPostingPromise>
|
||||
: public NonPostingInvoker<NonPostingPromise<void>, void>
|
||||
{
|
||||
struct promise_type
|
||||
: public NonPostingPromise
|
||||
: public NonPostingPromise<void>
|
||||
{
|
||||
using NonPostingPromise::NonPostingPromise;
|
||||
using NonPostingPromise<void>::NonPostingPromise;
|
||||
|
||||
NonViralNonPostingInvoker get_return_object()
|
||||
{
|
||||
@@ -169,7 +172,10 @@ struct NonViralNonPostingInvoker
|
||||
{
|
||||
std::ostringstream oss;
|
||||
oss << std::this_thread::get_id()
|
||||
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
|
||||
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."
|
||||
<< " Promise type=" << typeid(*this).name()
|
||||
<< ". This usually means promise construction did not bind the"
|
||||
<< " (exception_ptr&, function<void()>, ...) constructor.";
|
||||
throw std::runtime_error(oss.str());
|
||||
}
|
||||
|
||||
@@ -180,7 +186,7 @@ struct NonViralNonPostingInvoker
|
||||
}
|
||||
};
|
||||
|
||||
using NonPostingInvoker<NonPostingPromise>::NonPostingInvoker;
|
||||
using NonPostingInvoker<NonPostingPromise<void>, void>::NonPostingInvoker;
|
||||
|
||||
bool await_ready() const noexcept
|
||||
{ std::terminate(); }
|
||||
@@ -192,6 +198,66 @@ struct NonViralNonPostingInvoker
|
||||
{ std::terminate(); }
|
||||
};
|
||||
|
||||
/** Viral awaitable non-posting coroutine: runs eagerly on the caller thread
|
||||
* (initial_suspend is never). Caller resume uses symmetric transfer when the
|
||||
* caller has registered before callee completion; otherwise PostBackStatus
|
||||
* fast-paths await_resume on co_await.
|
||||
*/
|
||||
template <typename T = void>
|
||||
struct ViralNonPostingInvoker
|
||||
: public NonPostingInvoker<NonPostingPromise<T>, T>
|
||||
{
|
||||
struct promise_type
|
||||
: public NonPostingPromise<T>
|
||||
{
|
||||
using NonPostingPromise<T>::NonPostingPromise;
|
||||
|
||||
ViralNonPostingInvoker<T> get_return_object() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " Returning ViralNonPostingInvoker.\n";
|
||||
#endif
|
||||
this->setSelfSchedHandle(
|
||||
std::coroutine_handle<promise_type>::from_promise(*this));
|
||||
|
||||
return ViralNonPostingInvoker<T>(*this);
|
||||
}
|
||||
};
|
||||
|
||||
using NonPostingInvoker<NonPostingPromise<T>, T>::NonPostingInvoker;
|
||||
|
||||
bool await_ready() const noexcept
|
||||
{ return false; }
|
||||
|
||||
template <typename CallerPromise>
|
||||
bool await_suspend(
|
||||
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
|
||||
{
|
||||
static_assert(
|
||||
std::is_base_of_v<PromiseChainLink, CallerPromise>,
|
||||
"ViralNonPostingInvoker caller promise must derive from "
|
||||
"PromiseChainLink");
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " Setting callerSchedHandle.\n";
|
||||
#endif
|
||||
const bool suspendCaller =
|
||||
this->setCallerSchedHandle(callerSchedHandle);
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " CallerFlowExecutor returned suspend=" << suspendCaller
|
||||
<< ".\n";
|
||||
#endif
|
||||
return suspendCaller;
|
||||
}
|
||||
|
||||
auto await_resume()
|
||||
{
|
||||
return NonPostingInvoker<NonPostingPromise<T>, T>::await_resume();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // INVOKERS_H
|
||||
|
||||
@@ -5,13 +5,14 @@
|
||||
#include <coroutine>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include <spinscale/co/nonPostingPromise.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
template <typename PromiseType>
|
||||
template <typename PromiseType, typename T>
|
||||
class NonPostingInvoker
|
||||
{
|
||||
public:
|
||||
@@ -39,15 +40,55 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
ReturnValues<void> &completedReturnValues() noexcept
|
||||
template <typename CallerPromise>
|
||||
bool setCallerSchedHandle(
|
||||
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
|
||||
{
|
||||
static_assert(
|
||||
std::is_base_of_v<PromiseChainLink, CallerPromise>,
|
||||
"NonPostingInvoker caller promise must derive from PromiseChainLink");
|
||||
|
||||
calleePromise.callerSchedHandle = callerSchedHandle;
|
||||
calleePromise.setCallerPromiseChainLink(
|
||||
&callerSchedHandle.promise());
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " Done setting callerSchedHandle; running CallerFlowExecutor.\n";
|
||||
#endif
|
||||
return calleePromise.postBackStatus.getCallerFlowExecutor()();
|
||||
}
|
||||
|
||||
ReturnValues<T> &completedReturnValues() noexcept
|
||||
{ return calleePromise.returnValues; }
|
||||
|
||||
const ReturnValues<void> &completedReturnValues() const noexcept
|
||||
const ReturnValues<T> &completedReturnValues() const noexcept
|
||||
{ return calleePromise.returnValues; }
|
||||
|
||||
private:
|
||||
auto await_resume()
|
||||
{
|
||||
calleePromise.postBackStatus.reset();
|
||||
|
||||
ReturnValues<T> &returnValues = calleePromise.returnValues;
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " About to check for and rethrow any exception.\n";
|
||||
#endif
|
||||
|
||||
if (returnValues.myExceptionPtr) {
|
||||
std::exception_ptr const captured = returnValues.myExceptionPtr;
|
||||
std::rethrow_exception(captured);
|
||||
}
|
||||
if constexpr (!std::is_void_v<T>)
|
||||
{
|
||||
T result = std::move(returnValues.myReturnValue);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
PromiseType &calleePromise;
|
||||
|
||||
private:
|
||||
/** Every live invoker owns destruction of its callee coroutine frame in
|
||||
* ~NonPostingInvoker (via calleePromise.selfSchedHandle).
|
||||
*
|
||||
|
||||
@@ -9,15 +9,99 @@
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
|
||||
#include <spinscale/spinLock.h>
|
||||
#include <spinscale/co/coQutex.h>
|
||||
#include <spinscale/co/promiseChainLink.h>
|
||||
#include <spinscale/co/promises.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
template <typename T>
|
||||
struct NonPostingPromise
|
||||
: public PromiseChainLink
|
||||
: public PromiseChainLink,
|
||||
public PostingPromiseReturnOps<NonPostingPromise<T>, T>
|
||||
{
|
||||
struct PostBackStatus
|
||||
{
|
||||
struct CalleeFlowExecutor;
|
||||
struct CallerFlowExecutor;
|
||||
friend struct CalleeFlowExecutor;
|
||||
friend struct CallerFlowExecutor;
|
||||
|
||||
explicit PostBackStatus(NonPostingPromise &calleePromiseIn) noexcept
|
||||
: calleePromise(calleePromiseIn)
|
||||
{}
|
||||
|
||||
void reset() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(lock);
|
||||
callerHasSetCallerSchedHandle = false;
|
||||
calleeIsReadyToPostBack = false;
|
||||
}
|
||||
|
||||
struct FlowExecutor
|
||||
{
|
||||
explicit FlowExecutor(PostBackStatus &parentIn) noexcept
|
||||
: parent(parentIn)
|
||||
{}
|
||||
|
||||
PostBackStatus &parent;
|
||||
};
|
||||
|
||||
struct CalleeFlowExecutor
|
||||
: public FlowExecutor
|
||||
{
|
||||
explicit CalleeFlowExecutor(PostBackStatus &parentIn) noexcept
|
||||
: FlowExecutor(parentIn)
|
||||
{}
|
||||
|
||||
bool operator()() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(this->parent.lock);
|
||||
this->parent.calleeIsReadyToPostBack = true;
|
||||
if (this->parent.callerHasSetCallerSchedHandle) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
struct CallerFlowExecutor
|
||||
: public FlowExecutor
|
||||
{
|
||||
explicit CallerFlowExecutor(PostBackStatus &parentIn) noexcept
|
||||
: FlowExecutor(parentIn)
|
||||
{}
|
||||
|
||||
bool operator()() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(this->parent.lock);
|
||||
this->parent.callerHasSetCallerSchedHandle = true;
|
||||
if (this->parent.calleeIsReadyToPostBack) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
CalleeFlowExecutor getCalleeFlowExecutor() noexcept
|
||||
{
|
||||
return CalleeFlowExecutor(*this);
|
||||
}
|
||||
|
||||
CallerFlowExecutor getCallerFlowExecutor() noexcept
|
||||
{
|
||||
return CallerFlowExecutor(*this);
|
||||
}
|
||||
|
||||
NonPostingPromise &calleePromise;
|
||||
|
||||
private:
|
||||
sscl::SpinLock lock;
|
||||
bool callerHasSetCallerSchedHandle = false;
|
||||
bool calleeIsReadyToPostBack = false;
|
||||
};
|
||||
|
||||
/** Completion work must run from this awaiter's await_suspend, not
|
||||
* synchronously inside promise.final_suspend() before it returns: the
|
||||
* hidden coroutine segment index in the coroutine state is only advanced
|
||||
@@ -26,16 +110,19 @@ struct NonPostingPromise
|
||||
struct FinalSuspendNonPostingInvoker
|
||||
: public std::suspend_always
|
||||
{
|
||||
explicit FinalSuspendNonPostingInvoker(NonPostingPromise &calleePromiseIn) noexcept
|
||||
: calleePromise(calleePromiseIn)
|
||||
explicit FinalSuspendNonPostingInvoker(
|
||||
NonPostingPromise &calleePromiseIn) noexcept
|
||||
: calleePromise(calleePromiseIn)
|
||||
{}
|
||||
|
||||
bool await_suspend(std::coroutine_handle<> const) noexcept
|
||||
std::coroutine_handle<> await_suspend(
|
||||
std::coroutine_handle<> const) noexcept
|
||||
{
|
||||
if (calleePromise.callerLambda)
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
|
||||
std::cout << "final_suspend" << ": "
|
||||
<< std::this_thread::get_id()
|
||||
<< " Non-viral non-posting: invoking callerLambda directly.\n";
|
||||
#endif
|
||||
if (calleePromise.returnValues.myExceptionPtr) {
|
||||
@@ -44,15 +131,29 @@ struct NonPostingPromise
|
||||
}
|
||||
|
||||
calleePromise.callerLambda();
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
return true;
|
||||
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
|
||||
<< " Viral non-posting: running CalleeFlowExecutor.\n";
|
||||
#endif
|
||||
const bool symmetricTransferToCaller =
|
||||
calleePromise.postBackStatus.getCalleeFlowExecutor()();
|
||||
|
||||
if (symmetricTransferToCaller && calleePromise.callerSchedHandle) {
|
||||
return calleePromise.callerSchedHandle;
|
||||
}
|
||||
|
||||
return std::noop_coroutine();
|
||||
}
|
||||
|
||||
NonPostingPromise &calleePromise;
|
||||
};
|
||||
|
||||
NonPostingPromise() noexcept
|
||||
: returnValues()
|
||||
: returnValues(),
|
||||
postBackStatus(*this)
|
||||
{}
|
||||
|
||||
template <typename... TailArgs>
|
||||
@@ -61,13 +162,27 @@ struct NonPostingPromise
|
||||
std::function<void()> callerLambdaIn,
|
||||
TailArgs &&...) noexcept
|
||||
: returnValues(callerExceptionPtr),
|
||||
callerLambda(std::move(callerLambdaIn))
|
||||
callerLambda(std::move(callerLambdaIn)),
|
||||
postBackStatus(*this)
|
||||
{}
|
||||
|
||||
template <typename ObjectArg, typename... TailArgs>
|
||||
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
|
||||
NonPostingPromise(
|
||||
ObjectArg &&,
|
||||
std::exception_ptr &callerExceptionPtr,
|
||||
std::function<void()> callerLambdaIn,
|
||||
TailArgs &&...) noexcept
|
||||
: NonPostingPromise(
|
||||
callerExceptionPtr,
|
||||
std::move(callerLambdaIn))
|
||||
{}
|
||||
|
||||
~NonPostingPromise() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n";
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " Destructing.\n";
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -77,9 +192,6 @@ struct NonPostingPromise
|
||||
auto final_suspend() noexcept
|
||||
{ return FinalSuspendNonPostingInvoker(*this); }
|
||||
|
||||
void return_void() noexcept
|
||||
{ return; }
|
||||
|
||||
void unhandled_exception() noexcept
|
||||
{
|
||||
returnValues.myExceptionPtr = std::current_exception();
|
||||
@@ -90,16 +202,26 @@ struct NonPostingPromise
|
||||
eraseFirstMatchingAcquiredLock(coQutex);
|
||||
}
|
||||
|
||||
const PromiseChainLink *callerPromiseChainLink() const noexcept override
|
||||
{ return callerChainLink; }
|
||||
|
||||
PromiseChainLink *callerPromiseChainLink() noexcept override
|
||||
{ return callerChainLink; }
|
||||
|
||||
void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept
|
||||
{
|
||||
selfSchedHandle = schedHandle;
|
||||
}
|
||||
{ selfSchedHandle = schedHandle; }
|
||||
|
||||
ReturnValues<void> returnValues;
|
||||
void setCallerPromiseChainLink(PromiseChainLink *chainLink) noexcept
|
||||
{ callerChainLink = chainLink; }
|
||||
|
||||
ReturnValues<T> returnValues;
|
||||
std::function<void()> callerLambda;
|
||||
PostBackStatus postBackStatus;
|
||||
std::coroutine_handle<> selfSchedHandle;
|
||||
std::coroutine_handle<> callerSchedHandle;
|
||||
PromiseChainLink *callerChainLink = nullptr;
|
||||
|
||||
template <typename>
|
||||
template <typename, typename>
|
||||
friend class NonPostingInvoker;
|
||||
};
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <typeinfo>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
@@ -267,6 +268,22 @@ struct PostingPromise
|
||||
postBackStatus(*this)
|
||||
{}
|
||||
|
||||
/** Member coroutines pass the implicit object parameter before explicit
|
||||
* (exceptionPtr, callback, ...) args. Discard the object and delegate to
|
||||
* the free-function constructor shape.
|
||||
*/
|
||||
template <typename ObjectArg, typename... TailArgs>
|
||||
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
|
||||
PostingPromise(
|
||||
ObjectArg &&,
|
||||
std::exception_ptr &_callerExceptionPtr,
|
||||
std::function<void()> _callerLambda,
|
||||
TailArgs &&...) noexcept
|
||||
: PostingPromise(
|
||||
_callerExceptionPtr,
|
||||
std::move(_callerLambda))
|
||||
{}
|
||||
|
||||
~PostingPromise() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
@@ -345,6 +362,19 @@ struct TaggedPostingPromise
|
||||
std::forward<TailArgs>(tailArgs)...)
|
||||
{}
|
||||
|
||||
template <typename ObjectArg, typename... TailArgs>
|
||||
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
|
||||
TaggedPostingPromise(
|
||||
ObjectArg &&,
|
||||
std::exception_ptr &_exceptionPtr,
|
||||
std::function<void()> _callerLambda,
|
||||
TailArgs &&... tailArgs) noexcept
|
||||
: PostingPromise<T>(
|
||||
_exceptionPtr,
|
||||
std::move(_callerLambda),
|
||||
std::forward<TailArgs>(tailArgs)...)
|
||||
{}
|
||||
|
||||
auto initial_suspend() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <stdexcept>
|
||||
#include <queue>
|
||||
#include <functional>
|
||||
@@ -14,9 +13,11 @@
|
||||
#include <unistd.h>
|
||||
#include <memory>
|
||||
#include <coroutine>
|
||||
#include <spinscale/cps/callback.h>
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <spinscale/cps/callback.h>
|
||||
|
||||
namespace sscl {
|
||||
|
||||
@@ -165,21 +166,40 @@ public:
|
||||
|
||||
struct ViralThreadLifetimeMgmtInvoker
|
||||
{
|
||||
struct AsyncState
|
||||
{
|
||||
std::atomic<bool> settled{false};
|
||||
std::coroutine_handle<> callerSchedHandle;
|
||||
};
|
||||
|
||||
ViralThreadLifetimeMgmtInvoker(
|
||||
ThreadOp _threadOp,
|
||||
PuppetThread &_parentThread,
|
||||
const std::shared_ptr<PuppetThread> &_selfPtr = nullptr)
|
||||
: threadOp(_threadOp),
|
||||
asyncState(std::make_shared<AsyncState>()),
|
||||
parentThread(_parentThread),
|
||||
selfPtr(_selfPtr),
|
||||
lifetimeMgmtCallback{
|
||||
nullptr,
|
||||
[this]()
|
||||
[asyncState = asyncState]()
|
||||
{
|
||||
settled = true;
|
||||
if (callerSchedHandle) {
|
||||
callerSchedHandle.resume();
|
||||
asyncState->settled.store(true, std::memory_order_release);
|
||||
|
||||
std::coroutine_handle<> handle =
|
||||
asyncState->callerSchedHandle;
|
||||
|
||||
if (!handle) {
|
||||
return;
|
||||
}
|
||||
|
||||
/** Post resume to the puppeteer queue: direct resume() from
|
||||
* within an asio completion handler can destroy adapter
|
||||
* coroutine state while the handler is still unwinding.
|
||||
*/
|
||||
boost::asio::post(
|
||||
ComponentThread::getPptr()->getIoService(),
|
||||
[handle]() { handle.resume(); });
|
||||
}}
|
||||
{
|
||||
if (threadOp == ThreadOp::JOLT && selfPtr == nullptr)
|
||||
@@ -212,26 +232,32 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
bool await_ready() const noexcept { return settled; }
|
||||
bool await_ready() const noexcept
|
||||
{
|
||||
return asyncState->settled.load(std::memory_order_acquire);
|
||||
}
|
||||
|
||||
bool await_suspend(
|
||||
std::coroutine_handle<> _callerSchedHandle) noexcept
|
||||
{
|
||||
if (settled) { return false; }
|
||||
callerSchedHandle = _callerSchedHandle;
|
||||
if (asyncState->settled.load(std::memory_order_acquire)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
asyncState->callerSchedHandle = _callerSchedHandle;
|
||||
return true;
|
||||
}
|
||||
|
||||
void await_resume() noexcept {}
|
||||
|
||||
ThreadOp threadOp;
|
||||
bool settled = false;
|
||||
std::coroutine_handle<> callerSchedHandle;
|
||||
std::shared_ptr<AsyncState> asyncState;
|
||||
PuppetThread &parentThread;
|
||||
const std::shared_ptr<PuppetThread> selfPtr;
|
||||
cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback;
|
||||
};
|
||||
|
||||
// Thread lifetime management request invokers
|
||||
ViralThreadLifetimeMgmtInvoker startThreadAReq()
|
||||
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); }
|
||||
ViralThreadLifetimeMgmtInvoker pauseThreadAReq()
|
||||
|
||||
@@ -5,7 +5,10 @@
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
|
||||
#include <spinscale/co/group.h>
|
||||
#include <spinscale/co/invokers.h>
|
||||
#include <spinscale/componentThread.h>
|
||||
|
||||
@@ -19,22 +22,30 @@ public:
|
||||
const std::vector<std::shared_ptr<PuppetThread>> &threads);
|
||||
~PuppetApplication() = default;
|
||||
|
||||
co::NonViralNonPostingInvoker joltAllPuppetThreadsCReq(
|
||||
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::NonViralNonPostingInvoker startAllPuppetThreadsCReq(
|
||||
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::NonViralNonPostingInvoker pauseAllPuppetThreadsCReq(
|
||||
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::NonViralNonPostingInvoker resumeAllPuppetThreadsCReq(
|
||||
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::NonViralNonPostingInvoker exitAllPuppetThreadsCReq(
|
||||
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
|
||||
// CPU distribution method
|
||||
void distributeAndPinThreadsAcrossCpus();
|
||||
|
||||
protected:
|
||||
// Collection of PuppetThread instances
|
||||
using PuppetLifetimeMgmtInvoker =
|
||||
PuppetThread::ViralThreadLifetimeMgmtInvoker;
|
||||
using PuppetLifetimeMgmtGroup = co::Group;
|
||||
|
||||
void addAllPuppetLifetimeInvokersToGroup(
|
||||
PuppetLifetimeMgmtGroup &group,
|
||||
std::vector<PuppetLifetimeMgmtInvoker> &invokers,
|
||||
PuppetThread::ThreadOp threadOp) const;
|
||||
|
||||
std::vector<std::shared_ptr<PuppetThread>> componentThreads;
|
||||
|
||||
/**
|
||||
@@ -57,6 +68,13 @@ protected:
|
||||
* a synchronization point for the entire system initialization.
|
||||
*/
|
||||
bool threadsHaveBeenJolted = false;
|
||||
|
||||
private:
|
||||
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
|
||||
std::exception_ptr &exceptionPtr,
|
||||
std::function<void()> callback,
|
||||
PuppetThread::ThreadOp threadOp,
|
||||
std::string_view emptyThreadsLogMessage);
|
||||
};
|
||||
|
||||
} // namespace sscl
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include <spinscale/cps/asynchronousContinuation.h>
|
||||
#include <spinscale/cps/callback.h>
|
||||
#include <spinscale/cps/callableTracer.h>
|
||||
#include <spinscale/co/invokers.h>
|
||||
#include <spinscale/component.h>
|
||||
#include <spinscale/componentThread.h>
|
||||
|
||||
|
||||
+76
-84
@@ -8,7 +8,7 @@
|
||||
|
||||
namespace sscl {
|
||||
|
||||
namespace puppet_application_detail {
|
||||
namespace {
|
||||
|
||||
constexpr std::string_view noPuppetThreadsToStartLogMessage =
|
||||
"Mrntt: No puppet threads to start";
|
||||
@@ -19,14 +19,18 @@ constexpr std::string_view noPuppetThreadsToResumeLogMessage =
|
||||
constexpr std::string_view noPuppetThreadsToExitLogMessage =
|
||||
"Mrntt: No puppet threads to exit";
|
||||
|
||||
using PuppetLifetimeInvoker = PuppetThread::ViralThreadLifetimeMgmtInvoker;
|
||||
using PuppetLifetimeGroup = co::Group<PuppetLifetimeInvoker>;
|
||||
} // namespace
|
||||
|
||||
void addAllPuppetLifetimeInvokersToGroup(
|
||||
PuppetLifetimeGroup &group,
|
||||
std::vector<PuppetLifetimeInvoker> &invokers,
|
||||
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
|
||||
PuppetThread::ThreadOp threadOp)
|
||||
PuppetApplication::PuppetApplication(
|
||||
const std::vector<std::shared_ptr<PuppetThread>> &threads)
|
||||
: componentThreads(threads)
|
||||
{
|
||||
}
|
||||
|
||||
void PuppetApplication::addAllPuppetLifetimeInvokersToGroup(
|
||||
PuppetLifetimeMgmtGroup &group,
|
||||
std::vector<PuppetLifetimeMgmtInvoker> &invokers,
|
||||
PuppetThread::ThreadOp threadOp) const
|
||||
{
|
||||
invokers.reserve(componentThreads.size());
|
||||
|
||||
@@ -58,40 +62,8 @@ void addAllPuppetLifetimeInvokersToGroup(
|
||||
}
|
||||
}
|
||||
|
||||
co::NonViralNonPostingInvoker genericAllPuppetThreadsLifetimeOpCReq(
|
||||
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads,
|
||||
PuppetThread::ThreadOp threadOp,
|
||||
std::string_view emptyThreadsLogMessage,
|
||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
||||
[[maybe_unused]] std::function<void()> callback)
|
||||
{
|
||||
if (componentThreads.empty())
|
||||
{
|
||||
std::cout << emptyThreadsLogMessage << "\n";
|
||||
co_return;
|
||||
}
|
||||
|
||||
PuppetLifetimeGroup group;
|
||||
std::vector<PuppetLifetimeInvoker> invokers;
|
||||
|
||||
addAllPuppetLifetimeInvokersToGroup(
|
||||
group, invokers, componentThreads, threadOp);
|
||||
|
||||
co_await group.getAwaitAllSettlementsInvoker();
|
||||
group.checkForAndReThrowGroupExceptions();
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
} // namespace puppet_application_detail
|
||||
|
||||
PuppetApplication::PuppetApplication(
|
||||
const std::vector<std::shared_ptr<PuppetThread>> &threads)
|
||||
: componentThreads(threads)
|
||||
{
|
||||
}
|
||||
|
||||
co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::joltAllPuppetThreadsCReq(
|
||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
||||
[[maybe_unused]] std::function<void()> callback)
|
||||
{
|
||||
@@ -108,12 +80,11 @@ co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
|
||||
co_return;
|
||||
}
|
||||
|
||||
puppet_application_detail::PuppetLifetimeGroup group;
|
||||
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
|
||||
|
||||
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
|
||||
group, invokers, componentThreads, PuppetThread::ThreadOp::JOLT);
|
||||
PuppetLifetimeMgmtGroup group;
|
||||
std::vector<PuppetLifetimeMgmtInvoker> invokers;
|
||||
|
||||
addAllPuppetLifetimeInvokersToGroup(
|
||||
group, invokers, PuppetThread::ThreadOp::JOLT);
|
||||
co_await group.getAwaitAllSettlementsInvoker();
|
||||
group.checkForAndReThrowGroupExceptions();
|
||||
|
||||
@@ -121,53 +92,75 @@ co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
|
||||
co_return;
|
||||
}
|
||||
|
||||
co::NonViralNonPostingInvoker PuppetApplication::startAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
{
|
||||
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
|
||||
componentThreads, PuppetThread::ThreadOp::START,
|
||||
puppet_application_detail::noPuppetThreadsToStartLogMessage,
|
||||
exceptionPtr, callback);
|
||||
}
|
||||
|
||||
co::NonViralNonPostingInvoker PuppetApplication::pauseAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
{
|
||||
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
|
||||
componentThreads, PuppetThread::ThreadOp::PAUSE,
|
||||
puppet_application_detail::noPuppetThreadsToPauseLogMessage,
|
||||
exceptionPtr, callback);
|
||||
}
|
||||
|
||||
co::NonViralNonPostingInvoker PuppetApplication::resumeAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
{
|
||||
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
|
||||
componentThreads, PuppetThread::ThreadOp::RESUME,
|
||||
puppet_application_detail::noPuppetThreadsToResumeLogMessage,
|
||||
exceptionPtr, callback);
|
||||
}
|
||||
|
||||
co::NonViralNonPostingInvoker PuppetApplication::exitAllPuppetThreadsCReq(
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::allPuppetThreadsLifetimeOpCReq(
|
||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
||||
[[maybe_unused]] std::function<void()> callback)
|
||||
[[maybe_unused]] std::function<void()> callback,
|
||||
PuppetThread::ThreadOp threadOp,
|
||||
std::string_view emptyThreadsLogMessage)
|
||||
{
|
||||
if (componentThreads.empty())
|
||||
{
|
||||
std::cout << puppet_application_detail::noPuppetThreadsToExitLogMessage
|
||||
<< "\n";
|
||||
std::cout << emptyThreadsLogMessage << "\n";
|
||||
co_return;
|
||||
}
|
||||
|
||||
puppet_application_detail::PuppetLifetimeGroup group;
|
||||
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers;
|
||||
|
||||
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
|
||||
group, invokers, componentThreads, PuppetThread::ThreadOp::EXIT);
|
||||
PuppetLifetimeMgmtGroup group;
|
||||
std::vector<PuppetLifetimeMgmtInvoker> invokers;
|
||||
|
||||
addAllPuppetLifetimeInvokersToGroup(group, invokers, threadOp);
|
||||
co_await group.getAwaitAllSettlementsInvoker();
|
||||
group.checkForAndReThrowGroupExceptions();
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::startAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
{
|
||||
return allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::START,
|
||||
noPuppetThreadsToStartLogMessage);
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::pauseAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
{
|
||||
return allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::PAUSE,
|
||||
noPuppetThreadsToPauseLogMessage);
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::resumeAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
{
|
||||
return allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::RESUME,
|
||||
noPuppetThreadsToResumeLogMessage);
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::exitAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr,
|
||||
std::function<void()> callback)
|
||||
{
|
||||
if (componentThreads.empty())
|
||||
{
|
||||
std::cout << noPuppetThreadsToExitLogMessage << "\n";
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::EXIT,
|
||||
noPuppetThreadsToExitLogMessage);
|
||||
|
||||
for (auto &thread : componentThreads) {
|
||||
thread->thread.join();
|
||||
}
|
||||
@@ -179,7 +172,6 @@ void PuppetApplication::distributeAndPinThreadsAcrossCpus()
|
||||
{
|
||||
int cpuCount = ComponentThread::getAvailableCpuCount();
|
||||
|
||||
// Distribute and pin threads across CPUs
|
||||
int threadIndex = 0;
|
||||
for (auto& thread : componentThreads)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user