8123ec1227
We add the skeleton of a correct history tracer for gridlocks. The previous history tracer made the incorrect assumption that we would find the foreign sequence's currently desired LockSet inside of the lockvoker that it has stored inside of Qutex::currOwner.
269 lines
7.9 KiB
C++
269 lines
7.9 KiB
C++
#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
|
|
#include <config.h>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <iostream>
|
|
#include <componentThread.h>
|
|
#include <lockSet.h>
|
|
#include <asynchronousContinuation.h>
|
|
#include <lockerAndInvokerBase.h>
|
|
#include <callback.h>
|
|
|
|
namespace smo {
|
|
|
|
template <class OriginalCbFnT>
|
|
class SerializedAsynchronousContinuation
|
|
: public PostedAsynchronousContinuation<OriginalCbFnT>
|
|
{
|
|
public:
|
|
SerializedAsynchronousContinuation(
|
|
const std::shared_ptr<ComponentThread> &caller,
|
|
Callback<OriginalCbFnT> originalCbFn,
|
|
std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
|
|
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
|
|
requiredLocks(*this, std::move(requiredLocks))
|
|
{}
|
|
|
|
template<typename... Args>
|
|
void callOriginalCb(Args&&... args)
|
|
{
|
|
requiredLocks.release();
|
|
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
|
|
std::forward<Args>(args)...);
|
|
}
|
|
|
|
public:
|
|
LockSet<OriginalCbFnT> requiredLocks;
|
|
std::atomic<bool> isAwakeOrBeingAwakened{false};
|
|
|
|
/**
|
|
* @brief LockerAndInvoker - Template class for lockvoking mechanism
|
|
*
|
|
* This class wraps a std::bind result and provides locking functionality.
|
|
* When locks cannot be acquired, the object re-posts itself to the io_service
|
|
* queue, implementing the "spinqueueing" pattern.
|
|
*/
|
|
template <class InvocationTargetT>
|
|
class LockerAndInvoker
|
|
: public LockerAndInvokerBase
|
|
{
|
|
public:
|
|
/**
|
|
* @brief Constructor that immediately posts to io_service
|
|
* @param serializedContinuation Reference to the serialized continuation
|
|
* containing LockSet and target io_service
|
|
* @param target The ComponentThread whose io_service to post to
|
|
* @param invocationTarget The std::bind result to invoke when locks are acquired
|
|
*/
|
|
LockerAndInvoker(
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation,
|
|
const std::shared_ptr<ComponentThread>& target,
|
|
InvocationTargetT invocationTarget)
|
|
: LockerAndInvokerBase(&serializedContinuation),
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
creationTimestamp(std::chrono::steady_clock::now()),
|
|
#endif
|
|
serializedContinuation(serializedContinuation),
|
|
target(target),
|
|
invocationTarget(std::move(invocationTarget))
|
|
{
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
if (traceContinuationHistoryForDeadlock())
|
|
{
|
|
handleDeadlock();
|
|
throw std::runtime_error(
|
|
"LockerAndInvoker::LockerAndInvoker(): Deadlock detected");
|
|
}
|
|
#endif // CONFIG_ENABLE_DEBUG_LOCKS
|
|
|
|
firstWake();
|
|
}
|
|
|
|
/**
|
|
* @brief Function call operator - tries to acquire locks and either
|
|
* invokes the target or returns (already registered in qutex queues)
|
|
*/
|
|
void operator()()
|
|
{
|
|
if (ComponentThread::getSelf() != target)
|
|
{
|
|
throw std::runtime_error(
|
|
"LockerAndInvoker::operator(): Thread safety violation - "
|
|
"executing on wrong ComponentThread");
|
|
}
|
|
|
|
Qutex *firstFailedQutexPtr = nullptr;
|
|
bool deadlockLikely = isDeadlockLikely();
|
|
bool gridlockLikely = isGridlockLikely();
|
|
|
|
if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
|
|
*this,
|
|
((deadlockLikely || gridlockLikely)
|
|
? &firstFailedQutexPtr : nullptr)))
|
|
{
|
|
// Just allow this lockvoker to be dropped from its io_service.
|
|
allowAwakening();
|
|
if (!deadlockLikely && !gridlockLikely)
|
|
{ return; }
|
|
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
Qutex &firstFailedQutex = *firstFailedQutexPtr;
|
|
bool isDeadlock = traceContinuationHistoryForDeadlockOn(
|
|
firstFailedQutex);
|
|
|
|
bool isGridlock = heuristicallyTraceContinuationHistoryForGridlockOn(
|
|
firstFailedQutex);
|
|
|
|
if (!isDeadlock && !isGridlock)
|
|
{ return; }
|
|
|
|
handleDeadlock(firstFailedQutex);
|
|
#endif
|
|
return;
|
|
}
|
|
|
|
/** EXPLANATION:
|
|
* Successfully acquired all locks, so unregister from qutex queues.
|
|
* We do this here so that we can free up queue slots in the qutex
|
|
* queues for other lockvokers that may be waiting to acquire the
|
|
* locks. The size of the qutex queues does matter for other
|
|
* contending lockvokers; and so also does their position in the
|
|
* queues.
|
|
*
|
|
* The alternative is to leave ourself in the queues until we
|
|
* eventually release all locks; and given that we may hold locks
|
|
* even across true async hardware bottlenecks, this could take a
|
|
* long time.
|
|
*
|
|
* Granted, the fact that we own the locks means that even though
|
|
* we've removed ourselves from the queues, other lockvokers still
|
|
* can't acquire the locks anyway.
|
|
*/
|
|
serializedContinuation.requiredLocks.unregisterFromQutexQueues();
|
|
invocationTarget();
|
|
}
|
|
|
|
/**
|
|
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
|
|
* @param qutex The Qutex to get the iterator for
|
|
* @return Iterator pointing to this lockvoker in the Qutex's queue
|
|
*/
|
|
LockerAndInvokerBase::List::iterator
|
|
getLockvokerIteratorForQutex(Qutex& qutex) const override
|
|
{
|
|
return serializedContinuation.requiredLocks.getLockUsageDesc(
|
|
qutex).second;
|
|
}
|
|
|
|
/**
|
|
* @brief Awaken this lockvoker by posting it to its io_service
|
|
* @param forceAwaken If true, post even if already awake
|
|
*/
|
|
void awaken(bool forceAwaken = false) override
|
|
{
|
|
bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
|
|
.exchange(true);
|
|
|
|
if (prevVal == true && !forceAwaken)
|
|
{ return; }
|
|
|
|
target->getIoService().post(*this);
|
|
}
|
|
|
|
size_t getLockSetSize() const override
|
|
{ return serializedContinuation.requiredLocks.locks.size(); }
|
|
|
|
Qutex& getLockAt(size_t index) const override
|
|
{
|
|
return serializedContinuation.requiredLocks.locks[index]
|
|
.first.get();
|
|
}
|
|
|
|
private:
|
|
// Allow awakening by resetting the awake flag
|
|
void allowAwakening()
|
|
{ serializedContinuation.isAwakeOrBeingAwakened.store(false); }
|
|
|
|
/**
|
|
* @brief First wake - register in queues and awaken
|
|
*
|
|
* Sets isAwake=true before calling awaken with forceAwaken to ensure
|
|
* that none of the locks we just registered with awaken()s a duplicate
|
|
* copy of this lockvoker on the io_service.
|
|
*/
|
|
void firstWake()
|
|
{
|
|
serializedContinuation.isAwakeOrBeingAwakened.store(true);
|
|
serializedContinuation.requiredLocks.registerInQutexQueues(*this);
|
|
// Force awaken since we just set the flag above
|
|
awaken(true);
|
|
}
|
|
|
|
// Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation?
|
|
bool isDeadlockLikely() const
|
|
{
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
auto now = std::chrono::steady_clock::now();
|
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
now - creationTimestamp);
|
|
return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS;
|
|
#else
|
|
return false;
|
|
#endif
|
|
}
|
|
|
|
// Wrapper around isDeadlockLikely for gridlock detection
|
|
bool isGridlockLikely() const
|
|
{ return isDeadlockLikely(); }
|
|
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
struct obsolete {
|
|
bool traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex);
|
|
};
|
|
|
|
bool traceContinuationHistoryForDeadlockOn(Qutex &firstFailedQutex);
|
|
bool heuristicallyTraceContinuationHistoryForGridlockOn(
|
|
Qutex &firstFailedQutex);
|
|
bool completelyTraceContinuationHistoryForGridlockOn(
|
|
Qutex &firstFailedQutex);
|
|
bool traceContinuationHistoryForDeadlock(void)
|
|
{
|
|
for (auto& lockUsageDesc
|
|
: serializedContinuation.requiredLocks.locks)
|
|
{
|
|
if (traceContinuationHistoryForDeadlockOn(
|
|
lockUsageDesc.first.get()))
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* @brief Handle a likely deadlock situation by logging debug information
|
|
* @param firstFailedQutex The first qutex that failed acquisition
|
|
*/
|
|
void handleDeadlock(Qutex& firstFailedQutex);
|
|
#endif
|
|
|
|
private:
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
std::chrono::steady_clock::time_point creationTimestamp;
|
|
#endif
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation;
|
|
InvocationTargetT invocationTarget;
|
|
std::shared_ptr<ComponentThread> target;
|
|
};
|
|
};
|
|
|
|
} // namespace smo
|
|
|
|
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|