livoxProto1: Implement async getOrCreateDeviceReq+destroyDeviceReq

These are now both fully asynchronous. They also work fully
and both connect and disconnect to/from the Avia just fine,
in all tested scenarios.
This commit is contained in:
2025-09-09 12:07:49 -04:00
parent 48121ec84c
commit 20cdf64afb
9 changed files with 1041 additions and 269 deletions
+8 -4
View File
@@ -1,5 +1,6 @@
#include <algorithm>
#include <iostream>
#include <opts.h>
#include "broadcastListener.h"
namespace livoxProto1 {
@@ -94,9 +95,12 @@ void BroadcastListener::broadcastMsgInd(
if (deviceExists(broadcastCode))
{
// Device already exists, just log the update
std::cout << __func__
<< ": Received broadcast from known device: "
<< broadcastCode << " at " << senderIP << std::endl;
if (OptionParser::getOptions().verbose)
{
std::cout << __func__
<< ": Received broadcast from known device: "
<< broadcastCode << " at " << senderIP << "\n";
}
return;
}
@@ -106,7 +110,7 @@ void BroadcastListener::broadcastMsgInd(
// Output device information using stringify
std::cout << __func__ << ": Discovered new Livox device: "
<< device->stringify() << std::endl;
<< device->stringify() << "\n";
}
void BroadcastListener::start(void)
+156 -24
View File
@@ -1,8 +1,16 @@
#include <algorithm>
#include <iostream>
#include <functional>
#include <optional>
#include <opts.h>
#include <AsynchronousContinuation.h>
#include <user/senseApiDesc.h>
#include "protocol.h"
#include "core.h"
#include "device.h"
#include "broadcastListener.h"
#include "livoxProto1.h"
namespace livoxProto1 {
@@ -10,7 +18,8 @@ static ProtoState protoState =
{
.isInitialized = false,
.componentThread = nullptr,
.deviceManager = nullptr
.deviceManager = nullptr,
.smoCallbacks = {}
};
ProtoState& getProtoState()
@@ -46,13 +55,80 @@ void DeviceManager::deviceGoneAwayInd(const comms::DiscoveredDevice &device)
}
}
std::shared_ptr<Device> DeviceManager::getOrCreateDevice(
std::optional<std::shared_ptr<Device>> DeviceManager::getDevice(
const std::string &deviceIdentifier
)
{
for (auto& device : devices)
{
if (comms::deviceIdentifiersEqual(
device->discoveredDevice.deviceIdentifier, deviceIdentifier))
{
return device;
}
}
return std::nullopt;
}
// GetOrCreateDeviceReq nested class implementation
class DeviceManager::GetOrCreateDeviceReq
: public AsynchronousContinuation<livoxProto1_getOrCreateDeviceReqCbFn>
{
public:
DeviceManager& deviceManager;
// The device we're trying to connect (holds all connection parameters)
std::shared_ptr<Device> pendingDevice;
public:
GetOrCreateDeviceReq(
DeviceManager& mgr,
std::shared_ptr<Device> device,
livoxProto1_getOrCreateDeviceReqCbFn cb)
: AsynchronousContinuation(std::move(cb)),
deviceManager(mgr), pendingDevice(device)
{}
// Public accessor for the original callback
void callOriginalCallback(bool success, std::shared_ptr<Device> device)
{ originalCbFn(success, device); }
void callOriginalCallbackWithFailure()
{ callOriginalCallback(false, nullptr); }
void getOrCreateDeviceReq1(
std::shared_ptr<GetOrCreateDeviceReq> context, bool connectSuccess
)
{
if (!connectSuccess)
{
std::cerr << __func__ << ": Connection failed for device "
<< context->pendingDevice->discoveredDevice.deviceIdentifier
<< std::endl;
context->callOriginalCallbackWithFailure();
return;
}
// Connection successful, add device to collection
context->deviceManager.devices.push_back(context->pendingDevice);
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": Successfully connected and added device "
<< context->pendingDevice->discoveredDevice.deviceIdentifier
<< std::endl;
}
// Return success with the connected device
context->callOriginalCallback(true, context->pendingDevice);
}
};
void DeviceManager::getOrCreateDeviceReq(
const std::string &deviceIdentifier,
const std::shared_ptr<smo::ComponentThread>& componentThread,
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort
)
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback)
{
// Validate smoIp format using Boost.Asio IPv4 validation
if (!smoIp.empty() && !comms::isValidIPv4(smoIp))
@@ -73,43 +149,98 @@ std::shared_ptr<Device> DeviceManager::getOrCreateDevice(
// First try to get existing device
auto existingDevice = getDevice(deviceIdentifier);
if (existingDevice) {
return existingDevice;
if (existingDevice)
{
// Device already exists and is connected, return it
callback(true, existingDevice.value());
return;
}
// Device doesn't exist, create a new one
// Device doesn't exist, create a new one but don't add it to collection yet
auto newDevice = std::make_shared<Device>(
deviceIdentifier, componentThread,
handshakeTimeoutMs, retryDelayMs,
smoIp, smoSubnetNbits,
dataPort, cmdPort, imuPort);
// Add to our collection
devices.push_back(newDevice);
return newDevice;
// Create the continuation request object to hold state and callbacks
auto request = std::make_shared<GetOrCreateDeviceReq>(
*this, newDevice, std::move(callback));
// Start the connection process - only add to collection on success
request->pendingDevice->connectReq(
std::bind(
&DeviceManager::GetOrCreateDeviceReq::getOrCreateDeviceReq1,
request.get(), request, std::placeholders::_1));
}
std::shared_ptr<Device> DeviceManager::getDevice(
const std::string &deviceIdentifier
)
class DeviceManager::DestroyDeviceReq
: public AsynchronousContinuation<livoxProto1_destroyDeviceReqCbFn>
{
for (auto& device : devices)
public:
DeviceManager& deviceManager;
std::shared_ptr<Device> pendingDevice;
public:
DestroyDeviceReq(
DeviceManager& mgr,
std::shared_ptr<Device> device,
livoxProto1_destroyDeviceReqCbFn cb)
: AsynchronousContinuation(std::move(cb)),
deviceManager(mgr), pendingDevice(device)
{}
// Public accessor for the original callback
void callOriginalCallback(bool success)
{ originalCbFn(success); }
void callOriginalCallbackWithFailure()
{ callOriginalCallback(false); }
void destroyDeviceReq1(
std::shared_ptr<DestroyDeviceReq> context, bool success
)
{
if (comms::deviceIdentifiersEqual(
device->discoveredDevice.deviceIdentifier, deviceIdentifier))
{
return device;
}
context->deviceManager.devices.erase(
std::remove(
context->deviceManager.devices.begin(),
context->deviceManager.devices.end(),
context->pendingDevice),
context->deviceManager.devices.end());
context->callOriginalCallback(success);
}
return nullptr;
}
};
bool DeviceManager::isDeviceKnown(const std::string& deviceIdentifier)
void DeviceManager::destroyDeviceReq(
std::shared_ptr<Device> dev,
livoxProto1_destroyDeviceReqCbFn callback
)
{
return broadcastListener.deviceExists(deviceIdentifier);
/** EXPLANATION:
* Check to see if the device is in our collection. If so, call
* disconnectReq and then remove it.
*/
std::shared_ptr<Device> device = getDevice(dev->discoveredDevice).
value_or(nullptr);
if (!device)
{
callback(false);
return;
}
auto request = std::make_shared<DestroyDeviceReq>(
*this, device, std::move(callback));
device->disconnectReq(
std::bind(
&DeviceManager::DestroyDeviceReq::destroyDeviceReq1,
request.get(), request, std::placeholders::_1));
}
void main(const std::shared_ptr<smo::ComponentThread> &componentThread)
void main(const std::shared_ptr<smo::ComponentThread> &componentThread,
const smo::sense_api::SmoCallbacks& smoCallbacks)
{
if (protoState.isInitialized) {
return;
@@ -117,6 +248,7 @@ void main(const std::shared_ptr<smo::ComponentThread> &componentThread)
protoState.isInitialized = true;
protoState.componentThread = componentThread;
protoState.smoCallbacks = smoCallbacks;
protoState.deviceManager = std::make_unique<DeviceManager>();
protoState.deviceManager->broadcastListener.start();
}
+26 -9
View File
@@ -5,8 +5,11 @@
#include <string>
#include <memory>
#include <cstdint>
#include <optional>
#include <user/senseApiDesc.h>
#include "device.h"
#include "broadcastListener.h"
#include "livoxProto1.h"
namespace livoxProto1 {
@@ -18,30 +21,43 @@ public:
static void deviceGoneAwayInd(const comms::DiscoveredDevice &device);
std::shared_ptr<Device> getOrCreateDevice(
void getOrCreateDeviceReq(
const std::string &deviceIdentifier,
const std::shared_ptr<smo::ComponentThread>& componentThread,
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort);
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback);
std::shared_ptr<Device> getDevice(const std::string &deviceIdentifier);
std::shared_ptr<Device> getDevice(const comms::DiscoveredDevice &device)
{ return getDevice(device.deviceIdentifier); }
void destroyDeviceReq(
std::shared_ptr<Device> device,
livoxProto1_destroyDeviceReqCbFn callback);
std::optional<std::shared_ptr<Device>> getDevice(
const std::string &deviceIdentifier);
std::optional<std::shared_ptr<Device>> getDevice(
const comms::DiscoveredDevice &device)
{
return getDevice(device.deviceIdentifier);
}
private:
// Helper methods
bool isDeviceKnown(const std::string& deviceIdentifier);
// Configuration
static constexpr int RETRY_DELAY_SECONDS = 3; // <N> seconds delay
public:
std::vector<std::shared_ptr<Device>> devices;
comms::BroadcastListener broadcastListener;
// Nested continuation class for async device creation
class GetOrCreateDeviceReq;
class DestroyDeviceReq;
};
void main(const std::shared_ptr<smo::ComponentThread> &componentThread);
void main(
const std::shared_ptr<smo::ComponentThread> &componentThread,
const smo::sense_api::SmoCallbacks& smoCallbacks);
void exit(void);
// Global state structure
@@ -50,6 +66,7 @@ struct ProtoState
bool isInitialized = false;
std::shared_ptr<smo::ComponentThread> componentThread;
std::unique_ptr<DeviceManager> deviceManager;
smo::sense_api::SmoCallbacks smoCallbacks;
};
// Access to global state for extern "C" functions
File diff suppressed because it is too large Load Diff
+35 -13
View File
@@ -6,6 +6,7 @@
#include <memory>
#include <atomic>
#include <optional>
#include <functional>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
@@ -76,29 +77,50 @@ public:
uint16_t dataPort, cmdPort, imuPort;
private:
// Connection logic
void connect();
bool connectToKnownDevice();
bool connectByDeviceIdentifier();
bool executeHandshake(
const std::string& deviceIP, int timeoutMs,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort);
// Heartbeat mechanism
void startHeartbeat();
void sendHeartbeat();
void onHeartbeatTimer(const boost::system::error_code& error);
std::string generateClientDeviceIpFromSerialNumber(
const std::string& broadcastCode);
// IP detection methods
std::optional<std::string> detectSmoIp(const std::string& deviceIP);
std::string getSmoIp(const std::string& deviceIP);
uint32_t getSubnetMaskFor(uint8_t nbits);
// Heartbeat mechanism
void startHeartbeat();
void sendHeartbeat();
void onHeartbeatTimer(const boost::system::error_code& error);
class ConnectReq;
class ConnectToKnownDeviceReq;
class ConnectByDeviceIdentifierReq;
class ExecuteHandshakeReq;
class DisconnectReq;
public:
// Utility methods
std::string getSmoIp(const std::string& deviceIP);
// Callback function type definitions for async methods
typedef std::function<void(bool success)> connectReqCbFn;
typedef std::function<
void(bool success, const std::string& ipAddr, int fd)>
connectToKnownDeviceReqCbFn;
typedef std::function<
void(bool success, const std::string& ipAddr, int fd)>
connectByDeviceIdentifierReqCbFn;
typedef std::function<void(bool success, int fd)> executeHandshakeReqCbFn;
typedef std::function<void(bool success)> disconnectReqCbFn;
// Async connection methods
void connectReq(connectReqCbFn callback);
void connectToKnownDeviceReq(connectToKnownDeviceReqCbFn callback);
void connectByDeviceIdentifierReq(
connectByDeviceIdentifierReqCbFn callback);
void executeHandshakeReq(
const std::string& deviceIP, executeHandshakeReqCbFn callback);
void disconnectReq(disconnectReqCbFn callback);
// Heartbeat state
std::unique_ptr<boost::asio::deadline_timer> heartbeatTimer;
int heartbeatSocketFd; // Raw socket file descriptor
int heartbeatFd; // Socket file descriptor used for heartbeat
std::atomic<bool> heartbeatActive;
};
+25 -6
View File
@@ -6,12 +6,13 @@
extern "C" {
std::shared_ptr<livoxProto1::Device> livoxProto1_getOrCreateDevice(
void livoxProto1_getOrCreateDeviceReq(
const std::string& deviceIdentifier,
const std::shared_ptr<smo::ComponentThread>& componentThread,
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback
)
{
// Get the global DeviceManager instance
@@ -24,16 +25,34 @@ std::shared_ptr<livoxProto1::Device> livoxProto1_getOrCreateDevice(
}
// Delegate to DeviceManager
return protoState.deviceManager->getOrCreateDevice(
protoState.deviceManager->getOrCreateDeviceReq(
deviceIdentifier, componentThread,
handshakeTimeoutMs, retryDelayMs,
smoIp, smoSubnetNbits,
dataPort, cmdPort, imuPort);
dataPort, cmdPort, imuPort,
callback);
}
void livoxProto1_main(const std::shared_ptr<smo::ComponentThread>& componentThread)
void livoxProto1_destroyDeviceReq(
std::shared_ptr<livoxProto1::Device> device,
livoxProto1_destroyDeviceReqCbFn callback
)
{
livoxProto1::main(componentThread);
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
throw std::runtime_error(std::string(__func__)
+ ": DeviceManager not initialized");
}
protoState.deviceManager->destroyDeviceReq(device, callback);
}
void livoxProto1_main(
const std::shared_ptr<smo::ComponentThread>& componentThread,
const smo::sense_api::SmoCallbacks& smoCallbacks)
{
livoxProto1::main(componentThread, smoCallbacks);
}
void livoxProto1_exit(void)
+22 -4
View File
@@ -4,9 +4,13 @@
#include <memory>
#include <string>
#include <cstdint>
#include <functional>
// Forward declarations
namespace smo {
namespace sense_api {
struct SmoCallbacks;
}
class ComponentThread;
}
@@ -21,9 +25,11 @@ extern "C" {
/**
* Initialize the Livox protocol library
* @param componentThread Component thread shared pointer
* @param smoCallbacks Callbacks provided by SMO
*/
typedef void livoxProto1_mainFn(
const std::shared_ptr<smo::ComponentThread>& componentThread);
const std::shared_ptr<smo::ComponentThread>& componentThread,
const smo::sense_api::SmoCallbacks& smoCallbacks);
/**
* Cleanup the Livox protocol library
@@ -43,16 +49,28 @@ typedef void livoxProto1_exitFn(void);
* @param imuPort IMU port (default: 56002)
* @return Device pointer on success, nullptr on failure
*/
typedef std::shared_ptr<livoxProto1::Device> livoxProto1_getOrCreateDeviceFn(
typedef std::function<
void(bool success, std::shared_ptr<livoxProto1::Device> device)>
livoxProto1_getOrCreateDeviceReqCbFn;
typedef void livoxProto1_getOrCreateDeviceReqFn(
const std::string& deviceIdentifier,
const std::shared_ptr<smo::ComponentThread>& componentThread,
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort);
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort,
livoxProto1_getOrCreateDeviceReqCbFn callback);
typedef std::function<void(bool success)> livoxProto1_destroyDeviceReqCbFn;
typedef void livoxProto1_destroyDeviceReqFn(
std::shared_ptr<livoxProto1::Device> device,
livoxProto1_destroyDeviceReqCbFn callback);
livoxProto1_mainFn livoxProto1_main;
livoxProto1_exitFn livoxProto1_exit;
livoxProto1_getOrCreateDeviceFn livoxProto1_getOrCreateDevice;
livoxProto1_getOrCreateDeviceReqFn livoxProto1_getOrCreateDeviceReq;
livoxProto1_destroyDeviceReqFn livoxProto1_destroyDeviceReq;
#ifdef __cplusplus
}
+70
View File
@@ -638,5 +638,75 @@ bool HeartbeatMessage::validateCrc32() const
return isValid;
}
// DisconnectMessage methods
DisconnectMessage::DisconnectMessage()
{
// Initialize header
header.sof = 0xAA;
header.version = 0x01;
header.length = sizeof(Header) + sizeof(Command) + sizeof(Footer);
header.cmd_type = 0x00; // kCommandTypeCmd
header.seq_num = 0x0001; // Simple sequence number
header.crc_16 = 0; // Will be calculated
// Initialize command
command.cmd_set = 0x00; // kCommandSetGeneral
command.cmd_id = 0x06; // kCommandIDGeneralDisconnect
// Initialize footer
footer.crc_32 = 0; // Will be calculated
// Note: CRC16 will be calculated before sending (in swapToProtocolEndianness)
}
uint32_t DisconnectMessage::calculateCrc32() const
{
// Calculate CRC32 for the entire message excluding the footer.crc_32 field
const uint8_t* messageData = reinterpret_cast<const uint8_t*>(this);
size_t messageSize = sizeof(DisconnectMessage) - sizeof(footer.crc_32);
return comms::calculateCrc32(messageData, messageSize);
}
void DisconnectMessage::swapContentsToProtocolEndianness()
{
// Protocol is little-endian, so if host is already little-endian, no swap needed
if (endian::isLittleEndian()) {
return;
}
// Host is big-endian, need to swap to little-endian
// Only swap content fields, not CRC fields
header.swapToProtocolEndianness();
command.swapToProtocolEndianness();
// Note: footer.swapToProtocolEndianness() swaps CRC, so we skip it here
}
bool DisconnectMessage::sanityCheck() const
{
return header.sanityCheck() &&
command.sanityCheck() &&
(command.cmd_set == 0x00) && (command.cmd_id == 0x06) &&
footer.sanityCheck();
}
bool DisconnectMessage::validateCrc32() const
{
// Use the calculateCrc32 method to avoid code duplication
uint32_t calculatedCrc = calculateCrc32();
// Compare with the CRC in the footer
bool isValid = (calculatedCrc == footer.crc_32);
// Debug output only if validation fails
if (!isValid)
{
std::cout << "DisconnectMessage CRC32 Debug: calculated=0x"
<< std::hex << calculatedCrc
<< ", received=0x" << footer.crc_32 << std::dec << std::endl;
}
return isValid;
}
} // namespace comms
} // namespace livoxProto1
+17
View File
@@ -229,6 +229,23 @@ struct HeartbeatMessage
bool validateCrc32() const;
} __attribute__((packed));
/** EXPLANATION:
* Complete disconnect command frame for disconnecting from Livox devices.
* This is the complete wire format including header, command fields, and footer.
*/
struct DisconnectMessage
{
Header header; // 0-8: Protocol frame header
Command command; // 9-10: Command identification
Footer footer; // 11-14: Protocol frame footer
DisconnectMessage();
uint32_t calculateCrc32() const;
void swapContentsToProtocolEndianness();
bool sanityCheck() const;
bool validateCrc32() const;
} __attribute__((packed));
} // namespace comms
} // namespace livoxProto1