From 5db1cfdac87fa949996e4e2beb71b2316efad049 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Thu, 23 Oct 2025 00:24:23 -0400 Subject: [PATCH] LivoxProto1: Pcloud data stream now working --- commonLibs/livoxProto1/device.cpp | 215 +++++++++++++++------------- commonLibs/livoxProto1/device.h | 9 +- commonLibs/livoxProto1/protocol.cpp | 3 +- 3 files changed, 120 insertions(+), 107 deletions(-) diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index 2230906..675998e 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -100,7 +100,6 @@ componentThread(componentThread), handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs), smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), -heartbeatFd(-1), heartbeatActive(false), pcloudDataActive(false), pcloudDataFd(-1) @@ -109,15 +108,7 @@ pcloudDataFd(-1) Device::~Device() { - if (heartbeatActive.load()) - { - heartbeatActive.store(false); - if (heartbeatTimer) { - heartbeatTimer->cancel(); - } - - unregisterUdpCommandHandler(0x00, 0x03); - } + stopHeartbeat(); if (pcloudDataActive.load()) { pcloudDataActive.store(false); @@ -128,10 +119,6 @@ Device::~Device() heartbeatTimer.reset(); pcloudDataSocketDesc.reset(); - if (heartbeatFd >= 0) { - close(heartbeatFd); - heartbeatFd = -1; - } if (pcloudDataFd >= 0) { close(pcloudDataFd); pcloudDataFd = -1; @@ -160,7 +147,7 @@ public: // Callback methods for the connection sequence void connectReq1( std::shared_ptr context, - bool success, const std::string& ipAddr, int fd + bool success, const std::string& ipAddr ) { // Fail early - if handshake failed, try next method @@ -176,15 +163,13 @@ public: context->device.connectByDeviceIdentifierReq( {context, std::bind(&ConnectReq::connectReq2, context.get(), context, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)}); + std::placeholders::_1, std::placeholders::_2)}); return; } // Success - store connection info and proceed to next step context->device.discoveredDevice.ipAddr = ipAddr; - context->device.heartbeatFd = fd; context->device.startHeartbeat(); context->device.enablePcloudDataReq( @@ -194,7 +179,7 @@ public: void connectReq2( std::shared_ptr context, - bool success, const std::string& ipAddr, int fd + bool success, const std::string& ipAddr ) { // Fail early - if this also failed, all connection attempts failed @@ -206,7 +191,6 @@ public: // Success - store connection info and proceed to next step context->device.discoveredDevice.ipAddr = ipAddr; - context->device.heartbeatFd = fd; context->device.startHeartbeat(); context->device.enablePcloudDataReq( @@ -236,8 +220,7 @@ void Device::connectReq(smo::Callback callback) connectToKnownDeviceReq( {request, std::bind( &ConnectReq::connectReq1, request.get(), request, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)}); + std::placeholders::_1, std::placeholders::_2)}); } class Device::ConnectToKnownDeviceReq @@ -255,20 +238,20 @@ public: {} // Public accessor for the original callback - void callOriginalCallback(bool success, const std::string& ipAddr, int fd) - { callOriginalCb(success, ipAddr, fd); } + void callOriginalCallback(bool success, const std::string& ipAddr) + { callOriginalCb(success, ipAddr); } // Wrapper for failure cases void callOriginalCallbackWithFailure() - { callOriginalCallback(false, "", -1); } + { callOriginalCallback(false, ""); } // Callback methods for the connection sequence void connectToKnownDeviceReq1( - std::shared_ptr context, bool success, int fd + std::shared_ptr context, bool success ) { - // Return the IP address and raw FD to the caller - context->callOriginalCallback(success, context->deviceIP, fd); + // Return the IP address to the caller + context->callOriginalCallback(success, context->deviceIP); } }; @@ -340,7 +323,7 @@ void Device::connectToKnownDeviceReq( {request, std::bind( &ConnectToKnownDeviceReq::connectToKnownDeviceReq1, request.get(), request, - std::placeholders::_1, std::placeholders::_2)}); + std::placeholders::_1)}); } class Device::ConnectByDeviceIdentifierReq @@ -359,21 +342,21 @@ public: {} // Public accessor for the original callback - void callOriginalCallback(bool success, const std::string& ipAddr, int fd) - { callOriginalCb(success, ipAddr, fd); } + void callOriginalCallback(bool success, const std::string& ipAddr) + { callOriginalCb(success, ipAddr); } // Wrapper for failure cases void callOriginalCallbackWithFailure() - { callOriginalCallback(false, "", -1); } + { callOriginalCallback(false, ""); } // Callback methods for the connection sequence void connectByDeviceIdentifierReq1( std::shared_ptr context, - bool success, int fd + bool success ) { - // Return the IP address and raw FD to the caller - context->callOriginalCallback(success, context->deviceIP, fd); + // Return the IP address to the caller + context->callOriginalCallback(success, context->deviceIP); } }; @@ -395,7 +378,7 @@ void Device::connectByDeviceIdentifierReq( // Check if smoIp is provided - required for heuristic construction if (smoIp.empty()) { - callback.callbackFn(false, "", -1); + callback.callbackFn(false, ""); return; } @@ -423,7 +406,7 @@ void Device::connectByDeviceIdentifierReq( {request, std::bind( &ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1, request.get(), request, - std::placeholders::_1, std::placeholders::_2)}); + std::placeholders::_1)}); } class Device::ExecuteHandshakeReq @@ -482,8 +465,8 @@ public: } // Public accessor for the original callback - void callOriginalCallback(bool success, int fd) - { callOriginalCb(success, fd); } + void callOriginalCallback(bool success) + { callOriginalCb(success); } void callOriginalCallbackWithFailure() { @@ -513,7 +496,7 @@ public: * manually do that at the end of the sequence. */ cleanupHandshakeSocket(); - callOriginalCallback(false, -1); + callOriginalCallback(false); } private: @@ -741,8 +724,7 @@ private: // Transfer any successful state to Device commit(); - int rawFd = cmdEndpointFdDesc->native_handle(); - callOriginalCallback(true, rawFd); + callOriginalCallback(true); } void commit() // Transfer successful state to Device object @@ -772,7 +754,7 @@ void Device::executeHandshakeReq( auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { - callback.callbackFn(false, -1); + callback.callbackFn(false); return; } @@ -783,7 +765,7 @@ void Device::executeHandshakeReq( { std::cerr << __func__ << ": UdpCommandDemuxer not started or no " "command endpoint available." << std::endl; - callback.callbackFn(false, -1); + callback.callbackFn(false); return; } @@ -818,13 +800,31 @@ void Device::executeHandshakeReq( void Device::disconnectReq(smo::Callback callback) { // Stop heartbeat first - heartbeatActive.store(false); - // Unregister heartbeat ACK handler - unregisterUdpCommandHandler(0x00, 0x03); + stopHeartbeat(); - if (heartbeatFd == -1) + if (discoveredDevice.ipAddr.empty()) { - std::cout << __func__ << ": No heartbeat socket available, skipping " + std::cout << __func__ << ": No device IP available, skipping " + "disconnect message" << std::endl; + callback.callbackFn(true); + return; + } + + // Get the command endpoint from the UdpCommandDemuxer + auto& protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { + std::cout << __func__ << ": No device manager available, skipping " + "disconnect message" << std::endl; + callback.callbackFn(true); + return; + } + + auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer + .getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) + { + std::cout << __func__ << ": No command endpoint available, skipping " "disconnect message" << std::endl; callback.callbackFn(true); return; @@ -846,7 +846,8 @@ void Device::disconnectReq(smo::Callback callback) // Send disconnect message ssize_t bytesSent = sendto( - heartbeatFd, &disconnectMsg, sizeof(disconnectMsg), 0, + cmdEndpointFdDesc->native_handle(), + &disconnectMsg, sizeof(disconnectMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) @@ -859,9 +860,6 @@ void Device::disconnectReq(smo::Callback callback) std::cout << __func__ << ": Sent disconnect message to " << discoveredDevice.ipAddr << ":" << 65000 << std::endl; - // Close the heartbeat socket - close(heartbeatFd); - heartbeatFd = -1; callback.callbackFn(true); } @@ -959,7 +957,6 @@ static void discardHeartbeatAck( (void)data; (void)bytesReceived; (void)senderAddr; - std::cout << "Got heartbeat ACK\n"; } void Device::startHeartbeat() @@ -971,16 +968,9 @@ void Device::startHeartbeat() ": Can't start heartbeat without component thread or IP"); } - // Check if we have the handshake socket available for heartbeat use - if (heartbeatFd < 0) - { - throw std::runtime_error( - std::string(__func__) + - ": Expected to find handshake socket present but didn't find it"); - } - // Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03) - registerUdpCommandHandler(0x00, 0x03, discardHeartbeatAck); + registerUdpCommandHandler( + 0x00, 0x03, discardHeartbeatAck, discoveredDevice.ipAddr); // Create heartbeat timer heartbeatTimer = std::make_unique( @@ -992,6 +982,19 @@ void Device::startHeartbeat() sendHeartbeat(); } +void Device::stopHeartbeat() +{ + if (heartbeatActive.load()) + { + heartbeatActive.store(false); + if (heartbeatTimer) { + heartbeatTimer->cancel(); + } + + unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr); + } +} + void Device::sendHeartbeat() { if (!heartbeatActive.load()) @@ -1001,10 +1004,26 @@ void Device::sendHeartbeat() return; } - if (heartbeatFd < 0 || discoveredDevice.ipAddr.empty()) + if (discoveredDevice.ipAddr.empty()) { std::cerr << __func__ << ": Ending heartbeat loop due to " - "heartbeatFd==-1 or discoveredDevice.ipAddr.empty().\n"; + "discoveredDevice.ipAddr.empty().\n"; + return; + } + + // Get the command endpoint from the UdpCommandDemuxer + auto& protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { + std::cerr << __func__ << ": No device manager available\n"; + return; + } + + auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer + .getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) + { + std::cerr << __func__ << ": No command endpoint available\n"; return; } @@ -1025,7 +1044,8 @@ void Device::sendHeartbeat() deviceAddr.sin_port = htons(65000); ssize_t bytesSent = sendto( - heartbeatFd, &heartbeatMsg, sizeof(heartbeatMsg), 0, + cmdEndpointFdDesc->native_handle(), + &heartbeatMsg, sizeof(heartbeatMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) @@ -1282,17 +1302,6 @@ public: } protected: - bool setupSocket() - { - // Use the existing heartbeat socket for sending commands and receiving responses - if (device.heartbeatFd < 0) - { - std::cerr << __func__ << ": No heartbeat socket available" - << std::endl; - return false; - } - return true; - } void setupAsyncCallbacks( const std::shared_ptr> &request @@ -1314,7 +1323,8 @@ protected: std::bind(&EnDisablePcloudDataReq::enDisablePcloudDataReq1_2, this, request, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)); + std::placeholders::_3), + device.discoveredDevice.ipAddr); } void enDisablePcloudDataReq1_1( @@ -1334,7 +1344,8 @@ protected: ) { // Unregister the handler to decrement refcount - device.unregisterUdpCommandHandler(0x00, 0x04); + device.unregisterUdpCommandHandler( + 0x00, 0x04, device.discoveredDevice.ipAddr); // Store the received data context->bytesReceived = bytesReceived; @@ -1454,13 +1465,14 @@ protected: { // Create start/stop sampling message using protocol structure livoxProto1::comms::StartStopSamplingMessage message; - // Set enable flag based on derived class implementation message.enable = getEnableFlag(); - // Calculate and set CRC32 - message.footer.crc_32 = message.calculateCrc32(); message.swapContentsToProtocolEndianness(); + message.header.setCrc16FromRawBytes(); + message.header.swapCrc16ToProtocolEndianness(); + message.footer.crc_32 = message.calculateCrc32(); + message.footer.swapCrc32ToProtocolEndianness(); struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); @@ -1470,8 +1482,24 @@ protected: deviceAddr.sin_port = htons(65000); // Command port // Send command directly (synchronous) + // Get the command endpoint from the UdpCommandDemuxer + auto& protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { + std::cerr << __func__ << ": No device manager available" << std::endl; + return false; + } + + auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer + .getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) + { + std::cerr << __func__ << ": No command endpoint available" << std::endl; + return false; + } + ssize_t bytesSent = sendto( - device.heartbeatFd, + cmdEndpointFdDesc->native_handle(), &message, sizeof(message), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); @@ -1564,21 +1592,14 @@ void Device::enablePcloudDataReq( *this, std::move(callback)); // Check if heartbeat socket is available - if (heartbeatFd < 0) + if (discoveredDevice.ipAddr.empty()) { - std::cerr << __func__ << ": No heartbeat socket available for device " + std::cerr << __func__ << ": No device IP available for device " << discoveredDevice.deviceIdentifier << std::endl; request->callOriginalCallbackWithFailure(); return; } - // Setup socket for async operations - if (!request->setupSocket()) - { - request->callOriginalCallbackWithFailure(); - return; - } - // Set up the point cloud data socket for actual data reception if (!setupPcloudDataSocket()) { @@ -1606,9 +1627,9 @@ void Device::disablePcloudDataReq( *this, std::move(callback)); // Check if heartbeat socket is available - if (heartbeatFd < 0) + if (discoveredDevice.ipAddr.empty()) { - std::cerr << __func__ << ": No heartbeat socket available for device " + std::cerr << __func__ << ": No device IP available for device " << discoveredDevice.deviceIdentifier << std::endl; request->callOriginalCallbackWithFailure(); return; @@ -1620,12 +1641,6 @@ void Device::disablePcloudDataReq( */ cleanupPcloudDataSocket(); - // Setup socket for async operations - if (!request->setupSocket()) - { - request->callOriginalCallbackWithFailure(); - return; - } // Send the stop sampling command if (!request->sendCommand()) diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index 5fc357f..292b5d5 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -105,6 +105,7 @@ public: private: // Heartbeat mechanism void startHeartbeat(); + void stopHeartbeat(); void sendHeartbeat(); void onHeartbeatTimer(const boost::system::error_code& error); std::string generateClientDeviceIpFromSerialNumber( @@ -129,12 +130,12 @@ public: // Callback function type definitions for async methods typedef std::function connectReqCbFn; typedef std::function< - void(bool success, const std::string& ipAddr, int fd)> + void(bool success, const std::string& ipAddr)> connectToKnownDeviceReqCbFn; typedef std::function< - void(bool success, const std::string& ipAddr, int fd)> + void(bool success, const std::string& ipAddr)> connectByDeviceIdentifierReqCbFn; - typedef std::function executeHandshakeReqCbFn; + typedef std::function executeHandshakeReqCbFn; typedef std::function disconnectReqCbFn; typedef std::function enablePcloudDataReqCbFn; typedef std::function disablePcloudDataReqCbFn; @@ -154,8 +155,6 @@ public: // Heartbeat state std::unique_ptr heartbeatTimer; - // FIXME: Might be useful to rename this to commandAndHeartbeatFd. - int heartbeatFd; // Socket file descriptor used for heartbeat std::atomic heartbeatActive; // Point cloud data state diff --git a/commonLibs/livoxProto1/protocol.cpp b/commonLibs/livoxProto1/protocol.cpp index 7148e89..317c36d 100644 --- a/commonLibs/livoxProto1/protocol.cpp +++ b/commonLibs/livoxProto1/protocol.cpp @@ -714,7 +714,7 @@ StartStopSamplingMessage::StartStopSamplingMessage() // Initialize header header.sof = 0xAA; header.version = 1; - header.length = sizeof(StartStopSamplingMessage) - sizeof(Header) - sizeof(Footer); + header.length = sizeof(StartStopSamplingMessage); header.cmd_type = 0x02; // MSG type header.seq_num = 0; // Will be set by caller if needed header.crc_16 = 0; // Will be calculated @@ -743,7 +743,6 @@ void StartStopSamplingMessage::swapContentsToProtocolEndianness() { header.swapToProtocolEndianness(); command.swapToProtocolEndianness(); - footer.swapToProtocolEndianness(); }