Qutexes: Implement them and supporting classes

Implements: LockSet, SerializedAsynchronousContinuation,
	LockerAndInvoker, LockerAndInvokerBase, Qutex.

Very big leap in functionality here. See qutexes.md for
an explanation of what we've done.
This commit is contained in:
2025-09-20 18:16:46 -04:00
parent f05c465d61
commit 32179eee5e
10 changed files with 825 additions and 211 deletions
-25
View File
@@ -104,31 +104,6 @@ public:
std::shared_ptr<ComponentThread> caller;
};
template <class OriginalCbFnT>
class SerializedAsynchronousContinuation
: public PostedAsynchronousContinuation<OriginalCbFnT>
{
public:
SerializedAsynchronousContinuation(
const std::shared_ptr<ComponentThread> &caller,
OriginalCbFnT originalCbFn,
std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
requiredLocks(*this, std::move(requiredLocks))
{}
template<typename... Args>
void callOriginalCb(Args&&... args)
{
requiredLocks.release();
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
std::forward<Args>(args)...);
}
public:
LockSet<OriginalCbFnT> requiredLocks;
};
} // namespace smo
#endif // ASYNCHRONOUS_CONTINUATION_H
+78 -48
View File
@@ -5,82 +5,112 @@
#include <functional>
#include <atomic>
#include <stdexcept>
#include <utility>
#include <memory>
#include <spinLock.h>
#include <lockerAndInvokerBase.h>
namespace smo {
// Forward declarations
template <class OriginalCbFnT>
class SerializedAsynchronousContinuation;
class Qutex;
/**
* @brief LockSet - Manages a collection of locks for acquisition/release
*/
template <class OriginalCbFnT>
class LockSet
{
public:
/** EXPLANATION:
* Tracks both the Qutex that must be acquired, as well as the parent
* LockerAndInvoker that this LockSet has registered into that Qutex's
* queue.
*/
typedef std::pair<
std::reference_wrapper<Qutex>,
typename LockerAndInvokerBase::List::iterator> LockUsageDesc;
public:
/**
* @brief Constructor
* @param requiredLocks Vector of lock references that must be acquired
* @param parentContinuation Reference to the parent
* SerializedAsynchronousContinuation
* @param qutexes Vector of Qutex references that must be acquired
*/
LockSet(std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
: requiredLocks(std::move(requiredLocks)), allLocksAcquired(false)
{}
LockSet(
SerializedAsynchronousContinuation<OriginalCbFnT> &parentContinuation,
std::vector<std::reference_wrapper<Qutex>> qutexes = {})
: parentContinuation(parentContinuation), allLocksAcquired(false),
registeredInQutexQueues(false)
{
/* Convert Qutex references to LockUsageDesc (iterators will be filled
* in during registration)
*/
locks.reserve(qutexes.size());
for (auto& qutexRef : qutexes)
{
locks.emplace_back(
qutexRef,
typename LockerAndInvokerBase::List::iterator{});
}
}
/**
* @brief Try to acquire all locks in order
* @brief Register the LockSet with all its Qutex locks
* @param lockvoker The LockerAndInvoker to register with each Qutex
*
* EXPLANATION:
* I'm not sure an unregisterFromQutexQueues() method is needed.
* Why? Because if an async sequence can't acquire all locks, it will
* simply never leave the qutexQ until it eventually does. The only other
* time it will leave the qutexQ is when the program terminates.
*
* I'm not sure we'll actually cancal all in-flight async sequences --
* and especially not all those that aren't even in any io_service queues.
* To whatever extent these objects get cleaned up, they'll probably be
* cleaned up in the qutexQ's std::list destructor -- and that won't
* execute any fancy cleanup logic. It'll just clear() out the list.
*/
template <class InvocationTargetT>
void registerInQutexQueues(
const typename SerializedAsynchronousContinuation<OriginalCbFnT>::template LockerAndInvoker<InvocationTargetT> &lockvoker);
/**
* @brief Try to acquire all locks in order; back off if acquisition fails
* @param lockvoker The LockerAndInvoker attempting to acquire the locks
* @return true if all locks were acquired, false otherwise
*/
bool tryAcquire()
{
if (allLocksAcquired)
{
throw std::runtime_error(
std::string(__func__) +
": LockSet::tryAcquire() called but allLocksAcquired is "
"already true");
}
// Try to acquire all required locks
int nAcquired = 0;
for (auto& lockRef : requiredLocks)
{
if (!lockRef.get().tryAcquire()) { break; }
nAcquired++;
}
if (nAcquired < static_cast<int>(requiredLocks.size()))
{
// Release any locks we managed to acquire
for (int i = 0; i < nAcquired; i++) {
requiredLocks[i].get().release();
}
allLocksAcquired = false;
return false;
}
allLocksAcquired = true;
return true;
}
bool tryAcquireOrBackOff(LockerAndInvokerBase &lockvoker);
void unregisterFromQutexQueues();
/**
* @brief Release all locks
* @param lockvoker The LockerAndInvoker that owns the locks
*/
void release()
void release(LockerAndInvokerBase &lockvoker);
const LockUsageDesc &getLockUsageDesc(const Qutex &criterionLock) const
{
if (!allLocksAcquired)
for (auto& lockUsageDesc : locks)
{
throw std::runtime_error(
std::string(__func__) +
": LockSet::release() called but allLocksAcquired is false");
if (&lockUsageDesc.first.get() == &criterionLock) {
return lockUsageDesc;
}
}
for (auto& lockRef : requiredLocks) {
lockRef.get().release();
}
allLocksAcquired = false;
// Should never happen if the LockSet is properly constructed
throw std::runtime_error(
std::string(__func__) +
": Qutex not found in this LockSet");
}
private:
std::vector<std::reference_wrapper<SpinLock>> requiredLocks;
std::atomic<bool> allLocksAcquired;
SerializedAsynchronousContinuation<OriginalCbFnT> &parentContinuation;
std::vector<LockUsageDesc> locks;
bool allLocksAcquired, registeredInQutexQueues;
};
} // namespace smo
+82
View File
@@ -0,0 +1,82 @@
#ifndef LOCKER_AND_INVOKER_BASE_H
#define LOCKER_AND_INVOKER_BASE_H
#include <functional>
#include <list>
#include <memory>
namespace smo {
// Forward declaration
class Qutex;
/**
* @brief LockerAndInvokerBase - Base class for lockvoking mechanism
*
* This base class contains the common functionality needed by Qutex,
* including the serialized continuation reference and comparison operators.
*/
class LockerAndInvokerBase
{
public:
/**
* @brief Constructor
* @param serializedContinuationVaddr Raw pointer to the serialized continuation
*/
explicit LockerAndInvokerBase(const void* serializedContinuationVaddr)
: serializedContinuationVaddr(serializedContinuationVaddr)
{}
/**
* @brief Typedef for list of LockerAndInvokerBase shared pointers
*/
typedef std::list<std::shared_ptr<LockerAndInvokerBase>> List;
/**
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
* @param qutex The Qutex to get the iterator for
* @return Iterator pointing to this lockvoker in the Qutex's queue
*/
virtual List::iterator getLockvokerIteratorForQutex(Qutex& qutex) = 0;
/**
* @brief Awaken this lockvoker by posting it to its io_service
* @param forceAwaken If true, post even if already awake
*/
virtual void awaken(bool forceAwaken = false) = 0;
/**
* @brief Equality operator
*
* Compare by the address of the continuation objects. Why?
* Because there's no guarantee that the lockvoker object that was
* passed in by the io_service invocation is the same object as that
* which is in the qutexQs. Especially because we make_shared() a
* copy when registerInQutexQueues()ing.
*
* Generally when we "wake" a lockvoker by enqueuing it, boost's
* io_service::post will copy the lockvoker object.
*/
bool operator==(const LockerAndInvokerBase &other) const
{
return serializedContinuationVaddr == other.serializedContinuationVaddr;
}
/**
* @brief Inequality operator
*/
bool operator!=(const LockerAndInvokerBase &other) const
{
return serializedContinuationVaddr != other.serializedContinuationVaddr;
}
protected:
/* Never let this monstrosity be seen beyond this class's scope.
* Remember what I've taught you, quasi-modo?
*/
const void* serializedContinuationVaddr;
};
} // namespace smo
#endif // LOCKER_AND_INVOKER_BASE_H
-67
View File
@@ -1,67 +0,0 @@
#ifndef LOCKVOKER_H
#define LOCKVOKER_H
#include <functional>
#include <boost/asio.hpp>
#include <componentThread.h>
#include <lockSpec.h>
#include <asynchronousContinuation.h>
namespace smo {
/**
* @brief LockerAndInvoker - Template class for lockvoking mechanism
*
* This class wraps a std::bind result and provides locking functionality.
* When locks cannot be acquired, the object re-posts itself to the io_service
* queue, implementing the "spinqueueing" pattern.
*/
template <class InvocationTargetT>
class LockerAndInvoker
{
public:
/**
* @brief Constructor that immediately posts to io_service
* @param serializedContinuation Reference to the serialized continuation
* containing LockSet and target io_service
* @param invocationTarget The std::bind result to invoke when locks are acquired
*/
LockerAndInvoker(
SerializedAsynchronousContinuation<void()>& serializedContinuation,
InvocationTargetT invocationTarget)
: serializedContinuation(serializedContinuation),
invocationTarget(std::move(invocationTarget))
{
post();
}
/**
* @brief Post this object to the io_service
*/
void post()
{ serializedContinuation.caller->getIoService().post(*this); }
/**
* @brief Function call operator - tries to acquire locks and either invokes
* the target or re-posts itself
*/
void operator()()
{
if (!serializedContinuation.tryAcquire())
{
// Re-post ourselves to try again later
post();
return;
}
invocationTarget();
}
private:
SerializedAsynchronousContinuation<void()>& serializedContinuation;
InvocationTargetT invocationTarget;
};
} // namespace smo
#endif // LOCKVOKER_H
+27 -71
View File
@@ -4,13 +4,13 @@
#include <list>
#include <atomic>
#include <memory>
#include <string>
#include <stdexcept>
#include <spinLock.h>
#include <lockerAndInvokerBase.h>
namespace smo {
// Forward declarations
class LockerAndInvoker;
/**
* @brief Qutex - Queue-based mutex for asynchronous lock management
*
@@ -21,30 +21,22 @@ class LockerAndInvoker;
class Qutex
{
public:
typedef std::list<std::shared_ptr<LockerAndInvoker>> LockerAndInvokerList;
/**
* @brief Constructor
*/
Qutex()
{
// TODO: Initialize member variables
}
: isOwned(false)
{}
/**
* @brief Register a lockvoker in the queue
* @param lockvoker The lockvoker to register
* @return Iterator pointing to the registered lockvoker in the queue
*/
LockerAndInvokerList::iterator registerInQueue(
const std::shared_ptr<LockerAndInvoker> &lockvoker
LockerAndInvokerBase::List::iterator registerInQueue(
const std::shared_ptr<LockerAndInvokerBase> &lockvoker
)
{
// TODO: Implement registration logic
// - Acquire the spinlock
// - Insert lockvoker at the rear of the queue
// - Return iterator to the inserted element
// - Release the spinlock
lock.acquire();
auto it = queue.insert(queue.end(), lockvoker);
lock.release();
@@ -54,14 +46,21 @@ public:
/**
* @brief Unregister a lockvoker from the queue
* @param it Iterator pointing to the lockvoker to unregister
* @param shouldLock Whether to acquire the spinlock before erasing (default: true)
*/
void unregisterFromQueue(LockerAndInvokerList::iterator it)
void unregisterFromQueue(
LockerAndInvokerBase::List::iterator it, bool shouldLock = true
)
{
// TODO: Implement unregistration logic
// - Acquire the spinlock
// - Erase the element at the given iterator
// - Release the spinlock
(void)it; // Suppress unused parameter warning
if (shouldLock)
{
lock.acquire();
queue.erase(it);
lock.release();
}
else {
queue.erase(it);
}
}
/**
@@ -70,68 +69,25 @@ public:
* @param nRequiredLocks Number of locks required by the lockvoker's LockSet
* @return true if the lock was successfully acquired, false otherwise
*/
bool tryAcquire(LockerAndInvoker &tryingLockvoker, int nRequiredLocks)
{
// TODO: Implement acquisition logic
// - Acquire the spinlock
// - Check if lock is already owned
// - For single-lock requests, grant immediately if available
// - For multi-lock requests, check if in top X% of queue
// - Set isOwned flag if successful
// - Release the spinlock
// - Return success/failure
(void)tryingLockvoker; // Suppress unused parameter warning
(void)nRequiredLocks; // Suppress unused parameter warning
return false; // Placeholder return value
}
bool tryAcquire(
const LockerAndInvokerBase &tryingLockvoker, int nRequiredLocks);
/**
* @brief Handle backoff when a lockvoker fails to acquire all required locks
* @param failedAcquirer The lockvoker that failed to acquire all locks
* @param nRequiredLocks Number of locks required by the lockvoker's LockSet
*/
void backoff(LockerAndInvoker &failedAcquirer)
{
// TODO: Implement backoff logic
// - Acquire the spinlock
// - If failedAcquirer is at front, rotate queue items
// - Move failedAcquirer to appropriate position in queue
// - Release the spinlock
// - Wake up the new front item
(void)failedAcquirer; // Suppress unused parameter warning
}
void backoff(const LockerAndInvokerBase &failedAcquirer, int nRequiredLocks);
/**
* @brief Release the lock and wake up the next waiting lockvoker
* @param prevOwner The lockvoker that previously owned the lock
*/
void release(LockerAndInvoker &prevOwner)
{
// TODO: Implement release logic
// - Acquire the spinlock
// - Unregister the previous owner from the queue
// - Clear the isOwned flag
// - Get the new front item
// - Release the spinlock
// - Wake up the new front item (conditionally)
(void)prevOwner; // Suppress unused parameter warning
}
/**
* @brief Wake up a specific lockvoker
* @param lockvoker The lockvoker to wake up
*/
void wakeUp(LockerAndInvoker &lockvoker)
{
// TODO: Implement wake-up logic
// - Post the lockvoker's invocation to its io_service
// - This will cause the lockvoker to retry acquisition
(void)lockvoker; // Suppress unused parameter warning
}
void release();
public:
SpinLock lock;
std::atomic<bool> isOwned;
LockerAndInvokerList queue;
LockerAndInvokerBase::List queue;
bool isOwned;
};
} // namespace smo
@@ -0,0 +1,172 @@
#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
#include <functional>
#include <memory>
#include <atomic>
#include <componentThread.h>
#include <lockSet.h>
#include <asynchronousContinuation.h>
#include <lockerAndInvokerBase.h>
namespace smo {
template <class OriginalCbFnT>
class SerializedAsynchronousContinuation
: public PostedAsynchronousContinuation<OriginalCbFnT>
{
public:
SerializedAsynchronousContinuation(
const std::shared_ptr<ComponentThread> &caller,
OriginalCbFnT originalCbFn,
std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
requiredLocks(*this, std::move(requiredLocks))
{}
template<typename... Args>
void callOriginalCb(Args&&... args)
{
requiredLocks.release();
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
std::forward<Args>(args)...);
}
public:
LockSet<OriginalCbFnT> requiredLocks;
std::atomic<bool> isAwakeOrBeingAwakened{false};
/**
* @brief LockerAndInvoker - Template class for lockvoking mechanism
*
* This class wraps a std::bind result and provides locking functionality.
* When locks cannot be acquired, the object re-posts itself to the io_service
* queue, implementing the "spinqueueing" pattern.
*/
template <class InvocationTargetT>
class LockerAndInvoker
: public LockerAndInvokerBase
{
public:
/**
* @brief Constructor that immediately posts to io_service
* @param serializedContinuation Reference to the serialized continuation
* containing LockSet and target io_service
* @param target The ComponentThread whose io_service to post to
* @param invocationTarget The std::bind result to invoke when locks are acquired
*/
LockerAndInvoker(
SerializedAsynchronousContinuation<OriginalCbFnT>
&serializedContinuation,
const std::shared_ptr<ComponentThread>& target,
InvocationTargetT invocationTarget)
: LockerAndInvokerBase(&serializedContinuation),
serializedContinuation(serializedContinuation),
target(target),
invocationTarget(std::move(invocationTarget))
{
firstWake();
}
/**
* @brief Function call operator - tries to acquire locks and either
* invokes the target or returns (already registered in qutex queues)
*/
void operator()()
{
if (ComponentThread::getSelf() != target)
{
throw std::runtime_error(
"LockerAndInvoker::operator(): Thread safety violation - "
"executing on wrong ComponentThread");
}
if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
*this))
{
// Just allow this lockvoker to be dropped from its io_service.
allowAwakening();
return;
}
/** EXPLANATION:
* Successfully acquired all locks, so unregister from qutex queues.
* We do this here so that we can free up queue slots in the qutex
* queues for other lockvokers that may be waiting to acquire the
* locks. The size of the qutex queues does matter for other
* contending lockvokers; and so also does their position in the
* queues.
*
* The alternative is to leave ourself in the queues until we
* eventually release all locks; and given that we may hold locks
* even across true async hardware bottlenecks, this could take a
* long time.
*
* Granted, the fact that we own the locks means that even though
* we've removed ourselves from the queues, other lockvokers still
* can't acquire the locks anyway.
*/
serializedContinuation.requiredLocks.unregisterFromQutexQueues();
invocationTarget();
}
/**
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
* @param qutex The Qutex to get the iterator for
* @return Iterator pointing to this lockvoker in the Qutex's queue
*/
LockerAndInvokerBase::List::iterator
getLockvokerIteratorForQutex(Qutex& qutex) override
{
return serializedContinuation.requiredLocks.getLockUsageDesc(
qutex).second;
}
/**
* @brief Awaken this lockvoker by posting it to its io_service
* @param forceAwaken If true, post even if already awake
*/
void awaken(bool forceAwaken = false) override
{
bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
.exchange(true);
if (prevVal == true && !forceAwaken)
{ return; }
target->getIoService().post(*this);
}
/**
* @brief Allow awakening by resetting the awake flag
*/
void allowAwakening()
{ serializedContinuation.isAwakeOrBeingAwakened.store(false); }
/**
* @brief First wake - register in queues and awaken
*
* Sets isAwake=true before calling awaken with forceAwaken to ensure
* that none of the locks we just registered with awaken()s a duplicate
* copy of this lockvoker on the io_service.
*/
void firstWake()
{
serializedContinuation.isAwakeOrBeingAwakened.store(true);
serializedContinuation.requiredLocks.registerInQutexQueues(*this);
// Force awaken since we just set the flag above
awaken(true);
}
private:
SerializedAsynchronousContinuation<OriginalCbFnT>
&serializedContinuation;
InvocationTargetT invocationTarget;
std::shared_ptr<ComponentThread> target;
};
};
} // namespace smo
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H