Files
salmanoff/commonLibs/livoxProto1/udpCommandDemuxer.cpp
T
hayodea b65b0f2370 UdpCmdDemux: remove stop-"responsiveness" timer
I think it's best to remove the timer tick from UdpCommandDemuxer.
I looked at it again and it doesn't actually help with responsiveness.
Whatever it contributes is no different from what stop() does.
They both just call timer.cancel and cmdsocket.cancel.
So if that doesn't stop the socket in stop(), it won't magically
stop it more effectively if I call it from a timer handler.
2025-10-31 08:20:27 -04:00

263 lines
6.1 KiB
C++

#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include "udpCommandDemuxer.h"
#include "core.h"
#include "device.h"
namespace livoxProto1 {
namespace comms {
UdpCommandDemuxer::UdpCommandDemuxer(
const std::shared_ptr<smo::ComponentThread> &componentThread,
DeviceManager &deviceManager,
uint16_t commandPort
)
: componentThread(componentThread), deviceManager(deviceManager),
commandPort(commandPort),
senderAddrLen(sizeof(senderAddr))
{
}
UdpCommandDemuxer::~UdpCommandDemuxer()
{
stop();
}
void UdpCommandDemuxer::start()
{
if (isActive.load())
{
std::cerr << __func__ << ": Demuxer is already running"
<< std::endl;
return;
}
try
{
setupSocket();
isActive.store(true);
shouldStop.store(false);
// Start the async receive loop
startAsyncReceive();
std::cout
<< __func__ << ": UDP Command Demuxer started on port "
<< commandPort << std::endl;
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Failed to start demuxer: "
<< e.what() << std::endl;
isActive.store(false);
throw;
}
}
void UdpCommandDemuxer::stop()
{
if (!isActive.load())
{ return; }
shouldStop.store(true);
// Close socket and cleanup
if (cmdEndpointFdDesc)
{
cmdEndpointFdDesc->cancel();
cmdEndpointFdDesc.reset();
}
isActive.store(false);
std::cout
<< __func__ << ": UDP Command Demuxer stopped"
<< std::endl;
}
void UdpCommandDemuxer::setupSocket()
{
// RAII class to manage socket file descriptor
struct SocketRAII
{
int fd;
SocketRAII(int socketFd) : fd(socketFd) {}
~SocketRAII() { if (fd >= 0) close(fd); }
void commit() { fd = -1; } // Transfer ownership, prevent close
int getFd() const { return fd; }
bool isValid() const { return fd >= 0; }
};
// Create UDP socket
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
if (!socketGuard.isValid())
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to create socket: " + strerror(errno));
}
// Set socket to non-blocking mode
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
if (flags < 0 || fcntl(
socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to set non-blocking mode: " + strerror(errno));
}
// Bind to command port
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(commandPort);
if (bind(
socketGuard.getFd(), (struct sockaddr *)&localAddr,
sizeof(localAddr)) < 0)
{
throw std::runtime_error(
std::string(__func__) + ": Failed to bind to port "
+ std::to_string(commandPort) + ": " + strerror(errno));
}
// Create boost wrapper for async operations
cmdEndpointFdDesc = std::make_shared<boost::asio::posix::stream_descriptor>(
componentThread->getIoService(), socketGuard.getFd());
// Transfer ownership, prevent auto-close
socketGuard.commit();
}
void UdpCommandDemuxer::startAsyncReceive()
{
if (!isActive.load() || shouldStop.load())
{ return; }
cmdEndpointFdDesc->async_wait(
boost::asio::posix::stream_descriptor::wait_read,
std::bind(
&UdpCommandDemuxer::onDataReady, this, std::placeholders::_1));
}
void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error)
{
if (error)
{
if (error != boost::asio::error::operation_aborted)
{
std::cerr
<< __func__ << ": Socket error: "
<< error.message() << std::endl;
}
return;
}
if (!isActive.load() || shouldStop.load())
{ return; }
// Read the data
bytesReceived = recvfrom(
cmdEndpointFdDesc->native_handle(), receiveBuffer,
sizeof(receiveBuffer), 0,
(struct sockaddr *)&senderAddr, &senderAddrLen);
if (bytesReceived > 0) {
processIncomingData();
}
else if (bytesReceived < 0)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
std::cerr << __func__ << ": recvfrom error: "
<< strerror(errno) << std::endl;
}
}
// Continue listening for more data
startAsyncReceive();
}
void UdpCommandDemuxer::processIncomingData()
{
if (bytesReceived < 2)
{
// Too small to contain any meaningful data
return;
}
// Extract source IP address
char sourceIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN);
// First, find device with matching IP address in DeviceManager collection
for (const auto &device : deviceManager.devices)
{
if (device->discoveredDevice.ipAddr != sourceIP) { continue; }
// Found matching device, route the datagram to it
try
{
device->handleUdpDgram(
receiveBuffer, bytesReceived, senderAddr);
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Device handler exception for IP "
<< sourceIP << ": " << e.what() << std::endl;
}
return;
}
// If not found in DeviceManager, check temporary collection (devices under construction)
auto tempIt = livoxProto1::Device::devicesUnderConstruction.find(sourceIP);
if (tempIt != livoxProto1::Device::devicesUnderConstruction.end())
{
// Extract command set and command ID from the datagram
if (bytesReceived >= static_cast<ssize_t>(
sizeof(livoxProto1::comms::Header) + sizeof(livoxProto1::comms::Command)))
{
uint8_t cmd_set = receiveBuffer[sizeof(livoxProto1::comms::Header)];
uint8_t cmd_id = receiveBuffer[sizeof(livoxProto1::comms::Header) + 1];
// Found matching device in temporary collection, invoke matching handlers
for (const auto& cmdHandler : tempIt->second)
{
if (cmdHandler.cmd_set == cmd_set && cmdHandler.cmd_id == cmd_id)
{
try
{
cmdHandler.handler(receiveBuffer, bytesReceived, senderAddr);
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Temporary device handler exception for IP "
<< sourceIP << ": " << e.what() << std::endl;
}
}
}
}
return;
}
// No device found with matching IP in either collection, discard the data
std::cerr
<< __func__ << ": No device found for source IP "
<< sourceIP << ", discarding datagram" << std::endl;
}
} // namespace comms
} // namespace livoxProto1