diff --git a/commonLibs/livoxProto1/CMakeLists.txt b/commonLibs/livoxProto1/CMakeLists.txt index edef854..12e82f6 100644 --- a/commonLibs/livoxProto1/CMakeLists.txt +++ b/commonLibs/livoxProto1/CMakeLists.txt @@ -7,6 +7,7 @@ if(ENABLE_LIB_livoxProto1) device.cpp protocol.cpp broadcastListener.cpp + udpCommandDemuxer.cpp ) # Set config define for header generation diff --git a/commonLibs/livoxProto1/core.cpp b/commonLibs/livoxProto1/core.cpp index 65bb0dc..beceb57 100644 --- a/commonLibs/livoxProto1/core.cpp +++ b/commonLibs/livoxProto1/core.cpp @@ -29,7 +29,8 @@ ProtoState& getProtoState() } DeviceManager::DeviceManager() -: broadcastListener(protoState.componentThread) +: broadcastListener(protoState.componentThread), + udpCommandDemuxer(protoState.componentThread, *this) { broadcastListener.setDeviceGoneAwayCb(deviceGoneAwayInd); } diff --git a/commonLibs/livoxProto1/core.h b/commonLibs/livoxProto1/core.h index 1f8e207..ab6c052 100644 --- a/commonLibs/livoxProto1/core.h +++ b/commonLibs/livoxProto1/core.h @@ -9,6 +9,7 @@ #include #include "device.h" #include "broadcastListener.h" +#include "udpCommandDemuxer.h" #include "livoxProto1.h" #include @@ -50,6 +51,7 @@ private: public: std::vector> devices; comms::BroadcastListener broadcastListener; + comms::UdpCommandDemuxer udpCommandDemuxer; // Nested continuation class for async device creation class GetOrCreateDeviceReq; diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index 2ee0d6e..abd2f99 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -1743,4 +1743,54 @@ void Device::cleanupPcloudDataSocket() pcloudDataActive.store(false); } +void Device::handleUdpDgram(const uint8_t *data, ssize_t bytesReceived, + const struct sockaddr_in &senderAddr) +{ + (void)senderAddr; + + // Check minimum size for any valid protocol message + if (bytesReceived < static_cast( + sizeof(comms::Header) + sizeof(comms::Command))) + { + // Too small for header + command + return; + } + + // Extract command set and command ID from the first two bytes after the header + uint8_t cmd_set = data[sizeof(comms::Header)]; + uint8_t cmd_id = data[sizeof(comms::Header) + 1]; + + // Route based on command type + if (cmd_set == 0x00 && cmd_id == 0x01) + { + // Handshake ACK - check if we have enough data for HandshakeResponse + if (bytesReceived >= static_cast( + sizeof(comms::HandshakeResponse))) + { + std::cout << __func__ << ": Received handshake ACK from " + << discoveredDevice.deviceIdentifier << std::endl; + } + } + else if (cmd_set == 0x00 && cmd_id == 0x03) + { + // Heartbeat ACK - check if we have enough data for HeartbeatMessage + if (bytesReceived >= static_cast( + sizeof(comms::HeartbeatMessage))) + { + // Empty intentionally. + } + } + else if (cmd_set == 0x00 && cmd_id == 0x04) + { + // Sampling response - check if we have enough data for SamplingResponse + if (bytesReceived >= static_cast( + sizeof(comms::SamplingResponse))) + { + std::cout << __func__ << ": Received sampling response from " + << discoveredDevice.deviceIdentifier << std::endl; + } + } + // Unknown command types are silently ignored +} + } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index 6ae872d..e88f1ed 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -140,6 +140,12 @@ public: std::atomic pcloudDataActive; int pcloudDataFd; // Socket file descriptor for point cloud data reception +public: + // UDP datagram handling + void handleUdpDgram( + const uint8_t* data, ssize_t bytesReceived, + const struct sockaddr_in& senderAddr); + private: // Point cloud data setup bool setupPcloudDataSocket(); diff --git a/commonLibs/livoxProto1/udpCommandDemuxer.cpp b/commonLibs/livoxProto1/udpCommandDemuxer.cpp new file mode 100644 index 0000000..dcc020b --- /dev/null +++ b/commonLibs/livoxProto1/udpCommandDemuxer.cpp @@ -0,0 +1,264 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "udpCommandDemuxer.h" +#include "core.h" + +namespace livoxProto1 { +namespace comms { + +UdpCommandDemuxer::UdpCommandDemuxer( + const std::shared_ptr &componentThread, + DeviceManager &deviceManager, + uint16_t commandPort +) +: componentThread(componentThread), deviceManager(deviceManager), +commandPort(commandPort), +timer(componentThread->getIoService()), +senderAddrLen(sizeof(senderAddr)) +{ +} + +UdpCommandDemuxer::~UdpCommandDemuxer() +{ + stop(); +} + +void UdpCommandDemuxer::start() +{ + if (isActive.load()) + { + std::cerr << __func__ << ": Demuxer is already running" + << std::endl; + return; + } + + try + { + setupSocket(); + isActive.store(true); + shouldStop.store(false); + + // Start the async receive loop + startAsyncReceive(); + + // Start the timer for responsiveness to stop() + timer.expires_from_now(boost::posix_time::milliseconds(1000)); + timer.async_wait( + std::bind( + &UdpCommandDemuxer::onTimerTick, this, std::placeholders::_1)); + + std::cout + << __func__ << ": UDP Command Demuxer started on port " + << commandPort << std::endl; + } + catch (const std::exception &e) + { + std::cerr + << __func__ << ": Failed to start demuxer: " + << e.what() << std::endl; + isActive.store(false); + throw; + } +} + +void UdpCommandDemuxer::stop() +{ + if (!isActive.load()) + { return; } + + shouldStop.store(true); + + // Cancel timer + timer.cancel(); + + // Close socket and cleanup + if (socketDesc) + { + socketDesc->cancel(); + socketDesc.reset(); + } + + isActive.store(false); + std::cout + << __func__ << ": UDP Command Demuxer stopped" + << std::endl; +} + +void UdpCommandDemuxer::setupSocket() +{ + // RAII class to manage socket file descriptor + struct SocketRAII + { + int fd; + SocketRAII(int socketFd) : fd(socketFd) {} + ~SocketRAII() { if (fd >= 0) close(fd); } + void commit() { fd = -1; } // Transfer ownership, prevent close + int getFd() const { return fd; } + bool isValid() const { return fd >= 0; } + }; + + // Create UDP socket + SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0)); + if (!socketGuard.isValid()) + { + throw std::runtime_error( + std::string(__func__) + + ": Failed to create socket: " + strerror(errno)); + } + + // Set socket to non-blocking mode + int flags = fcntl(socketGuard.getFd(), F_GETFL, 0); + if (flags < 0 || fcntl( + socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0) + { + throw std::runtime_error( + std::string(__func__) + + ": Failed to set non-blocking mode: " + strerror(errno)); + } + + // Bind to command port + struct sockaddr_in localAddr; + memset(&localAddr, 0, sizeof(localAddr)); + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = INADDR_ANY; + localAddr.sin_port = htons(commandPort); + + if (bind( + socketGuard.getFd(), (struct sockaddr *)&localAddr, + sizeof(localAddr)) < 0) + { + throw std::runtime_error( + std::string(__func__) + ": Failed to bind to port " + + std::to_string(commandPort) + ": " + strerror(errno)); + } + + // Create boost wrapper for async operations + socketDesc =std::make_unique( + componentThread->getIoService(), socketGuard.getFd()); + + // Transfer ownership, prevent auto-close + socketGuard.commit(); +} + +void UdpCommandDemuxer::startAsyncReceive() +{ + if (!isActive.load() || shouldStop.load()) + { return; } + + socketDesc->async_wait( + boost::asio::posix::stream_descriptor::wait_read, + std::bind( + &UdpCommandDemuxer::onDataReady, this, std::placeholders::_1)); +} + +void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error) +{ + if (error) + { + if (error != boost::asio::error::operation_aborted) + { + std::cerr + << __func__ << ": Socket error: " + << error.message() << std::endl; + } + return; + } + + if (!isActive.load() || shouldStop.load()) + { return; } + + // Read the data + bytesReceived = recvfrom( + socketDesc->native_handle(), receiveBuffer, + sizeof(receiveBuffer), 0, + (struct sockaddr *)&senderAddr, &senderAddrLen); + + if (bytesReceived > 0) { + processIncomingData(); + } + else if (bytesReceived < 0) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + std::cerr << __func__ << ": recvfrom error: " + << strerror(errno) << std::endl; + } + } + + // Continue listening for more data + startAsyncReceive(); +} + +void UdpCommandDemuxer::onTimerTick(const boost::system::error_code &error) +{ + if (error == boost::asio::error::operation_aborted) + { return; } + + if (shouldStop.load()) + { + // Stop was called, cancel async operations and stop timer + if (socketDesc) { + socketDesc->cancel(); + } + timer.cancel(); + return; + } + + // Re-arm timer for next tick + if (isActive.load()) + { + timer.expires_from_now(boost::posix_time::milliseconds(1000)); + timer.async_wait( + std::bind( + &UdpCommandDemuxer::onTimerTick, this, std::placeholders::_1)); + } +} + +void UdpCommandDemuxer::processIncomingData() +{ + if (bytesReceived < 2) + { + // Too small to contain any meaningful data + return; + } + + // Extract source IP address + char sourceIP[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN); + + // Find device with matching IP address + for (const auto &device : deviceManager.devices) + { + if (device->discoveredDevice.ipAddr != sourceIP) { continue; } + + // Found matching device, route the datagram to it + try + { + device->handleUdpDgram( + receiveBuffer, bytesReceived, senderAddr); + } + catch (const std::exception &e) + { + std::cerr + << __func__ << ": Device handler exception for IP " + << sourceIP << ": " << e.what() << std::endl; + } + return; + } + + // No device found with matching IP, discard the data + std::cerr + << __func__ << ": No device found for source IP " + << sourceIP << ", discarding datagram" << std::endl; +} + +} // namespace comms +} // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/udpCommandDemuxer.h b/commonLibs/livoxProto1/udpCommandDemuxer.h new file mode 100644 index 0000000..bb0cb43 --- /dev/null +++ b/commonLibs/livoxProto1/udpCommandDemuxer.h @@ -0,0 +1,66 @@ +#ifndef UDP_COMMAND_DEMUXER_H +#define UDP_COMMAND_DEMUXER_H + +#include +#include +#include +#include + +namespace livoxProto1 { + +// Forward declarations +class DeviceManager; + +namespace comms { + +/** + * UdpCommandDemuxer - Routes UDP command datagrams to appropriate devices + * + * This class listens on the command port (65000) for incoming UDP datagrams + * from Livox devices and routes them to the appropriate Device based on + * the source IP address. + */ +class UdpCommandDemuxer +{ +public: + UdpCommandDemuxer( + const std::shared_ptr& componentThread, + DeviceManager& deviceManager, + uint16_t commandPort = 65000); + + ~UdpCommandDemuxer(); + + void start(); + void stop(); + bool isRunning() const { return isActive.load(); } + +private: + void setupSocket(); + void startAsyncReceive(); + void onDataReady(const boost::system::error_code& error); + void onTimerTick(const boost::system::error_code& error); + void processIncomingData(); + + std::shared_ptr componentThread; + DeviceManager& deviceManager; + uint16_t commandPort; + + // Socket and async objects + std::unique_ptr socketDesc; + boost::asio::deadline_timer timer; + + // State management + std::atomic isActive{false}; + std::atomic shouldStop{false}; + + // Receive buffer + uint8_t receiveBuffer[1024]; + struct sockaddr_in senderAddr; + socklen_t senderAddrLen; + ssize_t bytesReceived; +}; + +} // namespace comms +} // namespace livoxProto1 + +#endif // UDP_COMMAND_DEMUXER_H