Files
salmanoff/include/serializedAsynchronousContinuation.h
T
hayodea 092a0954a0 Locking: Add basic reactive deadlock detection foundation
We added a timestamp to each Lockvoker so that we can detect when
a lockvoker has been in a qutex for "too long", where "too long"
is defined arbitrarily as 500ms.

Next we're going to change the way we create callbacks to enable
us to more explicitly access the sh_ptr<AsyncContin> via
the callback object.
2025-09-22 20:45:36 -04:00

210 lines
6.3 KiB
C++

#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#include <config.h>
#include <functional>
#include <memory>
#include <atomic>
#include <chrono>
#include <iostream>
#include <componentThread.h>
#include <lockSet.h>
#include <asynchronousContinuation.h>
#include <lockerAndInvokerBase.h>
namespace smo {
template <class OriginalCbFnT>
class SerializedAsynchronousContinuation
: public PostedAsynchronousContinuation<OriginalCbFnT>
{
public:
SerializedAsynchronousContinuation(
const std::shared_ptr<ComponentThread> &caller,
OriginalCbFnT originalCbFn,
std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
requiredLocks(*this, std::move(requiredLocks))
{}
template<typename... Args>
void callOriginalCb(Args&&... args)
{
requiredLocks.release();
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
std::forward<Args>(args)...);
}
public:
LockSet<OriginalCbFnT> requiredLocks;
std::atomic<bool> isAwakeOrBeingAwakened{false};
/**
* @brief LockerAndInvoker - Template class for lockvoking mechanism
*
* This class wraps a std::bind result and provides locking functionality.
* When locks cannot be acquired, the object re-posts itself to the io_service
* queue, implementing the "spinqueueing" pattern.
*/
template <class InvocationTargetT>
class LockerAndInvoker
: public LockerAndInvokerBase
{
public:
/**
* @brief Constructor that immediately posts to io_service
* @param serializedContinuation Reference to the serialized continuation
* containing LockSet and target io_service
* @param target The ComponentThread whose io_service to post to
* @param invocationTarget The std::bind result to invoke when locks are acquired
*/
LockerAndInvoker(
SerializedAsynchronousContinuation<OriginalCbFnT>
&serializedContinuation,
const std::shared_ptr<ComponentThread>& target,
InvocationTargetT invocationTarget)
: LockerAndInvokerBase(&serializedContinuation),
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
creationTimestamp(std::chrono::steady_clock::now()),
#endif
serializedContinuation(serializedContinuation),
target(target),
invocationTarget(std::move(invocationTarget))
{
firstWake();
}
/**
* @brief Function call operator - tries to acquire locks and either
* invokes the target or returns (already registered in qutex queues)
*/
void operator()()
{
if (ComponentThread::getSelf() != target)
{
throw std::runtime_error(
"LockerAndInvoker::operator(): Thread safety violation - "
"executing on wrong ComponentThread");
}
Qutex *firstFailedQutexPtr = nullptr;
bool deadlockLikely = isDeadlockLikely();
if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
*this, (deadlockLikely ? &firstFailedQutexPtr : nullptr)))
{
// Just allow this lockvoker to be dropped from its io_service.
allowAwakening();
if (!deadlockLikely)
{ return; }
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
handleLikelyDeadlock(*firstFailedQutexPtr);
#endif
return;
}
/** EXPLANATION:
* Successfully acquired all locks, so unregister from qutex queues.
* We do this here so that we can free up queue slots in the qutex
* queues for other lockvokers that may be waiting to acquire the
* locks. The size of the qutex queues does matter for other
* contending lockvokers; and so also does their position in the
* queues.
*
* The alternative is to leave ourself in the queues until we
* eventually release all locks; and given that we may hold locks
* even across true async hardware bottlenecks, this could take a
* long time.
*
* Granted, the fact that we own the locks means that even though
* we've removed ourselves from the queues, other lockvokers still
* can't acquire the locks anyway.
*/
serializedContinuation.requiredLocks.unregisterFromQutexQueues();
invocationTarget();
}
/**
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
* @param qutex The Qutex to get the iterator for
* @return Iterator pointing to this lockvoker in the Qutex's queue
*/
LockerAndInvokerBase::List::iterator
getLockvokerIteratorForQutex(Qutex& qutex) override
{
return serializedContinuation.requiredLocks.getLockUsageDesc(
qutex).second;
}
/**
* @brief Awaken this lockvoker by posting it to its io_service
* @param forceAwaken If true, post even if already awake
*/
void awaken(bool forceAwaken = false) override
{
bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
.exchange(true);
if (prevVal == true && !forceAwaken)
{ return; }
target->getIoService().post(*this);
}
private:
// Allow awakening by resetting the awake flag
void allowAwakening()
{ serializedContinuation.isAwakeOrBeingAwakened.store(false); }
/**
* @brief First wake - register in queues and awaken
*
* Sets isAwake=true before calling awaken with forceAwaken to ensure
* that none of the locks we just registered with awaken()s a duplicate
* copy of this lockvoker on the io_service.
*/
void firstWake()
{
serializedContinuation.isAwakeOrBeingAwakened.store(true);
serializedContinuation.requiredLocks.registerInQutexQueues(*this);
// Force awaken since we just set the flag above
awaken(true);
}
// Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation?
bool isDeadlockLikely() const
{
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - creationTimestamp);
return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS;
#else
return false;
#endif
}
/**
* @brief Handle a likely deadlock situation by logging debug information
* @param firstFailedQutex The first qutex that failed acquisition
*/
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
void handleLikelyDeadlock(Qutex& firstFailedQutex);
#endif
private:
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
std::chrono::steady_clock::time_point creationTimestamp;
#endif
SerializedAsynchronousContinuation<OriginalCbFnT>
&serializedContinuation;
InvocationTargetT invocationTarget;
std::shared_ptr<ComponentThread> target;
};
};
} // namespace smo
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H