#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "device.h" #include "protocol.h" #include "core.h" /** EXPLANATION: * This file contains the implementation of the Device class. * * FIXME: * We may need to check how smo-subnet-nbits is used in here because we didn't * actually check to see under what conditions it's required vs optional. Hence * we don't currently enforce correct usage of it, and we just assume that the * livoxGen1's policy of supplying a default value of 24 is correct. */ namespace livoxProto1 { // Static member definition for devices under construction std::unordered_map> Device::devicesUnderConstruction; namespace comms { DiscoveredDevice::DiscoveredDevice( const std::string &deviceIdentifier, DeviceType deviceType, const std::string &ipAddr) : deviceIdentifier(deviceIdentifier), deviceType(deviceType), ipAddr(ipAddr) { } DiscoveredDevice::DiscoveredDevice( const BroadcastMessage &msg, const std::string &ipAddr ) : DiscoveredDevice( reinterpret_cast(msg.broadcast_code), static_cast(msg.dev_type), ipAddr) { } std::string DiscoveredDevice::stringify(void) const { std::ostringstream oss; oss << "DiscoveredDevice{" << "identifier='" << deviceIdentifier << "', " << "ipAddr='" << ipAddr << "', " << "deviceType=" << (int)deviceType << " (" << getDeviceTypeName() << ")" << "}"; return oss.str(); } std::string DiscoveredDevice::getDeviceTypeName(void) const { switch (deviceType) { case DeviceType::Hub: return "Hub"; case DeviceType::Mid40: return "Mid-40"; case DeviceType::Tele15: return "Tele-15"; case DeviceType::Horizon: return "Horizon"; case DeviceType::Mid70: return "Mid-70"; case DeviceType::Avia: return "Avia"; default: return "Unknown"; } } } // namespace comms Device::Device(const std::string &deviceIdentifier, const std::shared_ptr& componentThread, int commandTimeoutMs, int retryDelayMs, const 
std::string& smoIp, uint8_t smoSubnetNbits, uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort) : discoveredDevice( deviceIdentifier, comms::DeviceType::Mid40, // Initialize empty. IP will be set upon successful connection. ""), nAttachedStimulusProducers(0), componentThread(componentThread), commandTimeoutMs(commandTimeoutMs), retryDelayMs(retryDelayMs), smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), heartbeatActive(false), pcloudDataActive(false) { } Device::~Device() { stopHeartbeat(); if (pcloudDataActive.load()) { pcloudDataActive.store(false); } heartbeatTimer.reset(); } /** * Device::ConnectReq - Encapsulates all state and resources for async connection sequence * This class manages the overall device connection process including handshake and heartbeat setup */ class Device::ConnectReq : public smo::NonPostedAsynchronousContinuation { private: Device& device; boost::asio::deadline_timer delayTimer; public: ConnectReq(Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), delayTimer(dev.componentThread->getIoService()) {} /** FIXME: * WE need to assign the ipAddr to the Device being connected up. 
*/ // Callback methods for the connection sequence void connectReq1( std::shared_ptr context, bool success, const std::string& ipAddr ) { // Fail early - if handshake failed, try next method if (!success) { if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": Trying to connect to device by " << "identifier" << "\n"; } // Try direct connect by device identifier context->device.connectByDeviceIdentifierReq( {context, std::bind(&ConnectReq::connectReq2, context.get(), context, std::placeholders::_1, std::placeholders::_2)}); return; } // Success - store connection info and proceed to next step context->device.discoveredDevice.ipAddr = ipAddr; context->device.startHeartbeat(); context->connectReq3(context, success); } void connectReq2( std::shared_ptr context, bool success, const std::string& ipAddr ) { // Fail early - if this also failed, all connection attempts failed if (!success) { context->callOriginalCallbackWithFailure(); return; } // Success - store connection info and proceed to next step context->device.discoveredDevice.ipAddr = ipAddr; context->device.startHeartbeat(); context->connectReq3(context, success); } void connectReq3( std::shared_ptr context, bool success ) { if (!success) { std::cerr << __func__ << ": Failed to connect to device " "(" << context->device.discoveredDevice.deviceIdentifier << ") @(" << context->device.discoveredDevice.ipAddr << ").\n"; context->callOriginalCallbackWithFailure(); return; } context->callOriginalCallback(success); } // Public accessor for the original callback void callOriginalCallback(bool success) { callOriginalCb(success); } // Wrapper for failure cases void callOriginalCallbackWithFailure() { callOriginalCallback(false); } }; void Device::connectReq(smo::Callback callback) { // Create the connection request object to hold state and callbacks auto request = std::make_shared(*this, std::move(callback)); // Try connecting to known device first if 
(getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": Trying to connect to known device" << "\n"; } connectToKnownDeviceReq( {request, std::bind( &ConnectReq::connectReq1, request.get(), request, std::placeholders::_1, std::placeholders::_2)}); } class Device::ConnectToKnownDeviceReq : public smo::NonPostedAsynchronousContinuation< Device::connectToKnownDeviceReqCbFn> { public: Device& device; std::string deviceIP; std::shared_ptr deviceInfo; ConnectToKnownDeviceReq(Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation< Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr) { callOriginalCb(success, ipAddr); } // Wrapper for failure cases void callOriginalCallbackWithFailure() { callOriginalCallback(false, ""); } // Callback methods for the connection sequence void connectToKnownDeviceReq1( std::shared_ptr context, bool success ) { // Return the IP address to the caller context->callOriginalCallback(success, context->deviceIP); } }; /** EXPLANATION: * This function is used to connect to a device that is already known to the * broadcastListener. 
*/ void Device::connectToKnownDeviceReq( smo::Callback callback ) { // Create the connection request object to hold state and callbacks auto request = std::make_shared( *this, std::move(callback)); auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { request->callOriginalCallbackWithFailure(); return; } // Check if the device is known to the broadcastListener if (!protoState.deviceManager->broadcastListener.deviceExists( request->device.discoveredDevice.deviceIdentifier)) { request->callOriginalCallbackWithFailure(); return; } request->deviceInfo = protoState.deviceManager->broadcastListener.getDevice( request->device.discoveredDevice.deviceIdentifier); if (!request->deviceInfo) { request->callOriginalCallbackWithFailure(); return; } // Use the IP address from the broadcast message request->deviceIP = request->deviceInfo->ipAddr; // Determine the final listening IP address auto smoIpResult = request->device.getSmoIp(request->deviceIP); if (!smoIpResult.has_value()) { // Auto-detection failed, fail early std::cerr << __func__ << ": Failed to detect SMO listening IP for " << "known device (" << request->device.discoveredDevice.deviceIdentifier << ")" << " @(" << request->deviceIP << ").\n"; request->callOriginalCallbackWithFailure(); return; } if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": Detected SMO listening IP for known device " << request->device.discoveredDevice.deviceIdentifier << " @(" << request->deviceIP << ") is " << smoIpResult.value() << ". 
About to try to handshake.\n"; } request->device.detectedSmoListeningIp = smoIpResult.value(); // Execute handshake with the known device using async method request->device.executeHandshakeReq( request->deviceIP, {request, std::bind( &ConnectToKnownDeviceReq::connectToKnownDeviceReq1, request.get(), request, std::placeholders::_1)}); } class Device::ConnectByDeviceIdentifierReq : public smo::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn> { public: Device& device; std::string deviceIP; ConnectByDeviceIdentifierReq( Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn>( std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr) { callOriginalCb(success, ipAddr); } // Wrapper for failure cases void callOriginalCallbackWithFailure() { callOriginalCallback(false, ""); } // Callback methods for the connection sequence void connectByDeviceIdentifierReq1( std::shared_ptr context, bool success ) { // Return the IP address to the caller context->callOriginalCallback(success, context->deviceIP); } }; void Device::connectByDeviceIdentifierReq( smo::Callback callback ) { /** EXPLANATION: * This method uses heuristic device IP construction from the serial number. * This requires smoIp to be provided because: * 1. We need the network prefix to generate a valid device IP address * 2. Without a target device IP, we cannot detect which interface faces the device * 3. Therefore, if smoIp is omitted, heuristic construction is impossible * * If smoIp is not provided, the driver must rely only on broadcast advertisements * from the device (handled by connectToKnownDeviceReq). 
*/ // Check if smoIp is provided - required for heuristic construction if (smoIp.empty()) { callback.callbackFn(false, ""); return; } // Create the connection request object to hold state and callbacks auto request = std::make_shared( *this, std::move(callback)); // Generate device IP from serial number request->deviceIP = generateClientDeviceIpFromSerialNumber( request->device.discoveredDevice.deviceIdentifier); // For heuristic construction, always use the provided smoIp. request->device.detectedSmoListeningIp = request->device.smoIp; if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": About to try to connect to device by " << "identifier (" << discoveredDevice.deviceIdentifier << ")" << " at IP (" << smoIp << ").\n"; } // Execute handshake using async method request->device.executeHandshakeReq( request->deviceIP, {request, std::bind( &ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1, request.get(), request, std::placeholders::_1)}); } class Device::ExecuteHandshakeReq : public smo::NonPostedAsynchronousContinuation< Device::executeHandshakeReqCbFn> { public: friend void Device::executeHandshakeReq( const std::string& deviceIP, smo::Callback callback); enum class SocketState { SOCKET_STILL_WAITING = 0, SOCKET_ERROR, SOCKET_RECV_SUCCESS, SOCKET_RECV_ERROR }; public: Device& device; std::string deviceIP; // Atomic state flags for async coordination std::atomic timerFired{false}; std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic handlerExecuted{false}; // Cmd fd desc. 
    // Will be returned to caller (shared with UdpCommandDemuxer)
    std::shared_ptr cmdEndpointFdDesc;
    boost::asio::deadline_timer timeoutTimer;

    // Received data storage
    uint8_t responseBuffer[1024]{};
    ssize_t bytesReceived = -1;
    struct sockaddr_in senderAddr;
    socklen_t senderAddrLen = sizeof(senderAddr);

public:
    ExecuteHandshakeReq(
        Device& dev, const std::string& deviceIP,
        std::shared_ptr &cmdEndpointFdDesc,
        smo::Callback cb)
    : smo::NonPostedAsynchronousContinuation(
        std::move(cb)),
    device(dev), deviceIP(deviceIP), cmdEndpointFdDesc(cmdEndpointFdDesc),
    timeoutTimer(device.componentThread->getIoService())
    {
    }

    ~ExecuteHandshakeReq()
    {
        cleanup();
    }

    // Public accessor for the original callback
    void callOriginalCallback(bool success)
    {
        callOriginalCb(success);
    }

    void callOriginalCallbackWithFailure()
    {
        /** EXPLANATION:
         * We have to call cleanupHandshakeSocket() here, specifically because
         * there are 5 references we need to clean up, and 2 of them are
         * actually recursive self-references within this class.
         *
         * The timer and the handshakeFdDesc are both given recursive
         * self-referencing shared_ptrs to this class when we eventually set up
         * the async callbacks for them.
         *
         * So merely letting the async continuation sequences go out of scope
         * won't cause this class to be destroyed. Rather, since the class
         * references itself, we have to first break those references. Then and
         * only then will the shared_ptr's refcount go to 0 and the class will
         * be destroyed.
         *
         * We always unconditionally break the timer's reference inside of
         * the branch-unifying segment (executeHandshakeReq2), but we generally
         * only want to break the handshakeFdDesc's reference at the exact
         * moment when the sequence fails, because if it succeeds, we want to
         * commit the FD to the Device object being created (for heartbeats).
         *
         * Hence, we call cleanupHandshakeSocket() at the point of failure.
         * To break the self-reference when the sequence is successful, we
         * manually do that at the end of the sequence.
         */
        cleanupHandshakeSocket();
        callOriginalCallback(false);
    }

private:
    bool sendHandshakeRequest()
    {
        /** EXPLANATION:
         * Prepare handshake request.
         */
        comms::HandshakeRequest handshakeReq(
            device.detectedSmoListeningIp,
            device.dataPort, device.cmdPort, device.imuPort);

        handshakeReq.swapContentsToProtocolEndianness();
        handshakeReq.header.setCrc16FromRawBytes();
        handshakeReq.header.swapCrc16ToProtocolEndianness();
        handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32();
        handshakeReq.footer.swapCrc32ToProtocolEndianness();

        // Prepare device endpoint
        struct sockaddr_in deviceAddr;
        memset(&deviceAddr, 0, sizeof(deviceAddr));
        deviceAddr.sin_family = AF_INET;
        deviceAddr.sin_addr.s_addr = inet_addr(deviceIP.c_str());
        deviceAddr.sin_port = htons(65000);

        // Send handshake request directly (synchronous)
        ssize_t bytesSent = sendto(
            cmdEndpointFdDesc->native_handle(),
            &handshakeReq, sizeof(comms::HandshakeRequest), 0,
            (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));

        if (bytesSent < 0)
        {
            std::cerr << __func__ << ": Failed to send handshake request: "
                << strerror(errno) << std::endl;
            return false;
        }

        return true;
    }

    void setupAsyncCallbacks(
        const std::shared_ptr &request
        )
    {
        if (!cmdEndpointFdDesc || !cmdEndpointFdDesc->is_open())
        {
            throw std::runtime_error(
                std::string(__func__)
                + ": cmdEndpointFdDesc is null or not open; cannot set up "
                "async callbacks for device " + deviceIP
                + "(" + device.discoveredDevice.deviceIdentifier + ")"
                + " handshake. Check UdpCommandDemuxer initialization.");
        }

        /** EXPLANATION:
         * We set up an async timer event to detect timeout, and register a UDP
         * command handler to wait for the device to respond to the handshake
         * request. If the device does not respond within the timeout period,
         * we will consider the handshake to have failed.
*/ timeoutTimer.expires_from_now( boost::posix_time::milliseconds(device.commandTimeoutMs)); timeoutTimer.async_wait( std::bind( &ExecuteHandshakeReq::executeHandshakeReq1_1, this, request, std::placeholders::_1)); /** EXPLANATION: * Register a UDP command handler for handshake ACK * (cmd_set=0x00, cmd_id=0x01). * The handler will be called by the UdpCommandDemuxer when a handshake * response is received. */ // Add device to temporary collection for devices under construction device.registerUdpCommandHandler( 0x00, 0x01, std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2, this, request, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), deviceIP); } void executeHandshakeReq1_1( std::shared_ptr, const boost::system::error_code& error) { // This is called from the timer callback if (!error) // Timeout occurred (not cancelled) { timerFired.store(true); executeHandshakeReq2(); } } void executeHandshakeReq1_2( std::shared_ptr context, const uint8_t* data, ssize_t bytesReceived, const struct sockaddr_in& senderAddr ) { // Store the received data context->bytesReceived = bytesReceived; context->senderAddr = senderAddr; context->senderAddrLen = sizeof(senderAddr); // Copy the data to our buffer if (bytesReceived > 0 && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) { memcpy(context->responseBuffer, data, bytesReceived); context->socketState.store(SocketState::SOCKET_RECV_SUCCESS); } else { context->socketState.store(SocketState::SOCKET_RECV_ERROR); std::cerr << __func__ << ": Invalid data size: " << bytesReceived << std::endl; } executeHandshakeReq2(); } void executeHandshakeReq2() { // Ensure we only execute once using atomic exchange if (handlerExecuted.exchange(true) == true) { return; } // Cancel timer if still running timeoutTimer.cancel(); device.unregisterUdpCommandHandler(0x00, 0x01, deviceIP); // Examine the flags and decide what happened SocketState finalSocketState = socketState.load(); bool finalTimerFired = timerFired.load(); 
// Check for timeout only if there was no socket activity if (finalTimerFired && finalSocketState == SocketState::SOCKET_STILL_WAITING) { std::cerr << __func__ << ": Command timeout with " << deviceIP << "(" << device.discoveredDevice.deviceIdentifier << ")" << std::endl; callOriginalCallbackWithFailure(); return; } // Socket error from boost::asio if (finalSocketState == SocketState::SOCKET_ERROR) { std::cerr << __func__ << ": Socket error during handshake with " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_RECV_ERROR) { std::cerr << __func__ << ": Receive error during handshake with " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } /* Result must have been RECV_SUCCESS state if we reach here. * Data was already read in the async callback, just validate it */ if (bytesReceived < (ssize_t)sizeof(comms::HandshakeResponse)) { std::cerr << __func__ << ": Response of size " << bytesReceived << " too small from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } comms::HandshakeResponse* resp = reinterpret_cast(responseBuffer); /** EXPLANATION: * Following the clean receiving flow: * 1. Swap CRC32 to host endianness first * 2. Validate CRC32 * 3. Swap CRC16 to host endianness * 4. Validate CRC16 * 5. 
Swap content to host endianness */ resp->footer.swapCrc32ToHostEndianness(); if (!resp->validateCrc32()) { std::cerr << __func__ << ": CRC32 validation failed from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } resp->header.swapCrc16ToHostEndianness(); if (!resp->header.validateCrc16()) { std::cerr << __func__ << ": CRC16 validation failed from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } resp->swapContentsToHostEndianness(); if (!resp->sanityCheck() || resp->ret_code != 0x00) { std::cerr << __func__ << ": Invalid response from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": Handshake successful with " << deviceIP << "(" << device.discoveredDevice.deviceIdentifier << ")" << "\n"; } // Transfer any successful state to Device commit(); callOriginalCallback(true); } void commit() // Transfer successful state to Device object { // Clean up resources (timer) but not the socket FD // The socket FD is returned to the caller, not transferred to the device timeoutTimer.cancel(); } void cleanupHandshakeSocket() { // Obsolete - socket is managed by shared_ptr in UdpCommandDemuxer } void cleanup() // Clean up transient resources { timeoutTimer.cancel(); cleanupHandshakeSocket(); } }; void Device::executeHandshakeReq( const std::string& deviceIP, smo::Callback callback ) { // Get the command endpoint from the UdpCommandDemuxer auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { callback.callbackFn(false); return; } auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer .getCmdEndpointFdDesc(); if (!cmdEndpointFdDesc) { std::cerr << __func__ << ": UdpCommandDemuxer not started or no " "command endpoint available." 
            << std::endl;
        callback.callbackFn(false);
        return;
    }

    // Create the handshake request object to hold state and callbacks
    auto request = std::make_shared(
        *this, deviceIP, cmdEndpointFdDesc, std::move(callback));

    // Check if detectedSmoListeningIp is empty - this should not happen
    if (detectedSmoListeningIp.empty())
    {
        // This should not happen as it should be set by the calling method
        request->callOriginalCallbackWithFailure();
        return;
    }

    try
    {
        if (!request->sendHandshakeRequest())
        {
            request->callOriginalCallbackWithFailure();
            return;
        }

        request->setupAsyncCallbacks(request);
    }
    catch (const std::exception& e)
    {
        std::cerr << __func__ << ": Handshake failed with " << deviceIP
            << ": " << e.what() << std::endl;
        request->callOriginalCallbackWithFailure();
    }
}

void Device::disconnectReq(smo::Callback callback)
{
    // Stop heartbeat first
    stopHeartbeat();

    if (discoveredDevice.ipAddr.empty())
    {
        std::cout << __func__ << ": No device IP available, skipping "
            "disconnect message" << std::endl;
        callback.callbackFn(true);
        return;
    }

    // Get the command endpoint from the UdpCommandDemuxer
    auto& protoState = livoxProto1::getProtoState();
    if (!protoState.deviceManager)
    {
        std::cout << __func__ << ": No device manager available, skipping "
            "disconnect message" << std::endl;
        callback.callbackFn(true);
        return;
    }

    auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
        .getCmdEndpointFdDesc();
    if (!cmdEndpointFdDesc)
    {
        std::cout << __func__ << ": No command endpoint available, skipping "
            "disconnect message" << std::endl;
        callback.callbackFn(true);
        return;
    }

    // Create disconnect message
    comms::DisconnectMessage disconnectMsg;
    disconnectMsg.swapContentsToProtocolEndianness();
    disconnectMsg.header.setCrc16FromRawBytes();
    disconnectMsg.header.swapCrc16ToProtocolEndianness();
    disconnectMsg.footer.crc_32 = disconnectMsg.calculateCrc32();
    disconnectMsg.footer.swapCrc32ToProtocolEndianness();

    // Set up destination address. Zero the struct first (as the handshake
    // and heartbeat paths do) so that sin_zero is not left uninitialized.
    struct sockaddr_in deviceAddr;
    memset(&deviceAddr, 0, sizeof(deviceAddr));
    deviceAddr.sin_family = AF_INET;
    deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str());
    deviceAddr.sin_port = htons(65000); // Commands go to port 65000

    // Send disconnect message
    ssize_t bytesSent = sendto(
        cmdEndpointFdDesc->native_handle(),
        &disconnectMsg, sizeof(disconnectMsg), 0,
        (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));

    if (bytesSent < 0)
    {
        std::cerr << __func__ << ": Failed to send disconnect message: "
            << strerror(errno) << std::endl;
        // Continue with disconnect even if message send fails
    }
    else
    {
        // Only report the message as sent when sendto() succeeded
        std::cout << __func__ << ": Sent disconnect message to "
            << discoveredDevice.ipAddr << ":" << 65000 << std::endl;
    }

    callback.callbackFn(true);
}

std::string Device::generateClientDeviceIpFromSerialNumber(
    const std::string& broadcastCode
    )
{
    /** EXPLANATION:
     * The input string is either a serial number (14 chars) or a broadcast code
     * (15 chars). We need to determine which one it is and extract the serial
     * number from the broadcast code.
     *
     * To generate a default IP address, we use the device's subnet: X.X.X.1XX
     * where XX = last two digits of serial. We use the smoIp and smoSubnetNbits
     * to determine the network prefix.
     *
     * For example, with smoIp 192.168.1.50, smoSubnetNbits 24 and a serial
     * number ending in "11", the generated device IP is 192.168.1.111.
*/ if (broadcastCode.empty()) { throw std::invalid_argument( std::string(__func__) + ": Broadcast code cannot be empty"); } std::string serialNumber; if (broadcastCode.length() == 14) { // Input is a serial number serialNumber = broadcastCode; } else if (broadcastCode.length() == 15) { // Input is a broadcast code (serial + selector) serialNumber = broadcastCode.substr(0, 14); } else { // Invalid length throw std::invalid_argument( std::string(__func__) + ": Broadcast code must be 14 or 15 characters long"); } // Extract last two digits of serial number if (serialNumber.length() < 2) { throw std::invalid_argument( std::string(__func__) + ": Serial number too short"); } std::string lastTwoDigits = serialNumber.substr(serialNumber.length() - 2); // Validate that last two characters are digits if (lastTwoDigits[0] < '0' || lastTwoDigits[0] > '9' || lastTwoDigits[1] < '0' || lastTwoDigits[1] > '9') { throw std::invalid_argument( std::string(__func__) + ": Last two characters of serial number must be digits"); } /** EXPLANATION: * Use the device's subnet: X.X.X.1XX where XX = last two digits of serial. * We use the smoIp and smoSubnetNbits to determine the network prefix. 
*/ // Parse smoIp to extract network prefix auto smoIpOctets = comms::parseIPv4Address(smoIp); if (!smoIpOctets.has_value()) { throw std::invalid_argument( std::string(__func__) + ": Invalid smoIp format: must be X.X.X.X"); } // Generate subnet mask based on nbits uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits); uint32_t smoIpAddr = (std::stoi(smoIpOctets->octet1) << 24) | (std::stoi(smoIpOctets->octet2) << 16) | (std::stoi(smoIpOctets->octet3) << 8) | std::stoi(smoIpOctets->octet4); // Apply subnet mask to get network prefix uint32_t networkPrefix = smoIpAddr & subnetMask; // Extract octets from network prefix uint8_t octet1 = (networkPrefix >> 24) & 0xFF; uint8_t octet2 = (networkPrefix >> 16) & 0xFF; uint8_t octet3 = (networkPrefix >> 8) & 0xFF; // Use the first three octets and append "1" + last two digits return std::to_string(octet1) + "." + std::to_string(octet2) + "." + std::to_string(octet3) + ".1" + lastTwoDigits; } static void discardHeartbeatAck( const uint8_t* data, ssize_t bytesReceived, const struct sockaddr_in& senderAddr ) { (void)senderAddr; // Check if we have enough data for a HeartbeatACK message if (bytesReceived < static_cast(sizeof(livoxProto1::comms::HeartbeatACK))) { std::cout << __func__ << ": Received heartbeat ACK with insufficient " "data (" << bytesReceived << " bytes, expected " << sizeof(livoxProto1::comms::HeartbeatACK) << ")" << std::endl; return; } // Directly use a non-const reference to HeartbeatACK structure livoxProto1::comms::HeartbeatACK& ack = *reinterpret_cast( const_cast(data)); ack.swapContentsToHostEndianness(); if (!ack.validateCrc32()) { std::cerr << __func__ << ": Discarded heartbeat ACK - CRC32 validation " "failed" << std::endl; return; } if (!ack.header.validateCrc16()) { std::cerr << __func__ << ": Discarded heartbeat ACK - CRC16 validation " "failed" << std::endl; return; } if (!ack.sanityCheck()) { std::cerr << __func__ << ": Discarded heartbeat ACK - sanity check " "failed" << std::endl; return; } if 
(ack.work_state == 0x01) { return; } // Print work_state with human-readable description std::string workStateStr; switch (ack.work_state) { case 0x00: workStateStr = "Initializing"; break; case 0x01: workStateStr = "Normal"; break; case 0x02: workStateStr = "Power-Saving"; break; case 0x03: workStateStr = "Standby"; break; case 0x04: workStateStr = "Error"; break; default: workStateStr = "Unknown"; break; } std::cerr << __func__ << ": Lidar not ready for operation: work_state: 0x" << std::hex << static_cast(ack.work_state) << std::dec << " (" << workStateStr << "), ack_msg: 0x" << std::hex << ack.ack_msg << std::dec << std::endl; } void Device::startHeartbeat() { if (!componentThread || discoveredDevice.ipAddr.empty()) { throw std::runtime_error( std::string(__func__) + ": Can't start heartbeat without component thread or IP"); } // Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03) smo::SpinLock::Guard lock(heartbeatActiveLock); registerUdpCommandHandler( 0x00, 0x03, discardHeartbeatAck, discoveredDevice.ipAddr); // Create heartbeat timer heartbeatTimer = std::make_unique( componentThread->getIoService()); heartbeatActive.store(true); // Send first heartbeat immediately sendHeartbeat(); } void Device::stopHeartbeat() { { smo::SpinLock::Guard lock(heartbeatActiveLock); heartbeatActive.store(false); unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr); } if (heartbeatTimer) { heartbeatTimer->cancel(); heartbeatTimer.reset(); } } void Device::sendHeartbeat() { if (!heartbeatActive.load()) { std::cerr << __func__ << ": Ending heartbeat loop due to " "heartbeatActive==false.\n"; return; } if (discoveredDevice.ipAddr.empty()) { std::cerr << __func__ << ": Ending heartbeat loop due to " "discoveredDevice.ipAddr.empty().\n"; return; } // Get the command endpoint from the UdpCommandDemuxer auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { std::cerr << __func__ << ": No device manager available\n"; return; } auto 
cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer .getCmdEndpointFdDesc(); if (!cmdEndpointFdDesc) { std::cerr << __func__ << ": No command endpoint available\n"; return; } try { comms::HeartbeatMessage heartbeatMsg; heartbeatMsg.swapContentsToProtocolEndianness(); heartbeatMsg.header.setCrc16FromRawBytes(); heartbeatMsg.header.swapCrc16ToProtocolEndianness(); heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32(); heartbeatMsg.footer.swapCrc32ToProtocolEndianness(); // Set up destination address for raw socket struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); // Heartbeats and commands go to port 65000 deviceAddr.sin_port = htons(65000); ssize_t bytesSent = sendto( cmdEndpointFdDesc->native_handle(), &heartbeatMsg, sizeof(heartbeatMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << "[" << __func__ << "] Failed to send heartbeat: " << strerror(errno) << std::endl; return; } /** EXPLANATION: * Schedule next heartbeat in 1 second, per the spec. 
*/ heartbeatTimer->expires_from_now(boost::posix_time::seconds(1)); heartbeatTimer->async_wait( [this](const boost::system::error_code& error) { onHeartbeatTimer(error); } ); } catch (const std::exception& e) { std::cerr << __func__ << ": Heartbeat send failed for device " << discoveredDevice.deviceIdentifier << ": " << e.what() << std::endl; } } void Device::onHeartbeatTimer(const boost::system::error_code& error) { // Timer was cancelled, heartbeat stopped if (error == boost::asio::error::operation_aborted) { return; } if (error) { std::cerr << "[" << __func__ << "] Heartbeat timer error for device " << discoveredDevice.deviceIdentifier << ": " << error.message() << std::endl; return; } // Send next heartbeat { smo::SpinLock::Guard lock(heartbeatActiveLock); if (!heartbeatActive.load()) { return; } sendHeartbeat(); } } uint32_t Device::getSubnetMaskFor(uint8_t nbits) { if (nbits > 32) { throw std::invalid_argument( std::string(__func__) + ": nbits must be between 0 and 32"); } // Generate subnet mask: set the first nbits to 1, rest to 0 if (nbits == 0) { return 0x00000000; } else if (nbits == 32) { return 0xFFFFFFFF; } else { // Create mask with nbits set to 1 from the left return (0xFFFFFFFF << (32 - nbits)); } } std::optional Device::detectSmoIp(const std::string& deviceIP) { /** EXPLANATION: * This function detects the SMO IP address of the interface that's facing * the device by iterating through all network interfaces and checking for * the interface that has the IP address in the same subnet as the device's * IP address. 
*/ try { // Parse the device IP to get the network prefix auto deviceIpOctets = comms::parseIPv4Address(deviceIP); if (!deviceIpOctets.has_value()) { return std::nullopt; } // Convert device IP octets to integers for bitwise operations uint32_t deviceIpAddr = (std::stoi(deviceIpOctets->octet1) << 24) | (std::stoi(deviceIpOctets->octet2) << 16) | (std::stoi(deviceIpOctets->octet3) << 8) | std::stoi(deviceIpOctets->octet4); // Generate subnet mask based on nbits uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits); /* Get all network interfaces using getifaddrs (Linux/Unix specific) * * FIXME: Add Windows support using GetAdaptersAddresses when porting */ struct ifaddrs *ifaddr; if (getifaddrs(&ifaddr) == -1) { return std::nullopt; } // Use unique_ptr for automatic cleanup (RAII) to free ifaddrs auto ifaddr_deleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); }; std::unique_ptr ifaddr_ptr( ifaddr, ifaddr_deleter); std::string found_ip; /** EXPLANATION: * Iterate through all network interfaces and check if the IP address is * in the same subnet as the device's IP address. */ for (struct ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { if (ifa->ifa_addr == nullptr) continue; // Check if it's IPv4 if (ifa->ifa_addr->sa_family != AF_INET) { continue; } // Get the IPv4 address struct sockaddr_in* addr_in = (struct sockaddr_in*)ifa->ifa_addr; char ip_str[INET_ADDRSTRLEN]; if (inet_ntop( AF_INET, &addr_in->sin_addr, ip_str, INET_ADDRSTRLEN) == nullptr) { continue; } std::string ip = ip_str; // Check if this IP is in the same subnet auto ipOctets = comms::parseIPv4Address(ip); if (!ipOctets.has_value()) { continue; } // Convert IP octets to integer uint32_t ipAddr = (std::stoi(ipOctets->octet1) << 24) | (std::stoi(ipOctets->octet2) << 16) | (std::stoi(ipOctets->octet3) << 8) | std::stoi(ipOctets->octet4); /* Check if this iface's IP is in the same subnet as the device's IP * using the calculated mask. Only compare the bits that are set in * the subnet mask. 
             */
            if ((ipAddr & subnetMask) == (deviceIpAddr & subnetMask)) {
                found_ip = ip;
                break;
            }
        }

        // Return the found IP (std::nullopt if none found)
        if (!found_ip.empty()) {
            return found_ip;
        }

        return std::nullopt;
    } catch (const std::exception& e) {
        std::cerr << "Error detecting SMO IP: " << e.what() << std::endl;
        return std::nullopt;
    }
}

std::optional<std::string> Device::getSmoIp(const std::string& deviceIP)
{
    /** EXPLANATION:
     * This is only used when connecting to a device that's already known to
     * the broadcastListener.
     * It is NOT, and SHOULD NOT be, used when connecting by heuristic IP
     * construction using the client device's serial number.
     *
     * Determines the SMO listening IP address for this device.
     * If smoIp is provided, compare it against the detected IP, warn on a
     * mismatch, and return the provided value regardless.
     * If smoIp is empty, attempt auto-detection based on device IP.
     * Returns std::optional<std::string>; empty if detection fails.
     */
    auto detectedIp = detectSmoIp(deviceIP);

    if (!smoIp.empty()) {
        // smoIp was provided, validate it against detected IP
        if (detectedIp.has_value() && detectedIp.value() != smoIp) {
            // Print warning if provided smoIp doesn't match detected IP
            std::cerr << "Warning: Provided smo-ip (" << smoIp
                      << ") doesn't match detected IP (" << detectedIp.value()
                      << ") for device " << discoveredDevice.deviceIdentifier
                      << " @(" << deviceIP << ")"
                      << ". Using provided smo-ip anyway." << std::endl;
        }
        return smoIp;
    }

    if (detectedIp.has_value()) {
        return detectedIp.value();
    }

    // Auto-detection failed
    return std::nullopt;
}

// Base class for both enable and disable pcloud data requests
template class EnDisablePcloudDataReq
: public smo::NonPostedAsynchronousContinuation
{
public:
    enum class SocketState {
        SOCKET_STILL_WAITING = 0,
        SOCKET_ERROR,
        SOCKET_RECV_SUCCESS,
        SOCKET_RECV_ERROR
    };

public:
    Device& device;

    // Atomic state flags for async coordination
    std::atomic<bool> timerFired{false};
    std::atomic<SocketState> socketState{SocketState::SOCKET_STILL_WAITING};
    std::atomic<bool> handlerExecuted{false};

    // The timeout timer.
boost::asio::deadline_timer timeoutTimer; // Received data storage uint8_t responseBuffer[1024]{}; ssize_t bytesReceived = -1; struct sockaddr_in senderAddr; socklen_t senderAddrLen = sizeof(senderAddr); protected: EnDisablePcloudDataReq( Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation(std::move(cb)), device(dev), timeoutTimer(device.componentThread->getIoService()) {} public: virtual ~EnDisablePcloudDataReq() { cleanup(); } // Public accessor for the original callback void callOriginalCallback(bool success) { this->callOriginalCb(success); } void callOriginalCallbackWithFailure() { callOriginalCallback(false); } protected: void setupAsyncCallbacks( const std::shared_ptr> &request ) { // Setup timeout timer timeoutTimer.expires_from_now( boost::posix_time::milliseconds(device.commandTimeoutMs)); timeoutTimer.async_wait( std::bind( &EnDisablePcloudDataReq::enDisablePcloudDataReq1_1, this, request, std::placeholders::_1)); // Register UDP command handler for sampling response (cmd_set=0x00, cmd_id=0x04) device.registerUdpCommandHandler( 0x00, 0x04, std::bind( &EnDisablePcloudDataReq::enDisablePcloudDataReq1_2, this, request, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), device.discoveredDevice.ipAddr); } void enDisablePcloudDataReq1_1( std::shared_ptr> context, const boost::system::error_code& error ) { (void)error; // Suppress unused parameter warning context->timerFired = true; context->enDisablePcloudDataReq2(context); } void enDisablePcloudDataReq1_2( std::shared_ptr> context, const uint8_t* data, ssize_t bytesReceived, const struct sockaddr_in& senderAddr ) { // Store the received data context->bytesReceived = bytesReceived; context->senderAddr = senderAddr; context->senderAddrLen = sizeof(senderAddr); // Copy the data to our buffer if (bytesReceived > 0 && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) { memcpy(context->responseBuffer, data, bytesReceived); context->socketState = 
SocketState::SOCKET_RECV_SUCCESS; } else { context->socketState = SocketState::SOCKET_RECV_ERROR; std::cerr << __func__ << ": Invalid data size: " << bytesReceived << std::endl; } context->enDisablePcloudDataReq2(context); } void enDisablePcloudDataReq2( std::shared_ptr> context ) { // Only execute once if (context->handlerExecuted.exchange(true)) { return; } context->timeoutTimer.cancel(); device.unregisterUdpCommandHandler( 0x00, 0x04, device.discoveredDevice.ipAddr); SocketState finalSocketState = context->socketState.load(); bool finalTimerFired = context->timerFired.load(); if (finalTimerFired && finalSocketState == SocketState::SOCKET_STILL_WAITING) { std::cerr << __func__ << ": Command timeout for device " << context->device.discoveredDevice.deviceIdentifier << std::endl; context->callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_ERROR) { std::cerr << __func__ << ": Socket error during command for device " << context->device.discoveredDevice.deviceIdentifier << std::endl; context->callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_RECV_ERROR) { std::cerr << __func__ << ": Receive error during command for device " << context->device.discoveredDevice.deviceIdentifier << std::endl; context->callOriginalCallbackWithFailure(); return; } // Result must have been RECV_SUCCESS state if we reach here if (context->bytesReceived < (ssize_t)sizeof(livoxProto1::comms::SamplingResponse)) { std::cerr << __func__ << ": Response of size " << context->bytesReceived << " is too small for sampling response (expected " << sizeof(livoxProto1::comms::SamplingResponse) << ")" << std::endl; context->callOriginalCallbackWithFailure(); return; } // Parse response using protocol structure livoxProto1::comms::SamplingResponse* response = reinterpret_cast( context->responseBuffer); response->swapContentsToHostEndianness(); if (!response->sanityCheck()) { std::cerr << __func__ << ": Invalid sampling response 
structure.\n"; context->callOriginalCallbackWithFailure(); return; } // Check for error first, return early on failure if (!(response->command.cmd_set == 0x00 && response->command.cmd_id == 0x04 && response->ret_code == 0x00)) { if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": Failed to en/disable pcloud data " "for device " "(" << context->device.discoveredDevice.deviceIdentifier << ") @(" << context->device.discoveredDevice.ipAddr << "). " << "cmd_set: " << (int)response->command.cmd_set << ", cmd_id: " << (int)response->command.cmd_id << ", ret_code: " << (int)response->ret_code << "\n"; } context->callOriginalCallbackWithFailure(); return; } context->setPcloudDataActiveState(); context->callOriginalCallback(true); } void cleanup() { timeoutTimer.cancel(); } // Pure virtual methods that derived classes must implement virtual uint8_t getEnableFlag() const = 0; virtual const char* getCommandName() const = 0; virtual void setPcloudDataActiveState() = 0; // Common sendCommand implementation bool sendCommand() { // Create start/stop sampling message using protocol structure livoxProto1::comms::StartStopSamplingMessage message; // Set enable flag based on derived class implementation message.enable = getEnableFlag(); message.swapContentsToProtocolEndianness(); message.header.setCrc16FromRawBytes(); message.header.swapCrc16ToProtocolEndianness(); message.footer.crc_32 = message.calculateCrc32(); message.footer.swapCrc32ToProtocolEndianness(); struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(device.discoveredDevice.ipAddr.c_str()); deviceAddr.sin_port = htons(65000); // Command port // Send command directly (synchronous) // Get the command endpoint from the UdpCommandDemuxer auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { std::cerr << __func__ << ": No device manager available" << std::endl; 
            return false;
        }

        auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
            .getCmdEndpointFdDesc();
        if (!cmdEndpointFdDesc) {
            std::cerr << __func__ << ": No command endpoint available"
                      << std::endl;
            return false;
        }

        ssize_t bytesSent = sendto(
            cmdEndpointFdDesc->native_handle(),
            &message, sizeof(message), 0,
            (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));

        if (bytesSent < 0) {
            std::cerr << __func__ << ": Failed to send " << getCommandName()
                      << " command: " << strerror(errno) << std::endl;
            return false;
        }

        return true;
    }
};

class Device::EnablePcloudDataReq
: public EnDisablePcloudDataReq
{
public:
    friend void Device::enablePcloudDataReq(
        smo::Callback callback);

    EnablePcloudDataReq(
        Device& dev, smo::Callback cb)
    : EnDisablePcloudDataReq(dev, std::move(cb))
    {}

    ~EnablePcloudDataReq() { cleanup(); }

private:
    uint8_t getEnableFlag() const override
    {
        return 0x01; // Start sampling
    }

    const char* getCommandName() const override
    {
        return "enable pcloud data";
    }

    void setPcloudDataActiveState() override
    {
        device.pcloudDataActive.store(true);
    }
};

class Device::DisablePcloudDataReq
: public EnDisablePcloudDataReq
{
public:
    friend void Device::disablePcloudDataReq(
        smo::Callback callback);

    DisablePcloudDataReq(
        Device& dev, smo::Callback cb)
    : EnDisablePcloudDataReq(
        dev, std::move(cb))
    {}

    ~DisablePcloudDataReq() { cleanup(); }

private:
    uint8_t getEnableFlag() const override
    {
        return 0x00; // Stop sampling
    }

    const char* getCommandName() const override
    {
        return "disable pcloud data";
    }

    void setPcloudDataActiveState() override
    {
        device.pcloudDataActive.store(false);
    }
};

void Device::enablePcloudDataReq(
    smo::Callback callback
    )
{
    auto request = std::make_shared<EnablePcloudDataReq>(
        *this, std::move(callback));

    // Check that the device IP is known (it is set on successful connection)
    if (discoveredDevice.ipAddr.empty()) {
        std::cerr << __func__ << ": No device IP available for device "
                  << discoveredDevice.deviceIdentifier << std::endl;
        request->callOriginalCallbackWithFailure();
        return;
    }

    // Send the start sampling command
    if (!request->sendCommand()) {
        request->callOriginalCallbackWithFailure();
        return;
    }

    // Setup async callbacks
    request->setupAsyncCallbacks(request);
}

void Device::disablePcloudDataReq(
    smo::Callback callback
    )
{
    auto request = std::make_shared<DisablePcloudDataReq>(
        *this, std::move(callback));

    // Check that the device IP is known (it is set on successful connection)
    if (discoveredDevice.ipAddr.empty()) {
        std::cerr << __func__ << ": No device IP available for device "
                  << discoveredDevice.deviceIdentifier << std::endl;
        request->callOriginalCallbackWithFailure();
        return;
    }

    /* Unconditionally close the pcloud data socket early since there's no good
     * reason to only close it if the command packet succeeds and ACKs. We want
     * to stop receiving data immediately when disable is requested.
     */
    cleanupPcloudDataSocket();

    // Send the stop sampling command
    if (!request->sendCommand()) {
        request->callOriginalCallbackWithFailure();
        return;
    }

    // Setup async callbacks
    request->setupAsyncCallbacks(request);
}

void Device::cleanupPcloudDataSocket()
{
    pcloudDataActive.store(false);
}

void Device::handleUdpDgram(
    const uint8_t *data, ssize_t bytesReceived,
    const struct sockaddr_in &senderAddr
    )
{
    // Check minimum size for any valid protocol message
    if (bytesReceived < static_cast<ssize_t>(
            sizeof(comms::Header) + sizeof(comms::Command))) {
        // Too small for header + command
        return;
    }

    // Extract command set and command ID from the first two bytes after the header
    uint8_t cmd_set = data[sizeof(comms::Header)];
    uint8_t cmd_id = data[sizeof(comms::Header) + 1];

    // Look for a registered handler for this command
    auto key = std::make_pair(cmd_set, cmd_id);
    auto it = udpCommandHandlers.find(key);

    if (it != udpCommandHandlers.end()) {
        // Found a registered handler, invoke it
        try {
            it->second(data, bytesReceived, senderAddr);
        } catch (const std::exception& e) {
            std::cerr << __func__ << ": Exception in command handler for "
                      << discoveredDevice.deviceIdentifier
                      << " cmd_set=" << (int)cmd_set
                      << " cmd_id=" << (int)cmd_id
                      << ": " << e.what() <<
std::endl; } } // Unknown command types are silently ignored } void Device::registerUdpCommandHandler( uint8_t cmd_set, uint8_t cmd_id, std::function handler, const std::string& deviceIP ) { /** EXPLANATION: * Register a UDP command handler for the given cmd_set and cmd_id. * If the handler already exists for the given device IP, replace it. * If the handler does not exist, add it to the temporary collection. * * Adding a handler to a cmd_set+cmd_id pair which already has a handler * results in the new handler replacing the old one. */ auto key = std::make_pair(cmd_set, cmd_id); udpCommandHandlers[key] = handler; // Don't move, we need to copy /** EXPLANATION: * Add to temporary collection if deviceIP is provided (not empty) */ if (!deviceIP.empty()) { auto& handlers = devicesUnderConstruction[deviceIP]; auto it = std::find_if(handlers.begin(), handlers.end(), [cmd_set, cmd_id](const CommandHandler& existing) { return existing.cmd_set == cmd_set && existing.cmd_id == cmd_id; }); // Create the new command handler CommandHandler cmdHandler; cmdHandler.cmd_set = cmd_set; cmdHandler.cmd_id = cmd_id; cmdHandler.handler = std::move(handler); if (it != handlers.end()) { // Replace existing handler *it = std::move(cmdHandler); } else { // Add new handler handlers.push_back(std::move(cmdHandler)); } } } void Device::unregisterUdpCommandHandler( uint8_t cmd_set, uint8_t cmd_id, const std::string& deviceIP ) { auto key = std::make_pair(cmd_set, cmd_id); udpCommandHandlers.erase(key); /** EXPLANATION: * Remove from temporary collection if deviceIP is provided (not empty) */ if (!deviceIP.empty()) { auto it = devicesUnderConstruction.find(deviceIP); if (it != devicesUnderConstruction.end()) { // Remove the specific command handler for this cmd_set/cmd_id auto& handlers = it->second; handlers.erase( std::remove_if(handlers.begin(), handlers.end(), [cmd_set, cmd_id](const CommandHandler& h) { return h.cmd_set == cmd_set && h.cmd_id == cmd_id; }), handlers.end()); // If no 
handlers left for this IP, remove the entire entry if (handlers.empty()) { devicesUnderConstruction.erase(it); } } } } // SetReturnModeReq continuation class class Device::SetReturnModeReq : public smo::NonPostedAsynchronousContinuation { public: enum class SocketState { SOCKET_STILL_WAITING = 0, SOCKET_ERROR, SOCKET_RECV_SUCCESS, SOCKET_RECV_ERROR }; public: Device& device; uint8_t returnMode; // Atomic state flags for async coordination std::atomic timerFired{false}; std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic handlerExecuted{false}; // The timeout timer. boost::asio::deadline_timer timeoutTimer; // Received data storage uint8_t responseBuffer[1024]{}; ssize_t bytesReceived = -1; struct sockaddr_in senderAddr; socklen_t senderAddrLen = sizeof(senderAddr); public: friend void Device::setReturnModeReq( uint8_t returnMode, smo::Callback callback); SetReturnModeReq( Device& dev, uint8_t mode, smo::Callback cb) : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), returnMode(mode), timeoutTimer(device.componentThread->getIoService()) {} virtual ~SetReturnModeReq() { cleanup(); } // Public accessor for the original callback void callOriginalCallback(bool success) { this->callOriginalCb(success); } void callOriginalCallbackWithFailure() { this->callOriginalCb(false); } void setupAsyncCallbacks(std::shared_ptr request) { // Set up timeout timer timeoutTimer.expires_from_now(boost::posix_time::milliseconds( device.commandTimeoutMs)); timeoutTimer.async_wait( std::bind(&SetReturnModeReq::setReturnModeReq1_1, this, request, std::placeholders::_1)); // Register UDP command handler for set return mode response (cmd_set=0x01, cmd_id=0x06) device.registerUdpCommandHandler( 0x01, 0x06, std::bind(&SetReturnModeReq::setReturnModeReq1_2, this, request, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), device.discoveredDevice.ipAddr); } void setReturnModeReq1_1( std::shared_ptr context, const boost::system::error_code& 
error ) { if (error == boost::asio::error::operation_aborted) { // Timer was cancelled, ignore return; } context->timerFired.store(true); context->setReturnModeReq2(context); } void setReturnModeReq1_2( std::shared_ptr context, const uint8_t* data, ssize_t bytesReceived, const struct sockaddr_in& senderAddr ) { // Store the received data context->bytesReceived = bytesReceived; context->senderAddr = senderAddr; context->senderAddrLen = sizeof(senderAddr); // Copy the data to our buffer if (bytesReceived > 0 && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) { memcpy(context->responseBuffer, data, bytesReceived); context->socketState = SocketState::SOCKET_RECV_SUCCESS; } else { context->socketState = SocketState::SOCKET_RECV_ERROR; std::cerr << __func__ << ": Invalid data size: " << bytesReceived << std::endl; } context->setReturnModeReq2(context); } void setReturnModeReq2( std::shared_ptr context ) { // Only execute once if (context->handlerExecuted.exchange(true)) { return; } context->timeoutTimer.cancel(); device.unregisterUdpCommandHandler( 0x01, 0x06, device.discoveredDevice.ipAddr); SocketState finalSocketState = context->socketState.load(); bool finalTimerFired = context->timerFired.load(); // Check for timeout only if there was no socket activity if (finalTimerFired && finalSocketState == SocketState::SOCKET_STILL_WAITING) { std::cerr << __func__ << ": Set return mode timeout with " << device.discoveredDevice.ipAddr << "(" << device.discoveredDevice.deviceIdentifier << ")" << "\n"; context->callOriginalCallbackWithFailure(); return; } // Socket error from boost::asio if (finalSocketState == SocketState::SOCKET_ERROR) { std::cerr << __func__ << ": Socket error during set return mode with " << device.discoveredDevice.ipAddr << std::endl; context->callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_RECV_ERROR) { std::cerr << __func__ << ": Receive error during set return mode with " << 
                device.discoveredDevice.ipAddr << "\n";
            context->callOriginalCallbackWithFailure();
            return;
        }

        /* Result must have been RECV_SUCCESS state if we reach here.
         * Data was already read in the async callback, just validate it
         */
        if (context->bytesReceived
            < static_cast<ssize_t>(sizeof(comms::SetLiDARReturnModeResponse))) {
            std::cerr << __func__ << ": Response of size "
                      << context->bytesReceived << " too small from "
                      << device.discoveredDevice.ipAddr << "\n";
            context->callOriginalCallbackWithFailure();
            return;
        }

        comms::SetLiDARReturnModeResponse* response =
            reinterpret_cast<comms::SetLiDARReturnModeResponse*>(
                context->responseBuffer);

        response->swapContentsToHostEndianness();

        // Early callback return on error; success path only if all checks pass
        if (response->command.cmd_set != 0x01
            || response->command.cmd_id != 0x06
            || response->ret_code != 0x00) {
            context->callOriginalCallbackWithFailure();
            return;
        }

        device.currentReturnMode = Device::ReturnMode(context->returnMode);
        context->callOriginalCallback(true);
    }

    void cleanup()
    {
        timeoutTimer.cancel();
    }

    bool sendCommand()
    {
        // Get the command endpoint from the UdpCommandDemuxer
        auto& protoState = livoxProto1::getProtoState();
        if (!protoState.deviceManager) {
            std::cerr << __func__ << ": No device manager available.\n";
            return false;
        }

        auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
            .getCmdEndpointFdDesc();
        if (!cmdEndpointFdDesc) {
            std::cerr << __func__ << ": No command endpoint available.\n";
            return false;
        }

        // Create set return mode message
        comms::SetLiDARReturnMode setReturnModeMsg;
        setReturnModeMsg.mode = returnMode;

        setReturnModeMsg.swapContentsToProtocolEndianness();
        setReturnModeMsg.header.setCrc16FromRawBytes();
        setReturnModeMsg.header.swapCrc16ToProtocolEndianness();
        setReturnModeMsg.footer.crc_32 = setReturnModeMsg.calculateCrc32();
        setReturnModeMsg.footer.swapCrc32ToProtocolEndianness();

        /* Set up destination address. Zero-initialise the struct so that
         * sin_zero is not left indeterminate (the en/disable pcloud path
         * does the same with memset).
         */
        struct sockaddr_in deviceAddr{};
        deviceAddr.sin_family = AF_INET;
        deviceAddr.sin_addr.s_addr = inet_addr(
device.discoveredDevice.ipAddr.c_str()); deviceAddr.sin_port = htons(65000); // Commands go to port 65000 // Send set return mode message ssize_t bytesSent = sendto( cmdEndpointFdDesc->native_handle(), &setReturnModeMsg, sizeof(setReturnModeMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << __func__ << ": Failed to send set return mode message: " << strerror(errno) << std::endl; return false; } std::cout << __func__ << ": Sent set return mode message to " << device.discoveredDevice.ipAddr << ":" << 65000 << std::endl; return true; } }; // GetReturnModeReq continuation class class Device::GetReturnModeReq : public smo::NonPostedAsynchronousContinuation { public: enum class SocketState { SOCKET_STILL_WAITING = 0, SOCKET_ERROR, SOCKET_RECV_SUCCESS, SOCKET_RECV_ERROR }; public: Device& device; // Atomic state flags for async coordination std::atomic timerFired{false}; std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic handlerExecuted{false}; // The timeout timer. 
boost::asio::deadline_timer timeoutTimer; // Received data storage uint8_t responseBuffer[1024]{}; ssize_t bytesReceived = -1; struct sockaddr_in senderAddr; socklen_t senderAddrLen = sizeof(senderAddr); public: friend void Device::getReturnModeReq( smo::Callback callback); GetReturnModeReq( Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), timeoutTimer(device.componentThread->getIoService()) {} virtual ~GetReturnModeReq() { cleanup(); } // Public accessor for the original callback void callOriginalCallback(bool success, uint8_t returnMode) { this->callOriginalCb(success, returnMode); } void callOriginalCallbackWithFailure() { this->callOriginalCb(false, 0); } void setupAsyncCallbacks(std::shared_ptr request) { // Set up timeout timer timeoutTimer.expires_from_now(boost::posix_time::milliseconds( device.commandTimeoutMs)); timeoutTimer.async_wait( std::bind(&GetReturnModeReq::getReturnModeReq1_1, this, request, std::placeholders::_1)); // Register UDP command handler for get return mode response (cmd_set=0x01, cmd_id=0x07) device.registerUdpCommandHandler( 0x01, 0x07, std::bind(&GetReturnModeReq::getReturnModeReq1_2, this, request, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), device.discoveredDevice.ipAddr); } void getReturnModeReq1_1( std::shared_ptr context, const boost::system::error_code& error ) { if (error == boost::asio::error::operation_aborted) { // Timer was cancelled, ignore return; } context->timerFired.store(true); context->getReturnModeReq2(context); } void getReturnModeReq1_2( std::shared_ptr context, const uint8_t* data, ssize_t bytesReceived, const struct sockaddr_in& senderAddr ) { // Store the received data context->bytesReceived = bytesReceived; context->senderAddr = senderAddr; context->senderAddrLen = sizeof(senderAddr); // Copy the data to our buffer if (bytesReceived > 0 && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) { memcpy(context->responseBuffer, 
data, bytesReceived); context->socketState = SocketState::SOCKET_RECV_SUCCESS; } else { context->socketState = SocketState::SOCKET_RECV_ERROR; std::cerr << __func__ << ": Invalid data size: " << bytesReceived << std::endl; } context->getReturnModeReq2(context); } void getReturnModeReq2( std::shared_ptr context ) { // Only execute once if (context->handlerExecuted.exchange(true)) { return; } context->timeoutTimer.cancel(); device.unregisterUdpCommandHandler( 0x01, 0x07, device.discoveredDevice.ipAddr); SocketState finalSocketState = context->socketState.load(); bool finalTimerFired = context->timerFired.load(); // Check for timeout only if there was no socket activity if (finalTimerFired && finalSocketState == SocketState::SOCKET_STILL_WAITING) { std::cerr << __func__ << ": Get return mode timeout with " << device.discoveredDevice.ipAddr << "(" << device.discoveredDevice.deviceIdentifier << ")" << "\n"; context->callOriginalCallbackWithFailure(); return; } // Socket error from boost::asio if (finalSocketState == SocketState::SOCKET_ERROR) { std::cerr << __func__ << ": Socket error during get return mode with " << device.discoveredDevice.ipAddr << std::endl; context->callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_RECV_ERROR) { std::cerr << __func__ << ": Receive error during get return mode with " << device.discoveredDevice.ipAddr << std::endl; context->callOriginalCallbackWithFailure(); return; } /* Result must have been RECV_SUCCESS state if we reach here. 
         * Data was already read in the async callback, just validate it
         */
        if (context->bytesReceived
            < static_cast<ssize_t>(sizeof(comms::GetLiDARReturnModeResponse))) {
            std::cerr << __func__ << ": Response of size "
                      << context->bytesReceived << " too small from "
                      << device.discoveredDevice.ipAddr << std::endl;
            context->callOriginalCallbackWithFailure();
            return;
        }

        comms::GetLiDARReturnModeResponse* response =
            reinterpret_cast<comms::GetLiDARReturnModeResponse*>(
                context->responseBuffer);

        response->swapContentsToHostEndianness();

        // Check if response indicates success
        if (!(response->command.cmd_set == 0x01
            && response->command.cmd_id == 0x07
            && response->ret_code == 0x00)) {
            context->callOriginalCallbackWithFailure();
            return;
        }

        device.currentReturnMode = Device::ReturnMode(response->mode);
        context->callOriginalCallback(true, response->mode);
    }

    void cleanup()
    {
        timeoutTimer.cancel();
    }

    bool sendCommand()
    {
        // Get the command endpoint from the UdpCommandDemuxer
        auto& protoState = livoxProto1::getProtoState();
        if (!protoState.deviceManager) {
            std::cerr << __func__ << ": No device manager available.\n";
            return false;
        }

        auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
            .getCmdEndpointFdDesc();
        if (!cmdEndpointFdDesc) {
            std::cerr << __func__ << ": No command endpoint available.\n";
            return false;
        }

        // Create get return mode message
        comms::GetLiDARReturnMode getReturnModeMsg;

        getReturnModeMsg.swapContentsToProtocolEndianness();
        getReturnModeMsg.header.setCrc16FromRawBytes();
        getReturnModeMsg.header.swapCrc16ToProtocolEndianness();
        getReturnModeMsg.footer.crc_32 = getReturnModeMsg.calculateCrc32();
        getReturnModeMsg.footer.swapCrc32ToProtocolEndianness();

        /* Set up destination address. Zero-initialise the struct so that
         * sin_zero is not left indeterminate.
         */
        struct sockaddr_in deviceAddr{};
        deviceAddr.sin_family = AF_INET;
        deviceAddr.sin_addr.s_addr =
            inet_addr(device.discoveredDevice.ipAddr.c_str());
        deviceAddr.sin_port = htons(65000); // Commands go to port 65000

        // Send get return mode message
        ssize_t bytesSent = sendto(
            cmdEndpointFdDesc->native_handle(),
            &getReturnModeMsg,
sizeof(getReturnModeMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << __func__ << ": Failed to send get return mode message: " << strerror(errno) << std::endl; return false; } std::cout << __func__ << ": Sent get return mode message to " << device.discoveredDevice.ipAddr << ":" << 65000 << std::endl; return true; } }; void Device::setReturnModeReq( uint8_t returnMode, smo::Callback callback ) { auto request = std::make_shared( *this, returnMode, std::move(callback)); // Check if device IP is available if (discoveredDevice.ipAddr.empty()) { std::cerr << __func__ << ": No device IP available for device " << discoveredDevice.deviceIdentifier << std::endl; request->callOriginalCallbackWithFailure(); return; } // Send the set return mode command if (!request->sendCommand()) { request->callOriginalCallbackWithFailure(); return; } // Setup async callbacks request->setupAsyncCallbacks(request); } void Device::getReturnModeReq( smo::Callback callback ) { auto request = std::make_shared( *this, std::move(callback)); // Check if device IP is available if (discoveredDevice.ipAddr.empty()) { std::cerr << __func__ << ": No device IP available for device " << discoveredDevice.deviceIdentifier << std::endl; request->callOriginalCallbackWithFailure(); return; } // Send the get return mode command if (!request->sendCommand()) { request->callOriginalCallbackWithFailure(); return; } // Setup async callbacks request->setupAsyncCallbacks(request); } } // namespace livoxProto1