mirror of
https://github.com/latentPrion/libspinscale.git
synced 2026-06-23 19:48:32 +00:00
Compare commits
3 Commits
b6eb502e56
...
1d1cb099db
| Author | SHA1 | Date | |
|---|---|---|---|
| 1d1cb099db | |||
| 0dcfa754b6 | |||
| ad4ea3ccac |
@@ -18,6 +18,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
|
||||
option(ENABLE_DEBUG_LOCKS "Enable debug features for locking system" OFF)
|
||||
option(ENABLE_DEBUG_TRACE_CALLABLES
|
||||
"Enable callable tracing for debugging boost::asio post operations" OFF)
|
||||
option(ENABLE_DEBUG_CO "Enable coroutine-type debug logging" OFF)
|
||||
|
||||
# Qutex deadlock detection configuration
|
||||
if(NOT DEFINED DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS)
|
||||
@@ -43,6 +44,10 @@ if(ENABLE_DEBUG_LOCKS)
|
||||
set(CONFIG_ENABLE_DEBUG_LOCKS TRUE)
|
||||
endif()
|
||||
|
||||
if(ENABLE_DEBUG_CO)
|
||||
set(CONFIG_LIBSSCL_DEBUG_CO TRUE)
|
||||
endif()
|
||||
|
||||
if(ENABLE_DEBUG_TRACE_CALLABLES)
|
||||
set(CONFIG_DEBUG_TRACE_CALLABLES TRUE)
|
||||
# Suppress frame-address warnings when using __builtin_return_address()
|
||||
|
||||
@@ -8,4 +8,7 @@
|
||||
/* Debug callable tracing configuration */
|
||||
#cmakedefine CONFIG_DEBUG_TRACE_CALLABLES
|
||||
|
||||
/* Debug coroutine-type logging configuration */
|
||||
#cmakedefine CONFIG_LIBSSCL_DEBUG_CO
|
||||
|
||||
#endif /* _CONFIG_H */
|
||||
|
||||
@@ -0,0 +1,214 @@
|
||||
#ifndef CO_CONDITION_VARIABLE_H
|
||||
#define CO_CONDITION_VARIABLE_H
|
||||
|
||||
#include <config.h>
|
||||
#include <coroutine>
|
||||
#include <deque>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
|
||||
#include <spinscale/componentThread.h>
|
||||
#include <spinscale/spinLock.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
/** Coroutine-friendly handoff: wait until `signal()` before running a completion
|
||||
* step. Standalone primitive (posting promises use `PostBackStatus` instead).
|
||||
*
|
||||
* `clear()` only clears `isSignaled`; it does not wake or drain waiters. If
|
||||
* `clear()` runs while coroutines are still waiting, they stay queued until a
|
||||
* later `signal()` posts them.
|
||||
*/
|
||||
class CoConditionVariable
|
||||
{
|
||||
public:
|
||||
/** Waiter queued under the CV spin lock; `signal()` drains and calls `post()`. */
|
||||
class WaitingCoroutineBase
|
||||
{
|
||||
public:
|
||||
explicit WaitingCoroutineBase(
|
||||
boost::asio::io_service &callerIoContextIn) noexcept
|
||||
: callerIoContext(callerIoContextIn)
|
||||
{}
|
||||
|
||||
virtual ~WaitingCoroutineBase() = default;
|
||||
|
||||
virtual void post() noexcept = 0;
|
||||
|
||||
public:
|
||||
boost::asio::io_service &callerIoContext;
|
||||
};
|
||||
|
||||
template <typename Promise>
|
||||
class TypedWaitingCoroutine
|
||||
: public WaitingCoroutineBase
|
||||
{
|
||||
public:
|
||||
TypedWaitingCoroutine(
|
||||
boost::asio::io_service &callerIoContextIn,
|
||||
std::coroutine_handle<Promise> callerSchedHandleIn) noexcept
|
||||
: WaitingCoroutineBase(callerIoContextIn),
|
||||
callerSchedHandle(callerSchedHandleIn)
|
||||
{}
|
||||
|
||||
void post() noexcept override
|
||||
{
|
||||
boost::asio::post(callerIoContext, callerSchedHandle);
|
||||
}
|
||||
|
||||
public:
|
||||
std::coroutine_handle<Promise> callerSchedHandle;
|
||||
};
|
||||
|
||||
struct OperationInvoker
|
||||
{
|
||||
explicit OperationInvoker(CoConditionVariable &parentCvIn) noexcept
|
||||
: parentCv(parentCvIn)
|
||||
{}
|
||||
|
||||
CoConditionVariable &parentCv;
|
||||
};
|
||||
|
||||
struct WaitForInvoker
|
||||
: public OperationInvoker
|
||||
{
|
||||
using OperationInvoker::OperationInvoker;
|
||||
|
||||
bool await_ready() const noexcept { return false; }
|
||||
|
||||
template <typename Promise>
|
||||
bool await_suspend(std::coroutine_handle<Promise> cvCallerSchedHandle) noexcept
|
||||
{
|
||||
boost::asio::io_service &cvCallerIoContext =
|
||||
sscl::ComponentThread::getSelf()->getIoService();
|
||||
|
||||
sscl::SpinLock::Guard guard(parentCv.spinLock);
|
||||
if (parentCv.isSignaled) {
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " CV not signaled: Enqueuing waiter coroutine.\n";
|
||||
#endif
|
||||
parentCv.enqueueWaitingCoroutine(
|
||||
cvCallerSchedHandle, cvCallerIoContext);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void await_resume() const noexcept {}
|
||||
};
|
||||
|
||||
/** Manual await-style API only (lowerCamelCase); not a coroutine awaiter.
|
||||
* `FinalSuspendPostingInvoker` calls `awaitSuspend` explicitly.
|
||||
*/
|
||||
struct DecisionEnablingDerivableWaitForInvoker
|
||||
: public OperationInvoker
|
||||
{
|
||||
struct DecisionFactors
|
||||
{
|
||||
sscl::SpinLock &cvInternalSpinLock;
|
||||
bool wasAlreadySignaled;
|
||||
|
||||
DecisionFactors(sscl::SpinLock &cvLockIn, bool signaledIn) noexcept
|
||||
: cvInternalSpinLock(cvLockIn),
|
||||
wasAlreadySignaled(signaledIn)
|
||||
{}
|
||||
};
|
||||
|
||||
using OperationInvoker::OperationInvoker;
|
||||
|
||||
void operator co_await() const = delete;
|
||||
|
||||
bool awaitReady() const noexcept { return false; }
|
||||
|
||||
template <typename Promise>
|
||||
DecisionFactors awaitSuspend(std::coroutine_handle<Promise> cvCallerSchedHandle) noexcept
|
||||
{
|
||||
boost::asio::io_service &cvCallerIoContext =
|
||||
sscl::ComponentThread::getSelf()->getIoService();
|
||||
|
||||
parentCv.spinLock.acquire();
|
||||
if (parentCv.isSignaled)
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " CV already signaled: returning already-signaled DecisionFactors.\n";
|
||||
#endif
|
||||
return DecisionFactors(parentCv.spinLock, true);
|
||||
}
|
||||
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " CV not signaled: returning not-signaled DecisionFactors.\n";
|
||||
#endif
|
||||
parentCv.enqueueWaitingCoroutine(
|
||||
cvCallerSchedHandle, cvCallerIoContext);
|
||||
|
||||
return DecisionFactors(parentCv.spinLock, false);
|
||||
}
|
||||
|
||||
void awaitResume() const noexcept {}
|
||||
};
|
||||
|
||||
CoConditionVariable() noexcept = default;
|
||||
CoConditionVariable(const CoConditionVariable &) = delete;
|
||||
CoConditionVariable &operator=(const CoConditionVariable &) = delete;
|
||||
CoConditionVariable(CoConditionVariable &&) noexcept = delete;
|
||||
CoConditionVariable &operator=(CoConditionVariable &&) noexcept = delete;
|
||||
~CoConditionVariable() noexcept = default;
|
||||
|
||||
WaitForInvoker getWaitForInvoker() noexcept
|
||||
{ return WaitForInvoker(*this); }
|
||||
|
||||
DecisionEnablingDerivableWaitForInvoker
|
||||
getDecisionEnablingDerivableWaitForInvoker() noexcept
|
||||
{
|
||||
return DecisionEnablingDerivableWaitForInvoker(*this);
|
||||
}
|
||||
|
||||
void signal() noexcept
|
||||
{
|
||||
std::deque<std::unique_ptr<WaitingCoroutineBase>> drained;
|
||||
|
||||
{
|
||||
sscl::SpinLock::Guard guard(spinLock);
|
||||
isSignaled = true;
|
||||
drained.swap(waitingCoroutines);
|
||||
}
|
||||
|
||||
for (std::unique_ptr<WaitingCoroutineBase> &waiter : drained) {
|
||||
waiter->post();
|
||||
}
|
||||
}
|
||||
|
||||
/** Only clears the signaled flag; waiters (if any) remain in the deque. */
|
||||
void clear() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(spinLock);
|
||||
isSignaled = false;
|
||||
}
|
||||
|
||||
template <typename Promise>
|
||||
void enqueueWaitingCoroutine(
|
||||
std::coroutine_handle<Promise> handle,
|
||||
boost::asio::io_service &ctx) noexcept
|
||||
{
|
||||
waitingCoroutines.push_back(
|
||||
std::make_unique<TypedWaitingCoroutine<Promise>>(ctx, handle));
|
||||
}
|
||||
|
||||
private:
|
||||
sscl::SpinLock spinLock;
|
||||
bool isSignaled = false;
|
||||
std::deque<std::unique_ptr<WaitingCoroutineBase>> waitingCoroutines;
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // CO_CONDITION_VARIABLE_H
|
||||
@@ -0,0 +1,198 @@
|
||||
#ifndef CO_QUTEX_H
|
||||
#define CO_QUTEX_H
|
||||
|
||||
#include <config.h>
|
||||
#include <cassert>
|
||||
#include <coroutine>
|
||||
#include <deque>
|
||||
#include <stdexcept>
|
||||
#include <type_traits>
|
||||
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#endif
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
|
||||
#include <spinscale/componentThread.h>
|
||||
#include <spinscale/co/promiseChainWalker.h>
|
||||
#include <spinscale/spinLock.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
class CoQutex
|
||||
{
|
||||
public:
|
||||
class ReleaseHandle;
|
||||
|
||||
CoQutex() noexcept = default;
|
||||
CoQutex(const CoQutex &) = delete;
|
||||
CoQutex(CoQutex &&) noexcept = delete;
|
||||
CoQutex &operator=(const CoQutex &) = delete;
|
||||
CoQutex &operator=(CoQutex &&) noexcept = delete;
|
||||
~CoQutex() = default;
|
||||
|
||||
struct AcquireInvocationAndSuspensionPolicy
|
||||
{
|
||||
AcquireInvocationAndSuspensionPolicy(CoQutex &_coQutex) noexcept
|
||||
: coQutex(_coQutex)
|
||||
{}
|
||||
|
||||
~AcquireInvocationAndSuspensionPolicy() noexcept = default;
|
||||
|
||||
struct WaitingCoroutine
|
||||
{
|
||||
WaitingCoroutine(
|
||||
std::coroutine_handle<void> _callerSchedHandle,
|
||||
boost::asio::io_service &_callerIoContext,
|
||||
PromiseChainLink &_waitingPromise) noexcept
|
||||
: callerSchedHandle(_callerSchedHandle),
|
||||
callerIoContext(_callerIoContext),
|
||||
waitingPromise(_waitingPromise)
|
||||
{}
|
||||
|
||||
std::coroutine_handle<void> callerSchedHandle;
|
||||
boost::asio::io_service &callerIoContext;
|
||||
PromiseChainLink &waitingPromise;
|
||||
};
|
||||
|
||||
bool await_ready() noexcept { return false; }
|
||||
|
||||
template <typename Promise>
|
||||
bool await_suspend(std::coroutine_handle<Promise> callerSchedHandle)
|
||||
{
|
||||
static_assert(
|
||||
std::is_base_of_v<PromiseChainLink, Promise>,
|
||||
"CoQutex acquire requires a promise type derived from PromiseChainLink");
|
||||
|
||||
acquirerChainLink = &callerSchedHandle.promise();
|
||||
|
||||
walkCallerPromiseChainFrom(
|
||||
static_cast<const PromiseChainLink &>(callerSchedHandle.promise()),
|
||||
[this](const PromiseChainLink &link)
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n";
|
||||
#endif
|
||||
if (link.holdsAcquiredLock(coQutex)) {
|
||||
throw std::runtime_error("Deadlock detected: CoQutex re-acquire on caller promise chain.");
|
||||
}
|
||||
});
|
||||
|
||||
sscl::SpinLock::Guard guard(coQutex.spinLock);
|
||||
if (!coQutex.isOwned) {
|
||||
coQutex.isOwned = true;
|
||||
return false;
|
||||
}
|
||||
coQutex.waitingCoroutines.emplace_back(
|
||||
std::coroutine_handle<void>::from_address(callerSchedHandle.address()),
|
||||
sscl::ComponentThread::getSelf()->getIoService(),
|
||||
*acquirerChainLink);
|
||||
return true;
|
||||
}
|
||||
|
||||
ReleaseHandle
|
||||
// [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
|
||||
await_resume() noexcept;
|
||||
|
||||
CoQutex &coQutex;
|
||||
|
||||
private:
|
||||
PromiseChainLink *acquirerChainLink = nullptr;
|
||||
};
|
||||
|
||||
AcquireInvocationAndSuspensionPolicy
|
||||
// [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
|
||||
getAcquireInvocationAndSuspensionPolicy() noexcept
|
||||
{
|
||||
return AcquireInvocationAndSuspensionPolicy(*this);
|
||||
}
|
||||
|
||||
private:
|
||||
friend class ReleaseHandle;
|
||||
|
||||
void release() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(spinLock);
|
||||
|
||||
assert(isOwned);
|
||||
if (waitingCoroutines.empty()) {
|
||||
isOwned = false;
|
||||
return;
|
||||
}
|
||||
|
||||
auto &frontWaitingCoroutine = waitingCoroutines.front();
|
||||
boost::asio::post(
|
||||
frontWaitingCoroutine.callerIoContext,
|
||||
frontWaitingCoroutine.callerSchedHandle);
|
||||
waitingCoroutines.pop_front();
|
||||
}
|
||||
|
||||
sscl::SpinLock spinLock;
|
||||
bool isOwned = false;
|
||||
std::deque<AcquireInvocationAndSuspensionPolicy::WaitingCoroutine> waitingCoroutines;
|
||||
};
|
||||
|
||||
//[[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
|
||||
class CoQutex::ReleaseHandle
|
||||
{
|
||||
public:
|
||||
ReleaseHandle(PromiseChainLink &promiseChainLinkIn, CoQutex &coQutexIn) noexcept
|
||||
: promiseChainLink(promiseChainLinkIn),
|
||||
coQutex(coQutexIn)
|
||||
{}
|
||||
|
||||
ReleaseHandle(const ReleaseHandle &) = delete;
|
||||
ReleaseHandle &operator=(const ReleaseHandle &) = delete;
|
||||
|
||||
ReleaseHandle(ReleaseHandle &&other) noexcept
|
||||
: promiseChainLink(other.promiseChainLink),
|
||||
coQutex(other.coQutex),
|
||||
armed(other.armed)
|
||||
{
|
||||
other.armed = false;
|
||||
}
|
||||
|
||||
ReleaseHandle &operator=(ReleaseHandle &&other) noexcept = delete;
|
||||
|
||||
~ReleaseHandle() noexcept
|
||||
{
|
||||
if (armed)
|
||||
{ release(); }
|
||||
}
|
||||
|
||||
void release() noexcept
|
||||
{
|
||||
if (!armed)
|
||||
{ return; }
|
||||
|
||||
armed = false;
|
||||
promiseChainLink.removeAcquiredLock(coQutex);
|
||||
coQutex.release();
|
||||
}
|
||||
|
||||
void operator()() noexcept
|
||||
{
|
||||
release();
|
||||
}
|
||||
|
||||
private:
|
||||
PromiseChainLink &promiseChainLink;
|
||||
CoQutex &coQutex;
|
||||
bool armed = true;
|
||||
};
|
||||
|
||||
inline CoQutex::ReleaseHandle
|
||||
// [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]]
|
||||
CoQutex::AcquireInvocationAndSuspensionPolicy::await_resume() noexcept
|
||||
{
|
||||
assert(acquirerChainLink != nullptr);
|
||||
acquirerChainLink->addAcquiredLock(coQutex);
|
||||
return CoQutex::ReleaseHandle(*acquirerChainLink, coQutex);
|
||||
}
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // CO_QUTEX_H
|
||||
@@ -0,0 +1,599 @@
|
||||
#ifndef GROUP_H
|
||||
#define GROUP_H
|
||||
|
||||
#include <cassert>
|
||||
#include <coroutine>
|
||||
#include <cstddef>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <iterator>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
|
||||
#include <spinscale/componentThread.h>
|
||||
#include <spinscale/co/promiseChainLink.h>
|
||||
#include <spinscale/sharedResourceGroup.h>
|
||||
#include <spinscale/spinLock.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
namespace detail {
|
||||
|
||||
template <typename T, typename H>
|
||||
concept await_suspend_returns_void = requires(T &t, H h) {
|
||||
{ t.await_suspend(h) } -> std::same_as<void>;
|
||||
};
|
||||
|
||||
template <typename T, typename H>
|
||||
concept await_suspend_returns_bool = requires(T &t, H h) {
|
||||
{ t.await_suspend(h) } -> std::convertible_to<bool>;
|
||||
};
|
||||
|
||||
template <typename T, typename H>
|
||||
concept await_suspend_returns_handle = requires(T &t, H h) {
|
||||
{ t.await_suspend(h) } -> std::convertible_to<std::coroutine_handle<>>;
|
||||
};
|
||||
|
||||
template <typename T, typename H>
|
||||
concept await_suspend_ok = await_suspend_returns_void<T, H>
|
||||
|| await_suspend_returns_bool<T, H>
|
||||
|| await_suspend_returns_handle<T, H>;
|
||||
|
||||
template <typename T, typename H = std::coroutine_handle<>>
|
||||
concept AwaiterIface = requires(T &t, H h) {
|
||||
{ t.await_ready() } -> std::convertible_to<bool>;
|
||||
{ t.await_resume() };
|
||||
} && await_suspend_ok<T, H>;
|
||||
|
||||
template <typename T>
|
||||
auto get_operator_co_await(T &t) -> decltype(operator co_await(t));
|
||||
|
||||
template <typename T>
|
||||
concept AwaitableIface = requires(T &t) {
|
||||
{ get_operator_co_await(t) };
|
||||
} && AwaiterIface<decltype(get_operator_co_await(std::declval<T &>()))>;
|
||||
|
||||
} // namespace detail
|
||||
|
||||
template <typename T>
|
||||
concept AwaitableIface = detail::AwaitableIface<T>;
|
||||
|
||||
template <typename T>
|
||||
concept AwaiterIface = detail::AwaiterIface<T>;
|
||||
|
||||
template <typename T>
|
||||
concept AwaitableOrAwaiterIface = AwaiterIface<T> || AwaitableIface<T>;
|
||||
|
||||
template <typename Invoker>
|
||||
requires AwaitableOrAwaiterIface<Invoker>
|
||||
struct Group
|
||||
{
|
||||
enum class AwaitingCondition {
|
||||
NONE, FIRST_SETTLED, ALL_SETTLED
|
||||
};
|
||||
|
||||
class SettlementDescriptor
|
||||
{
|
||||
public:
|
||||
enum class TypeE {
|
||||
/* We track EXCEPTIION_THROWN but we don't provide an
|
||||
* awaitInvoker for exception events. The caller can
|
||||
* wait for settlements and then scan the result set
|
||||
* to manually deal with exceptions.
|
||||
*/
|
||||
UNSETTLED, COMPLETED, EXCEPTION_THROWN
|
||||
};
|
||||
|
||||
SettlementDescriptor(Invoker &_invoker)
|
||||
: invoker(std::ref(_invoker))
|
||||
{}
|
||||
|
||||
void setSettlementStatus() noexcept
|
||||
{
|
||||
assert(type == TypeE::UNSETTLED);
|
||||
|
||||
if (calleeException) {
|
||||
type = TypeE::EXCEPTION_THROWN;
|
||||
} else {
|
||||
type = TypeE::COMPLETED;
|
||||
}
|
||||
}
|
||||
|
||||
TypeE type = TypeE::UNSETTLED;
|
||||
std::exception_ptr calleeException = nullptr;
|
||||
std::exception_ptr adapterException = nullptr;
|
||||
std::reference_wrapper<Invoker> invoker;
|
||||
};
|
||||
|
||||
struct SettlementAwaitingInvoker;
|
||||
struct AwaitFirstSettlementInvoker;
|
||||
struct AwaitAllSettlementsInvoker;
|
||||
|
||||
// getAwaitNextSettlementInvoker();
|
||||
AwaitFirstSettlementInvoker getAwaitFirstSettlementInvoker()
|
||||
{ return AwaitFirstSettlementInvoker(*this); }
|
||||
|
||||
AwaitAllSettlementsInvoker getAwaitAllSettlementsInvoker()
|
||||
{ return AwaitAllSettlementsInvoker(*this); }
|
||||
|
||||
bool verifyAllInvokersSettled() const
|
||||
{
|
||||
for (auto &desc : s.rsrc.settlements) {
|
||||
if (desc.type == SettlementDescriptor::TypeE::UNSETTLED) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool firstInvokerSettled() const
|
||||
{ return s.rsrc.firstSettledInvokerIdx >= 0; }
|
||||
|
||||
bool allInvokersSettled() const
|
||||
{
|
||||
const std::size_t nInvokersAdded = s.rsrc.settlements.size();
|
||||
assert(s.rsrc.nInvokersSettled <= nInvokersAdded);
|
||||
return s.rsrc.nInvokersSettled == nInvokersAdded;
|
||||
}
|
||||
|
||||
/** Caller must hold s.lock. */
|
||||
void throwIfNoMemberInvokersForCoAwaitUnderLock() const
|
||||
{
|
||||
if (s.rsrc.settlements.empty()) {
|
||||
throw std::runtime_error(
|
||||
"co_await: Group has no member invokers; call add() before awaiting");
|
||||
}
|
||||
}
|
||||
|
||||
struct SettlementAwaitingInvoker
|
||||
{
|
||||
explicit SettlementAwaitingInvoker(Group &_group)
|
||||
: parentGroup(_group)
|
||||
{}
|
||||
|
||||
bool await_ready() const { return false; }
|
||||
|
||||
/** EXPLANATION:
|
||||
* This exists for if we ever need to re-make the adapter coro
|
||||
* throw exceptions. But we decided to make it noexcept in order
|
||||
* to avoid this complication.
|
||||
*/
|
||||
void checkForAndReThrowAdapterExceptions() const
|
||||
{
|
||||
std::ostringstream ostream;
|
||||
bool doThrow = false;
|
||||
|
||||
for (auto &item : parentGroup.s.rsrc.settlements)
|
||||
{
|
||||
if (!item.adapterException) {
|
||||
continue;
|
||||
}
|
||||
|
||||
doThrow = true;
|
||||
ostream << "Exc thrown in Group Adapter: ";
|
||||
try {
|
||||
std::rethrow_exception(item.adapterException);
|
||||
} catch (const std::exception &e) {
|
||||
ostream << e.what();
|
||||
} catch (...) {
|
||||
ostream << "<unknown exception type>";
|
||||
}
|
||||
ostream << "\n";
|
||||
}
|
||||
|
||||
if (doThrow) {
|
||||
throw std::runtime_error(ostream.str());
|
||||
}
|
||||
}
|
||||
|
||||
Group &parentGroup;
|
||||
};
|
||||
|
||||
/** EXPLANATION:
|
||||
* AwaitingCondition and the group-awaiter coroutine_handle are set only
|
||||
* in await_suspend when this co_await actually suspends. Constructing
|
||||
* several AwaitFirstSettlementInvoker / AwaitAllSettlementsInvoker
|
||||
* objects without co_awaiting them is harmless.
|
||||
*
|
||||
* You may co_await await-all and later co_await await-first (in either
|
||||
* construction order). After a suspending wait completes, the adapter
|
||||
* clears handle state in updateSettlementsStateAndAwakenCallerIfConditionMet,
|
||||
* so a later co_await on another handle (or a second co_await on the same
|
||||
* handle, after the first finished) is legal.
|
||||
*
|
||||
* Only one group co_await may be suspended with a registered handle at a
|
||||
* time; a second concurrent co_await trips assert(!callerHasSetSchedHandle)
|
||||
* in debug builds.
|
||||
*
|
||||
* firstSettledInvokerIdx and calleeWasReadyToNotifyOfFirstSettlement are
|
||||
* sticky for the Group lifetime (first member ever to settle), not per wave.
|
||||
*/
|
||||
struct AwaitFirstSettlementInvoker
|
||||
: public SettlementAwaitingInvoker
|
||||
{
|
||||
using SettlementAwaitingInvoker::SettlementAwaitingInvoker;
|
||||
|
||||
bool await_suspend(std::coroutine_handle<> groupAwaiterSchedHandle)
|
||||
{
|
||||
/* No other group co_await may be suspended with a registered handle.
|
||||
* Sequential co_await on the same object is allowed after the prior
|
||||
* wait finished and clearCallerSchedHandleState() ran on wake.
|
||||
*/
|
||||
assert(!this->parentGroup.s.rsrc.callerHasSetSchedHandle);
|
||||
|
||||
sscl::SpinLock::Guard guard(this->parentGroup.s.lock);
|
||||
|
||||
this->parentGroup.throwIfNoMemberInvokersForCoAwaitUnderLock();
|
||||
|
||||
if (this->parentGroup.s.rsrc.calleeWasReadyToNotifyOfFirstSettlement) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/* We store away the coro_handle of the
|
||||
* group awaiter, and suspend that group awaiter.
|
||||
*/
|
||||
this->parentGroup.s.rsrc.setCallerSchedHandleAndCondition(
|
||||
groupAwaiterSchedHandle, AwaitingCondition::FIRST_SETTLED);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::pair<SettlementDescriptor &, std::vector<SettlementDescriptor> &>
|
||||
await_resume()
|
||||
{
|
||||
assert(this->parentGroup.firstInvokerSettled());
|
||||
return {
|
||||
this->parentGroup.s.rsrc.settlements[
|
||||
this->parentGroup.s.rsrc.firstSettledInvokerIdx],
|
||||
this->parentGroup.s.rsrc.settlements
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
/** EXPLANATION:
|
||||
* Same awaiting rules as AwaitFirstSettlementInvoker (see above).
|
||||
*
|
||||
* It is illegal to add() new members while a group co_await is suspended
|
||||
* (groupAwaiterSchedHandle is registered). You may add() after co_await
|
||||
* returns, including starting a new settlement wave before the next
|
||||
* co_await.
|
||||
*/
|
||||
struct AwaitAllSettlementsInvoker
|
||||
: public SettlementAwaitingInvoker
|
||||
{
|
||||
using SettlementAwaitingInvoker::SettlementAwaitingInvoker;
|
||||
|
||||
bool await_suspend(std::coroutine_handle<> groupAwaiterSchedHandle)
|
||||
{
|
||||
/* See AwaitFirstSettlementInvoker::await_suspend. Handle state is
|
||||
* cleared when the adapter wakes a suspended group co_awaiter, not
|
||||
* in await_resume.
|
||||
*/
|
||||
assert(!this->parentGroup.s.rsrc.callerHasSetSchedHandle);
|
||||
|
||||
sscl::SpinLock::Guard guard(this->parentGroup.s.lock);
|
||||
|
||||
this->parentGroup.throwIfNoMemberInvokersForCoAwaitUnderLock();
|
||||
|
||||
if (this->parentGroup.allInvokersSettled()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
this->parentGroup.s.rsrc.setCallerSchedHandleAndCondition(
|
||||
groupAwaiterSchedHandle, AwaitingCondition::ALL_SETTLED);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::vector<SettlementDescriptor> &await_resume()
|
||||
{
|
||||
assert(this->parentGroup.allInvokersSettled());
|
||||
return this->parentGroup.s.rsrc.settlements;
|
||||
}
|
||||
};
|
||||
|
||||
struct NonAwaitableNonPostingAdapterCoro
|
||||
{
|
||||
struct promise_type
|
||||
: public PromiseChainLink
|
||||
{
|
||||
NonAwaitableNonPostingAdapterCoro get_return_object() noexcept
|
||||
{ return {}; }
|
||||
|
||||
void removeAcquiredLock(CoQutex &) noexcept override
|
||||
{}
|
||||
|
||||
std::suspend_never initial_suspend() noexcept { return {}; }
|
||||
/** EXPLANATION:
|
||||
* final_suspend must return suspend_never here so that
|
||||
* this fire-and-forget adapter coro will be self-destroying.
|
||||
*/
|
||||
std::suspend_never final_suspend() noexcept { return {}; }
|
||||
void return_void() noexcept { return; }
|
||||
void unhandled_exception() noexcept
|
||||
{
|
||||
try {
|
||||
auto eptr = std::current_exception();
|
||||
if (eptr) {
|
||||
std::rethrow_exception(eptr);
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "Unhandled exception in Group adapter coroutine:\n"
|
||||
<< e.what() << "\n";
|
||||
} catch (...) {
|
||||
std::cerr << "Unhandled non-std exception in Group adapter coroutine\n";
|
||||
}
|
||||
|
||||
std::terminate();
|
||||
}
|
||||
};
|
||||
|
||||
NonAwaitableNonPostingAdapterCoro() noexcept = default;
|
||||
NonAwaitableNonPostingAdapterCoro operator co_await() const = delete;
|
||||
bool await_ready() const { std::terminate(); return false; }
|
||||
void await_suspend() const { std::terminate(); }
|
||||
void await_resume() const { std::terminate(); }
|
||||
};
|
||||
|
||||
std::pair<bool, bool>
|
||||
updateSettlementsStateAndAwakenCallerIfConditionMet(
|
||||
std::size_t settlementIndex) noexcept
|
||||
{
|
||||
bool isFirstSettlement = false;
|
||||
bool isLastSettlement = false;
|
||||
std::coroutine_handle<> groupAwaiterSchedHandleToWake = nullptr;
|
||||
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
|
||||
/* If we can be certain that the AllSettled condition won't
|
||||
* be triggered repeatedly, then we can get rid of
|
||||
* calleeWasReadyToNotifyOfLastSettlementForCurrentSet.
|
||||
*/
|
||||
assert(s.rsrc.nInvokersSettled < s.rsrc.settlements.size());
|
||||
assert(settlementIndex < s.rsrc.settlements.size());
|
||||
s.rsrc.nInvokersSettled++;
|
||||
|
||||
if (!firstInvokerSettled())
|
||||
{
|
||||
isFirstSettlement = true;
|
||||
s.rsrc.firstSettledInvokerIdx = static_cast<int>(settlementIndex);
|
||||
|
||||
/* This should be set-once & sticky throughout the lifetime
|
||||
* of the Group object. The first invoker only gets
|
||||
* settled once, irrespective of how many
|
||||
* AwaitFirstSettlementInvoker instances we create.
|
||||
*/
|
||||
s.rsrc.calleeWasReadyToNotifyOfFirstSettlement = true;
|
||||
}
|
||||
|
||||
if (allInvokersSettled())
|
||||
{
|
||||
assert(s.rsrc.nInvokersSettled == s.rsrc.settlements.size());
|
||||
assert(verifyAllInvokersSettled());
|
||||
isLastSettlement = true;
|
||||
}
|
||||
|
||||
/* If no group co_awaiter registered a handle (did not suspend, or
|
||||
* already woke and clearCallerSchedHandleState ran), there is
|
||||
* nothing to post back to.
|
||||
*/
|
||||
if (!s.rsrc.callerHasSetSchedHandle) {
|
||||
return {isFirstSettlement, isLastSettlement};
|
||||
}
|
||||
|
||||
/* If we're here, then callerHasSetSchedHandle must be true.
|
||||
* I.e: an invoker has been created and co_awaited for one of the
|
||||
* conditions.
|
||||
* Therefore currentAwaitingCondition must also have been set,
|
||||
* since currentAwaitingCondition is set in the invokers' ctors.
|
||||
*/
|
||||
assert(s.rsrc.currentAwaitingCondition != AwaitingCondition::NONE);
|
||||
|
||||
if ((isFirstSettlement
|
||||
&& s.rsrc.currentAwaitingCondition == AwaitingCondition::FIRST_SETTLED)
|
||||
|| (isLastSettlement
|
||||
&& s.rsrc.currentAwaitingCondition == AwaitingCondition::ALL_SETTLED))
|
||||
{
|
||||
groupAwaiterSchedHandleToWake = s.rsrc.groupAwaiterSchedHandle;
|
||||
|
||||
/** We only clear here and not in await_resume, because if
|
||||
* the caller hasn't already set it schedHandle by the time we're
|
||||
* called, then when it eventually does call await_suspend, it
|
||||
* won't set it then either.
|
||||
*
|
||||
* I.e: callerSchedHandle only needs to be cleared it if gets set
|
||||
* in the first place;
|
||||
* And it only gets set if we need to invoke the schedHandle from
|
||||
* here.
|
||||
* If the group co_awaiter is able to call await_resume, then it
|
||||
* simply doesn't set its schedHandle at all.
|
||||
*/
|
||||
s.rsrc.clearCallerSchedHandleState();
|
||||
}
|
||||
}
|
||||
|
||||
if (groupAwaiterSchedHandleToWake)
|
||||
{
|
||||
/* We should be able to just directly resume() the group awaiter's handle
|
||||
* here because that would invoke await_resume, which may destroy the
|
||||
* callee's promise.
|
||||
* And who is the callee? Is it not this coro here? And this coro
|
||||
* hasn't been suspended. So we'd be destroying ourself while we're
|
||||
* not suspended.
|
||||
*
|
||||
* But all of that only applies __IFF__ we actually do try to destroy
|
||||
* the callee within the caller's Invoker. If we don't, then the callee
|
||||
* should persist just fine. There's no implicit mechanism that
|
||||
* will always destroy the callee coro state before the invoker
|
||||
* is destroyed.
|
||||
* If that was in fact the way it worked, then fire-and-forget coros
|
||||
* would be impossible.
|
||||
*
|
||||
* So we should be able to call resume() directly here without
|
||||
* post()ing to current_io_context().
|
||||
*
|
||||
* EXPLANATION:
|
||||
* However, in order to ensure that we keep this adapter coro
|
||||
* method exception-free, we are forced to post() rather than
|
||||
* directly calling the handle.
|
||||
*/
|
||||
boost::asio::post(
|
||||
sscl::ComponentThread::getSelf()->getIoService(),
|
||||
groupAwaiterSchedHandleToWake);
|
||||
}
|
||||
|
||||
return {isFirstSettlement, isLastSettlement};
|
||||
}
|
||||
|
||||
/** EXPLANATION:
|
||||
* This coro is a coro which has a promise, and does __not__ expose an awaitable
|
||||
* iface and in fact should not be capable of being awaited, ultimately.
|
||||
*
|
||||
* Its purpose is to be an adapter that enables the Group class to invoke the
|
||||
* invokers that are added to it, without having to co_await those invokers.
|
||||
* Rather, the Group class simply invokes this function on them, and then this
|
||||
* function both co_awaits the invoker on behalf of the Group class, and also
|
||||
* performs the normal function of an invoker, which is both to invoke the
|
||||
* target async fn, and also to convey its results back to the Group class.
|
||||
* It's effectively a go-between coro that provides the outcomes that Invokers
|
||||
* normally provide, without needing, itself, to be co_awaited.
|
||||
*/
|
||||
NonAwaitableNonPostingAdapterCoro nonAwaitableAdapterCoro(
|
||||
std::size_t settlementIndex) noexcept
|
||||
{
|
||||
/** EXPLANATION:
|
||||
* It's very convenient that our design for the NonViralNonSuspendingInvoker
|
||||
* coincidentally allows us to supply a lambda that can be used to test
|
||||
* for the settlement conditions that are being waited on by the Group's
|
||||
* co_awaiter.
|
||||
*
|
||||
* settlementIndex is captured by value (not a vector iterator) so adapter
|
||||
* coros remain valid if settlements reallocate during concurrent add().
|
||||
*/
|
||||
try {
|
||||
/* Return values remain in the callee promise until the caller-owned
|
||||
* invoker is destroyed (~PostingInvoker). The group co_awaiter reads
|
||||
* results via settlements[settlementIndex].invoker after awaiting.
|
||||
*
|
||||
* Index settlements[] each time; do not cache a reference across
|
||||
* co_await because concurrent add() may reallocate the vector.
|
||||
*/
|
||||
co_await s.rsrc.settlements[settlementIndex].invoker.get();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
s.rsrc.settlements[settlementIndex].calleeException =
|
||||
std::current_exception();
|
||||
}
|
||||
|
||||
/* From here onwards, we mustn't throw(). Unhandled exceptions
|
||||
* generated by the adapter coro itself will result in
|
||||
* std::terminate().
|
||||
*/
|
||||
s.rsrc.settlements[settlementIndex].setSettlementStatus();
|
||||
updateSettlementsStateAndAwakenCallerIfConditionMet(settlementIndex);
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
/** EXPLANATION:
|
||||
* Each invoker passed to add() must outlive this Group and the callee frame
|
||||
* (see ~PostingInvoker). The group co_awaiter reads return values from those
|
||||
* invokers after awaiting; do not destroy an invoker until reads are done.
|
||||
*/
|
||||
void add(Invoker &invoker)
|
||||
{
|
||||
std::size_t settlementIndex = 0;
|
||||
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
|
||||
if (s.rsrc.groupAwaiterSchedHandle)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
"add: New member invokers mustn't be added "
|
||||
"while co_awaiting a given set");
|
||||
}
|
||||
|
||||
settlementIndex = s.rsrc.settlements.size();
|
||||
s.rsrc.settlements.emplace_back(invoker);
|
||||
}
|
||||
|
||||
nonAwaitableAdapterCoro(settlementIndex);
|
||||
}
|
||||
|
||||
void checkForAndReThrowGroupExceptions() const
|
||||
{
|
||||
std::ostringstream ostream;
|
||||
bool doThrow = false;
|
||||
|
||||
for (auto &item : s.rsrc.settlements)
|
||||
{
|
||||
if (item.type != SettlementDescriptor::TypeE::EXCEPTION_THROWN) {
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(item.calleeException);
|
||||
|
||||
doThrow = true;
|
||||
ostream << "Exc thrown in Group Adapter: ";
|
||||
try {
|
||||
std::rethrow_exception(item.calleeException);
|
||||
} catch (const std::exception &e) {
|
||||
ostream << e.what();
|
||||
} catch (...) {
|
||||
ostream << "<unknown exception type>";
|
||||
}
|
||||
ostream << "\n";
|
||||
}
|
||||
|
||||
if (doThrow) {
|
||||
throw std::runtime_error(ostream.str());
|
||||
}
|
||||
}
|
||||
|
||||
struct State
|
||||
{
|
||||
void clearCallerSchedHandleState() noexcept
|
||||
{
|
||||
groupAwaiterSchedHandle = nullptr;
|
||||
callerHasSetSchedHandle = false;
|
||||
currentAwaitingCondition = AwaitingCondition::NONE;
|
||||
}
|
||||
|
||||
void setCallerSchedHandleAndCondition(
|
||||
std::coroutine_handle<> groupAwaiterSchedHandleIn,
|
||||
AwaitingCondition awaitingCondition) noexcept
|
||||
{
|
||||
groupAwaiterSchedHandle = groupAwaiterSchedHandleIn;
|
||||
callerHasSetSchedHandle = true;
|
||||
currentAwaitingCondition = awaitingCondition;
|
||||
}
|
||||
|
||||
int firstSettledInvokerIdx = -1;
|
||||
std::size_t nInvokersSettled = 0;
|
||||
std::coroutine_handle<> groupAwaiterSchedHandle = nullptr;
|
||||
bool callerHasSetSchedHandle = false;
|
||||
/* calleWasReady*First* is an indelible record of what
|
||||
* occured during the first settlement's adapter's update.
|
||||
*/
|
||||
bool calleeWasReadyToNotifyOfFirstSettlement = false;
|
||||
std::vector<SettlementDescriptor> settlements;
|
||||
AwaitingCondition currentAwaitingCondition = AwaitingCondition::NONE;
|
||||
};
|
||||
|
||||
sscl::SharedResourceGroup<sscl::SpinLock, State> s;
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // GROUP_H
|
||||
@@ -0,0 +1,149 @@
|
||||
#ifndef INVOKERS_H
|
||||
#define INVOKERS_H
|
||||
|
||||
#include <config.h>
|
||||
#include <coroutine>
|
||||
#include <exception>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
|
||||
#include <spinscale/co/postingInvoker.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
/** Non-viral coroutine entry that must not be co_awaited: promise is always
|
||||
* PostingPromiseTemplate<void> (no return-value path to a caller).
|
||||
*
|
||||
* The invoker must outlive the callee frame: do not discard the return object
|
||||
* from get_return_object(). ~PostingInvoker destroys the callee frame.
|
||||
*/
|
||||
template <template <typename> class PostingPromiseTemplate>
|
||||
struct NonViralNonSuspendingInvoker
|
||||
: public PostingInvoker<PostingPromiseTemplate<void>, void>
|
||||
{
|
||||
struct promise_type
|
||||
: public PostingPromiseTemplate<void>
|
||||
{
|
||||
using PostingPromiseTemplate<void>::PostingPromiseTemplate;
|
||||
|
||||
NonViralNonSuspendingInvoker<PostingPromiseTemplate> get_return_object()
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning NonViralNonSuspendingInvoker.\n";
|
||||
#endif
|
||||
if (!this->callerLambda)
|
||||
{
|
||||
/** EXPLANATION:
|
||||
* We require a completion lambda to be provided to the
|
||||
* non-viral coroutines, because that's how we internally
|
||||
* distinguish between non-viral and viral coroutines.
|
||||
*
|
||||
* Additionally, non-viral coroutines almost never have a
|
||||
* good reason to not have a completion lambda.
|
||||
*/
|
||||
std::ostringstream oss;
|
||||
oss << std::this_thread::get_id()
|
||||
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
|
||||
throw std::runtime_error(oss.str());
|
||||
}
|
||||
|
||||
this->setSelfSchedHandle(
|
||||
std::coroutine_handle<promise_type>::from_promise(*this));
|
||||
|
||||
return NonViralNonSuspendingInvoker<PostingPromiseTemplate>(*this);
|
||||
}
|
||||
};
|
||||
|
||||
using PostingInvoker<PostingPromiseTemplate<void>, void>::PostingInvoker;
|
||||
|
||||
bool await_ready() const noexcept
|
||||
{ std::terminate(); }
|
||||
|
||||
void await_suspend(std::coroutine_handle<NonViralNonSuspendingInvoker<PostingPromiseTemplate>>) noexcept
|
||||
{ std::terminate(); }
|
||||
|
||||
void await_resume() noexcept
|
||||
{ std::terminate(); }
|
||||
};
|
||||
|
||||
/** Viral awaitable: promise_type inherits PostingPromiseTemplate<T> (posting
|
||||
* target chosen by the posting-promise alias, e.g. BodyPostingPromise<int>).
|
||||
*
|
||||
* The invoker must outlive the callee frame until results are read.
|
||||
* ~PostingInvoker destroys the callee frame (not await_resume).
|
||||
*/
|
||||
template <template <typename> class PostingPromiseTemplate, typename T>
|
||||
struct ViralSuspendingInvoker
|
||||
: public PostingInvoker<PostingPromiseTemplate<T>, T>
|
||||
{
|
||||
struct promise_type
|
||||
: public PostingPromiseTemplate<T>
|
||||
{
|
||||
using PostingPromiseTemplate<T>::PostingPromiseTemplate;
|
||||
|
||||
ViralSuspendingInvoker<PostingPromiseTemplate, T> get_return_object() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning ViralSuspendingInvoker.\n";
|
||||
#endif
|
||||
this->setSelfSchedHandle(
|
||||
std::coroutine_handle<promise_type>::from_promise(*this));
|
||||
|
||||
return ViralSuspendingInvoker<PostingPromiseTemplate, T>(*this);
|
||||
}
|
||||
};
|
||||
|
||||
using PostingInvoker<PostingPromiseTemplate<T>, T>::PostingInvoker;
|
||||
|
||||
bool await_ready() const noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning false.\n";
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
template <typename CallerPromise>
|
||||
bool await_suspend(std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
|
||||
{
|
||||
static_assert(
|
||||
std::is_base_of_v<PromiseChainLink, CallerPromise>,
|
||||
"ViralSuspendingInvoker caller promise must derive from PromiseChainLink");
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Setting callerSchedHandle.\n";
|
||||
#endif
|
||||
const bool suspendCaller = this->setCallerSchedHandle(callerSchedHandle);
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " CallerFlowExecutor returned suspend=" << suspendCaller << ".\n";
|
||||
#endif
|
||||
|
||||
/** EXPLANATION:
|
||||
* If the callee was ready to post-back, then we don't need to
|
||||
* suspend the caller -- so return either false or
|
||||
* a symmetric transfer handle to the `callerSchedHandle` we were
|
||||
* passed as an argument.
|
||||
*
|
||||
* If the callee is not ready to post-back, then we need to suspend
|
||||
* the caller so that the caller can suspend until the callee posts
|
||||
* the callerSchedHandle to the callerIoContext -- so return true
|
||||
* or std::noop_coroutine().
|
||||
*/
|
||||
return suspendCaller;
|
||||
}
|
||||
|
||||
T await_resume()
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Resumed on caller thread, hopefully.\n";
|
||||
#endif
|
||||
return PostingInvoker<PostingPromiseTemplate<T>, T>::await_resume();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // INVOKERS_H
|
||||
@@ -0,0 +1,107 @@
|
||||
#ifndef POSTING_INVOKER_H
|
||||
#define POSTING_INVOKER_H
|
||||
|
||||
#include <config.h>
|
||||
#include <coroutine>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include <spinscale/co/promises.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
template <typename PromiseType, typename T>
|
||||
class PostingInvoker
|
||||
{
|
||||
public:
|
||||
explicit PostingInvoker(PromiseType &_calleePromise) noexcept
|
||||
: calleePromise(_calleePromise)
|
||||
{}
|
||||
|
||||
PostingInvoker(const PostingInvoker &) = delete;
|
||||
PostingInvoker &operator=(const PostingInvoker &) = delete;
|
||||
|
||||
PostingInvoker(PostingInvoker &&other) noexcept
|
||||
: calleePromise(other.calleePromise),
|
||||
ownsFrameDestroy_(std::exchange(other.ownsFrameDestroy_, false))
|
||||
{}
|
||||
|
||||
PostingInvoker &operator=(PostingInvoker &&other) = delete;
|
||||
|
||||
~PostingInvoker() noexcept
|
||||
{
|
||||
if (!ownsFrameDestroy_) { return; }
|
||||
|
||||
std::coroutine_handle<> handle = calleePromise.selfSchedHandle;
|
||||
if (handle) {
|
||||
handle.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
template <typename CallerPromise>
|
||||
bool setCallerSchedHandle(std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
|
||||
{
|
||||
static_assert(
|
||||
std::is_base_of_v<PromiseChainLink, CallerPromise>,
|
||||
"PostingInvoker caller promise must derive from PromiseChainLink");
|
||||
|
||||
calleePromise.callerSchedHandle = callerSchedHandle;
|
||||
calleePromise.setCallerPromiseChainLink(&callerSchedHandle.promise());
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " Done setting callerSchedHandle; running CallerFlowExecutor.\n";
|
||||
#endif
|
||||
return calleePromise.postBackStatus.getCallerFlowExecutor()();
|
||||
}
|
||||
|
||||
ReturnValues<T> &completedReturnValues() noexcept
|
||||
{ return calleePromise.returnValues; }
|
||||
|
||||
const ReturnValues<T> &completedReturnValues() const noexcept
|
||||
{ return calleePromise.returnValues; }
|
||||
|
||||
auto await_resume()
|
||||
{
|
||||
calleePromise.postBackStatus.reset();
|
||||
|
||||
ReturnValues<T> &returnValues = calleePromise.returnValues;
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id()
|
||||
<< " About to check for and rethrow any exception.\n";
|
||||
#endif
|
||||
|
||||
if (returnValues.myExceptionPtr)
|
||||
{
|
||||
std::exception_ptr const captured = returnValues.myExceptionPtr;
|
||||
std::rethrow_exception(captured);
|
||||
}
|
||||
if constexpr (!std::is_void_v<T>)
|
||||
{
|
||||
T result = std::move(returnValues.myReturnValue);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
PromiseType &calleePromise;
|
||||
|
||||
/** EXPLANATION:
|
||||
* Every live invoker owns destruction of its callee coroutine frame in
|
||||
* ~PostingInvoker (via calleePromise.selfSchedHandle).
|
||||
*
|
||||
* The only time frame destruction is skipped is for a moved-from invoker
|
||||
* after move construction or move assignment, so we do not double-destroy
|
||||
* the same handle when get_return_object() returns the invoker by value.
|
||||
*
|
||||
* This is not an opt-out for viral vs non-viral callers or for "callee
|
||||
* still running"; callers must keep the invoker alive until the callee
|
||||
* frame is no longer needed.
|
||||
*/
|
||||
bool ownsFrameDestroy_ = true;
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // POSTING_INVOKER_H
|
||||
@@ -0,0 +1,72 @@
|
||||
#ifndef PROMISE_CHAIN_LINK_H
|
||||
#define PROMISE_CHAIN_LINK_H
|
||||
|
||||
#include <functional>
|
||||
#include <list>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
class CoQutex;
|
||||
|
||||
/**
|
||||
* Non-template base for coroutine promises participating in a logical
|
||||
* promise chain (analogous to libspinscale AsynchronousContinuationChainLink).
|
||||
* A future deadlock detector can walk callerPromiseChainLink() without
|
||||
* knowing concrete promise_type.
|
||||
*/
|
||||
class PromiseChainLink
|
||||
{
|
||||
public:
|
||||
virtual ~PromiseChainLink() = default;
|
||||
|
||||
/** Reserved for deadlock detection: link toward the caller / outer coroutine. */
|
||||
virtual const PromiseChainLink *callerPromiseChainLink() const noexcept
|
||||
{ return nullptr; }
|
||||
virtual PromiseChainLink *callerPromiseChainLink() noexcept
|
||||
{ return nullptr; }
|
||||
|
||||
void addAcquiredLock(CoQutex &coQutex) noexcept
|
||||
{ acquiredLocks.emplace_back(std::ref(coQutex)); }
|
||||
|
||||
bool holdsAcquiredLock(const CoQutex &coQutex) const noexcept
|
||||
{ return findMatchingAcquiredLock(coQutex) != acquiredLocks.end(); }
|
||||
|
||||
virtual void removeAcquiredLock(CoQutex &coQutex) noexcept = 0;
|
||||
|
||||
protected:
|
||||
using AcquiredLockList = std::list<std::reference_wrapper<CoQutex>>;
|
||||
|
||||
AcquiredLockList::iterator findMatchingAcquiredLock(CoQutex &coQutex) noexcept
|
||||
{
|
||||
for (auto it = acquiredLocks.begin(); it != acquiredLocks.end(); ++it) {
|
||||
if (&it->get() == &coQutex) {
|
||||
return it;
|
||||
}
|
||||
}
|
||||
return acquiredLocks.end();
|
||||
}
|
||||
|
||||
AcquiredLockList::const_iterator findMatchingAcquiredLock(const CoQutex &coQutex) const noexcept
|
||||
{
|
||||
for (auto it = acquiredLocks.begin(); it != acquiredLocks.end(); ++it) {
|
||||
if (&it->get() == &coQutex) {
|
||||
return it;
|
||||
}
|
||||
}
|
||||
return acquiredLocks.end();
|
||||
}
|
||||
|
||||
void eraseFirstMatchingAcquiredLock(CoQutex &coQutex) noexcept
|
||||
{
|
||||
auto match = findMatchingAcquiredLock(coQutex);
|
||||
if (match != acquiredLocks.end()) {
|
||||
acquiredLocks.erase(match);
|
||||
}
|
||||
}
|
||||
|
||||
AcquiredLockList acquiredLocks;
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // PROMISE_CHAIN_LINK_H
|
||||
@@ -0,0 +1,49 @@
|
||||
#ifndef PROMISE_CHAIN_WALKER_H
|
||||
#define PROMISE_CHAIN_WALKER_H
|
||||
|
||||
#include <cstddef>
|
||||
|
||||
#include <spinscale/co/promiseChainLink.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
/**
|
||||
* Upper bound on caller-chain links visited after the root (guards cycles / bugs).
|
||||
*
|
||||
* Design posture (cf. docs/3rdParty/smo/libspinscale — continuation tracing vs
|
||||
* interpretation): this header performs trace-only walks along
|
||||
* PromiseChainLink::callerPromiseChainLink(); deadlock interpretation stays at
|
||||
* call sites / later policy.
|
||||
*/
|
||||
inline constexpr std::size_t kMaxCallerPromiseChainTraversalSteps = 4096;
|
||||
|
||||
inline const PromiseChainLink *nextOnCallerPromiseChain(
|
||||
const PromiseChainLink &link) noexcept
|
||||
{
|
||||
return link.callerPromiseChainLink();
|
||||
}
|
||||
|
||||
inline bool callerChainHopUnderStepLimit(
|
||||
std::size_t hopIndex) noexcept
|
||||
{
|
||||
return hopIndex < kMaxCallerPromiseChainTraversalSteps;
|
||||
}
|
||||
|
||||
template <typename Visitor>
|
||||
void walkCallerPromiseChainFrom(
|
||||
const PromiseChainLink &root, Visitor &&visitor)
|
||||
{
|
||||
visitor(root);
|
||||
const PromiseChainLink *next = nextOnCallerPromiseChain(root);
|
||||
for (std::size_t hopIndex = 0;
|
||||
next != nullptr && callerChainHopUnderStepLimit(hopIndex);
|
||||
++hopIndex)
|
||||
{
|
||||
visitor(*next);
|
||||
next = nextOnCallerPromiseChain(*next);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // PROMISE_CHAIN_WALKER_H
|
||||
@@ -0,0 +1,362 @@
|
||||
#ifndef PROMISES_H
|
||||
#define PROMISES_H
|
||||
|
||||
#include <config.h>
|
||||
#include <coroutine>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
|
||||
#include <spinscale/componentThread.h>
|
||||
#include <spinscale/co/coQutex.h>
|
||||
#include <spinscale/spinLock.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
template <typename PromiseType, typename T>
|
||||
class PostingInvoker;
|
||||
|
||||
template <typename T, bool IsVoid = std::is_void_v<T>>
|
||||
struct ReturnValueStorage;
|
||||
|
||||
template <typename T>
|
||||
struct ReturnValueStorage<T, false>
|
||||
{
|
||||
T myReturnValue{};
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct ReturnValueStorage<T, true>
|
||||
{
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct ReturnValues
|
||||
: public ReturnValueStorage<T>
|
||||
{
|
||||
ReturnValues() noexcept
|
||||
: myExceptionPtr(myMemberExceptionPtr)
|
||||
{}
|
||||
|
||||
explicit ReturnValues(std::exception_ptr &callerExceptionPtr) noexcept
|
||||
: myExceptionPtr(callerExceptionPtr)
|
||||
{}
|
||||
|
||||
~ReturnValues() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n";
|
||||
#endif
|
||||
}
|
||||
|
||||
/** EXPLANATION:
|
||||
* The exception_ptr ref here can either point to the exception_ptr
|
||||
* a non-viral coroutine supplied to us as its storage space for
|
||||
* where we should store any exception that is thrown;
|
||||
*
|
||||
* Or it could point to the member exception_ptr in this very class,
|
||||
* which is used for viral coroutines that can bubble their exception
|
||||
* up and automatically via the language runtime.
|
||||
*/
|
||||
std::exception_ptr &myExceptionPtr;
|
||||
std::exception_ptr myMemberExceptionPtr = nullptr;
|
||||
};
|
||||
|
||||
/** `return_value` / `return_void` only. ThreadTag is not a template parameter here:
|
||||
* for tagged promises, PromiseType is `TaggedPostingPromise<T, ThreadTag>`.
|
||||
*/
|
||||
template <typename PromiseType, typename T, bool IsVoid = std::is_void_v<T>>
|
||||
struct PostingPromiseReturnOps;
|
||||
|
||||
template <typename PromiseType, typename T>
|
||||
struct PostingPromiseReturnOps<PromiseType, T, false>
|
||||
{
|
||||
void return_value(T returnValue) noexcept
|
||||
{
|
||||
static_cast<PromiseType *>(this)->returnValues.myReturnValue = std::move(returnValue);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename PromiseType, typename T>
|
||||
struct PostingPromiseReturnOps<PromiseType, T, true>
|
||||
{
|
||||
void return_void() noexcept
|
||||
{
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct PostingPromise
|
||||
: public PromiseChainLink
|
||||
{
|
||||
struct PostBackStatus
|
||||
{
|
||||
struct CalleeFlowExecutor;
|
||||
struct CallerFlowExecutor;
|
||||
friend struct CalleeFlowExecutor;
|
||||
friend struct CallerFlowExecutor;
|
||||
|
||||
explicit PostBackStatus(PostingPromise &calleePromiseIn) noexcept
|
||||
: calleePromise(calleePromiseIn)
|
||||
{}
|
||||
|
||||
void reset() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(lock);
|
||||
callerHasSetCallerSchedHandle = false;
|
||||
calleeIsReadyToPostBack = false;
|
||||
}
|
||||
|
||||
struct FlowExecutor
|
||||
{
|
||||
explicit FlowExecutor(PostBackStatus &parentIn) noexcept
|
||||
: parent(parentIn)
|
||||
{}
|
||||
|
||||
PostBackStatus &parent;
|
||||
};
|
||||
|
||||
struct CalleeFlowExecutor
|
||||
: public FlowExecutor
|
||||
{
|
||||
explicit CalleeFlowExecutor(PostBackStatus &parentIn) noexcept
|
||||
: FlowExecutor(parentIn)
|
||||
{}
|
||||
|
||||
void operator()() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(this->parent.lock);
|
||||
this->parent.calleeIsReadyToPostBack = true;
|
||||
if (this->parent.callerHasSetCallerSchedHandle)
|
||||
{
|
||||
boost::asio::post(
|
||||
this->parent.calleePromise.callerIoContext,
|
||||
this->parent.calleePromise.callerSchedHandle);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct CallerFlowExecutor
|
||||
: public FlowExecutor
|
||||
{
|
||||
explicit CallerFlowExecutor(PostBackStatus &parentIn) noexcept
|
||||
: FlowExecutor(parentIn)
|
||||
{}
|
||||
|
||||
bool operator()() noexcept
|
||||
{
|
||||
sscl::SpinLock::Guard guard(this->parent.lock);
|
||||
this->parent.callerHasSetCallerSchedHandle = true;
|
||||
if (this->parent.calleeIsReadyToPostBack) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
CalleeFlowExecutor getCalleeFlowExecutor() noexcept
|
||||
{
|
||||
return CalleeFlowExecutor(*this);
|
||||
}
|
||||
|
||||
CallerFlowExecutor getCallerFlowExecutor() noexcept
|
||||
{
|
||||
return CallerFlowExecutor(*this);
|
||||
}
|
||||
|
||||
PostingPromise &calleePromise;
|
||||
|
||||
private:
|
||||
sscl::SpinLock lock;
|
||||
bool callerHasSetCallerSchedHandle = false;
|
||||
bool calleeIsReadyToPostBack = false;
|
||||
};
|
||||
|
||||
/** Post-to must run from this awaiter's await_suspend, not synchronously inside
|
||||
* promise.initial_suspend() before it returns: the implementation's hidden coroutine
|
||||
* state (async segment / suspend index used on the next resume()) is only updated
|
||||
* after initial_suspend has finished returning its awaiter. Posting the handle too
|
||||
* early lets the callee resume before that update and re-enter initial_suspend from
|
||||
* the start, duplicating the post. See docs/prompts/post-to-and-back-in-invokables.md.
|
||||
*/
|
||||
struct InitialSuspendPostingInvoker
|
||||
: public std::suspend_always
|
||||
{
|
||||
InitialSuspendPostingInvoker(
|
||||
boost::asio::io_service &targetIoServiceIn,
|
||||
std::coroutine_handle<> targetSchedHandleIn) noexcept
|
||||
: targetIoService(targetIoServiceIn),
|
||||
targetSchedHandle(targetSchedHandleIn)
|
||||
{}
|
||||
|
||||
bool await_suspend(std::coroutine_handle<> const) noexcept
|
||||
{
|
||||
boost::asio::post(targetIoService, targetSchedHandle);
|
||||
return true;
|
||||
}
|
||||
|
||||
boost::asio::io_service &targetIoService;
|
||||
std::coroutine_handle<> targetSchedHandle;
|
||||
};
|
||||
|
||||
/** Post-back (non-viral completion post; viral CalleeFlowExecutor) must run from this
|
||||
* awaiter's await_suspend, not synchronously inside promise.final_suspend() before it
|
||||
* returns: the hidden coroutine segment index in the coroutine state is only advanced
|
||||
* after final_suspend exits. Doing that work inside final_suspend's body risks the same
|
||||
* kind of ordering bug as initial_suspend—resume observing the wrong segment. See
|
||||
* docs/prompts/post-to-and-back-in-invokables.md.
|
||||
*/
|
||||
struct FinalSuspendPostingInvoker
|
||||
: public std::suspend_always
|
||||
{
|
||||
explicit FinalSuspendPostingInvoker(PostingPromise &calleePromiseIn) noexcept
|
||||
: calleePromise(calleePromiseIn)
|
||||
{}
|
||||
|
||||
bool await_suspend(std::coroutine_handle<> const) noexcept
|
||||
{
|
||||
if (calleePromise.callerLambda)
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
|
||||
<< " Non-viral: posting callerLambda completion to callerIoContext.\n";
|
||||
#endif
|
||||
boost::asio::post(
|
||||
calleePromise.callerIoContext,
|
||||
[&calleeRef = calleePromise]()
|
||||
{
|
||||
if (calleeRef.returnValues.myExceptionPtr) {
|
||||
std::rethrow_exception(calleeRef.returnValues.myExceptionPtr);
|
||||
}
|
||||
|
||||
calleeRef.callerLambda();
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
|
||||
<< " Viral: running CalleeFlowExecutor.\n";
|
||||
#endif
|
||||
calleePromise.postBackStatus.getCalleeFlowExecutor()();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
PostingPromise &calleePromise;
|
||||
};
|
||||
|
||||
PostingPromise() noexcept
|
||||
: returnValues(), postBackStatus(*this)
|
||||
{}
|
||||
|
||||
template <typename... TailArgs>
|
||||
PostingPromise(
|
||||
std::exception_ptr &_callerExceptionPtr,
|
||||
std::function<void()> _callerLambda,
|
||||
TailArgs &&...) noexcept
|
||||
: returnValues(_callerExceptionPtr),
|
||||
callerLambda(std::move(_callerLambda)),
|
||||
postBackStatus(*this)
|
||||
{}
|
||||
|
||||
~PostingPromise() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n";
|
||||
#endif
|
||||
}
|
||||
|
||||
void unhandled_exception() noexcept
|
||||
{
|
||||
returnValues.myExceptionPtr = std::current_exception();
|
||||
}
|
||||
|
||||
void removeAcquiredLock(CoQutex &coQutex) noexcept override
|
||||
{
|
||||
eraseFirstMatchingAcquiredLock(coQutex);
|
||||
}
|
||||
|
||||
const PromiseChainLink *callerPromiseChainLink() const noexcept override
|
||||
{ return callerChainLink; }
|
||||
|
||||
PromiseChainLink *callerPromiseChainLink() noexcept override
|
||||
{ return callerChainLink; }
|
||||
|
||||
/** Non-viral: post completion lambda to callerIoContext from this thread.
|
||||
* Viral: run CalleeFlowExecutor (handshake flags); caller may post caller resume
|
||||
* later via CallerFlowExecutor. See docs/caller-posts-to-own-io-context.md.
|
||||
* Work runs in FinalSuspendPostingInvoker::await_suspend after the suspend point
|
||||
* advances (see docs/prompts/post-to-and-back-in-invokables.md).
|
||||
*/
|
||||
auto final_suspend() noexcept
|
||||
{
|
||||
return FinalSuspendPostingInvoker(*this);
|
||||
}
|
||||
|
||||
ReturnValues<T> returnValues;
|
||||
std::function<void()> callerLambda;
|
||||
boost::asio::io_service &callerIoContext =
|
||||
sscl::ComponentThread::getSelf()->getIoService();
|
||||
std::coroutine_handle<> selfSchedHandle;
|
||||
std::coroutine_handle<void> callerSchedHandle;
|
||||
PromiseChainLink *callerChainLink = nullptr;
|
||||
PostBackStatus postBackStatus;
|
||||
|
||||
protected:
|
||||
void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept
|
||||
{
|
||||
selfSchedHandle = schedHandle;
|
||||
}
|
||||
|
||||
void setCallerPromiseChainLink(PromiseChainLink *chainLink) noexcept
|
||||
{
|
||||
callerChainLink = chainLink;
|
||||
}
|
||||
|
||||
template <typename, typename>
|
||||
friend class PostingInvoker;
|
||||
};
|
||||
|
||||
template <typename T, typename ThreadTag>
|
||||
struct TaggedPostingPromise
|
||||
: public PostingPromise<T>,
|
||||
public PostingPromiseReturnOps<TaggedPostingPromise<T, ThreadTag>, T>
|
||||
{
|
||||
TaggedPostingPromise() noexcept
|
||||
: PostingPromise<T>()
|
||||
{}
|
||||
|
||||
template <typename... TailArgs>
|
||||
TaggedPostingPromise(
|
||||
std::exception_ptr &_exceptionPtr,
|
||||
std::function<void()> _callerLambda,
|
||||
TailArgs &&... tailArgs) noexcept
|
||||
: PostingPromise<T>(
|
||||
_exceptionPtr,
|
||||
std::move(_callerLambda),
|
||||
std::forward<TailArgs>(tailArgs)...)
|
||||
{}
|
||||
|
||||
auto initial_suspend() noexcept
|
||||
{
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " About to post selfSchedHandle to " << typeid(ThreadTag).name() << ".\n";
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Returning InitialSuspendPostingInvoker.\n";
|
||||
#endif
|
||||
return typename PostingPromise<T>::InitialSuspendPostingInvoker(
|
||||
ThreadTag::io_service(),
|
||||
this->selfSchedHandle);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // PROMISES_H
|
||||
@@ -0,0 +1,19 @@
|
||||
#ifndef SHARED_RESOURCE_GROUP_H
|
||||
#define SHARED_RESOURCE_GROUP_H
|
||||
|
||||
namespace sscl {
|
||||
|
||||
template <typename LockType, typename ResourceType>
|
||||
class SharedResourceGroup
|
||||
{
|
||||
public:
|
||||
SharedResourceGroup() = default;
|
||||
~SharedResourceGroup() = default;
|
||||
|
||||
LockType lock;
|
||||
ResourceType rsrc;
|
||||
};
|
||||
|
||||
} // namespace sscl
|
||||
|
||||
#endif // SHARED_RESOURCE_GROUP_H
|
||||
Reference in New Issue
Block a user