Add new Spinscale C++ Coroutine support

This commit is contained in:
2026-05-17 16:52:04 -04:00
parent b6eb502e56
commit ad4ea3ccac
11 changed files with 1790 additions and 0 deletions
+3
View File
@@ -8,4 +8,7 @@
/* Debug callable tracing configuration */
#cmakedefine CONFIG_DEBUG_TRACE_CALLABLES
/* Debug coroutine-type logging configuration */
#cmakedefine CONFIG_LIBSSCL_DEBUG_CO
#endif /* _CONFIG_H */
+214
View File
@@ -0,0 +1,214 @@
#ifndef CO_CONDITION_VARIABLE_H
#define CO_CONDITION_VARIABLE_H
#include <config.h>
#include <coroutine>
#include <deque>
#include <iostream>
#include <memory>
#include <thread>
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/componentThread.h>
#include <spinscale/spinLock.h>
namespace sscl::co {
/** Coroutine-friendly handoff: wait until `signal()` before running a completion
* step. Standalone primitive (posting promises use `PostBackStatus` instead).
*
* `clear()` only clears `isSignaled`; it does not wake or drain waiters. If
* `clear()` runs while coroutines are still waiting, they stay queued until a
* later `signal()` posts them.
*/
class CoConditionVariable
{
public:
/** Waiter queued under the CV spin lock; `signal()` drains and calls `post()`. */
class WaitingCoroutineBase
{
public:
explicit WaitingCoroutineBase(
boost::asio::io_service &callerIoContextIn) noexcept
: callerIoContext(callerIoContextIn)
{}
virtual ~WaitingCoroutineBase() = default;
virtual void post() noexcept = 0;
public:
boost::asio::io_service &callerIoContext;
};
template <typename Promise>
class TypedWaitingCoroutine
: public WaitingCoroutineBase
{
public:
TypedWaitingCoroutine(
boost::asio::io_service &callerIoContextIn,
std::coroutine_handle<Promise> callerSchedHandleIn) noexcept
: WaitingCoroutineBase(callerIoContextIn),
callerSchedHandle(callerSchedHandleIn)
{}
void post() noexcept override
{
boost::asio::post(callerIoContext, callerSchedHandle);
}
public:
std::coroutine_handle<Promise> callerSchedHandle;
};
struct OperationInvoker
{
explicit OperationInvoker(CoConditionVariable &parentCvIn) noexcept
: parentCv(parentCvIn)
{}
CoConditionVariable &parentCv;
};
struct WaitForInvoker
: public OperationInvoker
{
using OperationInvoker::OperationInvoker;
bool await_ready() const noexcept { return false; }
template <typename Promise>
bool await_suspend(std::coroutine_handle<Promise> cvCallerSchedHandle) noexcept
{
boost::asio::io_service &cvCallerIoContext =
sscl::ComponentThread::getSelf()->getIoService();
sscl::SpinLock::Guard guard(parentCv.spinLock);
if (parentCv.isSignaled) {
return false;
}
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " CV not signaled: Enqueuing waiter coroutine.\n";
#endif
parentCv.enqueueWaitingCoroutine(
cvCallerSchedHandle, cvCallerIoContext);
return true;
}
void await_resume() const noexcept {}
};
/** Manual await-style API only (lowerCamelCase); not a coroutine awaiter.
* `FinalSuspendPostingInvoker` calls `awaitSuspend` explicitly.
*/
struct DecisionEnablingDerivableWaitForInvoker
: public OperationInvoker
{
struct DecisionFactors
{
sscl::SpinLock &cvInternalSpinLock;
bool wasAlreadySignaled;
DecisionFactors(sscl::SpinLock &cvLockIn, bool signaledIn) noexcept
: cvInternalSpinLock(cvLockIn),
wasAlreadySignaled(signaledIn)
{}
};
using OperationInvoker::OperationInvoker;
void operator co_await() const = delete;
bool awaitReady() const noexcept { return false; }
template <typename Promise>
DecisionFactors awaitSuspend(std::coroutine_handle<Promise> cvCallerSchedHandle) noexcept
{
boost::asio::io_service &cvCallerIoContext =
sscl::ComponentThread::getSelf()->getIoService();
parentCv.spinLock.acquire();
if (parentCv.isSignaled)
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " CV already signaled: returning already-signaled DecisionFactors.\n";
#endif
return DecisionFactors(parentCv.spinLock, true);
}
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " CV not signaled: returning not-signaled DecisionFactors.\n";
#endif
parentCv.enqueueWaitingCoroutine(
cvCallerSchedHandle, cvCallerIoContext);
return DecisionFactors(parentCv.spinLock, false);
}
void awaitResume() const noexcept {}
};
CoConditionVariable() noexcept = default;
CoConditionVariable(const CoConditionVariable &) = delete;
CoConditionVariable &operator=(const CoConditionVariable &) = delete;
CoConditionVariable(CoConditionVariable &&) noexcept = delete;
CoConditionVariable &operator=(CoConditionVariable &&) noexcept = delete;
~CoConditionVariable() noexcept = default;
WaitForInvoker getWaitForInvoker() noexcept
{ return WaitForInvoker(*this); }
DecisionEnablingDerivableWaitForInvoker
getDecisionEnablingDerivableWaitForInvoker() noexcept
{
return DecisionEnablingDerivableWaitForInvoker(*this);
}
void signal() noexcept
{
std::deque<std::unique_ptr<WaitingCoroutineBase>> drained;
{
sscl::SpinLock::Guard guard(spinLock);
isSignaled = true;
drained.swap(waitingCoroutines);
}
for (std::unique_ptr<WaitingCoroutineBase> &waiter : drained) {
waiter->post();
}
}
/** Only clears the signaled flag; waiters (if any) remain in the deque. */
void clear() noexcept
{
sscl::SpinLock::Guard guard(spinLock);
isSignaled = false;
}
template <typename Promise>
void enqueueWaitingCoroutine(
std::coroutine_handle<Promise> handle,
boost::asio::io_service &ctx) noexcept
{
waitingCoroutines.push_back(
std::make_unique<TypedWaitingCoroutine<Promise>>(ctx, handle));
}
private:
sscl::SpinLock spinLock;
bool isSignaled = false;
std::deque<std::unique_ptr<WaitingCoroutineBase>> waitingCoroutines;
};
} // namespace sscl::co
#endif // CO_CONDITION_VARIABLE_H
+198
View File
@@ -0,0 +1,198 @@
#ifndef CO_QUTEX_H
#define CO_QUTEX_H
#include <config.h>
#include <cassert>
#include <coroutine>
#include <deque>
#include <stdexcept>
#include <type_traits>
#ifdef CONFIG_LIBSSCL_DEBUG_CO
#include <iostream>
#include <thread>
#endif
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/componentThread.h>
#include <spinscale/co/promiseChainWalker.h>
#include <spinscale/spinLock.h>
namespace sscl::co {
class CoQutex
{
public:
class ReleaseHandle;
CoQutex() noexcept = default;
CoQutex(const CoQutex &) = delete;
CoQutex(CoQutex &&) noexcept = delete;
CoQutex &operator=(const CoQutex &) = delete;
CoQutex &operator=(CoQutex &&) noexcept = delete;
~CoQutex() = default;
struct AcquireInvocationAndSuspensionPolicy
{
AcquireInvocationAndSuspensionPolicy(CoQutex &_coQutex) noexcept
: coQutex(_coQutex)
{}
~AcquireInvocationAndSuspensionPolicy() noexcept = default;
struct WaitingCoroutine
{
WaitingCoroutine(
std::coroutine_handle<void> _callerSchedHandle,
boost::asio::io_service &_callerIoContext,
PromiseChainLink &_waitingPromise) noexcept
: callerSchedHandle(_callerSchedHandle),
callerIoContext(_callerIoContext),
waitingPromise(_waitingPromise)
{}
std::coroutine_handle<void> callerSchedHandle;
boost::asio::io_service &callerIoContext;
PromiseChainLink &waitingPromise;
};
bool await_ready() noexcept { return false; }
template <typename Promise>
bool await_suspend(std::coroutine_handle<Promise> callerSchedHandle)
{
static_assert(
std::is_base_of_v<PromiseChainLink, Promise>,
"CoQutex acquire requires a promise type derived from PromiseChainLink");
acquirerChainLink = &callerSchedHandle.promise();
walkCallerPromiseChainFrom(
static_cast<const PromiseChainLink &>(callerSchedHandle.promise()),
[this](const PromiseChainLink &link)
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n";
#endif
if (link.holdsAcquiredLock(coQutex)) {
throw std::runtime_error("Deadlock detected: CoQutex re-acquire on caller promise chain.");
}
});
sscl::SpinLock::Guard guard(coQutex.spinLock);
if (!coQutex.isOwned) {
coQutex.isOwned = true;
return false;
}
coQutex.waitingCoroutines.emplace_back(
std::coroutine_handle<void>::from_address(callerSchedHandle.address()),
sscl::ComponentThread::getSelf()->getIoService(),
*acquirerChainLink);
return true;
}
ReleaseHandle
// [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
await_resume() noexcept;
CoQutex &coQutex;
private:
PromiseChainLink *acquirerChainLink = nullptr;
};
AcquireInvocationAndSuspensionPolicy
// [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
getAcquireInvocationAndSuspensionPolicy() noexcept
{
return AcquireInvocationAndSuspensionPolicy(*this);
}
private:
friend class ReleaseHandle;
void release() noexcept
{
sscl::SpinLock::Guard guard(spinLock);
assert(isOwned);
if (waitingCoroutines.empty()) {
isOwned = false;
return;
}
auto &frontWaitingCoroutine = waitingCoroutines.front();
boost::asio::post(
frontWaitingCoroutine.callerIoContext,
frontWaitingCoroutine.callerSchedHandle);
waitingCoroutines.pop_front();
}
sscl::SpinLock spinLock;
bool isOwned = false;
std::deque<AcquireInvocationAndSuspensionPolicy::WaitingCoroutine> waitingCoroutines;
};
//[[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
class CoQutex::ReleaseHandle
{
public:
ReleaseHandle(PromiseChainLink &promiseChainLinkIn, CoQutex &coQutexIn) noexcept
: promiseChainLink(promiseChainLinkIn),
coQutex(coQutexIn)
{}
ReleaseHandle(const ReleaseHandle &) = delete;
ReleaseHandle &operator=(const ReleaseHandle &) = delete;
ReleaseHandle(ReleaseHandle &&other) noexcept
: promiseChainLink(other.promiseChainLink),
coQutex(other.coQutex),
armed(other.armed)
{
other.armed = false;
}
ReleaseHandle &operator=(ReleaseHandle &&other) noexcept = delete;
~ReleaseHandle() noexcept
{
if (armed)
{ release(); }
}
void release() noexcept
{
if (!armed)
{ return; }
armed = false;
promiseChainLink.removeAcquiredLock(coQutex);
coQutex.release();
}
void operator()() noexcept
{
release();
}
private:
PromiseChainLink &promiseChainLink;
CoQutex &coQutex;
bool armed = true;
};
inline CoQutex::ReleaseHandle
// [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
CoQutex::AcquireInvocationAndSuspensionPolicy::await_resume() noexcept
{
assert(acquirerChainLink != nullptr);
acquirerChainLink->addAcquiredLock(coQutex);
return CoQutex::ReleaseHandle(*acquirerChainLink, coQutex);
}
} // namespace sscl::co
#endif // CO_QUTEX_H
+599
View File
@@ -0,0 +1,599 @@
#ifndef GROUP_H
#define GROUP_H
#include <cassert>
#include <coroutine>
#include <cstddef>
#include <exception>
#include <functional>
#include <iostream>
#include <iterator>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/componentThread.h>
#include <spinscale/co/promiseChainLink.h>
#include <spinscale/sharedResourceGroup.h>
#include <spinscale/spinLock.h>
namespace sscl::co {
namespace detail {
template <typename T, typename H>
concept await_suspend_returns_void = requires(T &t, H h) {
{ t.await_suspend(h) } -> std::same_as<void>;
};
template <typename T, typename H>
concept await_suspend_returns_bool = requires(T &t, H h) {
{ t.await_suspend(h) } -> std::convertible_to<bool>;
};
template <typename T, typename H>
concept await_suspend_returns_handle = requires(T &t, H h) {
{ t.await_suspend(h) } -> std::convertible_to<std::coroutine_handle<>>;
};
template <typename T, typename H>
concept await_suspend_ok = await_suspend_returns_void<T, H>
|| await_suspend_returns_bool<T, H>
|| await_suspend_returns_handle<T, H>;
template <typename T, typename H = std::coroutine_handle<>>
concept AwaiterIface = requires(T &t, H h) {
{ t.await_ready() } -> std::convertible_to<bool>;
{ t.await_resume() };
} && await_suspend_ok<T, H>;
template <typename T>
auto get_operator_co_await(T &t) -> decltype(operator co_await(t));
template <typename T>
concept AwaitableIface = requires(T &t) {
{ get_operator_co_await(t) };
} && AwaiterIface<decltype(get_operator_co_await(std::declval<T &>()))>;
} // namespace detail
template <typename T>
concept AwaitableIface = detail::AwaitableIface<T>;
template <typename T>
concept AwaiterIface = detail::AwaiterIface<T>;
template <typename T>
concept AwaitableOrAwaiterIface = AwaiterIface<T> || AwaitableIface<T>;
template <typename Invoker>
requires AwaitableOrAwaiterIface<Invoker>
struct Group
{
enum class AwaitingCondition {
NONE, FIRST_SETTLED, ALL_SETTLED
};
class SettlementDescriptor
{
public:
enum class TypeE {
/* We track EXCEPTIION_THROWN but we don't provide an
* awaitInvoker for exception events. The caller can
* wait for settlements and then scan the result set
* to manually deal with exceptions.
*/
UNSETTLED, COMPLETED, EXCEPTION_THROWN
};
SettlementDescriptor(Invoker &_invoker)
: invoker(std::ref(_invoker))
{}
void setSettlementStatus() noexcept
{
assert(type == TypeE::UNSETTLED);
if (calleeException) {
type = TypeE::EXCEPTION_THROWN;
} else {
type = TypeE::COMPLETED;
}
}
TypeE type = TypeE::UNSETTLED;
std::exception_ptr calleeException = nullptr;
std::exception_ptr adapterException = nullptr;
std::reference_wrapper<Invoker> invoker;
};
struct SettlementAwaitingInvoker;
struct AwaitFirstSettlementInvoker;
struct AwaitAllSettlementsInvoker;
// getAwaitNextSettlementInvoker();
AwaitFirstSettlementInvoker getAwaitFirstSettlementInvoker()
{ return AwaitFirstSettlementInvoker(*this); }
AwaitAllSettlementsInvoker getAwaitAllSettlementsInvoker()
{ return AwaitAllSettlementsInvoker(*this); }
bool verifyAllInvokersSettled() const
{
for (auto &desc : s.rsrc.settlements) {
if (desc.type == SettlementDescriptor::TypeE::UNSETTLED) {
return false;
}
}
return true;
}
bool firstInvokerSettled() const
{ return s.rsrc.firstSettledInvokerIdx >= 0; }
bool allInvokersSettled() const
{
const std::size_t nInvokersAdded = s.rsrc.settlements.size();
assert(s.rsrc.nInvokersSettled <= nInvokersAdded);
return s.rsrc.nInvokersSettled == nInvokersAdded;
}
/** Caller must hold s.lock. */
void throwIfNoMemberInvokersForCoAwaitUnderLock() const
{
if (s.rsrc.settlements.empty()) {
throw std::runtime_error(
"co_await: Group has no member invokers; call add() before awaiting");
}
}
struct SettlementAwaitingInvoker
{
explicit SettlementAwaitingInvoker(Group &_group)
: parentGroup(_group)
{}
bool await_ready() const { return false; }
/** EXPLANATION:
* This exists for if we ever need to re-make the adapter coro
* throw exceptions. But we decided to make it noexcept in order
* to avoid this complication.
*/
void checkForAndReThrowAdapterExceptions() const
{
std::ostringstream ostream;
bool doThrow = false;
for (auto &item : parentGroup.s.rsrc.settlements)
{
if (!item.adapterException) {
continue;
}
doThrow = true;
ostream << "Exc thrown in Group Adapter: ";
try {
std::rethrow_exception(item.adapterException);
} catch (const std::exception &e) {
ostream << e.what();
} catch (...) {
ostream << "<unknown exception type>";
}
ostream << "\n";
}
if (doThrow) {
throw std::runtime_error(ostream.str());
}
}
Group &parentGroup;
};
/** EXPLANATION:
* AwaitingCondition and the group-awaiter coroutine_handle are set only
* in await_suspend when this co_await actually suspends. Constructing
* several AwaitFirstSettlementInvoker / AwaitAllSettlementsInvoker
* objects without co_awaiting them is harmless.
*
* You may co_await await-all and later co_await await-first (in either
* construction order). After a suspending wait completes, the adapter
* clears handle state in updateSettlementsStateAndAwakenCallerIfConditionMet,
* so a later co_await on another handle (or a second co_await on the same
* handle, after the first finished) is legal.
*
* Only one group co_await may be suspended with a registered handle at a
* time; a second concurrent co_await trips assert(!callerHasSetSchedHandle)
* in debug builds.
*
* firstSettledInvokerIdx and calleeWasReadyToNotifyOfFirstSettlement are
* sticky for the Group lifetime (first member ever to settle), not per wave.
*/
struct AwaitFirstSettlementInvoker
: public SettlementAwaitingInvoker
{
using SettlementAwaitingInvoker::SettlementAwaitingInvoker;
bool await_suspend(std::coroutine_handle<> groupAwaiterSchedHandle)
{
/* No other group co_await may be suspended with a registered handle.
* Sequential co_await on the same object is allowed after the prior
* wait finished and clearCallerSchedHandleState() ran on wake.
*/
assert(!this->parentGroup.s.rsrc.callerHasSetSchedHandle);
sscl::SpinLock::Guard guard(this->parentGroup.s.lock);
this->parentGroup.throwIfNoMemberInvokersForCoAwaitUnderLock();
if (this->parentGroup.s.rsrc.calleeWasReadyToNotifyOfFirstSettlement) {
return false;
}
/* We store away the coro_handle of the
* group awaiter, and suspend that group awaiter.
*/
this->parentGroup.s.rsrc.setCallerSchedHandleAndCondition(
groupAwaiterSchedHandle, AwaitingCondition::FIRST_SETTLED);
return true;
}
std::pair<SettlementDescriptor &, std::vector<SettlementDescriptor> &>
await_resume()
{
assert(this->parentGroup.firstInvokerSettled());
return {
this->parentGroup.s.rsrc.settlements[
this->parentGroup.s.rsrc.firstSettledInvokerIdx],
this->parentGroup.s.rsrc.settlements
};
}
};
/** EXPLANATION:
* Same awaiting rules as AwaitFirstSettlementInvoker (see above).
*
* It is illegal to add() new members while a group co_await is suspended
* (groupAwaiterSchedHandle is registered). You may add() after co_await
* returns, including starting a new settlement wave before the next
* co_await.
*/
struct AwaitAllSettlementsInvoker
: public SettlementAwaitingInvoker
{
using SettlementAwaitingInvoker::SettlementAwaitingInvoker;
bool await_suspend(std::coroutine_handle<> groupAwaiterSchedHandle)
{
/* See AwaitFirstSettlementInvoker::await_suspend. Handle state is
* cleared when the adapter wakes a suspended group co_awaiter, not
* in await_resume.
*/
assert(!this->parentGroup.s.rsrc.callerHasSetSchedHandle);
sscl::SpinLock::Guard guard(this->parentGroup.s.lock);
this->parentGroup.throwIfNoMemberInvokersForCoAwaitUnderLock();
if (this->parentGroup.allInvokersSettled()) {
return false;
}
this->parentGroup.s.rsrc.setCallerSchedHandleAndCondition(
groupAwaiterSchedHandle, AwaitingCondition::ALL_SETTLED);
return true;
}
std::vector<SettlementDescriptor> &await_resume()
{
assert(this->parentGroup.allInvokersSettled());
return this->parentGroup.s.rsrc.settlements;
}
};
struct NonAwaitableNonPostingAdapterCoro
{
struct promise_type
: public PromiseChainLink
{
NonAwaitableNonPostingAdapterCoro get_return_object() noexcept
{ return {}; }
void removeAcquiredLock(CoQutex &) noexcept override
{}
std::suspend_never initial_suspend() noexcept { return {}; }
/** EXPLANATION:
* final_suspend must return suspend_never here so that
* this fire-and-forget adapter coro will be self-destroying.
*/
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() noexcept { return; }
void unhandled_exception() noexcept
{
try {
auto eptr = std::current_exception();
if (eptr) {
std::rethrow_exception(eptr);
}
} catch (const std::exception &e) {
std::cerr << "Unhandled exception in Group adapter coroutine:\n"
<< e.what() << "\n";
} catch (...) {
std::cerr << "Unhandled non-std exception in Group adapter coroutine\n";
}
std::terminate();
}
};
NonAwaitableNonPostingAdapterCoro() noexcept = default;
NonAwaitableNonPostingAdapterCoro operator co_await() const = delete;
bool await_ready() const { std::terminate(); return false; }
void await_suspend() const { std::terminate(); }
void await_resume() const { std::terminate(); }
};
std::pair<bool, bool>
updateSettlementsStateAndAwakenCallerIfConditionMet(
std::size_t settlementIndex) noexcept
{
bool isFirstSettlement = false;
bool isLastSettlement = false;
std::coroutine_handle<> groupAwaiterSchedHandleToWake = nullptr;
{
sscl::SpinLock::Guard guard(s.lock);
/* If we can be certain that the AllSettled condition won't
* be triggered repeatedly, then we can get rid of
* calleeWasReadyToNotifyOfLastSettlementForCurrentSet.
*/
assert(s.rsrc.nInvokersSettled < s.rsrc.settlements.size());
assert(settlementIndex < s.rsrc.settlements.size());
s.rsrc.nInvokersSettled++;
if (!firstInvokerSettled())
{
isFirstSettlement = true;
s.rsrc.firstSettledInvokerIdx = static_cast<int>(settlementIndex);
/* This should be set-once & sticky throughout the lifetime
* of the Group object. The first invoker only gets
* settled once, irrespective of how many
* AwaitFirstSettlementInvoker instances we create.
*/
s.rsrc.calleeWasReadyToNotifyOfFirstSettlement = true;
}
if (allInvokersSettled())
{
assert(s.rsrc.nInvokersSettled == s.rsrc.settlements.size());
assert(verifyAllInvokersSettled());
isLastSettlement = true;
}
/* If no group co_awaiter registered a handle (did not suspend, or
* already woke and clearCallerSchedHandleState ran), there is
* nothing to post back to.
*/
if (!s.rsrc.callerHasSetSchedHandle) {
return {isFirstSettlement, isLastSettlement};
}
/* If we're here, then callerHasSetSchedHandle must be true.
* I.e: an invoker has been created and co_awaited for one of the
* conditions.
* Therefore currentAwaitingCondition must also have been set,
* since currentAwaitingCondition is set in the invokers' ctors.
*/
assert(s.rsrc.currentAwaitingCondition != AwaitingCondition::NONE);
if ((isFirstSettlement
&& s.rsrc.currentAwaitingCondition == AwaitingCondition::FIRST_SETTLED)
|| (isLastSettlement
&& s.rsrc.currentAwaitingCondition == AwaitingCondition::ALL_SETTLED))
{
groupAwaiterSchedHandleToWake = s.rsrc.groupAwaiterSchedHandle;
/** We only clear here and not in await_resume, because if
* the caller hasn't already set it schedHandle by the time we're
* called, then when it eventually does call await_suspend, it
* won't set it then either.
*
* I.e: callerSchedHandle only needs to be cleared it if gets set
* in the first place;
* And it only gets set if we need to invoke the schedHandle from
* here.
* If the group co_awaiter is able to call await_resume, then it
* simply doesn't set its schedHandle at all.
*/
s.rsrc.clearCallerSchedHandleState();
}
}
if (groupAwaiterSchedHandleToWake)
{
/* We should be able to just directly resume() the group awaiter's handle
* here because that would invoke await_resume, which may destroy the
* callee's promise.
* And who is the callee? Is it not this coro here? And this coro
* hasn't been suspended. So we'd be destroying ourself while we're
* not suspended.
*
* But all of that only applies __IFF__ we actually do try to destroy
* the callee within the caller's Invoker. If we don't, then the callee
* should persist just fine. There's no implicit mechanism that
* will always destroy the callee coro state before the invoker
* is destroyed.
* If that was in fact the way it worked, then fire-and-forget coros
* would be impossible.
*
* So we should be able to call resume() directly here without
* post()ing to current_io_context().
*
* EXPLANATION:
* However, in order to ensure that we keep this adapter coro
* method exception-free, we are forced to post() rather than
* directly calling the handle.
*/
boost::asio::post(
sscl::ComponentThread::getSelf()->getIoService(),
groupAwaiterSchedHandleToWake);
}
return {isFirstSettlement, isLastSettlement};
}
/** EXPLANATION:
* This coro is a coro which has a promise, and does __not__ expose an awaitable
* iface and in fact should not be capable of being awaited, ultimately.
*
* Its purpose is to be an adapter that enables the Group class to invoke the
* invokers that are added to it, without having to co_await those invokers.
* Rather, the Group class simply invokes this function on them, and then this
* function both co_awaits the invoker on behalf of the Group class, and also
* performs the normal function of an invoker, which is both to invoke the
* target async fn, and also to convey its results back to the Group class.
* It's effectively a go-between coro that provides the outcomes that Invokers
* normally provide, without needing, itself, to be co_awaited.
*/
NonAwaitableNonPostingAdapterCoro nonAwaitableAdapterCoro(
std::size_t settlementIndex) noexcept
{
/** EXPLANATION:
* It's very convenient that our design for the NonViralNonSuspendingInvoker
* coincidentally allows us to supply a lambda that can be used to test
* for the settlement conditions that are being waited on by the Group's
* co_awaiter.
*
* settlementIndex is captured by value (not a vector iterator) so adapter
* coros remain valid if settlements reallocate during concurrent add().
*/
try {
/* Return values remain in the callee promise until the caller-owned
* invoker is destroyed (~PostingInvoker). The group co_awaiter reads
* results via settlements[settlementIndex].invoker after awaiting.
*
* Index settlements[] each time; do not cache a reference across
* co_await because concurrent add() may reallocate the vector.
*/
co_await s.rsrc.settlements[settlementIndex].invoker.get();
}
catch (...)
{
s.rsrc.settlements[settlementIndex].calleeException =
std::current_exception();
}
/* From here onwards, we mustn't throw(). Unhandled exceptions
* generated by the adapter coro itself will result in
* std::terminate().
*/
s.rsrc.settlements[settlementIndex].setSettlementStatus();
updateSettlementsStateAndAwakenCallerIfConditionMet(settlementIndex);
co_return;
}
/** EXPLANATION:
* Each invoker passed to add() must outlive this Group and the callee frame
* (see ~PostingInvoker). The group co_awaiter reads return values from those
* invokers after awaiting; do not destroy an invoker until reads are done.
*/
void add(Invoker &invoker)
{
std::size_t settlementIndex = 0;
{
sscl::SpinLock::Guard guard(s.lock);
if (s.rsrc.groupAwaiterSchedHandle)
{
throw std::runtime_error(
"add: New member invokers mustn't be added "
"while co_awaiting a given set");
}
settlementIndex = s.rsrc.settlements.size();
s.rsrc.settlements.emplace_back(invoker);
}
nonAwaitableAdapterCoro(settlementIndex);
}
void checkForAndReThrowGroupExceptions() const
{
std::ostringstream ostream;
bool doThrow = false;
for (auto &item : s.rsrc.settlements)
{
if (item.type != SettlementDescriptor::TypeE::EXCEPTION_THROWN) {
continue;
}
assert(item.calleeException);
doThrow = true;
ostream << "Exc thrown in Group Adapter: ";
try {
std::rethrow_exception(item.calleeException);
} catch (const std::exception &e) {
ostream << e.what();
} catch (...) {
ostream << "<unknown exception type>";
}
ostream << "\n";
}
if (doThrow) {
throw std::runtime_error(ostream.str());
}
}
struct State
{
void clearCallerSchedHandleState() noexcept
{
groupAwaiterSchedHandle = nullptr;
callerHasSetSchedHandle = false;
currentAwaitingCondition = AwaitingCondition::NONE;
}
void setCallerSchedHandleAndCondition(
std::coroutine_handle<> groupAwaiterSchedHandleIn,
AwaitingCondition awaitingCondition) noexcept
{
groupAwaiterSchedHandle = groupAwaiterSchedHandleIn;
callerHasSetSchedHandle = true;
currentAwaitingCondition = awaitingCondition;
}
int firstSettledInvokerIdx = -1;
std::size_t nInvokersSettled = 0;
std::coroutine_handle<> groupAwaiterSchedHandle = nullptr;
bool callerHasSetSchedHandle = false;
/* calleWasReady*First* is an indelible record of what
* occured during the first settlement's adapter's update.
*/
bool calleeWasReadyToNotifyOfFirstSettlement = false;
std::vector<SettlementDescriptor> settlements;
AwaitingCondition currentAwaitingCondition = AwaitingCondition::NONE;
};
sscl::SharedResourceGroup<sscl::SpinLock, State> s;
};
} // namespace sscl::co
#endif // GROUP_H
+161
View File
@@ -0,0 +1,161 @@
#ifndef INVOKERS_H
#define INVOKERS_H
#include <config.h>
#include <coroutine>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <spinscale/co/postingInvoker.h>
namespace sscl::co {
/** Non-viral coroutine entry that must not be co_awaited: promise is always
* PostingPromiseTemplate<void> (no return-value path to a caller).
*
* The invoker must outlive the callee frame: do not discard the return object
* from get_return_object(). ~PostingInvoker destroys the callee frame.
*/
template <template <typename> class PostingPromiseTemplate>
struct NonViralNonSuspendingInvoker
: public PostingInvoker<PostingPromiseTemplate<void>, void>
{
struct promise_type
: public PostingPromiseTemplate<void>
{
using PostingPromiseTemplate<void>::PostingPromiseTemplate;
NonViralNonSuspendingInvoker<PostingPromiseTemplate> get_return_object()
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning NonViralNonSuspendingInvoker.\n";
#endif
if (!this->callerLambda)
{
/** EXPLANATION:
* We require a completion lambda to be provided to the
* non-viral coroutines, because that's how we internally
* distinguish between non-viral and viral coroutines.
*
* Additionally, non-viral coroutines almost never have a
* good reason to not have a completion lambda.
*/
std::ostringstream oss;
oss << std::this_thread::get_id()
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
throw std::runtime_error(oss.str());
}
this->setSelfSchedHandle(
std::coroutine_handle<promise_type>::from_promise(*this));
return NonViralNonSuspendingInvoker<PostingPromiseTemplate>(*this);
}
};
using PostingInvoker<PostingPromiseTemplate<void>, void>::PostingInvoker;
bool await_ready() const noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " This shouldn't be called.\n";
#endif
return true;
}
void await_suspend(std::coroutine_handle<NonViralNonSuspendingInvoker<PostingPromiseTemplate>>) noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " This shouldn't be called.\n";
#endif
}
void await_resume() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " This shouldn't be called.\n";
#endif
}
};
/** Viral awaitable: promise_type inherits PostingPromiseTemplate<T> (posting
* target chosen by the posting-promise alias, e.g. BodyPostingPromise<int>).
*
* The invoker must outlive the callee frame until results are read.
* ~PostingInvoker destroys the callee frame (not await_resume).
*/
template <template <typename> class PostingPromiseTemplate, typename T>
struct ViralSuspendingInvoker
: public PostingInvoker<PostingPromiseTemplate<T>, T>
{
struct promise_type
: public PostingPromiseTemplate<T>
{
using PostingPromiseTemplate<T>::PostingPromiseTemplate;
ViralSuspendingInvoker<PostingPromiseTemplate, T> get_return_object() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning ViralSuspendingInvoker.\n";
#endif
this->setSelfSchedHandle(
std::coroutine_handle<promise_type>::from_promise(*this));
return ViralSuspendingInvoker<PostingPromiseTemplate, T>(*this);
}
};
using PostingInvoker<PostingPromiseTemplate<T>, T>::PostingInvoker;
bool await_ready() const noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning false.\n";
#endif
return false;
}
template <typename CallerPromise>
bool await_suspend(std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
{
static_assert(
std::is_base_of_v<PromiseChainLink, CallerPromise>,
"ViralSuspendingInvoker caller promise must derive from PromiseChainLink");
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Setting callerSchedHandle.\n";
#endif
const bool suspendCaller = this->setCallerSchedHandle(callerSchedHandle);
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " CallerFlowExecutor returned suspend=" << suspendCaller << ".\n";
#endif
/** EXPLANATION:
* If the callee was ready to post-back, then we don't need to
* suspend the caller -- so return either false or
* a symmetric transfer handle to the `callerSchedHandle` we were
* passed as an argument.
*
* If the callee is not ready to post-back, then we need to suspend
* the caller so that the caller can suspend until the callee posts
* the callerSchedHandle to the callerIoContext -- so return true
* or std::noop_coroutine().
*/
return suspendCaller;
}
T await_resume()
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Resumed on caller thread, hopefully.\n";
#endif
return PostingInvoker<PostingPromiseTemplate<T>, T>::await_resume();
}
};
} // namespace sscl::co
#endif // INVOKERS_H
+107
View File
@@ -0,0 +1,107 @@
#ifndef POSTING_INVOKER_H
#define POSTING_INVOKER_H
#include <config.h>
#include <coroutine>
#include <iostream>
#include <thread>
#include <type_traits>
#include <utility>
#include <spinscale/co/promises.h>
namespace sscl::co {
template <typename PromiseType, typename T>
class PostingInvoker
{
public:
explicit PostingInvoker(PromiseType &_calleePromise) noexcept
: calleePromise(_calleePromise)
{}
PostingInvoker(const PostingInvoker &) = delete;
PostingInvoker &operator=(const PostingInvoker &) = delete;
PostingInvoker(PostingInvoker &&other) noexcept
: calleePromise(other.calleePromise),
ownsFrameDestroy_(std::exchange(other.ownsFrameDestroy_, false))
{}
PostingInvoker &operator=(PostingInvoker &&other) = delete;
~PostingInvoker() noexcept
{
if (!ownsFrameDestroy_) { return; }
std::coroutine_handle<> handle = calleePromise.selfSchedHandle;
if (handle) {
handle.destroy();
}
}
template <typename CallerPromise>
bool setCallerSchedHandle(std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
{
static_assert(
std::is_base_of_v<PromiseChainLink, CallerPromise>,
"PostingInvoker caller promise must derive from PromiseChainLink");
calleePromise.callerSchedHandle = callerSchedHandle;
calleePromise.setCallerPromiseChainLink(&callerSchedHandle.promise());
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Done setting callerSchedHandle; running CallerFlowExecutor.\n";
#endif
return calleePromise.postBackStatus.getCallerFlowExecutor()();
}
ReturnValues<T> &completedReturnValues() noexcept
{ return calleePromise.returnValues; }
const ReturnValues<T> &completedReturnValues() const noexcept
{ return calleePromise.returnValues; }
auto await_resume()
{
calleePromise.postBackStatus.reset();
ReturnValues<T> &returnValues = calleePromise.returnValues;
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " About to check for and rethrow any exception.\n";
#endif
if (returnValues.myExceptionPtr)
{
std::exception_ptr const captured = returnValues.myExceptionPtr;
std::rethrow_exception(captured);
}
if constexpr (!std::is_void_v<T>)
{
T result = std::move(returnValues.myReturnValue);
return result;
}
}
private:
PromiseType &calleePromise;
/** EXPLANATION:
* Every live invoker owns destruction of its callee coroutine frame in
* ~PostingInvoker (via calleePromise.selfSchedHandle).
*
* The only time frame destruction is skipped is for a moved-from invoker
* after move construction or move assignment, so we do not double-destroy
* the same handle when get_return_object() returns the invoker by value.
*
* This is not an opt-out for viral vs non-viral callers or for "callee
* still running"; callers must keep the invoker alive until the callee
* frame is no longer needed.
*/
bool ownsFrameDestroy_ = true;
};
} // namespace sscl::co
#endif // POSTING_INVOKER_H
+72
View File
@@ -0,0 +1,72 @@
#ifndef PROMISE_CHAIN_LINK_H
#define PROMISE_CHAIN_LINK_H
#include <functional>
#include <list>
namespace sscl::co {
class CoQutex;
/**
* Non-template base for coroutine promises participating in a logical
* promise chain (analogous to libspinscale AsynchronousContinuationChainLink).
* A future deadlock detector can walk callerPromiseChainLink() without
* knowing concrete promise_type.
*/
class PromiseChainLink
{
public:
virtual ~PromiseChainLink() = default;
/** Reserved for deadlock detection: link toward the caller / outer coroutine. */
virtual const PromiseChainLink *callerPromiseChainLink() const noexcept
{ return nullptr; }
virtual PromiseChainLink *callerPromiseChainLink() noexcept
{ return nullptr; }
void addAcquiredLock(CoQutex &coQutex) noexcept
{ acquiredLocks.emplace_back(std::ref(coQutex)); }
bool holdsAcquiredLock(const CoQutex &coQutex) const noexcept
{ return findMatchingAcquiredLock(coQutex) != acquiredLocks.end(); }
virtual void removeAcquiredLock(CoQutex &coQutex) noexcept = 0;
protected:
using AcquiredLockList = std::list<std::reference_wrapper<CoQutex>>;
AcquiredLockList::iterator findMatchingAcquiredLock(CoQutex &coQutex) noexcept
{
for (auto it = acquiredLocks.begin(); it != acquiredLocks.end(); ++it) {
if (&it->get() == &coQutex) {
return it;
}
}
return acquiredLocks.end();
}
AcquiredLockList::const_iterator findMatchingAcquiredLock(const CoQutex &coQutex) const noexcept
{
for (auto it = acquiredLocks.begin(); it != acquiredLocks.end(); ++it) {
if (&it->get() == &coQutex) {
return it;
}
}
return acquiredLocks.end();
}
void eraseFirstMatchingAcquiredLock(CoQutex &coQutex) noexcept
{
auto match = findMatchingAcquiredLock(coQutex);
if (match != acquiredLocks.end()) {
acquiredLocks.erase(match);
}
}
AcquiredLockList acquiredLocks;
};
} // namespace sscl::co
#endif // PROMISE_CHAIN_LINK_H
+49
View File
@@ -0,0 +1,49 @@
#ifndef PROMISE_CHAIN_WALKER_H
#define PROMISE_CHAIN_WALKER_H
#include <cstddef>
#include <spinscale/co/promiseChainLink.h>
namespace sscl::co {
/**
* Upper bound on caller-chain links visited after the root (guards cycles / bugs).
*
* Design posture (cf. docs/3rdParty/smo/libspinscale — continuation tracing vs
* interpretation): this header performs trace-only walks along
* PromiseChainLink::callerPromiseChainLink(); deadlock interpretation stays at
* call sites / later policy.
*/
inline constexpr std::size_t kMaxCallerPromiseChainTraversalSteps = 4096;
inline const PromiseChainLink *nextOnCallerPromiseChain(
const PromiseChainLink &link) noexcept
{
return link.callerPromiseChainLink();
}
inline bool callerChainHopUnderStepLimit(
std::size_t hopIndex) noexcept
{
return hopIndex < kMaxCallerPromiseChainTraversalSteps;
}
template <typename Visitor>
void walkCallerPromiseChainFrom(
const PromiseChainLink &root, Visitor &&visitor)
{
visitor(root);
const PromiseChainLink *next = nextOnCallerPromiseChain(root);
for (std::size_t hopIndex = 0;
next != nullptr && callerChainHopUnderStepLimit(hopIndex);
++hopIndex)
{
visitor(*next);
next = nextOnCallerPromiseChain(*next);
}
}
} // namespace sscl::co
#endif // PROMISE_CHAIN_WALKER_H
+363
View File
@@ -0,0 +1,363 @@
#ifndef PROMISES_H
#define PROMISES_H
#include <config.h>
#include <coroutine>
#include <exception>
#include <functional>
#include <iostream>
#include <thread>
#include <type_traits>
#include <utility>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/componentThread.h>
#include <spinscale/co/coQutex.h>
#include <spinscale/spinLock.h>
namespace sscl::co {
template <typename PromiseType, typename T>
class PostingInvoker;
template <typename T, bool IsVoid = std::is_void_v<T>>
struct ReturnValueStorage;
template <typename T>
struct ReturnValueStorage<T, false>
{
T myReturnValue{};
};
template <typename T>
struct ReturnValueStorage<T, true>
{
};
template <typename T>
struct ReturnValues
: public ReturnValueStorage<T>
{
ReturnValues() noexcept
: myExceptionPtr(myMemberExceptionPtr)
{}
explicit ReturnValues(std::exception_ptr &callerExceptionPtr) noexcept
: myExceptionPtr(callerExceptionPtr)
{}
~ReturnValues() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n";
#endif
}
/** EXPLANATION:
* The exception_ptr ref here can either point to the exception_ptr
* a non-viral coroutine supplied to us as its storage space for
* where we should store any exception that is thrown;
*
* Or it could point to the member exception_ptr in this very class,
* which is used for viral coroutines that can bubble their exception
* up and automatically via the language runtime.
*/
std::exception_ptr &myExceptionPtr;
std::exception_ptr myMemberExceptionPtr = nullptr;
};
/** `return_value` / `return_void` only. ThreadTag is not a template parameter here:
* for tagged promises, PromiseType is `TaggedPostingPromise<T, ThreadTag>`.
*/
template <typename PromiseType, typename T, bool IsVoid = std::is_void_v<T>>
struct PostingPromiseReturnOps;
template <typename PromiseType, typename T>
struct PostingPromiseReturnOps<PromiseType, T, false>
{
void return_value(T returnValue) noexcept
{
static_cast<PromiseType *>(this)->returnValues.myReturnValue = std::move(returnValue);
}
};
template <typename PromiseType, typename T>
struct PostingPromiseReturnOps<PromiseType, T, true>
{
void return_void() noexcept
{
return;
}
};
template <typename T>
struct PostingPromise
: public PromiseChainLink
{
struct PostBackStatus
{
struct CalleeFlowExecutor;
struct CallerFlowExecutor;
friend struct CalleeFlowExecutor;
friend struct CallerFlowExecutor;
explicit PostBackStatus(PostingPromise &calleePromiseIn) noexcept
: calleePromise(calleePromiseIn)
{}
void reset() noexcept
{
sscl::SpinLock::Guard guard(lock);
callerHasSetCallerSchedHandle = false;
calleeIsReadyToPostBack = false;
}
struct FlowExecutor
{
explicit FlowExecutor(PostBackStatus &parentIn) noexcept
: parent(parentIn)
{}
PostBackStatus &parent;
};
struct CalleeFlowExecutor
: public FlowExecutor
{
explicit CalleeFlowExecutor(PostBackStatus &parentIn) noexcept
: FlowExecutor(parentIn)
{}
void operator()() noexcept
{
sscl::SpinLock::Guard guard(this->parent.lock);
this->parent.calleeIsReadyToPostBack = true;
if (this->parent.callerHasSetCallerSchedHandle)
{
boost::asio::post(
this->parent.calleePromise.callerIoContext,
this->parent.calleePromise.callerSchedHandle);
}
}
};
struct CallerFlowExecutor
: public FlowExecutor
{
explicit CallerFlowExecutor(PostBackStatus &parentIn) noexcept
: FlowExecutor(parentIn)
{}
bool operator()() noexcept
{
sscl::SpinLock::Guard guard(this->parent.lock);
this->parent.callerHasSetCallerSchedHandle = true;
if (this->parent.calleeIsReadyToPostBack) {
return false;
}
return true;
}
};
CalleeFlowExecutor getCalleeFlowExecutor() noexcept
{
return CalleeFlowExecutor(*this);
}
CallerFlowExecutor getCallerFlowExecutor() noexcept
{
return CallerFlowExecutor(*this);
}
PostingPromise &calleePromise;
private:
sscl::SpinLock lock;
bool callerHasSetCallerSchedHandle = false;
bool calleeIsReadyToPostBack = false;
};
/** Post-to must run from this awaiter's await_suspend, not synchronously inside
* promise.initial_suspend() before it returns: the implementation's hidden coroutine
* state (async segment / suspend index used on the next resume()) is only updated
* after initial_suspend has finished returning its awaiter. Posting the handle too
* early lets the callee resume before that update and re-enter initial_suspend from
* the start, duplicating the post. See docs/prompts/post-to-and-back-in-invokables.md.
*/
struct InitialSuspendPostingInvoker
: public std::suspend_always
{
InitialSuspendPostingInvoker(
boost::asio::io_context &targetIoContextIn,
std::coroutine_handle<> targetSchedHandleIn) noexcept
: targetIoContext(targetIoContextIn),
targetSchedHandle(targetSchedHandleIn)
{}
bool await_suspend(std::coroutine_handle<> const) noexcept
{
boost::asio::post(targetIoContext, targetSchedHandle);
return true;
}
boost::asio::io_context &targetIoContext;
std::coroutine_handle<> targetSchedHandle;
};
/** Post-back (non-viral completion post; viral CalleeFlowExecutor) must run from this
* awaiter's await_suspend, not synchronously inside promise.final_suspend() before it
* returns: the hidden coroutine segment index in the coroutine state is only advanced
* after final_suspend exits. Doing that work inside final_suspend's body risks the same
* kind of ordering bug as initial_suspend—resume observing the wrong segment. See
* docs/prompts/post-to-and-back-in-invokables.md.
*/
struct FinalSuspendPostingInvoker
: public std::suspend_always
{
explicit FinalSuspendPostingInvoker(PostingPromise &calleePromiseIn) noexcept
: calleePromise(calleePromiseIn)
{}
bool await_suspend(std::coroutine_handle<> const) noexcept
{
if (calleePromise.callerLambda)
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
<< " Non-viral: posting callerLambda completion to callerIoContext.\n";
#endif
boost::asio::post(
calleePromise.callerIoContext,
[&calleeRef = calleePromise]()
{
if (calleeRef.returnValues.myExceptionPtr) {
std::rethrow_exception(calleeRef.returnValues.myExceptionPtr);
}
calleeRef.callerLambda();
});
}
else
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
<< " Viral: running CalleeFlowExecutor.\n";
#endif
calleePromise.postBackStatus.getCalleeFlowExecutor()();
}
return true;
}
PostingPromise &calleePromise;
};
PostingPromise() noexcept
: returnValues(), postBackStatus(*this)
{}
template <typename... TailArgs>
PostingPromise(
std::exception_ptr &_callerExceptionPtr,
std::function<void()> _callerLambda,
TailArgs &&...) noexcept
: returnValues(_callerExceptionPtr),
callerLambda(std::move(_callerLambda)),
postBackStatus(*this)
{}
~PostingPromise() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n";
#endif
}
void unhandled_exception() noexcept
{
returnValues.myExceptionPtr = std::current_exception();
}
void removeAcquiredLock(CoQutex &coQutex) noexcept override
{
eraseFirstMatchingAcquiredLock(coQutex);
}
const PromiseChainLink *callerPromiseChainLink() const noexcept override
{ return callerChainLink; }
PromiseChainLink *callerPromiseChainLink() noexcept override
{ return callerChainLink; }
/** Non-viral: post completion lambda to callerIoContext from this thread.
* Viral: run CalleeFlowExecutor (handshake flags); caller may post caller resume
* later via CallerFlowExecutor. See docs/caller-posts-to-own-io-context.md.
* Work runs in FinalSuspendPostingInvoker::await_suspend after the suspend point
* advances (see docs/prompts/post-to-and-back-in-invokables.md).
*/
auto final_suspend() noexcept
{
return FinalSuspendPostingInvoker(*this);
}
ReturnValues<T> returnValues;
std::function<void()> callerLambda;
boost::asio::io_service &callerIoContext =
sscl::ComponentThread::getSelf()->getIoService();
std::coroutine_handle<> selfSchedHandle;
std::coroutine_handle<void> callerSchedHandle;
PromiseChainLink *callerChainLink = nullptr;
PostBackStatus postBackStatus;
protected:
void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept
{
selfSchedHandle = schedHandle;
}
void setCallerPromiseChainLink(PromiseChainLink *chainLink) noexcept
{
callerChainLink = chainLink;
}
template <typename, typename>
friend class PostingInvoker;
};
template <typename T, typename ThreadTag>
struct TaggedPostingPromise
: public PostingPromise<T>,
public PostingPromiseReturnOps<TaggedPostingPromise<T, ThreadTag>, T>
{
TaggedPostingPromise() noexcept
: PostingPromise<T>()
{}
template <typename... TailArgs>
TaggedPostingPromise(
std::exception_ptr &_exceptionPtr,
std::function<void()> _callerLambda,
TailArgs &&... tailArgs) noexcept
: PostingPromise<T>(
_exceptionPtr,
std::move(_callerLambda),
std::forward<TailArgs>(tailArgs)...)
{}
auto initial_suspend() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " About to post selfSchedHandle to " << typeid(ThreadTag).name() << ".\n";
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning InitialSuspendPostingInvoker.\n";
#endif
return typename PostingPromise<T>::InitialSuspendPostingInvoker(
ThreadTag::io_context(),
this->selfSchedHandle);
}
};
} // namespace sscl::co
#endif // PROMISES_H
+19
View File
@@ -0,0 +1,19 @@
#ifndef SHARED_RESOURCE_GROUP_H
#define SHARED_RESOURCE_GROUP_H
namespace sscl {
template <typename LockType, typename ResourceType>
class SharedResourceGroup
{
public:
SharedResourceGroup() = default;
~SharedResourceGroup() = default;
LockType lock;
ResourceType rsrc;
};
} // namespace sscl
#endif // SHARED_RESOURCE_GROUP_H