Async: add TargetedContinuation

This class enables us to consistently represent continuations
that are intended to be posted on a particular target handling
thread. It hols a sh_ptr to the caller so that the target thread
can re-enqueue the response on the caller after processing the
REQ/IND op.
This commit is contained in:
2025-09-11 18:37:48 -04:00
parent f5195450e4
commit b8c931397d
3 changed files with 51 additions and 13 deletions
+6 -4
View File
@@ -72,7 +72,7 @@ std::optional<std::shared_ptr<Device>> DeviceManager::getDevice(
// GetOrCreateDeviceReq nested class implementation // GetOrCreateDeviceReq nested class implementation
class DeviceManager::GetOrCreateDeviceReq class DeviceManager::GetOrCreateDeviceReq
: public AsynchronousContinuation<livoxProto1_getOrCreateDeviceReqCbFn> : public smo::AsynchronousContinuation<livoxProto1_getOrCreateDeviceReqCbFn>
{ {
public: public:
DeviceManager& deviceManager; DeviceManager& deviceManager;
@@ -84,7 +84,8 @@ public:
DeviceManager& mgr, DeviceManager& mgr,
std::shared_ptr<Device> device, std::shared_ptr<Device> device,
livoxProto1_getOrCreateDeviceReqCbFn cb) livoxProto1_getOrCreateDeviceReqCbFn cb)
: AsynchronousContinuation(std::move(cb)), : smo::AsynchronousContinuation<
livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)),
deviceManager(mgr), pendingDevice(device) deviceManager(mgr), pendingDevice(device)
{} {}
@@ -175,7 +176,7 @@ void DeviceManager::getOrCreateDeviceReq(
} }
class DeviceManager::DestroyDeviceReq class DeviceManager::DestroyDeviceReq
: public AsynchronousContinuation<livoxProto1_destroyDeviceReqCbFn> : public smo::AsynchronousContinuation<livoxProto1_destroyDeviceReqCbFn>
{ {
public: public:
DeviceManager& deviceManager; DeviceManager& deviceManager;
@@ -186,7 +187,8 @@ public:
DeviceManager& mgr, DeviceManager& mgr,
std::shared_ptr<Device> device, std::shared_ptr<Device> device,
livoxProto1_destroyDeviceReqCbFn cb) livoxProto1_destroyDeviceReqCbFn cb)
: AsynchronousContinuation(std::move(cb)), : smo::AsynchronousContinuation<
livoxProto1_destroyDeviceReqCbFn>(std::move(cb)),
deviceManager(mgr), pendingDevice(device) deviceManager(mgr), pendingDevice(device)
{} {}
+14 -8
View File
@@ -120,7 +120,7 @@ Device::~Device()
* This class manages the overall device connection process including handshake and heartbeat setup * This class manages the overall device connection process including handshake and heartbeat setup
*/ */
class Device::ConnectReq class Device::ConnectReq
: public AsynchronousContinuation<Device::connectReqCbFn> : public smo::AsynchronousContinuation<Device::connectReqCbFn>
{ {
private: private:
Device& device; Device& device;
@@ -128,7 +128,9 @@ private:
public: public:
ConnectReq(Device& dev, Device::connectReqCbFn cb) ConnectReq(Device& dev, Device::connectReqCbFn cb)
: AsynchronousContinuation(std::move(cb)), device(dev) {} : smo::AsynchronousContinuation<Device::connectReqCbFn>(
std::move(cb)), device(dev)
{}
/** FIXME: /** FIXME:
* WE need to assign the ipAddr to the Device being connected up. * WE need to assign the ipAddr to the Device being connected up.
@@ -259,7 +261,7 @@ void Device::connectReq(Device::connectReqCbFn callback)
} }
class Device::ConnectToKnownDeviceReq class Device::ConnectToKnownDeviceReq
: public AsynchronousContinuation<Device::connectToKnownDeviceReqCbFn> : public smo::AsynchronousContinuation<Device::connectToKnownDeviceReqCbFn>
{ {
public: public:
Device& device; Device& device;
@@ -267,7 +269,8 @@ public:
std::shared_ptr<livoxProto1::comms::DiscoveredDevice> deviceInfo; std::shared_ptr<livoxProto1::comms::DiscoveredDevice> deviceInfo;
ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb) ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb)
: AsynchronousContinuation(std::move(cb)), device(dev) : smo::AsynchronousContinuation<Device::connectToKnownDeviceReqCbFn>(
std::move(cb)), device(dev)
{} {}
// Public accessor for the original callback // Public accessor for the original callback
@@ -360,7 +363,8 @@ void Device::connectToKnownDeviceReq(
} }
class Device::ConnectByDeviceIdentifierReq class Device::ConnectByDeviceIdentifierReq
: public AsynchronousContinuation<Device::connectByDeviceIdentifierReqCbFn> : public smo::AsynchronousContinuation<
Device::connectByDeviceIdentifierReqCbFn>
{ {
public: public:
Device& device; Device& device;
@@ -368,7 +372,8 @@ public:
ConnectByDeviceIdentifierReq( ConnectByDeviceIdentifierReq(
Device& dev, Device::connectByDeviceIdentifierReqCbFn cb) Device& dev, Device::connectByDeviceIdentifierReqCbFn cb)
: AsynchronousContinuation(std::move(cb)), device(dev) : smo::AsynchronousContinuation<Device::connectByDeviceIdentifierReqCbFn>(
std::move(cb)), device(dev)
{} {}
// Public accessor for the original callback // Public accessor for the original callback
@@ -440,7 +445,7 @@ void Device::connectByDeviceIdentifierReq(
} }
class Device::ExecuteHandshakeReq class Device::ExecuteHandshakeReq
: public AsynchronousContinuation<Device::executeHandshakeReqCbFn> : public smo::AsynchronousContinuation<Device::executeHandshakeReqCbFn>
{ {
public: public:
friend void Device::executeHandshakeReq( friend void Device::executeHandshakeReq(
@@ -477,7 +482,8 @@ public:
ExecuteHandshakeReq( ExecuteHandshakeReq(
Device& dev, const std::string& deviceIP, Device& dev, const std::string& deviceIP,
Device::executeHandshakeReqCbFn cb) Device::executeHandshakeReqCbFn cb)
: AsynchronousContinuation(std::move(cb)), : smo::AsynchronousContinuation<Device::executeHandshakeReqCbFn>(
std::move(cb)),
device(dev), deviceIP(deviceIP), device(dev), deviceIP(deviceIP),
handshakeFdDesc(device.componentThread->getIoService()), handshakeFdDesc(device.componentThread->getIoService()),
timeoutTimer(device.componentThread->getIoService()) timeoutTimer(device.componentThread->getIoService())
+30
View File
@@ -3,6 +3,9 @@
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <componentThread.h>
namespace smo {
/** /**
* AsynchronousContinuation - Template base class for async sequence management * AsynchronousContinuation - Template base class for async sequence management
@@ -47,4 +50,31 @@ protected:
OriginalCbFnT originalCbFn; OriginalCbFnT originalCbFn;
}; };
class ContinuationTarget
{
public:
ContinuationTarget(
const std::shared_ptr<ComponentThread> &caller)
: caller(caller)
{}
public:
const std::shared_ptr<ComponentThread> caller;
};
template <class OriginalCbFnT>
class TargetedAsynchronousContinuation
: public AsynchronousContinuation<OriginalCbFnT>, public ContinuationTarget
{
public:
TargetedAsynchronousContinuation(
const std::shared_ptr<ComponentThread> &caller,
OriginalCbFnT originalCbFn)
: AsynchronousContinuation<OriginalCbFnT>(originalCbFn),
ContinuationTarget(caller)
{}
};
} // namespace smo
#endif // ASYNCHRONOUS_CONTINUATION_H #endif // ASYNCHRONOUS_CONTINUATION_H