Async: add sh_ptr<ContinuationChainLink> to Callback<>

This change enables us to finally implement the tracing of
continuations backward from the point of acquisition for deadlock
debugging.
This commit is contained in:
2025-09-27 18:30:09 -04:00
parent 2212aec080
commit 782bcd4567
26 changed files with 384 additions and 269 deletions
+11 -10
View File
@@ -4,6 +4,7 @@
#include <optional>
#include <opts.h>
#include <asynchronousContinuation.h>
#include <callback.h>
#include <user/senseApiDesc.h>
#include "protocol.h"
#include "core.h"
@@ -84,7 +85,7 @@ public:
GetOrCreateDeviceReq(
DeviceManager& mgr,
std::shared_ptr<Device> device,
livoxProto1_getOrCreateDeviceReqCbFn cb)
smo::Callback<livoxProto1_getOrCreateDeviceReqCbFn> cb)
: smo::NonPostedAsynchronousContinuation<
livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)),
deviceManager(mgr), pendingDevice(device)
@@ -130,7 +131,7 @@ void DeviceManager::getOrCreateDeviceReq(
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback)
smo::Callback<livoxProto1_getOrCreateDeviceReqCbFn> callback)
{
// Validate smoIp format using Boost.Asio IPv4 validation
if (!smoIp.empty() && !comms::isValidIPv4(smoIp))
@@ -154,7 +155,7 @@ void DeviceManager::getOrCreateDeviceReq(
if (existingDevice)
{
// Device already exists and is connected, return it
callback(true, existingDevice.value());
callback.callbackFn(true, existingDevice.value());
return;
}
@@ -171,9 +172,9 @@ void DeviceManager::getOrCreateDeviceReq(
// Start the connection process - only add to collection on success
request->pendingDevice->connectReq(
std::bind(
{request, std::bind(
&DeviceManager::GetOrCreateDeviceReq::getOrCreateDeviceReq1,
request.get(), request, std::placeholders::_1));
request.get(), request, std::placeholders::_1)});
}
class DeviceManager::DestroyDeviceReq
@@ -188,7 +189,7 @@ public:
DestroyDeviceReq(
DeviceManager& mgr,
std::shared_ptr<Device> device,
livoxProto1_destroyDeviceReqCbFn cb)
smo::Callback<livoxProto1_destroyDeviceReqCbFn> cb)
: smo::NonPostedAsynchronousContinuation<
livoxProto1_destroyDeviceReqCbFn>(std::move(cb)),
deviceManager(mgr), pendingDevice(device)
@@ -218,7 +219,7 @@ public:
void DeviceManager::destroyDeviceReq(
std::shared_ptr<Device> dev,
livoxProto1_destroyDeviceReqCbFn callback
smo::Callback<livoxProto1_destroyDeviceReqCbFn> callback
)
{
/** EXPLANATION:
@@ -230,7 +231,7 @@ void DeviceManager::destroyDeviceReq(
if (!device)
{
callback(false);
callback.callbackFn(false);
return;
}
@@ -238,9 +239,9 @@ void DeviceManager::destroyDeviceReq(
*this, device, std::move(callback));
device->disconnectReq(
std::bind(
{request, std::bind(
&DeviceManager::DestroyDeviceReq::destroyDeviceReq1,
request.get(), request, std::placeholders::_1));
request.get(), request, std::placeholders::_1)});
}
void main(const std::shared_ptr<smo::ComponentThread> &componentThread,
+3 -2
View File
@@ -10,6 +10,7 @@
#include "device.h"
#include "broadcastListener.h"
#include "livoxProto1.h"
#include <callback.h>
namespace livoxProto1 {
@@ -27,11 +28,11 @@ public:
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback);
smo::Callback<livoxProto1_getOrCreateDeviceReqCbFn> callback);
void destroyDeviceReq(
std::shared_ptr<Device> device,
livoxProto1_destroyDeviceReqCbFn callback);
smo::Callback<livoxProto1_destroyDeviceReqCbFn> callback);
std::optional<std::shared_ptr<Device>> getDevice(
const std::string &deviceIdentifier);
+28 -24
View File
@@ -16,6 +16,7 @@
#include <boost/asio.hpp>
#include <opts.h>
#include <asynchronousContinuation.h>
#include <callback.h>
#include "device.h"
#include "protocol.h"
#include "core.h"
@@ -127,7 +128,7 @@ private:
std::unique_ptr<boost::asio::deadline_timer> retryTimer;
public:
ConnectReq(Device& dev, Device::connectReqCbFn cb)
ConnectReq(Device& dev, smo::Callback<Device::connectReqCbFn> cb)
: smo::NonPostedAsynchronousContinuation<Device::connectReqCbFn>(
std::move(cb)), device(dev)
{}
@@ -160,9 +161,9 @@ public:
// Try direct connect by device identifier
context->device.connectByDeviceIdentifierReq(
std::bind(&ConnectReq::connectReq2, context.get(), context,
{context, std::bind(&ConnectReq::connectReq2, context.get(), context,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
std::placeholders::_3)});
}
void connectReq2(
@@ -219,9 +220,9 @@ public:
}
context->device.connectToKnownDeviceReq(
std::bind(&ConnectReq::connectReq4, context.get(), context,
{context, std::bind(&ConnectReq::connectReq4, context.get(), context,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
std::placeholders::_3)});
}
void connectReq4(
@@ -243,7 +244,7 @@ public:
}
};
void Device::connectReq(Device::connectReqCbFn callback)
void Device::connectReq(smo::Callback<Device::connectReqCbFn> callback)
{
// Create the connection request object to hold state and callbacks
auto request = std::make_shared<ConnectReq>(*this, std::move(callback));
@@ -254,10 +255,10 @@ void Device::connectReq(Device::connectReqCbFn callback)
}
connectToKnownDeviceReq(
std::bind(
{request, std::bind(
&ConnectReq::connectReq1, request.get(), request,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
std::placeholders::_3)});
}
class Device::ConnectToKnownDeviceReq
@@ -269,7 +270,7 @@ public:
std::string deviceIP;
std::shared_ptr<livoxProto1::comms::DiscoveredDevice> deviceInfo;
ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb)
ConnectToKnownDeviceReq(Device& dev, smo::Callback<Device::connectToKnownDeviceReqCbFn> cb)
: smo::NonPostedAsynchronousContinuation<
Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev)
{}
@@ -297,7 +298,7 @@ public:
* broadcastListener.
*/
void Device::connectToKnownDeviceReq(
Device::connectToKnownDeviceReqCbFn callback
smo::Callback<Device::connectToKnownDeviceReqCbFn> callback
)
{
// Create the connection request object to hold state and callbacks
@@ -357,10 +358,10 @@ void Device::connectToKnownDeviceReq(
// Execute handshake with the known device using async method
request->device.executeHandshakeReq(
request->deviceIP,
std::bind(
{request, std::bind(
&ConnectToKnownDeviceReq::connectToKnownDeviceReq1,
request.get(), request,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_1, std::placeholders::_2)});
}
class Device::ConnectByDeviceIdentifierReq
@@ -372,7 +373,7 @@ public:
std::string deviceIP;
ConnectByDeviceIdentifierReq(
Device& dev, Device::connectByDeviceIdentifierReqCbFn cb)
Device& dev, smo::Callback<Device::connectByDeviceIdentifierReqCbFn> cb)
: smo::NonPostedAsynchronousContinuation<
Device::connectByDeviceIdentifierReqCbFn>(
std::move(cb)), device(dev)
@@ -398,7 +399,7 @@ public:
};
void Device::connectByDeviceIdentifierReq(
Device::connectByDeviceIdentifierReqCbFn callback
smo::Callback<Device::connectByDeviceIdentifierReqCbFn> callback
)
{
/** EXPLANATION:
@@ -415,7 +416,7 @@ void Device::connectByDeviceIdentifierReq(
// Check if smoIp is provided - required for heuristic construction
if (smoIp.empty())
{
callback(false, "", -1);
callback.callbackFn(false, "", -1);
return;
}
@@ -440,10 +441,10 @@ void Device::connectByDeviceIdentifierReq(
// Execute handshake using async method
request->device.executeHandshakeReq(
request->deviceIP,
std::bind(
{request, std::bind(
&ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1,
request.get(), request,
std::placeholders::_1, std::placeholders::_2));
std::placeholders::_1, std::placeholders::_2)});
}
class Device::ExecuteHandshakeReq
@@ -452,7 +453,8 @@ class Device::ExecuteHandshakeReq
{
public:
friend void Device::executeHandshakeReq(
const std::string& deviceIP, Device::executeHandshakeReqCbFn callback);
const std::string& deviceIP,
smo::Callback<Device::executeHandshakeReqCbFn> callback);
enum class SocketState
{
@@ -484,7 +486,7 @@ public:
public:
ExecuteHandshakeReq(
Device& dev, const std::string& deviceIP,
Device::executeHandshakeReqCbFn cb)
smo::Callback<Device::executeHandshakeReqCbFn> cb)
: smo::NonPostedAsynchronousContinuation<Device::executeHandshakeReqCbFn>(
std::move(cb)),
device(dev), deviceIP(deviceIP),
@@ -650,7 +652,8 @@ private:
*/
handshakeFdDesc.async_wait(
boost::asio::posix::stream_descriptor::wait_read,
std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2, this,
std::bind(
&ExecuteHandshakeReq::executeHandshakeReq1_2, this,
request,
std::placeholders::_1));
}
@@ -830,7 +833,8 @@ private:
};
void Device::executeHandshakeReq(
const std::string& deviceIP, Device::executeHandshakeReqCbFn callback
const std::string& deviceIP,
smo::Callback<Device::executeHandshakeReqCbFn> callback
)
{
// Create the handshake request object to hold state and callbacks
@@ -866,7 +870,7 @@ void Device::executeHandshakeReq(
}
}
void Device::disconnectReq(Device::disconnectReqCbFn callback)
void Device::disconnectReq(smo::Callback<Device::disconnectReqCbFn> callback)
{
// Stop heartbeat first
heartbeatActive.store(false);
@@ -875,7 +879,7 @@ void Device::disconnectReq(Device::disconnectReqCbFn callback)
{
std::cout << __func__ << ": No heartbeat socket available, skipping "
"disconnect message" << std::endl;
callback(true);
callback.callbackFn(true);
return;
}
@@ -911,7 +915,7 @@ void Device::disconnectReq(Device::disconnectReqCbFn callback)
// Close the heartbeat socket
close(heartbeatFd);
heartbeatFd = -1;
callback(true);
callback.callbackFn(true);
}
std::string Device::generateClientDeviceIpFromSerialNumber(
+8 -5
View File
@@ -13,6 +13,7 @@
#include <unistd.h>
#include <boost/asio.hpp>
#include "protocol.h"
#include <callback.h>
// Forward declaration
namespace smo {
@@ -111,13 +112,15 @@ public:
typedef std::function<void(bool success)> disconnectReqCbFn;
// Async connection methods
void connectReq(connectReqCbFn callback);
void connectToKnownDeviceReq(connectToKnownDeviceReqCbFn callback);
void connectReq(smo::Callback<connectReqCbFn> callback);
void connectToKnownDeviceReq(
smo::Callback<connectToKnownDeviceReqCbFn> callback);
void connectByDeviceIdentifierReq(
connectByDeviceIdentifierReqCbFn callback);
smo::Callback<connectByDeviceIdentifierReqCbFn> callback);
void executeHandshakeReq(
const std::string& deviceIP, executeHandshakeReqCbFn callback);
void disconnectReq(disconnectReqCbFn callback);
const std::string& deviceIP,
smo::Callback<executeHandshakeReqCbFn> callback);
void disconnectReq(smo::Callback<disconnectReqCbFn> callback);
// Heartbeat state
std::unique_ptr<boost::asio::deadline_timer> heartbeatTimer;
+3 -2
View File
@@ -1,4 +1,5 @@
#include <stdexcept>
#include <callback.h>
#include "livoxProto1.h"
#include "device.h"
#include "core.h"
@@ -12,7 +13,7 @@ void livoxProto1_getOrCreateDeviceReq(
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback
smo::Callback<livoxProto1_getOrCreateDeviceReqCbFn> callback
)
{
// Get the global DeviceManager instance
@@ -35,7 +36,7 @@ void livoxProto1_getOrCreateDeviceReq(
void livoxProto1_destroyDeviceReq(
std::shared_ptr<livoxProto1::Device> device,
livoxProto1_destroyDeviceReqCbFn callback
smo::Callback<livoxProto1_destroyDeviceReqCbFn> callback
)
{
auto& protoState = livoxProto1::getProtoState();
+3 -2
View File
@@ -5,6 +5,7 @@
#include <string>
#include <cstdint>
#include <functional>
#include <callback.h>
// Forward declarations
namespace smo {
@@ -59,12 +60,12 @@ typedef void livoxProto1_getOrCreateDeviceReqFn(
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback);
smo::Callback<livoxProto1_getOrCreateDeviceReqCbFn> callback);
typedef std::function<void(bool success)> livoxProto1_destroyDeviceReqCbFn;
typedef void livoxProto1_destroyDeviceReqFn(
std::shared_ptr<livoxProto1::Device> device,
livoxProto1_destroyDeviceReqCbFn callback);
smo::Callback<livoxProto1_destroyDeviceReqCbFn> callback);
livoxProto1_mainFn livoxProto1_main;