LivoxProto1: Pcloud data stream now working

This commit is contained in:
2025-10-23 00:24:23 -04:00
parent a4d99e5d4d
commit 5db1cfdac8
3 changed files with 120 additions and 107 deletions
+115 -100
View File
@@ -100,7 +100,6 @@ componentThread(componentThread),
handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs), handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs),
smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits), smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits),
dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort),
heartbeatFd(-1),
heartbeatActive(false), heartbeatActive(false),
pcloudDataActive(false), pcloudDataActive(false),
pcloudDataFd(-1) pcloudDataFd(-1)
@@ -109,15 +108,7 @@ pcloudDataFd(-1)
Device::~Device() Device::~Device()
{ {
if (heartbeatActive.load()) stopHeartbeat();
{
heartbeatActive.store(false);
if (heartbeatTimer) {
heartbeatTimer->cancel();
}
unregisterUdpCommandHandler(0x00, 0x03);
}
if (pcloudDataActive.load()) { if (pcloudDataActive.load()) {
pcloudDataActive.store(false); pcloudDataActive.store(false);
@@ -128,10 +119,6 @@ Device::~Device()
heartbeatTimer.reset(); heartbeatTimer.reset();
pcloudDataSocketDesc.reset(); pcloudDataSocketDesc.reset();
if (heartbeatFd >= 0) {
close(heartbeatFd);
heartbeatFd = -1;
}
if (pcloudDataFd >= 0) { if (pcloudDataFd >= 0) {
close(pcloudDataFd); close(pcloudDataFd);
pcloudDataFd = -1; pcloudDataFd = -1;
@@ -160,7 +147,7 @@ public:
// Callback methods for the connection sequence // Callback methods for the connection sequence
void connectReq1( void connectReq1(
std::shared_ptr<ConnectReq> context, std::shared_ptr<ConnectReq> context,
bool success, const std::string& ipAddr, int fd bool success, const std::string& ipAddr
) )
{ {
// Fail early - if handshake failed, try next method // Fail early - if handshake failed, try next method
@@ -176,15 +163,13 @@ public:
context->device.connectByDeviceIdentifierReq( context->device.connectByDeviceIdentifierReq(
{context, std::bind(&ConnectReq::connectReq2, {context, std::bind(&ConnectReq::connectReq2,
context.get(), context, context.get(), context,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_1, std::placeholders::_2)});
std::placeholders::_3)});
return; return;
} }
// Success - store connection info and proceed to next step // Success - store connection info and proceed to next step
context->device.discoveredDevice.ipAddr = ipAddr; context->device.discoveredDevice.ipAddr = ipAddr;
context->device.heartbeatFd = fd;
context->device.startHeartbeat(); context->device.startHeartbeat();
context->device.enablePcloudDataReq( context->device.enablePcloudDataReq(
@@ -194,7 +179,7 @@ public:
void connectReq2( void connectReq2(
std::shared_ptr<ConnectReq> context, std::shared_ptr<ConnectReq> context,
bool success, const std::string& ipAddr, int fd bool success, const std::string& ipAddr
) )
{ {
// Fail early - if this also failed, all connection attempts failed // Fail early - if this also failed, all connection attempts failed
@@ -206,7 +191,6 @@ public:
// Success - store connection info and proceed to next step // Success - store connection info and proceed to next step
context->device.discoveredDevice.ipAddr = ipAddr; context->device.discoveredDevice.ipAddr = ipAddr;
context->device.heartbeatFd = fd;
context->device.startHeartbeat(); context->device.startHeartbeat();
context->device.enablePcloudDataReq( context->device.enablePcloudDataReq(
@@ -236,8 +220,7 @@ void Device::connectReq(smo::Callback<Device::connectReqCbFn> callback)
connectToKnownDeviceReq( connectToKnownDeviceReq(
{request, std::bind( {request, std::bind(
&ConnectReq::connectReq1, request.get(), request, &ConnectReq::connectReq1, request.get(), request,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_1, std::placeholders::_2)});
std::placeholders::_3)});
} }
class Device::ConnectToKnownDeviceReq class Device::ConnectToKnownDeviceReq
@@ -255,20 +238,20 @@ public:
{} {}
// Public accessor for the original callback // Public accessor for the original callback
void callOriginalCallback(bool success, const std::string& ipAddr, int fd) void callOriginalCallback(bool success, const std::string& ipAddr)
{ callOriginalCb(success, ipAddr, fd); } { callOriginalCb(success, ipAddr); }
// Wrapper for failure cases // Wrapper for failure cases
void callOriginalCallbackWithFailure() void callOriginalCallbackWithFailure()
{ callOriginalCallback(false, "", -1); } { callOriginalCallback(false, ""); }
// Callback methods for the connection sequence // Callback methods for the connection sequence
void connectToKnownDeviceReq1( void connectToKnownDeviceReq1(
std::shared_ptr<ConnectToKnownDeviceReq> context, bool success, int fd std::shared_ptr<ConnectToKnownDeviceReq> context, bool success
) )
{ {
// Return the IP address and raw FD to the caller // Return the IP address to the caller
context->callOriginalCallback(success, context->deviceIP, fd); context->callOriginalCallback(success, context->deviceIP);
} }
}; };
@@ -340,7 +323,7 @@ void Device::connectToKnownDeviceReq(
{request, std::bind( {request, std::bind(
&ConnectToKnownDeviceReq::connectToKnownDeviceReq1, &ConnectToKnownDeviceReq::connectToKnownDeviceReq1,
request.get(), request, request.get(), request,
std::placeholders::_1, std::placeholders::_2)}); std::placeholders::_1)});
} }
class Device::ConnectByDeviceIdentifierReq class Device::ConnectByDeviceIdentifierReq
@@ -359,21 +342,21 @@ public:
{} {}
// Public accessor for the original callback // Public accessor for the original callback
void callOriginalCallback(bool success, const std::string& ipAddr, int fd) void callOriginalCallback(bool success, const std::string& ipAddr)
{ callOriginalCb(success, ipAddr, fd); } { callOriginalCb(success, ipAddr); }
// Wrapper for failure cases // Wrapper for failure cases
void callOriginalCallbackWithFailure() void callOriginalCallbackWithFailure()
{ callOriginalCallback(false, "", -1); } { callOriginalCallback(false, ""); }
// Callback methods for the connection sequence // Callback methods for the connection sequence
void connectByDeviceIdentifierReq1( void connectByDeviceIdentifierReq1(
std::shared_ptr<ConnectByDeviceIdentifierReq> context, std::shared_ptr<ConnectByDeviceIdentifierReq> context,
bool success, int fd bool success
) )
{ {
// Return the IP address and raw FD to the caller // Return the IP address to the caller
context->callOriginalCallback(success, context->deviceIP, fd); context->callOriginalCallback(success, context->deviceIP);
} }
}; };
@@ -395,7 +378,7 @@ void Device::connectByDeviceIdentifierReq(
// Check if smoIp is provided - required for heuristic construction // Check if smoIp is provided - required for heuristic construction
if (smoIp.empty()) if (smoIp.empty())
{ {
callback.callbackFn(false, "", -1); callback.callbackFn(false, "");
return; return;
} }
@@ -423,7 +406,7 @@ void Device::connectByDeviceIdentifierReq(
{request, std::bind( {request, std::bind(
&ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1, &ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1,
request.get(), request, request.get(), request,
std::placeholders::_1, std::placeholders::_2)}); std::placeholders::_1)});
} }
class Device::ExecuteHandshakeReq class Device::ExecuteHandshakeReq
@@ -482,8 +465,8 @@ public:
} }
// Public accessor for the original callback // Public accessor for the original callback
void callOriginalCallback(bool success, int fd) void callOriginalCallback(bool success)
{ callOriginalCb(success, fd); } { callOriginalCb(success); }
void callOriginalCallbackWithFailure() void callOriginalCallbackWithFailure()
{ {
@@ -513,7 +496,7 @@ public:
* manually do that at the end of the sequence. * manually do that at the end of the sequence.
*/ */
cleanupHandshakeSocket(); cleanupHandshakeSocket();
callOriginalCallback(false, -1); callOriginalCallback(false);
} }
private: private:
@@ -741,8 +724,7 @@ private:
// Transfer any successful state to Device // Transfer any successful state to Device
commit(); commit();
int rawFd = cmdEndpointFdDesc->native_handle(); callOriginalCallback(true);
callOriginalCallback(true, rawFd);
} }
void commit() // Transfer successful state to Device object void commit() // Transfer successful state to Device object
@@ -772,7 +754,7 @@ void Device::executeHandshakeReq(
auto& protoState = livoxProto1::getProtoState(); auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager) if (!protoState.deviceManager)
{ {
callback.callbackFn(false, -1); callback.callbackFn(false);
return; return;
} }
@@ -783,7 +765,7 @@ void Device::executeHandshakeReq(
{ {
std::cerr << __func__ << ": UdpCommandDemuxer not started or no " std::cerr << __func__ << ": UdpCommandDemuxer not started or no "
"command endpoint available." << std::endl; "command endpoint available." << std::endl;
callback.callbackFn(false, -1); callback.callbackFn(false);
return; return;
} }
@@ -818,13 +800,31 @@ void Device::executeHandshakeReq(
void Device::disconnectReq(smo::Callback<Device::disconnectReqCbFn> callback) void Device::disconnectReq(smo::Callback<Device::disconnectReqCbFn> callback)
{ {
// Stop heartbeat first // Stop heartbeat first
heartbeatActive.store(false); stopHeartbeat();
// Unregister heartbeat ACK handler
unregisterUdpCommandHandler(0x00, 0x03);
if (heartbeatFd == -1) if (discoveredDevice.ipAddr.empty())
{ {
std::cout << __func__ << ": No heartbeat socket available, skipping " std::cout << __func__ << ": No device IP available, skipping "
"disconnect message" << std::endl;
callback.callbackFn(true);
return;
}
// Get the command endpoint from the UdpCommandDemuxer
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
std::cout << __func__ << ": No device manager available, skipping "
"disconnect message" << std::endl;
callback.callbackFn(true);
return;
}
auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
.getCmdEndpointFdDesc();
if (!cmdEndpointFdDesc)
{
std::cout << __func__ << ": No command endpoint available, skipping "
"disconnect message" << std::endl; "disconnect message" << std::endl;
callback.callbackFn(true); callback.callbackFn(true);
return; return;
@@ -846,7 +846,8 @@ void Device::disconnectReq(smo::Callback<Device::disconnectReqCbFn> callback)
// Send disconnect message // Send disconnect message
ssize_t bytesSent = sendto( ssize_t bytesSent = sendto(
heartbeatFd, &disconnectMsg, sizeof(disconnectMsg), 0, cmdEndpointFdDesc->native_handle(),
&disconnectMsg, sizeof(disconnectMsg), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0) if (bytesSent < 0)
@@ -859,9 +860,6 @@ void Device::disconnectReq(smo::Callback<Device::disconnectReqCbFn> callback)
std::cout << __func__ << ": Sent disconnect message to " std::cout << __func__ << ": Sent disconnect message to "
<< discoveredDevice.ipAddr << ":" << 65000 << std::endl; << discoveredDevice.ipAddr << ":" << 65000 << std::endl;
// Close the heartbeat socket
close(heartbeatFd);
heartbeatFd = -1;
callback.callbackFn(true); callback.callbackFn(true);
} }
@@ -959,7 +957,6 @@ static void discardHeartbeatAck(
(void)data; (void)data;
(void)bytesReceived; (void)bytesReceived;
(void)senderAddr; (void)senderAddr;
std::cout << "Got heartbeat ACK\n";
} }
void Device::startHeartbeat() void Device::startHeartbeat()
@@ -971,16 +968,9 @@ void Device::startHeartbeat()
": Can't start heartbeat without component thread or IP"); ": Can't start heartbeat without component thread or IP");
} }
// Check if we have the handshake socket available for heartbeat use
if (heartbeatFd < 0)
{
throw std::runtime_error(
std::string(__func__) +
": Expected to find handshake socket present but didn't find it");
}
// Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03) // Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03)
registerUdpCommandHandler(0x00, 0x03, discardHeartbeatAck); registerUdpCommandHandler(
0x00, 0x03, discardHeartbeatAck, discoveredDevice.ipAddr);
// Create heartbeat timer // Create heartbeat timer
heartbeatTimer = std::make_unique<boost::asio::deadline_timer>( heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
@@ -992,6 +982,19 @@ void Device::startHeartbeat()
sendHeartbeat(); sendHeartbeat();
} }
void Device::stopHeartbeat()
{
if (heartbeatActive.load())
{
heartbeatActive.store(false);
if (heartbeatTimer) {
heartbeatTimer->cancel();
}
unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr);
}
}
void Device::sendHeartbeat() void Device::sendHeartbeat()
{ {
if (!heartbeatActive.load()) if (!heartbeatActive.load())
@@ -1001,10 +1004,26 @@ void Device::sendHeartbeat()
return; return;
} }
if (heartbeatFd < 0 || discoveredDevice.ipAddr.empty()) if (discoveredDevice.ipAddr.empty())
{ {
std::cerr << __func__ << ": Ending heartbeat loop due to " std::cerr << __func__ << ": Ending heartbeat loop due to "
"heartbeatFd==-1 or discoveredDevice.ipAddr.empty().\n"; "discoveredDevice.ipAddr.empty().\n";
return;
}
// Get the command endpoint from the UdpCommandDemuxer
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
std::cerr << __func__ << ": No device manager available\n";
return;
}
auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
.getCmdEndpointFdDesc();
if (!cmdEndpointFdDesc)
{
std::cerr << __func__ << ": No command endpoint available\n";
return; return;
} }
@@ -1025,7 +1044,8 @@ void Device::sendHeartbeat()
deviceAddr.sin_port = htons(65000); deviceAddr.sin_port = htons(65000);
ssize_t bytesSent = sendto( ssize_t bytesSent = sendto(
heartbeatFd, &heartbeatMsg, sizeof(heartbeatMsg), 0, cmdEndpointFdDesc->native_handle(),
&heartbeatMsg, sizeof(heartbeatMsg), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0) if (bytesSent < 0)
@@ -1282,17 +1302,6 @@ public:
} }
protected: protected:
bool setupSocket()
{
// Use the existing heartbeat socket for sending commands and receiving responses
if (device.heartbeatFd < 0)
{
std::cerr << __func__ << ": No heartbeat socket available"
<< std::endl;
return false;
}
return true;
}
void setupAsyncCallbacks( void setupAsyncCallbacks(
const std::shared_ptr<EnDisablePcloudDataReq<CallbackType>> &request const std::shared_ptr<EnDisablePcloudDataReq<CallbackType>> &request
@@ -1314,7 +1323,8 @@ protected:
std::bind(&EnDisablePcloudDataReq<CallbackType>::enDisablePcloudDataReq1_2, std::bind(&EnDisablePcloudDataReq<CallbackType>::enDisablePcloudDataReq1_2,
this, request, this, request,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)); std::placeholders::_3),
device.discoveredDevice.ipAddr);
} }
void enDisablePcloudDataReq1_1( void enDisablePcloudDataReq1_1(
@@ -1334,7 +1344,8 @@ protected:
) )
{ {
// Unregister the handler to decrement refcount // Unregister the handler to decrement refcount
device.unregisterUdpCommandHandler(0x00, 0x04); device.unregisterUdpCommandHandler(
0x00, 0x04, device.discoveredDevice.ipAddr);
// Store the received data // Store the received data
context->bytesReceived = bytesReceived; context->bytesReceived = bytesReceived;
@@ -1454,13 +1465,14 @@ protected:
{ {
// Create start/stop sampling message using protocol structure // Create start/stop sampling message using protocol structure
livoxProto1::comms::StartStopSamplingMessage message; livoxProto1::comms::StartStopSamplingMessage message;
// Set enable flag based on derived class implementation // Set enable flag based on derived class implementation
message.enable = getEnableFlag(); message.enable = getEnableFlag();
// Calculate and set CRC32
message.footer.crc_32 = message.calculateCrc32();
message.swapContentsToProtocolEndianness(); message.swapContentsToProtocolEndianness();
message.header.setCrc16FromRawBytes();
message.header.swapCrc16ToProtocolEndianness();
message.footer.crc_32 = message.calculateCrc32();
message.footer.swapCrc32ToProtocolEndianness();
struct sockaddr_in deviceAddr; struct sockaddr_in deviceAddr;
memset(&deviceAddr, 0, sizeof(deviceAddr)); memset(&deviceAddr, 0, sizeof(deviceAddr));
@@ -1470,8 +1482,24 @@ protected:
deviceAddr.sin_port = htons(65000); // Command port deviceAddr.sin_port = htons(65000); // Command port
// Send command directly (synchronous) // Send command directly (synchronous)
// Get the command endpoint from the UdpCommandDemuxer
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
std::cerr << __func__ << ": No device manager available" << std::endl;
return false;
}
auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
.getCmdEndpointFdDesc();
if (!cmdEndpointFdDesc)
{
std::cerr << __func__ << ": No command endpoint available" << std::endl;
return false;
}
ssize_t bytesSent = sendto( ssize_t bytesSent = sendto(
device.heartbeatFd, cmdEndpointFdDesc->native_handle(),
&message, sizeof(message), 0, &message, sizeof(message), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
@@ -1564,21 +1592,14 @@ void Device::enablePcloudDataReq(
*this, std::move(callback)); *this, std::move(callback));
// Check if heartbeat socket is available // Check if heartbeat socket is available
if (heartbeatFd < 0) if (discoveredDevice.ipAddr.empty())
{ {
std::cerr << __func__ << ": No heartbeat socket available for device " std::cerr << __func__ << ": No device IP available for device "
<< discoveredDevice.deviceIdentifier << std::endl; << discoveredDevice.deviceIdentifier << std::endl;
request->callOriginalCallbackWithFailure(); request->callOriginalCallbackWithFailure();
return; return;
} }
// Setup socket for async operations
if (!request->setupSocket())
{
request->callOriginalCallbackWithFailure();
return;
}
// Set up the point cloud data socket for actual data reception // Set up the point cloud data socket for actual data reception
if (!setupPcloudDataSocket()) if (!setupPcloudDataSocket())
{ {
@@ -1606,9 +1627,9 @@ void Device::disablePcloudDataReq(
*this, std::move(callback)); *this, std::move(callback));
// Check if heartbeat socket is available // Check if heartbeat socket is available
if (heartbeatFd < 0) if (discoveredDevice.ipAddr.empty())
{ {
std::cerr << __func__ << ": No heartbeat socket available for device " std::cerr << __func__ << ": No device IP available for device "
<< discoveredDevice.deviceIdentifier << std::endl; << discoveredDevice.deviceIdentifier << std::endl;
request->callOriginalCallbackWithFailure(); request->callOriginalCallbackWithFailure();
return; return;
@@ -1620,12 +1641,6 @@ void Device::disablePcloudDataReq(
*/ */
cleanupPcloudDataSocket(); cleanupPcloudDataSocket();
// Setup socket for async operations
if (!request->setupSocket())
{
request->callOriginalCallbackWithFailure();
return;
}
// Send the stop sampling command // Send the stop sampling command
if (!request->sendCommand()) if (!request->sendCommand())
+4 -5
View File
@@ -105,6 +105,7 @@ public:
private: private:
// Heartbeat mechanism // Heartbeat mechanism
void startHeartbeat(); void startHeartbeat();
void stopHeartbeat();
void sendHeartbeat(); void sendHeartbeat();
void onHeartbeatTimer(const boost::system::error_code& error); void onHeartbeatTimer(const boost::system::error_code& error);
std::string generateClientDeviceIpFromSerialNumber( std::string generateClientDeviceIpFromSerialNumber(
@@ -129,12 +130,12 @@ public:
// Callback function type definitions for async methods // Callback function type definitions for async methods
typedef std::function<void(bool success)> connectReqCbFn; typedef std::function<void(bool success)> connectReqCbFn;
typedef std::function< typedef std::function<
void(bool success, const std::string& ipAddr, int fd)> void(bool success, const std::string& ipAddr)>
connectToKnownDeviceReqCbFn; connectToKnownDeviceReqCbFn;
typedef std::function< typedef std::function<
void(bool success, const std::string& ipAddr, int fd)> void(bool success, const std::string& ipAddr)>
connectByDeviceIdentifierReqCbFn; connectByDeviceIdentifierReqCbFn;
typedef std::function<void(bool success, int fd)> executeHandshakeReqCbFn; typedef std::function<void(bool success)> executeHandshakeReqCbFn;
typedef std::function<void(bool success)> disconnectReqCbFn; typedef std::function<void(bool success)> disconnectReqCbFn;
typedef std::function<void(bool success)> enablePcloudDataReqCbFn; typedef std::function<void(bool success)> enablePcloudDataReqCbFn;
typedef std::function<void(bool success)> disablePcloudDataReqCbFn; typedef std::function<void(bool success)> disablePcloudDataReqCbFn;
@@ -154,8 +155,6 @@ public:
// Heartbeat state // Heartbeat state
std::unique_ptr<boost::asio::deadline_timer> heartbeatTimer; std::unique_ptr<boost::asio::deadline_timer> heartbeatTimer;
// FIXME: Might be useful to rename this to commandAndHeartbeatFd.
int heartbeatFd; // Socket file descriptor used for heartbeat
std::atomic<bool> heartbeatActive; std::atomic<bool> heartbeatActive;
// Point cloud data state // Point cloud data state
+1 -2
View File
@@ -714,7 +714,7 @@ StartStopSamplingMessage::StartStopSamplingMessage()
// Initialize header // Initialize header
header.sof = 0xAA; header.sof = 0xAA;
header.version = 1; header.version = 1;
header.length = sizeof(StartStopSamplingMessage) - sizeof(Header) - sizeof(Footer); header.length = sizeof(StartStopSamplingMessage);
header.cmd_type = 0x02; // MSG type header.cmd_type = 0x02; // MSG type
header.seq_num = 0; // Will be set by caller if needed header.seq_num = 0; // Will be set by caller if needed
header.crc_16 = 0; // Will be calculated header.crc_16 = 0; // Will be calculated
@@ -743,7 +743,6 @@ void StartStopSamplingMessage::swapContentsToProtocolEndianness()
{ {
header.swapToProtocolEndianness(); header.swapToProtocolEndianness();
command.swapToProtocolEndianness(); command.swapToProtocolEndianness();
footer.swapToProtocolEndianness();
} }