352 lines
10 KiB
C++
352 lines
10 KiB
C++
#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
|
|
#include <config.h>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <forward_list>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <stdexcept>
#include <utility>
#include <vector>

#include <componentThread.h>
#include <lockSet.h>
#include <asynchronousContinuation.h>
#include <lockerAndInvokerBase.h>
#include <callback.h>
#include <qutexAcquisitionHistoryTracker.h>
|
|
|
|
namespace smo {
|
|
|
|
template <class OriginalCbFnT>
|
|
class SerializedAsynchronousContinuation
|
|
: public PostedAsynchronousContinuation<OriginalCbFnT>
|
|
{
|
|
public:
|
|
SerializedAsynchronousContinuation(
|
|
const std::shared_ptr<ComponentThread> &caller,
|
|
Callback<OriginalCbFnT> originalCbFn,
|
|
std::vector<std::reference_wrapper<Qutex>> requiredLocks = {})
|
|
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
|
|
requiredLocks(*this, std::move(requiredLocks))
|
|
{}
|
|
|
|
template<typename... Args>
|
|
void callOriginalCb(Args&&... args)
|
|
{
|
|
requiredLocks.release();
|
|
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
|
|
std::forward<Args>(args)...);
|
|
}
|
|
|
|
// Return list of all qutexes in predecessors' LockSets; excludes self.
|
|
[[nodiscard]]
|
|
std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
|
|
getAcquiredQutexHistory() const;
|
|
|
|
public:
|
|
LockSet<OriginalCbFnT> requiredLocks;
|
|
std::atomic<bool> isAwakeOrBeingAwakened{false};
|
|
|
|
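/**
 * @brief LockerAndInvoker - "lockvoker" that runs an invocation target once
 * every qutex in the continuation's LockSet has been acquired.
 *
 * This class wraps an invocation target (e.g. a std::bind result) together
 * with the owning SerializedAsynchronousContinuation and the ComponentThread
 * whose io_service it is posted to. On construction it registers a copy of
 * itself in the qutex queues and posts itself. When it runs and the locks
 * cannot all be acquired, it simply returns and remains registered in the
 * qutex queues until awaken() posts it to the io_service again; this is the
 * "spinqueueing" pattern.
 */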
|
|
template <class InvocationTargetT>
|
|
class LockerAndInvoker
|
|
: public LockerAndInvokerBase
|
|
{
|
|
public:
|
|
/**
|
|
* @brief Constructor that immediately posts to io_service
|
|
* @param serializedContinuation Reference to the serialized continuation
|
|
* containing LockSet and target io_service
|
|
* @param target The ComponentThread whose io_service to post to
|
|
* @param invocationTarget The std::bind result to invoke when locks are acquired
|
|
*/
|
|
LockerAndInvoker(
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation,
|
|
const std::shared_ptr<ComponentThread>& target,
|
|
InvocationTargetT invocationTarget)
|
|
: LockerAndInvokerBase(&serializedContinuation),
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
creationTimestamp(std::chrono::steady_clock::now()),
|
|
#endif
|
|
serializedContinuation(serializedContinuation),
|
|
target(target),
|
|
invocationTarget(std::move(invocationTarget))
|
|
{
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
std::optional<std::reference_wrapper<Qutex>> firstFailedQutex =
|
|
traceContinuationHistoryForDeadlock();
|
|
|
|
if (firstFailedQutex.has_value())
|
|
{
|
|
handleDeadlock(firstFailedQutex.value().get());
|
|
throw std::runtime_error(
|
|
"LockerAndInvoker::LockerAndInvoker(): Deadlock detected");
|
|
}
|
|
#endif // CONFIG_ENABLE_DEBUG_LOCKS
|
|
|
|
firstWake();
|
|
}
|
|
|
|
/**
|
|
* @brief Function call operator - tries to acquire locks and either
|
|
* invokes the target or returns (already registered in qutex queues)
|
|
*/
|
|
void operator()()
|
|
{
|
|
if (ComponentThread::getSelf() != target)
|
|
{
|
|
throw std::runtime_error(
|
|
"LockerAndInvoker::operator(): Thread safety violation - "
|
|
"executing on wrong ComponentThread");
|
|
}
|
|
|
|
Qutex *firstFailedQutexPtr = nullptr;
|
|
bool deadlockLikely = isDeadlockLikely();
|
|
bool gridlockLikely = isGridlockLikely();
|
|
|
|
if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
|
|
*this,
|
|
((deadlockLikely || gridlockLikely)
|
|
? &firstFailedQutexPtr : nullptr)))
|
|
{
|
|
// Just allow this lockvoker to be dropped from its io_service.
|
|
allowAwakening();
|
|
if (!deadlockLikely && !gridlockLikely)
|
|
{ return; }
|
|
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
Qutex &firstFailedQutex = *firstFailedQutexPtr;
|
|
bool isDeadlock = traceContinuationHistoryForDeadlockOn(
|
|
firstFailedQutex);
|
|
|
|
bool gridlockIsHeuristicallyLikely = false;
|
|
bool gridlockIsAlgorithmicallyLikely = false;
|
|
|
|
if (gridlockLikely)
|
|
{
|
|
auto& tracker = QutexAcquisitionHistoryTracker
|
|
::getInstance();
|
|
|
|
auto heldLocks = serializedContinuation
|
|
.getAcquiredQutexHistory();
|
|
|
|
// Add this continuation to the tracker
|
|
auto currentContinuationShPtr = serializedContinuation
|
|
.shared_from_this();
|
|
|
|
tracker.addIfNotExists(
|
|
currentContinuationShPtr,
|
|
firstFailedQutex, std::move(heldLocks));
|
|
|
|
gridlockIsHeuristicallyLikely = tracker
|
|
.heuristicallyTraceContinuationHistoryForGridlockOn(
|
|
firstFailedQutex, currentContinuationShPtr);
|
|
|
|
if (gridlockIsHeuristicallyLikely)
|
|
{
|
|
gridlockIsAlgorithmicallyLikely = tracker
|
|
.completelyTraceContinuationHistoryForGridlockOn(
|
|
firstFailedQutex);
|
|
}
|
|
}
|
|
|
|
bool isGridlock = (gridlockIsHeuristicallyLikely
|
|
|| gridlockIsAlgorithmicallyLikely);
|
|
|
|
if (!isDeadlock && !isGridlock)
|
|
{ return; }
|
|
|
|
if (isDeadlock) { handleDeadlock(firstFailedQutex); }
|
|
if (isGridlock) { handleGridlock(firstFailedQutex); }
|
|
#endif
|
|
return;
|
|
}
|
|
|
|
/** EXPLANATION:
|
|
* Successfully acquired all locks, so unregister from qutex queues.
|
|
* We do this here so that we can free up queue slots in the qutex
|
|
* queues for other lockvokers that may be waiting to acquire the
|
|
* locks. The size of the qutex queues does matter for other
|
|
* contending lockvokers; and so also does their position in the
|
|
* queues.
|
|
*
|
|
* The alternative is to leave ourself in the queues until we
|
|
* eventually release all locks; and given that we may hold locks
|
|
* even across true async hardware bottlenecks, this could take a
|
|
* long time.
|
|
*
|
|
* Granted, the fact that we own the locks means that even though
|
|
* we've removed ourselves from the queues, other lockvokers still
|
|
* can't acquire the locks anyway.
|
|
*/
|
|
serializedContinuation.requiredLocks.unregisterFromQutexQueues();
|
|
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
/** EXPLANATION:
|
|
* If we were being tracked for gridlock detection but successfully
|
|
* acquired all locks, it was a false positive due to timed delay,
|
|
* long-running operation, or I/O delay
|
|
*/
|
|
if (gridlockLikely)
|
|
{
|
|
std::shared_ptr<AsynchronousContinuationChainLink>
|
|
currentContinuationShPtr =
|
|
serializedContinuation.shared_from_this();
|
|
|
|
bool removed = QutexAcquisitionHistoryTracker::getInstance()
|
|
.remove(currentContinuationShPtr);
|
|
|
|
if (removed)
|
|
{
|
|
std::cerr
|
|
<< "LockerAndInvoker::operator(): False positive "
|
|
"gridlock detection - continuation @"
|
|
<< &serializedContinuation
|
|
<< " was being tracked but successfully acquired all "
|
|
"locks. This was likely due to timed delay, "
|
|
"long-running operation, or I/O delay."
|
|
<< std::endl;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
invocationTarget();
|
|
}
|
|
|
|
/**
|
|
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
|
|
* @param qutex The Qutex to get the iterator for
|
|
* @return Iterator pointing to this lockvoker in the Qutex's queue
|
|
*/
|
|
LockerAndInvokerBase::List::iterator
|
|
getLockvokerIteratorForQutex(Qutex& qutex) const override
|
|
{
|
|
return serializedContinuation.requiredLocks.getLockUsageDesc(
|
|
qutex).second;
|
|
}
|
|
|
|
/**
|
|
* @brief Awaken this lockvoker by posting it to its io_service
|
|
* @param forceAwaken If true, post even if already awake
|
|
*/
|
|
void awaken(bool forceAwaken = false) override
|
|
{
|
|
bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
|
|
.exchange(true);
|
|
|
|
if (prevVal == true && !forceAwaken)
|
|
{ return; }
|
|
|
|
target->getIoService().post(*this);
|
|
}
|
|
|
|
size_t getLockSetSize() const override
|
|
{ return serializedContinuation.requiredLocks.locks.size(); }
|
|
|
|
/**
 * @brief Access the Qutex stored at a given position in the LockSet.
 *
 * No bounds check is performed here; @p index is passed directly to
 * `locks[index]`.
 *
 * @param index Position within `requiredLocks.locks`.
 * @return Reference to the Qutex held by that entry.
 */
Qutex &getLockAt(size_t index) const override
{
    auto &locks = serializedContinuation.requiredLocks.locks;
    return locks[index].first.get();
}
|
|
|
|
private:
|
|
// Allow awakening by resetting the awake flag
|
|
void allowAwakening()
|
|
{ serializedContinuation.isAwakeOrBeingAwakened.store(false); }
|
|
|
|
/** EXPLANATION:
|
|
* Creates a shared_ptr copy of this lockvoker and registers it with
|
|
* the serialized continuation's LockSet.
|
|
*/
|
|
void registerInLockSet()
|
|
{
|
|
auto sharedLockvoker = std::make_shared<
|
|
LockerAndInvoker<InvocationTargetT>>(*this);
|
|
|
|
serializedContinuation.requiredLocks.registerInQutexQueues(
|
|
sharedLockvoker);
|
|
}
|
|
|
|
/**
|
|
* @brief First wake - register in queues and awaken
|
|
*
|
|
* Sets isAwake=true before calling awaken with forceAwaken to ensure
|
|
* that none of the locks we just registered with awaken()s a duplicate
|
|
* copy of this lockvoker on the io_service.
|
|
*/
|
|
void firstWake()
|
|
{
|
|
serializedContinuation.isAwakeOrBeingAwakened.store(true);
|
|
registerInLockSet();
|
|
// Force awaken since we just set the flag above
|
|
awaken(true);
|
|
}
|
|
|
|
// Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation?
|
|
bool isDeadlockLikely() const
|
|
{
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
auto now = std::chrono::steady_clock::now();
|
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
now - creationTimestamp);
|
|
return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS;
|
|
#else
|
|
return false;
|
|
#endif
|
|
}
|
|
|
|
// Wrapper around isDeadlockLikely for gridlock detection
|
|
bool isGridlockLikely() const
|
|
{ return isDeadlockLikely(); }
|
|
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
struct obsolete {
|
|
bool traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex);
|
|
};
|
|
|
|
bool traceContinuationHistoryForDeadlockOn(Qutex &firstFailedQutex);
|
|
std::optional<std::reference_wrapper<Qutex>>
|
|
traceContinuationHistoryForDeadlock(void)
|
|
{
|
|
for (auto& lockUsageDesc
|
|
: serializedContinuation.requiredLocks.locks)
|
|
{
|
|
if (traceContinuationHistoryForDeadlockOn(
|
|
lockUsageDesc.first.get()))
|
|
{
|
|
return std::ref(lockUsageDesc.first.get());
|
|
}
|
|
}
|
|
return std::nullopt;
|
|
}
|
|
|
|
/**
|
|
* @brief Handle a likely deadlock situation by logging debug information
|
|
* @param firstFailedQutex The first qutex that failed acquisition
|
|
*/
|
|
void handleDeadlock(Qutex &firstFailedQutex);
|
|
void handleGridlock(Qutex &firstFailedQutex);
|
|
#endif
|
|
|
|
private:
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
std::chrono::steady_clock::time_point creationTimestamp;
|
|
#endif
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation;
|
|
std::shared_ptr<ComponentThread> target;
|
|
InvocationTargetT invocationTarget;
|
|
};
|
|
};
|
|
|
|
} // namespace smo
|
|
|
|
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|