diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index 4c19ba7..2230906 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -33,6 +33,11 @@ */ namespace livoxProto1 { + +// Static member definition for devices under construction +std::unordered_map> + Device::devicesUnderConstruction; + namespace comms { DiscoveredDevice::DiscoveredDevice( @@ -104,11 +109,14 @@ pcloudDataFd(-1) Device::~Device() { - if (heartbeatActive.load()) { + if (heartbeatActive.load()) + { heartbeatActive.store(false); if (heartbeatTimer) { heartbeatTimer->cancel(); } + + unregisterUdpCommandHandler(0x00, 0x03); } if (pcloudDataActive.load()) { @@ -444,8 +452,8 @@ public: std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic handlerExecuted{false}; - // The stream descriptor that will be returned to the caller - boost::asio::posix::stream_descriptor handshakeFdDesc; + // Cmd fd desc. Will be returned to caller (shared with UdpCommandDemuxer) + std::shared_ptr cmdEndpointFdDesc; boost::asio::deadline_timer timeoutTimer; // Received data storage @@ -457,11 +465,13 @@ public: public: ExecuteHandshakeReq( Device& dev, const std::string& deviceIP, + std::shared_ptr + &cmdEndpointFdDesc, smo::Callback cb) : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), deviceIP(deviceIP), - handshakeFdDesc(device.componentThread->getIoService()), + cmdEndpointFdDesc(cmdEndpointFdDesc), timeoutTimer(device.componentThread->getIoService()) { } @@ -507,47 +517,6 @@ public: } private: - bool setupSocket() - { - /** EXPLANATION: - * Create non-blocking UDP socket for handshake. We can't use - * boost::asio::socket because it causes a segfault if associated with - * an io_service from the main program (it's a boost bug). - */ - int socketFd = socket(AF_INET, SOCK_DGRAM, 0); - if (socketFd < 0) - { - std::cerr << __func__ << ": Failed to create socket: " - << strerror(errno) << std::endl; - return false; - } - - int flags = fcntl(socketFd, F_GETFL, 0); - if (flags < 0 || fcntl(socketFd, F_SETFL, flags | O_NONBLOCK) < 0) - { - std::cerr << __func__ << ": Failed to set socket non-blocking: " - << strerror(errno) << std::endl; - return false; - } - - // Bind socket to cmdPort so we can receive the handshake response - struct sockaddr_in localAddr; - memset(&localAddr, 0, sizeof(localAddr)); - localAddr.sin_family = AF_INET; - localAddr.sin_addr.s_addr = INADDR_ANY; - localAddr.sin_port = htons(device.cmdPort); - - if (bind(socketFd, (struct sockaddr*)&localAddr, sizeof(localAddr)) < 0) - { - std::cerr << __func__ << ": Failed to bind socket to port " - << device.cmdPort << ": " << strerror(errno) << std::endl; - return false; - } - - // Assign the socket FD to the stream descriptor - handshakeFdDesc.assign(socketFd); - return true; - } bool sendHandshakeRequest() { @@ -572,7 +541,7 @@ private: // Send handshake request directly (synchronous) ssize_t bytesSent = sendto( - handshakeFdDesc.native_handle(), + cmdEndpointFdDesc->native_handle(), &handshakeReq, sizeof(comms::HandshakeRequest), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); @@ -590,23 +559,22 @@ private: const std::shared_ptr &request ) { - if (!handshakeFdDesc.is_open()) + if (!cmdEndpointFdDesc || !cmdEndpointFdDesc->is_open()) { throw std::runtime_error( std::string(__func__) + - ": handshakeFdDesc is not open; cannot set up async callbacks " + ": cmdEndpointFdDesc is null; cannot set up async callbacks " "for device " + deviceIP + "(" + device.discoveredDevice.deviceIdentifier + ")" + " handshake." - "Check socket initialization and bining." + "Check UdpCommandDemuxer initialization." ); } - /** EXPLANATION: - * We setup an async timer event to detect timeout, and wait for the - * device to respond to the handshake request. If the device does not - * respond within the timeout period, we will consider the handshake - * to have failed. + * We setup an async timer event to detect timeout, and register a UDP + * command handler to wait for the device to respond to the handshake + * request. If the device does not respond within the timeout period, + * we will consider the handshake to have failed. */ timeoutTimer.expires_from_now( boost::posix_time::milliseconds(device.handshakeTimeoutMs)); @@ -617,16 +585,19 @@ private: std::placeholders::_1)); /** EXPLANATION: - * Since we're using POSIX sockets calls on the underlying - * native_handle, Let's use async_wait with POLLIN to detect when data - * is available for reading. + * Register a UDP command handler for handshake ACK + * (cmd_set=0x00, cmd_id=0x01). + * The handler will be called by the UdpCommandDemuxer when a handshake + * response is received. */ - handshakeFdDesc.async_wait( - boost::asio::posix::stream_descriptor::wait_read, - std::bind( - &ExecuteHandshakeReq::executeHandshakeReq1_2, this, - request, - std::placeholders::_1)); + // Add device to temporary collection for devices under construction + device.registerUdpCommandHandler( + 0x00, 0x01, + std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2, + this, request, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3), + deviceIP); } void executeHandshakeReq1_1( @@ -641,40 +612,32 @@ private: } } - void executeHandshakeReq1_2( - std::shared_ptr, - const boost::system::error_code& error + // This is called from the UDP command handler + void executeHandshakeReq1_2( + std::shared_ptr context, + const uint8_t* data, ssize_t bytesReceived, + const struct sockaddr_in& senderAddr ) { - // This is called from the socket read callback - if (error) + // Unregister the handler to decrement refcount + device.unregisterUdpCommandHandler(0x00, 0x01, context->deviceIP); + + // Store the received data + context->bytesReceived = bytesReceived; + context->senderAddr = senderAddr; + context->senderAddrLen = sizeof(senderAddr); + + // Copy the data to our buffer + if (bytesReceived > 0 + && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) { - socketState.store(SocketState::SOCKET_ERROR); - std::cerr << __func__ << ": Socket read error: " << error.message() - << std::endl; + memcpy(context->responseBuffer, data, bytesReceived); + context->socketState.store(SocketState::SOCKET_RECV_SUCCESS); } else { - // Socket is readable, now actually read the data - bytesReceived = recvfrom( - handshakeFdDesc.native_handle(), - responseBuffer, sizeof(responseBuffer), 0, - (struct sockaddr*)&senderAddr, &senderAddrLen); - - if (bytesReceived > 0) - { - socketState.store(SocketState::SOCKET_RECV_SUCCESS); - } else if (bytesReceived == 0) - { - socketState.store(SocketState::SOCKET_RECV_ERROR); - std::cerr << __func__ << ": Received 0 bytes from recvfrom" - << std::endl; - } else - { - socketState.store(SocketState::SOCKET_ERROR); - std::cerr << __func__ << ": recvfrom failed: " - << strerror(errno) << " (errno: " << errno << ")" - << std::endl; - } + context->socketState.store(SocketState::SOCKET_RECV_ERROR); + std::cerr << __func__ << ": Invalid data size: " << bytesReceived + << std::endl; } executeHandshakeReq2(); @@ -778,7 +741,7 @@ private: // Transfer any successful state to Device commit(); - int rawFd = handshakeFdDesc.release(); + int rawFd = cmdEndpointFdDesc->native_handle(); callOriginalCallback(true, rawFd); } @@ -791,10 +754,7 @@ private: void cleanupHandshakeSocket() { - int fd = handshakeFdDesc.release(); - if (fd != -1) { - close(fd); - } + // Obsolete - socket is managed by shared_ptr in UdpCommandDemuxer } void cleanup() // Clean up transient resources { @@ -808,9 +768,28 @@ void Device::executeHandshakeReq( smo::Callback callback ) { + // Get the command endpoint from the UdpCommandDemuxer + auto& protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { + callback.callbackFn(false, -1); + return; + } + + auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer + .getCmdEndpointFdDesc(); + + if (!cmdEndpointFdDesc) + { + std::cerr << __func__ << ": UdpCommandDemuxer not started or no " + "command endpoint available." << std::endl; + callback.callbackFn(false, -1); + return; + } + // Create the handshake request object to hold state and callbacks auto request = std::make_shared( - *this, deviceIP, std::move(callback)); + *this, deviceIP, cmdEndpointFdDesc, std::move(callback)); // Check if detectedSmoListeningIp is empty - this should not happen if (detectedSmoListeningIp.empty()) @@ -821,11 +800,6 @@ void Device::executeHandshakeReq( } try { - if (!request->setupSocket()) - { - request->callOriginalCallbackWithFailure(); - return; - } if (!request->sendHandshakeRequest()) { request->callOriginalCallbackWithFailure(); @@ -845,6 +819,8 @@ void Device::disconnectReq(smo::Callback callback) { // Stop heartbeat first heartbeatActive.store(false); + // Unregister heartbeat ACK handler + unregisterUdpCommandHandler(0x00, 0x03); if (heartbeatFd == -1) { @@ -975,6 +951,17 @@ std::string Device::generateClientDeviceIpFromSerialNumber( std::to_string(octet3) + ".1" + lastTwoDigits; } +static void discardHeartbeatAck( + const uint8_t* data, ssize_t bytesReceived, + const struct sockaddr_in& senderAddr + ) +{ + (void)data; + (void)bytesReceived; + (void)senderAddr; + std::cout << "Got heartbeat ACK\n"; +} + void Device::startHeartbeat() { if (!componentThread || discoveredDevice.ipAddr.empty()) @@ -992,6 +979,9 @@ void Device::startHeartbeat() ": Expected to find handshake socket present but didn't find it"); } + // Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03) + registerUdpCommandHandler(0x00, 0x03, discardHeartbeatAck); + // Create heartbeat timer heartbeatTimer = std::make_unique( componentThread->getIoService()); @@ -1260,10 +1250,6 @@ public: // The timeout timer. boost::asio::deadline_timer timeoutTimer; - /* This wrapper is just to enable us to use boost::stream_descriptor for its - * convenient API when waiting for the enable/disable ACK dgram. - */ - boost::asio::posix::stream_descriptor cmdResponseBoostFdWrapper; // Received data storage uint8_t responseBuffer[1024]{}; @@ -1277,8 +1263,7 @@ protected: smo::Callback cb) : smo::NonPostedAsynchronousContinuation(std::move(cb)), device(dev), - timeoutTimer(device.componentThread->getIoService()), - cmdResponseBoostFdWrapper(device.componentThread->getIoService()) + timeoutTimer(device.componentThread->getIoService()) {} public: @@ -1293,30 +1278,9 @@ public: void callOriginalCallbackWithFailure() { - /** - * EXPLANATION: - * We have to call cleanupCmdResponseFdBoostWrapper() here, specifically - * because there are self-references within this class that need to be - * cleaned up. - * - * The cmdResponseBoostFdWrapper holds a reference to the heartbeat - * socket for async operations. When the sequence fails, we need to - * break this reference to allow proper cleanup. - * - * Hence, we call cleanupCmdResponseFdBoostWrapper() at the point of - * failure. - */ - cleanupCmdResponseFdBoostWrapper(); callOriginalCallback(false); } - void cleanupCmdResponseFdBoostWrapper() - { - if (cmdResponseBoostFdWrapper.is_open()) { - cmdResponseBoostFdWrapper.release(); // Don't close heartbeat socket - } - } - protected: bool setupSocket() { @@ -1334,8 +1298,6 @@ protected: const std::shared_ptr> &request ) { - cmdResponseBoostFdWrapper.assign(device.heartbeatFd); - // Setup timeout timer timeoutTimer.expires_from_now( boost::posix_time::milliseconds(device.handshakeTimeoutMs)); @@ -1346,13 +1308,13 @@ protected: this, request, std::placeholders::_1)); - // Setup async wait for read-ready - cmdResponseBoostFdWrapper.async_wait( - boost::asio::posix::stream_descriptor::wait_read, - std::bind( - &EnDisablePcloudDataReq::enDisablePcloudDataReq1_2, + // Register UDP command handler for sampling response (cmd_set=0x00, cmd_id=0x04) + device.registerUdpCommandHandler( + 0x00, 0x04, + std::bind(&EnDisablePcloudDataReq::enDisablePcloudDataReq1_2, this, request, - std::placeholders::_1)); + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)); } void enDisablePcloudDataReq1_1( @@ -1367,24 +1329,30 @@ protected: void enDisablePcloudDataReq1_2( std::shared_ptr> context, - const boost::system::error_code& error + const uint8_t* data, ssize_t bytesReceived, + const struct sockaddr_in& senderAddr ) { - if (!error) - { - // Data is available for reading, perform the actual read - context->bytesReceived = recvfrom( - context->device.heartbeatFd, - context->responseBuffer, sizeof(context->responseBuffer), 0, - (struct sockaddr*)&context->senderAddr, &context->senderAddrLen); + // Unregister the handler to decrement refcount + device.unregisterUdpCommandHandler(0x00, 0x04); - if (context->bytesReceived > 0) - { context->socketState = SocketState::SOCKET_RECV_SUCCESS; } - else - { context->socketState = SocketState::SOCKET_RECV_ERROR; } + // Store the received data + context->bytesReceived = bytesReceived; + context->senderAddr = senderAddr; + context->senderAddrLen = sizeof(senderAddr); + + // Copy the data to our buffer + if (bytesReceived > 0 + && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) + { + memcpy(context->responseBuffer, data, bytesReceived); + context->socketState = SocketState::SOCKET_RECV_SUCCESS; + } else + { + context->socketState = SocketState::SOCKET_RECV_ERROR; + std::cerr << __func__ << ": Invalid data size: " << bytesReceived + << std::endl; } - else - { context->socketState = SocketState::SOCKET_RECV_ERROR; } context->enDisablePcloudDataReq2(context); } @@ -1474,7 +1442,6 @@ protected: void cleanup() { timeoutTimer.cancel(); - cleanupCmdResponseFdBoostWrapper(); } // Pure virtual methods that derived classes must implement @@ -1787,17 +1754,56 @@ void Device::registerUdpCommandHandler( uint8_t cmd_set, uint8_t cmd_id, std::function handler + const struct sockaddr_in& senderAddr)> handler, + const std::string& deviceIP ) { auto key = std::make_pair(cmd_set, cmd_id); - udpCommandHandlers[key] = std::move(handler); + udpCommandHandlers[key] = handler; // Don't move, we need to copy + + /** EXPLANATION: + * Add to temporary collection if deviceIP is provided (not empty) + */ + if (!deviceIP.empty()) + { + // Add command-specific handler to the list for this device IP + CommandHandler cmdHandler; + cmdHandler.cmd_set = cmd_set; + cmdHandler.cmd_id = cmd_id; + cmdHandler.handler = std::move(handler); + devicesUnderConstruction[deviceIP].push_back(std::move(cmdHandler)); + } } -void Device::unregisterUdpCommandHandler(uint8_t cmd_set, uint8_t cmd_id) +void Device::unregisterUdpCommandHandler( + uint8_t cmd_set, uint8_t cmd_id, const std::string& deviceIP + ) { auto key = std::make_pair(cmd_set, cmd_id); udpCommandHandlers.erase(key); + + /** EXPLANATION: + * Remove from temporary collection if deviceIP is provided (not empty) + */ + if (!deviceIP.empty()) + { + auto it = devicesUnderConstruction.find(deviceIP); + if (it != devicesUnderConstruction.end()) { + // Remove the specific command handler for this cmd_set/cmd_id + auto& handlers = it->second; + handlers.erase( + std::remove_if(handlers.begin(), handlers.end(), + [cmd_set, cmd_id](const CommandHandler& h) { + return h.cmd_set == cmd_set && h.cmd_id == cmd_id; + }), + handlers.end()); + + // If no handlers left for this IP, remove the entire entry + if (handlers.empty()) { + devicesUnderConstruction.erase(it); + } + } + } } } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index 7a3223d..5fc357f 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -90,6 +90,18 @@ public: uint8_t smoSubnetNbits; uint16_t dataPort, cmdPort, imuPort; + // Static collection for devices being constructed (not yet in DeviceManager) + // Maps device IP to list of command-specific UDP handlers for that device + struct CommandHandler { + uint8_t cmd_set; + uint8_t cmd_id; + std::function handler; + }; + static std::unordered_map> + devicesUnderConstruction; + private: // Heartbeat mechanism void startHeartbeat(); @@ -162,8 +174,11 @@ public: uint8_t cmd_set, uint8_t cmd_id, std::function handler); - void unregisterUdpCommandHandler(uint8_t cmd_set, uint8_t cmd_id); + const struct sockaddr_in& senderAddr)> handler, + const std::string& deviceIP = ""); + + void unregisterUdpCommandHandler( + uint8_t cmd_set, uint8_t cmd_id, const std::string& deviceIP = ""); private: // Point cloud data setup diff --git a/commonLibs/livoxProto1/udpCommandDemuxer.cpp b/commonLibs/livoxProto1/udpCommandDemuxer.cpp index dcc020b..1bbe0f0 100644 --- a/commonLibs/livoxProto1/udpCommandDemuxer.cpp +++ b/commonLibs/livoxProto1/udpCommandDemuxer.cpp @@ -10,6 +10,7 @@ #include "udpCommandDemuxer.h" #include "core.h" +#include "device.h" namespace livoxProto1 { namespace comms { @@ -80,10 +81,10 @@ void UdpCommandDemuxer::stop() timer.cancel(); // Close socket and cleanup - if (socketDesc) + if (cmdEndpointFdDesc) { - socketDesc->cancel(); - socketDesc.reset(); + cmdEndpointFdDesc->cancel(); + cmdEndpointFdDesc.reset(); } isActive.store(false); @@ -141,7 +142,7 @@ void UdpCommandDemuxer::setupSocket() } // Create boost wrapper for async operations - socketDesc =std::make_unique( + cmdEndpointFdDesc = std::make_shared( componentThread->getIoService(), socketGuard.getFd()); // Transfer ownership, prevent auto-close @@ -153,7 +154,7 @@ void UdpCommandDemuxer::startAsyncReceive() if (!isActive.load() || shouldStop.load()) { return; } - socketDesc->async_wait( + cmdEndpointFdDesc->async_wait( boost::asio::posix::stream_descriptor::wait_read, std::bind( &UdpCommandDemuxer::onDataReady, this, std::placeholders::_1)); @@ -177,7 +178,7 @@ void UdpCommandDemuxer::onDataReady(const boost::system::error_code &error) // Read the data bytesReceived = recvfrom( - socketDesc->native_handle(), receiveBuffer, + cmdEndpointFdDesc->native_handle(), receiveBuffer, sizeof(receiveBuffer), 0, (struct sockaddr *)&senderAddr, &senderAddrLen); @@ -205,8 +206,8 @@ void UdpCommandDemuxer::onTimerTick(const boost::system::error_code &error) if (shouldStop.load()) { // Stop was called, cancel async operations and stop timer - if (socketDesc) { - socketDesc->cancel(); + if (cmdEndpointFdDesc) { + cmdEndpointFdDesc->cancel(); } timer.cancel(); return; @@ -234,7 +235,7 @@ void UdpCommandDemuxer::processIncomingData() char sourceIP[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN); - // Find device with matching IP address + // First, find device with matching IP address in DeviceManager collection for (const auto &device : deviceManager.devices) { if (device->discoveredDevice.ipAddr != sourceIP) { continue; } @@ -254,7 +255,39 @@ void UdpCommandDemuxer::processIncomingData() return; } - // No device found with matching IP, discard the data + // If not found in DeviceManager, check temporary collection (devices under construction) + auto tempIt = livoxProto1::Device::devicesUnderConstruction.find(sourceIP); + if (tempIt != livoxProto1::Device::devicesUnderConstruction.end()) + { + // Extract command set and command ID from the datagram + if (bytesReceived >= static_cast( + sizeof(livoxProto1::comms::Header) + sizeof(livoxProto1::comms::Command))) + { + uint8_t cmd_set = receiveBuffer[sizeof(livoxProto1::comms::Header)]; + uint8_t cmd_id = receiveBuffer[sizeof(livoxProto1::comms::Header) + 1]; + + // Found matching device in temporary collection, invoke matching handlers + for (const auto& cmdHandler : tempIt->second) + { + if (cmdHandler.cmd_set == cmd_set && cmdHandler.cmd_id == cmd_id) + { + try + { + cmdHandler.handler(receiveBuffer, bytesReceived, senderAddr); + } + catch (const std::exception &e) + { + std::cerr + << __func__ << ": Temporary device handler exception for IP " + << sourceIP << ": " << e.what() << std::endl; + } + } + } + } + return; + } + + // No device found with matching IP in either collection, discard the data std::cerr << __func__ << ": No device found for source IP " << sourceIP << ", discarding datagram" << std::endl; diff --git a/commonLibs/livoxProto1/udpCommandDemuxer.h b/commonLibs/livoxProto1/udpCommandDemuxer.h index abb8805..7da218c 100644 --- a/commonLibs/livoxProto1/udpCommandDemuxer.h +++ b/commonLibs/livoxProto1/udpCommandDemuxer.h @@ -27,7 +27,7 @@ public: UdpCommandDemuxer( const std::shared_ptr& componentThread, DeviceManager& deviceManager, - uint16_t commandPort = 65000); + uint16_t commandPort = 56001); ~UdpCommandDemuxer(); @@ -35,6 +35,13 @@ public: void stop(); bool isRunning() const { return isActive.load(); } + // Get shared pointer to command endpoint for handshake use + std::shared_ptr + getCmdEndpointFdDesc() const + { + return cmdEndpointFdDesc; + } + private: void setupSocket(); void startAsyncReceive(); @@ -47,7 +54,7 @@ private: uint16_t commandPort; // Socket and async objects - std::unique_ptr socketDesc; + std::shared_ptr cmdEndpointFdDesc; boost::asio::deadline_timer timer; // State management