Compare commits

...

4 Commits

Author SHA1 Message Date
hayodea ca2cccaa9c New multi-op result set class 2026-05-24 16:23:07 -04:00
hayodea a14d622eaf PuppetApp: Lifetime mgmt ops are now Viral
They no longer accept an exc_ptr and lambda for cb.
2026-05-24 16:11:08 -04:00
hayodea 16e0350245 CoQutex: Add instance name for debugging 2026-05-24 16:10:30 -04:00
hayodea 5f265567d1 Explain why CoQutex is superior to LockSet 2026-05-24 13:05:09 -04:00
6 changed files with 81 additions and 33 deletions
+20 -1
View File
@@ -6,6 +6,7 @@
#include <coroutine> #include <coroutine>
#include <deque> #include <deque>
#include <stdexcept> #include <stdexcept>
#include <string>
#include <type_traits> #include <type_traits>
#ifdef CONFIG_LIBSSCL_DEBUG_CO #ifdef CONFIG_LIBSSCL_DEBUG_CO
@@ -28,6 +29,15 @@ public:
class ReleaseHandle; class ReleaseHandle;
CoQutex() noexcept = default; CoQutex() noexcept = default;
CoQutex([[maybe_unused]] const std::string &_name) noexcept
:
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
name(_name),
#endif
isOwned(false)
{}
CoQutex(const CoQutex &) = delete; CoQutex(const CoQutex &) = delete;
CoQutex(CoQutex &&) noexcept = delete; CoQutex(CoQutex &&) noexcept = delete;
CoQutex &operator=(const CoQutex &) = delete; CoQutex &operator=(const CoQutex &) = delete;
@@ -77,7 +87,13 @@ public:
std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n"; std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n";
#endif #endif
if (link.holdsAcquiredLock(coQutex)) { if (link.holdsAcquiredLock(coQutex)) {
throw std::runtime_error("Deadlock detected: CoQutex re-acquire on caller promise chain."); std::string message =
"Deadlock detected: CoQutex re-acquire on caller promise chain";
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
message += " (" + coQutex.name + ")";
#endif
message += ".";
throw std::runtime_error(message);
} }
}); });
@@ -130,6 +146,9 @@ private:
waitingCoroutines.pop_front(); waitingCoroutines.pop_front();
} }
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
std::string name;
#endif
sscl::SpinLock spinLock; sscl::SpinLock spinLock;
bool isOwned = false; bool isOwned = false;
std::deque<AcquireInvocationAndSuspensionPolicy::WaitingCoroutine> waitingCoroutines; std::deque<AcquireInvocationAndSuspensionPolicy::WaitingCoroutine> waitingCoroutines;
+15
View File
@@ -15,6 +15,21 @@ class Qutex;
/** /**
* @brief LockSet - Manages a collection of locks for acquisition/release * @brief LockSet - Manages a collection of locks for acquisition/release
*
* LockSet exists only because the CPS re-enqueuing model had no way to acquire
* locks in a fine-grained way. A LockerAndInvoker could re-post only the entire
* continuation, and only before that continuation began executing; there was no
* mechanism to re-enqueue individual segments within a continuation. The
* practical consequence was that all required Qutexes had to be acquired at
* once up front, before the continuation body could run at all.
*
* releaseQutexEarly() was a partial workaround for finer-grained control, but
* it only helped on the release side and did not solve the fundamental problem
* of acquiring locks one-at-a-time mid-sequence.
*
* co::CoQutex supersedes this abstraction: coroutines can co_await individual
* locks at the points where they are actually needed, which is the finer control
* LockSet and releaseQutexEarly() were aiming for with limited success.
*/ */
class LockSet class LockSet
{ {
@@ -0,0 +1,29 @@
#ifndef MULTI_OPERATION_RESULT_SET_H
#define MULTI_OPERATION_RESULT_SET_H
namespace sscl {
/** Plain aggregate for fan-out / fan-in results returned from coroutines. */
struct MultiOperationResultSet
{
MultiOperationResultSet(
unsigned int total = 0,
unsigned int succeeded = 0,
unsigned int failed = 0)
: nTotal(total), nSucceeded(succeeded), nFailed(failed)
{}
bool isComplete() const
{ return nSucceeded + nFailed == nTotal; }
bool nTotalIsZero() const
{ return nTotal == 0; }
unsigned int nTotal;
unsigned int nSucceeded;
unsigned int nFailed;
};
} // namespace sscl
#endif // MULTI_OPERATION_RESULT_SET_H
+5 -14
View File
@@ -2,8 +2,6 @@
#define PUPPET_APPLICATION_H #define PUPPET_APPLICATION_H
#include <config.h> #include <config.h>
#include <exception>
#include <functional>
#include <memory> #include <memory>
#include <string_view> #include <string_view>
#include <vector> #include <vector>
@@ -22,16 +20,11 @@ public:
const std::vector<std::shared_ptr<PuppetThread>> &threads); const std::vector<std::shared_ptr<PuppetThread>> &threads);
~PuppetApplication() = default; ~PuppetApplication() = default;
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq();
std::exception_ptr &exceptionPtr, std::function<void()> callback); co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq();
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq();
std::exception_ptr &exceptionPtr, std::function<void()> callback); co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq();
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq( co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq();
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq(
std::exception_ptr &exceptionPtr, std::function<void()> callback);
// CPU distribution method // CPU distribution method
void distributeAndPinThreadsAcrossCpus(); void distributeAndPinThreadsAcrossCpus();
@@ -71,8 +64,6 @@ protected:
private: private:
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq( co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> callback,
PuppetThread::ThreadOp threadOp, PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage); std::string_view emptyThreadsLogMessage);
}; };
+7
View File
@@ -1,6 +1,8 @@
#ifndef SHARED_RESOURCE_GROUP_H #ifndef SHARED_RESOURCE_GROUP_H
#define SHARED_RESOURCE_GROUP_H #define SHARED_RESOURCE_GROUP_H
#include <string>
namespace sscl { namespace sscl {
template <typename LockType, typename ResourceType> template <typename LockType, typename ResourceType>
@@ -8,6 +10,11 @@ class SharedResourceGroup
{ {
public: public:
SharedResourceGroup() = default; SharedResourceGroup() = default;
explicit SharedResourceGroup(const std::string& lockName)
: lock(lockName)
{}
~SharedResourceGroup() = default; ~SharedResourceGroup() = default;
LockType lock; LockType lock;
+5 -18
View File
@@ -63,9 +63,7 @@ void PuppetApplication::addAllPuppetLifetimeInvokersToGroup(
} }
co::ViralNonPostingInvoker<void> co::ViralNonPostingInvoker<void>
PuppetApplication::joltAllPuppetThreadsCReq( PuppetApplication::joltAllPuppetThreadsCReq()
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback)
{ {
if (threadsHaveBeenJolted) if (threadsHaveBeenJolted)
{ {
@@ -94,8 +92,6 @@ PuppetApplication::joltAllPuppetThreadsCReq(
co::ViralNonPostingInvoker<void> co::ViralNonPostingInvoker<void>
PuppetApplication::allPuppetThreadsLifetimeOpCReq( PuppetApplication::allPuppetThreadsLifetimeOpCReq(
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback,
PuppetThread::ThreadOp threadOp, PuppetThread::ThreadOp threadOp,
std::string_view emptyThreadsLogMessage) std::string_view emptyThreadsLogMessage)
{ {
@@ -116,39 +112,31 @@ PuppetApplication::allPuppetThreadsLifetimeOpCReq(
} }
co::ViralNonPostingInvoker<void> co::ViralNonPostingInvoker<void>
PuppetApplication::startAllPuppetThreadsCReq( PuppetApplication::startAllPuppetThreadsCReq()
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{ {
return allPuppetThreadsLifetimeOpCReq( return allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::START, PuppetThread::ThreadOp::START,
noPuppetThreadsToStartLogMessage); noPuppetThreadsToStartLogMessage);
} }
co::ViralNonPostingInvoker<void> co::ViralNonPostingInvoker<void>
PuppetApplication::pauseAllPuppetThreadsCReq( PuppetApplication::pauseAllPuppetThreadsCReq()
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{ {
return allPuppetThreadsLifetimeOpCReq( return allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::PAUSE, PuppetThread::ThreadOp::PAUSE,
noPuppetThreadsToPauseLogMessage); noPuppetThreadsToPauseLogMessage);
} }
co::ViralNonPostingInvoker<void> co::ViralNonPostingInvoker<void>
PuppetApplication::resumeAllPuppetThreadsCReq( PuppetApplication::resumeAllPuppetThreadsCReq()
std::exception_ptr &exceptionPtr, std::function<void()> callback)
{ {
return allPuppetThreadsLifetimeOpCReq( return allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::RESUME, PuppetThread::ThreadOp::RESUME,
noPuppetThreadsToResumeLogMessage); noPuppetThreadsToResumeLogMessage);
} }
co::ViralNonPostingInvoker<void> co::ViralNonPostingInvoker<void>
PuppetApplication::exitAllPuppetThreadsCReq( PuppetApplication::exitAllPuppetThreadsCReq()
std::exception_ptr &exceptionPtr,
std::function<void()> callback)
{ {
if (componentThreads.empty()) if (componentThreads.empty())
{ {
@@ -157,7 +145,6 @@ PuppetApplication::exitAllPuppetThreadsCReq(
} }
co_await allPuppetThreadsLifetimeOpCReq( co_await allPuppetThreadsLifetimeOpCReq(
exceptionPtr, std::move(callback),
PuppetThread::ThreadOp::EXIT, PuppetThread::ThreadOp::EXIT,
noPuppetThreadsToExitLogMessage); noPuppetThreadsToExitLogMessage);