Files
salmanoff/include/serializedAsynchronousContinuation.h
T

343 lines
10 KiB
C++

#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#include <config.h>
#include <functional>
#include <memory>
#include <atomic>
#include <chrono>
#include <iostream>
#include <componentThread.h>
#include <lockSet.h>
#include <asynchronousContinuation.h>
#include <lockerAndInvokerBase.h>
#include <callback.h>
#include <qutexAcquisitionHistoryTracker.h>
namespace smo {
template <class OriginalCbFnT>
class SerializedAsynchronousContinuation
: public PostedAsynchronousContinuation<OriginalCbFnT>
{
public:
SerializedAsynchronousContinuation(
const std::shared_ptr<ComponentThread> &caller,
Callback<OriginalCbFnT> originalCbFn,
std::vector<std::reference_wrapper<Qutex>> requiredLocks = {})
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
requiredLocks(*this, std::move(requiredLocks))
{}
template<typename... Args>
void callOriginalCb(Args&&... args)
{
requiredLocks.release();
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
std::forward<Args>(args)...);
}
// Return list of all qutexes in predecessors' LockSets; excludes self.
[[nodiscard]]
std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
getAcquiredQutexHistory() const;
public:
LockSet<OriginalCbFnT> requiredLocks;
std::atomic<bool> isAwakeOrBeingAwakened{false};
/**
* @brief LockerAndInvoker - Template class for lockvoking mechanism
*
* This class wraps a std::bind result and provides locking functionality.
* When locks cannot be acquired, the object re-posts itself to the io_service
* queue, implementing the "spinqueueing" pattern.
*/
template <class InvocationTargetT>
class LockerAndInvoker
: public LockerAndInvokerBase
{
public:
/**
* @brief Constructor that immediately posts to io_service
* @param serializedContinuation Reference to the serialized continuation
* containing LockSet and target io_service
* @param target The ComponentThread whose io_service to post to
* @param invocationTarget The std::bind result to invoke when locks are acquired
*/
LockerAndInvoker(
SerializedAsynchronousContinuation<OriginalCbFnT>
&serializedContinuation,
const std::shared_ptr<ComponentThread>& target,
InvocationTargetT invocationTarget)
: LockerAndInvokerBase(&serializedContinuation),
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
creationTimestamp(std::chrono::steady_clock::now()),
#endif
serializedContinuation(serializedContinuation),
target(target),
invocationTarget(std::move(invocationTarget))
{
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
if (traceContinuationHistoryForDeadlock())
{
handleDeadlock();
throw std::runtime_error(
"LockerAndInvoker::LockerAndInvoker(): Deadlock detected");
}
#endif // CONFIG_ENABLE_DEBUG_LOCKS
firstWake();
}
/**
* @brief Function call operator - tries to acquire locks and either
* invokes the target or returns (already registered in qutex queues)
*/
void operator()()
{
if (ComponentThread::getSelf() != target)
{
throw std::runtime_error(
"LockerAndInvoker::operator(): Thread safety violation - "
"executing on wrong ComponentThread");
}
Qutex *firstFailedQutexPtr = nullptr;
bool deadlockLikely = isDeadlockLikely();
bool gridlockLikely = isGridlockLikely();
if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
*this,
((deadlockLikely || gridlockLikely)
? &firstFailedQutexPtr : nullptr)))
{
// Just allow this lockvoker to be dropped from its io_service.
allowAwakening();
if (!deadlockLikely && !gridlockLikely)
{ return; }
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
Qutex &firstFailedQutex = *firstFailedQutexPtr;
bool isDeadlock = traceContinuationHistoryForDeadlockOn(
firstFailedQutex);
bool gridlockIsHeuristicallyLikely = false;
bool gridlockIsAlgorithmicallyLikely = false;
if (gridlockLikely)
{
auto& tracker = QutexAcquisitionHistoryTracker
::getInstance();
auto heldLocks = serializedContinuation
.getAcquiredQutexHistory();
// Add this continuation to the tracker
auto currentContinuationShPtr = serializedContinuation
.shared_from_this();
tracker.addIfNotExists(
currentContinuationShPtr,
firstFailedQutex, std::move(heldLocks));
gridlockIsHeuristicallyLikely = tracker
.heuristicallyTraceContinuationHistoryForGridlockOn(
firstFailedQutex, currentContinuationShPtr);
if (gridlockIsHeuristicallyLikely)
{
gridlockIsAlgorithmicallyLikely = tracker
.completelyTraceContinuationHistoryForGridlockOn(
firstFailedQutex);
}
}
bool isGridlock = (gridlockIsHeuristicallyLikely
|| gridlockIsAlgorithmicallyLikely);
if (!isDeadlock && !isGridlock)
{ return; }
if (isDeadlock) { handleDeadlock(firstFailedQutex); }
if (isGridlock) { handleGridlock(firstFailedQutex); }
#endif
return;
}
/** EXPLANATION:
* Successfully acquired all locks, so unregister from qutex queues.
* We do this here so that we can free up queue slots in the qutex
* queues for other lockvokers that may be waiting to acquire the
* locks. The size of the qutex queues does matter for other
* contending lockvokers; and so also does their position in the
* queues.
*
* The alternative is to leave ourself in the queues until we
* eventually release all locks; and given that we may hold locks
* even across true async hardware bottlenecks, this could take a
* long time.
*
* Granted, the fact that we own the locks means that even though
* we've removed ourselves from the queues, other lockvokers still
* can't acquire the locks anyway.
*/
serializedContinuation.requiredLocks.unregisterFromQutexQueues();
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
/** EXPLANATION:
* If we were being tracked for gridlock detection but successfully
* acquired all locks, it was a false positive due to timed delay,
* long-running operation, or I/O delay
*/
if (gridlockLikely)
{
bool removed = QutexAcquisitionHistoryTracker::getInstance()
.remove(serializedContinuation.shared_from_this());
if (removed)
{
std::cerr
<< "LockerAndInvoker::operator(): False positive "
"gridlock detection - continuation @"
<< serializedContinuation.get()
<< " was being tracked but successfully acquired all "
"locks. This was likely due to timed delay, "
"long-running operation, or I/O delay."
<< std::endl;
}
}
#endif
invocationTarget();
}
/**
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
* @param qutex The Qutex to get the iterator for
* @return Iterator pointing to this lockvoker in the Qutex's queue
*/
LockerAndInvokerBase::List::iterator
getLockvokerIteratorForQutex(Qutex& qutex) const override
{
return serializedContinuation.requiredLocks.getLockUsageDesc(
qutex).second;
}
/**
* @brief Awaken this lockvoker by posting it to its io_service
* @param forceAwaken If true, post even if already awake
*/
void awaken(bool forceAwaken = false) override
{
bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
.exchange(true);
if (prevVal == true && !forceAwaken)
{ return; }
target->getIoService().post(*this);
}
size_t getLockSetSize() const override
{ return serializedContinuation.requiredLocks.locks.size(); }
Qutex& getLockAt(size_t index) const override
{
return serializedContinuation.requiredLocks.locks[index]
.first.get();
}
private:
// Allow awakening by resetting the awake flag
void allowAwakening()
{ serializedContinuation.isAwakeOrBeingAwakened.store(false); }
/** EXPLANATION:
* Creates a shared_ptr copy of this lockvoker and registers it with
* the serialized continuation's LockSet.
*/
void registerInLockSet()
{
auto sharedLockvoker = std::make_shared<
LockerAndInvoker<InvocationTargetT>>(*this);
serializedContinuation.requiredLocks.registerInQutexQueues(
sharedLockvoker);
}
/**
* @brief First wake - register in queues and awaken
*
* Sets isAwake=true before calling awaken with forceAwaken to ensure
* that none of the locks we just registered with awaken()s a duplicate
* copy of this lockvoker on the io_service.
*/
void firstWake()
{
serializedContinuation.isAwakeOrBeingAwakened.store(true);
registerInLockSet();
// Force awaken since we just set the flag above
awaken(true);
}
// Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation?
bool isDeadlockLikely() const
{
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - creationTimestamp);
return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS;
#else
return false;
#endif
}
// Wrapper around isDeadlockLikely for gridlock detection
bool isGridlockLikely() const
{ return isDeadlockLikely(); }
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
struct obsolete {
bool traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex);
};
bool traceContinuationHistoryForDeadlockOn(Qutex &firstFailedQutex);
bool traceContinuationHistoryForDeadlock(void)
{
for (auto& lockUsageDesc
: serializedContinuation.requiredLocks.locks)
{
if (traceContinuationHistoryForDeadlockOn(
lockUsageDesc.first.get()))
{
return true;
}
}
return false;
}
/**
* @brief Handle a likely deadlock situation by logging debug information
* @param firstFailedQutex The first qutex that failed acquisition
*/
void handleDeadlock(Qutex &firstFailedQutex);
void handleGridlock(Qutex &firstFailedQutex);
#endif
private:
#ifdef CONFIG_ENABLE_DEBUG_LOCKS
std::chrono::steady_clock::time_point creationTimestamp;
#endif
SerializedAsynchronousContinuation<OriginalCbFnT>
&serializedContinuation;
std::shared_ptr<ComponentThread> target;
InvocationTargetT invocationTarget;
};
};
} // namespace smo
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H