dbc9569775
CONT_SET_EXC: Set exception on the continuation, to be rethrown by the caller. CONT_SET_EXC_AND_RET: Convenience which returns immediately after setting the exception.
172 lines
5.2 KiB
C++
172 lines
5.2 KiB
C++
#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|
|
|
|
#include <atomic>
#include <functional>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

#include <componentThread.h>
#include <lockSet.h>
#include <asynchronousContinuation.h>
#include <lockerAndInvokerBase.h>
|
|
|
|
namespace smo {
|
|
|
|
template <class OriginalCbFnT>
|
|
class SerializedAsynchronousContinuation
|
|
: public PostedAsynchronousContinuation<OriginalCbFnT>
|
|
{
|
|
public:
|
|
SerializedAsynchronousContinuation(
|
|
const std::shared_ptr<ComponentThread> &caller,
|
|
OriginalCbFnT originalCbFn,
|
|
std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
|
|
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
|
|
requiredLocks(*this, std::move(requiredLocks))
|
|
{}
|
|
|
|
template<typename... Args>
|
|
void callOriginalCb(Args&&... args)
|
|
{
|
|
requiredLocks.release();
|
|
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
|
|
std::forward<Args>(args)...);
|
|
}
|
|
|
|
public:
|
|
LockSet<OriginalCbFnT> requiredLocks;
|
|
std::atomic<bool> isAwakeOrBeingAwakened{false};
|
|
|
|
/**
|
|
* @brief LockerAndInvoker - Template class for lockvoking mechanism
|
|
*
|
|
* This class wraps a std::bind result and provides locking functionality.
|
|
* When locks cannot be acquired, the object re-posts itself to the io_service
|
|
* queue, implementing the "spinqueueing" pattern.
|
|
*/
|
|
template <class InvocationTargetT>
|
|
class LockerAndInvoker
|
|
: public LockerAndInvokerBase
|
|
{
|
|
public:
|
|
/**
|
|
* @brief Constructor that immediately posts to io_service
|
|
* @param serializedContinuation Reference to the serialized continuation
|
|
* containing LockSet and target io_service
|
|
* @param target The ComponentThread whose io_service to post to
|
|
* @param invocationTarget The std::bind result to invoke when locks are acquired
|
|
*/
|
|
LockerAndInvoker(
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation,
|
|
const std::shared_ptr<ComponentThread>& target,
|
|
InvocationTargetT invocationTarget)
|
|
: LockerAndInvokerBase(&serializedContinuation),
|
|
serializedContinuation(serializedContinuation),
|
|
target(target),
|
|
invocationTarget(std::move(invocationTarget))
|
|
{
|
|
firstWake();
|
|
}
|
|
|
|
/**
|
|
* @brief Function call operator - tries to acquire locks and either
|
|
* invokes the target or returns (already registered in qutex queues)
|
|
*/
|
|
void operator()()
|
|
{
|
|
if (ComponentThread::getSelf() != target)
|
|
{
|
|
throw std::runtime_error(
|
|
"LockerAndInvoker::operator(): Thread safety violation - "
|
|
"executing on wrong ComponentThread");
|
|
}
|
|
|
|
if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff(
|
|
*this))
|
|
{
|
|
// Just allow this lockvoker to be dropped from its io_service.
|
|
allowAwakening();
|
|
return;
|
|
}
|
|
|
|
/** EXPLANATION:
|
|
* Successfully acquired all locks, so unregister from qutex queues.
|
|
* We do this here so that we can free up queue slots in the qutex
|
|
* queues for other lockvokers that may be waiting to acquire the
|
|
* locks. The size of the qutex queues does matter for other
|
|
* contending lockvokers; and so also does their position in the
|
|
* queues.
|
|
*
|
|
* The alternative is to leave ourself in the queues until we
|
|
* eventually release all locks; and given that we may hold locks
|
|
* even across true async hardware bottlenecks, this could take a
|
|
* long time.
|
|
*
|
|
* Granted, the fact that we own the locks means that even though
|
|
* we've removed ourselves from the queues, other lockvokers still
|
|
* can't acquire the locks anyway.
|
|
*/
|
|
serializedContinuation.requiredLocks.unregisterFromQutexQueues();
|
|
invocationTarget();
|
|
}
|
|
|
|
/**
|
|
* @brief Get the iterator for this lockvoker in the specified Qutex's queue
|
|
* @param qutex The Qutex to get the iterator for
|
|
* @return Iterator pointing to this lockvoker in the Qutex's queue
|
|
*/
|
|
LockerAndInvokerBase::List::iterator
|
|
getLockvokerIteratorForQutex(Qutex& qutex) override
|
|
{
|
|
return serializedContinuation.requiredLocks.getLockUsageDesc(
|
|
qutex).second;
|
|
}
|
|
|
|
/**
|
|
* @brief Awaken this lockvoker by posting it to its io_service
|
|
* @param forceAwaken If true, post even if already awake
|
|
*/
|
|
void awaken(bool forceAwaken = false) override
|
|
{
|
|
bool prevVal = serializedContinuation.isAwakeOrBeingAwakened
|
|
.exchange(true);
|
|
|
|
if (prevVal == true && !forceAwaken)
|
|
{ return; }
|
|
|
|
target->getIoService().post(*this);
|
|
}
|
|
|
|
/**
|
|
* @brief Allow awakening by resetting the awake flag
|
|
*/
|
|
void allowAwakening()
|
|
{ serializedContinuation.isAwakeOrBeingAwakened.store(false); }
|
|
|
|
/**
|
|
* @brief First wake - register in queues and awaken
|
|
*
|
|
* Sets isAwake=true before calling awaken with forceAwaken to ensure
|
|
* that none of the locks we just registered with awaken()s a duplicate
|
|
* copy of this lockvoker on the io_service.
|
|
*/
|
|
void firstWake()
|
|
{
|
|
serializedContinuation.isAwakeOrBeingAwakened.store(true);
|
|
serializedContinuation.requiredLocks.registerInQutexQueues(*this);
|
|
// Force awaken since we just set the flag above
|
|
awaken(true);
|
|
}
|
|
|
|
private:
|
|
SerializedAsynchronousContinuation<OriginalCbFnT>
|
|
&serializedContinuation;
|
|
InvocationTargetT invocationTarget;
|
|
std::shared_ptr<ComponentThread> target;
|
|
};
|
|
};
|
|
|
|
} // namespace smo
|
|
|
|
#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H
|