From b04b0db1557832b5b952bd2902f2bf3169142be6 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Tue, 9 Jun 2026 05:46:51 -0400 Subject: [PATCH] Add a Nursery class for dynamically managing nonviral coros --- README.md | 44 ++ include/spinscale/co/nonViralTaskNursery.h | 509 +++++++++++++++++++++ 2 files changed, 553 insertions(+) create mode 100644 include/spinscale/co/nonViralTaskNursery.h diff --git a/README.md b/README.md index d47b6c9..1eed605 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,49 @@ group.checkForAndReThrowGroupExceptions(); Settlements record whether a member completed or threw. The original invoker can be recovered from a descriptor when a caller needs typed return values. +### Non-Viral Task Nursery + +`co::NonViralTaskNursery` is the structured-concurrency owner for non-viral +invokers at non-coroutine boundaries. Unlike `co::Group`, it is for callback-style +entry from ordinary code (HTTP handlers, timers, shutdown sequences), not for +`co_await` orchestration inside coroutines. + +```cpp +sscl::co::NonViralTaskNursery nursery; +nursery.openAdmission(); + +auto lease = nursery.getNewSlotLease(); +lease.getSyncCanceler().startAcceptingWork(); +lease.fillSlot( + [&lease]() + { + return component.someNonViralCReq( + lease.getExceptionStorage(), + lease.getCallerLambda(), + lease.getSyncCanceler()); + }); +lease.commit(); + +nursery.requestCancelOnAll(); +nursery.closeAdmission(); +nursery.syncAwaitAllSettlements(ioContext); +``` + +Each slot owns a `SyncCancelerForAsyncWork`. `requestCancelOnAll()` only signals +cooperative stop; it does not destroy invokers. Invokers are retired when their +completion callbacks run. Call `closeAdmission()` explicitly before +`asyncAwaitAllSettlements()` or `syncAwaitAllSettlements()`; those APIs wait until +all slots have retired naturally and throw if admission is still open. + +`Slot::Lease` is commit-required: an uncommitted lease removes its reservation on +destruction. `fillSlot()` takes an invoker factory (deferred construction) because +non-viral coroutines may complete synchronously during invoker construction. +`Slot::Handle` is an opaque slot pointer valid only while the slot remains in the +nursery. + +Slot metadata (`exceptionPtr`, lease/settlement status, canceler) lives on `Slot`. +`MemberInvokerBase` is invoker type-erasure only. + ### Coroutine-Aware Locking `co::CoQutex` is a coroutine-aware mutual exclusion primitive. It tracks @@ -228,6 +271,7 @@ component orchestration: - Runtime-selected posting uses `DynamicViralPostingInvoker`. - Component lifecycle batches are viral non-posting coroutines. - `co::Group` is the primary structured fan-out/fan-in primitive. +- `co::NonViralTaskNursery` owns non-viral invoker lifetimes at outer boundaries. Expect breaking changes when they simplify the ownership, lifecycle, or post-to/post-back model. diff --git a/include/spinscale/co/nonViralTaskNursery.h b/include/spinscale/co/nonViralTaskNursery.h new file mode 100644 index 0000000..4c9de96 --- /dev/null +++ b/include/spinscale/co/nonViralTaskNursery.h @@ -0,0 +1,509 @@ +#ifndef NON_VIRAL_TASK_NURSERY_H +#define NON_VIRAL_TASK_NURSERY_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace sscl::co { + +namespace detail { + +struct MemberInvokerBase +{ + virtual ~MemberInvokerBase() = default; +}; + +template +struct MemberInvoker : MemberInvokerBase +{ + explicit MemberInvoker(Invoker &&invokerIn) + : invoker(std::move(invokerIn)) + {} + + Invoker invoker; +}; + +} // namespace detail + +/** Structured-concurrency owner for non-viral invokers at non-coroutine boundaries. + * + * The nursery owns invoker lifetimes until natural completion, wraps completion + * callbacks, tracks unsettled slots, fans out cooperative cancel via per-slot + * SyncCancelerForAsyncWork, and provides drain APIs. + * + * Call closeAdmission() explicitly before asyncAwaitAllSettlements() or + * syncAwaitAllSettlements(). syncAwaitAllSettlements() caller must pass the + * io_context where non-viral posting completions will land, and must ensure + * that io_context is prepared to run (e.g. not left stopped without restart). + */ +class NonViralTaskNursery +{ +public: + enum class SlotLeaseStatus { + RESERVED, ACTIVE_UNSETTLED, RETIRED + }; + + enum class SlotSettlementStatus { + UNSETTLED, COMPLETED, EXCEPTION_THROWN + }; + + class Slot + { + public: + /** Opaque handle to a nursery slot. Valid while the slot remains in storage. */ + class Handle + { + public: + bool operator==(const Handle &_other) const noexcept + { return &slot == &_other.slot; } + + bool operator!=(const Handle &_other) const noexcept + { return &slot != &_other.slot; } + + private: + friend class NonViralTaskNursery; + friend class Lease; + + explicit Handle(Slot &_slot) noexcept + : slot(_slot) + {} + + Slot &slot; + }; + + class Lease + { + public: + Lease(Lease &&_other) noexcept + : nursery(_other.nursery), slot(_other.slot), + slotCommittedSoLeaseShouldntDestroy( + std::exchange( + _other.slotCommittedSoLeaseShouldntDestroy, + true)) + {} + + Lease(const Lease &) = delete; + Lease &operator=(const Lease &) = delete; + Lease &operator=(Lease &&) = delete; + + ~Lease() + { + if (!slotCommittedSoLeaseShouldntDestroy) { + nursery.releaseUncommittedSlot(slot); + } + } + + std::exception_ptr &getExceptionStorage() + { return slot.exceptionPtr; } + + std::function getCallerLambda() + { return nursery.buildCallerLambdaForSlot(slot); } + + sscl::SyncCancelerForAsyncWork &getSyncCanceler() + { return slot.syncCanceler; } + + void setOnSettledHook(std::function hook) + { + if (slot.leaseStatus != SlotLeaseStatus::RESERVED) + { + throw std::runtime_error( + std::string(__func__) + + ": must be called before fillSlot()"); + } + + slot.onSettledHook = std::move(hook); + } + + /** Factory must create the invoker. Deferred construction is + * required because non-viral coroutines may complete synchronously + * during invoker construction, before fillSlot() can store the + * record. + */ + template + void fillSlot(InvokerFactory &&invokerFactory) + { + Slot &reservedSlot = slot; + + if (reservedSlot.memberInvoker) + { + throw std::runtime_error( + std::string(__func__) + ": slot already filled"); + } + + if (reservedSlot.leaseStatus != SlotLeaseStatus::RESERVED) + { + throw std::runtime_error( + std::string(__func__) + ": slot is not reserved"); + } + + reservedSlot.leaseStatus = SlotLeaseStatus::ACTIVE_UNSETTLED; + auto invoker = invokerFactory(); + + if (reservedSlot.leaseStatus == SlotLeaseStatus::RETIRED) + { + /** EXPLANATION: + * Non-viral coroutines may complete synchronously inside + * the factory. Retirement already ran; the local invoker + * must be allowed to destroy the callee frame on return. + */ + return; + } + + reservedSlot.memberInvoker = std::make_unique< + detail::MemberInvoker>>( + std::move(invoker)); + } + + void commit() + { + if (slotCommittedSoLeaseShouldntDestroy) + { + throw std::runtime_error( + std::string(__func__) + ": lease already committed"); + } + + Slot &reservedSlot = slot; + + if (reservedSlot.leaseStatus == SlotLeaseStatus::RESERVED) + { + throw std::runtime_error( + std::string(__func__) + + ": fillSlot() required before commit()"); + } + + slotCommittedSoLeaseShouldntDestroy = true; + } + + Handle handle() const + { + return Handle(slot); + } + + private: + friend class NonViralTaskNursery; + + Lease(NonViralTaskNursery &_nursery, Slot &_slot) noexcept + : nursery(_nursery), slot(_slot) + {} + + NonViralTaskNursery &nursery; + Slot &slot; + bool slotCommittedSoLeaseShouldntDestroy = false; + }; + + private: + friend class NonViralTaskNursery; + friend class Lease; + + SlotLeaseStatus leaseStatus = SlotLeaseStatus::RESERVED; + SlotSettlementStatus settlementStatus = SlotSettlementStatus::UNSETTLED; + std::exception_ptr exceptionPtr = nullptr; + sscl::SyncCancelerForAsyncWork syncCanceler; + std::unique_ptr memberInvoker; + std::function onSettledHook; + }; + + void openAdmission() + { + sscl::SpinLock::Guard guard(s.lock); + s.rsrc.admissionOpen = true; + } + + void closeAdmission() + { + sscl::SpinLock::Guard guard(s.lock); + s.rsrc.admissionOpen = false; + } + + bool admissionIsOpen() const + { + sscl::SpinLock::Guard guard(s.lock); + return s.rsrc.admissionOpen; + } + + bool allSettled() const + { return unsettledCount() == 0; } + + std::size_t unsettledCount() const + { + sscl::SpinLock::Guard guard(s.lock); + return countUnsettledSlotsUnlocked(); + } + + Slot::Lease getNewSlotLease() + { + sscl::SpinLock::Guard guard(s.lock); + + if (!s.rsrc.admissionOpen) + { + throw std::runtime_error( + std::string(__func__) + ": admission closed"); + } + + pruneRetiredSlotsUnlocked(); + s.rsrc.slots.emplace_back(); + Slot &slot = s.rsrc.slots.back(); + return Slot::Lease(*this, slot); + } + + void requestCancelOnAll() + { + sscl::SpinLock::Guard guard(s.lock); + + for (auto &slot : s.rsrc.slots) + { + if (slot.leaseStatus != SlotLeaseStatus::ACTIVE_UNSETTLED) { + continue; + } + + slot.syncCanceler.requestStop(); + } + } + + void asyncAwaitAllSettlements(std::function callback) + { + std::function waiterToInvoke; + + { + sscl::SpinLock::Guard guard(s.lock); + + if (s.rsrc.admissionOpen) + { + throw std::runtime_error( + std::string(__func__) + + ": admission must be closed before awaiting drain"); + } + + if (countUnsettledSlotsUnlocked() == 0) { + waiterToInvoke = std::move(callback); + } + else if (s.rsrc.drainWaiter) + { + throw std::runtime_error( + std::string(__func__) + + ": drain waiter already registered"); + } + else { + s.rsrc.drainWaiter = std::move(callback); + } + } + + if (waiterToInvoke) { + waiterToInvoke(); + } + } + + void syncAwaitAllSettlements(boost::asio::io_context &ioContext) + { + if (admissionIsOpen()) + { + throw std::runtime_error( + std::string(__func__) + + ": admission must be closed before awaiting drain"); + } + + if (allSettled()) { + return; + } + + if (ioContext.stopped()) + { + throw std::runtime_error( + std::string(__func__) + ": provided io_context is stopped"); + } + + /** EXPLANATION: + * Drain may run on the thread that processes a completion callback, + * not necessarily the thread blocked in waitForAsyncOperationComplete. + * Keep the bridge off the waiter thread's stack. + */ + auto bridge = std::make_shared( + ioContext); + asyncAwaitAllSettlements( + [bridge]() + { + bridge->setAsyncOperationComplete(); + }); + + bridge->waitForAsyncOperationCompleteOrIoContextStopped(); + } + + template + Slot::Handle launch(InvokerFactory &&factory) + { + auto lease = getNewSlotLease(); + lease.getSyncCanceler().startAcceptingWork(); + lease.fillSlot( + [&factory, &lease]() + { + return std::forward(factory)(lease); + }); + lease.commit(); + return lease.handle(); + } + +private: + friend class Slot::Lease; + + struct State + { + bool admissionOpen = false; + std::list slots; + std::function drainWaiter; + }; + + std::size_t countUnsettledSlotsUnlocked() const + { + std::size_t count = 0; + + for (const auto &slot : s.rsrc.slots) + { + if (slot.leaseStatus == SlotLeaseStatus::ACTIVE_UNSETTLED) { + ++count; + } + } + + return count; + } + + void releaseUncommittedSlot(Slot &slot) + { + std::function waiterToInvoke; + + { + sscl::SpinLock::Guard guard(s.lock); + + if (slot.leaseStatus != SlotLeaseStatus::RESERVED) { + return; + } + + slot.leaseStatus = SlotLeaseStatus::RETIRED; + slot.memberInvoker.reset(); + waiterToInvoke = takeDrainWaiterIfDrainedUnlocked(); + } + + if (waiterToInvoke) { waiterToInvoke(); } + } + + std::function buildCallerLambdaForSlot(Slot &slot) + { + return + [this, slot = std::ref(slot)]() + { + retireSlot(slot.get()); + }; + } + + void retireSlot(Slot &slot) + { + std::function waiterToInvoke; + + { + sscl::SpinLock::Guard guard(s.lock); + + if (slot.leaseStatus != SlotLeaseStatus::ACTIVE_UNSETTLED) + { + throw std::runtime_error( + std::string(__func__) + ": slot is not active and " + "unsettled"); + } + if (slot.settlementStatus != SlotSettlementStatus::UNSETTLED) { + throw std::runtime_error( + std::string(__func__) + ": slot is not unsettled"); + } + + if (!verifySlotIsManagedUnlocked(slot)) { + return; + } + + try { + if (slot.onSettledHook) { slot.onSettledHook(); } + } + catch (const std::exception &e) + { + std::cerr + << "NonViralTaskNursery: onSettled hook exception: " + << e.what() << std::endl; + } + catch (...) + { + std::cerr + << "NonViralTaskNursery: onSettled hook unknown exception" + << std::endl; + } + + if (slot.exceptionPtr) { + slot.settlementStatus = SlotSettlementStatus::EXCEPTION_THROWN; + } else { + slot.settlementStatus = SlotSettlementStatus::COMPLETED; + } + + slot.leaseStatus = SlotLeaseStatus::RETIRED; + slot.memberInvoker.reset(); + waiterToInvoke = takeDrainWaiterIfDrainedUnlocked(); + } + + if (waiterToInvoke) { waiterToInvoke(); } + } + + /** Caller must hold s.lock. */ + bool verifySlotIsManagedUnlocked(const Slot &slot) const + { + for (const auto &trackedSlot : s.rsrc.slots) + { + if (&trackedSlot == &slot) { + return true; + } + } + + return false; + } + + /** Caller must hold s.lock. */ + void pruneRetiredSlotsUnlocked() + { + for (auto it = s.rsrc.slots.begin(); it != s.rsrc.slots.end();) + { + if (it->leaseStatus == SlotLeaseStatus::RETIRED) { + it = s.rsrc.slots.erase(it); + } else { + ++it; + } + } + } + + /** Caller must hold s.lock. */ + std::function takeDrainWaiterIfDrainedUnlocked() + { + if (s.rsrc.admissionOpen) { + return {}; + } + + if (countUnsettledSlotsUnlocked() != 0) { + return {}; + } + + return std::exchange(s.rsrc.drainWaiter, {}); + } + +public: + mutable sscl::SharedResourceGroup s; +}; + +} // namespace sscl::co + +#endif // NON_VIRAL_TASK_NURSERY_H