mirror of
https://github.com/latentPrion/libspinscale.git
synced 2026-06-23 19:48:32 +00:00
Add a Nursery class for dynamically managing nonviral coros
This commit is contained in:
@@ -177,6 +177,49 @@ group.checkForAndReThrowGroupExceptions();
|
||||
Settlements record whether a member completed or threw. The original invoker can
|
||||
be recovered from a descriptor when a caller needs typed return values.
|
||||
|
||||
### Non-Viral Task Nursery
|
||||
|
||||
`co::NonViralTaskNursery` is the structured-concurrency owner for non-viral
|
||||
invokers at non-coroutine boundaries. Unlike `co::Group`, it is for callback-style
|
||||
entry from ordinary code (HTTP handlers, timers, shutdown sequences), not for
|
||||
`co_await` orchestration inside coroutines.
|
||||
|
||||
```cpp
|
||||
sscl::co::NonViralTaskNursery nursery;
|
||||
nursery.openAdmission();
|
||||
|
||||
auto lease = nursery.getNewSlotLease();
|
||||
lease.getSyncCanceler().startAcceptingWork();
|
||||
lease.fillSlot(
|
||||
[&lease]()
|
||||
{
|
||||
return component.someNonViralCReq(
|
||||
lease.getExceptionStorage(),
|
||||
lease.getCallerLambda(),
|
||||
lease.getSyncCanceler());
|
||||
});
|
||||
lease.commit();
|
||||
|
||||
nursery.requestCancelOnAll();
|
||||
nursery.closeAdmission();
|
||||
nursery.syncAwaitAllSettlements(ioContext);
|
||||
```
|
||||
|
||||
Each slot owns a `SyncCancelerForAsyncWork`. `requestCancelOnAll()` only signals
|
||||
cooperative stop; it does not destroy invokers. Invokers are retired when their
|
||||
completion callbacks run. Call `closeAdmission()` explicitly before
|
||||
`asyncAwaitAllSettlements()` or `syncAwaitAllSettlements()`; those APIs wait until
|
||||
all slots have retired naturally and throw if admission is still open.
|
||||
|
||||
`Slot::Lease` is commit-required: an uncommitted lease removes its reservation on
|
||||
destruction. `fillSlot()` takes an invoker factory (deferred construction) because
|
||||
non-viral coroutines may complete synchronously during invoker construction.
|
||||
`Slot::Handle` is an opaque slot pointer valid only while the slot remains in the
|
||||
nursery.
|
||||
|
||||
Slot metadata (`exceptionPtr`, lease/settlement status, canceler) lives on `Slot`.
|
||||
`MemberInvokerBase` is invoker type-erasure only.
|
||||
|
||||
### Coroutine-Aware Locking
|
||||
|
||||
`co::CoQutex` is a coroutine-aware mutual exclusion primitive. It tracks
|
||||
@@ -228,6 +271,7 @@ component orchestration:
|
||||
- Runtime-selected posting uses `DynamicViralPostingInvoker<T>`.
|
||||
- Component lifecycle batches are viral non-posting coroutines.
|
||||
- `co::Group` is the primary structured fan-out/fan-in primitive.
|
||||
- `co::NonViralTaskNursery` owns non-viral invoker lifetimes at outer boundaries.
|
||||
|
||||
Expect breaking changes when they simplify the ownership, lifecycle, or
|
||||
post-to/post-back model.
|
||||
|
||||
@@ -0,0 +1,509 @@
|
||||
#ifndef NON_VIRAL_TASK_NURSERY_H
|
||||
#define NON_VIRAL_TASK_NURSERY_H
|
||||
|
||||
#include <cstddef>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
|
||||
#include <spinscale/cps/asynchronousBridge.h>
|
||||
#include <spinscale/sharedResourceGroup.h>
|
||||
#include <spinscale/spinLock.h>
|
||||
#include <spinscale/syncCancelerForAsyncWork.h>
|
||||
|
||||
namespace sscl::co {
|
||||
|
||||
namespace detail {
|
||||
|
||||
struct MemberInvokerBase
|
||||
{
|
||||
virtual ~MemberInvokerBase() = default;
|
||||
};
|
||||
|
||||
template <class Invoker>
|
||||
struct MemberInvoker : MemberInvokerBase
|
||||
{
|
||||
explicit MemberInvoker(Invoker &&invokerIn)
|
||||
: invoker(std::move(invokerIn))
|
||||
{}
|
||||
|
||||
Invoker invoker;
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
|
||||
/** Structured-concurrency owner for non-viral invokers at non-coroutine boundaries.
|
||||
*
|
||||
* The nursery owns invoker lifetimes until natural completion, wraps completion
|
||||
* callbacks, tracks unsettled slots, fans out cooperative cancel via per-slot
|
||||
* SyncCancelerForAsyncWork, and provides drain APIs.
|
||||
*
|
||||
* Call closeAdmission() explicitly before asyncAwaitAllSettlements() or
|
||||
* syncAwaitAllSettlements(). syncAwaitAllSettlements() caller must pass the
|
||||
* io_context where non-viral posting completions will land, and must ensure
|
||||
* that io_context is prepared to run (e.g. not left stopped without restart).
|
||||
*/
|
||||
class NonViralTaskNursery
|
||||
{
|
||||
public:
|
||||
enum class SlotLeaseStatus {
|
||||
RESERVED, ACTIVE_UNSETTLED, RETIRED
|
||||
};
|
||||
|
||||
enum class SlotSettlementStatus {
|
||||
UNSETTLED, COMPLETED, EXCEPTION_THROWN
|
||||
};
|
||||
|
||||
class Slot
|
||||
{
|
||||
public:
|
||||
/** Opaque handle to a nursery slot. Valid while the slot remains in storage. */
|
||||
class Handle
|
||||
{
|
||||
public:
|
||||
bool operator==(const Handle &_other) const noexcept
|
||||
{ return &slot == &_other.slot; }
|
||||
|
||||
bool operator!=(const Handle &_other) const noexcept
|
||||
{ return &slot != &_other.slot; }
|
||||
|
||||
private:
|
||||
friend class NonViralTaskNursery;
|
||||
friend class Lease;
|
||||
|
||||
explicit Handle(Slot &_slot) noexcept
|
||||
: slot(_slot)
|
||||
{}
|
||||
|
||||
Slot &slot;
|
||||
};
|
||||
|
||||
class Lease
|
||||
{
|
||||
public:
|
||||
Lease(Lease &&_other) noexcept
|
||||
: nursery(_other.nursery), slot(_other.slot),
|
||||
slotCommittedSoLeaseShouldntDestroy(
|
||||
std::exchange(
|
||||
_other.slotCommittedSoLeaseShouldntDestroy,
|
||||
true))
|
||||
{}
|
||||
|
||||
Lease(const Lease &) = delete;
|
||||
Lease &operator=(const Lease &) = delete;
|
||||
Lease &operator=(Lease &&) = delete;
|
||||
|
||||
~Lease()
|
||||
{
|
||||
if (!slotCommittedSoLeaseShouldntDestroy) {
|
||||
nursery.releaseUncommittedSlot(slot);
|
||||
}
|
||||
}
|
||||
|
||||
std::exception_ptr &getExceptionStorage()
|
||||
{ return slot.exceptionPtr; }
|
||||
|
||||
std::function<void()> getCallerLambda()
|
||||
{ return nursery.buildCallerLambdaForSlot(slot); }
|
||||
|
||||
sscl::SyncCancelerForAsyncWork &getSyncCanceler()
|
||||
{ return slot.syncCanceler; }
|
||||
|
||||
void setOnSettledHook(std::function<void()> hook)
|
||||
{
|
||||
if (slot.leaseStatus != SlotLeaseStatus::RESERVED)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__)
|
||||
+ ": must be called before fillSlot()");
|
||||
}
|
||||
|
||||
slot.onSettledHook = std::move(hook);
|
||||
}
|
||||
|
||||
/** Factory must create the invoker. Deferred construction is
|
||||
* required because non-viral coroutines may complete synchronously
|
||||
* during invoker construction, before fillSlot() can store the
|
||||
* record.
|
||||
*/
|
||||
template <class InvokerFactory>
|
||||
void fillSlot(InvokerFactory &&invokerFactory)
|
||||
{
|
||||
Slot &reservedSlot = slot;
|
||||
|
||||
if (reservedSlot.memberInvoker)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__) + ": slot already filled");
|
||||
}
|
||||
|
||||
if (reservedSlot.leaseStatus != SlotLeaseStatus::RESERVED)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__) + ": slot is not reserved");
|
||||
}
|
||||
|
||||
reservedSlot.leaseStatus = SlotLeaseStatus::ACTIVE_UNSETTLED;
|
||||
auto invoker = invokerFactory();
|
||||
|
||||
if (reservedSlot.leaseStatus == SlotLeaseStatus::RETIRED)
|
||||
{
|
||||
/** EXPLANATION:
|
||||
* Non-viral coroutines may complete synchronously inside
|
||||
* the factory. Retirement already ran; the local invoker
|
||||
* must be allowed to destroy the callee frame on return.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
reservedSlot.memberInvoker = std::make_unique<
|
||||
detail::MemberInvoker<std::decay_t<decltype(invoker)>>>(
|
||||
std::move(invoker));
|
||||
}
|
||||
|
||||
void commit()
|
||||
{
|
||||
if (slotCommittedSoLeaseShouldntDestroy)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__) + ": lease already committed");
|
||||
}
|
||||
|
||||
Slot &reservedSlot = slot;
|
||||
|
||||
if (reservedSlot.leaseStatus == SlotLeaseStatus::RESERVED)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__)
|
||||
+ ": fillSlot() required before commit()");
|
||||
}
|
||||
|
||||
slotCommittedSoLeaseShouldntDestroy = true;
|
||||
}
|
||||
|
||||
Handle handle() const
|
||||
{
|
||||
return Handle(slot);
|
||||
}
|
||||
|
||||
private:
|
||||
friend class NonViralTaskNursery;
|
||||
|
||||
Lease(NonViralTaskNursery &_nursery, Slot &_slot) noexcept
|
||||
: nursery(_nursery), slot(_slot)
|
||||
{}
|
||||
|
||||
NonViralTaskNursery &nursery;
|
||||
Slot &slot;
|
||||
bool slotCommittedSoLeaseShouldntDestroy = false;
|
||||
};
|
||||
|
||||
private:
|
||||
friend class NonViralTaskNursery;
|
||||
friend class Lease;
|
||||
|
||||
SlotLeaseStatus leaseStatus = SlotLeaseStatus::RESERVED;
|
||||
SlotSettlementStatus settlementStatus = SlotSettlementStatus::UNSETTLED;
|
||||
std::exception_ptr exceptionPtr = nullptr;
|
||||
sscl::SyncCancelerForAsyncWork syncCanceler;
|
||||
std::unique_ptr<detail::MemberInvokerBase> memberInvoker;
|
||||
std::function<void()> onSettledHook;
|
||||
};
|
||||
|
||||
void openAdmission()
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
s.rsrc.admissionOpen = true;
|
||||
}
|
||||
|
||||
void closeAdmission()
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
s.rsrc.admissionOpen = false;
|
||||
}
|
||||
|
||||
bool admissionIsOpen() const
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
return s.rsrc.admissionOpen;
|
||||
}
|
||||
|
||||
bool allSettled() const
|
||||
{ return unsettledCount() == 0; }
|
||||
|
||||
std::size_t unsettledCount() const
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
return countUnsettledSlotsUnlocked();
|
||||
}
|
||||
|
||||
Slot::Lease getNewSlotLease()
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
|
||||
if (!s.rsrc.admissionOpen)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__) + ": admission closed");
|
||||
}
|
||||
|
||||
pruneRetiredSlotsUnlocked();
|
||||
s.rsrc.slots.emplace_back();
|
||||
Slot &slot = s.rsrc.slots.back();
|
||||
return Slot::Lease(*this, slot);
|
||||
}
|
||||
|
||||
void requestCancelOnAll()
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
|
||||
for (auto &slot : s.rsrc.slots)
|
||||
{
|
||||
if (slot.leaseStatus != SlotLeaseStatus::ACTIVE_UNSETTLED) {
|
||||
continue;
|
||||
}
|
||||
|
||||
slot.syncCanceler.requestStop();
|
||||
}
|
||||
}
|
||||
|
||||
void asyncAwaitAllSettlements(std::function<void()> callback)
|
||||
{
|
||||
std::function<void()> waiterToInvoke;
|
||||
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
|
||||
if (s.rsrc.admissionOpen)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__)
|
||||
+ ": admission must be closed before awaiting drain");
|
||||
}
|
||||
|
||||
if (countUnsettledSlotsUnlocked() == 0) {
|
||||
waiterToInvoke = std::move(callback);
|
||||
}
|
||||
else if (s.rsrc.drainWaiter)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__)
|
||||
+ ": drain waiter already registered");
|
||||
}
|
||||
else {
|
||||
s.rsrc.drainWaiter = std::move(callback);
|
||||
}
|
||||
}
|
||||
|
||||
if (waiterToInvoke) {
|
||||
waiterToInvoke();
|
||||
}
|
||||
}
|
||||
|
||||
void syncAwaitAllSettlements(boost::asio::io_context &ioContext)
|
||||
{
|
||||
if (admissionIsOpen())
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__)
|
||||
+ ": admission must be closed before awaiting drain");
|
||||
}
|
||||
|
||||
if (allSettled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (ioContext.stopped())
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__) + ": provided io_context is stopped");
|
||||
}
|
||||
|
||||
/** EXPLANATION:
|
||||
* Drain may run on the thread that processes a completion callback,
|
||||
* not necessarily the thread blocked in waitForAsyncOperationComplete.
|
||||
* Keep the bridge off the waiter thread's stack.
|
||||
*/
|
||||
auto bridge = std::make_shared<sscl::cps::AsynchronousBridge>(
|
||||
ioContext);
|
||||
asyncAwaitAllSettlements(
|
||||
[bridge]()
|
||||
{
|
||||
bridge->setAsyncOperationComplete();
|
||||
});
|
||||
|
||||
bridge->waitForAsyncOperationCompleteOrIoContextStopped();
|
||||
}
|
||||
|
||||
template <class InvokerFactory>
|
||||
Slot::Handle launch(InvokerFactory &&factory)
|
||||
{
|
||||
auto lease = getNewSlotLease();
|
||||
lease.getSyncCanceler().startAcceptingWork();
|
||||
lease.fillSlot(
|
||||
[&factory, &lease]()
|
||||
{
|
||||
return std::forward<InvokerFactory>(factory)(lease);
|
||||
});
|
||||
lease.commit();
|
||||
return lease.handle();
|
||||
}
|
||||
|
||||
private:
|
||||
friend class Slot::Lease;
|
||||
|
||||
struct State
|
||||
{
|
||||
bool admissionOpen = false;
|
||||
std::list<Slot> slots;
|
||||
std::function<void()> drainWaiter;
|
||||
};
|
||||
|
||||
std::size_t countUnsettledSlotsUnlocked() const
|
||||
{
|
||||
std::size_t count = 0;
|
||||
|
||||
for (const auto &slot : s.rsrc.slots)
|
||||
{
|
||||
if (slot.leaseStatus == SlotLeaseStatus::ACTIVE_UNSETTLED) {
|
||||
++count;
|
||||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
void releaseUncommittedSlot(Slot &slot)
|
||||
{
|
||||
std::function<void()> waiterToInvoke;
|
||||
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
|
||||
if (slot.leaseStatus != SlotLeaseStatus::RESERVED) {
|
||||
return;
|
||||
}
|
||||
|
||||
slot.leaseStatus = SlotLeaseStatus::RETIRED;
|
||||
slot.memberInvoker.reset();
|
||||
waiterToInvoke = takeDrainWaiterIfDrainedUnlocked();
|
||||
}
|
||||
|
||||
if (waiterToInvoke) { waiterToInvoke(); }
|
||||
}
|
||||
|
||||
std::function<void()> buildCallerLambdaForSlot(Slot &slot)
|
||||
{
|
||||
return
|
||||
[this, slot = std::ref(slot)]()
|
||||
{
|
||||
retireSlot(slot.get());
|
||||
};
|
||||
}
|
||||
|
||||
void retireSlot(Slot &slot)
|
||||
{
|
||||
std::function<void()> waiterToInvoke;
|
||||
|
||||
{
|
||||
sscl::SpinLock::Guard guard(s.lock);
|
||||
|
||||
if (slot.leaseStatus != SlotLeaseStatus::ACTIVE_UNSETTLED)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
std::string(__func__) + ": slot is not active and "
|
||||
"unsettled");
|
||||
}
|
||||
if (slot.settlementStatus != SlotSettlementStatus::UNSETTLED) {
|
||||
throw std::runtime_error(
|
||||
std::string(__func__) + ": slot is not unsettled");
|
||||
}
|
||||
|
||||
if (!verifySlotIsManagedUnlocked(slot)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (slot.onSettledHook) { slot.onSettledHook(); }
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
std::cerr
|
||||
<< "NonViralTaskNursery: onSettled hook exception: "
|
||||
<< e.what() << std::endl;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr
|
||||
<< "NonViralTaskNursery: onSettled hook unknown exception"
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
if (slot.exceptionPtr) {
|
||||
slot.settlementStatus = SlotSettlementStatus::EXCEPTION_THROWN;
|
||||
} else {
|
||||
slot.settlementStatus = SlotSettlementStatus::COMPLETED;
|
||||
}
|
||||
|
||||
slot.leaseStatus = SlotLeaseStatus::RETIRED;
|
||||
slot.memberInvoker.reset();
|
||||
waiterToInvoke = takeDrainWaiterIfDrainedUnlocked();
|
||||
}
|
||||
|
||||
if (waiterToInvoke) { waiterToInvoke(); }
|
||||
}
|
||||
|
||||
/** Caller must hold s.lock. */
|
||||
bool verifySlotIsManagedUnlocked(const Slot &slot) const
|
||||
{
|
||||
for (const auto &trackedSlot : s.rsrc.slots)
|
||||
{
|
||||
if (&trackedSlot == &slot) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Caller must hold s.lock. */
|
||||
void pruneRetiredSlotsUnlocked()
|
||||
{
|
||||
for (auto it = s.rsrc.slots.begin(); it != s.rsrc.slots.end();)
|
||||
{
|
||||
if (it->leaseStatus == SlotLeaseStatus::RETIRED) {
|
||||
it = s.rsrc.slots.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Caller must hold s.lock. */
|
||||
std::function<void()> takeDrainWaiterIfDrainedUnlocked()
|
||||
{
|
||||
if (s.rsrc.admissionOpen) {
|
||||
return {};
|
||||
}
|
||||
|
||||
if (countUnsettledSlotsUnlocked() != 0) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return std::exchange(s.rsrc.drainWaiter, {});
|
||||
}
|
||||
|
||||
public:
|
||||
mutable sscl::SharedResourceGroup<sscl::SpinLock, State> s;
|
||||
};
|
||||
|
||||
} // namespace sscl::co
|
||||
|
||||
#endif // NON_VIRAL_TASK_NURSERY_H
|
||||
Reference in New Issue
Block a user