#include #include #include #include #include #include #include #include #include #include "udpCommandDemuxer.h" #include "core.h" #include "device.h" namespace livoxProto1 { namespace comms { UdpCommandDemuxer::UdpCommandDemuxer( const std::shared_ptr &componentThread, DeviceManager &deviceManager, uint16_t commandPort ) : componentThread(componentThread), deviceManager(deviceManager), commandPort(commandPort), senderAddrLen(sizeof(senderAddr)) { } UdpCommandDemuxer::~UdpCommandDemuxer() { stop(); } void UdpCommandDemuxer::start() { if (isActive.load()) { std::cerr << __func__ << ": Demuxer is already running" << std::endl; return; } try { setupSocket(); isActive.store(true); shouldStop.store(false); // Start the async receive loop startAsyncReceive(); std::cout << __func__ << ": UDP Command Demuxer started on port " << commandPort << std::endl; } catch (const std::exception &e) { std::cerr << __func__ << ": Failed to start demuxer: " << e.what() << std::endl; isActive.store(false); throw; } } void UdpCommandDemuxer::stop() { if (!isActive.load()) { return; } shouldStop.store(true); // Close socket and cleanup if (cmdEndpointFdDesc) { cmdEndpointFdDesc->cancel(); cmdEndpointFdDesc.reset(); } isActive.store(false); std::cout << __func__ << ": UDP Command Demuxer stopped" << std::endl; } void UdpCommandDemuxer::setupSocket() { // RAII class to manage socket file descriptor struct SocketRAII { int fd; SocketRAII(int socketFd) : fd(socketFd) {} ~SocketRAII() { if (fd >= 0) close(fd); } void commit() { fd = -1; } // Transfer ownership, prevent close int getFd() const { return fd; } bool isValid() const { return fd >= 0; } }; // Create UDP socket SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0)); if (!socketGuard.isValid()) { throw std::runtime_error( std::string(__func__) + ": Failed to create socket: " + strerror(errno)); } // Set socket to non-blocking mode int flags = fcntl(socketGuard.getFd(), F_GETFL, 0); if (flags < 0 || fcntl( socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0) { throw std::runtime_error( std::string(__func__) + ": Failed to set non-blocking mode: " + strerror(errno)); } // Bind to command port struct sockaddr_in localAddr; memset(&localAddr, 0, sizeof(localAddr)); localAddr.sin_family = AF_INET; localAddr.sin_addr.s_addr = INADDR_ANY; localAddr.sin_port = htons(commandPort); if (bind( socketGuard.getFd(), (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { throw std::runtime_error( std::string(__func__) + ": Failed to bind to port " + std::to_string(commandPort) + ": " + strerror(errno)); } // Create boost wrapper for async operations cmdEndpointFdDesc = std::make_shared( componentThread->getIoService(), socketGuard.getFd()); // Transfer ownership, prevent auto-close socketGuard.commit(); } void UdpCommandDemuxer::startAsyncReceive() { if (!isActive.load() || shouldStop.load()) { return; } cmdEndpointFdDesc->async_wait( boost::asio::posix::stream_descriptor::wait_read, std::bind( &UdpCommandDemuxer::onDataReady, this, std::placeholders::_1)); } void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error) { if (error) { if (error != boost::asio::error::operation_aborted) { std::cerr << __func__ << ": Socket error: " << error.message() << std::endl; } return; } if (!isActive.load() || shouldStop.load()) { return; } // Read the data bytesReceived = recvfrom( cmdEndpointFdDesc->native_handle(), receiveBuffer, sizeof(receiveBuffer), 0, (struct sockaddr *)&senderAddr, &senderAddrLen); if (bytesReceived > 0) { processIncomingData(); } else if (bytesReceived < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { std::cerr << __func__ << ": recvfrom error: " << strerror(errno) << std::endl; } } // Continue listening for more data startAsyncReceive(); } void UdpCommandDemuxer::processIncomingData() { if (bytesReceived < 2) { // Too small to contain any meaningful data return; } // Extract source IP address char sourceIP[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN); // First, find device with matching IP address in DeviceManager collection for (const auto &device : deviceManager.devices) { if (device->discoveredDevice.ipAddr != sourceIP) { continue; } // Found matching device, route the datagram to it try { device->handleUdpDgram( receiveBuffer, bytesReceived, senderAddr); } catch (const std::exception &e) { std::cerr << __func__ << ": Device handler exception for IP " << sourceIP << ": " << e.what() << std::endl; } return; } // If not found in DeviceManager, check temporary collection (devices under construction) auto tempIt = livoxProto1::Device::devicesUnderConstruction.find(sourceIP); if (tempIt != livoxProto1::Device::devicesUnderConstruction.end()) { // Extract command set and command ID from the datagram if (bytesReceived >= static_cast( sizeof(livoxProto1::comms::Header) + sizeof(livoxProto1::comms::Command))) { uint8_t cmd_set = receiveBuffer[sizeof(livoxProto1::comms::Header)]; uint8_t cmd_id = receiveBuffer[sizeof(livoxProto1::comms::Header) + 1]; // Found matching device in temporary collection, invoke matching handlers for (const auto& cmdHandler : tempIt->second) { if (cmdHandler.cmd_set != cmd_set || cmdHandler.cmd_id != cmd_id) { continue; } try { cmdHandler.handler(receiveBuffer, bytesReceived, senderAddr); } catch (const std::exception &e) { std::cerr << __func__ << ": Temporary device handler exception for IP " << sourceIP << ": " << e.what() << std::endl; } } } return; } // No device found with matching IP in either collection, discard the data std::cerr << __func__ << ": No device found for source IP " << sourceIP << ", discarding datagram" << std::endl; } } // namespace comms } // namespace livoxProto1