LivoxProto1: ExecuteHandshake uses udpCommandDemuxer

UdpCommandDemuxer also now supports devices "under construction".
This commit is contained in:
2025-10-22 22:13:38 -04:00
parent 01ad1ff073
commit a4d99e5d4d
4 changed files with 231 additions and 170 deletions
+162 -156
View File
@@ -33,6 +33,11 @@
*/ */
namespace livoxProto1 { namespace livoxProto1 {
// Static member definition for devices under construction
std::unordered_map<std::string, std::vector<Device::CommandHandler>>
Device::devicesUnderConstruction;
namespace comms { namespace comms {
DiscoveredDevice::DiscoveredDevice( DiscoveredDevice::DiscoveredDevice(
@@ -104,11 +109,14 @@ pcloudDataFd(-1)
Device::~Device() Device::~Device()
{ {
if (heartbeatActive.load()) { if (heartbeatActive.load())
{
heartbeatActive.store(false); heartbeatActive.store(false);
if (heartbeatTimer) { if (heartbeatTimer) {
heartbeatTimer->cancel(); heartbeatTimer->cancel();
} }
unregisterUdpCommandHandler(0x00, 0x03);
} }
if (pcloudDataActive.load()) { if (pcloudDataActive.load()) {
@@ -444,8 +452,8 @@ public:
std::atomic<SocketState> socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic<SocketState> socketState{SocketState::SOCKET_STILL_WAITING};
std::atomic<bool> handlerExecuted{false}; std::atomic<bool> handlerExecuted{false};
// The stream descriptor that will be returned to the caller // Cmd fd desc. Will be returned to caller (shared with UdpCommandDemuxer)
boost::asio::posix::stream_descriptor handshakeFdDesc; std::shared_ptr<boost::asio::posix::stream_descriptor> cmdEndpointFdDesc;
boost::asio::deadline_timer timeoutTimer; boost::asio::deadline_timer timeoutTimer;
// Received data storage // Received data storage
@@ -457,11 +465,13 @@ public:
public: public:
ExecuteHandshakeReq( ExecuteHandshakeReq(
Device& dev, const std::string& deviceIP, Device& dev, const std::string& deviceIP,
std::shared_ptr<boost::asio::posix::stream_descriptor>
&cmdEndpointFdDesc,
smo::Callback<Device::executeHandshakeReqCbFn> cb) smo::Callback<Device::executeHandshakeReqCbFn> cb)
: smo::NonPostedAsynchronousContinuation<Device::executeHandshakeReqCbFn>( : smo::NonPostedAsynchronousContinuation<Device::executeHandshakeReqCbFn>(
std::move(cb)), std::move(cb)),
device(dev), deviceIP(deviceIP), device(dev), deviceIP(deviceIP),
handshakeFdDesc(device.componentThread->getIoService()), cmdEndpointFdDesc(cmdEndpointFdDesc),
timeoutTimer(device.componentThread->getIoService()) timeoutTimer(device.componentThread->getIoService())
{ {
} }
@@ -507,47 +517,6 @@ public:
} }
private: private:
bool setupSocket()
{
/** EXPLANATION:
* Create non-blocking UDP socket for handshake. We can't use
* boost::asio::socket because it causes a segfault if associated with
* an io_service from the main program (it's a boost bug).
*/
int socketFd = socket(AF_INET, SOCK_DGRAM, 0);
if (socketFd < 0)
{
std::cerr << __func__ << ": Failed to create socket: "
<< strerror(errno) << std::endl;
return false;
}
int flags = fcntl(socketFd, F_GETFL, 0);
if (flags < 0 || fcntl(socketFd, F_SETFL, flags | O_NONBLOCK) < 0)
{
std::cerr << __func__ << ": Failed to set socket non-blocking: "
<< strerror(errno) << std::endl;
return false;
}
// Bind socket to cmdPort so we can receive the handshake response
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(device.cmdPort);
if (bind(socketFd, (struct sockaddr*)&localAddr, sizeof(localAddr)) < 0)
{
std::cerr << __func__ << ": Failed to bind socket to port "
<< device.cmdPort << ": " << strerror(errno) << std::endl;
return false;
}
// Assign the socket FD to the stream descriptor
handshakeFdDesc.assign(socketFd);
return true;
}
bool sendHandshakeRequest() bool sendHandshakeRequest()
{ {
@@ -572,7 +541,7 @@ private:
// Send handshake request directly (synchronous) // Send handshake request directly (synchronous)
ssize_t bytesSent = sendto( ssize_t bytesSent = sendto(
handshakeFdDesc.native_handle(), cmdEndpointFdDesc->native_handle(),
&handshakeReq, sizeof(comms::HandshakeRequest), 0, &handshakeReq, sizeof(comms::HandshakeRequest), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
@@ -590,23 +559,22 @@ private:
const std::shared_ptr<ExecuteHandshakeReq> &request const std::shared_ptr<ExecuteHandshakeReq> &request
) )
{ {
if (!handshakeFdDesc.is_open()) if (!cmdEndpointFdDesc || !cmdEndpointFdDesc->is_open())
{ {
throw std::runtime_error( throw std::runtime_error(
std::string(__func__) + std::string(__func__) +
": handshakeFdDesc is not open; cannot set up async callbacks " ": cmdEndpointFdDesc is null; cannot set up async callbacks "
"for device " + deviceIP + "(" "for device " + deviceIP + "("
+ device.discoveredDevice.deviceIdentifier + ")" + " handshake." + device.discoveredDevice.deviceIdentifier + ")" + " handshake."
"Check socket initialization and bining." "Check UdpCommandDemuxer initialization."
); );
} }
/** EXPLANATION: /** EXPLANATION:
* We setup an async timer event to detect timeout, and wait for the * We setup an async timer event to detect timeout, and register a UDP
* device to respond to the handshake request. If the device does not * command handler to wait for the device to respond to the handshake
* respond within the timeout period, we will consider the handshake * request. If the device does not respond within the timeout period,
* to have failed. * we will consider the handshake to have failed.
*/ */
timeoutTimer.expires_from_now( timeoutTimer.expires_from_now(
boost::posix_time::milliseconds(device.handshakeTimeoutMs)); boost::posix_time::milliseconds(device.handshakeTimeoutMs));
@@ -617,16 +585,19 @@ private:
std::placeholders::_1)); std::placeholders::_1));
/** EXPLANATION: /** EXPLANATION:
* Since we're using POSIX sockets calls on the underlying * Register a UDP command handler for handshake ACK
* native_handle, Let's use async_wait with POLLIN to detect when data * (cmd_set=0x00, cmd_id=0x01).
* is available for reading. * The handler will be called by the UdpCommandDemuxer when a handshake
* response is received.
*/ */
handshakeFdDesc.async_wait( // Add device to temporary collection for devices under construction
boost::asio::posix::stream_descriptor::wait_read, device.registerUdpCommandHandler(
std::bind( 0x00, 0x01,
&ExecuteHandshakeReq::executeHandshakeReq1_2, this, std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2,
request, this, request,
std::placeholders::_1)); std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3),
deviceIP);
} }
void executeHandshakeReq1_1( void executeHandshakeReq1_1(
@@ -641,40 +612,32 @@ private:
} }
} }
void executeHandshakeReq1_2( // This is called from the UDP command handler
std::shared_ptr<ExecuteHandshakeReq>, void executeHandshakeReq1_2(
const boost::system::error_code& error std::shared_ptr<ExecuteHandshakeReq> context,
const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr
) )
{ {
// This is called from the socket read callback // Unregister the handler to decrement refcount
if (error) device.unregisterUdpCommandHandler(0x00, 0x01, context->deviceIP);
// Store the received data
context->bytesReceived = bytesReceived;
context->senderAddr = senderAddr;
context->senderAddrLen = sizeof(senderAddr);
// Copy the data to our buffer
if (bytesReceived > 0
&& bytesReceived <= (ssize_t)sizeof(context->responseBuffer))
{ {
socketState.store(SocketState::SOCKET_ERROR); memcpy(context->responseBuffer, data, bytesReceived);
std::cerr << __func__ << ": Socket read error: " << error.message() context->socketState.store(SocketState::SOCKET_RECV_SUCCESS);
<< std::endl;
} else } else
{ {
// Socket is readable, now actually read the data context->socketState.store(SocketState::SOCKET_RECV_ERROR);
bytesReceived = recvfrom( std::cerr << __func__ << ": Invalid data size: " << bytesReceived
handshakeFdDesc.native_handle(), << std::endl;
responseBuffer, sizeof(responseBuffer), 0,
(struct sockaddr*)&senderAddr, &senderAddrLen);
if (bytesReceived > 0)
{
socketState.store(SocketState::SOCKET_RECV_SUCCESS);
} else if (bytesReceived == 0)
{
socketState.store(SocketState::SOCKET_RECV_ERROR);
std::cerr << __func__ << ": Received 0 bytes from recvfrom"
<< std::endl;
} else
{
socketState.store(SocketState::SOCKET_ERROR);
std::cerr << __func__ << ": recvfrom failed: "
<< strerror(errno) << " (errno: " << errno << ")"
<< std::endl;
}
} }
executeHandshakeReq2(); executeHandshakeReq2();
@@ -778,7 +741,7 @@ private:
// Transfer any successful state to Device // Transfer any successful state to Device
commit(); commit();
int rawFd = handshakeFdDesc.release(); int rawFd = cmdEndpointFdDesc->native_handle();
callOriginalCallback(true, rawFd); callOriginalCallback(true, rawFd);
} }
@@ -791,10 +754,7 @@ private:
void cleanupHandshakeSocket() void cleanupHandshakeSocket()
{ {
int fd = handshakeFdDesc.release(); // Obsolete - socket is managed by shared_ptr in UdpCommandDemuxer
if (fd != -1) {
close(fd);
}
} }
void cleanup() // Clean up transient resources void cleanup() // Clean up transient resources
{ {
@@ -808,9 +768,28 @@ void Device::executeHandshakeReq(
smo::Callback<Device::executeHandshakeReqCbFn> callback smo::Callback<Device::executeHandshakeReqCbFn> callback
) )
{ {
// Get the command endpoint from the UdpCommandDemuxer
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
callback.callbackFn(false, -1);
return;
}
auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
.getCmdEndpointFdDesc();
if (!cmdEndpointFdDesc)
{
std::cerr << __func__ << ": UdpCommandDemuxer not started or no "
"command endpoint available." << std::endl;
callback.callbackFn(false, -1);
return;
}
// Create the handshake request object to hold state and callbacks // Create the handshake request object to hold state and callbacks
auto request = std::make_shared<ExecuteHandshakeReq>( auto request = std::make_shared<ExecuteHandshakeReq>(
*this, deviceIP, std::move(callback)); *this, deviceIP, cmdEndpointFdDesc, std::move(callback));
// Check if detectedSmoListeningIp is empty - this should not happen // Check if detectedSmoListeningIp is empty - this should not happen
if (detectedSmoListeningIp.empty()) if (detectedSmoListeningIp.empty())
@@ -821,11 +800,6 @@ void Device::executeHandshakeReq(
} }
try { try {
if (!request->setupSocket())
{
request->callOriginalCallbackWithFailure();
return;
}
if (!request->sendHandshakeRequest()) if (!request->sendHandshakeRequest())
{ {
request->callOriginalCallbackWithFailure(); request->callOriginalCallbackWithFailure();
@@ -845,6 +819,8 @@ void Device::disconnectReq(smo::Callback<Device::disconnectReqCbFn> callback)
{ {
// Stop heartbeat first // Stop heartbeat first
heartbeatActive.store(false); heartbeatActive.store(false);
// Unregister heartbeat ACK handler
unregisterUdpCommandHandler(0x00, 0x03);
if (heartbeatFd == -1) if (heartbeatFd == -1)
{ {
@@ -975,6 +951,17 @@ std::string Device::generateClientDeviceIpFromSerialNumber(
std::to_string(octet3) + ".1" + lastTwoDigits; std::to_string(octet3) + ".1" + lastTwoDigits;
} }
static void discardHeartbeatAck(
const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr
)
{
(void)data;
(void)bytesReceived;
(void)senderAddr;
std::cout << "Got heartbeat ACK\n";
}
void Device::startHeartbeat() void Device::startHeartbeat()
{ {
if (!componentThread || discoveredDevice.ipAddr.empty()) if (!componentThread || discoveredDevice.ipAddr.empty())
@@ -992,6 +979,9 @@ void Device::startHeartbeat()
": Expected to find handshake socket present but didn't find it"); ": Expected to find handshake socket present but didn't find it");
} }
// Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03)
registerUdpCommandHandler(0x00, 0x03, discardHeartbeatAck);
// Create heartbeat timer // Create heartbeat timer
heartbeatTimer = std::make_unique<boost::asio::deadline_timer>( heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
componentThread->getIoService()); componentThread->getIoService());
@@ -1260,10 +1250,6 @@ public:
// The timeout timer. // The timeout timer.
boost::asio::deadline_timer timeoutTimer; boost::asio::deadline_timer timeoutTimer;
/* This wrapper is just to enable us to use boost::stream_descriptor for its
* convenient API when waiting for the enable/disable ACK dgram.
*/
boost::asio::posix::stream_descriptor cmdResponseBoostFdWrapper;
// Received data storage // Received data storage
uint8_t responseBuffer[1024]{}; uint8_t responseBuffer[1024]{};
@@ -1277,8 +1263,7 @@ protected:
smo::Callback<CallbackType> cb) smo::Callback<CallbackType> cb)
: smo::NonPostedAsynchronousContinuation<CallbackType>(std::move(cb)), : smo::NonPostedAsynchronousContinuation<CallbackType>(std::move(cb)),
device(dev), device(dev),
timeoutTimer(device.componentThread->getIoService()), timeoutTimer(device.componentThread->getIoService())
cmdResponseBoostFdWrapper(device.componentThread->getIoService())
{} {}
public: public:
@@ -1293,30 +1278,9 @@ public:
void callOriginalCallbackWithFailure() void callOriginalCallbackWithFailure()
{ {
/**
* EXPLANATION:
* We have to call cleanupCmdResponseFdBoostWrapper() here, specifically
* because there are self-references within this class that need to be
* cleaned up.
*
* The cmdResponseBoostFdWrapper holds a reference to the heartbeat
* socket for async operations. When the sequence fails, we need to
* break this reference to allow proper cleanup.
*
* Hence, we call cleanupCmdResponseFdBoostWrapper() at the point of
* failure.
*/
cleanupCmdResponseFdBoostWrapper();
callOriginalCallback(false); callOriginalCallback(false);
} }
void cleanupCmdResponseFdBoostWrapper()
{
if (cmdResponseBoostFdWrapper.is_open()) {
cmdResponseBoostFdWrapper.release(); // Don't close heartbeat socket
}
}
protected: protected:
bool setupSocket() bool setupSocket()
{ {
@@ -1334,8 +1298,6 @@ protected:
const std::shared_ptr<EnDisablePcloudDataReq<CallbackType>> &request const std::shared_ptr<EnDisablePcloudDataReq<CallbackType>> &request
) )
{ {
cmdResponseBoostFdWrapper.assign(device.heartbeatFd);
// Setup timeout timer // Setup timeout timer
timeoutTimer.expires_from_now( timeoutTimer.expires_from_now(
boost::posix_time::milliseconds(device.handshakeTimeoutMs)); boost::posix_time::milliseconds(device.handshakeTimeoutMs));
@@ -1346,13 +1308,13 @@ protected:
this, request, this, request,
std::placeholders::_1)); std::placeholders::_1));
// Setup async wait for read-ready // Register UDP command handler for sampling response (cmd_set=0x00, cmd_id=0x04)
cmdResponseBoostFdWrapper.async_wait( device.registerUdpCommandHandler(
boost::asio::posix::stream_descriptor::wait_read, 0x00, 0x04,
std::bind( std::bind(&EnDisablePcloudDataReq<CallbackType>::enDisablePcloudDataReq1_2,
&EnDisablePcloudDataReq<CallbackType>::enDisablePcloudDataReq1_2,
this, request, this, request,
std::placeholders::_1)); std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
} }
void enDisablePcloudDataReq1_1( void enDisablePcloudDataReq1_1(
@@ -1367,24 +1329,30 @@ protected:
void enDisablePcloudDataReq1_2( void enDisablePcloudDataReq1_2(
std::shared_ptr<EnDisablePcloudDataReq<CallbackType>> context, std::shared_ptr<EnDisablePcloudDataReq<CallbackType>> context,
const boost::system::error_code& error const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr
) )
{ {
if (!error) // Unregister the handler to decrement refcount
{ device.unregisterUdpCommandHandler(0x00, 0x04);
// Data is available for reading, perform the actual read
context->bytesReceived = recvfrom(
context->device.heartbeatFd,
context->responseBuffer, sizeof(context->responseBuffer), 0,
(struct sockaddr*)&context->senderAddr, &context->senderAddrLen);
if (context->bytesReceived > 0) // Store the received data
{ context->socketState = SocketState::SOCKET_RECV_SUCCESS; } context->bytesReceived = bytesReceived;
else context->senderAddr = senderAddr;
{ context->socketState = SocketState::SOCKET_RECV_ERROR; } context->senderAddrLen = sizeof(senderAddr);
// Copy the data to our buffer
if (bytesReceived > 0
&& bytesReceived <= (ssize_t)sizeof(context->responseBuffer))
{
memcpy(context->responseBuffer, data, bytesReceived);
context->socketState = SocketState::SOCKET_RECV_SUCCESS;
} else
{
context->socketState = SocketState::SOCKET_RECV_ERROR;
std::cerr << __func__ << ": Invalid data size: " << bytesReceived
<< std::endl;
} }
else
{ context->socketState = SocketState::SOCKET_RECV_ERROR; }
context->enDisablePcloudDataReq2(context); context->enDisablePcloudDataReq2(context);
} }
@@ -1474,7 +1442,6 @@ protected:
void cleanup() void cleanup()
{ {
timeoutTimer.cancel(); timeoutTimer.cancel();
cleanupCmdResponseFdBoostWrapper();
} }
// Pure virtual methods that derived classes must implement // Pure virtual methods that derived classes must implement
@@ -1787,17 +1754,56 @@ void Device::registerUdpCommandHandler(
uint8_t cmd_set, uint8_t cmd_id, uint8_t cmd_set, uint8_t cmd_id,
std::function<void( std::function<void(
const uint8_t* data, ssize_t bytesReceived, const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr)> handler const struct sockaddr_in& senderAddr)> handler,
const std::string& deviceIP
) )
{ {
auto key = std::make_pair(cmd_set, cmd_id); auto key = std::make_pair(cmd_set, cmd_id);
udpCommandHandlers[key] = std::move(handler); udpCommandHandlers[key] = handler; // Don't move, we need to copy
/** EXPLANATION:
* Add to temporary collection if deviceIP is provided (not empty)
*/
if (!deviceIP.empty())
{
// Add command-specific handler to the list for this device IP
CommandHandler cmdHandler;
cmdHandler.cmd_set = cmd_set;
cmdHandler.cmd_id = cmd_id;
cmdHandler.handler = std::move(handler);
devicesUnderConstruction[deviceIP].push_back(std::move(cmdHandler));
}
} }
void Device::unregisterUdpCommandHandler(uint8_t cmd_set, uint8_t cmd_id) void Device::unregisterUdpCommandHandler(
uint8_t cmd_set, uint8_t cmd_id, const std::string& deviceIP
)
{ {
auto key = std::make_pair(cmd_set, cmd_id); auto key = std::make_pair(cmd_set, cmd_id);
udpCommandHandlers.erase(key); udpCommandHandlers.erase(key);
/** EXPLANATION:
* Remove from temporary collection if deviceIP is provided (not empty)
*/
if (!deviceIP.empty())
{
auto it = devicesUnderConstruction.find(deviceIP);
if (it != devicesUnderConstruction.end()) {
// Remove the specific command handler for this cmd_set/cmd_id
auto& handlers = it->second;
handlers.erase(
std::remove_if(handlers.begin(), handlers.end(),
[cmd_set, cmd_id](const CommandHandler& h) {
return h.cmd_set == cmd_set && h.cmd_id == cmd_id;
}),
handlers.end());
// If no handlers left for this IP, remove the entire entry
if (handlers.empty()) {
devicesUnderConstruction.erase(it);
}
}
}
} }
} // namespace livoxProto1 } // namespace livoxProto1
+17 -2
View File
@@ -90,6 +90,18 @@ public:
uint8_t smoSubnetNbits; uint8_t smoSubnetNbits;
uint16_t dataPort, cmdPort, imuPort; uint16_t dataPort, cmdPort, imuPort;
// Static collection for devices being constructed (not yet in DeviceManager)
// Maps device IP to list of command-specific UDP handlers for that device
struct CommandHandler {
uint8_t cmd_set;
uint8_t cmd_id;
std::function<void(
const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr)> handler;
};
static std::unordered_map<std::string, std::vector<CommandHandler>>
devicesUnderConstruction;
private: private:
// Heartbeat mechanism // Heartbeat mechanism
void startHeartbeat(); void startHeartbeat();
@@ -162,8 +174,11 @@ public:
uint8_t cmd_set, uint8_t cmd_id, uint8_t cmd_set, uint8_t cmd_id,
std::function<void( std::function<void(
const uint8_t* data, ssize_t bytesReceived, const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr)> handler); const struct sockaddr_in& senderAddr)> handler,
void unregisterUdpCommandHandler(uint8_t cmd_set, uint8_t cmd_id); const std::string& deviceIP = "");
void unregisterUdpCommandHandler(
uint8_t cmd_set, uint8_t cmd_id, const std::string& deviceIP = "");
private: private:
// Point cloud data setup // Point cloud data setup
+43 -10
View File
@@ -10,6 +10,7 @@
#include "udpCommandDemuxer.h" #include "udpCommandDemuxer.h"
#include "core.h" #include "core.h"
#include "device.h"
namespace livoxProto1 { namespace livoxProto1 {
namespace comms { namespace comms {
@@ -80,10 +81,10 @@ void UdpCommandDemuxer::stop()
timer.cancel(); timer.cancel();
// Close socket and cleanup // Close socket and cleanup
if (socketDesc) if (cmdEndpointFdDesc)
{ {
socketDesc->cancel(); cmdEndpointFdDesc->cancel();
socketDesc.reset(); cmdEndpointFdDesc.reset();
} }
isActive.store(false); isActive.store(false);
@@ -141,7 +142,7 @@ void UdpCommandDemuxer::setupSocket()
} }
// Create boost wrapper for async operations // Create boost wrapper for async operations
socketDesc =std::make_unique<boost::asio::posix::stream_descriptor>( cmdEndpointFdDesc = std::make_shared<boost::asio::posix::stream_descriptor>(
componentThread->getIoService(), socketGuard.getFd()); componentThread->getIoService(), socketGuard.getFd());
// Transfer ownership, prevent auto-close // Transfer ownership, prevent auto-close
@@ -153,7 +154,7 @@ void UdpCommandDemuxer::startAsyncReceive()
if (!isActive.load() || shouldStop.load()) if (!isActive.load() || shouldStop.load())
{ return; } { return; }
socketDesc->async_wait( cmdEndpointFdDesc->async_wait(
boost::asio::posix::stream_descriptor::wait_read, boost::asio::posix::stream_descriptor::wait_read,
std::bind( std::bind(
&UdpCommandDemuxer::onDataReady, this, std::placeholders::_1)); &UdpCommandDemuxer::onDataReady, this, std::placeholders::_1));
@@ -177,7 +178,7 @@ void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error)
// Read the data // Read the data
bytesReceived = recvfrom( bytesReceived = recvfrom(
socketDesc->native_handle(), receiveBuffer, cmdEndpointFdDesc->native_handle(), receiveBuffer,
sizeof(receiveBuffer), 0, sizeof(receiveBuffer), 0,
(struct sockaddr *)&senderAddr, &senderAddrLen); (struct sockaddr *)&senderAddr, &senderAddrLen);
@@ -205,8 +206,8 @@ void UdpCommandDemuxer::onTimerTick(const boost::system::error_code &error)
if (shouldStop.load()) if (shouldStop.load())
{ {
// Stop was called, cancel async operations and stop timer // Stop was called, cancel async operations and stop timer
if (socketDesc) { if (cmdEndpointFdDesc) {
socketDesc->cancel(); cmdEndpointFdDesc->cancel();
} }
timer.cancel(); timer.cancel();
return; return;
@@ -234,7 +235,7 @@ void UdpCommandDemuxer::processIncomingData()
char sourceIP[INET_ADDRSTRLEN]; char sourceIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN); inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN);
// Find device with matching IP address // First, find device with matching IP address in DeviceManager collection
for (const auto &device : deviceManager.devices) for (const auto &device : deviceManager.devices)
{ {
if (device->discoveredDevice.ipAddr != sourceIP) { continue; } if (device->discoveredDevice.ipAddr != sourceIP) { continue; }
@@ -254,7 +255,39 @@ void UdpCommandDemuxer::processIncomingData()
return; return;
} }
// No device found with matching IP, discard the data // If not found in DeviceManager, check temporary collection (devices under construction)
auto tempIt = livoxProto1::Device::devicesUnderConstruction.find(sourceIP);
if (tempIt != livoxProto1::Device::devicesUnderConstruction.end())
{
// Extract command set and command ID from the datagram
if (bytesReceived >= static_cast<ssize_t>(
sizeof(livoxProto1::comms::Header) + sizeof(livoxProto1::comms::Command)))
{
uint8_t cmd_set = receiveBuffer[sizeof(livoxProto1::comms::Header)];
uint8_t cmd_id = receiveBuffer[sizeof(livoxProto1::comms::Header) + 1];
// Found matching device in temporary collection, invoke matching handlers
for (const auto& cmdHandler : tempIt->second)
{
if (cmdHandler.cmd_set == cmd_set && cmdHandler.cmd_id == cmd_id)
{
try
{
cmdHandler.handler(receiveBuffer, bytesReceived, senderAddr);
}
catch (const std::exception &e)
{
std::cerr
<< __func__ << ": Temporary device handler exception for IP "
<< sourceIP << ": " << e.what() << std::endl;
}
}
}
}
return;
}
// No device found with matching IP in either collection, discard the data
std::cerr std::cerr
<< __func__ << ": No device found for source IP " << __func__ << ": No device found for source IP "
<< sourceIP << ", discarding datagram" << std::endl; << sourceIP << ", discarding datagram" << std::endl;
+9 -2
View File
@@ -27,7 +27,7 @@ public:
UdpCommandDemuxer( UdpCommandDemuxer(
const std::shared_ptr<smo::ComponentThread>& componentThread, const std::shared_ptr<smo::ComponentThread>& componentThread,
DeviceManager& deviceManager, DeviceManager& deviceManager,
uint16_t commandPort = 65000); uint16_t commandPort = 56001);
~UdpCommandDemuxer(); ~UdpCommandDemuxer();
@@ -35,6 +35,13 @@ public:
void stop(); void stop();
bool isRunning() const { return isActive.load(); } bool isRunning() const { return isActive.load(); }
// Get shared pointer to command endpoint for handshake use
std::shared_ptr<boost::asio::posix::stream_descriptor>
getCmdEndpointFdDesc() const
{
return cmdEndpointFdDesc;
}
private: private:
void setupSocket(); void setupSocket();
void startAsyncReceive(); void startAsyncReceive();
@@ -47,7 +54,7 @@ private:
uint16_t commandPort; uint16_t commandPort;
// Socket and async objects // Socket and async objects
std::unique_ptr<boost::asio::posix::stream_descriptor> socketDesc; std::shared_ptr<boost::asio::posix::stream_descriptor> cmdEndpointFdDesc;
boost::asio::deadline_timer timer; boost::asio::deadline_timer timer;
// State management // State management