#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "device.h" #include "protocol.h" #include "core.h" namespace livoxProto1 { namespace comms { DiscoveredDevice::DiscoveredDevice( const std::string &deviceIdentifier, DeviceType deviceType, const std::string &ipAddr) : deviceIdentifier(deviceIdentifier), deviceType(deviceType), ipAddr(ipAddr) { } DiscoveredDevice::DiscoveredDevice( const BroadcastMessage &msg, const std::string &ipAddr ) : DiscoveredDevice( reinterpret_cast(msg.broadcast_code), static_cast(msg.dev_type), ipAddr) { } std::string DiscoveredDevice::stringify(void) const { std::ostringstream oss; oss << "DiscoveredDevice{" << "identifier='" << deviceIdentifier << "', " << "ipAddr='" << ipAddr << "', " << "deviceType=" << (int)deviceType << " (" << getDeviceTypeName() << ")" << "}"; return oss.str(); } std::string DiscoveredDevice::getDeviceTypeName(void) const { switch (deviceType) { case DeviceType::Hub: return "Hub"; case DeviceType::Mid40: return "Mid-40"; case DeviceType::Tele15: return "Tele-15"; case DeviceType::Horizon: return "Horizon"; case DeviceType::Mid70: return "Mid-70"; case DeviceType::Avia: return "Avia"; default: return "Unknown"; } } } // namespace comms Device::Device(const std::string &deviceIdentifier, const std::shared_ptr& componentThread, int handshakeTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort) : discoveredDevice( deviceIdentifier, comms::DeviceType::Mid40, // Initialize empty. IP will be set upon successful connection. ""), componentThread(componentThread), handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs), smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), heartbeatFd(-1), heartbeatActive(false) { } Device::~Device() { if (heartbeatActive.load()) { heartbeatActive.store(false); if (heartbeatTimer) { heartbeatTimer->cancel(); } } heartbeatTimer.reset(); if (heartbeatFd >= 0) { close(heartbeatFd); heartbeatFd = -1; } } /** * Device::ConnectReq - Encapsulates all state and resources for async connection sequence * This class manages the overall device connection process including handshake and heartbeat setup */ class Device::ConnectReq : public AsynchronousContinuation { private: Device& device; std::unique_ptr retryTimer; public: ConnectReq(Device& dev, Device::connectReqCbFn cb) : AsynchronousContinuation(std::move(cb)), device(dev) {} /** FIXME: * WE need to assign the ipAddr to the Device being connected up. */ // Callback methods for the connection sequence void connectReq1( std::shared_ptr context, bool success, const std::string& ipAddr, int fd ) { if (success) { // Store the IP address in the device context->device.discoveredDevice.ipAddr = ipAddr; // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); context->originalCbFn(true); return; } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Trying to connect to device by " << "identifier" << "\n"; } // Try direct connect by device identifier context->device.connectByDeviceIdentifierReq( std::bind(&ConnectReq::connectReq2, context.get(), context, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } void connectReq2( std::shared_ptr context, bool success, const std::string& ipAddr, int fd ) { if (success) { // Store the IP address in the device context->device.discoveredDevice.ipAddr = ipAddr; // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); context->originalCbFn(true); return; } // Start retry timer if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Starting retry delay (" << context->device.retryDelayMs << "ms), then trying known " << "device again" << "\n"; } context->retryTimer = std::make_unique( context->device.componentThread->getIoService()); context->retryTimer->expires_from_now( boost::posix_time::milliseconds(context->device.retryDelayMs)); context->retryTimer->async_wait( std::bind( &ConnectReq::connectReq3, context.get(), context, std::placeholders::_1)); } void connectReq3( std::shared_ptr context, const boost::system::error_code& error ) { if (error) { context->originalCbFn(false); return; } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Trying to connect to known device " << "again" << "\n"; } context->device.connectToKnownDeviceReq( std::bind(&ConnectReq::connectReq4, context.get(), context, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } void connectReq4( std::shared_ptr context, bool success, const std::string& ipAddr, int fd ) { if (success) { context->device.discoveredDevice.ipAddr = ipAddr; context->device.heartbeatFd = fd; context->device.startHeartbeat(); context->originalCbFn(true); return; } // All connection attempts failed context->originalCbFn(false); } }; void Device::connectReq(Device::connectReqCbFn callback) { // Create the connection request object to hold state and callbacks auto request = std::make_shared(*this, std::move(callback)); // Try connecting to known device first if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Trying to connect to known device" << "\n"; } connectToKnownDeviceReq( std::bind( &ConnectReq::connectReq1, request.get(), request, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } class Device::ConnectToKnownDeviceReq : public AsynchronousContinuation { public: Device& device; std::string deviceIP; std::shared_ptr deviceInfo; ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb) : AsynchronousContinuation(std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) { originalCbFn(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() { callOriginalCallback(false, "", -1); } // Callback methods for the connection sequence void connectToKnownDeviceReq1( std::shared_ptr context, bool success, int fd ) { // Return the IP address and raw FD to the caller context->callOriginalCallback(success, context->deviceIP, fd); } }; /** EXPLANATION: * This function is used to connect to a device that is already known to the * broadcastListener. */ void Device::connectToKnownDeviceReq( Device::connectToKnownDeviceReqCbFn callback ) { // Create the connection request object to hold state and callbacks auto request = std::make_shared( *this, std::move(callback)); auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { request->callOriginalCallbackWithFailure(); return; } // Check if the device is known to the broadcastListener if (!protoState.deviceManager->broadcastListener.deviceExists( request->device.discoveredDevice.deviceIdentifier)) { request->callOriginalCallbackWithFailure(); return; } request->deviceInfo = protoState.deviceManager->broadcastListener.getDevice( request->device.discoveredDevice.deviceIdentifier); if (!request->deviceInfo) { request->callOriginalCallbackWithFailure(); return; } // Use the IP address from the broadcast message request->deviceIP = request->deviceInfo->ipAddr; // Determine the final listening IP address auto smoIpResult = request->device.getSmoIp(request->deviceIP); if (!smoIpResult.has_value()) { // Auto-detection failed, fail early std::cerr << __func__ << ": Failed to detect SMO listening IP for " << "known device (" << request->device.discoveredDevice.deviceIdentifier << ")" << " @(" << request->deviceIP << ").\n"; request->callOriginalCallbackWithFailure(); return; } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Detected SMO listening IP for known device " << request->device.discoveredDevice.deviceIdentifier << " @(" << request->deviceIP << ") is " << smoIpResult.value() << ". About to try to handshake.\n"; } request->device.detectedSmoListeningIp = smoIpResult.value(); // Execute handshake with the known device using async method request->device.executeHandshakeReq( request->deviceIP, std::bind( &ConnectToKnownDeviceReq::connectToKnownDeviceReq1, request.get(), request, std::placeholders::_1, std::placeholders::_2)); } class Device::ConnectByDeviceIdentifierReq : public AsynchronousContinuation { public: Device& device; std::string deviceIP; ConnectByDeviceIdentifierReq( Device& dev, Device::connectByDeviceIdentifierReqCbFn cb) : AsynchronousContinuation(std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) { originalCbFn(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() { callOriginalCallback(false, "", -1); } // Callback methods for the connection sequence void connectByDeviceIdentifierReq1( std::shared_ptr context, bool success, int fd ) { // Return the IP address and raw FD to the caller context->callOriginalCallback(success, context->deviceIP, fd); } }; void Device::connectByDeviceIdentifierReq( Device::connectByDeviceIdentifierReqCbFn callback ) { /** EXPLANATION: * This method uses heuristic device IP construction from the serial number. * This requires smoIp to be provided because: * 1. We need the network prefix to generate a valid device IP address * 2. Without a target device IP, we cannot detect which interface faces the device * 3. Therefore, if smoIp is omitted, heuristic construction is impossible * * If smoIp is not provided, the driver must rely only on broadcast advertisements * from the device (handled by connectToKnownDeviceReq). */ // Check if smoIp is provided - required for heuristic construction if (smoIp.empty()) { callback(false, "", -1); return; } // Create the connection request object to hold state and callbacks auto request = std::make_shared( *this, std::move(callback)); // Generate device IP from serial number request->deviceIP = generateClientDeviceIpFromSerialNumber( request->device.discoveredDevice.deviceIdentifier); // For heuristic construction, always use the provided smoIp. request->device.detectedSmoListeningIp = request->device.smoIp; if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": About to try to connect to device by " << "identifier (" << discoveredDevice.deviceIdentifier << ")" << " at IP (" << smoIp << ").\n"; } // Execute handshake using async method request->device.executeHandshakeReq( request->deviceIP, std::bind( &ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1, request.get(), request, std::placeholders::_1, std::placeholders::_2)); } class Device::ExecuteHandshakeReq : public AsynchronousContinuation { public: friend void Device::executeHandshakeReq( const std::string& deviceIP, Device::executeHandshakeReqCbFn callback); enum class SocketState { SOCKET_STILL_WAITING = 0, SOCKET_ERROR, SOCKET_RECV_SUCCESS, SOCKET_RECV_ERROR }; public: Device& device; std::string deviceIP; // Atomic state flags for async coordination std::atomic timerFired{false}; std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic handlerExecuted{false}; // The stream descriptor that will be returned to the caller boost::asio::posix::stream_descriptor handshakeFdDesc; boost::asio::deadline_timer timeoutTimer; // Received data storage uint8_t responseBuffer[1024]{}; ssize_t bytesReceived = -1; struct sockaddr_in senderAddr; socklen_t senderAddrLen = sizeof(senderAddr); public: ExecuteHandshakeReq( Device& dev, const std::string& deviceIP, Device::executeHandshakeReqCbFn cb) : AsynchronousContinuation(std::move(cb)), device(dev), deviceIP(deviceIP), handshakeFdDesc(device.componentThread->getIoService()), timeoutTimer(device.componentThread->getIoService()) { } ~ExecuteHandshakeReq() { cleanup(); } // Public accessor for the original callback void callOriginalCallback(bool success, int fd) { originalCbFn(success, fd); } void callOriginalCallbackWithFailure() { /** EXPLANATION: * We have to call cleanupHandshakeSocket() here, specifically because * there are 5 references we need to clean up, and 2 of them are * actually recursive self-references within this class. * * The timer and the handshakeFdDesc are both given recursive * self-referencing shared_ptr's to this class when we eventually set up * the async callbacks for them. * * So merely letting the async continuation sequences go out of scope * won't cause this class to be destroyed. Rather, since the class * references itelf, we have to first break those references. Then and * only then will the shared_ptr's refcount go to 0 and the class will * be destroyed. * * We always unconditionally break the timer's reference inside of * the branch-unifying segment (executeHandshakeReq2), but we generally * only want to break the handshakeFdDesc's reference at the exact * moment when the sequence fails, because if it succeeds, we want to * commit the FD to the Device object being created (for heartbeats). * * Hence, we call cleanupHandshakeSocket() at the point of failure. * To break the self-reference when the sequence is successful, we * manually do that at the end of the sequence. */ cleanupHandshakeSocket(); callOriginalCallback(false, -1); } private: bool setupSocket() { /** EXPLANATION: * Create non-blocking UDP socket for handshake. We can't use * boost::asio::socket because it causes a segfault if associated with * an io_service from the main program (it's a boost bug). */ int socketFd = socket(AF_INET, SOCK_DGRAM, 0); if (socketFd < 0) { std::cerr << __func__ << ": Failed to create socket: " << strerror(errno) << std::endl; return false; } int flags = fcntl(socketFd, F_GETFL, 0); if (flags < 0 || fcntl(socketFd, F_SETFL, flags | O_NONBLOCK) < 0) { std::cerr << __func__ << ": Failed to set socket non-blocking: " << strerror(errno) << std::endl; return false; } // Bind socket to cmdPort so we can receive the handshake response struct sockaddr_in localAddr; memset(&localAddr, 0, sizeof(localAddr)); localAddr.sin_family = AF_INET; localAddr.sin_addr.s_addr = INADDR_ANY; localAddr.sin_port = htons(device.cmdPort); if (bind(socketFd, (struct sockaddr*)&localAddr, sizeof(localAddr)) < 0) { std::cerr << __func__ << ": Failed to bind socket to port " << device.cmdPort << ": " << strerror(errno) << std::endl; return false; } // Assign the socket FD to the stream descriptor handshakeFdDesc.assign(socketFd); return true; } bool sendHandshakeRequest() { /** EXPLANATION: * Prepare handshake request. */ comms::HandshakeRequest handshakeReq( device.detectedSmoListeningIp, device.dataPort, device.cmdPort, device.imuPort); handshakeReq.swapContentsToProtocolEndianness(); handshakeReq.header.setCrc16FromRawBytes(); handshakeReq.header.swapCrc16ToProtocolEndianness(); handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32(); handshakeReq.footer.swapCrc32ToProtocolEndianness(); // Prepare device endpoint struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(deviceIP.c_str()); deviceAddr.sin_port = htons(65000); // Send handshake request directly (synchronous) ssize_t bytesSent = sendto( handshakeFdDesc.native_handle(), &handshakeReq, sizeof(comms::HandshakeRequest), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << __func__ << ": Failed to send handshake request: " << strerror(errno) << std::endl; return false; } return true; } void setupAsyncCallbacks( const std::shared_ptr &request ) { if (!handshakeFdDesc.is_open()) { throw std::runtime_error( std::string(__func__) + ": handshakeFdDesc is not open; cannot set up async callbacks " "for device " + deviceIP + "(" + device.discoveredDevice.deviceIdentifier + ")" + " handshake." "Check socket initialization and bining." ); } /** EXPLANATION: * We setup an async timer event to detect timeout, and wait for the * device to respond to the handshake request. If the device does not * respond within the timeout period, we will consider the handshake * to have failed. */ timeoutTimer.expires_from_now( boost::posix_time::milliseconds(device.handshakeTimeoutMs)); timeoutTimer.async_wait( std::bind( &ExecuteHandshakeReq::executeHandshakeReq1_1, this, request, std::placeholders::_1)); /** EXPLANATION: * Since we're using POSIX sockets calls on the underlying * native_handle, Let's use async_wait with POLLIN to detect when data * is available for reading. */ handshakeFdDesc.async_wait( boost::asio::posix::stream_descriptor::wait_read, std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2, this, request, std::placeholders::_1)); } void executeHandshakeReq1_1( std::shared_ptr, const boost::system::error_code& error) { // This is called from the timer callback if (!error) // Timeout occurred (not cancelled) { timerFired.store(true); executeHandshakeReq2(); } } void executeHandshakeReq1_2( std::shared_ptr, const boost::system::error_code& error ) { // This is called from the socket read callback if (error) { socketState.store(SocketState::SOCKET_ERROR); std::cerr << __func__ << ": Socket read error: " << error.message() << std::endl; } else { // Socket is readable, now actually read the data bytesReceived = recvfrom( handshakeFdDesc.native_handle(), responseBuffer, sizeof(responseBuffer), 0, (struct sockaddr*)&senderAddr, &senderAddrLen); if (bytesReceived > 0) { socketState.store(SocketState::SOCKET_RECV_SUCCESS); } else if (bytesReceived == 0) { socketState.store(SocketState::SOCKET_RECV_ERROR); std::cerr << __func__ << ": Received 0 bytes from recvfrom" << std::endl; } else { socketState.store(SocketState::SOCKET_ERROR); std::cerr << __func__ << ": recvfrom failed: " << strerror(errno) << " (errno: " << errno << ")" << std::endl; } } executeHandshakeReq2(); } void executeHandshakeReq2() { // Ensure we only execute once using atomic exchange if (handlerExecuted.exchange(true) == true) { return; } // Examine the flags and decide what happened SocketState finalSocketState = socketState.load(); bool finalTimerFired = timerFired.load(); // Cancel timer if still running timeoutTimer.cancel(); // Check for timeout only if there was no socket activity if (finalTimerFired && finalSocketState == SocketState::SOCKET_STILL_WAITING) { std::cerr << __func__ << ": Handshake timeout with " << deviceIP << "(" << device.discoveredDevice.deviceIdentifier << ")" << std::endl; callOriginalCallbackWithFailure(); return; } // Socket error from boost::asio if (finalSocketState == SocketState::SOCKET_ERROR) { std::cerr << __func__ << ": Socket error during handshake with " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_RECV_ERROR) { std::cerr << __func__ << ": Receive error during handshake with " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } /* Result must have been RECV_SUCCESS state if we reach here. * Data was already read in the async callback, just validate it */ if (bytesReceived < (ssize_t)sizeof(comms::HandshakeResponse)) { std::cerr << __func__ << ": Response of size " << bytesReceived << " too small from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } comms::HandshakeResponse* resp = reinterpret_cast(responseBuffer); /** EXPLANATION: * Following the clean receiving flow: * 1. Swap CRC32 to host endianness first * 2. Validate CRC32 * 3. Swap CRC16 to host endianness * 4. Validate CRC16 * 5. Swap content to host endianness */ resp->footer.swapCrc32ToHostEndianness(); if (!resp->validateCrc32()) { std::cerr << __func__ << ": CRC32 validation failed from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } resp->header.swapCrc16ToHostEndianness(); if (!resp->header.validateCrc16()) { std::cerr << __func__ << ": CRC16 validation failed from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } resp->swapContentsToHostEndianness(); if (!resp->sanityCheck() || resp->ret_code != 0x00) { std::cerr << __func__ << ": Invalid response from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Handshake successful with " << deviceIP << "(" << device.discoveredDevice.deviceIdentifier << ")" << "\n"; } // Transfer any successful state to Device commit(); int rawFd = handshakeFdDesc.release(); callOriginalCallback(true, rawFd); } void commit() // Transfer successful state to Device object { // Clean up resources (timer) but not the socket FD // The socket FD is returned to the caller, not transferred to the device timeoutTimer.cancel(); } void cleanupHandshakeSocket() { int fd = handshakeFdDesc.release(); if (fd != -1) { close(fd); } } void cleanup() // Clean up transient resources { timeoutTimer.cancel(); cleanupHandshakeSocket(); } }; void Device::executeHandshakeReq( const std::string& deviceIP, Device::executeHandshakeReqCbFn callback ) { // Create the handshake request object to hold state and callbacks auto request = std::make_shared( *this, deviceIP, std::move(callback)); // Check if detectedSmoListeningIp is empty - this should not happen if (detectedSmoListeningIp.empty()) { // This should not happen as it should be set by the calling method request->callOriginalCallbackWithFailure(); return; } try { if (!request->setupSocket()) { request->callOriginalCallbackWithFailure(); return; } if (!request->sendHandshakeRequest()) { request->callOriginalCallbackWithFailure(); return; } request->setupAsyncCallbacks(request); } catch (const std::exception& e) { std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": " << e.what() << std::endl; request->callOriginalCallbackWithFailure(); } } void Device::disconnectReq(Device::disconnectReqCbFn callback) { // Stop heartbeat first heartbeatActive.store(false); if (heartbeatFd == -1) { std::cout << __func__ << ": No heartbeat socket available, skipping " "disconnect message" << std::endl; callback(true); return; } // Create disconnect message comms::DisconnectMessage disconnectMsg; disconnectMsg.swapContentsToProtocolEndianness(); disconnectMsg.header.setCrc16FromRawBytes(); disconnectMsg.header.swapCrc16ToProtocolEndianness(); disconnectMsg.footer.crc_32 = disconnectMsg.calculateCrc32(); disconnectMsg.footer.swapCrc32ToProtocolEndianness(); // Set up destination address struct sockaddr_in deviceAddr; deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); deviceAddr.sin_port = htons(65000); // Commands go to port 65000 // Send disconnect message ssize_t bytesSent = sendto( heartbeatFd, &disconnectMsg, sizeof(disconnectMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << __func__ << ": Failed to send disconnect message: " << strerror(errno) << std::endl; // Continue with disconnect even if message send fails } std::cout << __func__ << ": Sent disconnect message to " << discoveredDevice.ipAddr << ":" << 65000 << std::endl; // Close the heartbeat socket close(heartbeatFd); heartbeatFd = -1; callback(true); } std::string Device::generateClientDeviceIpFromSerialNumber( const std::string& broadcastCode ) { /** EXPLANATION: * The input string is either a serial number (14 chars) or a broadcast code * (15 chars). We need to determine which one it is and extract the serial * number from the broadcast code. * * To generate a default IP address, we use the device's subnet: X.X.X.1XX * where XX = last two digits of serial. We use the smoIp and smoSubnetNbits * to determine the network prefix. */ if (broadcastCode.empty()) { throw std::invalid_argument( std::string(__func__) + ": Broadcast code cannot be empty"); } std::string serialNumber; if (broadcastCode.length() == 14) { // Input is a serial number serialNumber = broadcastCode; } else if (broadcastCode.length() == 15) { // Input is a broadcast code (serial + selector) serialNumber = broadcastCode.substr(0, 14); } else { // Invalid length throw std::invalid_argument( std::string(__func__) + ": Broadcast code must be 14 or 15 characters long"); } // Extract last two digits of serial number if (serialNumber.length() < 2) { throw std::invalid_argument( std::string(__func__) + ": Serial number too short"); } std::string lastTwoDigits = serialNumber.substr(serialNumber.length() - 2); // Validate that last two characters are digits if (lastTwoDigits[0] < '0' || lastTwoDigits[0] > '9' || lastTwoDigits[1] < '0' || lastTwoDigits[1] > '9') { throw std::invalid_argument( std::string(__func__) + ": Last two characters of serial number must be digits"); } /** EXPLANATION: * Use the device's subnet: X.X.X.1XX where XX = last two digits of serial. * We use the smoIp and smoSubnetNbits to determine the network prefix. */ // Parse smoIp to extract network prefix auto smoIpOctets = comms::parseIPv4Address(smoIp); if (!smoIpOctets.has_value()) { throw std::invalid_argument( std::string(__func__) + ": Invalid smoIp format: must be X.X.X.X"); } // Generate subnet mask based on nbits uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits); uint32_t smoIpAddr = (std::stoi(smoIpOctets->octet1) << 24) | (std::stoi(smoIpOctets->octet2) << 16) | (std::stoi(smoIpOctets->octet3) << 8) | std::stoi(smoIpOctets->octet4); // Apply subnet mask to get network prefix uint32_t networkPrefix = smoIpAddr & subnetMask; // Extract octets from network prefix uint8_t octet1 = (networkPrefix >> 24) & 0xFF; uint8_t octet2 = (networkPrefix >> 16) & 0xFF; uint8_t octet3 = (networkPrefix >> 8) & 0xFF; // Use the first three octets and append "1" + last two digits return std::to_string(octet1) + "." + std::to_string(octet2) + "." + std::to_string(octet3) + ".1" + lastTwoDigits; } void Device::startHeartbeat() { if (!componentThread || discoveredDevice.ipAddr.empty()) { throw std::runtime_error( std::string(__func__) + ": Can't start heartbeat without component thread or IP"); } // Check if we have the handshake socket available for heartbeat use if (heartbeatFd < 0) { throw std::runtime_error( std::string(__func__) + ": Expected to find handshake socket present but didn't find it"); } // Create heartbeat timer heartbeatTimer = std::make_unique( componentThread->getIoService()); heartbeatActive.store(true); // Send first heartbeat immediately sendHeartbeat(); } void Device::sendHeartbeat() { if (!heartbeatActive.load()) { std::cerr << __func__ << ": Ending heartbeat loop due to " "heartbeatActive==false.\n"; return; } if (heartbeatFd < 0 || discoveredDevice.ipAddr.empty()) { std::cerr << __func__ << ": Ending heartbeat loop due to " "heartbeatFd==-1 or discoveredDevice.ipAddr.empty().\n"; return; } try { comms::HeartbeatMessage heartbeatMsg; heartbeatMsg.swapContentsToProtocolEndianness(); heartbeatMsg.header.setCrc16FromRawBytes(); heartbeatMsg.header.swapCrc16ToProtocolEndianness(); heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32(); heartbeatMsg.footer.swapCrc32ToProtocolEndianness(); // Set up destination address for raw socket struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); // Heartbeats and commands go to port 65000 deviceAddr.sin_port = htons(65000); ssize_t bytesSent = sendto( heartbeatFd, &heartbeatMsg, sizeof(heartbeatMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << "[" << __func__ << "] Failed to send heartbeat: " << strerror(errno) << std::endl; return; } /** EXPLANATION: * Schedule next heartbeat in 1 second, per the spec. */ heartbeatTimer->expires_from_now(boost::posix_time::seconds(1)); heartbeatTimer->async_wait( [this](const boost::system::error_code& error) { onHeartbeatTimer(error); } ); } catch (const std::exception& e) { heartbeatActive.store(false); std::cerr << __func__ << ": Heartbeat send failed for device " << discoveredDevice.deviceIdentifier << ": " << e.what() << std::endl; } } void Device::onHeartbeatTimer(const boost::system::error_code& error) { // Timer was cancelled, heartbeat stopped if (error == boost::asio::error::operation_aborted) { return; } if (error) { heartbeatActive.store(false); std::cerr << "[" << __func__ << "] Heartbeat timer error for device " << discoveredDevice.deviceIdentifier << ": " << error.message() << std::endl; return; } // Send next heartbeat sendHeartbeat(); } uint32_t Device::getSubnetMaskFor(uint8_t nbits) { if (nbits > 32) { throw std::invalid_argument( std::string(__func__) + ": nbits must be between 0 and 32"); } // Generate subnet mask: set the first nbits to 1, rest to 0 if (nbits == 0) { return 0x00000000; } else if (nbits == 32) { return 0xFFFFFFFF; } else { // Create mask with nbits set to 1 from the left return (0xFFFFFFFF << (32 - nbits)); } } std::optional Device::detectSmoIp(const std::string& deviceIP) { /** EXPLANATION: * This function detects the SMO IP address of the interface that's facing * the device by iterating through all network interfaces and checking for * the interface that has the IP address in the same subnet as the device's * IP address. */ try { // Parse the device IP to get the network prefix auto deviceIpOctets = comms::parseIPv4Address(deviceIP); if (!deviceIpOctets.has_value()) { return std::nullopt; } // Convert device IP octets to integers for bitwise operations uint32_t deviceIpAddr = (std::stoi(deviceIpOctets->octet1) << 24) | (std::stoi(deviceIpOctets->octet2) << 16) | (std::stoi(deviceIpOctets->octet3) << 8) | std::stoi(deviceIpOctets->octet4); // Generate subnet mask based on nbits uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits); /* Get all network interfaces using getifaddrs (Linux/Unix specific) * * FIXME: Add Windows support using GetAdaptersAddresses when porting */ struct ifaddrs *ifaddr; if (getifaddrs(&ifaddr) == -1) { return std::nullopt; } // Use unique_ptr for automatic cleanup (RAII) to free ifaddrs auto ifaddr_deleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); }; std::unique_ptr ifaddr_ptr( ifaddr, ifaddr_deleter); std::string found_ip; /** EXPLANATION: * Iterate through all network interfaces and check if the IP address is * in the same subnet as the device's IP address. */ for (struct ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { if (ifa->ifa_addr == nullptr) continue; // Check if it's IPv4 if (ifa->ifa_addr->sa_family != AF_INET) { continue; } // Get the IPv4 address struct sockaddr_in* addr_in = (struct sockaddr_in*)ifa->ifa_addr; char ip_str[INET_ADDRSTRLEN]; if (inet_ntop( AF_INET, &addr_in->sin_addr, ip_str, INET_ADDRSTRLEN) == nullptr) { continue; } std::string ip = ip_str; // Check if this IP is in the same subnet auto ipOctets = comms::parseIPv4Address(ip); if (!ipOctets.has_value()) { continue; } // Convert IP octets to integer uint32_t ipAddr = (std::stoi(ipOctets->octet1) << 24) | (std::stoi(ipOctets->octet2) << 16) | (std::stoi(ipOctets->octet3) << 8) | std::stoi(ipOctets->octet4); /* Check if this iface's IP is in the same subnet as the device's IP * using the calculated mask. Only compare the bits that are set in * the subnet mask. */ if ((ipAddr & subnetMask) == (deviceIpAddr & subnetMask)) { found_ip = ip; break; } } // Return the found IP (empty string if none found) if (!found_ip.empty()) { return found_ip; } return std::nullopt; } catch (const std::exception& e) { std::cerr << "Error detecting SMO IP: " << e.what() << std::endl; return std::nullopt; } } std::optional Device::getSmoIp(const std::string& deviceIP) { /** EXPLANATION: * This is only used when connecting to a device that's already known to * the broadcastListener. * It is NOT and SHOULD not be used when connecting by heuristic IP * construction using the client device's serial number. * * Determines the SMO listening IP address for this device. * If smoIp is provided, validate it against detected IP and return it. * If smoIp is empty, attempt auto-detection based on device IP. * Returns std::optional - empty if detection fails. */ auto detectedIp = detectSmoIp(deviceIP); if (!smoIp.empty()) { // smoIp was provided, validate it against detected IP if (detectedIp.has_value() && detectedIp.value() != smoIp) { // Print warning if provided smoIp doesn't match detected IP std::cerr << "Warning: Provided smo-ip (" << smoIp << ") doesn't match detected IP (" << detectedIp.value() << ") for device " << discoveredDevice.deviceIdentifier << " @(" << deviceIP << ")" << ". Using provided smo-ip anyway." << std::endl; } return smoIp; } if (detectedIp.has_value()) { return detectedIp.value(); } // Auto-detection failed return std::nullopt; } } // namespace livoxProto1