mirror of
https://github.com/latentPrion/libspinscale.git
synced 2026-06-23 19:48:32 +00:00
Compare commits
4 Commits
e7707dacdf
...
ca2cccaa9c
| Author | SHA1 | Date | |
|---|---|---|---|
| ca2cccaa9c | |||
| a14d622eaf | |||
| 16e0350245 | |||
| 5f265567d1 |
@@ -6,6 +6,7 @@
|
||||
#include <coroutine>
|
||||
#include <deque>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
|
||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||
@@ -28,6 +29,15 @@ public:
|
||||
class ReleaseHandle;
|
||||
|
||||
CoQutex() noexcept = default;
|
||||
|
||||
CoQutex([[maybe_unused]] const std::string &_name) noexcept
|
||||
:
|
||||
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
||||
name(_name),
|
||||
#endif
|
||||
isOwned(false)
|
||||
{}
|
||||
|
||||
CoQutex(const CoQutex &) = delete;
|
||||
CoQutex(CoQutex &&) noexcept = delete;
|
||||
CoQutex &operator=(const CoQutex &) = delete;
|
||||
@@ -77,7 +87,13 @@ public:
|
||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n";
|
||||
#endif
|
||||
if (link.holdsAcquiredLock(coQutex)) {
|
||||
throw std::runtime_error("Deadlock detected: CoQutex re-acquire on caller promise chain.");
|
||||
std::string message =
|
||||
"Deadlock detected: CoQutex re-acquire on caller promise chain";
|
||||
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
||||
message += " (" + coQutex.name + ")";
|
||||
#endif
|
||||
message += ".";
|
||||
throw std::runtime_error(message);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -130,6 +146,9 @@ private:
|
||||
waitingCoroutines.pop_front();
|
||||
}
|
||||
|
||||
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
||||
std::string name;
|
||||
#endif
|
||||
sscl::SpinLock spinLock;
|
||||
bool isOwned = false;
|
||||
std::deque<AcquireInvocationAndSuspensionPolicy::WaitingCoroutine> waitingCoroutines;
|
||||
|
||||
@@ -15,6 +15,21 @@ class Qutex;
|
||||
|
||||
/**
|
||||
* @brief LockSet - Manages a collection of locks for acquisition/release
|
||||
*
|
||||
* LockSet exists only because the CPS re-enqueuing model had no way to acquire
|
||||
* locks in a fine-grained way. A LockerAndInvoker could re-post only the entire
|
||||
* continuation, and only before that continuation began executing; there was no
|
||||
* mechanism to re-enqueue individual segments within a continuation. The
|
||||
* practical consequence was that all required Qutexes had to be acquired at
|
||||
* once up front, before the continuation body could run at all.
|
||||
*
|
||||
* releaseQutexEarly() was a partial workaround for finer-grained control, but
|
||||
* it only helped on the release side and did not solve the fundamental problem
|
||||
* of acquiring locks one-at-a-time mid-sequence.
|
||||
*
|
||||
* co::CoQutex supersedes this abstraction: coroutines can co_await individual
|
||||
* locks at the points where they are actually needed, which is the finer control
|
||||
* LockSet and releaseQutexEarly() were aiming for with limited success.
|
||||
*/
|
||||
class LockSet
|
||||
{
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
#ifndef MULTI_OPERATION_RESULT_SET_H
|
||||
#define MULTI_OPERATION_RESULT_SET_H
|
||||
|
||||
namespace sscl {
|
||||
|
||||
/** Plain aggregate for fan-out / fan-in results returned from coroutines. */
|
||||
struct MultiOperationResultSet
|
||||
{
|
||||
MultiOperationResultSet(
|
||||
unsigned int total = 0,
|
||||
unsigned int succeeded = 0,
|
||||
unsigned int failed = 0)
|
||||
: nTotal(total), nSucceeded(succeeded), nFailed(failed)
|
||||
{}
|
||||
|
||||
bool isComplete() const
|
||||
{ return nSucceeded + nFailed == nTotal; }
|
||||
|
||||
bool nTotalIsZero() const
|
||||
{ return nTotal == 0; }
|
||||
|
||||
unsigned int nTotal;
|
||||
unsigned int nSucceeded;
|
||||
unsigned int nFailed;
|
||||
};
|
||||
|
||||
} // namespace sscl
|
||||
|
||||
#endif // MULTI_OPERATION_RESULT_SET_H
|
||||
@@ -2,8 +2,6 @@
|
||||
#define PUPPET_APPLICATION_H
|
||||
|
||||
#include <config.h>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
@@ -22,16 +20,11 @@ public:
|
||||
const std::vector<std::shared_ptr<PuppetThread>> &threads);
|
||||
~PuppetApplication() = default;
|
||||
|
||||
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
||||
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq();
|
||||
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq();
|
||||
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq();
|
||||
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq();
|
||||
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq();
|
||||
|
||||
// CPU distribution method
|
||||
void distributeAndPinThreadsAcrossCpus();
|
||||
@@ -71,8 +64,6 @@ protected:
|
||||
|
||||
private:
|
||||
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
|
||||
std::exception_ptr &exceptionPtr,
|
||||
std::function<void()> callback,
|
||||
PuppetThread::ThreadOp threadOp,
|
||||
std::string_view emptyThreadsLogMessage);
|
||||
};
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
#ifndef SHARED_RESOURCE_GROUP_H
|
||||
#define SHARED_RESOURCE_GROUP_H
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace sscl {
|
||||
|
||||
template <typename LockType, typename ResourceType>
|
||||
@@ -8,6 +10,11 @@ class SharedResourceGroup
|
||||
{
|
||||
public:
|
||||
SharedResourceGroup() = default;
|
||||
|
||||
explicit SharedResourceGroup(const std::string& lockName)
|
||||
: lock(lockName)
|
||||
{}
|
||||
|
||||
~SharedResourceGroup() = default;
|
||||
|
||||
LockType lock;
|
||||
|
||||
@@ -63,9 +63,7 @@ void PuppetApplication::addAllPuppetLifetimeInvokersToGroup(
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::joltAllPuppetThreadsCReq(
|
||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
||||
[[maybe_unused]] std::function<void()> callback)
|
||||
PuppetApplication::joltAllPuppetThreadsCReq()
|
||||
{
|
||||
if (threadsHaveBeenJolted)
|
||||
{
|
||||
@@ -94,8 +92,6 @@ PuppetApplication::joltAllPuppetThreadsCReq(
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::allPuppetThreadsLifetimeOpCReq(
|
||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
||||
[[maybe_unused]] std::function<void()> callback,
|
||||
PuppetThread::ThreadOp threadOp,
|
||||
std::string_view emptyThreadsLogMessage)
|
||||
{
|
||||
@@ -116,39 +112,31 @@ PuppetApplication::allPuppetThreadsLifetimeOpCReq(
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::startAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
PuppetApplication::startAllPuppetThreadsCReq()
|
||||
{
|
||||
return allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::START,
|
||||
noPuppetThreadsToStartLogMessage);
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::pauseAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
PuppetApplication::pauseAllPuppetThreadsCReq()
|
||||
{
|
||||
return allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::PAUSE,
|
||||
noPuppetThreadsToPauseLogMessage);
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::resumeAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
||||
PuppetApplication::resumeAllPuppetThreadsCReq()
|
||||
{
|
||||
return allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::RESUME,
|
||||
noPuppetThreadsToResumeLogMessage);
|
||||
}
|
||||
|
||||
co::ViralNonPostingInvoker<void>
|
||||
PuppetApplication::exitAllPuppetThreadsCReq(
|
||||
std::exception_ptr &exceptionPtr,
|
||||
std::function<void()> callback)
|
||||
PuppetApplication::exitAllPuppetThreadsCReq()
|
||||
{
|
||||
if (componentThreads.empty())
|
||||
{
|
||||
@@ -157,7 +145,6 @@ PuppetApplication::exitAllPuppetThreadsCReq(
|
||||
}
|
||||
|
||||
co_await allPuppetThreadsLifetimeOpCReq(
|
||||
exceptionPtr, std::move(callback),
|
||||
PuppetThread::ThreadOp::EXIT,
|
||||
noPuppetThreadsToExitLogMessage);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user