livoxProto1: Open pcloudDataSocket in UdpCmdDemux

The pcloudData socket is now opened by UdpCommandDemuxer, when
libLivoxProto1 is initialized. We can now just pick up the socket
and be certain it'll be there if the lib is being executed.
This commit is contained in:
2025-11-01 22:41:58 -04:00
parent 10e615e75e
commit 45ad5c83ee
6 changed files with 101 additions and 99 deletions
+1 -92
View File
@@ -113,13 +113,9 @@ Device::~Device()
if (pcloudDataActive.load()) {
pcloudDataActive.store(false);
if (pcloudDataSocketDesc) {
pcloudDataSocketDesc->cancel();
}
}
heartbeatTimer.reset();
pcloudDataSocketDesc.reset();
}
/**
@@ -1347,9 +1343,6 @@ public:
// The timeout timer.
boost::asio::deadline_timer timeoutTimer;
// Temporary point cloud data socket descriptor (for enable operations)
boost::asio::posix::stream_descriptor tmpPcloudEnableFdDesc;
// Received data storage
uint8_t responseBuffer[1024]{};
ssize_t bytesReceived = -1;
@@ -1362,8 +1355,7 @@ protected:
smo::Callback<CallbackType> cb)
: smo::NonPostedAsynchronousContinuation<CallbackType>(std::move(cb)),
device(dev),
timeoutTimer(device.componentThread->getIoService()),
tmpPcloudEnableFdDesc(device.componentThread->getIoService())
timeoutTimer(device.componentThread->getIoService())
{}
public:
@@ -1540,10 +1532,6 @@ protected:
void cleanup()
{
timeoutTimer.cancel();
if (tmpPcloudEnableFdDesc.is_open()) {
tmpPcloudEnableFdDesc.cancel();
tmpPcloudEnableFdDesc.close();
}
}
// Pure virtual methods that derived classes must implement
@@ -1551,61 +1539,6 @@ protected:
virtual const char* getCommandName() const = 0;
virtual void setPcloudDataActiveState() = 0;
// Method to set up temporary point cloud data socket
bool setupTmpPcloudDataSocket()
{
// RAII class to manage socket file descriptor
struct SocketRAII
{
int fd;
SocketRAII(int socketFd) : fd(socketFd) {}
~SocketRAII() { if (fd >= 0) close(fd); }
void release() { fd = -1; } // Release ownership, prevent close
int getFd() const { return fd; }
bool isValid() const { return fd >= 0; }
};
// Create UDP socket for point cloud data reception
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
if (!socketGuard.isValid())
{
std::cerr << __func__ << ": Failed to create socket: "
<< strerror(errno) << std::endl;
return false;
}
// Set socket to non-blocking mode
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
if (flags < 0 ||
fcntl(socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
{
std::cerr << __func__ << ": Failed to set non-blocking mode: "
<< strerror(errno) << std::endl;
return false;
}
// Bind to the data port (65001)
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(65001); // Data port
if (bind(
socketGuard.getFd(), (struct sockaddr *)&localAddr,
sizeof(localAddr)) < 0)
{
std::cerr << __func__ << ": Failed to bind to data port: "
<< strerror(errno) << std::endl;
return false;
}
// Assign the file descriptor to the stream descriptor
tmpPcloudEnableFdDesc.assign(socketGuard.getFd());
socketGuard.release();
return true;
}
// Common sendCommand implementation
bool sendCommand()
{
@@ -1691,17 +1624,6 @@ private:
void setPcloudDataActiveState() override
{
// Transfer ownership of the socket from temporary to main descriptor
if (tmpPcloudEnableFdDesc.is_open())
{
// Close the temporary descriptor (but don't close the fd)
int fd = tmpPcloudEnableFdDesc.native_handle();
tmpPcloudEnableFdDesc.release();
// Give the transient FD to the Device object.
device.pcloudDataSocketDesc =
std::make_unique<boost::asio::posix::stream_descriptor>(
device.componentThread->getIoService(), fd);
}
device.pcloudDataActive.store(true);
}
};
@@ -1758,15 +1680,6 @@ void Device::enablePcloudDataReq(
return;
}
// Set up the temporary point cloud data socket for actual data reception
if (!request->setupTmpPcloudDataSocket())
{
std::cerr << __func__ << ": Failed to set up transient pcloud data FD."
<< std::endl;
request->callOriginalCallbackWithFailure();
return;
}
// Send the start sampling command
if (!request->sendCommand())
{
@@ -1813,10 +1726,6 @@ void Device::disablePcloudDataReq(
void Device::cleanupPcloudDataSocket()
{
if (pcloudDataSocketDesc) {
pcloudDataSocketDesc->cancel();
pcloudDataSocketDesc.reset();
}
pcloudDataActive.store(false);
}
-1
View File
@@ -163,7 +163,6 @@ public:
std::atomic<bool> heartbeatActive;
// Point cloud data state
std::unique_ptr<boost::asio::posix::stream_descriptor> pcloudDataSocketDesc;
std::atomic<bool> pcloudDataActive;
// Cached last-known return mode for this device
+15
View File
@@ -1,8 +1,10 @@
#include <stdexcept>
#include <callback.h>
#include <boost/asio/posix/stream_descriptor.hpp>
#include "livoxProto1.h"
#include "device.h"
#include "core.h"
#include "udpCommandDemuxer.h"
extern "C" {
@@ -104,4 +106,17 @@ void livoxProto1_device_getReturnModeReq(
device->getReturnModeReq(callback);
}
std::shared_ptr<boost::asio::posix::stream_descriptor>
livoxProto1_getPcloudDataFdDesc(void)
{
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
throw std::runtime_error(std::string(__func__)
+ ": DeviceManager not initialized");
}
return protoState.deviceManager->udpCommandDemuxer.pcloudDataSocketDesc;
}
} // extern "C"
+4
View File
@@ -85,6 +85,9 @@ typedef void livoxProto1_device_getReturnModeReqFn(
std::shared_ptr<livoxProto1::Device> device,
smo::Callback<livoxProto1_device_getReturnModeReqCbFn> callback);
typedef std::shared_ptr<boost::asio::posix::stream_descriptor>
livoxProto1_getPcloudDataFdDescFn(void);
livoxProto1_mainFn livoxProto1_main;
livoxProto1_exitFn livoxProto1_exit;
livoxProto1_getOrCreateDeviceReqFn livoxProto1_getOrCreateDeviceReq;
@@ -93,6 +96,7 @@ livoxProto1_device_enablePcloudDataReqFn livoxProto1_device_enablePcloudDataReq;
livoxProto1_device_disablePcloudDataReqFn
livoxProto1_device_disablePcloudDataReq;
livoxProto1_device_getReturnModeReqFn livoxProto1_device_getReturnModeReq;
livoxProto1_getPcloudDataFdDescFn livoxProto1_getPcloudDataFdDesc;
#ifdef __cplusplus
}
+73 -4
View File
@@ -18,10 +18,11 @@ namespace comms {
UdpCommandDemuxer::UdpCommandDemuxer(
const std::shared_ptr<smo::ComponentThread> &componentThread,
DeviceManager &deviceManager,
uint16_t commandPort
uint16_t commandPort,
uint16_t dataPort
)
: componentThread(componentThread), deviceManager(deviceManager),
commandPort(commandPort),
commandPort(commandPort), dataPort(dataPort),
senderAddrLen(sizeof(senderAddr))
{
}
@@ -42,7 +43,7 @@ void UdpCommandDemuxer::start()
try
{
setupSocket();
setupSockets();
isActive.store(true);
shouldStop.store(false);
@@ -77,13 +78,25 @@ void UdpCommandDemuxer::stop()
cmdEndpointFdDesc.reset();
}
if (pcloudDataSocketDesc)
{
pcloudDataSocketDesc->cancel();
pcloudDataSocketDesc.reset();
}
isActive.store(false);
std::cout
<< __func__ << ": UDP Command Demuxer stopped"
<< std::endl;
}
void UdpCommandDemuxer::setupSocket()
void UdpCommandDemuxer::setupSockets()
{
setupCommandSocket();
setupPcloudDataSocket();
}
void UdpCommandDemuxer::setupCommandSocket()
{
// RAII class to manage socket file descriptor
struct SocketRAII
@@ -139,6 +152,62 @@ void UdpCommandDemuxer::setupSocket()
socketGuard.commit();
}
void UdpCommandDemuxer::setupPcloudDataSocket()
{
// RAII class to manage socket file descriptor
struct SocketRAII
{
int fd;
SocketRAII(int socketFd) : fd(socketFd) {}
~SocketRAII() { if (fd >= 0) close(fd); }
void commit() { fd = -1; } // Transfer ownership, prevent close
int getFd() const { return fd; }
bool isValid() const { return fd >= 0; }
};
// Create UDP socket for point cloud data reception
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
if (!socketGuard.isValid())
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to create socket: " + strerror(errno));
}
// Set socket to non-blocking mode
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
if (flags < 0 ||
fcntl(socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to set non-blocking mode: " + strerror(errno));
}
// Bind to the data port
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(dataPort);
if (bind(
socketGuard.getFd(), (struct sockaddr *)&localAddr,
sizeof(localAddr)) < 0)
{
throw std::runtime_error(
std::string(__func__) + ": Failed to bind to data port: "
+ std::to_string(dataPort) + ": " + strerror(errno));
}
// Create boost wrapper for async operations
pcloudDataSocketDesc = std::make_shared<boost::asio::posix::stream_descriptor>(
componentThread->getIoService(), socketGuard.getFd());
// Transfer ownership, prevent auto-close
socketGuard.commit();
}
void UdpCommandDemuxer::startAsyncReceive()
{
if (!isActive.load() || shouldStop.load())
+8 -2
View File
@@ -37,7 +37,8 @@ public:
UdpCommandDemuxer(
const std::shared_ptr<smo::ComponentThread>& componentThread,
DeviceManager& deviceManager,
uint16_t commandPort = 56001);
uint16_t commandPort = 56001,
uint16_t dataPort = 56000);
~UdpCommandDemuxer();
@@ -53,7 +54,9 @@ public:
}
private:
void setupSocket();
void setupSockets();
void setupCommandSocket();
void setupPcloudDataSocket();
void startAsyncReceive();
void onDataReady(const boost::system::error_code& error);
void processIncomingData();
@@ -61,7 +64,10 @@ private:
std::shared_ptr<smo::ComponentThread> componentThread;
DeviceManager& deviceManager;
uint16_t commandPort;
uint16_t dataPort;
// Socket and async objects
std::shared_ptr<boost::asio::posix::stream_descriptor> pcloudDataSocketDesc;
// Socket and async objects
std::shared_ptr<boost::asio::posix::stream_descriptor> cmdEndpointFdDesc;