mirror of
https://github.com/latentPrion/libspinscale.git
synced 2026-06-23 19:48:32 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ca2cccaa9c | |||
| a14d622eaf | |||
| 16e0350245 | |||
| 5f265567d1 |
@@ -6,6 +6,7 @@
|
|||||||
#include <coroutine>
|
#include <coroutine>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
#include <string>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
|
|
||||||
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
#ifdef CONFIG_LIBSSCL_DEBUG_CO
|
||||||
@@ -28,6 +29,15 @@ public:
|
|||||||
class ReleaseHandle;
|
class ReleaseHandle;
|
||||||
|
|
||||||
CoQutex() noexcept = default;
|
CoQutex() noexcept = default;
|
||||||
|
|
||||||
|
CoQutex([[maybe_unused]] const std::string &_name) noexcept
|
||||||
|
:
|
||||||
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
||||||
|
name(_name),
|
||||||
|
#endif
|
||||||
|
isOwned(false)
|
||||||
|
{}
|
||||||
|
|
||||||
CoQutex(const CoQutex &) = delete;
|
CoQutex(const CoQutex &) = delete;
|
||||||
CoQutex(CoQutex &&) noexcept = delete;
|
CoQutex(CoQutex &&) noexcept = delete;
|
||||||
CoQutex &operator=(const CoQutex &) = delete;
|
CoQutex &operator=(const CoQutex &) = delete;
|
||||||
@@ -77,7 +87,13 @@ public:
|
|||||||
std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n";
|
std::cout << __func__ << ": " << std::this_thread::get_id() << " Walking caller promise chain.\n";
|
||||||
#endif
|
#endif
|
||||||
if (link.holdsAcquiredLock(coQutex)) {
|
if (link.holdsAcquiredLock(coQutex)) {
|
||||||
throw std::runtime_error("Deadlock detected: CoQutex re-acquire on caller promise chain.");
|
std::string message =
|
||||||
|
"Deadlock detected: CoQutex re-acquire on caller promise chain";
|
||||||
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
||||||
|
message += " (" + coQutex.name + ")";
|
||||||
|
#endif
|
||||||
|
message += ".";
|
||||||
|
throw std::runtime_error(message);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -130,6 +146,9 @@ private:
|
|||||||
waitingCoroutines.pop_front();
|
waitingCoroutines.pop_front();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
||||||
|
std::string name;
|
||||||
|
#endif
|
||||||
sscl::SpinLock spinLock;
|
sscl::SpinLock spinLock;
|
||||||
bool isOwned = false;
|
bool isOwned = false;
|
||||||
std::deque<AcquireInvocationAndSuspensionPolicy::WaitingCoroutine> waitingCoroutines;
|
std::deque<AcquireInvocationAndSuspensionPolicy::WaitingCoroutine> waitingCoroutines;
|
||||||
|
|||||||
@@ -15,6 +15,21 @@ class Qutex;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief LockSet - Manages a collection of locks for acquisition/release
|
* @brief LockSet - Manages a collection of locks for acquisition/release
|
||||||
|
*
|
||||||
|
* LockSet exists only because the CPS re-enqueuing model had no way to acquire
|
||||||
|
* locks in a fine-grained way. A LockerAndInvoker could re-post only the entire
|
||||||
|
* continuation, and only before that continuation began executing; there was no
|
||||||
|
* mechanism to re-enqueue individual segments within a continuation. The
|
||||||
|
* practical consequence was that all required Qutexes had to be acquired at
|
||||||
|
* once up front, before the continuation body could run at all.
|
||||||
|
*
|
||||||
|
* releaseQutexEarly() was a partial workaround for finer-grained control, but
|
||||||
|
* it only helped on the release side and did not solve the fundamental problem
|
||||||
|
* of acquiring locks one-at-a-time mid-sequence.
|
||||||
|
*
|
||||||
|
* co::CoQutex supersedes this abstraction: coroutines can co_await individual
|
||||||
|
* locks at the points where they are actually needed, which is the finer control
|
||||||
|
* LockSet and releaseQutexEarly() were aiming for with limited success.
|
||||||
*/
|
*/
|
||||||
class LockSet
|
class LockSet
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -0,0 +1,29 @@
|
|||||||
|
#ifndef MULTI_OPERATION_RESULT_SET_H
|
||||||
|
#define MULTI_OPERATION_RESULT_SET_H
|
||||||
|
|
||||||
|
namespace sscl {
|
||||||
|
|
||||||
|
/** Plain aggregate for fan-out / fan-in results returned from coroutines. */
|
||||||
|
struct MultiOperationResultSet
|
||||||
|
{
|
||||||
|
MultiOperationResultSet(
|
||||||
|
unsigned int total = 0,
|
||||||
|
unsigned int succeeded = 0,
|
||||||
|
unsigned int failed = 0)
|
||||||
|
: nTotal(total), nSucceeded(succeeded), nFailed(failed)
|
||||||
|
{}
|
||||||
|
|
||||||
|
bool isComplete() const
|
||||||
|
{ return nSucceeded + nFailed == nTotal; }
|
||||||
|
|
||||||
|
bool nTotalIsZero() const
|
||||||
|
{ return nTotal == 0; }
|
||||||
|
|
||||||
|
unsigned int nTotal;
|
||||||
|
unsigned int nSucceeded;
|
||||||
|
unsigned int nFailed;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace sscl
|
||||||
|
|
||||||
|
#endif // MULTI_OPERATION_RESULT_SET_H
|
||||||
@@ -2,8 +2,6 @@
|
|||||||
#define PUPPET_APPLICATION_H
|
#define PUPPET_APPLICATION_H
|
||||||
|
|
||||||
#include <config.h>
|
#include <config.h>
|
||||||
#include <exception>
|
|
||||||
#include <functional>
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
@@ -22,16 +20,11 @@ public:
|
|||||||
const std::vector<std::shared_ptr<PuppetThread>> &threads);
|
const std::vector<std::shared_ptr<PuppetThread>> &threads);
|
||||||
~PuppetApplication() = default;
|
~PuppetApplication() = default;
|
||||||
|
|
||||||
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> joltAllPuppetThreadsCReq();
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq();
|
||||||
co::ViralNonPostingInvoker<void> startAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq();
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq();
|
||||||
co::ViralNonPostingInvoker<void> pauseAllPuppetThreadsCReq(
|
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq();
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
|
||||||
co::ViralNonPostingInvoker<void> resumeAllPuppetThreadsCReq(
|
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
|
||||||
co::ViralNonPostingInvoker<void> exitAllPuppetThreadsCReq(
|
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback);
|
|
||||||
|
|
||||||
// CPU distribution method
|
// CPU distribution method
|
||||||
void distributeAndPinThreadsAcrossCpus();
|
void distributeAndPinThreadsAcrossCpus();
|
||||||
@@ -71,8 +64,6 @@ protected:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
|
co::ViralNonPostingInvoker<void> allPuppetThreadsLifetimeOpCReq(
|
||||||
std::exception_ptr &exceptionPtr,
|
|
||||||
std::function<void()> callback,
|
|
||||||
PuppetThread::ThreadOp threadOp,
|
PuppetThread::ThreadOp threadOp,
|
||||||
std::string_view emptyThreadsLogMessage);
|
std::string_view emptyThreadsLogMessage);
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
#ifndef SHARED_RESOURCE_GROUP_H
|
#ifndef SHARED_RESOURCE_GROUP_H
|
||||||
#define SHARED_RESOURCE_GROUP_H
|
#define SHARED_RESOURCE_GROUP_H
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
namespace sscl {
|
namespace sscl {
|
||||||
|
|
||||||
template <typename LockType, typename ResourceType>
|
template <typename LockType, typename ResourceType>
|
||||||
@@ -8,6 +10,11 @@ class SharedResourceGroup
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
SharedResourceGroup() = default;
|
SharedResourceGroup() = default;
|
||||||
|
|
||||||
|
explicit SharedResourceGroup(const std::string& lockName)
|
||||||
|
: lock(lockName)
|
||||||
|
{}
|
||||||
|
|
||||||
~SharedResourceGroup() = default;
|
~SharedResourceGroup() = default;
|
||||||
|
|
||||||
LockType lock;
|
LockType lock;
|
||||||
|
|||||||
@@ -63,9 +63,7 @@ void PuppetApplication::addAllPuppetLifetimeInvokersToGroup(
|
|||||||
}
|
}
|
||||||
|
|
||||||
co::ViralNonPostingInvoker<void>
|
co::ViralNonPostingInvoker<void>
|
||||||
PuppetApplication::joltAllPuppetThreadsCReq(
|
PuppetApplication::joltAllPuppetThreadsCReq()
|
||||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
|
||||||
[[maybe_unused]] std::function<void()> callback)
|
|
||||||
{
|
{
|
||||||
if (threadsHaveBeenJolted)
|
if (threadsHaveBeenJolted)
|
||||||
{
|
{
|
||||||
@@ -94,8 +92,6 @@ PuppetApplication::joltAllPuppetThreadsCReq(
|
|||||||
|
|
||||||
co::ViralNonPostingInvoker<void>
|
co::ViralNonPostingInvoker<void>
|
||||||
PuppetApplication::allPuppetThreadsLifetimeOpCReq(
|
PuppetApplication::allPuppetThreadsLifetimeOpCReq(
|
||||||
[[maybe_unused]] std::exception_ptr &exceptionPtr,
|
|
||||||
[[maybe_unused]] std::function<void()> callback,
|
|
||||||
PuppetThread::ThreadOp threadOp,
|
PuppetThread::ThreadOp threadOp,
|
||||||
std::string_view emptyThreadsLogMessage)
|
std::string_view emptyThreadsLogMessage)
|
||||||
{
|
{
|
||||||
@@ -116,39 +112,31 @@ PuppetApplication::allPuppetThreadsLifetimeOpCReq(
|
|||||||
}
|
}
|
||||||
|
|
||||||
co::ViralNonPostingInvoker<void>
|
co::ViralNonPostingInvoker<void>
|
||||||
PuppetApplication::startAllPuppetThreadsCReq(
|
PuppetApplication::startAllPuppetThreadsCReq()
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
|
||||||
{
|
{
|
||||||
return allPuppetThreadsLifetimeOpCReq(
|
return allPuppetThreadsLifetimeOpCReq(
|
||||||
exceptionPtr, std::move(callback),
|
|
||||||
PuppetThread::ThreadOp::START,
|
PuppetThread::ThreadOp::START,
|
||||||
noPuppetThreadsToStartLogMessage);
|
noPuppetThreadsToStartLogMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
co::ViralNonPostingInvoker<void>
|
co::ViralNonPostingInvoker<void>
|
||||||
PuppetApplication::pauseAllPuppetThreadsCReq(
|
PuppetApplication::pauseAllPuppetThreadsCReq()
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
|
||||||
{
|
{
|
||||||
return allPuppetThreadsLifetimeOpCReq(
|
return allPuppetThreadsLifetimeOpCReq(
|
||||||
exceptionPtr, std::move(callback),
|
|
||||||
PuppetThread::ThreadOp::PAUSE,
|
PuppetThread::ThreadOp::PAUSE,
|
||||||
noPuppetThreadsToPauseLogMessage);
|
noPuppetThreadsToPauseLogMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
co::ViralNonPostingInvoker<void>
|
co::ViralNonPostingInvoker<void>
|
||||||
PuppetApplication::resumeAllPuppetThreadsCReq(
|
PuppetApplication::resumeAllPuppetThreadsCReq()
|
||||||
std::exception_ptr &exceptionPtr, std::function<void()> callback)
|
|
||||||
{
|
{
|
||||||
return allPuppetThreadsLifetimeOpCReq(
|
return allPuppetThreadsLifetimeOpCReq(
|
||||||
exceptionPtr, std::move(callback),
|
|
||||||
PuppetThread::ThreadOp::RESUME,
|
PuppetThread::ThreadOp::RESUME,
|
||||||
noPuppetThreadsToResumeLogMessage);
|
noPuppetThreadsToResumeLogMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
co::ViralNonPostingInvoker<void>
|
co::ViralNonPostingInvoker<void>
|
||||||
PuppetApplication::exitAllPuppetThreadsCReq(
|
PuppetApplication::exitAllPuppetThreadsCReq()
|
||||||
std::exception_ptr &exceptionPtr,
|
|
||||||
std::function<void()> callback)
|
|
||||||
{
|
{
|
||||||
if (componentThreads.empty())
|
if (componentThreads.empty())
|
||||||
{
|
{
|
||||||
@@ -157,7 +145,6 @@ PuppetApplication::exitAllPuppetThreadsCReq(
|
|||||||
}
|
}
|
||||||
|
|
||||||
co_await allPuppetThreadsLifetimeOpCReq(
|
co_await allPuppetThreadsLifetimeOpCReq(
|
||||||
exceptionPtr, std::move(callback),
|
|
||||||
PuppetThread::ThreadOp::EXIT,
|
PuppetThread::ThreadOp::EXIT,
|
||||||
noPuppetThreadsToExitLogMessage);
|
noPuppetThreadsToExitLogMessage);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user