Compare commits

...

2 Commits

9 changed files with 490 additions and 161 deletions
+66 -33
View File
@@ -1,6 +1,7 @@
#ifndef GROUP_H #ifndef GROUP_H
#define GROUP_H #define GROUP_H
#include <any>
#include <cassert> #include <cassert>
#include <coroutine> #include <coroutine>
#include <cstddef> #include <cstddef>
@@ -60,6 +61,19 @@ concept AwaitableIface = requires(T &t) {
{ get_operator_co_await(t) }; { get_operator_co_await(t) };
} && AwaiterIface<decltype(get_operator_co_await(std::declval<T &>()))>; } && AwaiterIface<decltype(get_operator_co_await(std::declval<T &>()))>;
template<AwaiterIface T>
T &asAwaiter(T &t) noexcept
{
return t;
}
template<AwaitableIface T>
auto asAwaiter(T &t) noexcept(noexcept(get_operator_co_await(t)))
-> decltype(get_operator_co_await(t))
{
return get_operator_co_await(t);
}
} // namespace detail } // namespace detail
template <typename T> template <typename T>
@@ -71,8 +85,27 @@ concept AwaiterIface = detail::AwaiterIface<T>;
template <typename T> template <typename T>
concept AwaitableOrAwaiterIface = AwaiterIface<T> || AwaitableIface<T>; concept AwaitableOrAwaiterIface = AwaiterIface<T> || AwaitableIface<T>;
template <typename Invoker> /** Typical usage — parallel members, then gather:
requires AwaitableOrAwaiterIface<Invoker> *
* co::Group group;
*
* auto bodyInit = body.initializeCReq(exceptionPtr, noopCallback);
* auto legInit = leg.initializeCReq(exceptionPtr, noopCallback);
* ViralNonPostingInvoker<void> batch = app.joltAllPuppetThreadsCReq(...);
*
* group.add(bodyInit);
* group.add(legInit);
* group.add(batch);
*
* co_await group.getAwaitAllSettlementsInvoker();
* group.checkForAndReThrowGroupExceptions();
*
* (void)bodyInit.completedReturnValues();
*
* // When walking settlement slots by index:
* settlements[i].invokerAs<BodyViralPostingInvoker<void>>()
* .completedReturnValues();
*/
struct Group struct Group
{ {
enum class AwaitingCondition { enum class AwaitingCondition {
@@ -91,9 +124,23 @@ struct Group
UNSETTLED, COMPLETED, EXCEPTION_THROWN UNSETTLED, COMPLETED, EXCEPTION_THROWN
}; };
SettlementDescriptor(Invoker &_invoker) template<typename Member>
: invoker(std::ref(_invoker)) void bindMemberRef(Member &member)
{} {
memberInvokerRef = std::ref(member);
}
template<typename Member>
Member &invokerAs() const
{
try {
return std::any_cast<std::reference_wrapper<Member>>(
memberInvokerRef).get();
} catch (const std::bad_any_cast &) {
throw std::runtime_error(
"Group settlement invoker type mismatch");
}
}
void setSettlementStatus() noexcept void setSettlementStatus() noexcept
{ {
@@ -109,7 +156,7 @@ struct Group
TypeE type = TypeE::UNSETTLED; TypeE type = TypeE::UNSETTLED;
std::exception_ptr calleeException = nullptr; std::exception_ptr calleeException = nullptr;
std::exception_ptr adapterException = nullptr; std::exception_ptr adapterException = nullptr;
std::reference_wrapper<Invoker> invoker; std::any memberInvokerRef;
}; };
struct SettlementAwaitingInvoker; struct SettlementAwaitingInvoker;
@@ -439,7 +486,7 @@ struct Group
* would be impossible. * would be impossible.
* *
* So we should be able to call resume() directly here without * So we should be able to call resume() directly here without
* post()ing to current_io_context(). * post()ing to ComponentThread::getSelf()->getIoService().
* *
* EXPLANATION: * EXPLANATION:
* However, in order to ensure that we keep this adapter coro * However, in order to ensure that we keep this adapter coro
@@ -466,28 +513,17 @@ struct Group
* target async fn, and also to convey its results back to the Group class. * target async fn, and also to convey its results back to the Group class.
* It's effectively a go-between coro that provides the outcomes that Invokers * It's effectively a go-between coro that provides the outcomes that Invokers
* normally provide, without needing, itself, to be co_awaited. * normally provide, without needing, itself, to be co_awaited.
*
* settlementIndex is captured by value (not a vector iterator) so adapter
* coros remain valid if settlements reallocate during concurrent add().
*/ */
NonAwaitableNonPostingAdapterCoro nonAwaitableAdapterCoro( template<AwaitableOrAwaiterIface Member>
NonAwaitableNonPostingAdapterCoro memberAdapterCoro(
Member &memberInvoker,
std::size_t settlementIndex) noexcept std::size_t settlementIndex) noexcept
{ {
/** EXPLANATION:
* It's very convenient that our design for the NonViralPostingInvoker
* coincidentally allows us to supply a lambda that can be used to test
* for the settlement conditions that are being waited on by the Group's
* co_awaiter.
*
* settlementIndex is captured by value (not a vector iterator) so adapter
* coros remain valid if settlements reallocate during concurrent add().
*/
try { try {
/* Return values remain in the callee promise until the caller-owned co_await detail::asAwaiter(memberInvoker);
* invoker is destroyed (~PostingInvoker). The group co_awaiter reads
* results via settlements[settlementIndex].invoker after awaiting.
*
* Index settlements[] each time; do not cache a reference across
* co_await because concurrent add() may reallocate the vector.
*/
co_await s.rsrc.settlements[settlementIndex].invoker.get();
} }
catch (...) catch (...)
{ {
@@ -505,12 +541,8 @@ struct Group
co_return; co_return;
} }
/** EXPLANATION: template<AwaitableOrAwaiterIface Member>
* Each invoker passed to add() must outlive this Group and the callee frame void add(Member &memberInvoker)
* (see ~PostingInvoker). The group co_awaiter reads return values from those
* invokers after awaiting; do not destroy an invoker until reads are done.
*/
void add(Invoker &invoker)
{ {
std::size_t settlementIndex = 0; std::size_t settlementIndex = 0;
@@ -525,10 +557,11 @@ struct Group
} }
settlementIndex = s.rsrc.settlements.size(); settlementIndex = s.rsrc.settlements.size();
s.rsrc.settlements.emplace_back(invoker); s.rsrc.settlements.emplace_back();
s.rsrc.settlements[settlementIndex].bindMemberRef(memberInvoker);
} }
nonAwaitableAdapterCoro(settlementIndex); memberAdapterCoro(memberInvoker, settlementIndex);
} }
void checkForAndReThrowGroupExceptions() const void checkForAndReThrowGroupExceptions() const
+72 -6
View File
@@ -47,7 +47,10 @@ struct NonViralPostingInvoker
*/ */
std::ostringstream oss; std::ostringstream oss;
oss << std::this_thread::get_id() oss << std::this_thread::get_id()
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."; << ": Missing completion lambda: non-viral coroutines require a completion lambda."
<< " Promise type=" << typeid(*this).name()
<< ". This usually means promise construction did not bind the"
<< " (exception_ptr&, function<void()>, ...) constructor.";
throw std::runtime_error(oss.str()); throw std::runtime_error(oss.str());
} }
@@ -153,12 +156,12 @@ struct ViralPostingInvoker
* from get_return_object(). ~NonPostingInvoker destroys the callee frame. * from get_return_object(). ~NonPostingInvoker destroys the callee frame.
*/ */
struct NonViralNonPostingInvoker struct NonViralNonPostingInvoker
: public NonPostingInvoker<NonPostingPromise> : public NonPostingInvoker<NonPostingPromise<void>, void>
{ {
struct promise_type struct promise_type
: public NonPostingPromise : public NonPostingPromise<void>
{ {
using NonPostingPromise::NonPostingPromise; using NonPostingPromise<void>::NonPostingPromise;
NonViralNonPostingInvoker get_return_object() NonViralNonPostingInvoker get_return_object()
{ {
@@ -169,7 +172,10 @@ struct NonViralNonPostingInvoker
{ {
std::ostringstream oss; std::ostringstream oss;
oss << std::this_thread::get_id() oss << std::this_thread::get_id()
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."; << ": Missing completion lambda: non-viral coroutines require a completion lambda."
<< " Promise type=" << typeid(*this).name()
<< ". This usually means promise construction did not bind the"
<< " (exception_ptr&, function<void()>, ...) constructor.";
throw std::runtime_error(oss.str()); throw std::runtime_error(oss.str());
} }
@@ -180,7 +186,7 @@ struct NonViralNonPostingInvoker
} }
}; };
using NonPostingInvoker<NonPostingPromise>::NonPostingInvoker; using NonPostingInvoker<NonPostingPromise<void>, void>::NonPostingInvoker;
bool await_ready() const noexcept bool await_ready() const noexcept
{ std::terminate(); } { std::terminate(); }
@@ -192,6 +198,66 @@ struct NonViralNonPostingInvoker
{ std::terminate(); } { std::terminate(); }
}; };
/** Viral awaitable non-posting coroutine: runs eagerly on the caller thread
* (initial_suspend is never). Caller resume uses symmetric transfer when the
* caller has registered before callee completion; otherwise PostBackStatus
* fast-paths await_resume on co_await.
*/
template <typename T = void>
struct ViralNonPostingInvoker
: public NonPostingInvoker<NonPostingPromise<T>, T>
{
struct promise_type
: public NonPostingPromise<T>
{
using NonPostingPromise<T>::NonPostingPromise;
ViralNonPostingInvoker<T> get_return_object() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Returning ViralNonPostingInvoker.\n";
#endif
this->setSelfSchedHandle(
std::coroutine_handle<promise_type>::from_promise(*this));
return ViralNonPostingInvoker<T>(*this);
}
};
using NonPostingInvoker<NonPostingPromise<T>, T>::NonPostingInvoker;
bool await_ready() const noexcept
{ return false; }
template <typename CallerPromise>
bool await_suspend(
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
{
static_assert(
std::is_base_of_v<PromiseChainLink, CallerPromise>,
"ViralNonPostingInvoker caller promise must derive from "
"PromiseChainLink");
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Setting callerSchedHandle.\n";
#endif
const bool suspendCaller =
this->setCallerSchedHandle(callerSchedHandle);
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " CallerFlowExecutor returned suspend=" << suspendCaller
<< ".\n";
#endif
return suspendCaller;
}
auto await_resume()
{
return NonPostingInvoker<NonPostingPromise<T>, T>::await_resume();
}
};
} // namespace sscl::co } // namespace sscl::co
#endif // INVOKERS_H #endif // INVOKERS_H
+45 -4
View File
@@ -5,13 +5,14 @@
#include <coroutine> #include <coroutine>
#include <iostream> #include <iostream>
#include <thread> #include <thread>
#include <type_traits>
#include <utility> #include <utility>
#include <spinscale/co/nonPostingPromise.h> #include <spinscale/co/nonPostingPromise.h>
namespace sscl::co { namespace sscl::co {
template <typename PromiseType> template <typename PromiseType, typename T>
class NonPostingInvoker class NonPostingInvoker
{ {
public: public:
@@ -39,15 +40,55 @@ public:
} }
} }
ReturnValues<void> &completedReturnValues() noexcept template <typename CallerPromise>
bool setCallerSchedHandle(
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
{
static_assert(
std::is_base_of_v<PromiseChainLink, CallerPromise>,
"NonPostingInvoker caller promise must derive from PromiseChainLink");
calleePromise.callerSchedHandle = callerSchedHandle;
calleePromise.setCallerPromiseChainLink(
&callerSchedHandle.promise());
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Done setting callerSchedHandle; running CallerFlowExecutor.\n";
#endif
return calleePromise.postBackStatus.getCallerFlowExecutor()();
}
ReturnValues<T> &completedReturnValues() noexcept
{ return calleePromise.returnValues; } { return calleePromise.returnValues; }
const ReturnValues<void> &completedReturnValues() const noexcept const ReturnValues<T> &completedReturnValues() const noexcept
{ return calleePromise.returnValues; } { return calleePromise.returnValues; }
private: auto await_resume()
{
calleePromise.postBackStatus.reset();
ReturnValues<T> &returnValues = calleePromise.returnValues;
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " About to check for and rethrow any exception.\n";
#endif
if (returnValues.myExceptionPtr) {
std::exception_ptr const captured = returnValues.myExceptionPtr;
std::rethrow_exception(captured);
}
if constexpr (!std::is_void_v<T>)
{
T result = std::move(returnValues.myReturnValue);
return result;
}
}
protected:
PromiseType &calleePromise; PromiseType &calleePromise;
private:
/** Every live invoker owns destruction of its callee coroutine frame in /** Every live invoker owns destruction of its callee coroutine frame in
* ~NonPostingInvoker (via calleePromise.selfSchedHandle). * ~NonPostingInvoker (via calleePromise.selfSchedHandle).
* *
+139 -17
View File
@@ -9,15 +9,99 @@
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <spinscale/spinLock.h>
#include <spinscale/co/coQutex.h> #include <spinscale/co/coQutex.h>
#include <spinscale/co/promiseChainLink.h> #include <spinscale/co/promiseChainLink.h>
#include <spinscale/co/promises.h> #include <spinscale/co/promises.h>
namespace sscl::co { namespace sscl::co {
template <typename T>
struct NonPostingPromise struct NonPostingPromise
: public PromiseChainLink : public PromiseChainLink,
public PostingPromiseReturnOps<NonPostingPromise<T>, T>
{ {
struct PostBackStatus
{
struct CalleeFlowExecutor;
struct CallerFlowExecutor;
friend struct CalleeFlowExecutor;
friend struct CallerFlowExecutor;
explicit PostBackStatus(NonPostingPromise &calleePromiseIn) noexcept
: calleePromise(calleePromiseIn)
{}
void reset() noexcept
{
sscl::SpinLock::Guard guard(lock);
callerHasSetCallerSchedHandle = false;
calleeIsReadyToPostBack = false;
}
struct FlowExecutor
{
explicit FlowExecutor(PostBackStatus &parentIn) noexcept
: parent(parentIn)
{}
PostBackStatus &parent;
};
struct CalleeFlowExecutor
: public FlowExecutor
{
explicit CalleeFlowExecutor(PostBackStatus &parentIn) noexcept
: FlowExecutor(parentIn)
{}
bool operator()() noexcept
{
sscl::SpinLock::Guard guard(this->parent.lock);
this->parent.calleeIsReadyToPostBack = true;
if (this->parent.callerHasSetCallerSchedHandle) {
return true;
}
return false;
}
};
struct CallerFlowExecutor
: public FlowExecutor
{
explicit CallerFlowExecutor(PostBackStatus &parentIn) noexcept
: FlowExecutor(parentIn)
{}
bool operator()() noexcept
{
sscl::SpinLock::Guard guard(this->parent.lock);
this->parent.callerHasSetCallerSchedHandle = true;
if (this->parent.calleeIsReadyToPostBack) {
return false;
}
return true;
}
};
CalleeFlowExecutor getCalleeFlowExecutor() noexcept
{
return CalleeFlowExecutor(*this);
}
CallerFlowExecutor getCallerFlowExecutor() noexcept
{
return CallerFlowExecutor(*this);
}
NonPostingPromise &calleePromise;
private:
sscl::SpinLock lock;
bool callerHasSetCallerSchedHandle = false;
bool calleeIsReadyToPostBack = false;
};
/** Completion work must run from this awaiter's await_suspend, not /** Completion work must run from this awaiter's await_suspend, not
* synchronously inside promise.final_suspend() before it returns: the * synchronously inside promise.final_suspend() before it returns: the
* hidden coroutine segment index in the coroutine state is only advanced * hidden coroutine segment index in the coroutine state is only advanced
@@ -26,16 +110,19 @@ struct NonPostingPromise
struct FinalSuspendNonPostingInvoker struct FinalSuspendNonPostingInvoker
: public std::suspend_always : public std::suspend_always
{ {
explicit FinalSuspendNonPostingInvoker(NonPostingPromise &calleePromiseIn) noexcept explicit FinalSuspendNonPostingInvoker(
: calleePromise(calleePromiseIn) NonPostingPromise &calleePromiseIn) noexcept
: calleePromise(calleePromiseIn)
{} {}
bool await_suspend(std::coroutine_handle<> const) noexcept std::coroutine_handle<> await_suspend(
std::coroutine_handle<> const) noexcept
{ {
if (calleePromise.callerLambda) if (calleePromise.callerLambda)
{ {
#ifdef CONFIG_LIBSSCL_DEBUG_CO #ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << "final_suspend" << ": " << std::this_thread::get_id() std::cout << "final_suspend" << ": "
<< std::this_thread::get_id()
<< " Non-viral non-posting: invoking callerLambda directly.\n"; << " Non-viral non-posting: invoking callerLambda directly.\n";
#endif #endif
if (calleePromise.returnValues.myExceptionPtr) { if (calleePromise.returnValues.myExceptionPtr) {
@@ -44,15 +131,29 @@ struct NonPostingPromise
} }
calleePromise.callerLambda(); calleePromise.callerLambda();
return std::noop_coroutine();
} }
return true;
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
<< " Viral non-posting: running CalleeFlowExecutor.\n";
#endif
const bool symmetricTransferToCaller =
calleePromise.postBackStatus.getCalleeFlowExecutor()();
if (symmetricTransferToCaller && calleePromise.callerSchedHandle) {
return calleePromise.callerSchedHandle;
}
return std::noop_coroutine();
} }
NonPostingPromise &calleePromise; NonPostingPromise &calleePromise;
}; };
NonPostingPromise() noexcept NonPostingPromise() noexcept
: returnValues() : returnValues(),
postBackStatus(*this)
{} {}
template <typename... TailArgs> template <typename... TailArgs>
@@ -61,13 +162,27 @@ struct NonPostingPromise
std::function<void()> callerLambdaIn, std::function<void()> callerLambdaIn,
TailArgs &&...) noexcept TailArgs &&...) noexcept
: returnValues(callerExceptionPtr), : returnValues(callerExceptionPtr),
callerLambda(std::move(callerLambdaIn)) callerLambda(std::move(callerLambdaIn)),
postBackStatus(*this)
{}
template <typename ObjectArg, typename... TailArgs>
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
NonPostingPromise(
ObjectArg &&,
std::exception_ptr &callerExceptionPtr,
std::function<void()> callerLambdaIn,
TailArgs &&...) noexcept
: NonPostingPromise(
callerExceptionPtr,
std::move(callerLambdaIn))
{} {}
~NonPostingPromise() noexcept ~NonPostingPromise() noexcept
{ {
#ifdef CONFIG_LIBSSCL_DEBUG_CO #ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n"; std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Destructing.\n";
#endif #endif
} }
@@ -77,9 +192,6 @@ struct NonPostingPromise
auto final_suspend() noexcept auto final_suspend() noexcept
{ return FinalSuspendNonPostingInvoker(*this); } { return FinalSuspendNonPostingInvoker(*this); }
void return_void() noexcept
{ return; }
void unhandled_exception() noexcept void unhandled_exception() noexcept
{ {
returnValues.myExceptionPtr = std::current_exception(); returnValues.myExceptionPtr = std::current_exception();
@@ -90,16 +202,26 @@ struct NonPostingPromise
eraseFirstMatchingAcquiredLock(coQutex); eraseFirstMatchingAcquiredLock(coQutex);
} }
const PromiseChainLink *callerPromiseChainLink() const noexcept override
{ return callerChainLink; }
PromiseChainLink *callerPromiseChainLink() noexcept override
{ return callerChainLink; }
void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept
{ { selfSchedHandle = schedHandle; }
selfSchedHandle = schedHandle;
}
ReturnValues<void> returnValues; void setCallerPromiseChainLink(PromiseChainLink *chainLink) noexcept
{ callerChainLink = chainLink; }
ReturnValues<T> returnValues;
std::function<void()> callerLambda; std::function<void()> callerLambda;
PostBackStatus postBackStatus;
std::coroutine_handle<> selfSchedHandle; std::coroutine_handle<> selfSchedHandle;
std::coroutine_handle<> callerSchedHandle;
PromiseChainLink *callerChainLink = nullptr;
template <typename> template <typename, typename>
friend class NonPostingInvoker; friend class NonPostingInvoker;
}; };
+30
View File
@@ -6,6 +6,7 @@
#include <exception> #include <exception>
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <typeinfo>
#include <thread> #include <thread>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
@@ -267,6 +268,22 @@ struct PostingPromise
postBackStatus(*this) postBackStatus(*this)
{} {}
/** Member coroutines pass the implicit object parameter before explicit
* (exceptionPtr, callback, ...) args. Discard the object and delegate to
* the free-function constructor shape.
*/
template <typename ObjectArg, typename... TailArgs>
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
PostingPromise(
ObjectArg &&,
std::exception_ptr &_callerExceptionPtr,
std::function<void()> _callerLambda,
TailArgs &&...) noexcept
: PostingPromise(
_callerExceptionPtr,
std::move(_callerLambda))
{}
~PostingPromise() noexcept ~PostingPromise() noexcept
{ {
#ifdef CONFIG_LIBSSCL_DEBUG_CO #ifdef CONFIG_LIBSSCL_DEBUG_CO
@@ -345,6 +362,19 @@ struct TaggedPostingPromise
std::forward<TailArgs>(tailArgs)...) std::forward<TailArgs>(tailArgs)...)
{} {}
template <typename ObjectArg, typename... TailArgs>
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
TaggedPostingPromise(
ObjectArg &&,
std::exception_ptr &_exceptionPtr,
std::function<void()> _callerLambda,
TailArgs &&... tailArgs) noexcept
: PostingPromise<T>(
_exceptionPtr,
std::move(_callerLambda),
std::forward<TailArgs>(tailArgs)...)
{}
auto initial_suspend() noexcept auto initial_suspend() noexcept
{ {
#ifdef CONFIG_LIBSSCL_DEBUG_CO #ifdef CONFIG_LIBSSCL_DEBUG_CO
+37 -11
View File
@@ -5,7 +5,6 @@
#include <atomic> #include <atomic>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#include <boost/asio/io_service.hpp>
#include <stdexcept> #include <stdexcept>
#include <queue> #include <queue>
#include <functional> #include <functional>
@@ -14,9 +13,11 @@
#include <unistd.h> #include <unistd.h>
#include <memory> #include <memory>
#include <coroutine> #include <coroutine>
#include <spinscale/cps/callback.h>
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/cps/callback.h>
namespace sscl { namespace sscl {
@@ -165,21 +166,40 @@ public:
struct ViralThreadLifetimeMgmtInvoker struct ViralThreadLifetimeMgmtInvoker
{ {
struct AsyncState
{
std::atomic<bool> settled{false};
std::coroutine_handle<> callerSchedHandle;
};
ViralThreadLifetimeMgmtInvoker( ViralThreadLifetimeMgmtInvoker(
ThreadOp _threadOp, ThreadOp _threadOp,
PuppetThread &_parentThread, PuppetThread &_parentThread,
const std::shared_ptr<PuppetThread> &_selfPtr = nullptr) const std::shared_ptr<PuppetThread> &_selfPtr = nullptr)
: threadOp(_threadOp), : threadOp(_threadOp),
asyncState(std::make_shared<AsyncState>()),
parentThread(_parentThread), parentThread(_parentThread),
selfPtr(_selfPtr), selfPtr(_selfPtr),
lifetimeMgmtCallback{ lifetimeMgmtCallback{
nullptr, nullptr,
[this]() [asyncState = asyncState]()
{ {
settled = true; asyncState->settled.store(true, std::memory_order_release);
if (callerSchedHandle) {
callerSchedHandle.resume(); std::coroutine_handle<> handle =
asyncState->callerSchedHandle;
if (!handle) {
return;
} }
/** Post resume to the puppeteer queue: direct resume() from
* within an asio completion handler can destroy adapter
* coroutine state while the handler is still unwinding.
*/
boost::asio::post(
ComponentThread::getPptr()->getIoService(),
[handle]() { handle.resume(); });
}} }}
{ {
if (threadOp == ThreadOp::JOLT && selfPtr == nullptr) if (threadOp == ThreadOp::JOLT && selfPtr == nullptr)
@@ -212,26 +232,32 @@ public:
} }
} }
bool await_ready() const noexcept { return settled; } bool await_ready() const noexcept
{
return asyncState->settled.load(std::memory_order_acquire);
}
bool await_suspend( bool await_suspend(
std::coroutine_handle<> _callerSchedHandle) noexcept std::coroutine_handle<> _callerSchedHandle) noexcept
{ {
if (settled) { return false; } if (asyncState->settled.load(std::memory_order_acquire)) {
callerSchedHandle = _callerSchedHandle; return false;
}
asyncState->callerSchedHandle = _callerSchedHandle;
return true; return true;
} }
void await_resume() noexcept {} void await_resume() noexcept {}
ThreadOp threadOp; ThreadOp threadOp;
bool settled = false; std::shared_ptr<AsyncState> asyncState;
std::coroutine_handle<> callerSchedHandle;
PuppetThread &parentThread; PuppetThread &parentThread;
const std::shared_ptr<PuppetThread> selfPtr; const std::shared_ptr<PuppetThread> selfPtr;
cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback; cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback;
}; };
// Thread lifetime management request invokers
ViralThreadLifetimeMgmtInvoker startThreadAReq() ViralThreadLifetimeMgmtInvoker startThreadAReq()
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); } { return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); }
ViralThreadLifetimeMgmtInvoker pauseThreadAReq() ViralThreadLifetimeMgmtInvoker pauseThreadAReq()
+24 -6
View File
@@ -5,7 +5,10 @@
#include <exception> #include <exception>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <string_view>
#include <vector> #include <vector>
#include <spinscale/co/group.h>
#include <spinscale/co/invokers.h> #include <spinscale/co/invokers.h>
#include <spinscale/componentThread.h> #include <spinscale/componentThread.h>
@@ -19,22 +22,30 @@ public:
const std::vector<std::shared_ptr<PuppetThread>> &threads); const std::vector<std::shared_ptr<PuppetThread>> &threads);
~PuppetApplication() = default; ~PuppetApplication() = default;
co::NonViralNonPostingInvoker joltAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker startAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker pauseAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker resumeAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker exitAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback); std::exception_ptr &exceptionPtr, std::function<void()> callback);
// CPU distribution method // CPU distribution method
void distributeAndPinThreadsAcrossCpus(); void distributeAndPinThreadsAcrossCpus();
protected: protected:
// Collection of PuppetThread instances using PuppetLifetimeMgmtInvoker =
PuppetThread::ViralThreadLifetimeMgmtInvoker;
using PuppetLifetimeMgmtGroup = co::Group;
void addAllPuppetLifetimeInvokersToGroup(
PuppetLifetimeMgmtGroup &group,
std::vector<PuppetLifetimeMgmtInvoker> &invokers,
PuppetThread::ThreadOp threadOp) const;
std::vector<std::shared_ptr<PuppetThread>> componentThreads; std::vector<std::shared_ptr<PuppetThread>> componentThreads;
/** /**
@@ -57,6 +68,13 @@ protected:
* a synchronization point for the entire system initialization. * a synchronization point for the entire system initialization.
*/ */
bool threadsHaveBeenJolted = false; bool threadsHaveBeenJolted = false;
private:
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> callback,
PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage);
}; };
} // namespace sscl } // namespace sscl
+1
View File
@@ -8,6 +8,7 @@
#include <spinscale/cps/asynchronousContinuation.h> #include <spinscale/cps/asynchronousContinuation.h>
#include <spinscale/cps/callback.h> #include <spinscale/cps/callback.h>
#include <spinscale/cps/callableTracer.h> #include <spinscale/cps/callableTracer.h>
#include <spinscale/co/invokers.h>
#include <spinscale/component.h> #include <spinscale/component.h>
#include <spinscale/componentThread.h> #include <spinscale/componentThread.h>
+76 -84
View File
@@ -8,7 +8,7 @@
namespace sscl { namespace sscl {
namespace puppet_application_detail { namespace {
constexpr std::string_view noPuppetThreadsToStartLogMessage = constexpr std::string_view noPuppetThreadsToStartLogMessage =
"Mrntt: No puppet threads to start"; "Mrntt: No puppet threads to start";
@@ -19,14 +19,18 @@ constexpr std::string_view noPuppetThreadsToResumeLogMessage =
constexpr std::string_view noPuppetThreadsToExitLogMessage = constexpr std::string_view noPuppetThreadsToExitLogMessage =
"Mrntt: No puppet threads to exit"; "Mrntt: No puppet threads to exit";
using PuppetLifetimeInvoker = PuppetThread::ViralThreadLifetimeMgmtInvoker; } // namespace
using PuppetLifetimeGroup = co::Group<PuppetLifetimeInvoker>;
void addAllPuppetLifetimeInvokersToGroup( PuppetApplication::PuppetApplication(
PuppetLifetimeGroup &group, const std::vector<std::shared_ptr<PuppetThread>> &threads)
std::vector<PuppetLifetimeInvoker> &invokers, : componentThreads(threads)
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads, {
PuppetThread::ThreadOp threadOp) }
void PuppetApplication::addAllPuppetLifetimeInvokersToGroup(
PuppetLifetimeMgmtGroup &group,
std::vector<PuppetLifetimeMgmtInvoker> &invokers,
PuppetThread::ThreadOp threadOp) const
{ {
invokers.reserve(componentThreads.size()); invokers.reserve(componentThreads.size());
@@ -58,40 +62,8 @@ void addAllPuppetLifetimeInvokersToGroup(
} }
} }
co::NonViralNonPostingInvoker genericAllPuppetThreadsLifetimeOpCReq( co::ViralNonPostingInvoker<void>
const std::vector<std::shared_ptr<PuppetThread>> &componentThreads, PuppetApplication::joltAllPuppetThreadsCReq(
PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage,
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback)
{
if (componentThreads.empty())
{
std::cout << emptyThreadsLogMessage << "\n";
co_return;
}
PuppetLifetimeGroup group;
std::vector<PuppetLifetimeInvoker> invokers;
addAllPuppetLifetimeInvokersToGroup(
group, invokers, componentThreads, threadOp);
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
co_return;
}
} // namespace puppet_application_detail
PuppetApplication::PuppetApplication(
const std::vector<std::shared_ptr<PuppetThread>> &threads)
: componentThreads(threads)
{
}
co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
[[maybe_unused]] std::exception_ptr &exceptionPtr, [[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback) [[maybe_unused]] std::function<void()> callback)
{ {
@@ -108,12 +80,11 @@ co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
co_return; co_return;
} }
puppet_application_detail::PuppetLifetimeGroup group; PuppetLifetimeMgmtGroup group;
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers; std::vector<PuppetLifetimeMgmtInvoker> invokers;
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
group, invokers, componentThreads, PuppetThread::ThreadOp::JOLT);
addAllPuppetLifetimeInvokersToGroup(
group, invokers, PuppetThread::ThreadOp::JOLT);
co_await group.getAwaitAllSettlementsInvoker(); co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions(); group.checkForAndReThrowGroupExceptions();
@@ -121,53 +92,75 @@ co::NonViralNonPostingInvoker PuppetApplication::joltAllPuppetThreadsCReq(
co_return; co_return;
} }
co::NonViralNonPostingInvoker PuppetApplication::startAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void>
std::exception_ptr &exceptionPtr, std::function<void()> callback) PuppetApplication::allPuppetThreadsLifetimeOpCReq(
{
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
componentThreads, PuppetThread::ThreadOp::START,
puppet_application_detail::noPuppetThreadsToStartLogMessage,
exceptionPtr, callback);
}
co::NonViralNonPostingInvoker PuppetApplication::pauseAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
componentThreads, PuppetThread::ThreadOp::PAUSE,
puppet_application_detail::noPuppetThreadsToPauseLogMessage,
exceptionPtr, callback);
}
co::NonViralNonPostingInvoker PuppetApplication::resumeAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
return puppet_application_detail::genericAllPuppetThreadsLifetimeOpCReq(
componentThreads, PuppetThread::ThreadOp::RESUME,
puppet_application_detail::noPuppetThreadsToResumeLogMessage,
exceptionPtr, callback);
}
co::NonViralNonPostingInvoker PuppetApplication::exitAllPuppetThreadsCReq(
[[maybe_unused]] std::exception_ptr &exceptionPtr, [[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback) [[maybe_unused]] std::function<void()> callback,
PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage)
{ {
if (componentThreads.empty()) if (componentThreads.empty())
{ {
std::cout << puppet_application_detail::noPuppetThreadsToExitLogMessage std::cout << emptyThreadsLogMessage << "\n";
<< "\n";
co_return; co_return;
} }
puppet_application_detail::PuppetLifetimeGroup group; PuppetLifetimeMgmtGroup group;
std::vector<puppet_application_detail::PuppetLifetimeInvoker> invokers; std::vector<PuppetLifetimeMgmtInvoker> invokers;
puppet_application_detail::addAllPuppetLifetimeInvokersToGroup(
group, invokers, componentThreads, PuppetThread::ThreadOp::EXIT);
addAllPuppetLifetimeInvokersToGroup(group, invokers, threadOp);
co_await group.getAwaitAllSettlementsInvoker(); co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions(); group.checkForAndReThrowGroupExceptions();
co_return;
}
co::ViralNonPostingInvoker<void>
PuppetApplication::startAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
return allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::START,
noPuppetThreadsToStartLogMessage);
}
co::ViralNonPostingInvoker<void>
PuppetApplication::pauseAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
return allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::PAUSE,
noPuppetThreadsToPauseLogMessage);
}
co::ViralNonPostingInvoker<void>
PuppetApplication::resumeAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{
return allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::RESUME,
noPuppetThreadsToResumeLogMessage);
}
co::ViralNonPostingInvoker<void>
PuppetApplication::exitAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> callback)
{
if (componentThreads.empty())
{
std::cout << noPuppetThreadsToExitLogMessage << "\n";
co_return;
}
co_await allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::EXIT,
noPuppetThreadsToExitLogMessage);
for (auto &thread : componentThreads) { for (auto &thread : componentThreads) {
thread->thread.join(); thread->thread.join();
} }
@@ -179,7 +172,6 @@ void PuppetApplication::distributeAndPinThreadsAcrossCpus()
{ {
int cpuCount = ComponentThread::getAvailableCpuCount(); int cpuCount = ComponentThread::getAvailableCpuCount();
// Distribute and pin threads across CPUs
int threadIndex = 0; int threadIndex = 0;
for (auto& thread : componentThreads) for (auto& thread : componentThreads)
{ {