092a0954a0
We added a timestamp to each lockvoker so that we can detect when a lockvoker has been in a qutex for "too long", where "too long" is arbitrarily defined as 500ms. Next, we're going to change the way we create callbacks so that we can access the sh_ptr<AsyncContin> more explicitly via the callback object.
210 lines
6.3 KiB
C++
210 lines
6.3 KiB
C++
#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
|
|
#include <config.h>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <iostream>
|
|
#include <componentThread.h>
|
|
#include <lockSet.h>
|
|
#include <asynchronousContinuation.h>
|
|
#include <lockerAndInvokerBase.h>
|
|
|
|
namespace smo {
|
|
|
|
template <class OriginalCbFnT>
|
|
class SerializedAsynchronousContinuation
|
|
: public PostedAsynchronousContinuation<OriginalCbFnT>
|
|
{
|
|
public:
|
|
SerializedAsynchronousContinuation(
|
|
const std::shared_ptr<ComponentThread> &caller,
|
|
OriginalCbFnT originalCbFn,
|
|
std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
|
|
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
|
|
requiredLocks(*this, std::move(requiredLocks))
|
|
{}
|
|
|
|
template<typename... Args>
|
|
void callOriginalCb(Args&&... args)
|
|
{
|
|
requiredLocks.release();
|
|
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
|
|
std::forward<Args>(args)...);
|
|
}
|
|
|
|
public:
|
|
LockSet<OriginalCbFnT> requiredLocks;
|
|
std::atomic<bool> isAwakeOrBeingAwakened{false};
|
|
|
|
/**
|
|
* @brief LockerAndInvoker - Template class for lockvoking mechanism
|
|
*
|
|
* This class wraps a std::bind result and provides locking functionality.
|
|
* When locks cannot be acquired, the object re-posts itself to the io_service
|
|
* queue, implementing the "spinqueueing" pattern.
|
|
*/
|
|
template <class InvocationTargetT>
|
|
class LockerAndInvoker
|
|
: public LockerAndInvokerBase
|
|
{
|
|
public:
|
|
/**
|
|
* @brief Constructor that immediately posts to io_service
|
|
* @param serializedContinuation Reference to the serialized continuation
|
|
* containing LockSet and target io_service
|
|
* @param target The ComponentThread whose io_service to post to
|
|
* @param invocationTarget The std::bind result to invoke when locks are acquired
|
|
*/
|
|
LockerAndInvoker(
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation,
|
|
const std::shared_ptr<ComponentThread>& target,
|
|
InvocationTargetT invocationTarget)
|
|
: LockerAndInvokerBase(&serializedContinuation),
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
creationTimestamp(std::chrono::steady_clock::now()),
|
|
#endif
|
|
serializedContinuation(serializedContinuation),
|
|
target(target),
|
|
invocationTarget(std::move(invocationTarget))
|
|
{
|
|
firstWake();
|
|
}
|
|
|
|
/**
|
|
* @brief Function call operator - tries to acquire locks and either
|
|
* invokes the target or returns (already registered in qutex queues)
|
|
*/
|
|
void operator()()
|
|
{
|
|
if (ComponentThread::getSelf() != target)
|
|
{
|
|
throw std::runtime_error(
|
|
"LockerAndInvoker::operator(): Thread safety violation - "
|
|
"executing on wrong ComponentThread");
|
|
}
|
|
|
|
Qutex *firstFailedQutexPtr = nullptr;
|
|
bool deadlockLikely = isDeadlockLikely();
|
|
|
|
if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
|
|
*this, (deadlockLikely ? &firstFailedQutexPtr : nullptr)))
|
|
{
|
|
// Just allow this lockvoker to be dropped from its io_service.
|
|
allowAwakening();
|
|
if (!deadlockLikely)
|
|
{ return; }
|
|
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
handleLikelyDeadlock(*firstFailedQutexPtr);
|
|
#endif
|
|
return;
|
|
}
|
|
|
|
/** EXPLANATION:
|
|
* Successfully acquired all locks, so unregister from qutex queues.
|
|
* We do this here so that we can free up queue slots in the qutex
|
|
* queues for other lockvokers that may be waiting to acquire the
|
|
* locks. The size of the qutex queues does matter for other
|
|
* contending lockvokers; and so also does their position in the
|
|
* queues.
|
|
*
|
|
* The alternative is to leave ourself in the queues until we
|
|
* eventually release all locks; and given that we may hold locks
|
|
* even across true async hardware bottlenecks, this could take a
|
|
* long time.
|
|
*
|
|
* Granted, the fact that we own the locks means that even though
|
|
* we've removed ourselves from the queues, other lockvokers still
|
|
* can't acquire the locks anyway.
|
|
*/
|
|
serializedContinuation.requiredLocks.unregisterFromQutexQueues();
|
|
invocationTarget();
|
|
}
|
|
|
|
/**
|
|
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
|
|
* @param qutex The Qutex to get the iterator for
|
|
* @return Iterator pointing to this lockvoker in the Qutex's queue
|
|
*/
|
|
LockerAndInvokerBase::List::iterator
|
|
getLockvokerIteratorForQutex(Qutex& qutex) override
|
|
{
|
|
return serializedContinuation.requiredLocks.getLockUsageDesc(
|
|
qutex).second;
|
|
}
|
|
|
|
/**
|
|
* @brief Awaken this lockvoker by posting it to its io_service
|
|
* @param forceAwaken If true, post even if already awake
|
|
*/
|
|
void awaken(bool forceAwaken = false) override
|
|
{
|
|
bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
|
|
.exchange(true);
|
|
|
|
if (prevVal == true && !forceAwaken)
|
|
{ return; }
|
|
|
|
target->getIoService().post(*this);
|
|
}
|
|
|
|
private:
|
|
// Allow awakening by resetting the awake flag
|
|
void allowAwakening()
|
|
{ serializedContinuation.isAwakeOrBeingAwakened.store(false); }
|
|
|
|
/**
|
|
* @brief First wake - register in queues and awaken
|
|
*
|
|
* Sets isAwake=true before calling awaken with forceAwaken to ensure
|
|
* that none of the locks we just registered with awaken()s a duplicate
|
|
* copy of this lockvoker on the io_service.
|
|
*/
|
|
void firstWake()
|
|
{
|
|
serializedContinuation.isAwakeOrBeingAwakened.store(true);
|
|
serializedContinuation.requiredLocks.registerInQutexQueues(*this);
|
|
// Force awaken since we just set the flag above
|
|
awaken(true);
|
|
}
|
|
|
|
// Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation?
|
|
bool isDeadlockLikely() const
|
|
{
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
auto now = std::chrono::steady_clock::now();
|
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
now - creationTimestamp);
|
|
return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS;
|
|
#else
|
|
return false;
|
|
#endif
|
|
}
|
|
|
|
/**
|
|
* @brief Handle a likely deadlock situation by logging debug information
|
|
* @param firstFailedQutex The first qutex that failed acquisition
|
|
*/
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
void handleLikelyDeadlock(Qutex& firstFailedQutex);
|
|
#endif
|
|
|
|
private:
|
|
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
|
|
std::chrono::steady_clock::time_point creationTimestamp;
|
|
#endif
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation;
|
|
InvocationTargetT invocationTarget;
|
|
std::shared_ptr<ComponentThread> target;
|
|
};
|
|
};
|
|
|
|
} // namespace smo
|
|
|
|
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|