#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "device.h" #include "protocol.h" #include "core.h" /** EXPLANATION: * This file contains the implementation of the Device class. * * FIXME: * We may need to check how smo-subnet-nbits is used in here because we didn't * actually check to see under what conditions it's required vs optional. Hence * we don't currently enforce correct usage of it, and we just assume that the * livoxGen1's policy of supplying a default value of 24 is correct. */ namespace livoxProto1 { namespace comms { DiscoveredDevice::DiscoveredDevice( const std::string &deviceIdentifier, DeviceType deviceType, const std::string &ipAddr) : deviceIdentifier(deviceIdentifier), deviceType(deviceType), ipAddr(ipAddr) { } DiscoveredDevice::DiscoveredDevice( const BroadcastMessage &msg, const std::string &ipAddr ) : DiscoveredDevice( reinterpret_cast(msg.broadcast_code), static_cast(msg.dev_type), ipAddr) { } std::string DiscoveredDevice::stringify(void) const { std::ostringstream oss; oss << "DiscoveredDevice{" << "identifier='" << deviceIdentifier << "', " << "ipAddr='" << ipAddr << "', " << "deviceType=" << (int)deviceType << " (" << getDeviceTypeName() << ")" << "}"; return oss.str(); } std::string DiscoveredDevice::getDeviceTypeName(void) const { switch (deviceType) { case DeviceType::Hub: return "Hub"; case DeviceType::Mid40: return "Mid-40"; case DeviceType::Tele15: return "Tele-15"; case DeviceType::Horizon: return "Horizon"; case DeviceType::Mid70: return "Mid-70"; case DeviceType::Avia: return "Avia"; default: return "Unknown"; } } } // namespace comms Device::Device(const std::string &deviceIdentifier, const std::shared_ptr& componentThread, int handshakeTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort) : discoveredDevice( deviceIdentifier, comms::DeviceType::Mid40, // Initialize empty. IP will be set upon successful connection. ""), componentThread(componentThread), handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs), smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), heartbeatFd(-1), heartbeatActive(false), pcloudDataActive(false), pcloudDataFd(-1) { } Device::~Device() { if (heartbeatActive.load()) { heartbeatActive.store(false); if (heartbeatTimer) { heartbeatTimer->cancel(); } } if (pcloudDataActive.load()) { pcloudDataActive.store(false); if (pcloudDataSocketDesc) { pcloudDataSocketDesc->cancel(); } } heartbeatTimer.reset(); pcloudDataSocketDesc.reset(); if (heartbeatFd >= 0) { close(heartbeatFd); heartbeatFd = -1; } if (pcloudDataFd >= 0) { close(pcloudDataFd); pcloudDataFd = -1; } } /** * Device::ConnectReq - Encapsulates all state and resources for async connection sequence * This class manages the overall device connection process including handshake and heartbeat setup */ class Device::ConnectReq : public smo::NonPostedAsynchronousContinuation { private: Device& device; public: ConnectReq(Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev) {} /** FIXME: * WE need to assign the ipAddr to the Device being connected up. */ // Callback methods for the connection sequence void connectReq1( std::shared_ptr context, bool success, const std::string& ipAddr, int fd ) { if (success) { // Store the IP address in the device context->device.discoveredDevice.ipAddr = ipAddr; // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); context->callOriginalCb(true); return; } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Trying to connect to device by " << "identifier" << "\n"; } // Try direct connect by device identifier context->device.connectByDeviceIdentifierReq( {context, std::bind(&ConnectReq::connectReq2, context.get(), context, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)}); } void connectReq2( std::shared_ptr context, bool success, const std::string& ipAddr, int fd ) { if (success) { // Store the IP address in the device context->device.discoveredDevice.ipAddr = ipAddr; // Store the handshake FD for heartbeats context->device.heartbeatFd = fd; context->device.startHeartbeat(); context->callOriginalCb(true); return; } // All connection attempts failed context->callOriginalCb(false); } }; void Device::connectReq(smo::Callback callback) { // Create the connection request object to hold state and callbacks auto request = std::make_shared(*this, std::move(callback)); // Try connecting to known device first if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Trying to connect to known device" << "\n"; } connectToKnownDeviceReq( {request, std::bind( &ConnectReq::connectReq1, request.get(), request, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)}); } class Device::ConnectToKnownDeviceReq : public smo::NonPostedAsynchronousContinuation< Device::connectToKnownDeviceReqCbFn> { public: Device& device; std::string deviceIP; std::shared_ptr deviceInfo; ConnectToKnownDeviceReq(Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation< Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) { callOriginalCb(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() { callOriginalCallback(false, "", -1); } // Callback methods for the connection sequence void connectToKnownDeviceReq1( std::shared_ptr context, bool success, int fd ) { // Return the IP address and raw FD to the caller context->callOriginalCallback(success, context->deviceIP, fd); } }; /** EXPLANATION: * This function is used to connect to a device that is already known to the * broadcastListener. */ void Device::connectToKnownDeviceReq( smo::Callback callback ) { // Create the connection request object to hold state and callbacks auto request = std::make_shared( *this, std::move(callback)); auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { request->callOriginalCallbackWithFailure(); return; } // Check if the device is known to the broadcastListener if (!protoState.deviceManager->broadcastListener.deviceExists( request->device.discoveredDevice.deviceIdentifier)) { request->callOriginalCallbackWithFailure(); return; } request->deviceInfo = protoState.deviceManager->broadcastListener.getDevice( request->device.discoveredDevice.deviceIdentifier); if (!request->deviceInfo) { request->callOriginalCallbackWithFailure(); return; } // Use the IP address from the broadcast message request->deviceIP = request->deviceInfo->ipAddr; // Determine the final listening IP address auto smoIpResult = request->device.getSmoIp(request->deviceIP); if (!smoIpResult.has_value()) { // Auto-detection failed, fail early std::cerr << __func__ << ": Failed to detect SMO listening IP for " << "known device (" << request->device.discoveredDevice.deviceIdentifier << ")" << " @(" << request->deviceIP << ").\n"; request->callOriginalCallbackWithFailure(); return; } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Detected SMO listening IP for known device " << request->device.discoveredDevice.deviceIdentifier << " @(" << request->deviceIP << ") is " << smoIpResult.value() << ". About to try to handshake.\n"; } request->device.detectedSmoListeningIp = smoIpResult.value(); // Execute handshake with the known device using async method request->device.executeHandshakeReq( request->deviceIP, {request, std::bind( &ConnectToKnownDeviceReq::connectToKnownDeviceReq1, request.get(), request, std::placeholders::_1, std::placeholders::_2)}); } class Device::ConnectByDeviceIdentifierReq : public smo::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn> { public: Device& device; std::string deviceIP; ConnectByDeviceIdentifierReq( Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn>( std::move(cb)), device(dev) {} // Public accessor for the original callback void callOriginalCallback(bool success, const std::string& ipAddr, int fd) { callOriginalCb(success, ipAddr, fd); } // Wrapper for failure cases void callOriginalCallbackWithFailure() { callOriginalCallback(false, "", -1); } // Callback methods for the connection sequence void connectByDeviceIdentifierReq1( std::shared_ptr context, bool success, int fd ) { // Return the IP address and raw FD to the caller context->callOriginalCallback(success, context->deviceIP, fd); } }; void Device::connectByDeviceIdentifierReq( smo::Callback callback ) { /** EXPLANATION: * This method uses heuristic device IP construction from the serial number. * This requires smoIp to be provided because: * 1. We need the network prefix to generate a valid device IP address * 2. Without a target device IP, we cannot detect which interface faces the device * 3. Therefore, if smoIp is omitted, heuristic construction is impossible * * If smoIp is not provided, the driver must rely only on broadcast advertisements * from the device (handled by connectToKnownDeviceReq). */ // Check if smoIp is provided - required for heuristic construction if (smoIp.empty()) { callback.callbackFn(false, "", -1); return; } // Create the connection request object to hold state and callbacks auto request = std::make_shared( *this, std::move(callback)); // Generate device IP from serial number request->deviceIP = generateClientDeviceIpFromSerialNumber( request->device.discoveredDevice.deviceIdentifier); // For heuristic construction, always use the provided smoIp. request->device.detectedSmoListeningIp = request->device.smoIp; if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": About to try to connect to device by " << "identifier (" << discoveredDevice.deviceIdentifier << ")" << " at IP (" << smoIp << ").\n"; } // Execute handshake using async method request->device.executeHandshakeReq( request->deviceIP, {request, std::bind( &ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1, request.get(), request, std::placeholders::_1, std::placeholders::_2)}); } class Device::ExecuteHandshakeReq : public smo::NonPostedAsynchronousContinuation< Device::executeHandshakeReqCbFn> { public: friend void Device::executeHandshakeReq( const std::string& deviceIP, smo::Callback callback); enum class SocketState { SOCKET_STILL_WAITING = 0, SOCKET_ERROR, SOCKET_RECV_SUCCESS, SOCKET_RECV_ERROR }; public: Device& device; std::string deviceIP; // Atomic state flags for async coordination std::atomic timerFired{false}; std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic handlerExecuted{false}; // The stream descriptor that will be returned to the caller boost::asio::posix::stream_descriptor handshakeFdDesc; boost::asio::deadline_timer timeoutTimer; // Received data storage uint8_t responseBuffer[1024]{}; ssize_t bytesReceived = -1; struct sockaddr_in senderAddr; socklen_t senderAddrLen = sizeof(senderAddr); public: ExecuteHandshakeReq( Device& dev, const std::string& deviceIP, smo::Callback cb) : smo::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), deviceIP(deviceIP), handshakeFdDesc(device.componentThread->getIoService()), timeoutTimer(device.componentThread->getIoService()) { } ~ExecuteHandshakeReq() { cleanup(); } // Public accessor for the original callback void callOriginalCallback(bool success, int fd) { callOriginalCb(success, fd); } void callOriginalCallbackWithFailure() { /** EXPLANATION: * We have to call cleanupHandshakeSocket() here, specifically because * there are 5 references we need to clean up, and 2 of them are * actually recursive self-references within this class. * * The timer and the handshakeFdDesc are both given recursive * self-referencing shared_ptr's to this class when we eventually set up * the async callbacks for them. * * So merely letting the async continuation sequences go out of scope * won't cause this class to be destroyed. Rather, since the class * references itelf, we have to first break those references. Then and * only then will the shared_ptr's refcount go to 0 and the class will * be destroyed. * * We always unconditionally break the timer's reference inside of * the branch-unifying segment (executeHandshakeReq2), but we generally * only want to break the handshakeFdDesc's reference at the exact * moment when the sequence fails, because if it succeeds, we want to * commit the FD to the Device object being created (for heartbeats). * * Hence, we call cleanupHandshakeSocket() at the point of failure. * To break the self-reference when the sequence is successful, we * manually do that at the end of the sequence. */ cleanupHandshakeSocket(); callOriginalCallback(false, -1); } private: bool setupSocket() { /** EXPLANATION: * Create non-blocking UDP socket for handshake. We can't use * boost::asio::socket because it causes a segfault if associated with * an io_service from the main program (it's a boost bug). */ int socketFd = socket(AF_INET, SOCK_DGRAM, 0); if (socketFd < 0) { std::cerr << __func__ << ": Failed to create socket: " << strerror(errno) << std::endl; return false; } int flags = fcntl(socketFd, F_GETFL, 0); if (flags < 0 || fcntl(socketFd, F_SETFL, flags | O_NONBLOCK) < 0) { std::cerr << __func__ << ": Failed to set socket non-blocking: " << strerror(errno) << std::endl; return false; } // Bind socket to cmdPort so we can receive the handshake response struct sockaddr_in localAddr; memset(&localAddr, 0, sizeof(localAddr)); localAddr.sin_family = AF_INET; localAddr.sin_addr.s_addr = INADDR_ANY; localAddr.sin_port = htons(device.cmdPort); if (bind(socketFd, (struct sockaddr*)&localAddr, sizeof(localAddr)) < 0) { std::cerr << __func__ << ": Failed to bind socket to port " << device.cmdPort << ": " << strerror(errno) << std::endl; return false; } // Assign the socket FD to the stream descriptor handshakeFdDesc.assign(socketFd); return true; } bool sendHandshakeRequest() { /** EXPLANATION: * Prepare handshake request. */ comms::HandshakeRequest handshakeReq( device.detectedSmoListeningIp, device.dataPort, device.cmdPort, device.imuPort); handshakeReq.swapContentsToProtocolEndianness(); handshakeReq.header.setCrc16FromRawBytes(); handshakeReq.header.swapCrc16ToProtocolEndianness(); handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32(); handshakeReq.footer.swapCrc32ToProtocolEndianness(); // Prepare device endpoint struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(deviceIP.c_str()); deviceAddr.sin_port = htons(65000); // Send handshake request directly (synchronous) ssize_t bytesSent = sendto( handshakeFdDesc.native_handle(), &handshakeReq, sizeof(comms::HandshakeRequest), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << __func__ << ": Failed to send handshake request: " << strerror(errno) << std::endl; return false; } return true; } void setupAsyncCallbacks( const std::shared_ptr &request ) { if (!handshakeFdDesc.is_open()) { throw std::runtime_error( std::string(__func__) + ": handshakeFdDesc is not open; cannot set up async callbacks " "for device " + deviceIP + "(" + device.discoveredDevice.deviceIdentifier + ")" + " handshake." "Check socket initialization and bining." ); } /** EXPLANATION: * We setup an async timer event to detect timeout, and wait for the * device to respond to the handshake request. If the device does not * respond within the timeout period, we will consider the handshake * to have failed. */ timeoutTimer.expires_from_now( boost::posix_time::milliseconds(device.handshakeTimeoutMs)); timeoutTimer.async_wait( std::bind( &ExecuteHandshakeReq::executeHandshakeReq1_1, this, request, std::placeholders::_1)); /** EXPLANATION: * Since we're using POSIX sockets calls on the underlying * native_handle, Let's use async_wait with POLLIN to detect when data * is available for reading. */ handshakeFdDesc.async_wait( boost::asio::posix::stream_descriptor::wait_read, std::bind( &ExecuteHandshakeReq::executeHandshakeReq1_2, this, request, std::placeholders::_1)); } void executeHandshakeReq1_1( std::shared_ptr, const boost::system::error_code& error) { // This is called from the timer callback if (!error) // Timeout occurred (not cancelled) { timerFired.store(true); executeHandshakeReq2(); } } void executeHandshakeReq1_2( std::shared_ptr, const boost::system::error_code& error ) { // This is called from the socket read callback if (error) { socketState.store(SocketState::SOCKET_ERROR); std::cerr << __func__ << ": Socket read error: " << error.message() << std::endl; } else { // Socket is readable, now actually read the data bytesReceived = recvfrom( handshakeFdDesc.native_handle(), responseBuffer, sizeof(responseBuffer), 0, (struct sockaddr*)&senderAddr, &senderAddrLen); if (bytesReceived > 0) { socketState.store(SocketState::SOCKET_RECV_SUCCESS); } else if (bytesReceived == 0) { socketState.store(SocketState::SOCKET_RECV_ERROR); std::cerr << __func__ << ": Received 0 bytes from recvfrom" << std::endl; } else { socketState.store(SocketState::SOCKET_ERROR); std::cerr << __func__ << ": recvfrom failed: " << strerror(errno) << " (errno: " << errno << ")" << std::endl; } } executeHandshakeReq2(); } void executeHandshakeReq2() { // Ensure we only execute once using atomic exchange if (handlerExecuted.exchange(true) == true) { return; } // Examine the flags and decide what happened SocketState finalSocketState = socketState.load(); bool finalTimerFired = timerFired.load(); // Cancel timer if still running timeoutTimer.cancel(); // Check for timeout only if there was no socket activity if (finalTimerFired && finalSocketState == SocketState::SOCKET_STILL_WAITING) { std::cerr << __func__ << ": Handshake timeout with " << deviceIP << "(" << device.discoveredDevice.deviceIdentifier << ")" << std::endl; callOriginalCallbackWithFailure(); return; } // Socket error from boost::asio if (finalSocketState == SocketState::SOCKET_ERROR) { std::cerr << __func__ << ": Socket error during handshake with " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_RECV_ERROR) { std::cerr << __func__ << ": Receive error during handshake with " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } /* Result must have been RECV_SUCCESS state if we reach here. * Data was already read in the async callback, just validate it */ if (bytesReceived < (ssize_t)sizeof(comms::HandshakeResponse)) { std::cerr << __func__ << ": Response of size " << bytesReceived << " too small from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } comms::HandshakeResponse* resp = reinterpret_cast(responseBuffer); /** EXPLANATION: * Following the clean receiving flow: * 1. Swap CRC32 to host endianness first * 2. Validate CRC32 * 3. Swap CRC16 to host endianness * 4. Validate CRC16 * 5. Swap content to host endianness */ resp->footer.swapCrc32ToHostEndianness(); if (!resp->validateCrc32()) { std::cerr << __func__ << ": CRC32 validation failed from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } resp->header.swapCrc16ToHostEndianness(); if (!resp->header.validateCrc16()) { std::cerr << __func__ << ": CRC16 validation failed from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } resp->swapContentsToHostEndianness(); if (!resp->sanityCheck() || resp->ret_code != 0x00) { std::cerr << __func__ << ": Invalid response from " << deviceIP << std::endl; callOriginalCallbackWithFailure(); return; } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": Handshake successful with " << deviceIP << "(" << device.discoveredDevice.deviceIdentifier << ")" << "\n"; } // Transfer any successful state to Device commit(); int rawFd = handshakeFdDesc.release(); callOriginalCallback(true, rawFd); } void commit() // Transfer successful state to Device object { // Clean up resources (timer) but not the socket FD // The socket FD is returned to the caller, not transferred to the device timeoutTimer.cancel(); } void cleanupHandshakeSocket() { int fd = handshakeFdDesc.release(); if (fd != -1) { close(fd); } } void cleanup() // Clean up transient resources { timeoutTimer.cancel(); cleanupHandshakeSocket(); } }; void Device::executeHandshakeReq( const std::string& deviceIP, smo::Callback callback ) { // Create the handshake request object to hold state and callbacks auto request = std::make_shared( *this, deviceIP, std::move(callback)); // Check if detectedSmoListeningIp is empty - this should not happen if (detectedSmoListeningIp.empty()) { // This should not happen as it should be set by the calling method request->callOriginalCallbackWithFailure(); return; } try { if (!request->setupSocket()) { request->callOriginalCallbackWithFailure(); return; } if (!request->sendHandshakeRequest()) { request->callOriginalCallbackWithFailure(); return; } request->setupAsyncCallbacks(request); } catch (const std::exception& e) { std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": " << e.what() << std::endl; request->callOriginalCallbackWithFailure(); } } void Device::disconnectReq(smo::Callback callback) { // Stop heartbeat first heartbeatActive.store(false); if (heartbeatFd == -1) { std::cout << __func__ << ": No heartbeat socket available, skipping " "disconnect message" << std::endl; callback.callbackFn(true); return; } // Create disconnect message comms::DisconnectMessage disconnectMsg; disconnectMsg.swapContentsToProtocolEndianness(); disconnectMsg.header.setCrc16FromRawBytes(); disconnectMsg.header.swapCrc16ToProtocolEndianness(); disconnectMsg.footer.crc_32 = disconnectMsg.calculateCrc32(); disconnectMsg.footer.swapCrc32ToProtocolEndianness(); // Set up destination address struct sockaddr_in deviceAddr; deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); deviceAddr.sin_port = htons(65000); // Commands go to port 65000 // Send disconnect message ssize_t bytesSent = sendto( heartbeatFd, &disconnectMsg, sizeof(disconnectMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << __func__ << ": Failed to send disconnect message: " << strerror(errno) << std::endl; // Continue with disconnect even if message send fails } std::cout << __func__ << ": Sent disconnect message to " << discoveredDevice.ipAddr << ":" << 65000 << std::endl; // Close the heartbeat socket close(heartbeatFd); heartbeatFd = -1; callback.callbackFn(true); } std::string Device::generateClientDeviceIpFromSerialNumber( const std::string& broadcastCode ) { /** EXPLANATION: * The input string is either a serial number (14 chars) or a broadcast code * (15 chars). We need to determine which one it is and extract the serial * number from the broadcast code. * * To generate a default IP address, we use the device's subnet: X.X.X.1XX * where XX = last two digits of serial. We use the smoIp and smoSubnetNbits * to determine the network prefix. */ if (broadcastCode.empty()) { throw std::invalid_argument( std::string(__func__) + ": Broadcast code cannot be empty"); } std::string serialNumber; if (broadcastCode.length() == 14) { // Input is a serial number serialNumber = broadcastCode; } else if (broadcastCode.length() == 15) { // Input is a broadcast code (serial + selector) serialNumber = broadcastCode.substr(0, 14); } else { // Invalid length throw std::invalid_argument( std::string(__func__) + ": Broadcast code must be 14 or 15 characters long"); } // Extract last two digits of serial number if (serialNumber.length() < 2) { throw std::invalid_argument( std::string(__func__) + ": Serial number too short"); } std::string lastTwoDigits = serialNumber.substr(serialNumber.length() - 2); // Validate that last two characters are digits if (lastTwoDigits[0] < '0' || lastTwoDigits[0] > '9' || lastTwoDigits[1] < '0' || lastTwoDigits[1] > '9') { throw std::invalid_argument( std::string(__func__) + ": Last two characters of serial number must be digits"); } /** EXPLANATION: * Use the device's subnet: X.X.X.1XX where XX = last two digits of serial. * We use the smoIp and smoSubnetNbits to determine the network prefix. */ // Parse smoIp to extract network prefix auto smoIpOctets = comms::parseIPv4Address(smoIp); if (!smoIpOctets.has_value()) { throw std::invalid_argument( std::string(__func__) + ": Invalid smoIp format: must be X.X.X.X"); } // Generate subnet mask based on nbits uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits); uint32_t smoIpAddr = (std::stoi(smoIpOctets->octet1) << 24) | (std::stoi(smoIpOctets->octet2) << 16) | (std::stoi(smoIpOctets->octet3) << 8) | std::stoi(smoIpOctets->octet4); // Apply subnet mask to get network prefix uint32_t networkPrefix = smoIpAddr & subnetMask; // Extract octets from network prefix uint8_t octet1 = (networkPrefix >> 24) & 0xFF; uint8_t octet2 = (networkPrefix >> 16) & 0xFF; uint8_t octet3 = (networkPrefix >> 8) & 0xFF; // Use the first three octets and append "1" + last two digits return std::to_string(octet1) + "." + std::to_string(octet2) + "." + std::to_string(octet3) + ".1" + lastTwoDigits; } void Device::startHeartbeat() { if (!componentThread || discoveredDevice.ipAddr.empty()) { throw std::runtime_error( std::string(__func__) + ": Can't start heartbeat without component thread or IP"); } // Check if we have the handshake socket available for heartbeat use if (heartbeatFd < 0) { throw std::runtime_error( std::string(__func__) + ": Expected to find handshake socket present but didn't find it"); } // Create heartbeat timer heartbeatTimer = std::make_unique( componentThread->getIoService()); heartbeatActive.store(true); // Send first heartbeat immediately sendHeartbeat(); } void Device::sendHeartbeat() { if (!heartbeatActive.load()) { std::cerr << __func__ << ": Ending heartbeat loop due to " "heartbeatActive==false.\n"; return; } if (heartbeatFd < 0 || discoveredDevice.ipAddr.empty()) { std::cerr << __func__ << ": Ending heartbeat loop due to " "heartbeatFd==-1 or discoveredDevice.ipAddr.empty().\n"; return; } try { comms::HeartbeatMessage heartbeatMsg; heartbeatMsg.swapContentsToProtocolEndianness(); heartbeatMsg.header.setCrc16FromRawBytes(); heartbeatMsg.header.swapCrc16ToProtocolEndianness(); heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32(); heartbeatMsg.footer.swapCrc32ToProtocolEndianness(); // Set up destination address for raw socket struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); // Heartbeats and commands go to port 65000 deviceAddr.sin_port = htons(65000); ssize_t bytesSent = sendto( heartbeatFd, &heartbeatMsg, sizeof(heartbeatMsg), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << "[" << __func__ << "] Failed to send heartbeat: " << strerror(errno) << std::endl; return; } /** EXPLANATION: * Schedule next heartbeat in 1 second, per the spec. */ heartbeatTimer->expires_from_now(boost::posix_time::seconds(1)); heartbeatTimer->async_wait( [this](const boost::system::error_code& error) { onHeartbeatTimer(error); } ); } catch (const std::exception& e) { heartbeatActive.store(false); std::cerr << __func__ << ": Heartbeat send failed for device " << discoveredDevice.deviceIdentifier << ": " << e.what() << std::endl; } } void Device::onHeartbeatTimer(const boost::system::error_code& error) { // Timer was cancelled, heartbeat stopped if (error == boost::asio::error::operation_aborted) { return; } if (error) { heartbeatActive.store(false); std::cerr << "[" << __func__ << "] Heartbeat timer error for device " << discoveredDevice.deviceIdentifier << ": " << error.message() << std::endl; return; } // Send next heartbeat sendHeartbeat(); } uint32_t Device::getSubnetMaskFor(uint8_t nbits) { if (nbits > 32) { throw std::invalid_argument( std::string(__func__) + ": nbits must be between 0 and 32"); } // Generate subnet mask: set the first nbits to 1, rest to 0 if (nbits == 0) { return 0x00000000; } else if (nbits == 32) { return 0xFFFFFFFF; } else { // Create mask with nbits set to 1 from the left return (0xFFFFFFFF << (32 - nbits)); } } std::optional Device::detectSmoIp(const std::string& deviceIP) { /** EXPLANATION: * This function detects the SMO IP address of the interface that's facing * the device by iterating through all network interfaces and checking for * the interface that has the IP address in the same subnet as the device's * IP address. */ try { // Parse the device IP to get the network prefix auto deviceIpOctets = comms::parseIPv4Address(deviceIP); if (!deviceIpOctets.has_value()) { return std::nullopt; } // Convert device IP octets to integers for bitwise operations uint32_t deviceIpAddr = (std::stoi(deviceIpOctets->octet1) << 24) | (std::stoi(deviceIpOctets->octet2) << 16) | (std::stoi(deviceIpOctets->octet3) << 8) | std::stoi(deviceIpOctets->octet4); // Generate subnet mask based on nbits uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits); /* Get all network interfaces using getifaddrs (Linux/Unix specific) * * FIXME: Add Windows support using GetAdaptersAddresses when porting */ struct ifaddrs *ifaddr; if (getifaddrs(&ifaddr) == -1) { return std::nullopt; } // Use unique_ptr for automatic cleanup (RAII) to free ifaddrs auto ifaddr_deleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); }; std::unique_ptr ifaddr_ptr( ifaddr, ifaddr_deleter); std::string found_ip; /** EXPLANATION: * Iterate through all network interfaces and check if the IP address is * in the same subnet as the device's IP address. */ for (struct ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { if (ifa->ifa_addr == nullptr) continue; // Check if it's IPv4 if (ifa->ifa_addr->sa_family != AF_INET) { continue; } // Get the IPv4 address struct sockaddr_in* addr_in = (struct sockaddr_in*)ifa->ifa_addr; char ip_str[INET_ADDRSTRLEN]; if (inet_ntop( AF_INET, &addr_in->sin_addr, ip_str, INET_ADDRSTRLEN) == nullptr) { continue; } std::string ip = ip_str; // Check if this IP is in the same subnet auto ipOctets = comms::parseIPv4Address(ip); if (!ipOctets.has_value()) { continue; } // Convert IP octets to integer uint32_t ipAddr = (std::stoi(ipOctets->octet1) << 24) | (std::stoi(ipOctets->octet2) << 16) | (std::stoi(ipOctets->octet3) << 8) | std::stoi(ipOctets->octet4); /* Check if this iface's IP is in the same subnet as the device's IP * using the calculated mask. Only compare the bits that are set in * the subnet mask. */ if ((ipAddr & subnetMask) == (deviceIpAddr & subnetMask)) { found_ip = ip; break; } } // Return the found IP (empty string if none found) if (!found_ip.empty()) { return found_ip; } return std::nullopt; } catch (const std::exception& e) { std::cerr << "Error detecting SMO IP: " << e.what() << std::endl; return std::nullopt; } } std::optional Device::getSmoIp(const std::string& deviceIP) { /** EXPLANATION: * This is only used when connecting to a device that's already known to * the broadcastListener. * It is NOT and SHOULD not be used when connecting by heuristic IP * construction using the client device's serial number. * * Determines the SMO listening IP address for this device. * If smoIp is provided, validate it against detected IP and return it. * If smoIp is empty, attempt auto-detection based on device IP. * Returns std::optional - empty if detection fails. */ auto detectedIp = detectSmoIp(deviceIP); if (!smoIp.empty()) { // smoIp was provided, validate it against detected IP if (detectedIp.has_value() && detectedIp.value() != smoIp) { // Print warning if provided smoIp doesn't match detected IP std::cerr << "Warning: Provided smo-ip (" << smoIp << ") doesn't match detected IP (" << detectedIp.value() << ") for device " << discoveredDevice.deviceIdentifier << " @(" << deviceIP << ")" << ". Using provided smo-ip anyway." << std::endl; } return smoIp; } if (detectedIp.has_value()) { return detectedIp.value(); } // Auto-detection failed return std::nullopt; } // Base class for both enable and disable pcloud data requests template class EnDisablePcloudDataReq : public smo::NonPostedAsynchronousContinuation { public: enum class SocketState { SOCKET_STILL_WAITING = 0, SOCKET_ERROR, SOCKET_RECV_SUCCESS, SOCKET_RECV_ERROR }; public: Device& device; // Atomic state flags for async coordination std::atomic timerFired{false}; std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; std::atomic handlerExecuted{false}; // The timeout timer. boost::asio::deadline_timer timeoutTimer; /* This wrapper is just to enable us to use boost::stream_descriptor for its * convenient API when waiting for the enable/disable ACK dgram. */ boost::asio::posix::stream_descriptor cmdResponseBoostFdWrapper; // Received data storage uint8_t responseBuffer[1024]{}; ssize_t bytesReceived = -1; struct sockaddr_in senderAddr; socklen_t senderAddrLen = sizeof(senderAddr); protected: EnDisablePcloudDataReq( Device& dev, smo::Callback cb) : smo::NonPostedAsynchronousContinuation(std::move(cb)), device(dev), timeoutTimer(device.componentThread->getIoService()), cmdResponseBoostFdWrapper(device.componentThread->getIoService()) {} public: virtual ~EnDisablePcloudDataReq() { cleanup(); } // Public accessor for the original callback void callOriginalCallback(bool success) { this->callOriginalCb(success); } void callOriginalCallbackWithFailure() { /** * EXPLANATION: * We have to call cleanupCmdResponseFdBoostWrapper() here, specifically * because there are self-references within this class that need to be * cleaned up. * * The cmdResponseBoostFdWrapper holds a reference to the heartbeat * socket for async operations. When the sequence fails, we need to * break this reference to allow proper cleanup. * * Hence, we call cleanupCmdResponseFdBoostWrapper() at the point of * failure. */ cleanupCmdResponseFdBoostWrapper(); callOriginalCallback(false); } void cleanupCmdResponseFdBoostWrapper() { if (cmdResponseBoostFdWrapper.is_open()) { cmdResponseBoostFdWrapper.release(); // Don't close heartbeat socket } } protected: bool setupSocket() { // Use the existing heartbeat socket for sending commands and receiving responses if (device.heartbeatFd < 0) { std::cerr << __func__ << ": No heartbeat socket available" << std::endl; return false; } return true; } void setupAsyncCallbacks( const std::shared_ptr> &request ) { cmdResponseBoostFdWrapper.assign(device.heartbeatFd); // Setup timeout timer timeoutTimer.expires_from_now( boost::posix_time::milliseconds(device.handshakeTimeoutMs)); timeoutTimer.async_wait( std::bind( &EnDisablePcloudDataReq::enDisablePcloudDataReq1_1, this, request, std::placeholders::_1)); // Setup async wait for read-ready cmdResponseBoostFdWrapper.async_wait( boost::asio::posix::stream_descriptor::wait_read, std::bind( &EnDisablePcloudDataReq::enDisablePcloudDataReq1_2, this, request, std::placeholders::_1)); } void enDisablePcloudDataReq1_1( std::shared_ptr> context, const boost::system::error_code& error ) { (void)error; // Suppress unused parameter warning context->timerFired = true; context->enDisablePcloudDataReq2(context); } void enDisablePcloudDataReq1_2( std::shared_ptr> context, const boost::system::error_code& error ) { if (!error) { // Data is available for reading, perform the actual read context->bytesReceived = recvfrom( context->device.heartbeatFd, context->responseBuffer, sizeof(context->responseBuffer), 0, (struct sockaddr*)&context->senderAddr, &context->senderAddrLen); if (context->bytesReceived > 0) { context->socketState = SocketState::SOCKET_RECV_SUCCESS; } else { context->socketState = SocketState::SOCKET_RECV_ERROR; } } else { context->socketState = SocketState::SOCKET_RECV_ERROR; } context->enDisablePcloudDataReq2(context); } void enDisablePcloudDataReq2( std::shared_ptr> context ) { // Only execute once if (context->handlerExecuted.exchange(true)) { return; } SocketState finalSocketState = context->socketState.load(); bool finalTimerFired = context->timerFired.load(); context->timeoutTimer.cancel(); if (finalTimerFired && finalSocketState == SocketState::SOCKET_STILL_WAITING) { std::cerr << __func__ << ": Command timeout for device " << context->device.discoveredDevice.deviceIdentifier << std::endl; context->callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_ERROR) { std::cerr << __func__ << ": Socket error during command for device " << context->device.discoveredDevice.deviceIdentifier << std::endl; context->callOriginalCallbackWithFailure(); return; } if (finalSocketState == SocketState::SOCKET_RECV_ERROR) { std::cerr << __func__ << ": Receive error during command for device " << context->device.discoveredDevice.deviceIdentifier << std::endl; context->callOriginalCallbackWithFailure(); return; } // Result must have been RECV_SUCCESS state if we reach here if (context->bytesReceived < (ssize_t)sizeof(livoxProto1::comms::SamplingResponse)) { std::cerr << __func__ << ": Response of size " << context->bytesReceived << " is too small for sampling response (expected " << sizeof(livoxProto1::comms::SamplingResponse) << ")" << std::endl; context->callOriginalCallbackWithFailure(); return; } // Parse response using protocol structure livoxProto1::comms::SamplingResponse* response = reinterpret_cast( context->responseBuffer); response->swapContentsToHostEndianness(); if (!response->sanityCheck()) { std::cerr << __func__ << ": Invalid sampling response structure.\n"; context->callOriginalCallbackWithFailure(); return; } // Check if response indicates success if (response->command.cmd_set == 0x00 && response->command.cmd_id == 0x04 && response->ret_code == 0x00) { // Set the appropriate pcloud data active state based on command type context->setPcloudDataActiveState(); context->callOriginalCallback(true); return; } // If we get here, the command failed context->callOriginalCallbackWithFailure(); } void cleanup() { timeoutTimer.cancel(); cleanupCmdResponseFdBoostWrapper(); } // Pure virtual methods that derived classes must implement virtual uint8_t getEnableFlag() const = 0; virtual const char* getCommandName() const = 0; virtual void setPcloudDataActiveState() = 0; // Common sendCommand implementation bool sendCommand() { // Create start/stop sampling message using protocol structure livoxProto1::comms::StartStopSamplingMessage message; // Set enable flag based on derived class implementation message.enable = getEnableFlag(); // Calculate and set CRC32 message.footer.crc_32 = message.calculateCrc32(); message.swapContentsToProtocolEndianness(); struct sockaddr_in deviceAddr; memset(&deviceAddr, 0, sizeof(deviceAddr)); deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(device.discoveredDevice.ipAddr.c_str()); deviceAddr.sin_port = htons(65000); // Command port // Send command directly (synchronous) ssize_t bytesSent = sendto( device.heartbeatFd, &message, sizeof(message), 0, (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); if (bytesSent < 0) { std::cerr << __func__ << ": Failed to send " << getCommandName() << " command: " << strerror(errno) << std::endl; return false; } return true; } }; class Device::EnablePcloudDataReq : public EnDisablePcloudDataReq { public: friend void Device::enablePcloudDataReq( smo::Callback callback); EnablePcloudDataReq( Device& dev, smo::Callback cb) : EnDisablePcloudDataReq(dev, std::move(cb)) {} ~EnablePcloudDataReq() { cleanup(); } private: uint8_t getEnableFlag() const override { return 0x01; // Start sampling } const char* getCommandName() const override { return "enable pcloud data"; } void setPcloudDataActiveState() override { device.pcloudDataActive.store(true); } }; class Device::DisablePcloudDataReq : public EnDisablePcloudDataReq { public: friend void Device::disablePcloudDataReq( smo::Callback callback); DisablePcloudDataReq( Device& dev, smo::Callback cb) : EnDisablePcloudDataReq(dev, std::move(cb)) {} ~DisablePcloudDataReq() { cleanup(); } private: uint8_t getEnableFlag() const override { return 0x00; // Stop sampling } const char* getCommandName() const override { return "disable pcloud data"; } void setPcloudDataActiveState() override { device.pcloudDataActive.store(false); } }; void Device::enablePcloudDataReq( smo::Callback callback ) { auto request = std::make_shared( *this, std::move(callback)); // Check if heartbeat socket is available if (heartbeatFd < 0) { std::cerr << __func__ << ": No heartbeat socket available for device " << discoveredDevice.deviceIdentifier << std::endl; request->callOriginalCallbackWithFailure(); return; } // Setup socket for async operations if (!request->setupSocket()) { request->callOriginalCallbackWithFailure(); return; } // Set up the point cloud data socket for actual data reception if (!setupPcloudDataSocket()) { std::cerr << __func__ << ": Failed to set up point cloud data socket" << std::endl; // Don't fail the command, but log the issue } // Send the start sampling command if (!request->sendCommand()) { request->callOriginalCallbackWithFailure(); return; } // Setup async callbacks request->setupAsyncCallbacks(request); } void Device::disablePcloudDataReq( smo::Callback callback ) { auto request = std::make_shared( *this, std::move(callback)); // Check if heartbeat socket is available if (heartbeatFd < 0) { std::cerr << __func__ << ": No heartbeat socket available for device " << discoveredDevice.deviceIdentifier << std::endl; request->callOriginalCallbackWithFailure(); return; } /* Unconditionally close the pcloud data socket early since there's no good * reason to only close it if the command packet succeeds and ACKs. We want * to stop receiving data immediately when disable is requested. */ cleanupPcloudDataSocket(); // Setup socket for async operations if (!request->setupSocket()) { request->callOriginalCallbackWithFailure(); return; } // Send the stop sampling command if (!request->sendCommand()) { request->callOriginalCallbackWithFailure(); return; } // Setup async callbacks request->setupAsyncCallbacks(request); } bool Device::setupPcloudDataSocket() { // RAII class to manage socket file descriptor struct SocketRAII { int fd; SocketRAII(int socketFd) : fd(socketFd) {} ~SocketRAII() { if (fd >= 0) close(fd); } void commit() { fd = -1; } // Transfer ownership, prevent close int getFd() const { return fd; } bool isValid() const { return fd >= 0; } }; // Create UDP socket for point cloud data reception SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0)); if (!socketGuard.isValid()) { std::cerr << __func__ << ": Failed to create socket: " << strerror(errno) << std::endl; return false; } // Set socket to non-blocking mode int flags = fcntl(socketGuard.getFd(), F_GETFL, 0); if (flags < 0 || fcntl(socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0) { std::cerr << __func__ << ": Failed to set non-blocking mode: " << strerror(errno) << std::endl; return false; } // Bind to the data port (65001) struct sockaddr_in localAddr; memset(&localAddr, 0, sizeof(localAddr)); localAddr.sin_family = AF_INET; localAddr.sin_addr.s_addr = INADDR_ANY; localAddr.sin_port = htons(65001); // Data port if (bind( socketGuard.getFd(), (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { std::cerr << __func__ << ": Failed to bind to data port: " << strerror(errno) << std::endl; return false; } // Create boost wrapper for async operations pcloudDataSocketDesc = std::make_unique( componentThread->getIoService(), socketGuard.getFd()); pcloudDataFd = socketGuard.getFd(); // Transfer ownership, prevent auto-close socketGuard.commit(); return true; } void Device::cleanupPcloudDataSocket() { if (pcloudDataSocketDesc) { pcloudDataSocketDesc->cancel(); pcloudDataSocketDesc.reset(); } if (pcloudDataFd >= 0) { close(pcloudDataFd); pcloudDataFd = -1; } pcloudDataActive.store(false); } } // namespace livoxProto1