Files
salmanoff/commonLibs/livoxProto1/udpCommandDemuxer.cpp
T

623 lines
16 KiB
C++
Raw Normal View History

2026-05-28 20:13:12 -04:00
2025-10-22 06:17:42 -04:00
#include <iostream>
#include <cstring>
2026-05-28 20:13:12 -04:00
#include <coroutine>
2025-10-22 06:17:42 -04:00
#include <functional>
2026-05-28 20:13:12 -04:00
#include <optional>
2025-10-22 06:17:42 -04:00
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
2026-05-28 20:13:12 -04:00
#include <adapters/boostAsio/deadlineTimerAReq.h>
#include <boost/asio/post.hpp>
#include <spinscale/co/group.h>
2025-10-22 06:17:42 -04:00
#include "udpCommandDemuxer.h"
2026-05-28 20:13:12 -04:00
#include "protocol.h"
2025-10-22 06:17:42 -04:00
#include "core.h"
#include "device.h"
2025-10-22 06:17:42 -04:00
namespace livoxProto1 {
namespace comms {
UdpCommandDemuxer::UdpCommandDemuxer(
2025-12-27 16:21:22 -04:00
const std::shared_ptr<sscl::ComponentThread> &componentThread,
2025-10-22 06:17:42 -04:00
DeviceManager &deviceManager,
uint16_t commandPort,
uint16_t dataPort
2025-10-22 06:17:42 -04:00
)
: componentThread(componentThread), deviceManager(deviceManager),
commandPort(commandPort), dataPort(dataPort),
2025-10-22 06:17:42 -04:00
senderAddrLen(sizeof(senderAddr))
{
}
UdpCommandDemuxer::~UdpCommandDemuxer()
{
stop();
}
void UdpCommandDemuxer::start()
{
if (isActive.load())
{
std::cerr << __func__ << ": Demuxer is already running"
<< std::endl;
return;
}
try
{
{
2025-12-27 16:21:22 -04:00
sscl::SpinLock::Guard lock(isActiveAndShouldStopLock);
setupSockets();
isActive.store(true);
shouldStop.store(false);
}
2025-10-22 06:17:42 -04:00
// Start the async receive loop
startAsyncReceive();
std::cout
<< __func__ << ": UDP Command Demuxer started on port "
<< commandPort << std::endl;
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Failed to start demuxer: "
<< e.what() << std::endl;
isActive.store(false);
throw;
}
}
void UdpCommandDemuxer::stop()
{
{
2025-12-27 16:21:22 -04:00
sscl::SpinLock::Guard lock(isActiveAndShouldStopLock);
if (!isActive.load())
{ return; }
2025-10-22 06:17:42 -04:00
shouldStop.store(true);
}
2025-10-22 06:17:42 -04:00
// Close socket and cleanup
if (cmdEndpointFdDesc)
2025-10-22 06:17:42 -04:00
{
cmdEndpointFdDesc->cancel();
cmdEndpointFdDesc.reset();
2025-10-22 06:17:42 -04:00
}
if (pcloudDataFdDesc)
{
pcloudDataFdDesc->cancel();
pcloudDataFdDesc.reset();
}
2025-10-22 06:17:42 -04:00
isActive.store(false);
std::cout
<< __func__ << ": UDP Command Demuxer stopped"
<< std::endl;
}
void UdpCommandDemuxer::setupSockets()
{
setupCommandSocket();
setupPcloudDataSocket();
}
void UdpCommandDemuxer::setupCommandSocket()
2025-10-22 06:17:42 -04:00
{
// RAII class to manage socket file descriptor
struct SocketRAII
{
int fd;
SocketRAII(int socketFd) : fd(socketFd) {}
~SocketRAII() { if (fd >= 0) close(fd); }
void commit() { fd = -1; } // Transfer ownership, prevent close
int getFd() const { return fd; }
bool isValid() const { return fd >= 0; }
};
// Create UDP socket
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
if (!socketGuard.isValid())
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to create socket: " + strerror(errno));
}
2025-12-25 18:18:46 -04:00
// Set SO_REUSEADDR to allow binding even if port is in TIME_WAIT
int reuse = 1;
if (setsockopt(
socketGuard.getFd(), SOL_SOCKET, SO_REUSEADDR,
&reuse, sizeof(reuse)) < 0)
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to set SO_REUSEADDR: " + strerror(errno));
}
2025-10-22 06:17:42 -04:00
// Set socket to non-blocking mode
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
if (flags < 0 || fcntl(
socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to set non-blocking mode: " + strerror(errno));
}
2025-12-25 18:18:46 -04:00
/** EXPLANATION:
* Bind to command port.
*
* WSL2 NAT PORT TRANSLATION ISSUE:
* On Windows 10, WSL2 uses NAT that translates UDP source ports when
* forwarding packets from WSL to the physical network. A socket bound to
* port 56001 in WSL may send from port 52511 on the wire.
*
* The device should use the cmd_port from the handshake message (56001),
* not the translated source port, so this may not break functionality.
* However, Windows NAT behavior can be unpredictable.
*
* Solutions:
* - Windows 11 22H2+: Use WSL2 mirror networking mode (.wslconfig)
* - Run natively on Linux
* - Accept the limitation (may work if device uses cmd_port from handshake)
*/
2025-10-22 06:17:42 -04:00
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(commandPort);
if (bind(
socketGuard.getFd(), (struct sockaddr *)&localAddr,
sizeof(localAddr)) < 0)
{
throw std::runtime_error(
std::string(__func__) + ": Failed to bind to port "
+ std::to_string(commandPort) + ": " + strerror(errno));
}
2025-12-25 18:18:46 -04:00
/* Verify the socket is actually bound to the expected port
* This helps catch WSL/Windows networking issues.
*/
struct sockaddr_in boundAddr;
socklen_t boundAddrLen = sizeof(boundAddr);
if (getsockname(
socketGuard.getFd(),
(struct sockaddr *)&boundAddr, &boundAddrLen) == 0)
{
uint16_t boundPort = ntohs(boundAddr.sin_port);
if (boundPort != commandPort)
{
std::cerr << __func__ << ": WARNING: Socket bound to port "
<< boundPort << " instead of expected port "
<< commandPort << std::endl;
}
#if 1
else
{
std::cout << __func__ << ": Successfully bound command socket "
"to port " << boundPort << std::endl;
}
#endif
}
2025-10-22 06:17:42 -04:00
// Create boost wrapper for async operations
cmdEndpointFdDesc = std::make_shared<boost::asio::posix::stream_descriptor>(
componentThread->getIoContext(), socketGuard.getFd());
2025-10-22 06:17:42 -04:00
// Transfer ownership, prevent auto-close
socketGuard.commit();
}
void UdpCommandDemuxer::setupPcloudDataSocket()
{
// RAII class to manage socket file descriptor
struct SocketRAII
{
int fd;
SocketRAII(int socketFd) : fd(socketFd) {}
~SocketRAII() { if (fd >= 0) close(fd); }
void commit() { fd = -1; } // Transfer ownership, prevent close
int getFd() const { return fd; }
bool isValid() const { return fd >= 0; }
};
// Create UDP socket for point cloud data reception
SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0));
if (!socketGuard.isValid())
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to create socket: " + strerror(errno));
}
// Set socket to non-blocking mode
int flags = fcntl(socketGuard.getFd(), F_GETFL, 0);
if (flags < 0 ||
fcntl(socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0)
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to set non-blocking mode: " + strerror(errno));
}
// Bind to the data port
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(dataPort);
if (bind(
socketGuard.getFd(), (struct sockaddr *)&localAddr,
sizeof(localAddr)) < 0)
{
throw std::runtime_error(
std::string(__func__) + ": Failed to bind to data port: "
+ std::to_string(dataPort) + ": " + strerror(errno));
}
// Create boost wrapper for async operations
pcloudDataFdDesc = std::make_shared<boost::asio::posix::stream_descriptor>(
componentThread->getIoContext(), socketGuard.getFd());
// Transfer ownership, prevent auto-close
socketGuard.commit();
}
2025-10-22 06:17:42 -04:00
void UdpCommandDemuxer::startAsyncReceive()
{
if (!isActive.load() || shouldStop.load())
{ return; }
cmdEndpointFdDesc->async_wait(
2025-10-22 06:17:42 -04:00
boost::asio::posix::stream_descriptor::wait_read,
std::bind(
&UdpCommandDemuxer::onDataReady, this, std::placeholders::_1));
}
void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error)
{
if (error)
{
if (error != boost::asio::error::operation_aborted)
{
std::cerr
<< __func__ << ": Socket error: "
<< error.message() << std::endl;
}
return;
}
2025-12-27 16:21:22 -04:00
sscl::SpinLock::Guard lock(isActiveAndShouldStopLock);
2025-10-22 06:17:42 -04:00
if (!isActive.load() || shouldStop.load())
{ return; }
// Read the data
bytesReceived = recvfrom(
cmdEndpointFdDesc->native_handle(), receiveBuffer,
2025-10-22 06:17:42 -04:00
sizeof(receiveBuffer), 0,
(struct sockaddr *)&senderAddr, &senderAddrLen);
if (bytesReceived > 0) {
processIncomingData();
}
else if (bytesReceived < 0)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
std::cerr << __func__ << ": recvfrom error: "
<< strerror(errno) << std::endl;
}
}
// Continue listening for more data
startAsyncReceive();
}
void UdpCommandDemuxer::processIncomingData()
{
if (bytesReceived < 2)
{
// Too small to contain any meaningful data
return;
}
// Extract source IP address
char sourceIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN);
2026-05-28 20:13:12 -04:00
if (bytesReceived >= static_cast<ssize_t>(
sizeof(Header) + sizeof(Command)))
{
const uint8_t cmdSet = receiveBuffer[sizeof(Header)];
const uint8_t cmdId = receiveBuffer[sizeof(Header) + 1];
if (tryCompletePendingCommandWait(
sourceIP, cmdSet, cmdId, receiveBuffer, bytesReceived))
{
return;
}
}
// First, find device with matching IP address in DeviceManager collection
2025-10-22 06:17:42 -04:00
for (const auto &device : deviceManager.devices)
{
if (device->discoveredDevice.ipAddr != sourceIP) { continue; }
// Found matching device, route the datagram to it
try
{
device->handleUdpDgram(
receiveBuffer, bytesReceived, senderAddr);
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Device handler exception for IP "
<< sourceIP << ": " << e.what() << std::endl;
}
return;
}
// If not found in DeviceManager, check temporary collection (devices under construction)
auto tempIt = livoxProto1::Device::devicesUnderConstruction.find(sourceIP);
if (tempIt != livoxProto1::Device::devicesUnderConstruction.end())
{
// Extract command set and command ID from the datagram
if (bytesReceived >= static_cast<ssize_t>(
2025-11-12 17:25:55 -04:00
sizeof(livoxProto1::comms::Header)
+ sizeof(livoxProto1::comms::Command)))
{
2025-11-12 17:25:55 -04:00
uint8_t cmd_set = receiveBuffer[
sizeof(livoxProto1::comms::Header)];
uint8_t cmd_id = receiveBuffer[
sizeof(livoxProto1::comms::Header) + 1];
2025-11-12 17:25:55 -04:00
// Found matching dev in temp collection, invoke matching handlers
for (const auto& cmdHandler : tempIt->second)
{
2025-11-12 17:25:55 -04:00
if (cmdHandler.cmd_set != cmd_set
|| cmdHandler.cmd_id != cmd_id)
{
continue;
}
2025-10-30 22:18:02 -04:00
try
{
2025-11-12 17:25:55 -04:00
cmdHandler.handler(
receiveBuffer, bytesReceived, senderAddr);
2025-10-30 22:18:02 -04:00
}
catch (const std::exception &e)
{
2025-11-12 17:25:55 -04:00
std::cerr << __func__ << ": Temporary device handler "
"exception for IP " << sourceIP << ": " << e.what()
<< std::endl;
}
}
}
return;
}
// No device found with matching IP in either collection, discard the data
2025-10-22 06:17:42 -04:00
std::cerr
<< __func__ << ": No device found for source IP "
<< sourceIP << ", discarding datagram" << std::endl;
}
2026-05-28 20:13:12 -04:00
struct UdpCommandDemuxer::PendingCommandWaitDesc
{
CommandWaitKey key;
boost::asio::io_context &resumeIoContext;
2026-05-28 20:13:12 -04:00
std::atomic<bool> settled{false};
UdpCommandResponseResult result{};
std::coroutine_handle<> callerSchedHandle;
PendingCommandWaitDesc(
CommandWaitKey keyIn,
boost::asio::io_context &resumeIoContextIn)
2026-05-28 20:13:12 -04:00
: key(std::move(keyIn)),
resumeIoContext(resumeIoContextIn)
2026-05-28 20:13:12 -04:00
{}
};
void UdpCommandDemuxer::settlePendingCommandWait(
const std::shared_ptr<PendingCommandWaitDesc> &wait,
UdpCommandResponseResult::Outcome outcome,
const uint8_t *data, ssize_t bytesReceived)
{
if (wait->settled.exchange(true)) {
return;
}
wait->result.outcome = outcome;
wait->result.bytesReceived = bytesReceived;
if (outcome == UdpCommandResponseResult::Outcome::Response
&& data != nullptr
&& bytesReceived > 0
&& bytesReceived
<= static_cast<ssize_t>(sizeof(wait->result.buffer)))
{
memcpy(wait->result.buffer, data, bytesReceived);
}
std::coroutine_handle<> handle = wait->callerSchedHandle;
if (!handle) {
return;
}
boost::asio::post(wait->resumeIoContext, handle);
2026-05-28 20:13:12 -04:00
}
std::shared_ptr<UdpCommandDemuxer::PendingCommandWaitDesc>
UdpCommandDemuxer::findAndRemovePendingCommandWait(const CommandWaitKey &key)
{
sscl::SpinLock::Guard guard(pendingWaits.lock);
const auto iterator = pendingWaits.rsrc.pendingWaits.find(key);
if (iterator == pendingWaits.rsrc.pendingWaits.end()) {
return nullptr;
}
std::shared_ptr<PendingCommandWaitDesc> wait = iterator->second;
pendingWaits.rsrc.pendingWaits.erase(iterator);
return wait;
}
void UdpCommandDemuxer::cancelPendingCommandWait(
uint8_t cmdSet, uint8_t cmdId,
const std::string &deviceIp)
{
std::shared_ptr<PendingCommandWaitDesc> wait = findAndRemovePendingCommandWait(
{deviceIp, cmdSet, cmdId});
if (!wait) { return; }
settlePendingCommandWait(
wait,
UdpCommandResponseResult::Outcome::Timeout,
nullptr, -1);
}
bool UdpCommandDemuxer::tryCompletePendingCommandWait(
const char *sourceIp,
uint8_t cmdSet, uint8_t cmdId,
const uint8_t *data, ssize_t bytesReceived)
{
std::shared_ptr<PendingCommandWaitDesc> wait = findAndRemovePendingCommandWait(
{sourceIp, cmdSet, cmdId});
if (!wait) { return false; }
const UdpCommandResponseResult::Outcome outcome =
(bytesReceived > 0
&& bytesReceived
<= static_cast<ssize_t>(sizeof(wait->result.buffer)))
? UdpCommandResponseResult::Outcome::Response
: UdpCommandResponseResult::Outcome::RecvError;
settlePendingCommandWait(wait, outcome, data, bytesReceived);
return true;
}
sscl::co::ViralNonPostingInvoker<UdpCommandResponseResult>
UdpCommandDemuxer::waitForCommandResponseCReq(
uint8_t cmdSet, uint8_t cmdId,
const std::string &deviceIp)
{
const CommandWaitKey key{deviceIp, cmdSet, cmdId};
auto wait = std::make_shared<PendingCommandWaitDesc>(
key, componentThread->getIoContext());
2026-05-28 20:13:12 -04:00
{
sscl::SpinLock::Guard guard(pendingWaits.lock);
pendingWaits.rsrc.pendingWaits[key] = wait;
}
struct PendingCommandWaitDescAwaiter
{
std::shared_ptr<PendingCommandWaitDesc> wait;
bool await_ready() const noexcept
{
return wait->settled.load(std::memory_order_acquire);
}
bool await_suspend(std::coroutine_handle<> caller) noexcept
{
if (wait->settled.load(std::memory_order_acquire)) {
return false;
}
wait->callerSchedHandle = caller;
return true;
}
UdpCommandResponseResult await_resume() const noexcept
{
return wait->result;
}
};
const UdpCommandResponseResult result =
co_await PendingCommandWaitDescAwaiter{wait};
if (findAndRemovePendingCommandWait(key))
{
std::cerr << __func__ << ": pending wait still registered after "
"settle for device " << deviceIp << " (cmd_set="
<< static_cast<int>(cmdSet) << ", cmd_id="
<< static_cast<int>(cmdId) << "); program error"
<< std::endl;
}
co_return result;
}
sscl::co::ViralNonPostingInvoker<UdpCommandResponseResult>
UdpCommandDemuxer::waitForCommandResponseCReq(
uint8_t cmdSet, uint8_t cmdId,
const std::string &deviceIp,
int timeoutMs)
{
/** EXPLANATION:
* We setup an async timer event to detect timeout, and register a UDP
* command handler to wait for the device to respond to the incoming command
* request. If the device does not respond within the timeout period,
* we will consider the command to have failed.
*/
boost::asio::io_context &ioContext = componentThread->getIoContext();
2026-06-09 11:19:42 -04:00
boost::asio::deadline_timer raceTimer(ioContext);
2026-05-28 20:13:12 -04:00
auto timerAwaiter = adapters::boostAsio::getDeadlineTimerAReqAwaiter(
ioContext,
2026-06-09 11:19:42 -04:00
raceTimer,
boost::posix_time::milliseconds(timeoutMs));
2026-05-28 20:13:12 -04:00
auto responseInvoker = waitForCommandResponseCReq(cmdSet, cmdId, deviceIp);
static constexpr int timerMemberSettlementIndex = 0;
sscl::co::Group group;
group.add(timerAwaiter);
group.add(responseInvoker);
co_await group.getAwaitFirstSettlementInvoker();
group.checkForAndReThrowGroupExceptions();
const bool timerWonFirst =
group.s.rsrc.firstSettledInvokerIdx == timerMemberSettlementIndex;
if (timerWonFirst) {
cancelPendingCommandWait(cmdSet, cmdId, deviceIp);
2026-06-09 11:19:42 -04:00
} else {
raceTimer.cancel();
2026-05-28 20:13:12 -04:00
}
/** Group member adapter coros are fire-and-forget; keep group alive until
* both members settle so the loser adapter does not touch freed state.
*/
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
if (timerWonFirst)
{
UdpCommandResponseResult timeoutResult;
timeoutResult.outcome = UdpCommandResponseResult::Outcome::Timeout;
co_return timeoutResult;
}
co_return responseInvoker.completedReturnValues().myReturnValue;
}
2025-10-22 06:17:42 -04:00
} // namespace comms
} // namespace livoxProto1