#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H #define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H #include #include #include #include #include #include #include #include #include #include #include #include namespace smo { template class SerializedAsynchronousContinuation : public PostedAsynchronousContinuation { public: SerializedAsynchronousContinuation( const std::shared_ptr &caller, Callback originalCbFn, std::vector> requiredLocks = {}) : PostedAsynchronousContinuation(caller, originalCbFn), requiredLocks(*this, std::move(requiredLocks)) {} template void callOriginalCb(Args&&... args) { requiredLocks.release(); PostedAsynchronousContinuation::callOriginalCb( std::forward(args)...); } // Return list of all qutexes in predecessors' LockSets; excludes self. [[nodiscard]] std::unique_ptr>> getAcquiredQutexHistory() const; public: LockSet requiredLocks; std::atomic isAwakeOrBeingAwakened{false}; /** * @brief LockerAndInvoker - Template class for lockvoking mechanism * * This class wraps a std::bind result and provides locking functionality. * When locks cannot be acquired, the object re-posts itself to the io_service * queue, implementing the "spinqueueing" pattern. */ template class LockerAndInvoker : public LockerAndInvokerBase { public: /** * @brief Constructor that immediately posts to io_service * @param serializedContinuation Reference to the serialized continuation * containing LockSet and target io_service * @param target The ComponentThread whose io_service to post to * @param invocationTarget The std::bind result to invoke when locks are acquired */ LockerAndInvoker( SerializedAsynchronousContinuation &serializedContinuation, const std::shared_ptr& target, InvocationTargetT invocationTarget) : LockerAndInvokerBase(&serializedContinuation), #ifdef CONFIG_ENABLE_DEBUG_LOCKS creationTimestamp(std::chrono::steady_clock::now()), #endif serializedContinuation(serializedContinuation), target(target), invocationTarget(std::move(invocationTarget)) { #ifdef CONFIG_ENABLE_DEBUG_LOCKS if (traceContinuationHistoryForDeadlock()) { handleDeadlock(); throw std::runtime_error( "LockerAndInvoker::LockerAndInvoker(): Deadlock detected"); } #endif // CONFIG_ENABLE_DEBUG_LOCKS firstWake(); } /** * @brief Function call operator - tries to acquire locks and either * invokes the target or returns (already registered in qutex queues) */ void operator()() { if (ComponentThread::getSelf() != target) { throw std::runtime_error( "LockerAndInvoker::operator(): Thread safety violation - " "executing on wrong ComponentThread"); } Qutex *firstFailedQutexPtr = nullptr; bool deadlockLikely = isDeadlockLikely(); bool gridlockLikely = isGridlockLikely(); if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff( *this, ((deadlockLikely || gridlockLikely) ? &firstFailedQutexPtr : nullptr))) { // Just allow this lockvoker to be dropped from its io_service. allowAwakening(); if (!deadlockLikely && !gridlockLikely) { return; } #ifdef CONFIG_ENABLE_DEBUG_LOCKS Qutex &firstFailedQutex = *firstFailedQutexPtr; bool isDeadlock = traceContinuationHistoryForDeadlockOn( firstFailedQutex); bool gridlockIsHeuristicallyLikely = false; bool gridlockIsAlgorithmicallyLikely = false; if (gridlockLikely) { auto tracker = QutexAcquisitionHistoryTracker ::getInstance(); auto heldLocks = serializedContinuation .getAcquiredQutexHistory(); // Add this continuation to the tracker auto currentContinuationShPtr = serializedContinuation .shared_from_this(); tracker.addIfNotExists( currentContinuationShPtr, firstFailedQutex, std::move(heldLocks)); gridlockIsHeuristicallyLikely = tracker .heuristicallyTraceContinuationHistoryForGridlockOn( firstFailedQutex, currentContinuationShPtr); if (gridlockIsHeuristicallyLikely) { gridlockIsAlgorithmicallyLikely = tracker .completelyTraceContinuationHistoryForGridlockOn( firstFailedQutex); } } bool isGridlock = (gridlockIsHeuristicallyLikely || gridlockIsAlgorithmicallyLikely); if (!isDeadlock && !isGridlock) { return; } if (isDeadlock) { handleDeadlock(firstFailedQutex); } if (isGridlock) { handleGridlock(firstFailedQutex); } #endif return; } /** EXPLANATION: * Successfully acquired all locks, so unregister from qutex queues. * We do this here so that we can free up queue slots in the qutex * queues for other lockvokers that may be waiting to acquire the * locks. The size of the qutex queues does matter for other * contending lockvokers; and so also does their position in the * queues. * * The alternative is to leave ourself in the queues until we * eventually release all locks; and given that we may hold locks * even across true async hardware bottlenecks, this could take a * long time. * * Granted, the fact that we own the locks means that even though * we've removed ourselves from the queues, other lockvokers still * can't acquire the locks anyway. */ serializedContinuation.requiredLocks.unregisterFromQutexQueues(); #ifdef CONFIG_ENABLE_DEBUG_LOCKS /** EXPLANATION: * If we were being tracked for gridlock detection but successfully * acquired all locks, it was a false positive due to timed delay, * long-running operation, or I/O delay */ if (gridlockLikely) { bool removed = QutexAcquisitionHistoryTracker::getInstance() .remove(serializedContinuation.shared_from_this()); if (removed) { std::cerr << "LockerAndInvoker::operator(): False positive " "gridlock detection - continuation @" << serializedContinuation.get() << " was being tracked but successfully acquired all " "locks. This was likely due to timed delay, " "long-running operation, or I/O delay." << std::endl; } } #endif invocationTarget(); } /** * @brief Get the iterator for this lockvoker in the specified Qutex's queue * @param qutex The Qutex to get the iterator for * @return Iterator pointing to this lockvoker in the Qutex's queue */ LockerAndInvokerBase::List::iterator getLockvokerIteratorForQutex(Qutex& qutex) const override { return serializedContinuation.requiredLocks.getLockUsageDesc( qutex).second; } /** * @brief Awaken this lockvoker by posting it to its io_service * @param forceAwaken If true, post even if already awake */ void awaken(bool forceAwaken = false) override { bool prevVal = serializedContinuation.isAwakeOrBeingAwakened .exchange(true); if (prevVal == true && !forceAwaken) { return; } target->getIoService().post(*this); } size_t getLockSetSize() const override { return serializedContinuation.requiredLocks.locks.size(); } Qutex& getLockAt(size_t index) const override { return serializedContinuation.requiredLocks.locks[index] .first.get(); } private: // Allow awakening by resetting the awake flag void allowAwakening() { serializedContinuation.isAwakeOrBeingAwakened.store(false); } /** * @brief First wake - register in queues and awaken * * Sets isAwake=true before calling awaken with forceAwaken to ensure * that none of the locks we just registered with awaken()s a duplicate * copy of this lockvoker on the io_service. */ void firstWake() { serializedContinuation.isAwakeOrBeingAwakened.store(true); serializedContinuation.requiredLocks.registerInQutexQueues(*this); // Force awaken since we just set the flag above awaken(true); } // Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation? bool isDeadlockLikely() const { #ifdef CONFIG_ENABLE_DEBUG_LOCKS auto now = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration_cast( now - creationTimestamp); return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS; #else return false; #endif } // Wrapper around isDeadlockLikely for gridlock detection bool isGridlockLikely() const { return isDeadlockLikely(); } #ifdef CONFIG_ENABLE_DEBUG_LOCKS struct obsolete { bool traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex); }; bool traceContinuationHistoryForDeadlockOn(Qutex &firstFailedQutex); bool traceContinuationHistoryForDeadlock(void) { for (auto& lockUsageDesc : serializedContinuation.requiredLocks.locks) { if (traceContinuationHistoryForDeadlockOn( lockUsageDesc.first.get())) { return true; } } return false; } /** * @brief Handle a likely deadlock situation by logging debug information * @param firstFailedQutex The first qutex that failed acquisition */ void handleDeadlock(Qutex &firstFailedQutex); void handleGridlock(Qutex &firstFailedQutex); #endif private: #ifdef CONFIG_ENABLE_DEBUG_LOCKS std::chrono::steady_clock::time_point creationTimestamp; #endif SerializedAsynchronousContinuation &serializedContinuation; InvocationTargetT invocationTarget; std::shared_ptr target; }; }; } // namespace smo #endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H