341 lines
8.1 KiB
C++
341 lines
8.1 KiB
C++
#include <iostream>
|
|
#include <cstring>
|
|
#include <functional>
|
|
#include <unistd.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
|
|
#include "udpCommandDemuxer.h"
|
|
#include "core.h"
|
|
#include "device.h"
|
|
|
|
namespace livoxProto1 {
|
|
namespace comms {
|
|
|
|
UdpCommandDemuxer::UdpCommandDemuxer(
|
|
const std::shared_ptr<smo::ComponentThread> &componentThread,
|
|
DeviceManager &deviceManager,
|
|
uint16_t commandPort,
|
|
uint16_t dataPort
|
|
)
|
|
: componentThread(componentThread), deviceManager(deviceManager),
|
|
commandPort(commandPort), dataPort(dataPort),
|
|
senderAddrLen(sizeof(senderAddr))
|
|
{
|
|
}
|
|
|
|
UdpCommandDemuxer::~UdpCommandDemuxer()
|
|
{
|
|
stop();
|
|
}
|
|
|
|
void UdpCommandDemuxer::start()
|
|
{
|
|
if (isActive.load())
|
|
{
|
|
std::cerr << __func__ << ": Demuxer is already running"
|
|
<< std::endl;
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
{
|
|
smo::SpinLock::Guard lock(isActiveAndShouldStopLock);
|
|
|
|
setupSockets();
|
|
isActive.store(true);
|
|
shouldStop.store(false);
|
|
}
|
|
|
|
// Start the async receive loop
|
|
startAsyncReceive();
|
|
|
|
std::cout
|
|
<< __func__ << ": UDP Command Demuxer started on port "
|
|
<< commandPort << std::endl;
|
|
}
|
|
catch (const std::exception &e)
|
|
{
|
|
std::cerr
|
|
<< __func__ << ": Failed to start demuxer: "
|
|
<< e.what() << std::endl;
|
|
isActive.store(false);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
void UdpCommandDemuxer::stop()
|
|
{
|
|
{
|
|
smo::SpinLock::Guard lock(isActiveAndShouldStopLock);
|
|
if (!isActive.load())
|
|
{ return; }
|
|
|
|
shouldStop.store(true);
|
|
}
|
|
|
|
// Close socket and cleanup
|
|
if (cmdEndpointFdDesc)
|
|
{
|
|
cmdEndpointFdDesc->cancel();
|
|
cmdEndpointFdDesc.reset();
|
|
}
|
|
|
|
if (pcloudDataFdDesc)
|
|
{
|
|
pcloudDataFdDesc->cancel();
|
|
pcloudDataFdDesc.reset();
|
|
}
|
|
|
|
isActive.store(false);
|
|
std::cout
|
|
<< __func__ << ": UDP Command Demuxer stopped"
|
|
<< std::endl;
|
|
}
|
|
|
|
void UdpCommandDemuxer::setupSockets()
|
|
{
|
|
setupCommandSocket();
|
|
setupPcloudDataSocket();
|
|
}
|
|
|
|
void UdpCommandDemuxer::setupCommandSocket()
|
|
{
|
|
// RAII class to manage socket file descriptor
|
|
struct SocketRAII
|
|
{
|
|
int fd;
|
|
SocketRAII(int socketFd) : fd(socketFd) {}
|
|
~SocketRAII() { if (fd >= 0) close(fd); }
|
|
void commit() { fd = -1; } // Transfer ownership, prevent close
|
|
int getFd() const { return fd; }
|
|
bool isValid() const { return fd >= 0; }
|
|
};
|
|
|
|
// Create UDP socket
|
|
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
|
|
if (!socketGuard.isValid())
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__)
|
|
+ ": Failed to create socket: " + strerror(errno));
|
|
}
|
|
|
|
// Set socket to non-blocking mode
|
|
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
|
|
if (flags < 0 || fcntl(
|
|
socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__)
|
|
+ ": Failed to set non-blocking mode: " + strerror(errno));
|
|
}
|
|
|
|
// Bind to command port
|
|
struct sockaddr_in localAddr;
|
|
memset(&localAddr, 0, sizeof(localAddr));
|
|
localAddr.sin_family = AF_INET;
|
|
localAddr.sin_addr.s_addr = INADDR_ANY;
|
|
localAddr.sin_port = htons(commandPort);
|
|
|
|
if (bind(
|
|
socketGuard.getFd(), (struct sockaddr *)&localAddr,
|
|
sizeof(localAddr)) < 0)
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__) + ": Failed to bind to port "
|
|
+ std::to_string(commandPort) + ": " + strerror(errno));
|
|
}
|
|
|
|
// Create boost wrapper for async operations
|
|
cmdEndpointFdDesc = std::make_shared<boost::asio::posix::stream_descriptor>(
|
|
componentThread->getIoService(), socketGuard.getFd());
|
|
|
|
// Transfer ownership, prevent auto-close
|
|
socketGuard.commit();
|
|
}
|
|
|
|
void UdpCommandDemuxer::setupPcloudDataSocket()
|
|
{
|
|
// RAII class to manage socket file descriptor
|
|
struct SocketRAII
|
|
{
|
|
int fd;
|
|
SocketRAII(int socketFd) : fd(socketFd) {}
|
|
~SocketRAII() { if (fd >= 0) close(fd); }
|
|
void commit() { fd = -1; } // Transfer ownership, prevent close
|
|
int getFd() const { return fd; }
|
|
bool isValid() const { return fd >= 0; }
|
|
};
|
|
|
|
// Create UDP socket for point cloud data reception
|
|
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
|
|
if (!socketGuard.isValid())
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__)
|
|
+ ": Failed to create socket: " + strerror(errno));
|
|
}
|
|
|
|
// Set socket to non-blocking mode
|
|
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
|
|
if (flags < 0 ||
|
|
fcntl(socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__)
|
|
+ ": Failed to set non-blocking mode: " + strerror(errno));
|
|
}
|
|
|
|
// Bind to the data port
|
|
struct sockaddr_in localAddr;
|
|
memset(&localAddr, 0, sizeof(localAddr));
|
|
localAddr.sin_family = AF_INET;
|
|
localAddr.sin_addr.s_addr = INADDR_ANY;
|
|
localAddr.sin_port = htons(dataPort);
|
|
|
|
if (bind(
|
|
socketGuard.getFd(), (struct sockaddr *)&localAddr,
|
|
sizeof(localAddr)) < 0)
|
|
{
|
|
throw std::runtime_error(
|
|
std::string(__func__) + ": Failed to bind to data port: "
|
|
+ std::to_string(dataPort) + ": " + strerror(errno));
|
|
}
|
|
|
|
// Create boost wrapper for async operations
|
|
pcloudDataFdDesc = std::make_shared<boost::asio::posix::stream_descriptor>(
|
|
componentThread->getIoService(), socketGuard.getFd());
|
|
|
|
// Transfer ownership, prevent auto-close
|
|
socketGuard.commit();
|
|
}
|
|
|
|
void UdpCommandDemuxer::startAsyncReceive()
|
|
{
|
|
if (!isActive.load() || shouldStop.load())
|
|
{ return; }
|
|
|
|
cmdEndpointFdDesc->async_wait(
|
|
boost::asio::posix::stream_descriptor::wait_read,
|
|
std::bind(
|
|
&UdpCommandDemuxer::onDataReady, this, std::placeholders::_1));
|
|
}
|
|
|
|
void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error)
|
|
{
|
|
if (error)
|
|
{
|
|
if (error != boost::asio::error::operation_aborted)
|
|
{
|
|
std::cerr
|
|
<< __func__ << ": Socket error: "
|
|
<< error.message() << std::endl;
|
|
}
|
|
return;
|
|
}
|
|
|
|
smo::SpinLock::Guard lock(isActiveAndShouldStopLock);
|
|
|
|
if (!isActive.load() || shouldStop.load())
|
|
{ return; }
|
|
|
|
// Read the data
|
|
bytesReceived = recvfrom(
|
|
cmdEndpointFdDesc->native_handle(), receiveBuffer,
|
|
sizeof(receiveBuffer), 0,
|
|
(struct sockaddr *)&senderAddr, &senderAddrLen);
|
|
|
|
if (bytesReceived > 0) {
|
|
processIncomingData();
|
|
}
|
|
else if (bytesReceived < 0)
|
|
{
|
|
if (errno != EAGAIN && errno != EWOULDBLOCK)
|
|
{
|
|
std::cerr << __func__ << ": recvfrom error: "
|
|
<< strerror(errno) << std::endl;
|
|
}
|
|
}
|
|
|
|
// Continue listening for more data
|
|
startAsyncReceive();
|
|
}
|
|
|
|
void UdpCommandDemuxer::processIncomingData()
|
|
{
|
|
if (bytesReceived < 2)
|
|
{
|
|
// Too small to contain any meaningful data
|
|
return;
|
|
}
|
|
|
|
// Extract source IP address
|
|
char sourceIP[INET_ADDRSTRLEN];
|
|
inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN);
|
|
|
|
// First, find device with matching IP address in DeviceManager collection
|
|
for (const auto &device : deviceManager.devices)
|
|
{
|
|
if (device->discoveredDevice.ipAddr != sourceIP) { continue; }
|
|
|
|
// Found matching device, route the datagram to it
|
|
try
|
|
{
|
|
device->handleUdpDgram(
|
|
receiveBuffer, bytesReceived, senderAddr);
|
|
}
|
|
catch (const std::exception &e)
|
|
{
|
|
std::cerr
|
|
<< __func__ << ": Device handler exception for IP "
|
|
<< sourceIP << ": " << e.what() << std::endl;
|
|
}
|
|
return;
|
|
}
|
|
|
|
// If not found in DeviceManager, check temporary collection (devices under construction)
|
|
auto tempIt = livoxProto1::Device::devicesUnderConstruction.find(sourceIP);
|
|
if (tempIt != livoxProto1::Device::devicesUnderConstruction.end())
|
|
{
|
|
// Extract command set and command ID from the datagram
|
|
if (bytesReceived >= static_cast<ssize_t>(
|
|
sizeof(livoxProto1::comms::Header) + sizeof(livoxProto1::comms::Command)))
|
|
{
|
|
uint8_t cmd_set = receiveBuffer[sizeof(livoxProto1::comms::Header)];
|
|
uint8_t cmd_id = receiveBuffer[sizeof(livoxProto1::comms::Header) + 1];
|
|
|
|
// Found matching device in temporary collection, invoke matching handlers
|
|
for (const auto& cmdHandler : tempIt->second)
|
|
{
|
|
if (cmdHandler.cmd_set != cmd_set || cmdHandler.cmd_id != cmd_id)
|
|
{ continue; }
|
|
|
|
try
|
|
{
|
|
cmdHandler.handler(receiveBuffer, bytesReceived, senderAddr);
|
|
}
|
|
catch (const std::exception &e)
|
|
{
|
|
std::cerr
|
|
<< __func__ << ": Temporary device handler exception for IP "
|
|
<< sourceIP << ": " << e.what() << std::endl;
|
|
}
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
// No device found with matching IP in either collection, discard the data
|
|
std::cerr
|
|
<< __func__ << ": No device found for source IP "
|
|
<< sourceIP << ", discarding datagram" << std::endl;
|
|
}
|
|
|
|
} // namespace comms
|
|
} // namespace livoxProto1
|