Initial commit of libspinscale library

This commit is contained in:
2025-12-28 03:54:22 -04:00
parent 6dbab54f50
commit 3f3ff1283f
29 changed files with 3875 additions and 0 deletions

View File

@@ -0,0 +1,25 @@
#ifndef BOOST_ASIO_LINKAGE_FIX_H
#define BOOST_ASIO_LINKAGE_FIX_H
#include <boost/asio/detail/call_stack.hpp>
#include <boost/asio/detail/thread_context.hpp>
#include <boost/asio/detail/tss_ptr.hpp>
namespace boost {
namespace asio {
namespace detail {
/** EXPLANATION:
 * Extern declaration of boost::asio's internal template instantiation of
 * `call_stack<>::top_` (a thread-specific pointer).
 *
 * Declaring it `extern template` suppresses implicit instantiation in every
 * translation unit that includes asio headers, so the .o files don't each
 * carry their own copy of `call_stack<>::top_`. Exactly one translation
 * unit in this library must provide the explicit instantiation definition.
 *
 * Include this header BEFORE any boost/asio header that would otherwise
 * trigger the implicit instantiation.
 */
extern template
tss_ptr<call_stack<thread_context, thread_info_base>::context>
call_stack<thread_context, thread_info_base>::top_;
} // namespace detail
} // namespace asio
} // namespace boost
#endif // BOOST_ASIO_LINKAGE_FIX_H

11
include/config.h.in Normal file
View File

@@ -0,0 +1,11 @@
/* Header guard renamed from _CONFIG_H: identifiers beginning with an
 * underscore followed by an uppercase letter are reserved for the
 * implementation ([lex.name]), so user code must not define them.
 */
#ifndef SPINSCALE_CONFIG_H
#define SPINSCALE_CONFIG_H
/* Debug locking configuration.
 * CONFIG_ENABLE_DEBUG_LOCKS: defined by CMake when debug lock tracking is on.
 * CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS: substituted from the CMake cache
 * variable DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS (milliseconds).
 */
#cmakedefine CONFIG_ENABLE_DEBUG_LOCKS
#cmakedefine CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS @DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS@
/* Debug callable tracing configuration (enables the STC() wrapper). */
#cmakedefine CONFIG_DEBUG_TRACE_CALLABLES
#endif /* SPINSCALE_CONFIG_H */

View File

@@ -0,0 +1,58 @@
#ifndef ASYNCHRONOUS_BRIDGE_H
#define ASYNCHRONOUS_BRIDGE_H
#include <boostAsioLinkageFix.h>
#include <atomic>
#include <boost/asio/io_service.hpp>
namespace sscl {
class AsynchronousBridge
{
public:
AsynchronousBridge(boost::asio::io_service &io_service)
: isAsyncOperationComplete(false), io_service(io_service)
{}
void setAsyncOperationComplete(void)
{
/** EXPLANATION:
* This empty post()ed message is necessary to ensure that the thread
* that's waiting on the io_service is signaled to wake up and check
* the io_service's queue.
*/
isAsyncOperationComplete.store(true);
io_service.post([]{});
}
void waitForAsyncOperationCompleteOrIoServiceStopped(void)
{
for (;;)
{
io_service.run_one();
if (isAsyncOperationComplete.load() || io_service.stopped())
{ break; }
/** EXPLANATION:
* In the mrntt and mind thread loops we call checkException() after
* run() returns, but we don't have to do that here because
* setException() calls stop.
*
* So if an exception is set on our thread, we'll break out of this
* loop due to the check for stopped() above, and that'll take us
* back out to the main loop, where we'll catch the exception.
*/
}
}
bool exitedBecauseIoServiceStopped(void) const
{ return io_service.stopped(); }
private:
std::atomic<bool> isAsyncOperationComplete;
boost::asio::io_service &io_service;
};
} // namespace sscl
#endif // ASYNCHRONOUS_BRIDGE_H

View File

@@ -0,0 +1,158 @@
#ifndef ASYNCHRONOUS_CONTINUATION_H
#define ASYNCHRONOUS_CONTINUATION_H
#include <functional>
#include <memory>
#include <exception>
#include <spinscale/componentThread.h>
#include <spinscale/callback.h>
#include <spinscale/callableTracer.h>
#include <spinscale/asynchronousContinuationChainLink.h>
namespace sscl {
/**
 * AsynchronousContinuation - Template base class for async sequence management
 *
 * This template provides a common pattern for managing asynchronous operations
 * that need to maintain object lifetime through a sequence of callbacks.
 *
 * The template parameter OriginalCbFnT represents the signature of the original
 * callback that will be invoked when the async sequence completes.
 *
 * Derived classes (NonPosted/Posted variants) supply callOriginalCb(), which
 * the CALLEE_SETEXC_CALLCB* macros below rely on.
 */
template <class OriginalCbFnT>
class AsynchronousContinuation
: public AsynchronousContinuationChainLink
{
public:
// Sink parameter: the callback is taken by value and moved into the member.
explicit AsynchronousContinuation(Callback<OriginalCbFnT> originalCb)
: originalCallback(std::move(originalCb))
{}
/** EXPLANATION:
 * Signature for one numbered segment of a segmented async sequence.
 * Each segment persists the lifetime of the continuation object by
 * taking a copy of its shared_ptr (the "lifetime preserving conveyance").
 */
typedef void (SegmentFn)(
std::shared_ptr<AsynchronousContinuation<OriginalCbFnT>>
lifetimePreservingConveyance);
/** EXPLANATION:
 * When an exception is thrown in an async callee, which pertains to an
 * error in the data given by the caller, we ought not to throw the
 * exception within the callee. Instead, we should store the exception
 * in the continuation object and return it to the caller.
 *
 * The caller should then call checkException() to rethrow it on its
 * own stack.
 *
 * This macro should be used by the callee to bubble the exception to the
 * caller.
 */
#define CALLEE_SETEXC(continuation, type, exc_obj) \
(continuation)->exception = std::make_exception_ptr<type>(exc_obj)
// Store the exception, then immediately invoke the original callback.
#define CALLEE_SETEXC_CALLCB(continuation, type, exc_obj) \
do { \
CALLEE_SETEXC(continuation, type, exc_obj); \
(continuation)->callOriginalCb(); \
} while(0)
// As CALLEE_SETEXC_CALLCB, then return from the enclosing function.
#define CALLEE_SETEXC_CALLCB_RET(continuation, type, exc_obj) \
do { \
CALLEE_SETEXC_CALLCB(continuation, type, exc_obj); \
return; \
} while(0)
// Call this in the caller to rethrow the exception. No-op if none is set.
void checkException()
{
if (exception)
{ std::rethrow_exception(exception); }
}
// Implement the virtual method from AsynchronousContinuationChainLink
virtual std::shared_ptr<AsynchronousContinuationChainLink>
getCallersContinuationShPtr() const override
{ return originalCallback.callerContinuation; }
public:
// Caller-supplied completion callback plus the caller's continuation link.
Callback<OriginalCbFnT> originalCallback;
// Set by callees via the CALLEE_SETEXC* macros; rethrown by checkException().
std::exception_ptr exception;
};
/**
 * NonPostedAsynchronousContinuation - For continuations that don't post
 * callbacks
 *
 * The original callback is invoked inline on the current thread rather than
 * being post()ed to an io_service queue.
 *
 * Note: We intentionally do not create a
 * LockedNonPostedAsynchronousContinuation because the only way to implement
 * non-posted locking would be via busy-spinning or sleeplocks. This would
 * eliminate the throughput advantage from our Qspinning mechanism, which
 * relies on re-posting to the io_service queue when locks are unavailable.
 */
template <class OriginalCbFnT>
class NonPostedAsynchronousContinuation
: public AsynchronousContinuation<OriginalCbFnT>
{
public:
    /// @param originalCb Completion callback (sink parameter).
    explicit NonPostedAsynchronousContinuation(
        Callback<OriginalCbFnT> originalCb)
    /* Move rather than copy into the base: the by-value parameter is a
     * sink, and copying it needlessly duplicated the wrapped std::function
     * and continuation shared_ptr. */
    : AsynchronousContinuation<OriginalCbFnT>(std::move(originalCb))
    {}
    /**
     * @brief Call the original callback with perfect forwarding
     * (immediate execution)
     *
     * This implementation calls the original callback immediately without
     * posting to any thread or queue. Used for non-posted continuations.
     * Does nothing if no callback function was supplied.
     *
     * @param args Arguments to forward to the original callback
     */
    template<typename... Args>
    void callOriginalCb(Args&&... args)
    {
        if (AsynchronousContinuation<OriginalCbFnT>::originalCallback
            .callbackFn)
        {
            AsynchronousContinuation<OriginalCbFnT>::originalCallback
                .callbackFn(std::forward<Args>(args)...);
        }
    }
};
/**
 * @brief Continuation that posts the completion callback to the caller's
 * thread.
 *
 * Unlike NonPostedAsynchronousContinuation, the original callback is not
 * invoked inline: it is bound with the supplied arguments and post()ed to
 * the calling ComponentThread's io_service, so it runs on the caller's own
 * event loop.
 */
template <class OriginalCbFnT>
class PostedAsynchronousContinuation
: public AsynchronousContinuation<OriginalCbFnT>
{
public:
    /**
     * @param caller Thread whose io_service will receive the posted callback
     * @param originalCbFn Completion callback (sink parameter)
     */
    PostedAsynchronousContinuation(
        const std::shared_ptr<ComponentThread> &caller,
        Callback<OriginalCbFnT> originalCbFn)
    /* Move the by-value callback into the base instead of copying it:
     * the parameter is a sink. */
    : AsynchronousContinuation<OriginalCbFnT>(std::move(originalCbFn)),
      caller(caller)
    {}
    /**
     * @brief Bind @p args to the original callback and post the result to
     * the caller's io_service. Does nothing if no callback was supplied.
     *
     * The STC() wrapper attaches call-site metadata when callable tracing
     * is compiled in.
     *
     * @param args Arguments forwarded into the bound callback
     */
    template<typename... Args>
    void callOriginalCb(Args&&... args)
    {
        if (AsynchronousContinuation<OriginalCbFnT>::originalCallback
            .callbackFn)
        {
            caller->getIoService().post(
                STC(std::bind(
                    AsynchronousContinuation<OriginalCbFnT>::originalCallback
                        .callbackFn,
                    std::forward<Args>(args)...)));
        }
    }
public:
    /// Thread on which the original callback will execute.
    std::shared_ptr<ComponentThread> caller;
};
} // namespace sscl
#endif // ASYNCHRONOUS_CONTINUATION_H

View File

@@ -0,0 +1,32 @@
#ifndef ASYNCHRONOUS_CONTINUATION_CHAIN_LINK_H
#define ASYNCHRONOUS_CONTINUATION_CHAIN_LINK_H
#include <memory>
namespace sscl {
/**
 * @brief Base class for all asynchronous continuation chain links
 *
 * This non-template base class provides type erasure for the continuation
 * chain, allowing RTTI and dynamic casting when walking the chain.
 *
 * The chain walking logic can use dynamic_cast to determine the most
 * derived type and perform appropriate operations.
 *
 * Inherits from enable_shared_from_this to allow objects to obtain a
 * shared_ptr to themselves, which is useful for gridlock detection tracking.
 */
class AsynchronousContinuationChainLink
: public std::enable_shared_from_this<AsynchronousContinuationChainLink>
{
public:
// Virtual: chain links are held and destroyed through base-class pointers.
virtual ~AsynchronousContinuationChainLink() = default;
/**
 * @brief Return the next link up the chain (the caller's continuation),
 * or an empty shared_ptr at the head of the chain.
 */
virtual std::shared_ptr<AsynchronousContinuationChainLink>
getCallersContinuationShPtr() const = 0;
};
} // namespace sscl
#endif // ASYNCHRONOUS_CONTINUATION_CHAIN_LINK_H

View File

@@ -0,0 +1,69 @@
#ifndef ASYNCHRONOUS_LOOP_H
#define ASYNCHRONOUS_LOOP_H
#include <atomic>
namespace sscl {
/**
 * @brief Tracks the progress of a fan-out of nTotal asynchronous iterations.
 *
 * Each sub-operation reports success or failure via
 * incrementSuccessOrFailureDueTo(); the loop is complete once every
 * iteration has been accounted for. Counters are atomic so completions may
 * arrive from concurrent contexts.
 */
class AsynchronousLoop
{
public:
    /**
     * @param nTotal Total number of iterations expected
     * @param nSucceeded Initial success count (defaults to 0)
     * @param nFailed Initial failure count (defaults to 0)
     */
    AsynchronousLoop(
        const unsigned int nTotal,
        unsigned int nSucceeded=0, unsigned int nFailed=0)
    : nTotal(nTotal), nSucceeded(nSucceeded), nFailed(nFailed)
    {}
    /* Copy operations load/store the atomics explicitly because
     * std::atomic is neither copyable nor movable. The copy is not itself
     * atomic with respect to concurrent increments. */
    AsynchronousLoop(const AsynchronousLoop& other)
    : nTotal(other.nTotal),
      nSucceeded(other.nSucceeded.load()), nFailed(other.nFailed.load())
    {}
    AsynchronousLoop& operator=(const AsynchronousLoop& other)
    {
        if (this != &other)
        {
            nTotal = other.nTotal;
            nSucceeded.store(other.nSucceeded.load());
            nFailed.store(other.nFailed.load());
        }
        return *this;
    }
    /**
     * @brief True once all iterations have been accounted for.
     *
     * Uses >= rather than == so that an accidental over-count latches
     * completion instead of making the loop permanently unfinishable.
     * NOTE(review): nSucceeded and nFailed are read as two separate atomic
     * loads; callers relying on an exact snapshot under concurrent
     * increments should serialize externally — confirm usage.
     */
    bool isComplete(void) const
    {
        return nSucceeded + nFailed >= nTotal;
    }
    /// Record one iteration's outcome.
    void incrementSuccessOrFailureDueTo(bool success)
    {
        if (success)
        { ++nSucceeded; }
        else
        { ++nFailed; }
    }
    /// Record one outcome and report whether the loop just completed.
    bool incrementSuccessOrFailureAndTestForCompletionDueTo(bool success)
    {
        incrementSuccessOrFailureDueTo(success);
        return isComplete();
    }
    /// True for a degenerate loop of zero iterations (trivially complete).
    bool nTotalIsZero(void) const
    {
        return nTotal == 0;
    }
    /**
     * @brief Mark every not-yet-succeeded iteration as failed, forcing
     * completion (e.g. when aborting the remainder of a fan-out).
     */
    void setRemainingIterationsToFailure()
    {
        const unsigned int succeeded = nSucceeded.load();
        /* Guard the unsigned subtraction: if successes somehow exceed
         * nTotal, store 0 instead of a huge wrapped-around value. */
        nFailed.store(succeeded < nTotal ? nTotal - succeeded : 0);
    }
public:
    unsigned int nTotal;                            ///< Expected iterations
    std::atomic<unsigned int> nSucceeded, nFailed;  ///< Outcome counters
};
} // namespace sscl
#endif // ASYNCHRONOUS_LOOP_H

View File

@@ -0,0 +1,149 @@
#ifndef SPINSCALE_CALLABLE_TRACER_H
#define SPINSCALE_CALLABLE_TRACER_H
#include <config.h>
#include <string>
#include <functional>
#include <iostream>
#include <cstdint>
#include <spinscale/componentThread.h>
// Forward declaration - OptionParser is defined in smocore/include/opts.h
// If you need tracing, include opts.h before including this header
// The code will check for OPTS_H define to see if opts.h has been included
class OptionParser;
namespace sscl {
/**
 * @brief CallableTracer - wraps a callable with call-site metadata.
 *
 * Stores the creating function's name, line number, and two return
 * addresses next to a type-erased copy of the callable. If a callable
 * posted to a boost::asio::io_service misbehaves (e.g. captured state has
 * gone out of scope), this metadata remains readable at the wrapper's
 * address from a debugger.
 */
class CallableTracer
{
public:
    /**
     * @brief Wrap @p fn together with its creation-site metadata.
     * @param callerFuncName Name of the function that created the callable
     * @param callerLine Line number of the creation site
     * @param returnAddr0 Return address of the direct caller
     * @param returnAddr1 Return address one frame further up
     * @param fn The callable to wrap (copied/moved into a std::function)
     */
    template<typename FnT>
    explicit CallableTracer(
        const char* callerFuncName, int callerLine,
        void* returnAddr0, void* returnAddr1, FnT&& fn)
    : callerFuncName(callerFuncName), callerLine(callerLine),
      returnAddr0(returnAddr0), returnAddr1(returnAddr1),
      callable(std::forward<FnT>(fn))
    {}

    /// Invoke the wrapped callable, optionally tracing the call site first.
    void operator()()
    {
        /* OptionParser::getOptions() requires opts.h to be included.
         * The trace is compiled in only when both the build-time flag
         * CONFIG_DEBUG_TRACE_CALLABLES and OPTS_H are defined. */
#ifdef CONFIG_DEBUG_TRACE_CALLABLES
#ifdef OPTS_H
        if (OptionParser::getOptions().traceCallables)
        {
            std::cout << "" << __func__ << ": On thread "
                << (ComponentThread::tlsInitialized()
                    ? ComponentThread::getSelf()->name : "<TLS un-init'ed>")
                << ": Calling callable posted by:\n"
                << "\t" << callerFuncName << "\n\tat line " << (int)callerLine
                << " return addr 0: " << returnAddr0
                << ", return addr 1: " << returnAddr1
                << std::endl;
        }
#endif
#endif
        callable();
    }

public:
    std::string callerFuncName;  ///< Creating function's name
    int callerLine;              ///< Creation-site line number
    void* returnAddr0;           ///< Direct caller's return address
    void* returnAddr1;           ///< Next caller's return address
private:
    std::function<void()> callable;  ///< Type-erased wrapped callable
};
} // namespace sscl
/**
 * @brief STC - SMO Traceable Callable macro
 *
 * When CONFIG_DEBUG_TRACE_CALLABLES is defined, wraps the callable with
 * sscl::CallableTracer to store metadata (caller function name, line number,
 * and return addresses). When not defined, returns the callable directly
 * with no overhead.
 *
 * BUGFIX: the tracer was previously qualified as smo::CallableTracer in all
 * three branches, but the class is declared in namespace sscl (above), so
 * every traced build failed to compile. Qualification corrected to sscl::.
 *
 * Uses compiler-specific macros to get fully qualified function names:
 * - GCC/Clang: __PRETTY_FUNCTION__ (includes full signature with namespace/class)
 * - MSVC: __FUNCSIG__ (includes full signature)
 * - Fallback: __func__ (unqualified function name only)
 *
 * Uses compiler-specific builtins to get return addresses:
 * - GCC/Clang: __builtin_return_address(0) and __builtin_return_address(1)
 * - MSVC: _ReturnAddress() (only one level available)
 * - Fallback: nullptr for return addresses
 *
 * Usage:
 * thread->getIoService().post(
 * STC(std::bind(&SomeClass::method, this, arg1, arg2)));
 */
#ifdef CONFIG_DEBUG_TRACE_CALLABLES
#if defined(__GNUC__) || defined(__clang__)
// GCC/Clang: __PRETTY_FUNCTION__ gives full signature
// e.g., "void sscl::SomeClass::method(int, int)"
// __builtin_return_address(0) = direct caller
// __builtin_return_address(1) = caller before that
#define STC(arg) sscl::CallableTracer( \
__PRETTY_FUNCTION__, \
__LINE__, \
__builtin_return_address(0), \
__builtin_return_address(1), \
arg)
#elif defined(_MSC_VER)
// MSVC: __FUNCSIG__ gives full signature
// e.g., "void __cdecl sscl::SomeClass::method(int, int)"
// _ReturnAddress() = direct caller (only one level available)
#include <intrin.h>
#define STC(arg) sscl::CallableTracer( \
__FUNCSIG__, \
__LINE__, \
_ReturnAddress(), \
nullptr, \
arg)
#else
// Fallback to standard __func__ (unqualified name only)
// No return address support
#define STC(arg) sscl::CallableTracer( \
__func__, \
__LINE__, \
nullptr, \
nullptr, \
arg)
#endif
#else
#define STC(arg) arg
#endif
#endif // SPINSCALE_CALLABLE_TRACER_H

View File

@@ -0,0 +1,31 @@
#ifndef SPINSCALE_CALLBACK_H
#define SPINSCALE_CALLBACK_H
#include <memory>
namespace sscl {
// Forward declaration
class AsynchronousContinuationChainLink;
/**
 * @brief Callback class that wraps a function and its caller continuation
 *
 * This class provides a way to pass both a callback function and the
 * caller's continuation in a single object, enabling deadlock detection
 * by walking the chain of continuations.
 *
 * Usage: Callback<CbFnT>{context, std::bind(...)}
 *
 * Member order matters for the brace-init usage above: callerContinuation
 * first, then callbackFn.
 */
template<typename CbFnT>
class Callback
{
public:
// Aggregate initialization allows: Callback<CbFnT>{context, std::bind(...)}
// Link to the caller's continuation; may be empty at the head of a chain.
std::shared_ptr<AsynchronousContinuationChainLink> callerContinuation;
// The function to invoke on completion; may be empty ("fire and forget").
CbFnT callbackFn;
};
} // namespace sscl
#endif // SPINSCALE_CALLBACK_H

View File

@@ -0,0 +1,41 @@
#ifndef COMPONENT_H
#define COMPONENT_H
#include <config.h>
#include <memory>
#include <functional>
#include <spinscale/callback.h>
#include <spinscale/puppetApplication.h>
namespace sscl {
class ComponentThread;
/**
 * @brief Base class for components bound to a ComponentThread.
 */
class Component
{
public:
    /// @param thread The ComponentThread this component runs on.
    Component(const std::shared_ptr<ComponentThread> &thread);
    /* Virtual: PuppetComponent (and potentially other component types)
     * derive from this class, so destroying a derived component through a
     * Component pointer must run the derived destructor. The previous
     * non-virtual destructor made such a delete undefined behavior. */
    virtual ~Component() = default;
public:
    /// Thread this component is bound to (shared ownership).
    std::shared_ptr<ComponentThread> thread;
};
/**
 * @brief Component owned by a PuppetApplication, running on a puppet thread.
 */
class PuppetComponent
: public Component
{
public:
// @param parent The owning PuppetApplication (must outlive this component).
// @param thread The ComponentThread this component runs on.
PuppetComponent(
PuppetApplication &parent,
const std::shared_ptr<ComponentThread> &thread);
~PuppetComponent() = default;
public:
// Owning application; held by reference, so lifetime is managed by the caller.
PuppetApplication &parent;
};
} // namespace sscl
#endif // COMPONENT_H

View File

@@ -0,0 +1,167 @@
#ifndef COMPONENT_THREAD_H
#define COMPONENT_THREAD_H
#include <boostAsioLinkageFix.h>
#include <atomic>
#include <thread>
#include <unordered_map>
#include <boost/asio/io_service.hpp>
#include <stdexcept>
#include <queue>
#include <functional>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
#include <memory>
#include <spinscale/callback.h>
#include <cstdint>
#include <string>
namespace sscl {
class MarionetteThread;
class PuppetThread;
// ThreadId is a generic type - application-specific enums should be defined elsewhere
typedef uint8_t ThreadId;
/**
 * @brief Base class for all threads managed by the spinscale framework.
 *
 * Each ComponentThread owns a boost::asio::io_service (its event loop) and
 * a work guard that keeps io_service::run() from returning while the queue
 * is idle.
 */
class ComponentThread
{
protected:
    /// @param _id Application-defined thread identifier.
    ComponentThread(ThreadId _id)
    : id(_id), name(getThreadName(_id)),
      work(io_service),
      /* BUGFIX: keepLooping was previously never initialized; a
       * default-constructed std::atomic<bool> holds an indeterminate value
       * (pre-C++20). Threads are expected to loop until told otherwise, so
       * default to true. TODO(review): confirm intended initial value
       * against the thread main loops. */
      keepLooping(true)
    {}
public:
    virtual ~ComponentThread() = default;
    // getThreadName implementation is provided by application code
    static std::string getThreadName(ThreadId id);
    void cleanup(void);
    /// The event loop this thread runs; work is posted here.
    boost::asio::io_service& getIoService(void) { return io_service; }
    /// TLS-based lookup of the calling thread's ComponentThread object.
    static const std::shared_ptr<ComponentThread> getSelf(void);
    /// True once this thread's TLS has been set up (see JOLT mechanism).
    static bool tlsInitialized(void);
    static std::shared_ptr<MarionetteThread> getMrntt();
    /// Signature of a thread main function.
    typedef void (mainFn)(ComponentThread &self);
    // CPU management methods
    static int getAvailableCpuCount();
    typedef std::function<void()> mindShutdownIndOpCbFn;
    // Intentionally doesn't take a callback.
    void exceptionInd(const std::shared_ptr<ComponentThread> &faultyThread);
    // Intentionally doesn't take a callback.
    void userShutdownInd();
public:
    ThreadId id;                           ///< Application-defined identifier
    std::string name;                      ///< Human-readable name
    boost::asio::io_service io_service;    ///< This thread's event loop
    boost::asio::io_service::work work;    ///< Keeps run() alive while idle
    std::atomic<bool> keepLooping;         ///< Cleared to make the loop exit
};
/**
 * @brief The controlling ("marionette") thread of the application.
 *
 * NOTE: the constructor starts the std::thread immediately, while globals
 * may still be under construction; `thread` is intentionally the last
 * member so all other state is initialized before main() can observe it.
 * TLS setup is deferred to initializeTls() (see the JOLT mechanism).
 */
class MarionetteThread
: public std::enable_shared_from_this<MarionetteThread>,
public ComponentThread
{
public:
MarionetteThread(ThreadId id = 0)
: ComponentThread(id),
thread(main, std::ref(*this))
{
}
// Thread entry point; receives the owning object by reference.
static void main(MarionetteThread& self);
// Sets up this thread's TLS; must run on the marionette thread itself.
void initializeTls(void);
public:
// Declared last: starts running in the constructor, after other members.
std::thread thread;
};
/**
 * @brief A worker ("puppet") thread controlled via lifetime-management
 * requests (start/pause/resume/exit/JOLT).
 *
 * NOTE: the constructor starts the std::thread immediately; `thread` is
 * intentionally the last member so that pause_io_service/pause_work are
 * fully constructed before main() can run.
 */
class PuppetThread
: public std::enable_shared_from_this<PuppetThread>,
public ComponentThread
{
public:
// Lifetime-management operations that can be requested of this thread.
enum class ThreadOp
{
START,
PAUSE,
RESUME,
EXIT,
JOLT,
N_ITEMS
};
PuppetThread(ThreadId _id)
: ComponentThread(_id),
pinnedCpuId(-1),
pause_work(pause_io_service),
thread(main, std::ref(*this))
{
}
virtual ~PuppetThread() = default;
// Thread entry point; receives the owning object by reference.
static void main(PuppetThread& self);
// Sets up this thread's TLS; must run on the puppet thread itself.
void initializeTls(void);
// Thread management methods
typedef std::function<void()> threadLifetimeMgmtOpCbFn;
void startThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
void exitThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
void pauseThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
void resumeThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
/**
 * JOLTs this thread to begin processing after global initialization.
 *
 * JOLTing is the mechanism that allows threads to enter their main
 * event loops and set up TLS vars after all global constructors have
 * completed. This prevents race conditions during system startup.
 *
 * @param selfPtr Shared pointer to this thread (required because TLS
 * isn't set up yet, so shared_from_this() can't be used)
 * @param callback Callback to invoke when JOLT completes
 */
void joltThreadReq(
const std::shared_ptr<PuppetThread>& selfPtr,
Callback<threadLifetimeMgmtOpCbFn> callback);
// CPU management methods
void pinToCpu(int cpuId);
protected:
/**
 * Handle exception - called from main() when an exception occurs.
 * Derived classes can override to provide application-specific handling.
 */
virtual void handleException() {}
public:
// CPU this thread is pinned to; -1 means not pinned.
int pinnedCpuId;
// Secondary event loop the thread blocks on while PAUSEd.
boost::asio::io_service pause_io_service;
// Keeps pause_io_service.run() from returning while paused and idle.
boost::asio::io_service::work pause_work;
// Declared last: starts running in the constructor, after other members.
std::thread thread;
public:
class ThreadLifetimeMgmtOp;
};
namespace mrntt {
// The single global marionette thread instance (defined in application code).
extern std::shared_ptr<MarionetteThread> thread;
// Forward declaration for marionette thread ID management
// Must be after sscl namespace so ThreadId is defined
extern ThreadId marionetteThreadId;
// Records which ThreadId denotes the marionette thread.
void setMarionetteThreadId(ThreadId id);
} // namespace mrntt
}
#endif // COMPONENT_THREAD_H

View File

@@ -0,0 +1,85 @@
#ifndef DEPENDENCY_GRAPH_H
#define DEPENDENCY_GRAPH_H
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <memory>
namespace sscl {
// Forward declarations
class AsynchronousContinuationChainLink;
/**
 * @brief DependencyGraph - Represents a directed graph for lock dependency analysis
 *
 * This graph represents dependencies between continuations (lockvokers) where
 * an edge from A to B means that continuation A wants a lock that is held by
 * continuation B. This is used to detect circular dependencies (gridlocks).
 *
 * Nodes are shared_ptrs, so node identity is pointer identity (the default
 * std::hash/equality for shared_ptr), not structural equality.
 */
class DependencyGraph
{
public:
typedef std::shared_ptr<AsynchronousContinuationChainLink> Node;
// Each node maps to a set of nodes it depends on
typedef std::unordered_map<Node, std::unordered_set<Node>> AdjacencyList;
public:
// @brief Add a node with no outgoing edges (no-op if already present).
void addNode(const Node& node);
/**
 * @brief Add a directed edge from source to target
 * @param source The continuation that wants a lock
 * @param target The continuation that holds the wanted lock
 */
void addEdge(const Node& source, const Node& target);
/**
 * @brief Find all cycles in the graph using DFS
 * @return Vector of cycles, where each cycle is a vector of nodes
 */
std::vector<std::vector<Node>> findCycles() const;
/**
 * @brief Check if there are any cycles in the graph
 * @return true if cycles exist, false otherwise
 */
bool hasCycles() const;
/**
 * @brief Get the number of nodes in the graph
 * @return Number of nodes
 */
size_t getNodeCount() const;
/**
 * @brief Get the adjacency list for debugging
 * @return Reference to the adjacency list
 */
const AdjacencyList& getAdjacencyList() const { return adjacencyList; }
private:
/**
 * @brief DFS helper for cycle detection
 * @param node Current node being visited
 * @param visited Set of nodes that have been fully processed
 * @param recursionStack Set of nodes currently in the recursion stack
 * @param path Current path being explored
 * @param cycles Vector to store found cycles
 */
void dfsCycleDetection(
const Node& node,
std::unordered_set<Node>& visited,
std::unordered_set<Node>& recursionStack,
std::vector<Node>& path,
std::vector<std::vector<Node>>& cycles)
const;
private:
// Node -> set of nodes it depends on (edges point at lock holders).
AdjacencyList adjacencyList;
};
} // namespace sscl
#endif // DEPENDENCY_GRAPH_H

260
include/spinscale/lockSet.h Normal file
View File

@@ -0,0 +1,260 @@
#ifndef LOCK_SET_H
#define LOCK_SET_H
#include <vector>
#include <stdexcept>
#include <utility>
#include <memory>
#include <optional>
#include <spinscale/qutex.h>
#include <spinscale/lockerAndInvokerBase.h>
namespace sscl {
// Forward declarations
template <class OriginalCbFnT>
class SerializedAsynchronousContinuation;
class Qutex;
/**
 * @brief LockSet - Manages a collection of Qutex locks that must be
 * acquired together on behalf of one serialized continuation.
 */
template <class OriginalCbFnT>
class LockSet
{
public:
    /** EXPLANATION:
     * Tracks both the Qutex that must be acquired, as well as the parent
     * LockerAndInvoker that this LockSet has registered into that Qutex's
     * queue.
     */
    struct LockUsageDesc
    {
        std::reference_wrapper<Qutex> qutex;
        typename LockerAndInvokerBase::List::iterator iterator;
        bool hasBeenReleased = false;
        LockUsageDesc(std::reference_wrapper<Qutex> qutexRef,
            typename LockerAndInvokerBase::List::iterator iter)
        : qutex(qutexRef), iterator(iter), hasBeenReleased(false) {}
    };
    typedef std::vector<std::reference_wrapper<Qutex>> Set;
public:
    /**
     * @brief Constructor
     * @param parentContinuation Reference to the parent
     * SerializedAsynchronousContinuation (must outlive this LockSet)
     * @param qutexes Vector of Qutex references that must be acquired
     */
    LockSet(
        SerializedAsynchronousContinuation<OriginalCbFnT> &parentContinuation,
        std::vector<std::reference_wrapper<Qutex>> qutexes = {})
    : parentContinuation(parentContinuation), allLocksAcquired(false),
      registeredInQutexQueues(false)
    {
        /* Convert Qutex references to LockUsageDesc (iterators will be
         * filled in during registration). */
        locks.reserve(qutexes.size());
        for (auto& qutexRef : qutexes)
        {
            locks.emplace_back(
                qutexRef,
                typename LockerAndInvokerBase::List::iterator{});
        }
    }
    /**
     * @brief Register the LockSet with all its Qutex locks
     * @param lockvoker The LockerAndInvoker to register with each Qutex
     *
     * EXPLANATION:
     * I'm not sure an unregisterFromQutexQueues() method is needed.
     * Why? Because if an async sequence can't acquire all locks, it will
     * simply never leave the qutexQ until it eventually does. The only other
     * time it will leave the qutexQ is when the program terminates.
     *
     * I'm not sure we'll actually cancel all in-flight async sequences --
     * and especially not all those that aren't even in any io_service queues.
     * To whatever extent these objects get cleaned up, they'll probably be
     * cleaned up in the qutexQ's std::list destructor -- and that won't
     * execute any fancy cleanup logic. It'll just clear() out the list.
     */
    void registerInQutexQueues(
        const std::shared_ptr<LockerAndInvokerBase> &lockvoker
    )
    {
        /** EXPLANATION:
         * Register the lockvoker with each Qutex and store the returned
         * iterator to its place within each Qutex's queue. We store the
         * iterator so that we can quickly move the lockvoker around within
         * the queue, and eventually, erase() it when we acquire all the
         * locks.
         */
        for (auto& lockUsageDesc : locks)
        {
            lockUsageDesc.iterator = lockUsageDesc.qutex.get().registerInQueue(
                lockvoker);
        }
        registeredInQutexQueues = true;
    }
    /// @brief Remove this LockSet's lockvoker from every Qutex's queue.
    /// @throws std::runtime_error if not currently registered.
    void unregisterFromQutexQueues()
    {
        if (!registeredInQutexQueues)
        {
            throw std::runtime_error(
                std::string(__func__) +
                ": LockSet::unregisterFromQutexQueues() called but not "
                "registered in Qutex queues");
        }
        // Unregister from all qutex queues
        for (auto& lockUsageDesc : locks)
        {
            auto it = lockUsageDesc.iterator;
            lockUsageDesc.qutex.get().unregisterFromQueue(it);
        }
    }
    /**
     * @brief Overload for callers that don't need to know which Qutex failed.
     *
     * BUGFIX: the previous interface gave the reference output parameter a
     * default argument of std::nullopt. A non-const lvalue reference cannot
     * bind to that temporary, so every call relying on the default was
     * ill-formed. This delegating overload restores the "no output
     * parameter" call form without changing the two-argument signature.
     */
    bool tryAcquireOrBackOff(LockerAndInvokerBase &lockvoker)
    {
        std::optional<std::reference_wrapper<Qutex>> discardedFirstFailed;
        return tryAcquireOrBackOff(lockvoker, discardedFirstFailed);
    }
    /**
     * @brief Try to acquire all locks in order; back off if acquisition fails
     * @param lockvoker The LockerAndInvoker attempting to acquire the locks
     * @param firstFailedQutex Receives the first Qutex that failed acquisition
     * @return true if all locks were acquired, false otherwise
     * @throws std::runtime_error if not registered, or already fully acquired
     */
    bool tryAcquireOrBackOff(
        LockerAndInvokerBase &lockvoker,
        std::optional<std::reference_wrapper<Qutex>> &firstFailedQutex
    )
    {
        if (!registeredInQutexQueues)
        {
            throw std::runtime_error(
                std::string(__func__) +
                ": LockSet::tryAcquireOrBackOff() called but not registered in "
                "Qutex queues");
        }
        if (allLocksAcquired)
        {
            throw std::runtime_error(
                std::string(__func__) +
                ": LockSet::tryAcquireOrBackOff() called but allLocksAcquired "
                "is already true");
        }
        // Try to acquire all required locks
        int nAcquired = 0;
        const int nRequiredLocks = static_cast<int>(locks.size());
        for (auto& lockUsageDesc : locks)
        {
            if (!lockUsageDesc.qutex.get().tryAcquire(
                lockvoker, nRequiredLocks))
            {
                // Set the first failed qutex for debugging
                firstFailedQutex = std::ref(lockUsageDesc.qutex.get());
                break;
            }
            nAcquired++;
        }
        if (nAcquired < nRequiredLocks)
        {
            // Release any locks we managed to acquire
            for (int i = 0; i < nAcquired; i++) {
                locks[i].qutex.get().backoff(lockvoker, nRequiredLocks);
            }
            return false;
        }
        allLocksAcquired = true;
        return true;
    }
    /// @brief Release all locks (skipping any already released early).
    /// @throws std::runtime_error if not registered or not fully acquired.
    void release()
    {
        if (!registeredInQutexQueues)
        {
            throw std::runtime_error(
                std::string(__func__) +
                ": LockSet::release() called but not registered in Qutex "
                "queues");
        }
        if (!allLocksAcquired)
        {
            throw std::runtime_error(
                std::string(__func__) +
                ": LockSet::release() called but allLocksAcquired is false");
        }
        for (auto& lockUsageDesc : locks)
        {
            if (lockUsageDesc.hasBeenReleased)
            {
                /* Released early via releaseQutexEarly(); clear the flag so
                 * a later acquire/release cycle on this LockSet isn't
                 * erroneously skipped. (Previously the flag was never
                 * cleared, so a re-acquired early-released qutex leaked.) */
                lockUsageDesc.hasBeenReleased = false;
                continue;
            }
            lockUsageDesc.qutex.get().release();
        }
        allLocksAcquired = false;
    }
    /**
     * @brief Look up this LockSet's usage descriptor for a given Qutex.
     * @throws std::runtime_error if @p criterionLock is not in this set.
     */
    const LockUsageDesc &getLockUsageDesc(const Qutex &criterionLock) const
    {
        for (auto& lockUsageDesc : locks)
        {
            if (&lockUsageDesc.qutex.get() == &criterionLock) {
                return lockUsageDesc;
            }
        }
        // Should never happen if the LockSet is properly constructed
        throw std::runtime_error(
            std::string(__func__) +
            ": Qutex not found in this LockSet");
    }
    /// Non-const overload; lets mutating callers avoid const_cast.
    LockUsageDesc &getLockUsageDesc(const Qutex &criterionLock)
    {
        return const_cast<LockUsageDesc&>(
            static_cast<const LockSet&>(*this)
                .getLockUsageDesc(criterionLock));
    }
    /**
     * @brief Release a specific qutex early and mark it as released
     * @param qutex The qutex to release early
     * @throws std::runtime_error if the set isn't fully acquired
     */
    void releaseQutexEarly(Qutex &qutex)
    {
        if (!allLocksAcquired)
        {
            throw std::runtime_error(
                std::string(__func__) +
                ": LockSet::releaseQutexEarly() called but allLocksAcquired is false");
        }
        auto& lockUsageDesc = getLockUsageDesc(qutex);
        // Idempotent: a second early release of the same qutex is a no-op.
        if (!lockUsageDesc.hasBeenReleased)
        {
            lockUsageDesc.qutex.get().release();
            lockUsageDesc.hasBeenReleased = true;
        }
    }
public:
    /// Per-qutex registration/acquisition state, in acquisition order.
    std::vector<LockUsageDesc> locks;
private:
    /// Owning continuation; currently held for context/debugging.
    SerializedAsynchronousContinuation<OriginalCbFnT> &parentContinuation;
    bool allLocksAcquired, registeredInQutexQueues;
};
} // namespace sscl
#endif // LOCK_SET_H

View File

@@ -0,0 +1,87 @@
#ifndef LOCKER_AND_INVOKER_BASE_H
#define LOCKER_AND_INVOKER_BASE_H
#include <list>
#include <memory>
namespace sscl {
// Forward declaration
class Qutex;
/**
 * @brief LockerAndInvokerBase - Base class for the lockvoking mechanism.
 *
 * Holds the state Qutex needs from every lockvoker: the address of the
 * owning serialized continuation (used as the identity key) plus the
 * virtual hooks Qutex and the gridlock detector call through.
 */
class LockerAndInvokerBase
{
public:
    /**
     * @brief Constructor
     * @param serializedContinuationVaddr Raw pointer to the serialized
     *  continuation; used purely as an identity token, never dereferenced
     *  here.
     */
    explicit LockerAndInvokerBase(const void* serializedContinuationVaddr)
    : serializedContinuationVaddr(serializedContinuationVaddr)
    {}

    /// List of lockvokers, as stored in each Qutex's queue.
    typedef std::list<std::shared_ptr<LockerAndInvokerBase>> List;

    /**
     * @brief Get the iterator for this lockvoker in the specified Qutex's queue
     * @param qutex The Qutex to get the iterator for
     * @return Iterator pointing to this lockvoker in the Qutex's queue
     */
    virtual List::iterator getLockvokerIteratorForQutex(Qutex& qutex) const = 0;

    /**
     * @brief Awaken this lockvoker by posting it to its io_service
     * @param forceAwaken If true, post even if already awake
     */
    virtual void awaken(bool forceAwaken = false) = 0;

    /* These two are used to iterate through the lockset of a Lockvoker in a
     * template-erased manner. We use them in the gridlock detection
     * algorithm. */
    virtual size_t getLockSetSize() const = 0;
    virtual Qutex& getLockAt(size_t index) const = 0;

    /**
     * @brief Equality operator
     *
     * Compare by the address of the continuation objects. Why?
     * Because there's no guarantee that the lockvoker object that was
     * passed in by the io_service invocation is the same object as that
     * which is in the qutexQs. Especially because we make_shared() a
     * copy when registerInQutexQueues()ing.
     *
     * Generally when we "wake" a lockvoker by enqueuing it, boost's
     * io_service::post will copy the lockvoker object.
     */
    bool operator==(const LockerAndInvokerBase &other) const
    {
        return serializedContinuationVaddr == other.serializedContinuationVaddr;
    }

    /// @brief Inequality operator; defined as the negation of operator==.
    bool operator!=(const LockerAndInvokerBase &other) const
    {
        return !(*this == other);
    }

protected:
    /* Never let this monstrosity be seen beyond this class's scope.
     * Remember what I've taught you, quasi-modo?
     */
    const void* serializedContinuationVaddr;
};
} // namespace sscl
#endif // LOCKER_AND_INVOKER_BASE_H

View File

@@ -0,0 +1,58 @@
#ifndef _MARIONETTE_H
#define _MARIONETTE_H
#include <cstdint>
#include <atomic>
#include <memory>
#include <spinscale/component.h>
namespace sscl {
class MarionetteThread;
namespace mrntt {
/**
 * @brief Component that runs on the marionette (controlling) thread and
 * drives application-wide initialization/finalization.
 */
class MarionetteComponent
: public sscl::Component
{
public:
// @param thread The thread this component is bound to (the marionette thread).
MarionetteComponent(const std::shared_ptr<sscl::ComponentThread> &thread);
~MarionetteComponent() = default;
public:
// Callback signature for lifetime-management operations; the bool reports success.
typedef std::function<void(bool)> mrnttLifetimeMgmtOpCbFn;
void initializeReq(sscl::Callback<mrnttLifetimeMgmtOpCbFn> callback);
void finalizeReq(sscl::Callback<mrnttLifetimeMgmtOpCbFn> callback);
// Intentionally doesn't take a callback.
void exceptionInd();
private:
class MrnttLifetimeMgmtOp;
class TerminationEvent;
};
// The global marionette thread instance (defined in application code).
extern std::shared_ptr<sscl::MarionetteThread> thread;
// Process exit code, settable from any thread.
extern std::atomic<int> exitCode;
// Requests termination of the marionette's main loop.
void exitMarionetteLoop();
// Completion callback used for the finalize request during shutdown.
void marionetteFinalizeReqCb(bool success);
// The global marionette component instance.
extern MarionetteComponent mrntt;
} // namespace mrntt
/**
 * @brief Snapshot of the C runtime's command-line triple (argc/argv/envp).
 *
 * Pointers are stored as-is (not copied); the referenced arrays must
 * outlive this object — in practice they live for the whole process.
 */
struct CrtCommandLineArgs
{
    /// Capture the argument/environment pointers handed to main().
    CrtCommandLineArgs(int argc, char *argv[], char *envp[])
    : argc(argc),
      argv(argv),
      envp(envp)
    {}

    int argc;     ///< Argument count
    char **argv;  ///< Argument vector (borrowed, not owned)
    char **envp;  ///< Environment vector (borrowed, not owned)

    /// Record the CRT args into global storage (defined in application code).
    static void set(int argc, char *argv[], char *envp[]);
};
} // namespace sscl
#endif // _MARIONETTE_H

View File

@@ -0,0 +1,68 @@
#ifndef PUPPET_APPLICATION_H
#define PUPPET_APPLICATION_H
#include <config.h>
#include <functional>
#include <memory>
#include <vector>
#include <spinscale/callback.h>
#include <spinscale/componentThread.h>
namespace sscl {
/**
 * @brief Owner/coordinator of a fixed set of PuppetThreads.
 *
 * Provides asynchronous, callback-completed requests that apply a lifetime
 * operation (jolt/start/pause/resume/exit) to every thread in the set, plus
 * a CPU-distribution helper. enable_shared_from_this implies instances are
 * expected to be managed by shared_ptr.
 */
class PuppetApplication
    : public std::enable_shared_from_this<PuppetApplication>
{
public:
    // Takes the full set of threads to manage; stored in componentThreads.
    PuppetApplication(
        const std::vector<std::shared_ptr<PuppetThread>> &threads);
    ~PuppetApplication() = default;

    // Thread management methods
    // Completion callback for the *Req methods below (no result payload).
    typedef std::function<void()> puppetThreadLifetimeMgmtOpCbFn;

    // JOLT every puppet thread (see threadsHaveBeenJolted below for why).
    void joltAllPuppetThreadsReq(
        Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
    void startAllPuppetThreadsReq(
        Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
    void pauseAllPuppetThreadsReq(
        Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
    void resumeAllPuppetThreadsReq(
        Callback<puppetThreadLifetimeMgmtOpCbFn> callback);
    void exitAllPuppetThreadsReq(
        Callback<puppetThreadLifetimeMgmtOpCbFn> callback);

    // CPU distribution method
    // Presumably pins the managed threads across available CPUs --
    // confirm against the implementation.
    void distributeAndPinThreadsAcrossCpus();

protected:
    // Collection of PuppetThread instances
    std::vector<std::shared_ptr<PuppetThread>> componentThreads;

    /**
     * Indicates whether all puppet threads have been JOLTed at least once.
     *
     * JOLTing serves two critical purposes:
     *
     * 1. **Global Constructor Sequencing**: Since pthreads begin executing while
     *    global constructors are still being executed, globally defined pthreads
     *    cannot depend on global objects having been constructed. JOLTing is done
     *    by the CRT's main thread within main(), which provides a sequencing
     *    guarantee that global constructors have been called.
     *
     * 2. **shared_from_this Safety**: shared_from_this() requires a prior
     *    shared_ptr handle to be established. The global list of
     *    shared_ptr<ComponentThread> guarantees that at least one shared_ptr to
     *    each ComponentThread has been initialized before JOLTing occurs.
     *
     * This flag ensures that JOLTing happens exactly once and provides
     * a synchronization point for the entire system initialization.
     */
    bool threadsHaveBeenJolted = false;

private:
    // Implementation detail; defined in the corresponding .cpp.
    class PuppetThreadLifetimeMgmtOp;
};
} // namespace sscl
#endif // PUPPET_APPLICATION_H

107
include/spinscale/qutex.h Normal file
View File

@@ -0,0 +1,107 @@
#ifndef QUTEX_H
#define QUTEX_H
#include <config.h>
#include <list>
#include <memory>
#include <string>
#include <spinscale/spinLock.h>
#include <spinscale/lockerAndInvokerBase.h>
namespace sscl {
/**
* @brief Qutex - Queue-based mutex for asynchronous lock management
*
* A Qutex combines a spinlock, an ownership flag, and a queue of waiting
* lockvokers to provide efficient asynchronous lock management with
* priority-based acquisition for LockSets.
*/
class Qutex
{
public:
/**
* @brief Constructor
*/
Qutex([[maybe_unused]] const std::string &_name)
:
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
name(_name), currOwner(nullptr),
#endif
isOwned(false)
{}
/**
* @brief Register a lockvoker in the queue
* @param lockvoker The lockvoker to register
* @return Iterator pointing to the registered lockvoker in the queue
*/
LockerAndInvokerBase::List::iterator registerInQueue(
const std::shared_ptr<LockerAndInvokerBase> &lockvoker
)
{
lock.acquire();
auto it = queue.insert(queue.end(), lockvoker);
lock.release();
return it;
}
/**
* @brief Unregister a lockvoker from the queue
* @param it Iterator pointing to the lockvoker to unregister
* @param shouldLock Whether to acquire the spinlock before erasing (default: true)
*/
void unregisterFromQueue(
LockerAndInvokerBase::List::iterator it, bool shouldLock = true
)
{
if (shouldLock)
{
lock.acquire();
queue.erase(it);
lock.release();
}
else {
queue.erase(it);
}
}
/**
* @brief Try to acquire the lock for a lockvoker
* @param tryingLockvoker The lockvoker attempting to acquire the lock
* @param nRequiredLocks Number of locks required by the lockvoker's LockSet
* @return true if the lock was successfully acquired, false otherwise
*/
bool tryAcquire(
const LockerAndInvokerBase &tryingLockvoker, int nRequiredLocks);
/**
* @brief Handle backoff when a lockvoker fails to acquire all required locks
* @param failedAcquirer The lockvoker that failed to acquire all locks
* @param nRequiredLocks Number of locks required by the lockvoker's LockSet
*/
void backoff(const LockerAndInvokerBase &failedAcquirer, int nRequiredLocks);
/**
* @brief Release the lock and wake up the next waiting lockvoker
*/
void release();
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
std::shared_ptr<LockerAndInvokerBase> getCurrOwner() const
{ return currOwner; }
#endif
public:
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
std::string name;
std::shared_ptr<LockerAndInvokerBase> currOwner;
#endif
SpinLock lock;
LockerAndInvokerBase::List queue;
bool isOwned;
};
} // namespace sscl
#endif // QUTEX_H

View File

@@ -0,0 +1,164 @@
#ifndef QUTEX_ACQUISITION_HISTORY_TRACKER_H
#define QUTEX_ACQUISITION_HISTORY_TRACKER_H
#include <unordered_map>
#include <memory>
#include <forward_list>
#include <functional>
#include "spinLock.h"
namespace sscl {
// Forward declarations
class Qutex;
class AsynchronousContinuationChainLink;
class DependencyGraph;
/**
* @brief QutexAcquisitionHistoryTracker - Tracks acquisition history for
* gridlock detection
*
* This class maintains a central acquisition history to track all lockvokers
* suspected of being gridlocked. It stores information about what locks each
* timed-out lockvoker wants and what locks they hold in their continuation
* history.
*/
class QutexAcquisitionHistoryTracker
{
public:
/**
* @brief Type definition for the acquisition history entry
*
* pair.first: The firstFailedQutex that this lockvoker WANTS but can't
* acquire
* pair.second: A unique_ptr to a list of all acquired Qutexes in this
* lockvoker's continuation history
*/
typedef std::pair<
std::reference_wrapper<Qutex>,
std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
> AcquisitionHistoryEntry;
/**
* @brief Type definition for the acquisition history map
*
* Key: std::shared_ptr<AsynchronousContinuationChainLink>
* (the continuation that contains the timed-out lockvoker)
* Value: AcquisitionHistoryEntry
* (its wanted lock (aka: firstFailedQutex/pair.first) + held locks)
*/
typedef std::unordered_map<
std::shared_ptr<AsynchronousContinuationChainLink>,
AcquisitionHistoryEntry
> AcquisitionHistoryMap;
public:
static QutexAcquisitionHistoryTracker& getInstance()
{
static QutexAcquisitionHistoryTracker instance;
return instance;
}
/**
* @brief Add a continuation to the acquisition history if it doesn't
* already exist
* @param continuation Shared pointer to the
* AsynchronousContinuationChainLink
* @param wantedLock The lock that this continuation wants but can't
* acquire
* @param heldLocks Unique pointer to list of locks held in this
* continuation's history (will be moved)
*/
void addIfNotExists(
std::shared_ptr<AsynchronousContinuationChainLink> &continuation,
Qutex& wantedLock,
std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
heldLocks
)
{
acquisitionHistoryLock.acquire();
auto it = acquisitionHistory.find(continuation);
// If a continuation already exists, don't add it again
if (it != acquisitionHistory.end())
{
acquisitionHistoryLock.release();
return;
}
acquisitionHistory.emplace(continuation, std::make_pair(
std::ref(wantedLock), std::move(heldLocks)));
acquisitionHistoryLock.release();
}
/**
* @brief Remove a continuation from the acquisition history
*
* @param continuation Shared pointer to the
* AsynchronousContinuationChainLink to remove
* @return true if the continuation was found and removed, false if not found
*/
bool remove(
std::shared_ptr<AsynchronousContinuationChainLink> &continuation
)
{
acquisitionHistoryLock.acquire();
auto it = acquisitionHistory.find(continuation);
if (it != acquisitionHistory.end())
{
acquisitionHistory.erase(it);
acquisitionHistoryLock.release();
return true;
}
acquisitionHistoryLock.release();
return false;
}
bool heuristicallyTraceContinuationHistoryForGridlockOn(
Qutex &firstFailedQutex,
std::shared_ptr<AsynchronousContinuationChainLink>&
currentContinuation);
bool completelyTraceContinuationHistoryForGridlockOn(
Qutex &firstFailedQutex);
/**
* @brief Generates a dependency graph among known continuations, based on
* the currently known acquisition history. There may well be a cyclical
* dependency which hasn't been reported to the history tracker yet.
* @param dontAcquireLock If true, skips acquiring the internal spinlock
* (assumes caller already holds it)
*/
[[nodiscard]] std::unique_ptr<DependencyGraph> generateGraph(
bool dontAcquireLock = false);
// Disable copy constructor and assignment operator
QutexAcquisitionHistoryTracker(
const QutexAcquisitionHistoryTracker&) = delete;
QutexAcquisitionHistoryTracker& operator=(
const QutexAcquisitionHistoryTracker&) = delete;
private:
QutexAcquisitionHistoryTracker() = default;
~QutexAcquisitionHistoryTracker() = default;
private:
/** EXPLANATION:
* We use a SpinLock here instead of a Qutex because this acquisition
* history tracker is invoked within the LockerAndInvoker.
* Since LockerAndInvoker is too tightly coupled with Qutex workings, using
* a Qutex here would create a circular dependency or deadlock situation.
* Therefore, it's best to use a SpinLock on the history class to avoid
* these coupling issues.
*/
SpinLock acquisitionHistoryLock;
AcquisitionHistoryMap acquisitionHistory;
};
} // namespace sscl
#endif // QUTEX_ACQUISITION_HISTORY_TRACKER_H

View File

@@ -0,0 +1,588 @@
#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#include <config.h>
#include <memory>
#include <atomic>
#include <chrono>
#include <iostream>
#include <optional>
#include <spinscale/componentThread.h>
#include <spinscale/lockSet.h>
#include <spinscale/asynchronousContinuation.h>
#include <spinscale/lockerAndInvokerBase.h>
#include <spinscale/callback.h>
#include <spinscale/qutexAcquisitionHistoryTracker.h>
namespace sscl {
/**
 * @brief An asynchronous continuation that must hold a set of Qutexes
 * (requiredLocks) before its callback runs, and releases them when the
 * original callback is finally invoked.
 */
template <class OriginalCbFnT>
class SerializedAsynchronousContinuation
    : public PostedAsynchronousContinuation<OriginalCbFnT>
{
public:
    SerializedAsynchronousContinuation(
        const std::shared_ptr<ComponentThread> &caller,
        Callback<OriginalCbFnT> originalCbFn,
        std::vector<std::reference_wrapper<Qutex>> requiredLocks)
    : PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
    requiredLocks(*this, std::move(requiredLocks))
    {}

    // Releases all held locks before forwarding to the original callback.
    template<typename... Args>
    void callOriginalCb(Args&&... args)
    {
        requiredLocks.release();
        PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
            std::forward<Args>(args)...);
    }

    // Return list of all qutexes in predecessors' LockSets; excludes self.
    [[nodiscard]]
    std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
    getAcquiredQutexHistory() const;

    /**
     * @brief Release a specific qutex early
     * @param qutex The qutex to release early
     */
    void releaseQutexEarly(Qutex &qutex)
    { requiredLocks.releaseQutexEarly(qutex); }

public:
    LockSet<OriginalCbFnT> requiredLocks;
    // Guards against the same lockvoker being posted to the io_service more
    // than once at a time; see LockerAndInvoker::awaken()/allowAwakening().
    std::atomic<bool> isAwakeOrBeingAwakened{false};

    /**
     * @brief LockerAndInvoker - Template class for lockvoking mechanism
     *
     * This class wraps a std::bind result and provides locking functionality.
     * When locks cannot be acquired, the object re-posts itself to the io_service
     * queue, implementing the "spinqueueing" pattern.
     */
    template <class InvocationTargetT>
    class LockerAndInvoker
        : public LockerAndInvokerBase
    {
    public:
        /**
         * @brief Constructor that immediately posts to io_service
         * @param serializedContinuation Reference to the serialized continuation
         *     containing LockSet and target io_service
         * @param target The ComponentThread whose io_service to post to
         * @param invocationTarget The std::bind result to invoke when locks are acquired
         * @throws std::runtime_error (debug-lock builds only) if construction
         *     would immediately deadlock on a lock held earlier in this
         *     continuation chain
         */
        LockerAndInvoker(
            SerializedAsynchronousContinuation<OriginalCbFnT>
                &serializedContinuation,
            const std::shared_ptr<ComponentThread>& target,
            InvocationTargetT invocationTarget)
        : LockerAndInvokerBase(&serializedContinuation),
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
        creationTimestamp(std::chrono::steady_clock::now()),
#endif
        serializedContinuation(serializedContinuation),
        target(target),
        invocationTarget(std::move(invocationTarget))
        {
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
            // Fail fast: detect re-acquisition of a lock already held by a
            // predecessor continuation before ever posting this lockvoker.
            std::optional<std::reference_wrapper<Qutex>> firstDuplicatedQutex =
                traceContinuationHistoryForDeadlock();
            if (firstDuplicatedQutex.has_value())
            {
                handleDeadlock(firstDuplicatedQutex.value().get());
                throw std::runtime_error(
                    "LockerAndInvoker::LockerAndInvoker(): Deadlock detected");
            }
#endif // CONFIG_ENABLE_DEBUG_LOCKS
            firstWake();
        }

        /**
         * @brief Function call operator - tries to acquire locks and either
         * invokes the target or returns (already registered in qutex queues)
         */
        void operator()();

        /**
         * @brief Get the iterator for this lockvoker in the specified Qutex's queue
         * @param qutex The Qutex to get the iterator for
         * @return Iterator pointing to this lockvoker in the Qutex's queue
         */
        LockerAndInvokerBase::List::iterator
        getLockvokerIteratorForQutex(Qutex& qutex) const override
        {
            return serializedContinuation.requiredLocks.getLockUsageDesc(
                qutex).iterator;
        }

        /**
         * @brief Awaken this lockvoker by posting it to its io_service
         * @param forceAwaken If true, post even if already awake
         *
         * exchange(true) makes wake-up idempotent: only the caller that
         * flips the flag false->true actually posts (unless forced).
         */
        void awaken(bool forceAwaken = false) override
        {
            bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
                .exchange(true);
            if (prevVal == true && !forceAwaken)
            { return; }
            target->getIoService().post(*this);
        }

        size_t getLockSetSize() const override
        { return serializedContinuation.requiredLocks.locks.size(); }

        Qutex& getLockAt(size_t index) const override
        {
            return serializedContinuation.requiredLocks.locks[index]
                .qutex.get();
        }

    private:
        // Allow awakening by resetting the awake flag
        void allowAwakening()
        { serializedContinuation.isAwakeOrBeingAwakened.store(false); }

        /** EXPLANATION:
         * We create a copy of the Lockvoker and then give sh_ptrs to that
         * *COPY*, to each Qutex's internal queue. This enables us to keep
         * the AsyncContinuation sh_ptr (which the Lockvoker contains within
         * itself) alive without wasting too much memory.
         *
         * This way the io_service objects can remove the lockvoker from
         * their queues and there'll be a copy of the lockvoker in each
         * Qutex's queue.
         *
         * For non-serialized, posted continuations, they won't be removed
         * from the io_service queue until they're executed, so there's no
         * need to create copies of them. Lockvokers are removed from their
         * io_service, potentially without being executed if they fail to
         * acquire all locks.
         */
        void registerInLockSet()
        {
            auto sharedLockvoker = std::make_shared<
                LockerAndInvoker<InvocationTargetT>>(*this);
            serializedContinuation.requiredLocks.registerInQutexQueues(
                sharedLockvoker);
        }

        /**
         * @brief First wake - register in queues and awaken
         *
         * Sets isAwake=true before calling awaken with forceAwaken to ensure
         * that none of the locks we just registered with awaken()s a duplicate
         * copy of this lockvoker on the io_service.
         */
        void firstWake()
        {
            serializedContinuation.isAwakeOrBeingAwakened.store(true);
            registerInLockSet();
            // Force awaken since we just set the flag above
            awaken(true);
        }

        // Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation?
        bool isDeadlockLikely() const
        {
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
            auto now = std::chrono::steady_clock::now();
            auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                now - creationTimestamp);
            return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS;
#else
            return false;
#endif
        }

        // Wrapper around isDeadlockLikely for gridlock detection
        bool isGridlockLikely() const
        { return isDeadlockLikely(); }

#ifdef CONFIG_ENABLE_DEBUG_LOCKS
        // Retired gridlock-detection approach; kept for reference only.
        // NOTE(review): its member function accesses LockerAndInvoker state
        // it cannot see from inside this nested struct -- it will not
        // compile if ever instantiated.
        struct obsolete {
            bool traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex);
        };

        bool traceContinuationHistoryForDeadlockOn(Qutex &firstFailedQutex);

        // Scan every lock in this continuation's LockSet for one that a
        // predecessor continuation already holds; returns the first such
        // duplicate, or nullopt if none.
        std::optional<std::reference_wrapper<Qutex>>
        traceContinuationHistoryForDeadlock(void)
        {
            for (auto& lockUsageDesc
                : serializedContinuation.requiredLocks.locks)
            {
                if (traceContinuationHistoryForDeadlockOn(
                    lockUsageDesc.qutex.get()))
                {
                    return std::ref(lockUsageDesc.qutex.get());
                }
            }
            return std::nullopt;
        }

        /**
         * @brief Handle a likely deadlock situation by logging debug information
         * @param firstFailedQutex The first qutex that failed acquisition
         */
        void handleDeadlock(const Qutex &firstFailedQutex)
        {
            std::cerr << __func__ << ": Deadlock: "
                << "Lockvoker has been waiting for "
                << std::chrono::duration_cast<std::chrono::milliseconds>(
                    std::chrono::steady_clock::now() - this->creationTimestamp)
                    .count()
                << "ms, failed on qutex @" << &firstFailedQutex
                << " (" << firstFailedQutex.name << ")" << std::endl;
        }

        // Same diagnostic output as handleDeadlock, labeled for gridlocks.
        void handleGridlock(const Qutex &firstFailedQutex)
        {
            std::cerr << __func__ << ": Gridlock: "
                << "Lockvoker has been waiting for "
                << std::chrono::duration_cast<std::chrono::milliseconds>(
                    std::chrono::steady_clock::now() - this->creationTimestamp)
                    .count()
                << "ms, failed on qutex @" << &firstFailedQutex
                << " (" << firstFailedQutex.name << ")" << std::endl;
        }
#endif

    private:
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
        std::chrono::steady_clock::time_point creationTimestamp;
#endif
        SerializedAsynchronousContinuation<OriginalCbFnT>
            &serializedContinuation;
        std::shared_ptr<ComponentThread> target;
        InvocationTargetT invocationTarget;
    };
};
/******************************************************************************/
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
// Collects every qutex held by predecessor continuations in this chain.
// Returns a fresh forward_list; ownership passes to the caller.
template <class OriginalCbFnT>
std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
SerializedAsynchronousContinuation<OriginalCbFnT>::getAcquiredQutexHistory()
    const
{
    auto heldLocks = std::make_unique<
        std::forward_list<std::reference_wrapper<Qutex>>>();

    /** EXPLANATION:
     * Walk through the continuation chain to collect all acquired locks
     *
     * We don't add the current continuation's locks because it's the one
     * failing to acquire locks and backing off. So we start from the previous
     * continuation.
     */
    for (std::shared_ptr<AsynchronousContinuationChainLink> currContin =
            this->getCallersContinuationShPtr();
        currContin != nullptr;
        currContin = currContin->getCallersContinuationShPtr())
    {
        /* NOTE(review): this cast only matches predecessors instantiated with
         * the SAME OriginalCbFnT; serialized continuations with a different
         * callback signature are silently skipped, so their locks never
         * appear in the history -- confirm this is intended.
         */
        auto serializedCont = std::dynamic_pointer_cast<
            SerializedAsynchronousContinuation<OriginalCbFnT>>(currContin);
        if (serializedCont == nullptr) { continue; }

        // Add this continuation's locks to the held locks list
        for (size_t i = 0; i < serializedCont->requiredLocks.locks.size(); ++i)
        {
            heldLocks->push_front(serializedCont->requiredLocks.locks[i].qutex);
        }
    }
    return heldLocks;
}
// Returns true if `firstFailedQutex` is already held by any predecessor
// continuation in this lockvoker's chain (a self-deadlock).
template <class OriginalCbFnT>
template <class InvocationTargetT>
bool
SerializedAsynchronousContinuation<OriginalCbFnT>
::LockerAndInvoker<InvocationTargetT>
::traceContinuationHistoryForDeadlockOn(Qutex& firstFailedQutex)
{
    /** EXPLANATION:
     * In this function we will trace through the chain of continuations that
     * led up to this Lockvoker's continuation. For each continuation which is
     * a SerializedAsynchronousContinuation, we check through its LockSet to see
     * if it contains the lock that failed acquisition. If it does, we have a
     * deadlock.
     */
    /* We can't start with the continuation directly referenced by this starting
     * Lockvoker as it would contain the all locks we're currently trying to
     * acquire...and rightly so because it's the continuation for this current
     * lockvoker.
     */
    for (std::shared_ptr<AsynchronousContinuationChainLink> currContin =
            this->serializedContinuation.getCallersContinuationShPtr();
        currContin != nullptr;
        currContin = currContin->getCallersContinuationShPtr())
    {
        // Only same-OriginalCbFnT serialized continuations are examined.
        auto serializedCont = std::dynamic_pointer_cast<
            SerializedAsynchronousContinuation<OriginalCbFnT>>(currContin);
        if (serializedCont == nullptr) { continue; }

        /* getLockUsageDesc() throwing runtime_error is used here as the
         * "lock not in this LockSet" signal; a successful lookup means the
         * predecessor holds the failed lock (deadlock).
         */
        // Check if the firstFailedQutex is in this continuation's LockSet
        try {
            serializedCont->requiredLocks.getLockUsageDesc(firstFailedQutex);
        } catch (const std::runtime_error& e) {
            std::cerr << __func__ << ": " << e.what() << std::endl;
            continue;
        }

        std::cout << __func__ << ":Deadlock detected: Found "
            << "firstFailedQutex @" << &firstFailedQutex
            << " (" << firstFailedQutex.name << ") in LockSet of "
            << "SerializedAsynchronousContinuation @"
            << serializedCont.get() << std::endl;
        return true;
    }
    return false;
}
/* NOTE(review): This is the retired gridlock-detection pass, declared inside
 * the nested `obsolete` struct. It accesses `this->serializedContinuation`,
 * which `obsolete` does not declare, so it cannot compile if ever
 * instantiated -- it survives as reference documentation for the algorithm
 * now implemented by QutexAcquisitionHistoryTracker.
 */
template <class OriginalCbFnT>
template <class InvocationTargetT>
bool
SerializedAsynchronousContinuation<OriginalCbFnT>
::LockerAndInvoker<InvocationTargetT>
::obsolete::traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex)
{
    /** EXPLANATION:
     * In this function we check for gridlocks which are slightly different
     * from deadlocks. In a gridlock, two requests are waiting for locks that
     * are held by the other. I.e:
     *
     * R1 holds LockA and is waiting for LockB.
     * R2 holds LockB and is waiting for LockA.
     *
     * This differs from deadlocks because it's not a single request which is
     * attempting to re-acquire a lock that it already holds.
     *
     * To detect this condition, we wait until the acquisition timeout has
     * expired. Then: we extract the current owner of the first lock we're
     * failing to acquire.
     *
     * From there, we go through each of the locks in the foreign owner's
     * current (i.e: immediate, most recent continuation's) required LockSet.
     * For each of the locks in the foreign owner's most immediate required
     * LockSet, we trace backward in our *OWN* history to see if any of *OUR*
     * continuations (excluding our most immediate continuation) contains that
     * lock.
     *
     * If we find a match, that means that we're holding a lock that the foreign
     * owner is waiting for. And we already know that the foreign owner is
     * holding a lock that we're waiting for (when we extracted the current
     * owner of the first failed lock in our most immediate Lockset).
     *
     * Hence, we have a gridlock.
     */
    std::shared_ptr<LockerAndInvokerBase> foreignOwnerShPtr =
        firstFailedQutex.getCurrOwner();

    // If no current owner, can't be a gridlock
    if (foreignOwnerShPtr == nullptr)
    { return false; }

    // Use reference for the rest of the function for safety.
    LockerAndInvokerBase &foreignOwner = *foreignOwnerShPtr;

    /* For each lock in the foreign owner's LockSet, check if we hold it
     * in any of our previous continuations (excluding our most immediate one)
     */
    for (size_t i = 0; i < foreignOwner.getLockSetSize(); ++i)
    {
        Qutex& foreignLock = foreignOwner.getLockAt(i);

        /* Skip the firstFailedQutex since we already know the foreign owner
         * holds it -- hence it's impossible for any of our previous
         * continuations to hold it.
         */
        if (&foreignLock == &firstFailedQutex)
        { continue; }

        /** EXPLANATION:
         * Trace backward through our continuation history (excluding our most
         * immediate continuation).
         *
         * The reason we exclude our most immediate continuation is because the
         * LockSet acquisition algorithm backs off if it fails to acquire ALL
         * locks in the set. So if the lock that the foreign owner is waiting
         * for is in our most immediate continuation, and NOT in one of our
         * previous continuations, then we will back off and the foreign owner
         * should eventually be able to acquire that lock.
         */
        for (std::shared_ptr<AsynchronousContinuationChainLink> currContin =
                this->serializedContinuation.getCallersContinuationShPtr();
            currContin != nullptr;
            currContin = currContin->getCallersContinuationShPtr())
        {
            auto serializedCont = std::dynamic_pointer_cast<
                SerializedAsynchronousContinuation<OriginalCbFnT>>(currContin);
            if (serializedCont == nullptr) { continue; }

            // Check if this continuation holds the foreign lock
            try {
                /* The binding itself is the membership test: success means we
                 * hold the lock; the value is intentionally unused.
                 */
                const auto& lockUsageDesc = serializedCont->requiredLocks
                    .getLockUsageDesc(foreignLock);

                // Matched! We hold a lock that the foreign owner is waiting for
                std::cout << __func__ << ": Gridlock detected: We hold lock @"
                    << &foreignLock << " (" << foreignLock.name << ") in "
                    "continuation @" << serializedCont.get()
                    << ", while foreign owner @" << &foreignOwner
                    << " holds lock @" << &firstFailedQutex << " ("
                    << firstFailedQutex.name << ") that we're waiting for"
                    << std::endl;
                return true;
            } catch (const std::runtime_error& e) {
                // This continuation doesn't hold the foreign lock. Continue.
                continue;
            }
        }
    }
    return false;
}
#endif // CONFIG_ENABLE_DEBUG_LOCKS
// Invoked by the io_service: attempt to take every lock in the LockSet.
// On failure, drop back to the qutex queues (spinqueueing), optionally
// running deadlock/gridlock diagnostics; on success, unregister from the
// queues and run the wrapped invocation target.
template <class OriginalCbFnT>
template <class InvocationTargetT>
void SerializedAsynchronousContinuation<OriginalCbFnT>
::LockerAndInvoker<InvocationTargetT>::operator()()
{
    // Must execute on the ComponentThread we were posted to.
    if (ComponentThread::getSelf() != target)
    {
        throw std::runtime_error(
            "LockerAndInvoker::operator(): Thread safety violation - "
            "executing on wrong ComponentThread");
    }

    std::optional<std::reference_wrapper<Qutex>> firstFailedQutexRet;
    // Sample timeout state once, before the acquisition attempt.
    bool deadlockLikely = isDeadlockLikely();
    bool gridlockLikely = isGridlockLikely();

    if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
        *this, firstFailedQutexRet))
    {
        // Just allow this lockvoker to be dropped from its io_service.
        allowAwakening();

        if (!deadlockLikely && !gridlockLikely)
        { return; }

#ifdef CONFIG_ENABLE_DEBUG_LOCKS
        /* NOTE(review): assumes tryAcquireOrBackOff() always populates
         * firstFailedQutexRet when it returns false; value() throws
         * bad_optional_access otherwise -- confirm against LockSet.
         */
        Qutex &firstFailedQutex = firstFailedQutexRet.value().get();

        bool isDeadlock = traceContinuationHistoryForDeadlockOn(
            firstFailedQutex);

        bool gridlockIsHeuristicallyLikely = false;
        bool gridlockIsAlgorithmicallyLikely = false;
        if (gridlockLikely)
        {
            auto& tracker = QutexAcquisitionHistoryTracker
                ::getInstance();
            auto heldLocks = serializedContinuation
                .getAcquiredQutexHistory();

            // Add this continuation to the tracker
            auto currentContinuationShPtr = serializedContinuation
                .shared_from_this();
            tracker.addIfNotExists(
                currentContinuationShPtr,
                firstFailedQutex, std::move(heldLocks));

            // Cheap heuristic pass first; full graph trace only if it hits.
            gridlockIsHeuristicallyLikely = tracker
                .heuristicallyTraceContinuationHistoryForGridlockOn(
                    firstFailedQutex, currentContinuationShPtr);

            if (gridlockIsHeuristicallyLikely)
            {
                gridlockIsAlgorithmicallyLikely = tracker
                    .completelyTraceContinuationHistoryForGridlockOn(
                        firstFailedQutex);
            }
        }

        bool isGridlock = (gridlockIsHeuristicallyLikely
            || gridlockIsAlgorithmicallyLikely);
        if (!isDeadlock && !isGridlock)
        { return; }

        if (isDeadlock) { handleDeadlock(firstFailedQutex); }
        if (isGridlock) { handleGridlock(firstFailedQutex); }
#endif
        return;
    }

    /** EXPLANATION:
     * Successfully acquired all locks, so unregister from qutex queues.
     * We do this here so that we can free up queue slots in the qutex
     * queues for other lockvokers that may be waiting to acquire the
     * locks. The size of the qutex queues does matter for other
     * contending lockvokers; and so also does their position in the
     * queues.
     *
     * The alternative is to leave ourself in the queues until we
     * eventually release all locks; and given that we may hold locks
     * even across true async hardware bottlenecks, this could take a
     * long time.
     *
     * Granted, the fact that we own the locks means that even though
     * we've removed ourselves from the queues, other lockvokers still
     * can't acquire the locks anyway.
     */
    serializedContinuation.requiredLocks.unregisterFromQutexQueues();

#ifdef CONFIG_ENABLE_DEBUG_LOCKS
    /** EXPLANATION:
     * If we were being tracked for gridlock detection but successfully
     * acquired all locks, it was a false positive due to timed delay,
     * long-running operation, or I/O delay
     */
    if (gridlockLikely)
    {
        std::shared_ptr<AsynchronousContinuationChainLink>
            currentContinuationShPtr =
                serializedContinuation.shared_from_this();
        bool removed = QutexAcquisitionHistoryTracker::getInstance()
            .remove(currentContinuationShPtr);
        if (removed)
        {
            std::cerr
                << "LockerAndInvoker::operator(): False positive "
                "gridlock detection - continuation @"
                << &serializedContinuation
                << " was being tracked but successfully acquired all "
                "locks. This was likely due to timed delay, "
                "long-running operation, or I/O delay."
                << std::endl;
        }
    }
#endif

    // All locks held and queues cleaned up: run the wrapped target.
    invocationTarget();
}
} // namespace sscl
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H

View File

@@ -0,0 +1,121 @@
#ifndef SPIN_LOCK_H
#define SPIN_LOCK_H
#include <atomic>
#ifdef __x86_64__
#include <immintrin.h>
#elif defined(__i386__)
#include <xmmintrin.h>
#elif defined(__arm__)
#include <arm_neon.h>
#elif defined(__aarch64__)
#include <arm_neon.h>
#elif defined(__aarch32__)
#include <arm_neon.h>
#endif
namespace sscl {
/**
 * @brief Simple spinlock using std::atomic
 *
 * Not reentrant: a thread already holding the lock will spin forever if it
 * tries to acquire it again.
 */
class SpinLock
{
public:
    SpinLock()
    : locked(false)
    {}

    /**
     * @brief Attempt to take the lock without blocking.
     * @return true if the lock was acquired, false if it was already held.
     */
    bool tryAcquire()
    {
        bool expected = false;
        /* memory_order_acquire on success pairs with the release store in
         * release(): everything the previous owner wrote inside its critical
         * section is visible to us. A failed attempt synchronizes nothing,
         * so relaxed is sufficient there.
         */
        return locked.compare_exchange_strong(
            expected, true,
            std::memory_order_acquire, std::memory_order_relaxed);
    }

    // Architecture-specific busy-wait hint (PAUSE/YIELD).
    inline void spinPause()
    {
#ifdef __x86_64__
        _mm_pause();
#elif defined(__i386__)
        _mm_pause();
#elif defined(__arm__)
        __asm__ volatile("yield");
#elif defined(__aarch64__)
        __asm__ volatile("yield");
#elif defined(__aarch32__)
        __asm__ volatile("yield");
#else
#   error "Unsupported architecture"
#endif
    }

    /**
     * @brief Block (busy-wait) until the lock is acquired.
     */
    void acquire()
    {
        for (;;)
        {
            if (tryAcquire())
            { return; }

            /** EXPLANATION:
             * Test-and-test-and-set: while the lock is held, spin on a plain
             * relaxed load instead of re-issuing the CAS. Repeated CAS
             * attempts bounce the cache line between cores; a read-only spin
             * keeps it shared until the owner releases.
             *
             * The spinPause() hint is essential: tight busy-wait loops
             * without PAUSE/YIELD have cooked real hardware before (an older
             * Intel M-class laptop CPU was lost this way), and the embedded
             * boards this library targets deserve better.
             */
            while (locked.load(std::memory_order_relaxed))
            {
                spinPause();
            }
        }
    }

    /**
     * @brief Release the lock.
     *
     * The release store publishes the critical section's writes to the
     * next thread whose acquire-CAS succeeds.
     */
    void release()
    {
        locked.store(false, std::memory_order_release);
    }

    /**
     * @brief RAII guard for SpinLock
     * Locks the spinlock on construction and unlocks on destruction
     */
    class Guard
    {
    public:
        explicit Guard(SpinLock& lock)
        : lock_(lock), unlocked_(false)
        {
            lock_.acquire();
        }

        ~Guard()
        {
            if (!unlocked_) {
                lock_.release();
            }
        }

        // Release before the guard goes out of scope; idempotent, and the
        // destructor will not release again afterwards.
        void unlockPrematurely()
        {
            if (!unlocked_)
            {
                lock_.release();
                unlocked_ = true;
            }
        }

        // Non-copyable, non-movable
        Guard(const Guard&) = delete;
        Guard& operator=(const Guard&) = delete;
        Guard(Guard&&) = delete;
        Guard& operator=(Guard&&) = delete;

    private:
        SpinLock& lock_;
        bool unlocked_;  // true once unlockPrematurely() has run.
    };

private:
    std::atomic<bool> locked;  // true while some thread holds the lock.
};
} // namespace sscl
#endif // SPIN_LOCK_H