Async: new hierachy; manages reply posting and unlocking
Async: Use new [Non]PostedAsyncCont and callOriginalCb This new hierarchy of classes gives us a central mechanism for managing both reply-posting and lockSpec unlocking. * callOriginalCb: Now uses a modern C++ variadic template design enabling it to handle both direct calling and std::bind() re-binding of an arbitrary number of arguments from the caller. This enables us to mostly eliminate the repeated, bespoke definitions of callOriginalCb littered throughout the codebase. We've also propagated these changes throughout the codebase in this patch.
This commit is contained in:
@@ -72,7 +72,8 @@ std::optional<std::shared_ptr<Device>> DeviceManager::getDevice(
|
|||||||
|
|
||||||
// GetOrCreateDeviceReq nested class implementation
|
// GetOrCreateDeviceReq nested class implementation
|
||||||
class DeviceManager::GetOrCreateDeviceReq
|
class DeviceManager::GetOrCreateDeviceReq
|
||||||
: public smo::AsynchronousContinuation<livoxProto1_getOrCreateDeviceReqCbFn>
|
: public smo::NonPostedAsynchronousContinuation<
|
||||||
|
livoxProto1_getOrCreateDeviceReqCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DeviceManager& deviceManager;
|
DeviceManager& deviceManager;
|
||||||
@@ -84,14 +85,14 @@ public:
|
|||||||
DeviceManager& mgr,
|
DeviceManager& mgr,
|
||||||
std::shared_ptr<Device> device,
|
std::shared_ptr<Device> device,
|
||||||
livoxProto1_getOrCreateDeviceReqCbFn cb)
|
livoxProto1_getOrCreateDeviceReqCbFn cb)
|
||||||
: smo::AsynchronousContinuation<
|
: smo::NonPostedAsynchronousContinuation<
|
||||||
livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)),
|
livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)),
|
||||||
deviceManager(mgr), pendingDevice(device)
|
deviceManager(mgr), pendingDevice(device)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
// Public accessor for the original callback
|
// Public accessor for the original callback
|
||||||
void callOriginalCallback(bool success, std::shared_ptr<Device> device)
|
void callOriginalCallback(bool success, std::shared_ptr<Device> device)
|
||||||
{ originalCbFn(success, device); }
|
{ callOriginalCb(success, device); }
|
||||||
|
|
||||||
void callOriginalCallbackWithFailure()
|
void callOriginalCallbackWithFailure()
|
||||||
{ callOriginalCallback(false, nullptr); }
|
{ callOriginalCallback(false, nullptr); }
|
||||||
@@ -176,7 +177,8 @@ void DeviceManager::getOrCreateDeviceReq(
|
|||||||
}
|
}
|
||||||
|
|
||||||
class DeviceManager::DestroyDeviceReq
|
class DeviceManager::DestroyDeviceReq
|
||||||
: public smo::AsynchronousContinuation<livoxProto1_destroyDeviceReqCbFn>
|
: public smo::NonPostedAsynchronousContinuation<
|
||||||
|
livoxProto1_destroyDeviceReqCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DeviceManager& deviceManager;
|
DeviceManager& deviceManager;
|
||||||
@@ -187,14 +189,14 @@ public:
|
|||||||
DeviceManager& mgr,
|
DeviceManager& mgr,
|
||||||
std::shared_ptr<Device> device,
|
std::shared_ptr<Device> device,
|
||||||
livoxProto1_destroyDeviceReqCbFn cb)
|
livoxProto1_destroyDeviceReqCbFn cb)
|
||||||
: smo::AsynchronousContinuation<
|
: smo::NonPostedAsynchronousContinuation<
|
||||||
livoxProto1_destroyDeviceReqCbFn>(std::move(cb)),
|
livoxProto1_destroyDeviceReqCbFn>(std::move(cb)),
|
||||||
deviceManager(mgr), pendingDevice(device)
|
deviceManager(mgr), pendingDevice(device)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
// Public accessor for the original callback
|
// Public accessor for the original callback
|
||||||
void callOriginalCallback(bool success)
|
void callOriginalCallback(bool success)
|
||||||
{ originalCbFn(success); }
|
{ callOriginalCb(success); }
|
||||||
|
|
||||||
void callOriginalCallbackWithFailure()
|
void callOriginalCallbackWithFailure()
|
||||||
{ callOriginalCallback(false); }
|
{ callOriginalCallback(false); }
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ Device::~Device()
|
|||||||
* This class manages the overall device connection process including handshake and heartbeat setup
|
* This class manages the overall device connection process including handshake and heartbeat setup
|
||||||
*/
|
*/
|
||||||
class Device::ConnectReq
|
class Device::ConnectReq
|
||||||
: public smo::AsynchronousContinuation<Device::connectReqCbFn>
|
: public smo::NonPostedAsynchronousContinuation<Device::connectReqCbFn>
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
Device& device;
|
Device& device;
|
||||||
@@ -128,7 +128,7 @@ private:
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
ConnectReq(Device& dev, Device::connectReqCbFn cb)
|
ConnectReq(Device& dev, Device::connectReqCbFn cb)
|
||||||
: smo::AsynchronousContinuation<Device::connectReqCbFn>(
|
: smo::NonPostedAsynchronousContinuation<Device::connectReqCbFn>(
|
||||||
std::move(cb)), device(dev)
|
std::move(cb)), device(dev)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
@@ -148,7 +148,7 @@ public:
|
|||||||
// Store the handshake FD for heartbeats
|
// Store the handshake FD for heartbeats
|
||||||
context->device.heartbeatFd = fd;
|
context->device.heartbeatFd = fd;
|
||||||
context->device.startHeartbeat();
|
context->device.startHeartbeat();
|
||||||
context->originalCbFn(true);
|
context->callOriginalCb(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -177,7 +177,7 @@ public:
|
|||||||
// Store the handshake FD for heartbeats
|
// Store the handshake FD for heartbeats
|
||||||
context->device.heartbeatFd = fd;
|
context->device.heartbeatFd = fd;
|
||||||
context->device.startHeartbeat();
|
context->device.startHeartbeat();
|
||||||
context->originalCbFn(true);
|
context->callOriginalCb(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -208,7 +208,7 @@ public:
|
|||||||
{
|
{
|
||||||
if (error)
|
if (error)
|
||||||
{
|
{
|
||||||
context->originalCbFn(false);
|
context->callOriginalCb(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,12 +234,12 @@ public:
|
|||||||
context->device.discoveredDevice.ipAddr = ipAddr;
|
context->device.discoveredDevice.ipAddr = ipAddr;
|
||||||
context->device.heartbeatFd = fd;
|
context->device.heartbeatFd = fd;
|
||||||
context->device.startHeartbeat();
|
context->device.startHeartbeat();
|
||||||
context->originalCbFn(true);
|
context->callOriginalCb(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// All connection attempts failed
|
// All connection attempts failed
|
||||||
context->originalCbFn(false);
|
context->callOriginalCb(false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -261,7 +261,8 @@ void Device::connectReq(Device::connectReqCbFn callback)
|
|||||||
}
|
}
|
||||||
|
|
||||||
class Device::ConnectToKnownDeviceReq
|
class Device::ConnectToKnownDeviceReq
|
||||||
: public smo::AsynchronousContinuation<Device::connectToKnownDeviceReqCbFn>
|
: public smo::NonPostedAsynchronousContinuation<
|
||||||
|
Device::connectToKnownDeviceReqCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Device& device;
|
Device& device;
|
||||||
@@ -269,13 +270,13 @@ public:
|
|||||||
std::shared_ptr<livoxProto1::comms::DiscoveredDevice> deviceInfo;
|
std::shared_ptr<livoxProto1::comms::DiscoveredDevice> deviceInfo;
|
||||||
|
|
||||||
ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb)
|
ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb)
|
||||||
: smo::AsynchronousContinuation<Device::connectToKnownDeviceReqCbFn>(
|
: smo::NonPostedAsynchronousContinuation<
|
||||||
std::move(cb)), device(dev)
|
Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
// Public accessor for the original callback
|
// Public accessor for the original callback
|
||||||
void callOriginalCallback(bool success, const std::string& ipAddr, int fd)
|
void callOriginalCallback(bool success, const std::string& ipAddr, int fd)
|
||||||
{ originalCbFn(success, ipAddr, fd); }
|
{ callOriginalCb(success, ipAddr, fd); }
|
||||||
|
|
||||||
// Wrapper for failure cases
|
// Wrapper for failure cases
|
||||||
void callOriginalCallbackWithFailure()
|
void callOriginalCallbackWithFailure()
|
||||||
@@ -363,7 +364,7 @@ void Device::connectToKnownDeviceReq(
|
|||||||
}
|
}
|
||||||
|
|
||||||
class Device::ConnectByDeviceIdentifierReq
|
class Device::ConnectByDeviceIdentifierReq
|
||||||
: public smo::AsynchronousContinuation<
|
: public smo::NonPostedAsynchronousContinuation<
|
||||||
Device::connectByDeviceIdentifierReqCbFn>
|
Device::connectByDeviceIdentifierReqCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -372,13 +373,14 @@ public:
|
|||||||
|
|
||||||
ConnectByDeviceIdentifierReq(
|
ConnectByDeviceIdentifierReq(
|
||||||
Device& dev, Device::connectByDeviceIdentifierReqCbFn cb)
|
Device& dev, Device::connectByDeviceIdentifierReqCbFn cb)
|
||||||
: smo::AsynchronousContinuation<Device::connectByDeviceIdentifierReqCbFn>(
|
: smo::NonPostedAsynchronousContinuation<
|
||||||
std::move(cb)), device(dev)
|
Device::connectByDeviceIdentifierReqCbFn>(
|
||||||
|
std::move(cb)), device(dev)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
// Public accessor for the original callback
|
// Public accessor for the original callback
|
||||||
void callOriginalCallback(bool success, const std::string& ipAddr, int fd)
|
void callOriginalCallback(bool success, const std::string& ipAddr, int fd)
|
||||||
{ originalCbFn(success, ipAddr, fd); }
|
{ callOriginalCb(success, ipAddr, fd); }
|
||||||
|
|
||||||
// Wrapper for failure cases
|
// Wrapper for failure cases
|
||||||
void callOriginalCallbackWithFailure()
|
void callOriginalCallbackWithFailure()
|
||||||
@@ -445,7 +447,8 @@ void Device::connectByDeviceIdentifierReq(
|
|||||||
}
|
}
|
||||||
|
|
||||||
class Device::ExecuteHandshakeReq
|
class Device::ExecuteHandshakeReq
|
||||||
: public smo::AsynchronousContinuation<Device::executeHandshakeReqCbFn>
|
: public smo::NonPostedAsynchronousContinuation<
|
||||||
|
Device::executeHandshakeReqCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
friend void Device::executeHandshakeReq(
|
friend void Device::executeHandshakeReq(
|
||||||
@@ -482,7 +485,7 @@ public:
|
|||||||
ExecuteHandshakeReq(
|
ExecuteHandshakeReq(
|
||||||
Device& dev, const std::string& deviceIP,
|
Device& dev, const std::string& deviceIP,
|
||||||
Device::executeHandshakeReqCbFn cb)
|
Device::executeHandshakeReqCbFn cb)
|
||||||
: smo::AsynchronousContinuation<Device::executeHandshakeReqCbFn>(
|
: smo::NonPostedAsynchronousContinuation<Device::executeHandshakeReqCbFn>(
|
||||||
std::move(cb)),
|
std::move(cb)),
|
||||||
device(dev), deviceIP(deviceIP),
|
device(dev), deviceIP(deviceIP),
|
||||||
handshakeFdDesc(device.componentThread->getIoService()),
|
handshakeFdDesc(device.componentThread->getIoService()),
|
||||||
@@ -497,7 +500,7 @@ public:
|
|||||||
|
|
||||||
// Public accessor for the original callback
|
// Public accessor for the original callback
|
||||||
void callOriginalCallback(bool success, int fd)
|
void callOriginalCallback(bool success, int fd)
|
||||||
{ originalCbFn(success, fd); }
|
{ callOriginalCb(success, fd); }
|
||||||
|
|
||||||
void callOriginalCallbackWithFailure()
|
void callOriginalCallbackWithFailure()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <componentThread.h>
|
#include <componentThread.h>
|
||||||
|
#include <lockSpec.h>
|
||||||
|
|
||||||
namespace smo {
|
namespace smo {
|
||||||
|
|
||||||
@@ -15,20 +16,6 @@ namespace smo {
|
|||||||
*
|
*
|
||||||
* The template parameter OriginalCbFnT represents the signature of the original
|
* The template parameter OriginalCbFnT represents the signature of the original
|
||||||
* callback that will be invoked when the async sequence completes.
|
* callback that will be invoked when the async sequence completes.
|
||||||
*
|
|
||||||
* Usage:
|
|
||||||
* class MyAsyncReq
|
|
||||||
* : public AsynchronousContinuation<std::function<void(bool)>>
|
|
||||||
* {
|
|
||||||
* public:
|
|
||||||
* MyAsyncReq(std::function<void(bool)> originalCbFn)
|
|
||||||
* : AsynchronousContinuation(originalCbFn)
|
|
||||||
* {}
|
|
||||||
*
|
|
||||||
* // Segment methods take only the shared_ptr for lifetime management
|
|
||||||
* void myAsyncReq1(std::shared_ptr<MyAsyncReq> context);
|
|
||||||
* void myAsyncReq2(std::shared_ptr<MyAsyncReq> context);
|
|
||||||
* };
|
|
||||||
*/
|
*/
|
||||||
template <class OriginalCbFnT>
|
template <class OriginalCbFnT>
|
||||||
class AsynchronousContinuation
|
class AsynchronousContinuation
|
||||||
@@ -46,33 +33,101 @@ public:
|
|||||||
std::shared_ptr<AsynchronousContinuation<OriginalCbFnT>>
|
std::shared_ptr<AsynchronousContinuation<OriginalCbFnT>>
|
||||||
lifetimePreservingConveyance);
|
lifetimePreservingConveyance);
|
||||||
|
|
||||||
protected:
|
/**
|
||||||
|
* @brief Call the original callback with perfect forwarding
|
||||||
|
*
|
||||||
|
* IMPORTANT: This method cannot be virtual because templates cannot be
|
||||||
|
* virtual in C++. Therefore, this method MUST be called from the
|
||||||
|
* most-derived class reference/pointer context. Never call this method
|
||||||
|
* through a polymorphic base class reference/pointer, as it will not
|
||||||
|
* dispatch to the correct derived class implementation.
|
||||||
|
*
|
||||||
|
* @param args Arguments to forward to the original callback
|
||||||
|
*/
|
||||||
|
template<typename... Args>
|
||||||
|
void callOriginalCb(Args&&... args)
|
||||||
|
{
|
||||||
|
if (originalCbFn) { originalCbFn(std::forward<Args>(args)...); }
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
OriginalCbFnT originalCbFn;
|
OriginalCbFnT originalCbFn;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ContinuationTarget
|
/**
|
||||||
|
* NonPostedAsynchronousContinuation - For continuations that don't post
|
||||||
|
* callbacks
|
||||||
|
*
|
||||||
|
* Note: We intentionally do not create a
|
||||||
|
* LockedNonPostedAsynchronousContinuation because the only way to implement
|
||||||
|
* non-posted locking would be via busy-spinning or sleeplocks. This would
|
||||||
|
* eliminate the throughput advantage from our Qspinning mechanism, which
|
||||||
|
* relies on re-posting to the io_service queue when locks are unavailable.
|
||||||
|
*/
|
||||||
|
template <class OriginalCbFnT>
|
||||||
|
class NonPostedAsynchronousContinuation
|
||||||
|
: public AsynchronousContinuation<OriginalCbFnT>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ContinuationTarget(
|
explicit NonPostedAsynchronousContinuation(OriginalCbFnT originalCbFn)
|
||||||
const std::shared_ptr<ComponentThread> &caller)
|
: AsynchronousContinuation<OriginalCbFnT>(originalCbFn)
|
||||||
: caller(caller)
|
|
||||||
{}
|
{}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
const std::shared_ptr<ComponentThread> caller;
|
using AsynchronousContinuation<OriginalCbFnT>::callOriginalCb;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <class OriginalCbFnT>
|
template <class OriginalCbFnT>
|
||||||
class TargetedAsynchronousContinuation
|
class PostedAsynchronousContinuation
|
||||||
: public AsynchronousContinuation<OriginalCbFnT>, public ContinuationTarget
|
: public AsynchronousContinuation<OriginalCbFnT>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
TargetedAsynchronousContinuation(
|
PostedAsynchronousContinuation(
|
||||||
const std::shared_ptr<ComponentThread> &caller,
|
const std::shared_ptr<ComponentThread> &caller,
|
||||||
OriginalCbFnT originalCbFn)
|
OriginalCbFnT originalCbFn)
|
||||||
: AsynchronousContinuation<OriginalCbFnT>(originalCbFn),
|
: AsynchronousContinuation<OriginalCbFnT>(originalCbFn),
|
||||||
ContinuationTarget(caller)
|
caller(caller)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
template<typename... Args>
|
||||||
|
void callOriginalCb(Args&&... args)
|
||||||
|
{
|
||||||
|
if (AsynchronousContinuation<OriginalCbFnT>::originalCbFn) {
|
||||||
|
caller->getIoService().post(
|
||||||
|
std::bind(
|
||||||
|
AsynchronousContinuation<OriginalCbFnT>::originalCbFn,
|
||||||
|
std::forward<Args>(args)...));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
std::shared_ptr<ComponentThread> caller;
|
||||||
|
using AsynchronousContinuation<OriginalCbFnT>::callOriginalCb;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <class OriginalCbFnT>
|
||||||
|
class SerializedAsynchronousContinuation
|
||||||
|
: public PostedAsynchronousContinuation<OriginalCbFnT>, public LockSpec
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
SerializedAsynchronousContinuation(
|
||||||
|
const std::shared_ptr<ComponentThread> &caller,
|
||||||
|
OriginalCbFnT originalCbFn,
|
||||||
|
std::vector<std::reference_wrapper<SpinLock>> requiredLocks = {})
|
||||||
|
: PostedAsynchronousContinuation<OriginalCbFnT>(caller, originalCbFn),
|
||||||
|
LockSpec(std::move(requiredLocks))
|
||||||
|
{}
|
||||||
|
|
||||||
|
template<typename... Args>
|
||||||
|
void callOriginalCb(Args&&... args)
|
||||||
|
{
|
||||||
|
LockSpec::release();
|
||||||
|
PostedAsynchronousContinuation<OriginalCbFnT>::callOriginalCb(
|
||||||
|
std::forward<Args>(args)...);
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
using AsynchronousContinuation<OriginalCbFnT>::callOriginalCb;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace smo
|
} // namespace smo
|
||||||
|
|||||||
+4
-15
@@ -16,27 +16,16 @@ Body::Body(Mind &parent, const std::shared_ptr<ComponentThread> &thread)
|
|||||||
}
|
}
|
||||||
|
|
||||||
class Body::InitializeReq
|
class Body::InitializeReq
|
||||||
: public AsynchronousContinuation<bodyLifetimeMgmtOpCbFn>,
|
: public PostedAsynchronousContinuation<bodyLifetimeMgmtOpCbFn>
|
||||||
public ContinuationTarget
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
InitializeReq(
|
InitializeReq(
|
||||||
Mind &parent, const std::shared_ptr<ComponentThread> &caller,
|
Mind &parent, const std::shared_ptr<ComponentThread> &caller,
|
||||||
bodyLifetimeMgmtOpCbFn callback)
|
bodyLifetimeMgmtOpCbFn callback)
|
||||||
: AsynchronousContinuation<bodyLifetimeMgmtOpCbFn>(callback),
|
: PostedAsynchronousContinuation<bodyLifetimeMgmtOpCbFn>(caller, callback),
|
||||||
ContinuationTarget(caller),
|
|
||||||
parent(parent)
|
parent(parent)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCbFn(bool success)
|
|
||||||
{
|
|
||||||
if (originalCbFn)
|
|
||||||
{
|
|
||||||
caller->getIoService().post(
|
|
||||||
std::bind(originalCbFn, success));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Mind &parent;
|
Mind &parent;
|
||||||
|
|
||||||
@@ -104,7 +93,7 @@ public:
|
|||||||
<< results.nSucceeded << " of " << results.nTotal
|
<< results.nSucceeded << " of " << results.nTotal
|
||||||
<< " sense devices." << "\n";
|
<< " sense devices." << "\n";
|
||||||
|
|
||||||
callOriginalCbFn(results.nSucceeded > 0);
|
callOriginalCb(results.nSucceeded > 0);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -148,7 +137,7 @@ public:
|
|||||||
|
|
||||||
std::cout << "Mrntt: About to unload all sense api libs." << "\n";
|
std::cout << "Mrntt: About to unload all sense api libs." << "\n";
|
||||||
sense_api::SenseApiManager::getInstance().unloadAllSenseApiLibs();
|
sense_api::SenseApiManager::getInstance().unloadAllSenseApiLibs();
|
||||||
callOriginalCbFn(results.nSucceeded == results.nTotal);
|
callOriginalCb(results.nSucceeded == results.nTotal);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -105,32 +105,18 @@ void MindThread::main(MindThread& self)
|
|||||||
}
|
}
|
||||||
|
|
||||||
class MindThread::ThreadLifetimeMgmtOp
|
class MindThread::ThreadLifetimeMgmtOp
|
||||||
: public TargetedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>
|
: public PostedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ThreadLifetimeMgmtOp(
|
ThreadLifetimeMgmtOp(
|
||||||
const std::shared_ptr<ComponentThread> &caller,
|
const std::shared_ptr<ComponentThread> &caller,
|
||||||
const std::shared_ptr<MindThread> &target,
|
const std::shared_ptr<MindThread> &target,
|
||||||
threadLifetimeMgmtOpCbFn callback)
|
threadLifetimeMgmtOpCbFn callback)
|
||||||
: TargetedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>(
|
: PostedAsynchronousContinuation<threadLifetimeMgmtOpCbFn>(
|
||||||
caller, callback),
|
caller, callback),
|
||||||
target(target)
|
target(target)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCbFn(void)
|
|
||||||
{
|
|
||||||
if (originalCbFn)
|
|
||||||
{
|
|
||||||
/** EXPLANATION:
|
|
||||||
* This is only permissible because originalCbFn doesn't take any
|
|
||||||
* further arguments. If we had to bind argument to it before
|
|
||||||
* posting it, we'd have to std::bind them and then post the
|
|
||||||
* resulting function object.
|
|
||||||
*/
|
|
||||||
caller->getIoService().post(originalCbFn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
const std::shared_ptr<MindThread> target;
|
const std::shared_ptr<MindThread> target;
|
||||||
|
|
||||||
@@ -144,7 +130,7 @@ public:
|
|||||||
<< "\n";
|
<< "\n";
|
||||||
|
|
||||||
target->io_service.stop();
|
target->io_service.stop();
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
|
|
||||||
void startThreadReq1_posted(
|
void startThreadReq1_posted(
|
||||||
@@ -158,7 +144,7 @@ public:
|
|||||||
// Execute private setup sequence here
|
// Execute private setup sequence here
|
||||||
// This is where each thread would implement its specific initialization
|
// This is where each thread would implement its specific initialization
|
||||||
|
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
|
|
||||||
void exitThreadReq1_mainQueue_posted(
|
void exitThreadReq1_mainQueue_posted(
|
||||||
@@ -170,7 +156,7 @@ public:
|
|||||||
|
|
||||||
target->cleanup();
|
target->cleanup();
|
||||||
target->io_service.stop();
|
target->io_service.stop();
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
|
|
||||||
void exitThreadReq1_pauseQueue_posted(
|
void exitThreadReq1_pauseQueue_posted(
|
||||||
@@ -183,7 +169,7 @@ public:
|
|||||||
target->cleanup();
|
target->cleanup();
|
||||||
target->pause_io_service.stop();
|
target->pause_io_service.stop();
|
||||||
target->io_service.stop();
|
target->io_service.stop();
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
|
|
||||||
void pauseThreadReq1_posted(
|
void pauseThreadReq1_posted(
|
||||||
@@ -197,7 +183,7 @@ public:
|
|||||||
* our next operation is going to block the thread, so it won't
|
* our next operation is going to block the thread, so it won't
|
||||||
* have a chance to invoke the callback until it's unblocked.
|
* have a chance to invoke the callback until it's unblocked.
|
||||||
*/
|
*/
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
target->pause_io_service.reset();
|
target->pause_io_service.reset();
|
||||||
target->pause_io_service.run();
|
target->pause_io_service.run();
|
||||||
}
|
}
|
||||||
@@ -210,7 +196,7 @@ public:
|
|||||||
"resumeThread." << "\n";
|
"resumeThread." << "\n";
|
||||||
|
|
||||||
target->pause_io_service.stop();
|
target->pause_io_service.stop();
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -39,31 +39,18 @@ const std::string DeviceManager::stringifyDeviceSpecs(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
class DeviceManager::NewDeviceAttachmentSpecInd
|
class DeviceManager::NewDeviceAttachmentSpecInd
|
||||||
: public TargetedAsynchronousContinuation<newDeviceAttachmentSpecIndCbFn>
|
: public PostedAsynchronousContinuation<newDeviceAttachmentSpecIndCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
NewDeviceAttachmentSpecInd(
|
NewDeviceAttachmentSpecInd(
|
||||||
std::shared_ptr<DeviceAttachmentSpec> s,
|
std::shared_ptr<DeviceAttachmentSpec> s,
|
||||||
const std::shared_ptr<ComponentThread> &caller,
|
const std::shared_ptr<ComponentThread> &caller,
|
||||||
newDeviceAttachmentSpecIndCbFn cb)
|
newDeviceAttachmentSpecIndCbFn cb)
|
||||||
: TargetedAsynchronousContinuation<newDeviceAttachmentSpecIndCbFn>(
|
: PostedAsynchronousContinuation<newDeviceAttachmentSpecIndCbFn>(
|
||||||
caller, cb),
|
caller, cb),
|
||||||
spec(s)
|
spec(s)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCb(
|
|
||||||
bool success,
|
|
||||||
std::shared_ptr<Device> device,
|
|
||||||
std::shared_ptr<DeviceAttachmentSpec> deviceSpec)
|
|
||||||
{
|
|
||||||
if (originalCbFn)
|
|
||||||
{
|
|
||||||
caller->getIoService().post(
|
|
||||||
std::bind(
|
|
||||||
originalCbFn, success, device, deviceSpec));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
std::shared_ptr<DeviceAttachmentSpec> spec;
|
std::shared_ptr<DeviceAttachmentSpec> spec;
|
||||||
|
|
||||||
|
|||||||
@@ -10,27 +10,17 @@ namespace smo {
|
|||||||
namespace mrntt {
|
namespace mrntt {
|
||||||
|
|
||||||
class MarionetteComponent::MrnttLifetimeMgmtOp
|
class MarionetteComponent::MrnttLifetimeMgmtOp
|
||||||
: public AsynchronousContinuation<mrnttLifetimeMgmtOpCbFn>,
|
: public PostedAsynchronousContinuation<mrnttLifetimeMgmtOpCbFn>
|
||||||
public ContinuationTarget
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
MrnttLifetimeMgmtOp(
|
MrnttLifetimeMgmtOp(
|
||||||
MarionetteComponent &parent, const std::shared_ptr<ComponentThread> &caller,
|
MarionetteComponent &parent, const std::shared_ptr<ComponentThread> &caller,
|
||||||
mrnttLifetimeMgmtOpCbFn callback)
|
mrnttLifetimeMgmtOpCbFn callback)
|
||||||
: AsynchronousContinuation<mrnttLifetimeMgmtOpCbFn>(callback),
|
: PostedAsynchronousContinuation<mrnttLifetimeMgmtOpCbFn>(
|
||||||
ContinuationTarget(caller),
|
caller, callback),
|
||||||
parent(parent)
|
parent(parent)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCbFn(bool success)
|
|
||||||
{
|
|
||||||
if (originalCbFn)
|
|
||||||
{
|
|
||||||
caller->getIoService().post(
|
|
||||||
std::bind(originalCbFn, success));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
MarionetteComponent &parent;
|
MarionetteComponent &parent;
|
||||||
|
|
||||||
@@ -62,11 +52,11 @@ public:
|
|||||||
{
|
{
|
||||||
std::cerr << __func__ << ": Failed to initialize globalMind"
|
std::cerr << __func__ << ": Failed to initialize globalMind"
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
context->callOriginalCbFn(false);
|
context->callOriginalCb(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
context->callOriginalCbFn(success);
|
context->callOriginalCb(success);
|
||||||
}
|
}
|
||||||
|
|
||||||
void finalizeReq1_posted(
|
void finalizeReq1_posted(
|
||||||
@@ -95,11 +85,11 @@ public:
|
|||||||
{
|
{
|
||||||
std::cerr << __func__ << ": globalMind finalization failed"
|
std::cerr << __func__ << ": globalMind finalization failed"
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
context->callOriginalCbFn(false);
|
context->callOriginalCb(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
context->callOriginalCbFn(success);
|
context->callOriginalCb(success);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
+9
-23
@@ -84,24 +84,17 @@ Mind::getMindThreads() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
class Mind::MindLifetimeMgmtOp
|
class Mind::MindLifetimeMgmtOp
|
||||||
: public TargetedAsynchronousContinuation<mindLifetimeMgmtOpCbFn>
|
: public PostedAsynchronousContinuation<mindLifetimeMgmtOpCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
MindLifetimeMgmtOp(
|
MindLifetimeMgmtOp(
|
||||||
Mind &parent, const std::shared_ptr<ComponentThread> &caller,
|
Mind &parent, const std::shared_ptr<ComponentThread> &caller,
|
||||||
mindLifetimeMgmtOpCbFn callback)
|
mindLifetimeMgmtOpCbFn callback)
|
||||||
: TargetedAsynchronousContinuation<mindLifetimeMgmtOpCbFn>(
|
: PostedAsynchronousContinuation<mindLifetimeMgmtOpCbFn>(
|
||||||
caller, callback),
|
caller, callback),
|
||||||
parent(parent)
|
parent(parent)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCbFn(bool success)
|
|
||||||
{
|
|
||||||
if (originalCbFn) {
|
|
||||||
caller->getIoService().post(std::bind(originalCbFn, success));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Mind &parent;
|
Mind &parent;
|
||||||
|
|
||||||
@@ -147,7 +140,7 @@ public:
|
|||||||
)
|
)
|
||||||
{
|
{
|
||||||
std::cout << "Mrntt: Body component initialized." << "\n";
|
std::cout << "Mrntt: Body component initialized." << "\n";
|
||||||
callOriginalCbFn(success);
|
callOriginalCb(success);
|
||||||
}
|
}
|
||||||
|
|
||||||
void finalizeReq1_posted(
|
void finalizeReq1_posted(
|
||||||
@@ -198,7 +191,7 @@ public:
|
|||||||
)
|
)
|
||||||
{
|
{
|
||||||
std::cout << "Mrntt: All mind threads exited." << "\n";
|
std::cout << "Mrntt: All mind threads exited." << "\n";
|
||||||
callOriginalCbFn(true);
|
callOriginalCb(true);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -260,24 +253,17 @@ void Mind::distributeAndPinThreadsAcrossCpus()
|
|||||||
}
|
}
|
||||||
|
|
||||||
class Mind::MindThreadLifetimeMgmtOp
|
class Mind::MindThreadLifetimeMgmtOp
|
||||||
: public AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
|
: public NonPostedAsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
MindThreadLifetimeMgmtOp(
|
MindThreadLifetimeMgmtOp(
|
||||||
Mind &parent,unsigned int nThreads,
|
Mind &parent,unsigned int nThreads,
|
||||||
mindThreadLifetimeMgmtOpCbFn callback)
|
mindThreadLifetimeMgmtOpCbFn callback)
|
||||||
: AsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
|
: NonPostedAsynchronousContinuation<mindThreadLifetimeMgmtOpCbFn>(callback),
|
||||||
loop(nThreads),
|
loop(nThreads),
|
||||||
parent(parent)
|
parent(parent)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCbFn(void)
|
|
||||||
{
|
|
||||||
if (originalCbFn) {
|
|
||||||
originalCbFn();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AsynchronousLoop loop;
|
AsynchronousLoop loop;
|
||||||
Mind &parent;
|
Mind &parent;
|
||||||
@@ -293,7 +279,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
parent.threadsHaveBeenJolted = true;
|
parent.threadsHaveBeenJolted = true;
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
|
|
||||||
void executeGenericOpOnAllMindThreadsReq1(
|
void executeGenericOpOnAllMindThreadsReq1(
|
||||||
@@ -305,7 +291,7 @@ public:
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
|
|
||||||
void exitAllMindThreadsReq1(
|
void exitAllMindThreadsReq1(
|
||||||
@@ -321,7 +307,7 @@ public:
|
|||||||
thread->thread.join();
|
thread->thread.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
callOriginalCbFn();
|
callOriginalCb();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -262,30 +262,18 @@ void SenseApiManager::finalizeAllSenseApiLibs(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
class SenseApiManager::AttachSenseDeviceReq
|
class SenseApiManager::AttachSenseDeviceReq
|
||||||
: public TargetedAsynchronousContinuation<attachSenseDeviceReqCbFn>
|
: public PostedAsynchronousContinuation<attachSenseDeviceReqCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
AttachSenseDeviceReq(
|
AttachSenseDeviceReq(
|
||||||
const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
|
const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
|
||||||
const std::shared_ptr<ComponentThread> &caller,
|
const std::shared_ptr<ComponentThread> &caller,
|
||||||
attachSenseDeviceReqCbFn cb)
|
attachSenseDeviceReqCbFn cb)
|
||||||
: TargetedAsynchronousContinuation<attachSenseDeviceReqCbFn>(
|
: PostedAsynchronousContinuation<attachSenseDeviceReqCbFn>(
|
||||||
caller, cb),
|
caller, cb),
|
||||||
spec(spec)
|
spec(spec)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCb(
|
|
||||||
bool success, std::shared_ptr<device::DeviceAttachmentSpec> deviceSpec
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (originalCbFn)
|
|
||||||
{
|
|
||||||
caller->getIoService().post(
|
|
||||||
std::bind(
|
|
||||||
originalCbFn, success, deviceSpec));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void attachSenseDeviceReq1_posted(
|
void attachSenseDeviceReq1_posted(
|
||||||
[[maybe_unused]] std::shared_ptr<AttachSenseDeviceReq> context
|
[[maybe_unused]] std::shared_ptr<AttachSenseDeviceReq> context
|
||||||
@@ -448,7 +436,7 @@ void SenseApiManager::detachSenseDeviceReq(
|
|||||||
}
|
}
|
||||||
|
|
||||||
class SenseApiManager::AttachAllSenseDevicesFromSpecsReq
|
class SenseApiManager::AttachAllSenseDevicesFromSpecsReq
|
||||||
: public TargetedAsynchronousContinuation<
|
: public PostedAsynchronousContinuation<
|
||||||
attachAllSenseDevicesFromSpecsReqCbFn>
|
attachAllSenseDevicesFromSpecsReqCbFn>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -456,21 +444,11 @@ public:
|
|||||||
const unsigned int totalNSpecs,
|
const unsigned int totalNSpecs,
|
||||||
const std::shared_ptr<ComponentThread>& caller,
|
const std::shared_ptr<ComponentThread>& caller,
|
||||||
attachAllSenseDevicesFromSpecsReqCbFn cb)
|
attachAllSenseDevicesFromSpecsReqCbFn cb)
|
||||||
: TargetedAsynchronousContinuation<attachAllSenseDevicesFromSpecsReqCbFn>(
|
: PostedAsynchronousContinuation<attachAllSenseDevicesFromSpecsReqCbFn>(
|
||||||
caller, cb),
|
caller, cb),
|
||||||
loop(totalNSpecs)
|
loop(totalNSpecs)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void callOriginalCallback()
|
|
||||||
{
|
|
||||||
if (originalCbFn)
|
|
||||||
{
|
|
||||||
caller->getIoService().post(
|
|
||||||
std::bind(
|
|
||||||
originalCbFn, loop));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void attachAllSenseDevicesFromSpecsReq1_posted(
|
void attachAllSenseDevicesFromSpecsReq1_posted(
|
||||||
[[maybe_unused]] std::shared_ptr<AttachAllSenseDevicesFromSpecsReq> context
|
[[maybe_unused]] std::shared_ptr<AttachAllSenseDevicesFromSpecsReq> context
|
||||||
@@ -513,7 +491,7 @@ public:
|
|||||||
<< context->loop.nFailed.load() << " devices failed\n";
|
<< context->loop.nFailed.load() << " devices failed\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
context->callOriginalCallback();
|
context->callOriginalCb(loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@@ -588,7 +566,7 @@ public:
|
|||||||
<< context->loop.nFailed.load() << " devices failed\n";
|
<< context->loop.nFailed.load() << " devices failed\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
context->callOriginalCallback();
|
context->callOriginalCb(loop);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user