diff --git a/include/asynchronousContinuation.h b/include/asynchronousContinuation.h index b861f57..95f2793 100644 --- a/include/asynchronousContinuation.h +++ b/include/asynchronousContinuation.h @@ -104,31 +104,6 @@ public: std::shared_ptr caller; }; -template -class SerializedAsynchronousContinuation -: public PostedAsynchronousContinuation -{ -public: - SerializedAsynchronousContinuation( - const std::shared_ptr &caller, - OriginalCbFnT originalCbFn, - std::vector> requiredLocks = {}) - : PostedAsynchronousContinuation(caller, originalCbFn), - requiredLocks(*this, std::move(requiredLocks)) - {} - - template - void callOriginalCb(Args&&... args) - { - requiredLocks.release(); - PostedAsynchronousContinuation::callOriginalCb( - std::forward(args)...); - } - -public: - LockSet requiredLocks; -}; - } // namespace smo #endif // ASYNCHRONOUS_CONTINUATION_H diff --git a/include/lockSet.h b/include/lockSet.h index e6bfd39..96aba72 100644 --- a/include/lockSet.h +++ b/include/lockSet.h @@ -5,82 +5,112 @@ #include #include #include +#include +#include #include +#include namespace smo { +// Forward declarations +template +class SerializedAsynchronousContinuation; +class Qutex; + /** * @brief LockSet - Manages a collection of locks for acquisition/release */ +template class LockSet { +public: + /** EXPLANATION: + * Tracks both the Qutex that must be acquired, as well as the parent + * LockerAndInvoker that this LockSet has registered into that Qutex's + * queue. + */ + typedef std::pair< + std::reference_wrapper, + typename LockerAndInvokerBase::List::iterator> LockUsageDesc; + public: /** * @brief Constructor - * @param requiredLocks Vector of lock references that must be acquired + * @param parentContinuation Reference to the parent + * SerializedAsynchronousContinuation + * @param qutexes Vector of Qutex references that must be acquired */ - LockSet(std::vector> requiredLocks = {}) - : requiredLocks(std::move(requiredLocks)), allLocksAcquired(false) - {} + LockSet( + SerializedAsynchronousContinuation &parentContinuation, + std::vector> qutexes = {}) + : parentContinuation(parentContinuation), allLocksAcquired(false), + registeredInQutexQueues(false) + { + /* Convert Qutex references to LockUsageDesc (iterators will be filled + * in during registration) + */ + locks.reserve(qutexes.size()); + for (auto& qutexRef : qutexes) + { + locks.emplace_back( + qutexRef, + typename LockerAndInvokerBase::List::iterator{}); + } + } /** - * @brief Try to acquire all locks in order + * @brief Register the LockSet with all its Qutex locks + * @param lockvoker The LockerAndInvoker to register with each Qutex + * + * EXPLANATION: + * I'm not sure an unregisterFromQutexQueues() method is needed. + * Why? Because if an async sequence can't acquire all locks, it will + * simply never leave the qutexQ until it eventually does. The only other + * time it will leave the qutexQ is when the program terminates. + * + * I'm not sure we'll actually cancal all in-flight async sequences -- + * and especially not all those that aren't even in any io_service queues. + * To whatever extent these objects get cleaned up, they'll probably be + * cleaned up in the qutexQ's std::list destructor -- and that won't + * execute any fancy cleanup logic. It'll just clear() out the list. + */ + template + void registerInQutexQueues( + const typename SerializedAsynchronousContinuation::template LockerAndInvoker &lockvoker); + + /** + * @brief Try to acquire all locks in order; back off if acquisition fails + * @param lockvoker The LockerAndInvoker attempting to acquire the locks * @return true if all locks were acquired, false otherwise */ - bool tryAcquire() - { - if (allLocksAcquired) - { - throw std::runtime_error( - std::string(__func__) + - ": LockSet::tryAcquire() called but allLocksAcquired is " - "already true"); - } - - // Try to acquire all required locks - int nAcquired = 0; - for (auto& lockRef : requiredLocks) - { - if (!lockRef.get().tryAcquire()) { break; } - nAcquired++; - } - - if (nAcquired < static_cast(requiredLocks.size())) - { - // Release any locks we managed to acquire - for (int i = 0; i < nAcquired; i++) { - requiredLocks[i].get().release(); - } - allLocksAcquired = false; - return false; - } - - allLocksAcquired = true; - return true; - } + bool tryAcquireOrBackOff(LockerAndInvokerBase &lockvoker); + void unregisterFromQutexQueues(); /** * @brief Release all locks + * @param lockvoker The LockerAndInvoker that owns the locks */ - void release() + void release(LockerAndInvokerBase &lockvoker); + + const LockUsageDesc &getLockUsageDesc(const Qutex &criterionLock) const { - if (!allLocksAcquired) + for (auto& lockUsageDesc : locks) { - throw std::runtime_error( - std::string(__func__) + - ": LockSet::release() called but allLocksAcquired is false"); + if (&lockUsageDesc.first.get() == &criterionLock) { + return lockUsageDesc; + } } - for (auto& lockRef : requiredLocks) { - lockRef.get().release(); - } - - allLocksAcquired = false; + // Should never happen if the LockSet is properly constructed + throw std::runtime_error( + std::string(__func__) + + ": Qutex not found in this LockSet"); } private: - std::vector> requiredLocks; - std::atomic allLocksAcquired; + SerializedAsynchronousContinuation &parentContinuation; + std::vector locks; + bool allLocksAcquired, registeredInQutexQueues; }; } // namespace smo diff --git a/include/lockerAndInvokerBase.h b/include/lockerAndInvokerBase.h new file mode 100644 index 0000000..cdf5fde --- /dev/null +++ b/include/lockerAndInvokerBase.h @@ -0,0 +1,82 @@ +#ifndef LOCKER_AND_INVOKER_BASE_H +#define LOCKER_AND_INVOKER_BASE_H + +#include +#include +#include + +namespace smo { + +// Forward declaration +class Qutex; + +/** + * @brief LockerAndInvokerBase - Base class for lockvoking mechanism + * + * This base class contains the common functionality needed by Qutex, + * including the serialized continuation reference and comparison operators. + */ +class LockerAndInvokerBase +{ +public: + /** + * @brief Constructor + * @param serializedContinuationVaddr Raw pointer to the serialized continuation + */ + explicit LockerAndInvokerBase(const void* serializedContinuationVaddr) + : serializedContinuationVaddr(serializedContinuationVaddr) + {} + + /** + * @brief Typedef for list of LockerAndInvokerBase shared pointers + */ + typedef std::list> List; + + /** + * @brief Get the iterator for this lockvoker in the specified Qutex's queue + * @param qutex The Qutex to get the iterator for + * @return Iterator pointing to this lockvoker in the Qutex's queue + */ + virtual List::iterator getLockvokerIteratorForQutex(Qutex& qutex) = 0; + + /** + * @brief Awaken this lockvoker by posting it to its io_service + * @param forceAwaken If true, post even if already awake + */ + virtual void awaken(bool forceAwaken = false) = 0; + + /** + * @brief Equality operator + * + * Compare by the address of the continuation objects. Why? + * Because there's no guarantee that the lockvoker object that was + * passed in by the io_service invocation is the same object as that + * which is in the qutexQs. Especially because we make_shared() a + * copy when registerInQutexQueues()ing. + * + * Generally when we "wake" a lockvoker by enqueuing it, boost's + * io_service::post will copy the lockvoker object. + */ + bool operator==(const LockerAndInvokerBase &other) const + { + return serializedContinuationVaddr == other.serializedContinuationVaddr; + } + + /** + * @brief Inequality operator + */ + bool operator!=(const LockerAndInvokerBase &other) const + { + return serializedContinuationVaddr != other.serializedContinuationVaddr; + } + +protected: + /* Never let this monstrosity be seen beyond this class's scope. + * Remember what I've taught you, quasi-modo? + */ + const void* serializedContinuationVaddr; +}; + +} // namespace smo + +#endif // LOCKER_AND_INVOKER_BASE_H diff --git a/include/lockvoker.h b/include/lockvoker.h deleted file mode 100644 index e253701..0000000 --- a/include/lockvoker.h +++ /dev/null @@ -1,67 +0,0 @@ -#ifndef LOCKVOKER_H -#define LOCKVOKER_H - -#include -#include -#include -#include -#include - -namespace smo { - -/** - * @brief LockerAndInvoker - Template class for lockvoking mechanism - * - * This class wraps a std::bind result and provides locking functionality. - * When locks cannot be acquired, the object re-posts itself to the io_service - * queue, implementing the "spinqueueing" pattern. - */ -template -class LockerAndInvoker -{ -public: - /** - * @brief Constructor that immediately posts to io_service - * @param serializedContinuation Reference to the serialized continuation - * containing LockSet and target io_service - * @param invocationTarget The std::bind result to invoke when locks are acquired - */ - LockerAndInvoker( - SerializedAsynchronousContinuation& serializedContinuation, - InvocationTargetT invocationTarget) - : serializedContinuation(serializedContinuation), - invocationTarget(std::move(invocationTarget)) - { - post(); - } - - /** - * @brief Post this object to the io_service - */ - void post() - { serializedContinuation.caller->getIoService().post(*this); } - - /** - * @brief Function call operator - tries to acquire locks and either invokes - * the target or re-posts itself - */ - void operator()() - { - if (!serializedContinuation.tryAcquire()) - { - // Re-post ourselves to try again later - post(); - return; - } - - invocationTarget(); - } - -private: - SerializedAsynchronousContinuation& serializedContinuation; - InvocationTargetT invocationTarget; -}; - -} // namespace smo - -#endif // LOCKVOKER_H diff --git a/include/qutex.h b/include/qutex.h index ec32f09..18af07a 100644 --- a/include/qutex.h +++ b/include/qutex.h @@ -4,13 +4,13 @@ #include #include #include +#include +#include #include +#include namespace smo { -// Forward declarations -class LockerAndInvoker; - /** * @brief Qutex - Queue-based mutex for asynchronous lock management * @@ -21,30 +21,22 @@ class LockerAndInvoker; class Qutex { public: - typedef std::list> LockerAndInvokerList; - /** * @brief Constructor */ Qutex() - { - // TODO: Initialize member variables - } + : isOwned(false) + {} /** * @brief Register a lockvoker in the queue * @param lockvoker The lockvoker to register * @return Iterator pointing to the registered lockvoker in the queue */ - LockerAndInvokerList::iterator registerInQueue( - const std::shared_ptr &lockvoker + LockerAndInvokerBase::List::iterator registerInQueue( + const std::shared_ptr &lockvoker ) { - // TODO: Implement registration logic - // - Acquire the spinlock - // - Insert lockvoker at the rear of the queue - // - Return iterator to the inserted element - // - Release the spinlock lock.acquire(); auto it = queue.insert(queue.end(), lockvoker); lock.release(); @@ -54,14 +46,21 @@ public: /** * @brief Unregister a lockvoker from the queue * @param it Iterator pointing to the lockvoker to unregister + * @param shouldLock Whether to acquire the spinlock before erasing (default: true) */ - void unregisterFromQueue(LockerAndInvokerList::iterator it) + void unregisterFromQueue( + LockerAndInvokerBase::List::iterator it, bool shouldLock = true + ) { - // TODO: Implement unregistration logic - // - Acquire the spinlock - // - Erase the element at the given iterator - // - Release the spinlock - (void)it; // Suppress unused parameter warning + if (shouldLock) + { + lock.acquire(); + queue.erase(it); + lock.release(); + } + else { + queue.erase(it); + } } /** @@ -70,68 +69,25 @@ public: * @param nRequiredLocks Number of locks required by the lockvoker's LockSet * @return true if the lock was successfully acquired, false otherwise */ - bool tryAcquire(LockerAndInvoker &tryingLockvoker, int nRequiredLocks) - { - // TODO: Implement acquisition logic - // - Acquire the spinlock - // - Check if lock is already owned - // - For single-lock requests, grant immediately if available - // - For multi-lock requests, check if in top X% of queue - // - Set isOwned flag if successful - // - Release the spinlock - // - Return success/failure - (void)tryingLockvoker; // Suppress unused parameter warning - (void)nRequiredLocks; // Suppress unused parameter warning - return false; // Placeholder return value - } + bool tryAcquire( + const LockerAndInvokerBase &tryingLockvoker, int nRequiredLocks); /** * @brief Handle backoff when a lockvoker fails to acquire all required locks * @param failedAcquirer The lockvoker that failed to acquire all locks + * @param nRequiredLocks Number of locks required by the lockvoker's LockSet */ - void backoff(LockerAndInvoker &failedAcquirer) - { - // TODO: Implement backoff logic - // - Acquire the spinlock - // - If failedAcquirer is at front, rotate queue items - // - Move failedAcquirer to appropriate position in queue - // - Release the spinlock - // - Wake up the new front item - (void)failedAcquirer; // Suppress unused parameter warning - } + void backoff(const LockerAndInvokerBase &failedAcquirer, int nRequiredLocks); /** * @brief Release the lock and wake up the next waiting lockvoker - * @param prevOwner The lockvoker that previously owned the lock */ - void release(LockerAndInvoker &prevOwner) - { - // TODO: Implement release logic - // - Acquire the spinlock - // - Unregister the previous owner from the queue - // - Clear the isOwned flag - // - Get the new front item - // - Release the spinlock - // - Wake up the new front item (conditionally) - (void)prevOwner; // Suppress unused parameter warning - } - - /** - * @brief Wake up a specific lockvoker - * @param lockvoker The lockvoker to wake up - */ - void wakeUp(LockerAndInvoker &lockvoker) - { - // TODO: Implement wake-up logic - // - Post the lockvoker's invocation to its io_service - // - This will cause the lockvoker to retry acquisition - (void)lockvoker; // Suppress unused parameter warning - } + void release(); public: SpinLock lock; - std::atomic isOwned; - LockerAndInvokerList queue; + LockerAndInvokerBase::List queue; + bool isOwned; }; } // namespace smo diff --git a/include/serializedAsynchronousContinuation.h b/include/serializedAsynchronousContinuation.h new file mode 100644 index 0000000..e632ebf --- /dev/null +++ b/include/serializedAsynchronousContinuation.h @@ -0,0 +1,172 @@ +#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H +#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H + +#include +#include +#include +#include +#include +#include +#include + +namespace smo { + + +template +class SerializedAsynchronousContinuation +: public PostedAsynchronousContinuation +{ +public: + SerializedAsynchronousContinuation( + const std::shared_ptr &caller, + OriginalCbFnT originalCbFn, + std::vector> requiredLocks = {}) + : PostedAsynchronousContinuation(caller, originalCbFn), + requiredLocks(*this, std::move(requiredLocks)) + {} + + template + void callOriginalCb(Args&&... args) + { + requiredLocks.release(); + PostedAsynchronousContinuation::callOriginalCb( + std::forward(args)...); + } + +public: + LockSet requiredLocks; + std::atomic isAwakeOrBeingAwakened{false}; + + /** + * @brief LockerAndInvoker - Template class for lockvoking mechanism + * + * This class wraps a std::bind result and provides locking functionality. + * When locks cannot be acquired, the object re-posts itself to the io_service + * queue, implementing the "spinqueueing" pattern. + */ + template + class LockerAndInvoker + : public LockerAndInvokerBase + { + public: + /** + * @brief Constructor that immediately posts to io_service + * @param serializedContinuation Reference to the serialized continuation + * containing LockSet and target io_service + * @param target The ComponentThread whose io_service to post to + * @param invocationTarget The std::bind result to invoke when locks are acquired + */ + LockerAndInvoker( + SerializedAsynchronousContinuation + &serializedContinuation, + const std::shared_ptr& target, + InvocationTargetT invocationTarget) + : LockerAndInvokerBase(&serializedContinuation), + serializedContinuation(serializedContinuation), + target(target), + invocationTarget(std::move(invocationTarget)) + { + firstWake(); + } + + /** + * @brief Function call operator - tries to acquire locks and either + * invokes the target or returns (already registered in qutex queues) + */ + void operator()() + { + if (ComponentThread::getSelf() != target) + { + throw std::runtime_error( + "LockerAndInvoker::operator(): Thread safety violation - " + "executing on wrong ComponentThread"); + } + + if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff( + *this)) + { + // Just allow this lockvoker to be dropped from its io_service. + allowAwakening(); + return; + } + + /** EXPLANATION: + * Successfully acquired all locks, so unregister from qutex queues. + * We do this here so that we can free up queue slots in the qutex + * queues for other lockvokers that may be waiting to acquire the + * locks. The size of the qutex queues does matter for other + * contending lockvokers; and so also does their position in the + * queues. + * + * The alternative is to leave ourself in the queues until we + * eventually release all locks; and given that we may hold locks + * even across true async hardware bottlenecks, this could take a + * long time. + * + * Granted, the fact that we own the locks means that even though + * we've removed ourselves from the queues, other lockvokers still + * can't acquire the locks anyway. + */ + serializedContinuation.requiredLocks.unregisterFromQutexQueues(); + invocationTarget(); + } + + /** + * @brief Get the iterator for this lockvoker in the specified Qutex's queue + * @param qutex The Qutex to get the iterator for + * @return Iterator pointing to this lockvoker in the Qutex's queue + */ + LockerAndInvokerBase::List::iterator + getLockvokerIteratorForQutex(Qutex& qutex) override + { + return serializedContinuation.requiredLocks.getLockUsageDesc( + qutex).second; + } + + /** + * @brief Awaken this lockvoker by posting it to its io_service + * @param forceAwaken If true, post even if already awake + */ + void awaken(bool forceAwaken = false) override + { + bool prevVal = serializedContinuation.isAwakeOrBeingAwakened + .exchange(true); + + if (prevVal == true && !forceAwaken) + { return; } + + target->getIoService().post(*this); + } + + /** + * @brief Allow awakening by resetting the awake flag + */ + void allowAwakening() + { serializedContinuation.isAwakeOrBeingAwakened.store(false); } + + /** + * @brief First wake - register in queues and awaken + * + * Sets isAwake=true before calling awaken with forceAwaken to ensure + * that none of the locks we just registered with awaken()s a duplicate + * copy of this lockvoker on the io_service. + */ + void firstWake() + { + serializedContinuation.isAwakeOrBeingAwakened.store(true); + serializedContinuation.requiredLocks.registerInQutexQueues(*this); + // Force awaken since we just set the flag above + awaken(true); + } + + private: + SerializedAsynchronousContinuation + &serializedContinuation; + InvocationTargetT invocationTarget; + std::shared_ptr target; + }; +}; + +} // namespace smo + +#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H diff --git a/smocore/CMakeLists.txt b/smocore/CMakeLists.txt index 53ff49e..1fd9bf8 100644 --- a/smocore/CMakeLists.txt +++ b/smocore/CMakeLists.txt @@ -9,6 +9,9 @@ add_library(smocore STATIC componentThread.cpp component.cpp painfulQuale.cpp + qutex.cpp + lockerAndInvokerBase.cpp + lockSet.cpp # Body body/body.cpp diff --git a/smocore/lockSet.cpp b/smocore/lockSet.cpp new file mode 100644 index 0000000..8b26f8f --- /dev/null +++ b/smocore/lockSet.cpp @@ -0,0 +1,131 @@ +#include +#include + +namespace smo { + +// Template method implementations that need full Qutex definition +// These will be explicitly instantiated for the types we need + +template +template +void LockSet::registerInQutexQueues( + const typename SerializedAsynchronousContinuation::template LockerAndInvoker &lockvoker + ) +{ + /** EXPLANATION: + * Register the lockvoker with each Qutex and store the returned + * iterator to its place within each Qutex's queue. We store the + * iterator so that we can quickly move the lockvoker around within + * the queue, and eventually, erase() it when we acquire all the + * locks. + * + * We create a copy of the Lockvoker and then give sh_ptrs to that + * *COPY*, to each Qutex's internal queue. This enables us to keep + * the AsyncContinuation sh_ptr (which the Lockvoker contains within + * itself) alive without wasting too much memory. + * + * This way the io_service objects can remove the lockvoker from + * their queues and there'll be a copy of the lockvoker in each + * Qutex's queue. + * + * For non-serialized, posted continuations, they won't be removed + * from the io_service queue until they're executed, so there's no + * need to create copies of them. Lockvokers are removed from their + * io_service, potentially without being executed if they fail to + * acquire all locks. + */ + auto sharedLockvoker = std::make_shared< + typename SerializedAsynchronousContinuation::template LockerAndInvoker>(lockvoker); + + for (auto& lockUsageDesc : locks) + { + lockUsageDesc.second = lockUsageDesc.first.get().registerInQueue( + sharedLockvoker); + } + + registeredInQutexQueues = true; +} + +template +bool LockSet::tryAcquireOrBackOff(LockerAndInvokerBase &lockvoker) +{ + if (!registeredInQutexQueues) + { + throw std::runtime_error( + std::string(__func__) + + ": LockSet::tryAcquireOrBackOff() called but not registered in " + "Qutex queues"); + } + if (allLocksAcquired) + { + throw std::runtime_error( + std::string(__func__) + + ": LockSet::tryAcquireOrBackOff() called but allLocksAcquired " + "is already true"); + } + + // Try to acquire all required locks + int nAcquired = 0; + const int nRequiredLocks = static_cast(locks.size()); + for (auto& lockUsageDesc : locks) + { + if (!lockUsageDesc.first.get().tryAcquire( + lockvoker, nRequiredLocks)) + { + break; + } + + nAcquired++; + } + + if (nAcquired < nRequiredLocks) + { + // Release any locks we managed to acquire + for (int i = 0; i < nAcquired; i++) { + locks[i].first.get().backoff(lockvoker, nRequiredLocks); + } + + return false; + } + + allLocksAcquired = true; + return true; +} + +template +void LockSet::unregisterFromQutexQueues() +{ + // Unregister from all qutex queues + for (auto& lockUsageDesc : locks) + { + auto it = lockUsageDesc.second; + lockUsageDesc.first.get().unregisterFromQueue(it); + } +} + +template +void LockSet::release(LockerAndInvokerBase &lockvoker) +{ + if (!registeredInQutexQueues) + { + throw std::runtime_error( + std::string(__func__) + + ": LockSet::release() called but not registered in Qutex " + "queues"); + } + + if (!allLocksAcquired) + { + throw std::runtime_error( + std::string(__func__) + + ": LockSet::release() called but allLocksAcquired is false"); + } + + for (auto& lockUsageDesc : locks) { + lockUsageDesc.first.get().release(); + } + + allLocksAcquired = false; +} + +} // namespace smo diff --git a/smocore/lockerAndInvokerBase.cpp b/smocore/lockerAndInvokerBase.cpp new file mode 100644 index 0000000..f375ccc --- /dev/null +++ b/smocore/lockerAndInvokerBase.cpp @@ -0,0 +1,5 @@ +#include + +namespace smo { + +} // namespace smo diff --git a/smocore/qutex.cpp b/smocore/qutex.cpp new file mode 100644 index 0000000..c96aadc --- /dev/null +++ b/smocore/qutex.cpp @@ -0,0 +1,327 @@ +#include +#include + +namespace smo { + +bool Qutex::tryAcquire( + const LockerAndInvokerBase &tryingLockvoker, int nRequiredLocks + ) +{ + lock.acquire(); + + const int qNItems = static_cast(queue.size()); + + // If queue is empty, this should never happen since we register before trying to acquire + if (qNItems < 1) + { + lock.release(); + + throw std::runtime_error( + std::string(__func__) + + ": tryAcquire called on empty queue - this should never happen"); + } + + // If lock is already owned, fail + if (isOwned) + { + lock.release(); + return false; + } + + /** EXPLANATION: + * Calculate how many items from the rear we need to scan + * + * For nRequiredLocks=1: must be at front (nRearItemsToScan = qNItems, scan all) + * For nRequiredLocks=2: must be in top 50% (nRearItemsToScan = qNItems/2) + * For nRequiredLocks=3: must be in top 66% (nRearItemsToScan = qNItems/3) + * etc. + */ + const int nRearItemsToScan = qNItems / nRequiredLocks; + + // If we're the only item in queue, or if the fraction calculation + // results in 0 rear items to scan, we automatically succeed + if (qNItems == 1 || nRearItemsToScan < 1) + { + isOwned = true; + lock.release(); + return true; + } + + // For single-lock requests, they must be at the front of the queue + if (nRequiredLocks == 1) + { + bool ret = false; + + if ((*queue.front()) == tryingLockvoker) + { + isOwned = true; + ret = true; + } + else { + ret = false; + } + + lock.release(); + return ret; + } + + // For multi-lock requests, check if the lockvoker is in the rear portion + // If it's NOT in the rear portion, then it's in the top X% and should succeed + auto rIt = queue.rbegin(); + auto rEndIt = queue.rend(); + bool foundInRear = false; + + for (int i = 0; i < nRearItemsToScan && rIt != rEndIt; ++rIt, ++i) + { + if ((**rIt) == tryingLockvoker) + { + foundInRear = true; + break; + } + } + + if (foundInRear) + { + // Found in rear portion - not in top X%, so fail + lock.release(); + return false; + } + + // Not found in rear portion - must be in top X%, so succeed + isOwned = true; + lock.release(); + return true; +} + +void Qutex::backoff( + const LockerAndInvokerBase &failedAcquirer, int nRequiredLocks + ) +{ + lock.acquire(); + + const int nQItems = static_cast(queue.size()); + + if (nQItems < 1) + { + lock.release(); + + throw std::runtime_error( + std::string(__func__) + + ": backoff called on empty queue - this should never happen"); + } + + // Rotate queue members if failedAcquirer is at front of queue + if ((*queue.front()) == failedAcquirer && nQItems > 1) + { + /** EXPLANATION: + * Rotate the top LockSet.size() items in the queue by moving + * the failedAcquirer to the last position in the top + * LockSet.size() items within the queue. + * + * I.e: if queue.size()==20, and lockSet.size()==5, then move + * failedAcquirer from the front to the 5th position in the queue, + * which should push the other 4 items forward. + * If queue.size()==3 and LockSet.size()==5, then just + * push_back(failedAcquirer). + * + * It is impossible for a Qutex queue to have only one + * item in it, yet for that Lockvoker item to have failed to + * acquire the Qutex. Being the only item in the ticketQ + * means that you must succeed at acquiring the Qutex. + */ + int indexOfItemToInsertCurrFrontBefore; + if (nQItems > nRequiredLocks) { + indexOfItemToInsertCurrFrontBefore = nRequiredLocks; + } else + { + // -1 means insert at back -- i.e, use list::end() as insertPos. + indexOfItemToInsertCurrFrontBefore = -1; + } + + /* EXPLANATION: + * Rotate them here. + * + * The reason why we do this rotation is to avoid a particular kind + * of deadlock wherein a grid of async requests is perfectly + * configured so as to guarantee that none of them can make any + * forward progress unless they get reordered. + * + * Consider 2 different locks with 2 different items in them + * each, both of which come from 2 particular requests: + * Qutex1: Lockvoker1, Lv2 + * Qutex2: Lv2, Lv1 + * + * Moreover, both of these lockvokers have requiredLocks.size()==2, + * and the particular 2 locks that each one requires are indeed + * Qutex1 and Qutex2. + * + * This particular setup basically means that in TL1's queue, Lv1 + * will wakeup since it's at the front of TL1. It'll successfully + * acquire TL1 (since it's at the front), and then it'll try to + * acquire TL2. But since Lv1 isn't in the top 50% of items in TL2's + * queue, Lv1 will fail to acquire TL2. + * + * Then similarly, in TL2's queue, Lv2 will wakeup since it's at + * the front. Again, it'll successfully acquire TL2 since it's at + * the front of TL2's queue. But then it'll try to acquire TL1. + * Since it's not in the top 50% of TL1's enqueued items, it'll fail + * to acquire TL1. + * + * N.B: This type of perfectly ordered deadlock can occur in any + * kind of NxN situation where ticketQ.size()==requiredLocks.size(). + * That could be 4x4, 5x5, 6x6, etc. It doesn't happen in 1x1 + * because a Lockvoker that only requires one lock will always just + * succeed if it's at the front of its queue. + * + * This state of affairs is stable and will persist unless these + * queues are reordered in some way. Hence: that's why we rotate the + * items in a QutexQ after backing off of it. Backing off means + * Not necessarily that the calling LockVoker failed to acquire + * THIS PARTICULAR Qutex, but rather than it failed to acquire + * ALL of its required locks. + * + * Hence, if we are backing out, we should also rotate the items + * in the queue if the current front item is the failed acquirer. + * So that's why we do this rotation here. + */ + + // Find the iterator for the failed acquirer (which is at the front) + auto frontIt = queue.begin(); + + // Find the position to insert before using indexOfItemToInsertCurrFrontBefore + auto insertPos = queue.begin(); + if (indexOfItemToInsertCurrFrontBefore == -1) + { + // -1 means insert at the back (before end()) + insertPos = queue.end(); + } + else + { + // Move to the specified position (0-based index) + for ( + int i = 0; + i < indexOfItemToInsertCurrFrontBefore + && insertPos != queue.end(); ++i) + { + ++insertPos; + } + } + + /** NOTE: + * According to https://en.cppreference.com/w/cpp/container/list/splice: + * "No iterators or references become invalidated. If *this and other + * refer to different objects, the iterators to the transferred elements + * now refer into *this, not into other." + * + * So our stored iterator inside of LockSet will still be valid after + * the splice, and we can use it to unregister the lockvoker later on. + */ + queue.splice(insertPos, queue, frontIt); + } + + isOwned = false; + LockerAndInvokerBase &newFront = *queue.front(); + + lock.release(); + + /** EXPLANATION: + * Why should this never happen? Well, if we were at the front of the queue + * and we failed to acquire the lock, we should have been rotated away from + * the front. On the other hand, if we were not at the front of the queue + * and we failed to acquire the lock, then we weren't at the front of the + * queue to begin with. + * The exception is if the queue has only one item in it. + * + * Hence there ought to be no way for the failedAcquirer to be at the front + * of the queue at this point UNLESS the queue has only one item in it. + */ + if (newFront == failedAcquirer && nQItems > 1) + { + throw std::runtime_error( + std::string(__func__) + + ": Failed acquirer is at the front of the queue at the end of " + "backoff, yet nQItems > 1 - this should never happen"); + } + + /** EXPLANATION: + * We should always awaken whoever is at the front of the queue, even if + * we didn't rotate. Why? Consider this scenario: + * + * Lv1 has LockSet.size==1. Lv2 has LockSet.size==3. + * Lv1's required lock overlaps with Lv2's set of 3 required locks. + * Lv1 registers itself in its 1 qutex's queue. + * Lv2 registers itself in all 3 of its qutexes' queues. + * Lv2 acquires the lock that it needs in common with Lv1. + * (Assume that Lv2 was not at the front of the common qutex's + * internal queue -- it only needed to be in the top 66%.) + * Lv1 tries to acquire the common lock and fails. It gets taken off of + * its io_service. It's now asleep until it gets + * re-added into an io_service. + * Lv2 fails to acquire the other 2 locks it needs and backoff()s from + * the common lock it shares with Lv1. + * + * If Lv2 does NOT awaken the item at the front of the common lock's + * queue (aka: Lv1), then Lv1 is doomed to never wake up again. + * + * Hence: backout() callers should always wake up the lockvoker at the + * front of their queue before leaving. + * + * The exception is if the item at the front is the backout() caller + * itself. This can happen if, for example a multi-locking lockvoker + * is backing off of a qutex within which it's the only waiter. + */ + if (nQItems > 1) { + newFront.awaken(); + } +} + +void Qutex::release() +{ + lock.acquire(); + + isOwned = false; + + // It's possible for there to be 0 items left in queue after unregistering. + if (queue.empty()) + { + lock.release(); + return; + } + + /** EXPLANATION: + * It would be nice to be able to optimize by only awakening if the + * release()ing lockvoker was at the front of the qutexQ, but if we + * don't unconditionally wakeup() the front item, we could get lost + * wakeups. Consider: + * + * Lv1 only has 1 requiredLock. + * Lv2 has 3 requiredLocks. One of its requiredLocks overlaps with + * Lv1's single requiredLock. So they both share a common lock. + * Lv3's currently owns Lv1 & Lv2's common requiredLock. + * Lv3 release()s that common lock. + * Lv1 happens to be next in queue after Lv3 unregisters itself. + * Lv3 wakes up Lv1. + * Just before Lv1 can acquire the common lock, Lv2 acquires it now, + * because it only needs to be in the top 66% to succeed. + * Lv1 checks the currOwner and sees that it's owned. Lv1 is now + * dequeued from its io_service. It won't be awakened until someone + * awakens it. + * Lv2 finishes its critical section and releas()es the common lock. + * Lv2 was not at the front of the qutexQ, so it does NOT awaken the + * current item at the front. + * + * Thus, Lv1 never gets awakened again. The end. + * This also means that no LockSet.size()==1 lockvoker will ever be able + * to run again since they can only run if they are at the front of the + * qutexQ. + * + * Therefore we must always awaken the front item when releas()ing. + */ + LockerAndInvokerBase &front = *queue.front(); + + lock.release(); + + front.awaken(); +} + +} // namespace smo