Impl ViralNonPostingInv; fix member coro thisptr;

This commit is contained in:
2026-05-24 02:25:04 -04:00
parent abdb857e55
commit daad2a8c95
9 changed files with 424 additions and 119 deletions
+1 -1
View File
@@ -439,7 +439,7 @@ struct Group
* would be impossible.
*
* So we should be able to call resume() directly here without
* post()ing to current_io_context().
* post()ing to ComponentThread::getSelf()->getIoService().
*
* EXPLANATION:
* However, in order to ensure that we keep this adapter coro
+72 -6
View File
@@ -47,7 +47,10 @@ struct NonViralPostingInvoker
*/
std::ostringstream oss;
oss << std::this_thread::get_id()
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."
<< " Promise type=" << typeid(*this).name()
<< ". This usually means promise construction did not bind the"
<< " (exception_ptr&, function<void()>, ...) constructor.";
throw std::runtime_error(oss.str());
}
@@ -153,12 +156,12 @@ struct ViralPostingInvoker
* from get_return_object(). ~NonPostingInvoker destroys the callee frame.
*/
struct NonViralNonPostingInvoker
: public NonPostingInvoker<NonPostingPromise>
: public NonPostingInvoker<NonPostingPromise<void>, void>
{
struct promise_type
: public NonPostingPromise
: public NonPostingPromise<void>
{
using NonPostingPromise::NonPostingPromise;
using NonPostingPromise<void>::NonPostingPromise;
NonViralNonPostingInvoker get_return_object()
{
@@ -169,7 +172,10 @@ struct NonViralNonPostingInvoker
{
std::ostringstream oss;
oss << std::this_thread::get_id()
<< ": Missing completion lambda: non-viral coroutines require a completion lambda.";
<< ": Missing completion lambda: non-viral coroutines require a completion lambda."
<< " Promise type=" << typeid(*this).name()
<< ". This usually means promise construction did not bind the"
<< " (exception_ptr&, function<void()>, ...) constructor.";
throw std::runtime_error(oss.str());
}
@@ -180,7 +186,7 @@ struct NonViralNonPostingInvoker
}
};
using NonPostingInvoker<NonPostingPromise>::NonPostingInvoker;
using NonPostingInvoker<NonPostingPromise<void>, void>::NonPostingInvoker;
bool await_ready() const noexcept
{ std::terminate(); }
@@ -192,6 +198,66 @@ struct NonViralNonPostingInvoker
{ std::terminate(); }
};
/** Viral awaitable non-posting coroutine: runs eagerly on the caller thread
* (initial_suspend is never). Caller resume uses symmetric transfer when the
* caller has registered before callee completion; otherwise PostBackStatus
* fast-paths await_resume on co_await.
*/
template <typename T = void>
struct ViralNonPostingInvoker
: public NonPostingInvoker<NonPostingPromise<T>, T>
{
struct promise_type
: public NonPostingPromise<T>
{
using NonPostingPromise<T>::NonPostingPromise;
ViralNonPostingInvoker<T> get_return_object() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Returning ViralNonPostingInvoker.\n";
#endif
this->setSelfSchedHandle(
std::coroutine_handle<promise_type>::from_promise(*this));
return ViralNonPostingInvoker<T>(*this);
}
};
using NonPostingInvoker<NonPostingPromise<T>, T>::NonPostingInvoker;
bool await_ready() const noexcept
{ return false; }
template <typename CallerPromise>
bool await_suspend(
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
{
static_assert(
std::is_base_of_v<PromiseChainLink, CallerPromise>,
"ViralNonPostingInvoker caller promise must derive from "
"PromiseChainLink");
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Setting callerSchedHandle.\n";
#endif
const bool suspendCaller =
this->setCallerSchedHandle(callerSchedHandle);
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " CallerFlowExecutor returned suspend=" << suspendCaller
<< ".\n";
#endif
return suspendCaller;
}
auto await_resume()
{
return NonPostingInvoker<NonPostingPromise<T>, T>::await_resume();
}
};
} // namespace sscl::co
#endif // INVOKERS_H
+45 -4
View File
@@ -5,13 +5,14 @@
#include <coroutine>
#include <iostream>
#include <thread>
#include <type_traits>
#include <utility>
#include <spinscale/co/nonPostingPromise.h>
namespace sscl::co {
template <typename PromiseType>
template <typename PromiseType, typename T>
class NonPostingInvoker
{
public:
@@ -39,15 +40,55 @@ public:
}
}
ReturnValues<void> &completedReturnValues() noexcept
template <typename CallerPromise>
bool setCallerSchedHandle(
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
{
static_assert(
std::is_base_of_v<PromiseChainLink, CallerPromise>,
"NonPostingInvoker caller promise must derive from PromiseChainLink");
calleePromise.callerSchedHandle = callerSchedHandle;
calleePromise.setCallerPromiseChainLink(
&callerSchedHandle.promise());
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Done setting callerSchedHandle; running CallerFlowExecutor.\n";
#endif
return calleePromise.postBackStatus.getCallerFlowExecutor()();
}
ReturnValues<T> &completedReturnValues() noexcept
{ return calleePromise.returnValues; }
const ReturnValues<void> &completedReturnValues() const noexcept
const ReturnValues<T> &completedReturnValues() const noexcept
{ return calleePromise.returnValues; }
private:
auto await_resume()
{
calleePromise.postBackStatus.reset();
ReturnValues<T> &returnValues = calleePromise.returnValues;
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " About to check for and rethrow any exception.\n";
#endif
if (returnValues.myExceptionPtr) {
std::exception_ptr const captured = returnValues.myExceptionPtr;
std::rethrow_exception(captured);
}
if constexpr (!std::is_void_v<T>)
{
T result = std::move(returnValues.myReturnValue);
return result;
}
}
protected:
PromiseType &calleePromise;
private:
/** Every live invoker owns destruction of its callee coroutine frame in
* ~NonPostingInvoker (via calleePromise.selfSchedHandle).
*
+139 -17
View File
@@ -9,15 +9,99 @@
#include <thread>
#include <utility>
#include <spinscale/spinLock.h>
#include <spinscale/co/coQutex.h>
#include <spinscale/co/promiseChainLink.h>
#include <spinscale/co/promises.h>
namespace sscl::co {
template <typename T>
struct NonPostingPromise
: public PromiseChainLink
: public PromiseChainLink,
public PostingPromiseReturnOps<NonPostingPromise<T>, T>
{
struct PostBackStatus
{
struct CalleeFlowExecutor;
struct CallerFlowExecutor;
friend struct CalleeFlowExecutor;
friend struct CallerFlowExecutor;
explicit PostBackStatus(NonPostingPromise &calleePromiseIn) noexcept
: calleePromise(calleePromiseIn)
{}
void reset() noexcept
{
sscl::SpinLock::Guard guard(lock);
callerHasSetCallerSchedHandle = false;
calleeIsReadyToPostBack = false;
}
struct FlowExecutor
{
explicit FlowExecutor(PostBackStatus &parentIn) noexcept
: parent(parentIn)
{}
PostBackStatus &parent;
};
struct CalleeFlowExecutor
: public FlowExecutor
{
explicit CalleeFlowExecutor(PostBackStatus &parentIn) noexcept
: FlowExecutor(parentIn)
{}
bool operator()() noexcept
{
sscl::SpinLock::Guard guard(this->parent.lock);
this->parent.calleeIsReadyToPostBack = true;
if (this->parent.callerHasSetCallerSchedHandle) {
return true;
}
return false;
}
};
struct CallerFlowExecutor
: public FlowExecutor
{
explicit CallerFlowExecutor(PostBackStatus &parentIn) noexcept
: FlowExecutor(parentIn)
{}
bool operator()() noexcept
{
sscl::SpinLock::Guard guard(this->parent.lock);
this->parent.callerHasSetCallerSchedHandle = true;
if (this->parent.calleeIsReadyToPostBack) {
return false;
}
return true;
}
};
CalleeFlowExecutor getCalleeFlowExecutor() noexcept
{
return CalleeFlowExecutor(*this);
}
CallerFlowExecutor getCallerFlowExecutor() noexcept
{
return CallerFlowExecutor(*this);
}
NonPostingPromise &calleePromise;
private:
sscl::SpinLock lock;
bool callerHasSetCallerSchedHandle = false;
bool calleeIsReadyToPostBack = false;
};
/** Completion work must run from this awaiter's await_suspend, not
* synchronously inside promise.final_suspend() before it returns: the
* hidden coroutine segment index in the coroutine state is only advanced
@@ -26,16 +110,19 @@ struct NonPostingPromise
struct FinalSuspendNonPostingInvoker
: public std::suspend_always
{
explicit FinalSuspendNonPostingInvoker(NonPostingPromise &calleePromiseIn) noexcept
: calleePromise(calleePromiseIn)
explicit FinalSuspendNonPostingInvoker(
NonPostingPromise &calleePromiseIn) noexcept
: calleePromise(calleePromiseIn)
{}
bool await_suspend(std::coroutine_handle<> const) noexcept
std::coroutine_handle<> await_suspend(
std::coroutine_handle<> const) noexcept
{
if (calleePromise.callerLambda)
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
std::cout << "final_suspend" << ": "
<< std::this_thread::get_id()
<< " Non-viral non-posting: invoking callerLambda directly.\n";
#endif
if (calleePromise.returnValues.myExceptionPtr) {
@@ -44,15 +131,29 @@ struct NonPostingPromise
}
calleePromise.callerLambda();
return std::noop_coroutine();
}
return true;
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << "final_suspend" << ": " << std::this_thread::get_id()
<< " Viral non-posting: running CalleeFlowExecutor.\n";
#endif
const bool symmetricTransferToCaller =
calleePromise.postBackStatus.getCalleeFlowExecutor()();
if (symmetricTransferToCaller && calleePromise.callerSchedHandle) {
return calleePromise.callerSchedHandle;
}
return std::noop_coroutine();
}
NonPostingPromise &calleePromise;
};
NonPostingPromise() noexcept
: returnValues()
: returnValues(),
postBackStatus(*this)
{}
template <typename... TailArgs>
@@ -61,13 +162,27 @@ struct NonPostingPromise
std::function<void()> callerLambdaIn,
TailArgs &&...) noexcept
: returnValues(callerExceptionPtr),
callerLambda(std::move(callerLambdaIn))
callerLambda(std::move(callerLambdaIn)),
postBackStatus(*this)
{}
template <typename ObjectArg, typename... TailArgs>
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
NonPostingPromise(
ObjectArg &&,
std::exception_ptr &callerExceptionPtr,
std::function<void()> callerLambdaIn,
TailArgs &&...) noexcept
: NonPostingPromise(
callerExceptionPtr,
std::move(callerLambdaIn))
{}
~NonPostingPromise() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
std::cout << __func__ << ": " << std::this_thread::get_id() << " Destructing.\n";
std::cout << __func__ << ": " << std::this_thread::get_id()
<< " Destructing.\n";
#endif
}
@@ -77,9 +192,6 @@ struct NonPostingPromise
auto final_suspend() noexcept
{ return FinalSuspendNonPostingInvoker(*this); }
void return_void() noexcept
{ return; }
void unhandled_exception() noexcept
{
returnValues.myExceptionPtr = std::current_exception();
@@ -90,16 +202,26 @@ struct NonPostingPromise
eraseFirstMatchingAcquiredLock(coQutex);
}
const PromiseChainLink *callerPromiseChainLink() const noexcept override
{ return callerChainLink; }
PromiseChainLink *callerPromiseChainLink() noexcept override
{ return callerChainLink; }
void setSelfSchedHandle(std::coroutine_handle<> schedHandle) noexcept
{
selfSchedHandle = schedHandle;
}
{ selfSchedHandle = schedHandle; }
ReturnValues<void> returnValues;
void setCallerPromiseChainLink(PromiseChainLink *chainLink) noexcept
{ callerChainLink = chainLink; }
ReturnValues<T> returnValues;
std::function<void()> callerLambda;
PostBackStatus postBackStatus;
std::coroutine_handle<> selfSchedHandle;
std::coroutine_handle<> callerSchedHandle;
PromiseChainLink *callerChainLink = nullptr;
template <typename>
template <typename, typename>
friend class NonPostingInvoker;
};
+30
View File
@@ -6,6 +6,7 @@
#include <exception>
#include <functional>
#include <iostream>
#include <typeinfo>
#include <thread>
#include <type_traits>
#include <utility>
@@ -267,6 +268,22 @@ struct PostingPromise
postBackStatus(*this)
{}
/** Member coroutines pass the implicit object parameter before explicit
* (exceptionPtr, callback, ...) args. Discard the object and delegate to
* the free-function constructor shape.
*/
template <typename ObjectArg, typename... TailArgs>
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
PostingPromise(
ObjectArg &&,
std::exception_ptr &_callerExceptionPtr,
std::function<void()> _callerLambda,
TailArgs &&...) noexcept
: PostingPromise(
_callerExceptionPtr,
std::move(_callerLambda))
{}
~PostingPromise() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
@@ -345,6 +362,19 @@ struct TaggedPostingPromise
std::forward<TailArgs>(tailArgs)...)
{}
template <typename ObjectArg, typename... TailArgs>
requires (!std::same_as<std::remove_cvref_t<ObjectArg>, std::exception_ptr>)
TaggedPostingPromise(
ObjectArg &&,
std::exception_ptr &_exceptionPtr,
std::function<void()> _callerLambda,
TailArgs &&... tailArgs) noexcept
: PostingPromise<T>(
_exceptionPtr,
std::move(_callerLambda),
std::forward<TailArgs>(tailArgs)...)
{}
auto initial_suspend() noexcept
{
#ifdef CONFIG_LIBSSCL_DEBUG_CO
+37 -11
View File
@@ -5,7 +5,6 @@
#include <atomic>
#include <thread>
#include <unordered_map>
#include <boost/asio/io_service.hpp>
#include <stdexcept>
#include <queue>
#include <functional>
@@ -14,9 +13,11 @@
#include <unistd.h>
#include <memory>
#include <coroutine>
#include <spinscale/cps/callback.h>
#include <cstdint>
#include <string>
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/cps/callback.h>
namespace sscl {
@@ -165,21 +166,40 @@ public:
struct ViralThreadLifetimeMgmtInvoker
{
struct AsyncState
{
std::atomic<bool> settled{false};
std::coroutine_handle<> callerSchedHandle;
};
ViralThreadLifetimeMgmtInvoker(
ThreadOp _threadOp,
PuppetThread &_parentThread,
const std::shared_ptr<PuppetThread> &_selfPtr = nullptr)
: threadOp(_threadOp),
asyncState(std::make_shared<AsyncState>()),
parentThread(_parentThread),
selfPtr(_selfPtr),
lifetimeMgmtCallback{
nullptr,
[this]()
[asyncState = asyncState]()
{
settled = true;
if (callerSchedHandle) {
callerSchedHandle.resume();
asyncState->settled.store(true, std::memory_order_release);
std::coroutine_handle<> handle =
asyncState->callerSchedHandle;
if (!handle) {
return;
}
/** Post resume to the puppeteer queue: direct resume() from
* within an asio completion handler can destroy adapter
* coroutine state while the handler is still unwinding.
*/
boost::asio::post(
ComponentThread::getPptr()->getIoService(),
[handle]() { handle.resume(); });
}}
{
if (threadOp == ThreadOp::JOLT && selfPtr == nullptr)
@@ -212,26 +232,32 @@ public:
}
}
bool await_ready() const noexcept { return settled; }
bool await_ready() const noexcept
{
return asyncState->settled.load(std::memory_order_acquire);
}
bool await_suspend(
std::coroutine_handle<> _callerSchedHandle) noexcept
{
if (settled) { return false; }
callerSchedHandle = _callerSchedHandle;
if (asyncState->settled.load(std::memory_order_acquire)) {
return false;
}
asyncState->callerSchedHandle = _callerSchedHandle;
return true;
}
void await_resume() noexcept {}
ThreadOp threadOp;
bool settled = false;
std::coroutine_handle<> callerSchedHandle;
std::shared_ptr<AsyncState> asyncState;
PuppetThread &parentThread;
const std::shared_ptr<PuppetThread> selfPtr;
cps::Callback<threadLifetimeMgmtOpCbFn> lifetimeMgmtCallback;
};
// Thread lifetime management request invokers
ViralThreadLifetimeMgmtInvoker startThreadAReq()
{ return ViralThreadLifetimeMgmtInvoker(ThreadOp::START, *this); }
ViralThreadLifetimeMgmtInvoker pauseThreadAReq()
+24 -6
View File
@@ -5,7 +5,10 @@
#include <exception>
#include <functional>
#include <memory>
#include <string_view>
#include <vector>
#include <spinscale/co/group.h>
#include <spinscale/co/invokers.h>
#include <spinscale/componentThread.h>
@@ -19,22 +22,30 @@ public:
const std::vector<std::shared_ptr<PuppetThread>> &threads);
~PuppetApplication() = default;
co::NonViralNonPostingInvoker joltAllPuppetThreadsCReq(
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker startAllPuppetThreadsCReq(
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker pauseAllPuppetThreadsCReq(
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker resumeAllPuppetThreadsCReq(
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::NonViralNonPostingInvoker exitAllPuppetThreadsCReq(
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
// CPU distribution method
void distributeAndPinThreadsAcrossCpus();
protected:
// Collection of PuppetThread instances
using PuppetLifetimeMgmtInvoker =
PuppetThread::ViralThreadLifetimeMgmtInvoker;
using PuppetLifetimeMgmtGroup = co::Group<PuppetLifetimeMgmtInvoker>;
void addAllPuppetLifetimeInvokersToGroup(
PuppetLifetimeMgmtGroup &group,
std::vector<PuppetLifetimeMgmtInvoker> &invokers,
PuppetThread::ThreadOp threadOp) const;
std::vector<std::shared_ptr<PuppetThread>> componentThreads;
/**
@@ -57,6 +68,13 @@ protected:
* a synchronization point for the entire system initialization.
*/
bool threadsHaveBeenJolted = false;
private:
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> callback,
PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage);
};
} // namespace sscl