LivoxProto1: Add UdpCommandDemuxer.

We haven't genericized it with an unordered_map or integrated it
into device.cpp's async methods yet.
This commit is contained in:
2025-10-22 06:17:42 -04:00
parent 66a9db13c3
commit 10afec2532
7 changed files with 391 additions and 1 deletions
+1
View File
@@ -7,6 +7,7 @@ if(ENABLE_LIB_livoxProto1)
device.cpp
protocol.cpp
broadcastListener.cpp
udpCommandDemuxer.cpp
)
# Set config define for header generation
+2 -1
View File
@@ -29,7 +29,8 @@ ProtoState& getProtoState()
}
DeviceManager::DeviceManager()
: broadcastListener(protoState.componentThread)
: broadcastListener(protoState.componentThread),
udpCommandDemuxer(protoState.componentThread, *this)
{
broadcastListener.setDeviceGoneAwayCb(deviceGoneAwayInd);
}
+2
View File
@@ -9,6 +9,7 @@
#include <user/senseApiDesc.h>
#include "device.h"
#include "broadcastListener.h"
#include "udpCommandDemuxer.h"
#include "livoxProto1.h"
#include <callback.h>
@@ -50,6 +51,7 @@ private:
public:
std::vector<std::shared_ptr<Device>> devices;
comms::BroadcastListener broadcastListener;
comms::UdpCommandDemuxer udpCommandDemuxer;
// Nested continuation class for async device creation
class GetOrCreateDeviceReq;
+50
View File
@@ -1743,4 +1743,54 @@ void Device::cleanupPcloudDataSocket()
pcloudDataActive.store(false);
}
void Device::handleUdpDgram(const uint8_t *data, ssize_t bytesReceived,
const struct sockaddr_in &senderAddr)
{
(void)senderAddr;
// Check minimum size for any valid protocol message
if (bytesReceived < static_cast<ssize_t>(
sizeof(comms::Header) + sizeof(comms::Command)))
{
// Too small for header + command
return;
}
// Extract command set and command ID from the first two bytes after the header
uint8_t cmd_set = data[sizeof(comms::Header)];
uint8_t cmd_id = data[sizeof(comms::Header) + 1];
// Route based on command type
if (cmd_set == 0x00 && cmd_id == 0x01)
{
// Handshake ACK - check if we have enough data for HandshakeResponse
if (bytesReceived >= static_cast<ssize_t>(
sizeof(comms::HandshakeResponse)))
{
std::cout << __func__ << ": Received handshake ACK from "
<< discoveredDevice.deviceIdentifier << std::endl;
}
}
else if (cmd_set == 0x00 && cmd_id == 0x03)
{
// Heartbeat ACK - check if we have enough data for HeartbeatMessage
if (bytesReceived >= static_cast<ssize_t>(
sizeof(comms::HeartbeatMessage)))
{
// Empty intentionally.
}
}
else if (cmd_set == 0x00 && cmd_id == 0x04)
{
// Sampling response - check if we have enough data for SamplingResponse
if (bytesReceived >= static_cast<ssize_t>(
sizeof(comms::SamplingResponse)))
{
std::cout << __func__ << ": Received sampling response from "
<< discoveredDevice.deviceIdentifier << std::endl;
}
}
// Unknown command types are silently ignored
}
} // namespace livoxProto1
+6
View File
@@ -140,6 +140,12 @@ public:
std::atomic<bool> pcloudDataActive;
int pcloudDataFd; // Socket file descriptor for point cloud data reception
public:
// UDP datagram handling
void handleUdpDgram(
const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr);
private:
// Point cloud data setup
bool setupPcloudDataSocket();
@@ -0,0 +1,264 @@
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include "udpCommandDemuxer.h"
#include "core.h"
namespace livoxProto1 {
namespace comms {
UdpCommandDemuxer::UdpCommandDemuxer(
const std::shared_ptr<smo::ComponentThread> &componentThread,
DeviceManager &deviceManager,
uint16_t commandPort
)
: componentThread(componentThread), deviceManager(deviceManager),
commandPort(commandPort),
timer(componentThread->getIoService()),
senderAddrLen(sizeof(senderAddr))
{
}
UdpCommandDemuxer::~UdpCommandDemuxer()
{
stop();
}
void UdpCommandDemuxer::start()
{
if (isActive.load())
{
std::cerr << __func__ << ": Demuxer is already running"
<< std::endl;
return;
}
try
{
setupSocket();
isActive.store(true);
shouldStop.store(false);
// Start the async receive loop
startAsyncReceive();
// Start the timer for responsiveness to stop()
timer.expires_from_now(boost::posix_time::milliseconds(1000));
timer.async_wait(
std::bind(
&UdpCommandDemuxer::onTimerTick, this, std::placeholders::_1));
std::cout
<< __func__ << ": UDP Command Demuxer started on port "
<< commandPort << std::endl;
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Failed to start demuxer: "
<< e.what() << std::endl;
isActive.store(false);
throw;
}
}
void UdpCommandDemuxer::stop()
{
if (!isActive.load())
{ return; }
shouldStop.store(true);
// Cancel timer
timer.cancel();
// Close socket and cleanup
if (socketDesc)
{
socketDesc->cancel();
socketDesc.reset();
}
isActive.store(false);
std::cout
<< __func__ << ": UDP Command Demuxer stopped"
<< std::endl;
}
void UdpCommandDemuxer::setupSocket()
{
// RAII class to manage socket file descriptor
struct SocketRAII
{
int fd;
SocketRAII(int socketFd) : fd(socketFd) {}
~SocketRAII() { if (fd >= 0) close(fd); }
void commit() { fd = -1; } // Transfer ownership, prevent close
int getFd() const { return fd; }
bool isValid() const { return fd >= 0; }
};
// Create UDP socket
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
if (!socketGuard.isValid())
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to create socket: " + strerror(errno));
}
// Set socket to non-blocking mode
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
if (flags < 0 || fcntl(
socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to set non-blocking mode: " + strerror(errno));
}
// Bind to command port
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(commandPort);
if (bind(
socketGuard.getFd(), (struct sockaddr *)&localAddr,
sizeof(localAddr)) < 0)
{
throw std::runtime_error(
std::string(__func__) + ": Failed to bind to port "
+ std::to_string(commandPort) + ": " + strerror(errno));
}
// Create boost wrapper for async operations
socketDesc =std::make_unique<boost::asio::posix::stream_descriptor>(
componentThread->getIoService(), socketGuard.getFd());
// Transfer ownership, prevent auto-close
socketGuard.commit();
}
void UdpCommandDemuxer::startAsyncReceive()
{
if (!isActive.load() || shouldStop.load())
{ return; }
socketDesc->async_wait(
boost::asio::posix::stream_descriptor::wait_read,
std::bind(
&UdpCommandDemuxer::onDataReady, this, std::placeholders::_1));
}
void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error)
{
if (error)
{
if (error != boost::asio::error::operation_aborted)
{
std::cerr
<< __func__ << ": Socket error: "
<< error.message() << std::endl;
}
return;
}
if (!isActive.load() || shouldStop.load())
{ return; }
// Read the data
bytesReceived = recvfrom(
socketDesc->native_handle(), receiveBuffer,
sizeof(receiveBuffer), 0,
(struct sockaddr *)&senderAddr, &senderAddrLen);
if (bytesReceived > 0) {
processIncomingData();
}
else if (bytesReceived < 0)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
std::cerr << __func__ << ": recvfrom error: "
<< strerror(errno) << std::endl;
}
}
// Continue listening for more data
startAsyncReceive();
}
void UdpCommandDemuxer::onTimerTick(const boost::system::error_code &error)
{
if (error == boost::asio::error::operation_aborted)
{ return; }
if (shouldStop.load())
{
// Stop was called, cancel async operations and stop timer
if (socketDesc) {
socketDesc->cancel();
}
timer.cancel();
return;
}
// Re-arm timer for next tick
if (isActive.load())
{
timer.expires_from_now(boost::posix_time::milliseconds(1000));
timer.async_wait(
std::bind(
&UdpCommandDemuxer::onTimerTick, this, std::placeholders::_1));
}
}
void UdpCommandDemuxer::processIncomingData()
{
if (bytesReceived < 2)
{
// Too small to contain any meaningful data
return;
}
// Extract source IP address
char sourceIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN);
// Find device with matching IP address
for (const auto &device : deviceManager.devices)
{
if (device->discoveredDevice.ipAddr != sourceIP) { continue; }
// Found matching device, route the datagram to it
try
{
device->handleUdpDgram(
receiveBuffer, bytesReceived, senderAddr);
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Device handler exception for IP "
<< sourceIP << ": " << e.what() << std::endl;
}
return;
}
// No device found with matching IP, discard the data
std::cerr
<< __func__ << ": No device found for source IP "
<< sourceIP << ", discarding datagram" << std::endl;
}
} // namespace comms
} // namespace livoxProto1
@@ -0,0 +1,66 @@
#ifndef UDP_COMMAND_DEMUXER_H
#define UDP_COMMAND_DEMUXER_H
#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include <componentThread.h>
namespace livoxProto1 {
// Forward declarations
class DeviceManager;
namespace comms {
/**
* UdpCommandDemuxer - Routes UDP command datagrams to appropriate devices
*
* This class listens on the command port (65000) for incoming UDP datagrams
* from Livox devices and routes them to the appropriate Device based on
* the source IP address.
*/
class UdpCommandDemuxer
{
public:
UdpCommandDemuxer(
const std::shared_ptr<smo::ComponentThread>& componentThread,
DeviceManager& deviceManager,
uint16_t commandPort = 65000);
~UdpCommandDemuxer();
void start();
void stop();
bool isRunning() const { return isActive.load(); }
private:
void setupSocket();
void startAsyncReceive();
void onDataReady(const boost::system::error_code& error);
void onTimerTick(const boost::system::error_code& error);
void processIncomingData();
std::shared_ptr<smo::ComponentThread> componentThread;
DeviceManager& deviceManager;
uint16_t commandPort;
// Socket and async objects
std::unique_ptr<boost::asio::posix::stream_descriptor> socketDesc;
boost::asio::deadline_timer timer;
// State management
std::atomic<bool> isActive{false};
std::atomic<bool> shouldStop{false};
// Receive buffer
uint8_t receiveBuffer[1024];
struct sockaddr_in senderAddr;
socklen_t senderAddrLen;
ssize_t bytesReceived;
};
} // namespace comms
} // namespace livoxProto1
#endif // UDP_COMMAND_DEMUXER_H