diff --git a/CMakeLists.txt b/CMakeLists.txt index e83c5b9..8500e0e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,6 +18,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") option(ENABLE_DEBUG_LOCKS "Enable debug features for locking system" OFF) option(ENABLE_DEBUG_TRACE_CALLABLES "Enable callable tracing for debugging boost::asio post operations" OFF) +option(ENABLE_DEBUG_CO "Enable coroutine-type debug logging" OFF) # Qutex deadlock detection configuration if(NOT DEFINED DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS) @@ -43,6 +44,10 @@ if(ENABLE_DEBUG_LOCKS) set(CONFIG_ENABLE_DEBUG_LOCKS TRUE) endif() +if(ENABLE_DEBUG_CO) + set(CONFIG_LIBSSCL_DEBUG_CO TRUE) +endif() + if(ENABLE_DEBUG_TRACE_CALLABLES) set(CONFIG_DEBUG_TRACE_CALLABLES TRUE) # Suppress frame-address warnings when using __builtin_return_address() diff --git a/include/config.h.in b/include/config.h.in index 0c93e99..3e3050f 100644 --- a/include/config.h.in +++ b/include/config.h.in @@ -8,4 +8,7 @@ /* Debug callable tracing configuration */ #cmakedefine CONFIG_DEBUG_TRACE_CALLABLES +/* Debug coroutine-type logging configuration */ +#cmakedefine CONFIG_LIBSSCL_DEBUG_CO + #endif /* _CONFIG_H */ diff --git a/include/spinscale/co/coConditionVariable.h b/include/spinscale/co/coConditionVariable.h new file mode 100644 index 0000000..4391201 --- /dev/null +++ b/include/spinscale/co/coConditionVariable.h @@ -0,0 +1,214 @@ +#ifndef CO_CONDITION_VARIABLE_H +#define CO_CONDITION_VARIABLE_H + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace sscl::co { + +/** Coroutine-friendly handoff: wait until `signal()` before running a completion + * step. Standalone primitive (posting promises use `PostBackStatus` instead). + * + * `clear()` only clears `isSignaled`; it does not wake or drain waiters. If + * `clear()` runs while coroutines are still waiting, they stay queued until a + * later `signal()` posts them. + */ +class CoConditionVariable +{ +public: + /** Waiter queued under the CV spin lock; `signal()` drains and calls `post()`. */ + class WaitingCoroutineBase + { + public: + explicit WaitingCoroutineBase( + boost::asio::io_service &callerIoContextIn) noexcept + : callerIoContext(callerIoContextIn) + {} + + virtual ~WaitingCoroutineBase() = default; + + virtual void post() noexcept = 0; + + public: + boost::asio::io_service &callerIoContext; + }; + + template + class TypedWaitingCoroutine + : public WaitingCoroutineBase + { + public: + TypedWaitingCoroutine( + boost::asio::io_service &callerIoContextIn, + std::coroutine_handle callerSchedHandleIn) noexcept + : WaitingCoroutineBase(callerIoContextIn), + callerSchedHandle(callerSchedHandleIn) + {} + + void post() noexcept override + { + boost::asio::post(callerIoContext, callerSchedHandle); + } + + public: + std::coroutine_handle callerSchedHandle; + }; + + struct OperationInvoker + { + explicit OperationInvoker(CoConditionVariable &parentCvIn) noexcept + : parentCv(parentCvIn) + {} + + CoConditionVariable &parentCv; + }; + + struct WaitForInvoker + : public OperationInvoker + { + using OperationInvoker::OperationInvoker; + + bool await_ready() const noexcept { return false; } + + template + bool await_suspend(std::coroutine_handle cvCallerSchedHandle) noexcept + { + boost::asio::io_service &cvCallerIoContext = + sscl::ComponentThread::getSelf()->getIoService(); + + sscl::SpinLock::Guard guard(parentCv.spinLock); + if (parentCv.isSignaled) { + return false; + } + +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " CV not signaled: Enqueuing waiter coroutine.\n"; +#endif + parentCv.enqueueWaitingCoroutine( + cvCallerSchedHandle, cvCallerIoContext); + + return true; + } + + void await_resume() const noexcept {} + }; + + /** Manual await-style API only (lowerCamelCase); not a coroutine awaiter. + * `FinalSuspendPostingInvoker` calls `awaitSuspend` explicitly. + */ + struct DecisionEnablingDerivableWaitForInvoker + : public OperationInvoker + { + struct DecisionFactors + { + sscl::SpinLock &cvInternalSpinLock; + bool wasAlreadySignaled; + + DecisionFactors(sscl::SpinLock &cvLockIn, bool signaledIn) noexcept + : cvInternalSpinLock(cvLockIn), + wasAlreadySignaled(signaledIn) + {} + }; + + using OperationInvoker::OperationInvoker; + + void operator co_await() const = delete; + + bool awaitReady() const noexcept { return false; } + + template + DecisionFactors awaitSuspend(std::coroutine_handle cvCallerSchedHandle) noexcept + { + boost::asio::io_service &cvCallerIoContext = + sscl::ComponentThread::getSelf()->getIoService(); + + parentCv.spinLock.acquire(); + if (parentCv.isSignaled) + { +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " CV already signaled: returning already-signaled DecisionFactors.\n"; +#endif + return DecisionFactors(parentCv.spinLock, true); + } + +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() + << " CV not signaled: returning not-signaled DecisionFactors.\n"; +#endif + parentCv.enqueueWaitingCoroutine( + cvCallerSchedHandle, cvCallerIoContext); + + return DecisionFactors(parentCv.spinLock, false); + } + + void awaitResume() const noexcept {} + }; + + CoConditionVariable() noexcept = default; + CoConditionVariable(const CoConditionVariable &) = delete; + CoConditionVariable &operator=(const CoConditionVariable &) = delete; + CoConditionVariable(CoConditionVariable &&) noexcept = delete; + CoConditionVariable &operator=(CoConditionVariable &&) noexcept = delete; + ~CoConditionVariable() noexcept = default; + + WaitForInvoker getWaitForInvoker() noexcept + { return WaitForInvoker(*this); } + + DecisionEnablingDerivableWaitForInvoker + getDecisionEnablingDerivableWaitForInvoker() noexcept + { + return DecisionEnablingDerivableWaitForInvoker(*this); + } + + void signal() noexcept + { + std::deque> drained; + + { + sscl::SpinLock::Guard guard(spinLock); + isSignaled = true; + drained.swap(waitingCoroutines); + } + + for (std::unique_ptr &waiter : drained) { + waiter->post(); + } + } + + /** Only clears the signaled flag; waiters (if any) remain in the deque. */ + void clear() noexcept + { + sscl::SpinLock::Guard guard(spinLock); + isSignaled = false; + } + + template + void enqueueWaitingCoroutine( + std::coroutine_handle handle, + boost::asio::io_service &ctx) noexcept + { + waitingCoroutines.push_back( + std::make_unique>(ctx, handle)); + } + +private: + sscl::SpinLock spinLock; + bool isSignaled = false; + std::deque> waitingCoroutines; +}; + +} // namespace sscl::co + +#endif // CO_CONDITION_VARIABLE_H diff --git a/include/spinscale/co/coQutex.h b/include/spinscale/co/coQutex.h new file mode 100644 index 0000000..351f9f8 --- /dev/null +++ b/include/spinscale/co/coQutex.h @@ -0,0 +1,198 @@ +#ifndef CO_QUTEX_H +#define CO_QUTEX_H + +#include +#include +#include +#include +#include +#include + +#ifdef CONFIG_LIBSSCL_DEBUG_CO +#include +#include +#endif + +#include +#include + +#include +#include +#include + +namespace sscl::co { + +class CoQutex +{ +public: + class ReleaseHandle; + + CoQutex() noexcept = default; + CoQutex(const CoQutex &) = delete; + CoQutex(CoQutex &&) noexcept = delete; + CoQutex &operator=(const CoQutex &) = delete; + CoQutex &operator=(CoQutex &&) noexcept = delete; + ~CoQutex() = default; + + struct AcquireInvocationAndSuspensionPolicy + { + AcquireInvocationAndSuspensionPolicy(CoQutex &_coQutex) noexcept + : coQutex(_coQutex) + {} + + ~AcquireInvocationAndSuspensionPolicy() noexcept = default; + + struct WaitingCoroutine + { + WaitingCoroutine( + std::coroutine_handle _callerSchedHandle, + boost::asio::io_service &_callerIoContext, + PromiseChainLink &_waitingPromise) noexcept + : callerSchedHandle(_callerSchedHandle), + callerIoContext(_callerIoContext), + waitingPromise(_waitingPromise) + {} + + std::coroutine_handle callerSchedHandle; + boost::asio::io_service &callerIoContext; + PromiseChainLink &waitingPromise; + }; + + bool await_ready() noexcept { return false; } + + template + bool await_suspend(std::coroutine_handle callerSchedHandle) + { + static_assert( + std::is_base_of_v, + "CoQutex acquire requires a promise type derived from PromiseChainLink"); + + acquirerChainLink = &callerSchedHandle.promise(); + + walkCallerPromiseChainFrom( + static_cast(callerSchedHandle.promise()), + [this](const PromiseChainLink &link) + { +#ifdef CONFIG_LIBSSCL_DEBUG_CO + std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n"; +#endif + if (link.holdsAcquiredLock(coQutex)) { + throw std::runtime_error("Deadlock detected: CoQutex re-acquire on caller promise chain."); + } + }); + + sscl::SpinLock::Guard guard(coQutex.spinLock); + if (!coQutex.isOwned) { + coQutex.isOwned = true; + return false; + } + coQutex.waitingCoroutines.emplace_back( + std::coroutine_handle::from_address(callerSchedHandle.address()), + sscl::ComponentThread::getSelf()->getIoService(), + *acquirerChainLink); + return true; + } + + ReleaseHandle + // [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]] + await_resume() noexcept; + + CoQutex &coQutex; + + private: + PromiseChainLink *acquirerChainLink = nullptr; + }; + + AcquireInvocationAndSuspensionPolicy + // [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]] + getAcquireInvocationAndSuspensionPolicy() noexcept + { + return AcquireInvocationAndSuspensionPolicy(*this); + } + +private: + friend class ReleaseHandle; + + void release() noexcept + { + sscl::SpinLock::Guard guard(spinLock); + + assert(isOwned); + if (waitingCoroutines.empty()) { + isOwned = false; + return; + } + + auto &frontWaitingCoroutine = waitingCoroutines.front(); + boost::asio::post( + frontWaitingCoroutine.callerIoContext, + frontWaitingCoroutine.callerSchedHandle); + waitingCoroutines.pop_front(); + } + + sscl::SpinLock spinLock; + bool isOwned = false; + std::deque waitingCoroutines; +}; + +//[[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]] +class CoQutex::ReleaseHandle +{ +public: + ReleaseHandle(PromiseChainLink &promiseChainLinkIn, CoQutex &coQutexIn) noexcept + : promiseChainLink(promiseChainLinkIn), + coQutex(coQutexIn) + {} + + ReleaseHandle(const ReleaseHandle &) = delete; + ReleaseHandle &operator=(const ReleaseHandle &) = delete; + + ReleaseHandle(ReleaseHandle &&other) noexcept + : promiseChainLink(other.promiseChainLink), + coQutex(other.coQutex), + armed(other.armed) + { + other.armed = false; + } + + ReleaseHandle &operator=(ReleaseHandle &&other) noexcept = delete; + + ~ReleaseHandle() noexcept + { + if (armed) + { release(); } + } + + void release() noexcept + { + if (!armed) + { return; } + + armed = false; + promiseChainLink.removeAcquiredLock(coQutex); + coQutex.release(); + } + + void operator()() noexcept + { + release(); + } + +private: + PromiseChainLink &promiseChainLink; + CoQutex &coQutex; + bool armed = true; +}; + +inline CoQutex::ReleaseHandle +// [[nodiscard("store co_await result; lock is held until ReleaseHandle is released")]] +CoQutex::AcquireInvocationAndSuspensionPolicy::await_resume() noexcept +{ + assert(acquirerChainLink != nullptr); + acquirerChainLink->addAcquiredLock(coQutex); + return CoQutex::ReleaseHandle(*acquirerChainLink, coQutex); +} + +} // namespace sscl::co + +#endif // CO_QUTEX_H diff --git a/include/spinscale/co/group.h b/include/spinscale/co/group.h new file mode 100644 index 0000000..e0e5b64 --- /dev/null +++ b/include/spinscale/co/group.h @@ -0,0 +1,599 @@ +#ifndef GROUP_H +#define GROUP_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace sscl::co { + +namespace detail { + +template +concept await_suspend_returns_void = requires(T &t, H h) { + { t.await_suspend(h) } -> std::same_as; +}; + +template +concept await_suspend_returns_bool = requires(T &t, H h) { + { t.await_suspend(h) } -> std::convertible_to; +}; + +template +concept await_suspend_returns_handle = requires(T &t, H h) { + { t.await_suspend(h) } -> std::convertible_to>; +}; + +template +concept await_suspend_ok = await_suspend_returns_void + || await_suspend_returns_bool + || await_suspend_returns_handle; + +template > +concept AwaiterIface = requires(T &t, H h) { + { t.await_ready() } -> std::convertible_to; + { t.await_resume() }; +} && await_suspend_ok; + +template +auto get_operator_co_await(T &t) -> decltype(operator co_await(t)); + +template +concept AwaitableIface = requires(T &t) { + { get_operator_co_await(t) }; +} && AwaiterIface()))>; + +} // namespace detail + +template +concept AwaitableIface = detail::AwaitableIface; + +template +concept AwaiterIface = detail::AwaiterIface; + +template +concept AwaitableOrAwaiterIface = AwaiterIface || AwaitableIface; + +template +requires AwaitableOrAwaiterIface +struct Group +{ + enum class AwaitingCondition { + NONE, FIRST_SETTLED, ALL_SETTLED + }; + + class SettlementDescriptor + { + public: + enum class TypeE { + /* We track EXCEPTIION_THROWN but we don't provide an + * awaitInvoker for exception events. The caller can + * wait for settlements and then scan the result set + * to manually deal with exceptions. + */ + UNSETTLED, COMPLETED, EXCEPTION_THROWN + }; + + SettlementDescriptor(Invoker &_invoker) + : invoker(std::ref(_invoker)) + {} + + void setSettlementStatus() noexcept + { + assert(type == TypeE::UNSETTLED); + + if (calleeException) { + type = TypeE::EXCEPTION_THROWN; + } else { + type = TypeE::COMPLETED; + } + } + + TypeE type = TypeE::UNSETTLED; + std::exception_ptr calleeException = nullptr; + std::exception_ptr adapterException = nullptr; + std::reference_wrapper invoker; + }; + + struct SettlementAwaitingInvoker; + struct AwaitFirstSettlementInvoker; + struct AwaitAllSettlementsInvoker; + + // getAwaitNextSettlementInvoker(); + AwaitFirstSettlementInvoker getAwaitFirstSettlementInvoker() + { return AwaitFirstSettlementInvoker(*this); } + + AwaitAllSettlementsInvoker getAwaitAllSettlementsInvoker() + { return AwaitAllSettlementsInvoker(*this); } + + bool verifyAllInvokersSettled() const + { + for (auto &desc : s.rsrc.settlements) { + if (desc.type == SettlementDescriptor::TypeE::UNSETTLED) { + return false; + } + } + + return true; + } + + bool firstInvokerSettled() const + { return s.rsrc.firstSettledInvokerIdx >= 0; } + + bool allInvokersSettled() const + { + const std::size_t nInvokersAdded = s.rsrc.settlements.size(); + assert(s.rsrc.nInvokersSettled <= nInvokersAdded); + return s.rsrc.nInvokersSettled == nInvokersAdded; + } + + /** Caller must hold s.lock. */ + void throwIfNoMemberInvokersForCoAwaitUnderLock() const + { + if (s.rsrc.settlements.empty()) { + throw std::runtime_error( + "co_await: Group has no member invokers; call add() before awaiting"); + } + } + + struct SettlementAwaitingInvoker + { + explicit SettlementAwaitingInvoker(Group &_group) + : parentGroup(_group) + {} + + bool await_ready() const { return false; } + + /** EXPLANATION: + * This exists for if we ever need to re-make the adapter coro + * throw exceptions. But we decided to make it noexcept in order + * to avoid this complication. + */ + void checkForAndReThrowAdapterExceptions() const + { + std::ostringstream ostream; + bool doThrow = false; + + for (auto &item : parentGroup.s.rsrc.settlements) + { + if (!item.adapterException) { + continue; + } + + doThrow = true; + ostream << "Exc thrown in Group Adapter: "; + try { + std::rethrow_exception(item.adapterException); + } catch (const std::exception &e) { + ostream << e.what(); + } catch (...) { + ostream << ""; + } + ostream << "\n"; + } + + if (doThrow) { + throw std::runtime_error(ostream.str()); + } + } + + Group &parentGroup; + }; + + /** EXPLANATION: + * AwaitingCondition and the group-awaiter coroutine_handle are set only + * in await_suspend when this co_await actually suspends. Constructing + * several AwaitFirstSettlementInvoker / AwaitAllSettlementsInvoker + * objects without co_awaiting them is harmless. + * + * You may co_await await-all and later co_await await-first (in either + * construction order). After a suspending wait completes, the adapter + * clears handle state in updateSettlementsStateAndAwakenCallerIfConditionMet, + * so a later co_await on another handle (or a second co_await on the same + * handle, after the first finished) is legal. + * + * Only one group co_await may be suspended with a registered handle at a + * time; a second concurrent co_await trips assert(!callerHasSetSchedHandle) + * in debug builds. + * + * firstSettledInvokerIdx and calleeWasReadyToNotifyOfFirstSettlement are + * sticky for the Group lifetime (first member ever to settle), not per wave. + */ + struct AwaitFirstSettlementInvoker + : public SettlementAwaitingInvoker + { + using SettlementAwaitingInvoker::SettlementAwaitingInvoker; + + bool await_suspend(std::coroutine_handle<> groupAwaiterSchedHandle) + { + /* No other group co_await may be suspended with a registered handle. + * Sequential co_await on the same object is allowed after the prior + * wait finished and clearCallerSchedHandleState() ran on wake. + */ + assert(!this->parentGroup.s.rsrc.callerHasSetSchedHandle); + + sscl::SpinLock::Guard guard(this->parentGroup.s.lock); + + this->parentGroup.throwIfNoMemberInvokersForCoAwaitUnderLock(); + + if (this->parentGroup.s.rsrc.calleeWasReadyToNotifyOfFirstSettlement) { + return false; + } + + /* We store away the coro_handle of the + * group awaiter, and suspend that group awaiter. + */ + this->parentGroup.s.rsrc.setCallerSchedHandleAndCondition( + groupAwaiterSchedHandle, AwaitingCondition::FIRST_SETTLED); + + return true; + } + + std::pair &> + await_resume() + { + assert(this->parentGroup.firstInvokerSettled()); + return { + this->parentGroup.s.rsrc.settlements[ + this->parentGroup.s.rsrc.firstSettledInvokerIdx], + this->parentGroup.s.rsrc.settlements + }; + } + }; + + /** EXPLANATION: + * Same awaiting rules as AwaitFirstSettlementInvoker (see above). + * + * It is illegal to add() new members while a group co_await is suspended + * (groupAwaiterSchedHandle is registered). You may add() after co_await + * returns, including starting a new settlement wave before the next + * co_await. + */ + struct AwaitAllSettlementsInvoker + : public SettlementAwaitingInvoker + { + using SettlementAwaitingInvoker::SettlementAwaitingInvoker; + + bool await_suspend(std::coroutine_handle<> groupAwaiterSchedHandle) + { + /* See AwaitFirstSettlementInvoker::await_suspend. Handle state is + * cleared when the adapter wakes a suspended group co_awaiter, not + * in await_resume. + */ + assert(!this->parentGroup.s.rsrc.callerHasSetSchedHandle); + + sscl::SpinLock::Guard guard(this->parentGroup.s.lock); + + this->parentGroup.throwIfNoMemberInvokersForCoAwaitUnderLock(); + + if (this->parentGroup.allInvokersSettled()) { + return false; + } + + this->parentGroup.s.rsrc.setCallerSchedHandleAndCondition( + groupAwaiterSchedHandle, AwaitingCondition::ALL_SETTLED); + + return true; + } + + std::vector &await_resume() + { + assert(this->parentGroup.allInvokersSettled()); + return this->parentGroup.s.rsrc.settlements; + } + }; + + struct NonAwaitableNonPostingAdapterCoro + { + struct promise_type + : public PromiseChainLink + { + NonAwaitableNonPostingAdapterCoro get_return_object() noexcept + { return {}; } + + void removeAcquiredLock(CoQutex &) noexcept override + {} + + std::suspend_never initial_suspend() noexcept { return {}; } + /** EXPLANATION: + * final_suspend must return suspend_never here so that + * this fire-and-forget adapter coro will be self-destroying. + */ + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() noexcept { return; } + void unhandled_exception() noexcept + { + try { + auto eptr = std::current_exception(); + if (eptr) { + std::rethrow_exception(eptr); + } + } catch (const std::exception &e) { + std::cerr << "Unhandled exception in Group adapter coroutine:\n" + << e.what() << "\n"; + } catch (...) { + std::cerr << "Unhandled non-std exception in Group adapter coroutine\n"; + } + + std::terminate(); + } + }; + + NonAwaitableNonPostingAdapterCoro() noexcept = default; + NonAwaitableNonPostingAdapterCoro operator co_await() const = delete; + bool await_ready() const { std::terminate(); return false; } + void await_suspend() const { std::terminate(); } + void await_resume() const { std::terminate(); } + }; + + std::pair + updateSettlementsStateAndAwakenCallerIfConditionMet( + std::size_t settlementIndex) noexcept + { + bool isFirstSettlement = false; + bool isLastSettlement = false; + std::coroutine_handle<> groupAwaiterSchedHandleToWake = nullptr; + + { + sscl::SpinLock::Guard guard(s.lock); + + /* If we can be certain that the AllSettled condition won't + * be triggered repeatedly, then we can get rid of + * calleeWasReadyToNotifyOfLastSettlementForCurrentSet. + */ + assert(s.rsrc.nInvokersSettled < s.rsrc.settlements.size()); + assert(settlementIndex < s.rsrc.settlements.size()); + s.rsrc.nInvokersSettled++; + + if (!firstInvokerSettled()) + { + isFirstSettlement = true; + s.rsrc.firstSettledInvokerIdx = static_cast(settlementIndex); + + /* This should be set-once & sticky throughout the lifetime + * of the Group object. The first invoker only gets + * settled once, irrespective of how many + * AwaitFirstSettlementInvoker instances we create. + */ + s.rsrc.calleeWasReadyToNotifyOfFirstSettlement = true; + } + + if (allInvokersSettled()) + { + assert(s.rsrc.nInvokersSettled == s.rsrc.settlements.size()); + assert(verifyAllInvokersSettled()); + isLastSettlement = true; + } + + /* If no group co_awaiter registered a handle (did not suspend, or + * already woke and clearCallerSchedHandleState ran), there is + * nothing to post back to. + */ + if (!s.rsrc.callerHasSetSchedHandle) { + return {isFirstSettlement, isLastSettlement}; + } + + /* If we're here, then callerHasSetSchedHandle must be true. + * I.e: an invoker has been created and co_awaited for one of the + * conditions. + * Therefore currentAwaitingCondition must also have been set, + * since currentAwaitingCondition is set in the invokers' ctors. + */ + assert(s.rsrc.currentAwaitingCondition != AwaitingCondition::NONE); + + if ((isFirstSettlement + && s.rsrc.currentAwaitingCondition == AwaitingCondition::FIRST_SETTLED) + || (isLastSettlement + && s.rsrc.currentAwaitingCondition == AwaitingCondition::ALL_SETTLED)) + { + groupAwaiterSchedHandleToWake = s.rsrc.groupAwaiterSchedHandle; + + /** We only clear here and not in await_resume, because if + * the caller hasn't already set it schedHandle by the time we're + * called, then when it eventually does call await_suspend, it + * won't set it then either. + * + * I.e: callerSchedHandle only needs to be cleared it if gets set + * in the first place; + * And it only gets set if we need to invoke the schedHandle from + * here. + * If the group co_awaiter is able to call await_resume, then it + * simply doesn't set its schedHandle at all. + */ + s.rsrc.clearCallerSchedHandleState(); + } + } + + if (groupAwaiterSchedHandleToWake) + { + /* We should be able to just directly resume() the group awaiter's handle + * here because that would invoke await_resume, which may destroy the + * callee's promise. + * And who is the callee? Is it not this coro here? And this coro + * hasn't been suspended. So we'd be destroying ourself while we're + * not suspended. + * + * But all of that only applies __IFF__ we actually do try to destroy + * the callee within the caller's Invoker. If we don't, then the callee + * should persist just fine. There's no implicit mechanism that + * will always destroy the callee coro state before the invoker + * is destroyed. + * If that was in fact the way it worked, then fire-and-forget coros + * would be impossible. + * + * So we should be able to call resume() directly here without + * post()ing to current_io_context(). + * + * EXPLANATION: + * However, in order to ensure that we keep this adapter coro + * method exception-free, we are forced to post() rather than + * directly calling the handle. + */ + boost::asio::post( + sscl::ComponentThread::getSelf()->getIoService(), + groupAwaiterSchedHandleToWake); + } + + return {isFirstSettlement, isLastSettlement}; + } + + /** EXPLANATION: + * This coro is a coro which has a promise, and does __not__ expose an awaitable + * iface and in fact should not be capable of being awaited, ultimately. + * + * Its purpose is to be an adapter that enables the Group class to invoke the + * invokers that are added to it, without having to co_await those invokers. + * Rather, the Group class simply invokes this function on them, and then this + * function both co_awaits the invoker on behalf of the Group class, and also + * performs the normal function of an invoker, which is both to invoke the + * target async fn, and also to convey its results back to the Group class. + * It's effectively a go-between coro that provides the outcomes that Invokers + * normally provide, without needing, itself, to be co_awaited. + */ + NonAwaitableNonPostingAdapterCoro nonAwaitableAdapterCoro( + std::size_t settlementIndex) noexcept + { + /** EXPLANATION: + * It's very convenient that our design for the NonViralNonSuspendingInvoker + * coincidentally allows us to supply a lambda that can be used to test + * for the settlement conditions that are being waited on by the Group's + * co_awaiter. + * + * settlementIndex is captured by value (not a vector iterator) so adapter + * coros remain valid if settlements reallocate during concurrent add(). + */ + try { + /* Return values remain in the callee promise until the caller-owned + * invoker is destroyed (~PostingInvoker). The group co_awaiter reads + * results via settlements[settlementIndex].invoker after awaiting. + * + * Index settlements[] each time; do not cache a reference across + * co_await because concurrent add() may reallocate the vector. + */ + co_await s.rsrc.settlements[settlementIndex].invoker.get(); + } + catch (...) + { + s.rsrc.settlements[settlementIndex].calleeException = + std::current_exception(); + } + + /* From here onwards, we mustn't throw(). Unhandled exceptions + * generated by the adapter coro itself will result in + * std::terminate(). + */ + s.rsrc.settlements[settlementIndex].setSettlementStatus(); + updateSettlementsStateAndAwakenCallerIfConditionMet(settlementIndex); + + co_return; + } + + /** EXPLANATION: + * Each invoker passed to add() must outlive this Group and the callee frame + * (see ~PostingInvoker). The group co_awaiter reads return values from those + * invokers after awaiting; do not destroy an invoker until reads are done. + */ + void add(Invoker &invoker) + { + std::size_t settlementIndex = 0; + + { + sscl::SpinLock::Guard guard(s.lock); + + if (s.rsrc.groupAwaiterSchedHandle) + { + throw std::runtime_error( + "add: New member invokers mustn't be added " + "while co_awaiting a given set"); + } + + settlementIndex = s.rsrc.settlements.size(); + s.rsrc.settlements.emplace_back(invoker); + } + + nonAwaitableAdapterCoro(settlementIndex); + } + + void checkForAndReThrowGroupExceptions() const + { + std::ostringstream ostream; + bool doThrow = false; + + for (auto &item : s.rsrc.settlements) + { + if (item.type != SettlementDescriptor::TypeE::EXCEPTION_THROWN) { + continue; + } + + assert(item.calleeException); + + doThrow = true; + ostream << "Exc thrown in Group Adapter: "; + try { + std::rethrow_exception(item.calleeException); + } catch (const std::exception &e) { + ostream << e.what(); + } catch (...) { + ostream << ""; + } + ostream << "\n"; + } + + if (doThrow) { + throw std::runtime_error(ostream.str()); + } + } + + struct State + { + void clearCallerSchedHandleState() noexcept + { + groupAwaiterSchedHandle = nullptr; + callerHasSetSchedHandle = false; + currentAwaitingCondition = AwaitingCondition::NONE; + } + + void setCallerSchedHandleAndCondition( + std::coroutine_handle<> groupAwaiterSchedHandleIn, + AwaitingCondition awaitingCondition) noexcept + { + groupAwaiterSchedHandle = groupAwaiterSchedHandleIn; + callerHasSetSchedHandle = true; + currentAwaitingCondition = awaitingCondition; + } + + int firstSettledInvokerIdx = -1; + std::size_t nInvokersSettled = 0; + std::coroutine_handle<> groupAwaiterSchedHandle = nullptr; + bool callerHasSetSchedHandle = false; + /* calleWasReady*First* is an indelible record of what + * occured during the first settlement's adapter's update. + */ + bool calleeWasReadyToNotifyOfFirstSettlement = false; + std::vector settlements; + AwaitingCondition currentAwaitingCondition = AwaitingCondition::NONE; + }; + + sscl::SharedResourceGroup s; +}; + +} // namespace sscl::co + +#endif // GROUP_H diff --git a/include/spinscale/co/invokers.h b/include/spinscale/co/invokers.h new file mode 100644 index 0000000..55bc00a --- /dev/null +++ b/include/spinscale/co/invokers.h @@ -0,0 +1,161 @@ +#ifndef INVOKERS_H +#define INVOKERS_H + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace sscl::co { + +/** Non-viral coroutine entry that must not be co_awaited: promise is always + * PostingPromiseTemplate (no return-value path to a caller). + * + * The invoker must outlive the callee frame: do not discard the return object + * from get_return_object(). ~PostingInvoker destroys the callee frame. + */ +template