From 20cdf64afb58b8afe035f2e71ea281ff14a28444 Mon Sep 17 00:00:00 2001 From: Hayodea Hakol Date: Tue, 9 Sep 2025 12:07:49 -0400 Subject: [PATCH] livoxProto1: Implement async getOrCreateDeviceReq+destroyDeviceReq These are now both fully asynchronous. They also work fully and both connect and disconnect to/from the Avia just fine, in all tested scenarios. --- commonLibs/livoxProto1/broadcastListener.cpp | 12 +- commonLibs/livoxProto1/core.cpp | 180 +++- commonLibs/livoxProto1/core.h | 35 +- commonLibs/livoxProto1/device.cpp | 891 ++++++++++++++----- commonLibs/livoxProto1/device.h | 48 +- commonLibs/livoxProto1/livoxProto1.cpp | 31 +- commonLibs/livoxProto1/livoxProto1.h | 26 +- commonLibs/livoxProto1/protocol.cpp | 70 ++ commonLibs/livoxProto1/protocol.h | 17 + 9 files changed, 1041 insertions(+), 269 deletions(-) diff --git a/commonLibs/livoxProto1/broadcastListener.cpp b/commonLibs/livoxProto1/broadcastListener.cpp index d652627..064d7d7 100644 --- a/commonLibs/livoxProto1/broadcastListener.cpp +++ b/commonLibs/livoxProto1/broadcastListener.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "broadcastListener.h" namespace livoxProto1 { @@ -94,9 +95,12 @@ void BroadcastListener::broadcastMsgInd( if (deviceExists(broadcastCode)) { // Device already exists, just log the update - std::cout << __func__ - << ": Received broadcast from known device: " - << broadcastCode << " at " << senderIP << std::endl; + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ + << ": Received broadcast from known device: " + << broadcastCode << " at " << senderIP << "\n"; + } return; } @@ -106,7 +110,7 @@ void BroadcastListener::broadcastMsgInd( // Output device information using stringify std::cout << __func__ << ": Discovered new Livox device: " - << device->stringify() << std::endl; + << device->stringify() << "\n"; } void BroadcastListener::start(void) diff --git a/commonLibs/livoxProto1/core.cpp b/commonLibs/livoxProto1/core.cpp index 5e50cba..85b79b0 100644 --- a/commonLibs/livoxProto1/core.cpp +++ b/commonLibs/livoxProto1/core.cpp @@ -1,8 +1,16 @@ #include #include +#include +#include +#include +#include #include #include "protocol.h" #include "core.h" +#include "device.h" +#include "broadcastListener.h" +#include "livoxProto1.h" + namespace livoxProto1 { @@ -10,7 +18,8 @@ static ProtoState protoState = { .isInitialized = false, .componentThread = nullptr, - .deviceManager = nullptr + .deviceManager = nullptr, + .smoCallbacks = {} }; ProtoState& getProtoState() @@ -46,13 +55,80 @@ void DeviceManager::deviceGoneAwayInd(const comms::DiscoveredDevice &device) } } -std::shared_ptr DeviceManager::getOrCreateDevice( +std::optional> DeviceManager::getDevice( + const std::string &deviceIdentifier + ) +{ + for (auto& device : devices) + { + if (comms::deviceIdentifiersEqual( + device->discoveredDevice.deviceIdentifier, deviceIdentifier)) + { + return device; + } + } + return std::nullopt; +} + +// GetOrCreateDeviceReq nested class implementation +class DeviceManager::GetOrCreateDeviceReq +: public AsynchronousContinuation +{ +public: + DeviceManager& deviceManager; + // The device we're trying to connect (holds all connection parameters) + std::shared_ptr pendingDevice; + +public: + GetOrCreateDeviceReq( + DeviceManager& mgr, + std::shared_ptr device, + livoxProto1_getOrCreateDeviceReqCbFn cb) + : AsynchronousContinuation(std::move(cb)), + deviceManager(mgr), pendingDevice(device) + {} + + // Public accessor for the original callback + void callOriginalCallback(bool success, std::shared_ptr device) + { originalCbFn(success, device); } + + void callOriginalCallbackWithFailure() + { callOriginalCallback(false, nullptr); } + + void getOrCreateDeviceReq1( + std::shared_ptr context, bool connectSuccess + ) + { + if (!connectSuccess) + { + std::cerr << __func__ << ": Connection failed for device " + << context->pendingDevice->discoveredDevice.deviceIdentifier + << std::endl; + context->callOriginalCallbackWithFailure(); + return; + } + + // Connection successful, add device to collection + context->deviceManager.devices.push_back(context->pendingDevice); + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": Successfully connected and added device " + << context->pendingDevice->discoveredDevice.deviceIdentifier + << std::endl; + } + + // Return success with the connected device + context->callOriginalCallback(true, context->pendingDevice); + } +}; + +void DeviceManager::getOrCreateDeviceReq( const std::string &deviceIdentifier, const std::shared_ptr& componentThread, int handshakeTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort - ) + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, + livoxProto1_getOrCreateDeviceReqCbFn callback) { // Validate smoIp format using Boost.Asio IPv4 validation if (!smoIp.empty() && !comms::isValidIPv4(smoIp)) @@ -73,43 +149,98 @@ std::shared_ptr DeviceManager::getOrCreateDevice( // First try to get existing device auto existingDevice = getDevice(deviceIdentifier); - if (existingDevice) { - return existingDevice; + if (existingDevice) + { + // Device already exists and is connected, return it + callback(true, existingDevice.value()); + return; } - // Device doesn't exist, create a new one + // Device doesn't exist, create a new one but don't add it to collection yet auto newDevice = std::make_shared( deviceIdentifier, componentThread, handshakeTimeoutMs, retryDelayMs, smoIp, smoSubnetNbits, dataPort, cmdPort, imuPort); - // Add to our collection - devices.push_back(newDevice); - return newDevice; + // Create the continuation request object to hold state and callbacks + auto request = std::make_shared( + *this, newDevice, std::move(callback)); + + // Start the connection process - only add to collection on success + request->pendingDevice->connectReq( + std::bind( + &DeviceManager::GetOrCreateDeviceReq::getOrCreateDeviceReq1, + request.get(), request, std::placeholders::_1)); } -std::shared_ptr DeviceManager::getDevice( - const std::string &deviceIdentifier - ) +class DeviceManager::DestroyDeviceReq +: public AsynchronousContinuation { - for (auto& device : devices) + public: + DeviceManager& deviceManager; + std::shared_ptr pendingDevice; + +public: + DestroyDeviceReq( + DeviceManager& mgr, + std::shared_ptr device, + livoxProto1_destroyDeviceReqCbFn cb) + : AsynchronousContinuation(std::move(cb)), + deviceManager(mgr), pendingDevice(device) + {} + + // Public accessor for the original callback + void callOriginalCallback(bool success) + { originalCbFn(success); } + + void callOriginalCallbackWithFailure() + { callOriginalCallback(false); } + + void destroyDeviceReq1( + std::shared_ptr context, bool success + ) { - if (comms::deviceIdentifiersEqual( - device->discoveredDevice.deviceIdentifier, deviceIdentifier)) - { - return device; - } + context->deviceManager.devices.erase( + std::remove( + context->deviceManager.devices.begin(), + context->deviceManager.devices.end(), + context->pendingDevice), + context->deviceManager.devices.end()); + + context->callOriginalCallback(success); } - return nullptr; -} +}; -bool DeviceManager::isDeviceKnown(const std::string& deviceIdentifier) +void DeviceManager::destroyDeviceReq( + std::shared_ptr dev, + livoxProto1_destroyDeviceReqCbFn callback +) { - return broadcastListener.deviceExists(deviceIdentifier); + /** EXPLANATION: + * Check to see if the device is in our collection. If so, call + * disconnectReq and then remove it. + */ + std::shared_ptr device = getDevice(dev->discoveredDevice). + value_or(nullptr); + + if (!device) + { + callback(false); + return; + } + + auto request = std::make_shared( + *this, device, std::move(callback)); + + device->disconnectReq( + std::bind( + &DeviceManager::DestroyDeviceReq::destroyDeviceReq1, + request.get(), request, std::placeholders::_1)); } -void main(const std::shared_ptr &componentThread) +void main(const std::shared_ptr &componentThread, + const smo::sense_api::SmoCallbacks& smoCallbacks) { if (protoState.isInitialized) { return; @@ -117,6 +248,7 @@ void main(const std::shared_ptr &componentThread) protoState.isInitialized = true; protoState.componentThread = componentThread; + protoState.smoCallbacks = smoCallbacks; protoState.deviceManager = std::make_unique(); protoState.deviceManager->broadcastListener.start(); } diff --git a/commonLibs/livoxProto1/core.h b/commonLibs/livoxProto1/core.h index 3512e64..6fae0de 100644 --- a/commonLibs/livoxProto1/core.h +++ b/commonLibs/livoxProto1/core.h @@ -5,8 +5,11 @@ #include #include #include +#include +#include #include "device.h" #include "broadcastListener.h" +#include "livoxProto1.h" namespace livoxProto1 { @@ -18,30 +21,43 @@ public: static void deviceGoneAwayInd(const comms::DiscoveredDevice &device); - std::shared_ptr getOrCreateDevice( + void getOrCreateDeviceReq( const std::string &deviceIdentifier, const std::shared_ptr& componentThread, int handshakeTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort); + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, + livoxProto1_getOrCreateDeviceReqCbFn callback); - std::shared_ptr getDevice(const std::string &deviceIdentifier); - std::shared_ptr getDevice(const comms::DiscoveredDevice &device) - { return getDevice(device.deviceIdentifier); } + void destroyDeviceReq( + std::shared_ptr device, + livoxProto1_destroyDeviceReqCbFn callback); + + std::optional> getDevice( + const std::string &deviceIdentifier); + + std::optional> getDevice( + const comms::DiscoveredDevice &device) + { + return getDevice(device.deviceIdentifier); + } private: - // Helper methods - bool isDeviceKnown(const std::string& deviceIdentifier); - // Configuration static constexpr int RETRY_DELAY_SECONDS = 3; // seconds delay public: std::vector> devices; comms::BroadcastListener broadcastListener; + + // Nested continuation class for async device creation + class GetOrCreateDeviceReq; + class DestroyDeviceReq; }; -void main(const std::shared_ptr &componentThread); +void main( + const std::shared_ptr &componentThread, + const smo::sense_api::SmoCallbacks& smoCallbacks); void exit(void); // Global state structure @@ -50,6 +66,7 @@ struct ProtoState bool isInitialized = false; std::shared_ptr componentThread; std::unique_ptr deviceManager; + smo::sense_api::SmoCallbacks smoCallbacks; }; // Access to global state for extern "C" functions diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index 541a7c2..b7fb57b 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -8,10 +8,14 @@ #include #include #include +#include #include #include #include +#include #include +#include +#include #include "device.h" #include "protocol.h" #include "core.h" @@ -79,10 +83,9 @@ componentThread(componentThread), handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs), smoIp(smoIp), smoSubnetNbits(smoSubnetNbits), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), -heartbeatSocketFd(-1), +heartbeatFd(-1), heartbeatActive(false) { - connect(); } Device::~Device() @@ -95,256 +98,739 @@ Device::~Device() } heartbeatTimer.reset(); - if (heartbeatSocketFd >= 0) { - close(heartbeatSocketFd); - heartbeatSocketFd = -1; + if (heartbeatFd >= 0) { + close(heartbeatFd); + heartbeatFd = -1; } } -void Device::connect() +/** + * Device::ConnectReq - Encapsulates all state and resources for async connection sequence + * This class manages the overall device connection process including handshake and heartbeat setup + */ +class Device::ConnectReq +: public AsynchronousContinuation { - /** EXPLANATION: - * First check the broadcastListener to see if the device is already known. - * * If it is, return the DiscoveredDevice.. - * If it is not, attempt to connect to the device by assuming that its IP - * address is the same as the last 2 octets of the deviceIdentifier. - * * If the connection is successful, return the DiscoveredDevice. - * If the connection is not successful, delay by retryDelayMs and check - * the broadcastListener again. - * * If the connection is successful return the DiscoveredDevice. - * If the connection is not successful, throw exception? - * - * If the connection is successful at any point, also set up the heartbeat - * pulse signal to be sent periodically by us to the device over the wire. - */ - // Try connecting to known device first - if (connectToKnownDevice()) { - startHeartbeat(); - return; - } +private: + Device& device; + std::unique_ptr retryTimer; - // Try direct connect by device identifier - if (connectByDeviceIdentifier()) { - startHeartbeat(); - return; - } +public: + ConnectReq(Device& dev, Device::connectReqCbFn cb) + : AsynchronousContinuation(std::move(cb)), device(dev) {} /** FIXME: - * This won't work unless we return control to the ComponentThread's run() - * loop during the retry-delay time so that the ComponentThread can actually - * run the BroadcastListener's recv() calls and actually receive the - * device's broadcast messages. - * Think about ways to perhaps make all of this asynchronous. The main issue - * is that this whole sequence is synchronous. + * WE need to assign the ipAddr to the Device being connected up. */ - // Wait retry delay, then try known device again - std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs)); - if (connectToKnownDevice()) { - startHeartbeat(); + // Callback methods for the connection sequence + void connectReq1( + std::shared_ptr context, + bool success, const std::string& ipAddr, int fd + ) + { + if (success) + { + // Store the IP address in the device + context->device.discoveredDevice.ipAddr = ipAddr; + // Store the handshake FD for heartbeats + context->device.heartbeatFd = fd; + context->device.startHeartbeat(); + context->originalCbFn(true); + return; + } + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": Trying to connect to device by " + << "identifier" << "\n"; + } + + // Try direct connect by device identifier + context->device.connectByDeviceIdentifierReq( + std::bind(&ConnectReq::connectReq2, context.get(), context, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)); + } + + void connectReq2( + std::shared_ptr context, + bool success, const std::string& ipAddr, int fd + ) + { + if (success) + { + // Store the IP address in the device + context->device.discoveredDevice.ipAddr = ipAddr; + // Store the handshake FD for heartbeats + context->device.heartbeatFd = fd; + context->device.startHeartbeat(); + context->originalCbFn(true); + return; + } + + // Start retry timer + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": Starting retry delay (" + << context->device.retryDelayMs << "ms), then trying known " + << "device again" << "\n"; + } + + context->retryTimer = std::make_unique( + context->device.componentThread->getIoService()); + + context->retryTimer->expires_from_now( + boost::posix_time::milliseconds(context->device.retryDelayMs)); + + context->retryTimer->async_wait( + std::bind( + &ConnectReq::connectReq3, context.get(), context, + std::placeholders::_1)); + } + + void connectReq3( + std::shared_ptr context, + const boost::system::error_code& error + ) + { + if (error) + { + context->originalCbFn(false); + return; + } + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": Trying to connect to known device " + << "again" << "\n"; + } + + context->device.connectToKnownDeviceReq( + std::bind(&ConnectReq::connectReq4, context.get(), context, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)); + } + + void connectReq4( + std::shared_ptr context, + bool success, const std::string& ipAddr, int fd + ) + { + if (success) + { + context->device.discoveredDevice.ipAddr = ipAddr; + context->device.heartbeatFd = fd; + context->device.startHeartbeat(); + context->originalCbFn(true); return; } // All connection attempts failed - throw std::runtime_error( - std::string(__func__) + ": Failed to connect to device: " - + discoveredDevice.deviceIdentifier); + context->originalCbFn(false); + } +}; + +void Device::connectReq(Device::connectReqCbFn callback) +{ + // Create the connection request object to hold state and callbacks + auto request = std::make_shared(*this, std::move(callback)); + + // Try connecting to known device first + if (OptionParser::getOptions().verbose) { + std::cout << __func__ << ": Trying to connect to known device" << "\n"; + } + + connectToKnownDeviceReq( + std::bind( + &ConnectReq::connectReq1, request.get(), request, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)); } -bool Device::connectToKnownDevice() +class Device::ConnectToKnownDeviceReq +: public AsynchronousContinuation { - // Get the global DeviceManager instance +public: + Device& device; + std::string deviceIP; + std::shared_ptr deviceInfo; + + ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb) + : AsynchronousContinuation(std::move(cb)), device(dev) + {} + + // Public accessor for the original callback + void callOriginalCallback(bool success, const std::string& ipAddr, int fd) + { originalCbFn(success, ipAddr, fd); } + + // Wrapper for failure cases + void callOriginalCallbackWithFailure() + { callOriginalCallback(false, "", -1); } + + // Callback methods for the connection sequence + void connectToKnownDeviceReq1( + std::shared_ptr context, bool success, int fd + ) + { + // Return the IP address and raw FD to the caller + context->callOriginalCallback(success, context->deviceIP, fd); + } +}; + +/** EXPLANATION: + * This function is used to connect to a device that is already known to the + * broadcastListener. + */ +void Device::connectToKnownDeviceReq( + Device::connectToKnownDeviceReqCbFn callback + ) +{ + // Create the connection request object to hold state and callbacks + auto request = std::make_shared(*this, std::move(callback)); + auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { - throw std::runtime_error( - std::string(__func__) - + ": DeviceManager is not initialized in connectToKnownDevice()"); + request->callOriginalCallbackWithFailure(); + return; } // Check if the device is known to the broadcastListener if (!protoState.deviceManager->broadcastListener.deviceExists( - discoveredDevice.deviceIdentifier)) + request->device.discoveredDevice.deviceIdentifier)) { - return false; + request->callOriginalCallbackWithFailure(); + return; } - // Get the device info from broadcastListener - auto deviceInfo = protoState.deviceManager->broadcastListener.getDevice( - discoveredDevice.deviceIdentifier); - if (!deviceInfo) - { return false; } + request->deviceInfo = protoState.deviceManager->broadcastListener.getDevice( + request->device.discoveredDevice.deviceIdentifier); + if (!request->deviceInfo) + { + request->callOriginalCallbackWithFailure(); + return; + } // Use the IP address from the broadcast message - std::string deviceIP = deviceInfo->ipAddr; - // Execute handshake with the known device - bool success = executeHandshake( - deviceIP, handshakeTimeoutMs, dataPort, cmdPort, imuPort); + request->deviceIP = request->deviceInfo->ipAddr; - // If successful, update our device's IP address with the one from broadcast - if (success) { - discoveredDevice.ipAddr = deviceInfo->ipAddr; - } - - return success; + // Execute handshake with the known device using async method + request->device.executeHandshakeReq( + request->deviceIP, + std::bind( + &ConnectToKnownDeviceReq::connectToKnownDeviceReq1, + request.get(), request, + std::placeholders::_1, std::placeholders::_2)); } -bool Device::connectByDeviceIdentifier() +class Device::ConnectByDeviceIdentifierReq +: public AsynchronousContinuation { - std::string deviceIP = generateClientDeviceIpFromSerialNumber( - discoveredDevice.deviceIdentifier); - bool success = executeHandshake( - deviceIP, handshakeTimeoutMs, dataPort, cmdPort, imuPort); +public: + Device& device; + std::string deviceIP; - // If successful, store the calculated IP address - if (success) { - discoveredDevice.ipAddr = deviceIP; + ConnectByDeviceIdentifierReq( + Device& dev, Device::connectByDeviceIdentifierReqCbFn cb) + : AsynchronousContinuation(std::move(cb)), device(dev) + {} + + // Public accessor for the original callback + void callOriginalCallback(bool success, const std::string& ipAddr, int fd) + { originalCbFn(success, ipAddr, fd); } + + // Wrapper for failure cases + void callOriginalCallbackWithFailure() + { callOriginalCallback(false, "", -1); } + + // Callback methods for the connection sequence + void connectByDeviceIdentifierReq1( + std::shared_ptr context, + bool success, int fd + ) + { + // Return the IP address and raw FD to the caller + context->callOriginalCallback(success, context->deviceIP, fd); } +}; - return success; -} - -bool Device::executeHandshake( - const std::string& deviceIP, int timeoutMs, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort +void Device::connectByDeviceIdentifierReq( + Device::connectByDeviceIdentifierReqCbFn callback ) { - try { + // Create the connection request object to hold state and callbacks + auto request = std::make_shared( + *this, std::move(callback)); + + // Generate device IP from serial number + request->deviceIP = generateClientDeviceIpFromSerialNumber( + request->device.discoveredDevice.deviceIdentifier); + + // Execute handshake using async method + request->device.executeHandshakeReq( + request->deviceIP, + std::bind( + &ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1, + request.get(), request, + std::placeholders::_1, std::placeholders::_2)); +} + +class Device::ExecuteHandshakeReq +: public AsynchronousContinuation +{ +public: + friend void Device::executeHandshakeReq( + const std::string& deviceIP, Device::executeHandshakeReqCbFn callback); + + enum class SocketState + { + SOCKET_STILL_WAITING = 0, + SOCKET_ERROR, + SOCKET_RECV_SUCCESS, + SOCKET_RECV_ERROR + }; + +public: + Device& device; + std::string deviceIP; + + // Atomic state flags for async coordination + std::atomic timerFired{false}; + std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; + std::atomic handlerExecuted{false}; + + // The stream descriptor that will be returned to the caller + boost::asio::posix::stream_descriptor handshakeFdDesc; + boost::asio::deadline_timer timeoutTimer; + + // Received data storage + uint8_t responseBuffer[1024]{}; + ssize_t bytesReceived = -1; + struct sockaddr_in senderAddr; + socklen_t senderAddrLen = sizeof(senderAddr); + +public: + ExecuteHandshakeReq( + Device& dev, const std::string& deviceIP, + Device::executeHandshakeReqCbFn cb) + : AsynchronousContinuation(std::move(cb)), + device(dev), deviceIP(deviceIP), + handshakeFdDesc(device.componentThread->getIoService()), + timeoutTimer(device.componentThread->getIoService()) + { + } + + ~ExecuteHandshakeReq() + { + cleanup(); + } + + // Public accessor for the original callback + void callOriginalCallback(bool success, int fd) + { originalCbFn(success, fd); } + + void callOriginalCallbackWithFailure() + { /** EXPLANATION: - * This io_context queue here will allow us to async-bridge the - * handshake request and response without having to worry about - * re-enqueueing messages, as we would have to do if we used the - * io_context of a ComponentThread. + * We have to call cleanupHandshakeSocket() here, specifically because + * there are 5 references we need to clean up, and 2 of them are + * actually recursive self-references within this class. + * + * The timer and the handshakeFdDesc are both given recursive + * self-referencing shared_ptr's to this class when we eventually set up + * the async callbacks for them. + * + * So merely letting the async continuation sequences go out of scope + * won't cause this class to be destroyed. Rather, since the class + * references itelf, we have to first break those references. Then and + * only then will the shared_ptr's refcount go to 0 and the class will + * be destroyed. + * + * We always unconditionally break the timer's reference inside of + * the branch-unifying segment (executeHandshakeReq2), but we generally + * only want to break the handshakeFdDesc's reference at the exact + * moment when the sequence fails, because if it succeeds, we want to + * commit the FD to the Device object being created (for heartbeats). + * + * Hence, we call cleanupHandshakeSocket() at the point of failure. + * To break the self-reference when the sequence is successful, we + * manually do that at the end of the sequence. */ - boost::asio::io_context io_context; - boost::asio::ip::udp::socket socket(io_context); - socket.open(boost::asio::ip::udp::v4()); + cleanupHandshakeSocket(); + callOriginalCallback(false, -1); + } + +private: + bool setupSocket() + { + /** EXPLANATION: + * Create non-blocking UDP socket for handshake. We can't use + * boost::asio::socket because it causes a segfault if associated with + * an io_service from the main program (it's a boost bug). + */ + int socketFd = socket(AF_INET, SOCK_DGRAM, 0); + if (socketFd < 0) + { + std::cerr << __func__ << ": Failed to create socket: " + << strerror(errno) << std::endl; + return false; + } + + int flags = fcntl(socketFd, F_GETFL, 0); + if (flags < 0 || fcntl(socketFd, F_SETFL, flags | O_NONBLOCK) < 0) + { + std::cerr << __func__ << ": Failed to set socket non-blocking: " + << strerror(errno) << std::endl; + return false; + } // Bind socket to cmdPort so we can receive the handshake response - boost::asio::ip::udp::endpoint localEndpoint( - boost::asio::ip::address_v4::any(), cmdPort); - socket.bind(localEndpoint); + struct sockaddr_in localAddr; + memset(&localAddr, 0, sizeof(localAddr)); + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = INADDR_ANY; + localAddr.sin_port = htons(device.cmdPort); - std::cout << __func__ << ": Bound socket to local port " << cmdPort - << " for handshake response" << std::endl; + if (bind(socketFd, (struct sockaddr*)&localAddr, sizeof(localAddr)) < 0) + { + std::cerr << __func__ << ": Failed to bind socket to port " + << device.cmdPort << ": " << strerror(errno) << std::endl; + return false; + } - // Get the IP addr of the SMO machine's iface that is facing the device. - std::string smoIp = getSmoIp(deviceIP); + // Assign the socket FD to the stream descriptor + handshakeFdDesc.assign(socketFd); + return true; + } - comms::HandshakeRequest handshakeReq(smoIp, dataPort, cmdPort, imuPort); + bool sendHandshakeRequest() + { + /** EXPLANATION: + * Prepare handshake request. + */ + std::string smoIp = device.getSmoIp(deviceIP); + comms::HandshakeRequest handshakeReq( + smoIp, device.dataPort, device.cmdPort, device.imuPort); handshakeReq.swapContentsToProtocolEndianness(); handshakeReq.header.setCrc16FromRawBytes(); handshakeReq.header.swapCrc16ToProtocolEndianness(); handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32(); handshakeReq.footer.swapCrc32ToProtocolEndianness(); - boost::asio::ip::udp::endpoint deviceEndpoint( - boost::asio::ip::address::from_string(deviceIP), 65000); - socket.send_to( - boost::asio::buffer(&handshakeReq, sizeof(handshakeReq)), - deviceEndpoint); + // Prepare device endpoint + struct sockaddr_in deviceAddr; + memset(&deviceAddr, 0, sizeof(deviceAddr)); + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = inet_addr(deviceIP.c_str()); + deviceAddr.sin_port = htons(65000); - std::cout << __func__ << ": Sent handshake request to " - << deviceIP << ":65000" << std::endl; + // Send handshake request directly (synchronous) + ssize_t bytesSent = sendto( + handshakeFdDesc.native_handle(), + &handshakeReq, sizeof(comms::HandshakeRequest), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); - // Wait for response with timeout using deadline_timer - boost::asio::deadline_timer timer(io_context); - timer.expires_from_now(boost::posix_time::milliseconds(timeoutMs)); - - uint8_t responseBuffer[1024]; - boost::asio::ip::udp::endpoint senderEndpoint; - std::atomic timeoutOccurred{false}; - - timer.async_wait( - [&timeoutOccurred](const boost::system::error_code& ec) { - if (!ec) { timeoutOccurred.store(true); } - } - ); - - size_t bytesReceived = 0; - boost::system::error_code receiveError; - socket.async_receive_from( - boost::asio::buffer(responseBuffer, sizeof(responseBuffer)), - senderEndpoint, - [&bytesReceived, &receiveError]( - const boost::system::error_code& ec, size_t bytes) - { - bytesReceived = bytes; - receiveError = ec; - } - ); - - while (!timeoutOccurred.load() && !receiveError && bytesReceived == 0) { - io_context.run_one(); - } - - timer.cancel(); - - if (timeoutOccurred.load()) + if (bytesSent < 0) { - std::cerr << __func__ << ": Handshake timeout with " << deviceIP - << "(" << deviceEndpoint << ")" << std::endl; + std::cerr << __func__ << ": Failed to send handshake request: " + << strerror(errno) << std::endl; return false; } - if (receiveError) + return true; + } + + void setupAsyncCallbacks( + const std::shared_ptr &request + ) + { + if (!handshakeFdDesc.is_open()) { - std::cerr << __func__ << ": Handshake error with " << deviceIP - << ": " << receiveError.message() << "(" << deviceEndpoint << ")" + throw std::runtime_error( + std::string(__func__) + + ": handshakeFdDesc is not open; cannot set up async callbacks " + "for device " + deviceIP + "(" + + device.discoveredDevice.deviceIdentifier + ")" + " handshake." + "Check socket initialization and bining." + ); + } + + + /** EXPLANATION: + * We setup an async timer event to detect timeout, and wait for the + * device to respond to the handshake request. If the device does not + * respond within the timeout period, we will consider the handshake + * to have failed. + */ + timeoutTimer.expires_from_now( + boost::posix_time::milliseconds(device.handshakeTimeoutMs)); + + timeoutTimer.async_wait( + std::bind( + &ExecuteHandshakeReq::executeHandshakeReq1_1, this, request, + std::placeholders::_1)); + + /** EXPLANATION: + * Since we're using POSIX sockets calls on the underlying + * native_handle, Let's use async_wait with POLLIN to detect when data + * is available for reading. + */ + handshakeFdDesc.async_wait( + boost::asio::posix::stream_descriptor::wait_read, + std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2, this, + request, + std::placeholders::_1)); + } + + void executeHandshakeReq1_1( + std::shared_ptr, + const boost::system::error_code& error) + { + // This is called from the timer callback + if (!error) // Timeout occurred (not cancelled) + { + timerFired.store(true); + executeHandshakeReq2(); + } + } + + void executeHandshakeReq1_2( + std::shared_ptr, + const boost::system::error_code& error + ) + { + // This is called from the socket read callback + if (error) + { + socketState.store(SocketState::SOCKET_ERROR); + std::cerr << __func__ << ": Socket read error: " << error.message() << std::endl; - return false; - } - - if (bytesReceived < sizeof(comms::HandshakeResponse)) + } else { - std::cerr << __func__ << ": Handshake failed - response too small from " - << deviceIP << "(" << deviceEndpoint << ")" << std::endl; - return false; + // Socket is readable, now actually read the data + bytesReceived = recvfrom( + handshakeFdDesc.native_handle(), + responseBuffer, sizeof(responseBuffer), 0, + (struct sockaddr*)&senderAddr, &senderAddrLen); + + if (bytesReceived > 0) + { + socketState.store(SocketState::SOCKET_RECV_SUCCESS); + } else if (bytesReceived == 0) + { + socketState.store(SocketState::SOCKET_RECV_ERROR); + std::cerr << __func__ << ": Received 0 bytes from recvfrom" + << std::endl; + } else + { + socketState.store(SocketState::SOCKET_ERROR); + std::cerr << __func__ << ": recvfrom failed: " + << strerror(errno) << " (errno: " << errno << ")" + << std::endl; + } } - // Parse response as complete frame - comms::HandshakeResponse* resp = reinterpret_cast< - comms::HandshakeResponse* - >(responseBuffer); + executeHandshakeReq2(); + } - // Following the clean receiving flow: - // 1. Swap CRC32 to host endianness first + void executeHandshakeReq2() + { + // Ensure we only execute once using atomic exchange + if (handlerExecuted.exchange(true) == true) { return; } + + // Examine the flags and decide what happened + SocketState finalSocketState = socketState.load(); + bool finalTimerFired = timerFired.load(); + + // Cancel timer if still running + timeoutTimer.cancel(); + + // Check for timeout only if there was no socket activity + if (finalTimerFired + && finalSocketState == SocketState::SOCKET_STILL_WAITING) + { + std::cerr << __func__ << ": Handshake timeout with " + << deviceIP << "(" << device.discoveredDevice.deviceIdentifier + << ")" << std::endl; + + callOriginalCallbackWithFailure(); + return; + } + + // Socket error from boost::asio + if (finalSocketState == SocketState::SOCKET_ERROR) + { + std::cerr << __func__ << ": Socket error during handshake with " + << deviceIP << std::endl; + callOriginalCallbackWithFailure(); + return; + } + + if (finalSocketState == SocketState::SOCKET_RECV_ERROR) + { + std::cerr << __func__ << ": Receive error during handshake with " + << deviceIP << std::endl; + callOriginalCallbackWithFailure(); + return; + } + + /* Result must have been RECV_SUCCESS state if we reach here. + * Data was already read in the async callback, just validate it + */ + if (bytesReceived < (ssize_t)sizeof(comms::HandshakeResponse)) + { + std::cerr << __func__ << ": Response of size " << bytesReceived + << " too small from " << deviceIP << std::endl; + callOriginalCallbackWithFailure(); + return; + } + + comms::HandshakeResponse* resp = + reinterpret_cast(responseBuffer); + + /** EXPLANATION: + * Following the clean receiving flow: + * 1. Swap CRC32 to host endianness first + * 2. Validate CRC32 + * 3. Swap CRC16 to host endianness + * 4. Validate CRC16 + * 5. Swap content to host endianness + */ resp->footer.swapCrc32ToHostEndianness(); - // 2. Validate CRC32 (on whole message excluding footer CRC32 field) if (!resp->validateCrc32()) { - std::cerr << __func__ << ": Handshake failed - CRC32 validation " - "failed from " << deviceIP << "(" << deviceEndpoint << ")" - << std::endl; - return false; + std::cerr << __func__ << ": CRC32 validation failed from " + << deviceIP << std::endl; + callOriginalCallbackWithFailure(); + return; } - // 3. Swap CRC16 to host endianness resp->header.swapCrc16ToHostEndianness(); - // 4. Validate CRC16 (on header only) if (!resp->header.validateCrc16()) { - std::cerr << __func__ << ": Handshake failed - CRC16 validation " - "failed from " << deviceIP << "(" << deviceEndpoint << ")" - << std::endl; - return false; + std::cerr << __func__ << ": CRC16 validation failed from " + << deviceIP << std::endl; + callOriginalCallbackWithFailure(); + return; } - // 5. Swap content to host endianness resp->swapContentsToHostEndianness(); if (!resp->sanityCheck() || resp->ret_code != 0x00) { - std::cerr << __func__ << ": Handshake failed - invalid response from " - << deviceIP << "(" << deviceEndpoint << ")" - << std::endl; - return false; + std::cerr << __func__ << ": Invalid response from " + << deviceIP << std::endl; + callOriginalCallbackWithFailure(); + return; } - std::cout << __func__ << ": Handshake successful with " << deviceIP - << "(" << deviceEndpoint << ")" << std::endl; - return true; + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": Handshake successful with " + << deviceIP << "(" + << device.discoveredDevice.deviceIdentifier + << ")" << "\n"; + } + + // Transfer any successful state to Device + commit(); + int rawFd = handshakeFdDesc.release(); + callOriginalCallback(true, rawFd); + } + + void commit() // Transfer successful state to Device object + { + // Clean up resources (timer) but not the socket FD + // The socket FD is returned to the caller, not transferred to the device + timeoutTimer.cancel(); + } + + void cleanupHandshakeSocket() + { + int fd = handshakeFdDesc.release(); + if (fd != -1) { + close(fd); + } + } + void cleanup() // Clean up transient resources + { + timeoutTimer.cancel(); + cleanupHandshakeSocket(); + } +}; + +void Device::executeHandshakeReq( + const std::string& deviceIP, Device::executeHandshakeReqCbFn callback + ) +{ + // Create the handshake request object to hold state and callbacks + auto request = std::make_shared( + *this, deviceIP, std::move(callback)); + + try { + if (!request->setupSocket()) + { + request->callOriginalCallbackWithFailure(); + return; + } + if (!request->sendHandshakeRequest()) + { + request->callOriginalCallbackWithFailure(); + return; + } + + request->setupAsyncCallbacks(request); + } catch (const std::exception& e) { std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": " << e.what() << std::endl; + request->callOriginalCallbackWithFailure(); } - return false; +} + +void Device::disconnectReq(Device::disconnectReqCbFn callback) +{ + // Stop heartbeat first + heartbeatActive.store(false); + + if (heartbeatFd == -1) + { + std::cout << __func__ << ": No heartbeat socket available, skipping " + "disconnect message" << std::endl; + callback(true); + return; + } + + // Create disconnect message + comms::DisconnectMessage disconnectMsg; + disconnectMsg.swapContentsToProtocolEndianness(); + disconnectMsg.header.setCrc16FromRawBytes(); + disconnectMsg.header.swapCrc16ToProtocolEndianness(); + disconnectMsg.footer.crc_32 = disconnectMsg.calculateCrc32(); + disconnectMsg.footer.swapCrc32ToProtocolEndianness(); + + // Set up destination address + struct sockaddr_in deviceAddr; + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); + deviceAddr.sin_port = htons(65000); // Commands go to port 65000 + + // Send disconnect message + ssize_t bytesSent = sendto( + heartbeatFd, &disconnectMsg, sizeof(disconnectMsg), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); + + if (bytesSent < 0) + { + std::cerr << __func__ << ": Failed to send disconnect message: " + << strerror(errno) << std::endl; + // Continue with disconnect even if message send fails + } + + std::cout << __func__ << ": Sent disconnect message to " + << discoveredDevice.ipAddr << ":" << 65000 << std::endl; + + // Close the heartbeat socket + close(heartbeatFd); + heartbeatFd = -1; + callback(true); } std::string Device::generateClientDeviceIpFromSerialNumber( @@ -435,40 +921,19 @@ std::string Device::generateClientDeviceIpFromSerialNumber( void Device::startHeartbeat() { - if (!componentThread || discoveredDevice.ipAddr.empty()) { - return; // Can't start heartbeat without component thread or IP + if (!componentThread || discoveredDevice.ipAddr.empty()) + { + throw std::runtime_error( + std::string(__func__) + + ": Can't start heartbeat without component thread or IP"); } - /** EXPLANATION: - * Create raw UDP socket for heartbeat sending to avoid boost::asio dlopen - * issues. - */ - heartbeatSocketFd = socket(AF_INET, SOCK_DGRAM, 0); - if (heartbeatSocketFd < 0) + // Check if we have the handshake socket available for heartbeat use + if (heartbeatFd < 0) { - std::cerr << "[" << __func__ << "] Failed to create heartbeat socket: " - << strerror(errno) << std::endl; - return; - } - - /** EXPLANATION: - * Bind heartbeat socket to cmdPort so heartbeats come from the same port - * as handshake. - */ - struct sockaddr_in localAddr; - memset(&localAddr, 0, sizeof(localAddr)); - localAddr.sin_family = AF_INET; - localAddr.sin_addr.s_addr = INADDR_ANY; - localAddr.sin_port = htons(cmdPort); - - if (bind( - heartbeatSocketFd, (struct sockaddr*)&localAddr, sizeof(localAddr)) < 0) - { - std::cerr << "[" << __func__ << "] Failed to bind heartbeat socket to " - "port " << cmdPort << ": " << strerror(errno) << std::endl; - close(heartbeatSocketFd); - heartbeatSocketFd = -1; - return; + throw std::runtime_error( + std::string(__func__) + + ": Expected to find handshake socket present but didn't find it"); } // Create heartbeat timer @@ -483,9 +948,17 @@ void Device::startHeartbeat() void Device::sendHeartbeat() { - if (!heartbeatActive.load() || heartbeatSocketFd < 0 - || discoveredDevice.ipAddr.empty()) + if (!heartbeatActive.load()) { + std::cerr << __func__ << ": Ending heartbeat loop due to " + "heartbeatActive==false.\n"; + return; + } + + if (heartbeatFd < 0 || discoveredDevice.ipAddr.empty()) + { + std::cerr << __func__ << ": Ending heartbeat loop due to " + "heartbeatFd==-1 or discoveredDevice.ipAddr.empty().\n"; return; } @@ -507,7 +980,7 @@ void Device::sendHeartbeat() deviceAddr.sin_port = htons(65000); ssize_t bytesSent = sendto( - heartbeatSocketFd, &heartbeatMsg, sizeof(heartbeatMsg), 0, + heartbeatFd, &heartbeatMsg, sizeof(heartbeatMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) @@ -530,7 +1003,7 @@ void Device::sendHeartbeat() catch (const std::exception& e) { heartbeatActive.store(false); - std::cerr << "[" << __func__ << "] Heartbeat send failed for device " + std::cerr << __func__ << ": Heartbeat send failed for device " << discoveredDevice.deviceIdentifier << ": " << e.what() << std::endl; } diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index db1658e..dc5dca9 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -76,29 +77,50 @@ public: uint16_t dataPort, cmdPort, imuPort; private: - // Connection logic - void connect(); - bool connectToKnownDevice(); - bool connectByDeviceIdentifier(); - bool executeHandshake( - const std::string& deviceIP, int timeoutMs, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort); + // Heartbeat mechanism + void startHeartbeat(); + void sendHeartbeat(); + void onHeartbeatTimer(const boost::system::error_code& error); std::string generateClientDeviceIpFromSerialNumber( const std::string& broadcastCode); // IP detection methods std::optional detectSmoIp(const std::string& deviceIP); - std::string getSmoIp(const std::string& deviceIP); uint32_t getSubnetMaskFor(uint8_t nbits); - // Heartbeat mechanism - void startHeartbeat(); - void sendHeartbeat(); - void onHeartbeatTimer(const boost::system::error_code& error); + class ConnectReq; + class ConnectToKnownDeviceReq; + class ConnectByDeviceIdentifierReq; + class ExecuteHandshakeReq; + class DisconnectReq; + +public: + // Utility methods + std::string getSmoIp(const std::string& deviceIP); + + // Callback function type definitions for async methods + typedef std::function connectReqCbFn; + typedef std::function< + void(bool success, const std::string& ipAddr, int fd)> + connectToKnownDeviceReqCbFn; + typedef std::function< + void(bool success, const std::string& ipAddr, int fd)> + connectByDeviceIdentifierReqCbFn; + typedef std::function executeHandshakeReqCbFn; + typedef std::function disconnectReqCbFn; + + // Async connection methods + void connectReq(connectReqCbFn callback); + void connectToKnownDeviceReq(connectToKnownDeviceReqCbFn callback); + void connectByDeviceIdentifierReq( + connectByDeviceIdentifierReqCbFn callback); + void executeHandshakeReq( + const std::string& deviceIP, executeHandshakeReqCbFn callback); + void disconnectReq(disconnectReqCbFn callback); // Heartbeat state std::unique_ptr heartbeatTimer; - int heartbeatSocketFd; // Raw socket file descriptor + int heartbeatFd; // Socket file descriptor used for heartbeat std::atomic heartbeatActive; }; diff --git a/commonLibs/livoxProto1/livoxProto1.cpp b/commonLibs/livoxProto1/livoxProto1.cpp index de5f2da..d9607e6 100644 --- a/commonLibs/livoxProto1/livoxProto1.cpp +++ b/commonLibs/livoxProto1/livoxProto1.cpp @@ -6,12 +6,13 @@ extern "C" { -std::shared_ptr livoxProto1_getOrCreateDevice( +void livoxProto1_getOrCreateDeviceReq( const std::string& deviceIdentifier, const std::shared_ptr& componentThread, int handshakeTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, + livoxProto1_getOrCreateDeviceReqCbFn callback ) { // Get the global DeviceManager instance @@ -24,16 +25,34 @@ std::shared_ptr livoxProto1_getOrCreateDevice( } // Delegate to DeviceManager - return protoState.deviceManager->getOrCreateDevice( + protoState.deviceManager->getOrCreateDeviceReq( deviceIdentifier, componentThread, handshakeTimeoutMs, retryDelayMs, smoIp, smoSubnetNbits, - dataPort, cmdPort, imuPort); + dataPort, cmdPort, imuPort, + callback); } -void livoxProto1_main(const std::shared_ptr& componentThread) +void livoxProto1_destroyDeviceReq( + std::shared_ptr device, + livoxProto1_destroyDeviceReqCbFn callback +) { - livoxProto1::main(componentThread); + auto& protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { + throw std::runtime_error(std::string(__func__) + + ": DeviceManager not initialized"); + } + + protoState.deviceManager->destroyDeviceReq(device, callback); +} + +void livoxProto1_main( + const std::shared_ptr& componentThread, + const smo::sense_api::SmoCallbacks& smoCallbacks) +{ + livoxProto1::main(componentThread, smoCallbacks); } void livoxProto1_exit(void) diff --git a/commonLibs/livoxProto1/livoxProto1.h b/commonLibs/livoxProto1/livoxProto1.h index 32ca282..0d75cf0 100644 --- a/commonLibs/livoxProto1/livoxProto1.h +++ b/commonLibs/livoxProto1/livoxProto1.h @@ -4,9 +4,13 @@ #include #include #include +#include // Forward declarations namespace smo { +namespace sense_api { + struct SmoCallbacks; +} class ComponentThread; } @@ -21,9 +25,11 @@ extern "C" { /** * Initialize the Livox protocol library * @param componentThread Component thread shared pointer + * @param smoCallbacks Callbacks provided by SMO */ typedef void livoxProto1_mainFn( - const std::shared_ptr& componentThread); + const std::shared_ptr& componentThread, + const smo::sense_api::SmoCallbacks& smoCallbacks); /** * Cleanup the Livox protocol library @@ -43,16 +49,28 @@ typedef void livoxProto1_exitFn(void); * @param imuPort IMU port (default: 56002) * @return Device pointer on success, nullptr on failure */ -typedef std::shared_ptr livoxProto1_getOrCreateDeviceFn( +typedef std::function< + void(bool success, std::shared_ptr device)> + livoxProto1_getOrCreateDeviceReqCbFn; + +typedef void livoxProto1_getOrCreateDeviceReqFn( const std::string& deviceIdentifier, const std::shared_ptr& componentThread, int handshakeTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort); + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, + livoxProto1_getOrCreateDeviceReqCbFn callback); + +typedef std::function livoxProto1_destroyDeviceReqCbFn; +typedef void livoxProto1_destroyDeviceReqFn( + std::shared_ptr device, + livoxProto1_destroyDeviceReqCbFn callback); + livoxProto1_mainFn livoxProto1_main; livoxProto1_exitFn livoxProto1_exit; -livoxProto1_getOrCreateDeviceFn livoxProto1_getOrCreateDevice; +livoxProto1_getOrCreateDeviceReqFn livoxProto1_getOrCreateDeviceReq; +livoxProto1_destroyDeviceReqFn livoxProto1_destroyDeviceReq; #ifdef __cplusplus } diff --git a/commonLibs/livoxProto1/protocol.cpp b/commonLibs/livoxProto1/protocol.cpp index b48eef7..01ccc72 100644 --- a/commonLibs/livoxProto1/protocol.cpp +++ b/commonLibs/livoxProto1/protocol.cpp @@ -638,5 +638,75 @@ bool HeartbeatMessage::validateCrc32() const return isValid; } +// DisconnectMessage methods +DisconnectMessage::DisconnectMessage() +{ + // Initialize header + header.sof = 0xAA; + header.version = 0x01; + header.length = sizeof(Header) + sizeof(Command) + sizeof(Footer); + header.cmd_type = 0x00; // kCommandTypeCmd + header.seq_num = 0x0001; // Simple sequence number + header.crc_16 = 0; // Will be calculated + + // Initialize command + command.cmd_set = 0x00; // kCommandSetGeneral + command.cmd_id = 0x06; // kCommandIDGeneralDisconnect + + // Initialize footer + footer.crc_32 = 0; // Will be calculated + // Note: CRC16 will be calculated before sending (in swapToProtocolEndianness) +} + +uint32_t DisconnectMessage::calculateCrc32() const +{ + // Calculate CRC32 for the entire message excluding the footer.crc_32 field + const uint8_t* messageData = reinterpret_cast(this); + size_t messageSize = sizeof(DisconnectMessage) - sizeof(footer.crc_32); + + return comms::calculateCrc32(messageData, messageSize); +} + +void DisconnectMessage::swapContentsToProtocolEndianness() +{ + // Protocol is little-endian, so if host is already little-endian, no swap needed + if (endian::isLittleEndian()) { + return; + } + + // Host is big-endian, need to swap to little-endian + // Only swap content fields, not CRC fields + header.swapToProtocolEndianness(); + command.swapToProtocolEndianness(); + // Note: footer.swapToProtocolEndianness() swaps CRC, so we skip it here +} + +bool DisconnectMessage::sanityCheck() const +{ + return header.sanityCheck() && + command.sanityCheck() && + (command.cmd_set == 0x00) && (command.cmd_id == 0x06) && + footer.sanityCheck(); +} + +bool DisconnectMessage::validateCrc32() const +{ + // Use the calculateCrc32 method to avoid code duplication + uint32_t calculatedCrc = calculateCrc32(); + + // Compare with the CRC in the footer + bool isValid = (calculatedCrc == footer.crc_32); + + // Debug output only if validation fails + if (!isValid) + { + std::cout << "DisconnectMessage CRC32 Debug: calculated=0x" + << std::hex << calculatedCrc + << ", received=0x" << footer.crc_32 << std::dec << std::endl; + } + + return isValid; +} + } // namespace comms } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/protocol.h b/commonLibs/livoxProto1/protocol.h index 75e0c61..34cebc1 100644 --- a/commonLibs/livoxProto1/protocol.h +++ b/commonLibs/livoxProto1/protocol.h @@ -229,6 +229,23 @@ struct HeartbeatMessage bool validateCrc32() const; } __attribute__((packed)); +/** EXPLANATION: + * Complete disconnect command frame for disconnecting from Livox devices. + * This is the complete wire format including header, command fields, and footer. + */ +struct DisconnectMessage +{ + Header header; // 0-8: Protocol frame header + Command command; // 9-10: Command identification + Footer footer; // 11-14: Protocol frame footer + + DisconnectMessage(); + uint32_t calculateCrc32() const; + void swapContentsToProtocolEndianness(); + bool sanityCheck() const; + bool validateCrc32() const; +} __attribute__((packed)); + } // namespace comms } // namespace livoxProto1