Files
salmanoff/commonLibs/livoxProto1/device.cpp
T
hayodea d2bf5aceee livoxProto1: detectSmoIp should be based on target dev IP
We previously used the smoIp provided by the user, but this
function is intended to enable us to figure out the correct
IP to send to the target device in the Handshake message.
2025-09-06 22:46:03 -04:00

618 lines
17 KiB
C++

#include <sstream>
#include <thread>
#include <chrono>
#include <string>
#include <stdexcept>
#include <memory>
#include <unistd.h>
#include <ifaddrs.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <boost/asio.hpp>
#include "device.h"
#include "protocol.h"
#include "core.h"
namespace livoxProto1 {
namespace comms {
// DiscoveredDevice constructors
/**
 * Builds a DiscoveredDevice directly from its three constituent fields.
 *
 * @param deviceIdentifier Value stored in the deviceIdentifier member.
 * @param deviceType       Value stored in the deviceType member.
 * @param ipAddr           Value stored in the ipAddr member.
 */
DiscoveredDevice::DiscoveredDevice(
    const std::string &deviceIdentifier,
    DeviceType deviceType,
    const std::string &ipAddr
    )
    : deviceIdentifier(deviceIdentifier), deviceType(deviceType), ipAddr(ipAddr)
{}
DiscoveredDevice::DiscoveredDevice(
const BroadcastMessage &msg, const std::string &ipAddr
)
: DiscoveredDevice(
reinterpret_cast<const char*>(msg.broadcast_code),
static_cast<DeviceType>(msg.dev_type),
ipAddr)
{
}
/**
 * Renders this device as a single-line, human-readable string containing the
 * identifier, the IP address, and the device type (both as its integer value
 * and as the name returned by getDeviceTypeName()).
 *
 * @return The formatted description.
 */
std::string DiscoveredDevice::stringify(void) const
{
    std::ostringstream text;

    text << "DiscoveredDevice{";
    text << "identifier='" << deviceIdentifier << "', ";
    text << "ipAddr='" << ipAddr << "', ";
    text << "deviceType=" << static_cast<int>(deviceType)
        << " (" << getDeviceTypeName() << ")";
    text << "}";

    return text.str();
}
/**
 * Maps the deviceType member to a display name.
 *
 * @return The product name for the known DeviceType values, or "Unknown" for
 *         any other value.
 */
std::string DiscoveredDevice::getDeviceTypeName(void) const
{
    // Start from the fallback and overwrite it for every recognised type.
    const char *typeName = "Unknown";

    switch (deviceType)
    {
    case DeviceType::Hub:     typeName = "Hub"; break;
    case DeviceType::Mid40:   typeName = "Mid-40"; break;
    case DeviceType::Tele15:  typeName = "Tele-15"; break;
    case DeviceType::Horizon: typeName = "Horizon"; break;
    case DeviceType::Mid70:   typeName = "Mid-70"; break;
    case DeviceType::Avia:    typeName = "Avia"; break;
    default: break;
    }

    return typeName;
}
} // namespace comms
// Device implementation
/**
 * Constructs a Device and immediately tries to connect to it.
 *
 * The embedded DiscoveredDevice starts out with the given identifier, a
 * DeviceType::Mid40 type and an empty IP address; the IP address is filled in
 * by whichever connection attempt succeeds.
 *
 * Because connect() is invoked from here, construction can block (connect()
 * sleeps for retryDelayMs between attempts) and can throw if every connection
 * attempt fails.
 *
 * @param deviceIdentifier   Identifier of the device to connect to.
 * @param componentThread    Thread whose io_service drives the heartbeat.
 * @param handshakeTimeoutMs How long to wait for a handshake response.
 * @param retryDelayMs       Delay before the final connection attempt.
 * @param smoIp              IP of this machine to advertise to the device;
 *                           when empty, getSmoIp() tries to detect it.
 * @param smoSubnetNbits     Number of prefix bits in the subnet mask.
 * @param dataPort           Passed to the handshake request.
 * @param cmdPort            Passed to the handshake request; also the
 *                           destination port used for heartbeats.
 * @param imuPort            Passed to the handshake request.
 */
Device::Device(
    const std::string &deviceIdentifier,
    const std::shared_ptr<smo::ComponentThread>& componentThread,
    int handshakeTimeoutMs, int retryDelayMs,
    const std::string& smoIp, uint8_t smoSubnetNbits,
    uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort
    )
    : discoveredDevice(
        deviceIdentifier,
        comms::DeviceType::Mid40,
        // The IP is unknown until a connection attempt succeeds.
        ""),
    componentThread(componentThread),
    handshakeTimeoutMs(handshakeTimeoutMs),
    retryDelayMs(retryDelayMs),
    smoIp(smoIp),
    smoSubnetNbits(smoSubnetNbits),
    dataPort(dataPort),
    cmdPort(cmdPort),
    imuPort(imuPort),
    heartbeatActive(false)
{
    connect();
}
/**
 * Stops the heartbeat (if it is running) and releases the heartbeat timer and
 * socket.
 *
 * NOTE(review): the pending timer handler captures `this`; presumably the
 * owner guarantees that the component thread does not run that handler
 * concurrently with or after this destructor -- confirm.
 */
Device::~Device()
{
    const bool heartbeatWasRunning = heartbeatActive.load();

    // Clear the flag first so that sendHeartbeat() becomes a no-op.
    if (heartbeatWasRunning) {
        heartbeatActive.store(false);
    }
    if (heartbeatWasRunning && heartbeatTimer) {
        heartbeatTimer->cancel();
    }

    // Release the heartbeat resources.
    heartbeatTimer.reset();
    heartbeatSocket.reset();
}
/**	EXPLANATION:
 * Connects to the device, trying up to three strategies in order:
 *
 *  1. Handshake with the IP that the broadcastListener has recorded for this
 *     device identifier (connectToKnownDevice()).
 *  2. Handshake with an IP derived from the device identifier
 *     (connectByDeviceIdentifier()).
 *  3. Sleep for retryDelayMs, then repeat step 1 in case the device has
 *     broadcast in the meantime.
 *
 * The first strategy that succeeds ends the search, after which the periodic
 * heartbeat is started. If none succeeds, std::runtime_error is thrown.
 */
void Device::connect()
{
    // Short-circuit evaluation keeps the original ordering: the direct
    // connect is only attempted when the known-device connect failed.
    bool connected = connectToKnownDevice() || connectByDeviceIdentifier();

    if (!connected)
    {
        // Give the device time to show up in the broadcastListener.
        std::this_thread::sleep_for(
            std::chrono::milliseconds(retryDelayMs));
        connected = connectToKnownDevice();
    }

    if (!connected)
    {
        throw std::runtime_error(
            std::string(__func__) + ": Failed to connect to device: "
            + discoveredDevice.deviceIdentifier);
    }

    startHeartbeat();
}
/**
 * Attempts a handshake with the IP address that the broadcastListener has
 * recorded for this device's identifier.
 *
 * On success the recorded IP address is stored in discoveredDevice.ipAddr.
 *
 * @return true if the device is known to the broadcastListener and the
 *         handshake succeeded; false otherwise.
 * @throws std::runtime_error if the global DeviceManager is not initialized.
 */
bool Device::connectToKnownDevice()
{
    auto &protoState = livoxProto1::getProtoState();
    if (!protoState.deviceManager)
    {
        throw std::runtime_error(
            std::string(__func__)
            + ": DeviceManager is not initialized in connectToKnownDevice()");
    }

    auto &listener = protoState.deviceManager->broadcastListener;
    const std::string &wantedId = discoveredDevice.deviceIdentifier;

    // Nothing to do unless the device has already been seen broadcasting.
    if (!listener.deviceExists(wantedId)) {
        return false;
    }

    auto deviceInfo = listener.getDevice(wantedId);
    if (!deviceInfo) {
        return false;
    }

    // Handshake with the address the broadcast came from.
    const std::string broadcastIp = deviceInfo->ipAddr;
    if (!executeHandshake(
        broadcastIp, handshakeTimeoutMs, dataPort, cmdPort, imuPort))
    {
        return false;
    }

    // Remember the address now that the device has answered on it.
    discoveredDevice.ipAddr = deviceInfo->ipAddr;
    return true;
}
/**
 * Attempts a handshake with an IP address derived from the device identifier
 * (see generateClientDeviceIpFromSerialNumber()).
 *
 * On success the derived IP address is stored in discoveredDevice.ipAddr.
 * Exceptions thrown while deriving the IP address are not caught here.
 *
 * @return true if the handshake succeeded, false otherwise.
 */
bool Device::connectByDeviceIdentifier()
{
    const std::string derivedIp = generateClientDeviceIpFromSerialNumber(
        discoveredDevice.deviceIdentifier);

    if (!executeHandshake(
        derivedIp, handshakeTimeoutMs, dataPort, cmdPort, imuPort))
    {
        return false;
    }

    discoveredDevice.ipAddr = derivedIp;
    return true;
}
/**	EXPLANATION:
 * Sends a handshake request to the device at `deviceIP` and waits for at most
 * `timeoutMs` milliseconds for a valid response.
 *
 * The request advertises the IP returned by getSmoIp(deviceIP) together with
 * the given data/cmd/imu ports. The response is accepted only if it is at
 * least sizeof(comms::HandshakeResponse) bytes long, passes the CRC32 and
 * CRC16 checks, passes sanityCheck(), and carries ret_code 0x00.
 *
 * Both the timer and the receive are driven by a local io_context that is
 * only ever run from this function, so their handlers execute on the calling
 * thread and the completion flags need no synchronization.
 *
 * @param deviceIP  IPv4 address of the device, in dotted-quad notation.
 * @param timeoutMs Maximum time to wait for the response, in milliseconds.
 * @param dataPort  Data port placed in the handshake request.
 * @param cmdPort   Command port placed in the handshake request.
 * @param imuPort   IMU port placed in the handshake request.
 * @return true on a successful handshake. Returns false on timeout, receive
 *         error, malformed/invalid response, or if any exception derived
 *         from std::exception is thrown along the way (it is logged and
 *         swallowed).
 */
bool Device::executeHandshake(
    const std::string& deviceIP, int timeoutMs,
    uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort
    )
{
    // UDP port on the device that the handshake request is addressed to.
    constexpr uint16_t kDeviceHandshakePort = 65000;

    try {
        boost::asio::io_context io_context;
        boost::asio::ip::udp::socket socket(io_context);
        socket.open(boost::asio::ip::udp::v4());

        // Get the IP addr of the SMO machine's iface that is facing the device.
        std::string hostIp = getSmoIp(deviceIP);

        // Build the request. The order matters: contents are swapped to
        // protocol endianness before the CRCs are computed over the raw bytes.
        comms::HandshakeRequest handshakeReq(hostIp, dataPort, cmdPort, imuPort);
        handshakeReq.swapContentsToProtocolEndianness();
        handshakeReq.header.setCrc16FromRawBytes();
        handshakeReq.header.swapCrc16ToProtocolEndianness();
        handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32();
        handshakeReq.footer.swapCrc32ToProtocolEndianness();

        const boost::asio::ip::udp::endpoint deviceEndpoint(
            boost::asio::ip::address::from_string(deviceIP),
            kDeviceHandshakePort);

        socket.send_to(
            boost::asio::buffer(&handshakeReq, sizeof(handshakeReq)),
            deviceEndpoint);

        std::cout << __func__ << ": Sent handshake request to "
            << deviceEndpoint << std::endl;

        // Arm the timeout.
        boost::asio::deadline_timer timer(io_context);
        timer.expires_from_now(boost::posix_time::milliseconds(timeoutMs));

        bool timeoutOccurred = false;
        timer.async_wait(
            [&timeoutOccurred](const boost::system::error_code& ec) {
                if (!ec) { timeoutOccurred = true; }
            }
        );

        // Arm the receive. `receiveDone` is tracked separately from
        // `bytesReceived` so that a zero-length datagram counts as a
        // (too small) response rather than being waited out as a timeout.
        uint8_t responseBuffer[1024];
        boost::asio::ip::udp::endpoint senderEndpoint;
        bool receiveDone = false;
        size_t bytesReceived = 0;
        boost::system::error_code receiveError;

        socket.async_receive_from(
            boost::asio::buffer(responseBuffer, sizeof(responseBuffer)),
            senderEndpoint,
            [&receiveDone, &bytesReceived, &receiveError](
                const boost::system::error_code& ec, size_t bytes)
            {
                receiveDone = true;
                bytesReceived = bytes;
                receiveError = ec;
            }
        );

        // Run handlers one at a time until either operation completes.
        while (!timeoutOccurred && !receiveDone) {
            io_context.run_one();
        }
        timer.cancel();

        if (!receiveDone)
        {
            std::cerr << __func__ << ": Handshake timeout with "
                << deviceEndpoint << std::endl;
            return false;
        }

        if (receiveError)
        {
            std::cerr << __func__ << ": Handshake error with "
                << deviceEndpoint << ": " << receiveError.message()
                << std::endl;
            return false;
        }

        if (bytesReceived < sizeof(comms::HandshakeResponse))
        {
            std::cerr << __func__
                << ": Handshake failed - response too small from "
                << deviceEndpoint << std::endl;
            return false;
        }

        // Parse response as complete frame
        comms::HandshakeResponse* resp = reinterpret_cast<
            comms::HandshakeResponse*
            >(responseBuffer);

        // Following the clean receiving flow:
        // 1. Swap CRC32 to host endianness first
        resp->footer.swapCrc32ToHostEndianness();

        // 2. Validate CRC32 (on whole message excluding footer CRC32 field)
        if (!resp->validateCrc32())
        {
            std::cerr << __func__ << ": Handshake failed - CRC32 validation "
                "failed from " << deviceEndpoint << std::endl;
            return false;
        }

        // 3. Swap CRC16 to host endianness
        resp->header.swapCrc16ToHostEndianness();

        // 4. Validate CRC16 (on header only)
        if (!resp->header.validateCrc16())
        {
            std::cerr << __func__ << ": Handshake failed - CRC16 validation "
                "failed from " << deviceEndpoint << std::endl;
            return false;
        }

        // 5. Swap content to host endianness
        resp->swapContentsToHostEndianness();

        if (!resp->sanityCheck() || resp->ret_code != 0x00)
        {
            std::cerr << __func__
                << ": Handshake failed - invalid response from "
                << deviceEndpoint << std::endl;
            return false;
        }

        std::cout << __func__ << ": Handshake successful with "
            << deviceEndpoint << std::endl;
        return true;
    } catch (const std::exception& e) {
        // deviceEndpoint is out of scope here, so fall back to the raw IP.
        std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": "
            << e.what() << std::endl;
    }

    return false;
}
/**	EXPLANATION:
 * Derives the IP address at which the device is assumed to be reachable:
 * X.X.X.1NN, where NN is the last two digits of the device's serial number
 * and X.X.X is the first three octets of (smoIp & subnet mask).
 *
 * Note that the mask is applied before the three octets are taken, so with
 * fewer than 24 prefix bits the masked-off bits of those octets are 0 in the
 * result.
 *
 * @param broadcastCode Either a 14-character serial number, or a 15-character
 *                      broadcast code (serial number plus one trailing
 *                      selector character).
 * @return The derived IPv4 address in dotted-quad notation.
 * @throws std::invalid_argument if broadcastCode is empty or has the wrong
 *         length, if the serial number does not end in two digits, if smoIp
 *         is not a valid IPv4 address, or if smoSubnetNbits is rejected by
 *         getSubnetMaskFor().
 */
std::string Device::generateClientDeviceIpFromSerialNumber(
    const std::string& broadcastCode
    )
{
    constexpr std::size_t kSerialNumberLength = 14;
    constexpr std::size_t kBroadcastCodeLength = 15;
    // Captured once: __func__ would name the lambda inside the lambda below.
    const std::string funcName(__func__);

    if (broadcastCode.empty())
    {
        throw std::invalid_argument(
            funcName + ": Broadcast code cannot be empty");
    }

    if (broadcastCode.length() != kSerialNumberLength
        && broadcastCode.length() != kBroadcastCodeLength)
    {
        throw std::invalid_argument(
            funcName + ": Broadcast code must be 14 or 15 characters long");
    }

    // In both accepted forms the serial number is the first 14 characters,
    // so its last two characters always sit at the same offset.
    const std::string lastTwoDigits =
        broadcastCode.substr(kSerialNumberLength - 2, 2);

    const auto isDigit = [](char c) { return c >= '0' && c <= '9'; };
    if (!isDigit(lastTwoDigits[0]) || !isDigit(lastTwoDigits[1]))
    {
        throw std::invalid_argument(
            funcName +
            ": Last two characters of serial number must be digits");
    }

    // Parse smoIp to extract network prefix
    const auto smoIpOctets = comms::parseIPv4Address(smoIp);
    if (!smoIpOctets.has_value()) {
        throw std::invalid_argument(
            funcName + ": Invalid smoIp format: must be X.X.X.X");
    }

    // Generate subnet mask based on nbits
    const uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);

    // Converts one textual octet to an unsigned value in [0, 255]. Working
    // in uint32_t keeps the shifts below well-defined for octets >= 128.
    const auto toOctet = [&funcName](const std::string& text) -> uint32_t {
        int value = -1;
        try {
            value = std::stoi(text);
        } catch (const std::logic_error&) {
            // std::invalid_argument / std::out_of_range from std::stoi.
            value = -1;
        }
        if (value < 0 || value > 255) {
            throw std::invalid_argument(
                funcName + ": Invalid smoIp format: must be X.X.X.X");
        }
        return static_cast<uint32_t>(value);
    };

    const uint32_t smoIpAddr =
        (toOctet(smoIpOctets->octet1) << 24) |
        (toOctet(smoIpOctets->octet2) << 16) |
        (toOctet(smoIpOctets->octet3) << 8) |
        toOctet(smoIpOctets->octet4);

    // Apply subnet mask to get network prefix
    const uint32_t networkPrefix = smoIpAddr & subnetMask;

    const uint32_t octet1 = (networkPrefix >> 24) & 0xFF;
    const uint32_t octet2 = (networkPrefix >> 16) & 0xFF;
    const uint32_t octet3 = (networkPrefix >> 8) & 0xFF;

    // Use the first three octets and append "1" + last two digits
    return std::to_string(octet1) + "." + std::to_string(octet2) + "." +
        std::to_string(octet3) + ".1" + lastTwoDigits;
}
/**
 * Creates the heartbeat socket and timer on the component thread's
 * io_service, marks the heartbeat as active and sends the first heartbeat
 * right away.
 *
 * Does nothing if there is no component thread or if the device's IP address
 * is still unknown.
 */
void Device::startHeartbeat()
{
    const bool canStart = componentThread && !discoveredDevice.ipAddr.empty();
    if (!canStart) {
        return;
    }

    // Both the socket and the timer live on the component thread's io_service.
    heartbeatSocket = std::make_unique<boost::asio::ip::udp::socket>(
        componentThread->getIoService());
    heartbeatSocket->open(boost::asio::ip::udp::v4());

    heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
        componentThread->getIoService());

    heartbeatActive.store(true);

    // The first heartbeat goes out immediately; it schedules the next one.
    sendHeartbeat();
}
/**
 * Sends one heartbeat message to the device and arms the timer for the next
 * one, one second later.
 *
 * Does nothing if the heartbeat is inactive, the socket does not exist, or
 * the device's IP address is unknown. If anything in the send/schedule path
 * throws, the heartbeat is deactivated and the error is logged; it is not
 * restarted from here.
 *
 * NOTE(review): the heartbeat is addressed to `cmdPort` on the device, whereas
 * the handshake request is addressed to port 65000 -- confirm that the device
 * really listens for heartbeats on cmdPort.
 */
void Device::sendHeartbeat()
{
    const bool canSend = heartbeatActive.load() && heartbeatSocket
        && !discoveredDevice.ipAddr.empty();
    if (!canSend) {
        return;
    }

    try {
        // Build the message: contents are swapped to protocol endianness
        // before the CRCs are computed.
        comms::HeartbeatMessage heartbeatMsg;
        heartbeatMsg.swapContentsToProtocolEndianness();
        heartbeatMsg.header.setCrc16FromRawBytes();
        heartbeatMsg.header.swapCrc16ToProtocolEndianness();
        heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32();
        heartbeatMsg.footer.swapCrc32ToProtocolEndianness();

        const auto deviceAddress =
            boost::asio::ip::address::from_string(discoveredDevice.ipAddr);
        boost::asio::ip::udp::endpoint deviceEndpoint(deviceAddress, cmdPort);

        heartbeatSocket->send_to(
            boost::asio::buffer(&heartbeatMsg, sizeof(heartbeatMsg)),
            deviceEndpoint);

        // Re-arm the timer; its handler sends the next heartbeat.
        heartbeatTimer->expires_from_now(boost::posix_time::seconds(1));
        heartbeatTimer->async_wait(
            [this](const boost::system::error_code& error) {
                onHeartbeatTimer(error);
            }
        );
    }
    catch (const std::exception& e)
    {
        heartbeatActive.store(false);
        std::cerr << "[" << __func__ << "] Heartbeat send failed for device "
            << discoveredDevice.deviceIdentifier
            << ": " << e.what() << std::endl;
    }
}
/**
 * Completion handler for the heartbeat timer.
 *
 * @param error Result of the timer wait:
 *              - operation_aborted: the timer was cancelled; return silently.
 *              - any other error: deactivate the heartbeat and log the error.
 *              - no error: send the next heartbeat.
 */
void Device::onHeartbeatTimer(const boost::system::error_code& error)
{
    if (!error)
    {
        // The timer expired normally: time for the next heartbeat.
        sendHeartbeat();
        return;
    }

    // A cancelled timer means the heartbeat was stopped on purpose.
    if (error == boost::asio::error::operation_aborted) {
        return;
    }

    heartbeatActive.store(false);
    std::cerr << "[" << __func__ << "] Heartbeat timer error for device "
        << discoveredDevice.deviceIdentifier
        << ": " << error.message() << std::endl;
}
/**
 * Builds an IPv4 subnet mask whose `nbits` most significant bits are set.
 *
 * @param nbits Number of prefix bits, 0 to 32 inclusive.
 * @return The mask as a 32-bit value (e.g. 24 -> 0xFFFFFF00).
 * @throws std::invalid_argument if nbits is greater than 32.
 */
uint32_t Device::getSubnetMaskFor(uint8_t nbits)
{
    constexpr uint32_t kAllOnes = 0xFFFFFFFF;

    if (nbits > 32) {
        throw std::invalid_argument(
            std::string(__func__) + ": nbits must be between 0 and 32");
    }

    switch (nbits)
    {
    // Handled separately: shifting a 32-bit value by 32 is undefined.
    case 0: return 0x00000000;
    case 32: return kAllOnes;
    // Shift the zeros in from the right, leaving nbits ones on the left.
    default: return kAllOnes << (32 - nbits);
    }
}
/**	EXPLANATION:
 * Detects the IP address of the local interface that faces the device: the
 * first IPv4 interface address (in getifaddrs() order) that lies in the same
 * subnet as `deviceIP`, where the subnet is defined by smoSubnetNbits.
 *
 * Interface addresses are compared in binary form (ntohl() of the address
 * reported by getifaddrs()); only the matching address is converted to text.
 *
 * FIXME: getifaddrs() is Linux/Unix specific. Add Windows support using
 * GetAdaptersAddresses when porting.
 *
 * @param deviceIP IPv4 address of the device, in dotted-quad notation.
 * @return The matching local address in dotted-quad notation, or std::nullopt
 *         if deviceIP cannot be parsed, the interfaces cannot be enumerated,
 *         no interface matches, or an exception derived from std::exception
 *         is thrown along the way (it is logged and swallowed).
 */
std::optional<std::string> Device::detectSmoIp(const std::string& deviceIP)
{
    try {
        const auto deviceIpOctets = comms::parseIPv4Address(deviceIP);
        if (!deviceIpOctets.has_value()) {
            return std::nullopt;
        }

        // Converts one textual octet to an unsigned value in [0, 255]. Working
        // in uint32_t keeps the shifts below well-defined for octets >= 128.
        // Failures propagate to the catch block at the bottom.
        const auto toOctet = [](const std::string& text) -> uint32_t {
            const int value = std::stoi(text);
            if (value < 0 || value > 255) {
                throw std::out_of_range("IPv4 octet out of range: " + text);
            }
            return static_cast<uint32_t>(value);
        };

        const uint32_t deviceIpAddr =
            (toOctet(deviceIpOctets->octet1) << 24) |
            (toOctet(deviceIpOctets->octet2) << 16) |
            (toOctet(deviceIpOctets->octet3) << 8) |
            toOctet(deviceIpOctets->octet4);

        // Generate subnet mask based on nbits
        const uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);
        const uint32_t deviceNetwork = deviceIpAddr & subnetMask;

        struct ifaddrs *ifaddrList = nullptr;
        if (getifaddrs(&ifaddrList) == -1) {
            return std::nullopt;
        }

        // Frees the interface list on every exit path, including exceptions.
        auto ifaddrDeleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); };
        std::unique_ptr<struct ifaddrs, decltype(ifaddrDeleter)> ifaddrGuard(
            ifaddrList, ifaddrDeleter);

        for (const struct ifaddrs *ifa = ifaddrList;
            ifa != nullptr; ifa = ifa->ifa_next)
        {
            // Only IPv4 addresses are of interest.
            if (ifa->ifa_addr == nullptr
                || ifa->ifa_addr->sa_family != AF_INET)
            {
                continue;
            }

            const auto *addrIn =
                reinterpret_cast<const struct sockaddr_in*>(ifa->ifa_addr);

            // s_addr is in network byte order; bring it to host order so it
            // can be compared against deviceIpAddr.
            const uint32_t ifaceIpAddr = ntohl(addrIn->sin_addr.s_addr);
            if ((ifaceIpAddr & subnetMask) != deviceNetwork) {
                continue;
            }

            char ipStr[INET_ADDRSTRLEN];
            if (inet_ntop(AF_INET, &addrIn->sin_addr, ipStr, sizeof(ipStr))
                == nullptr)
            {
                continue;
            }

            return std::string(ipStr);
        }

        return std::nullopt;
    } catch (const std::exception& e) {
        std::cerr << "Error detecting SMO IP: " << e.what() << std::endl;
        return std::nullopt;
    }
}
/**
 * Returns the IP address of this machine that should be advertised to the
 * device at `deviceIP`.
 *
 * A non-empty smoIp member always wins; detection via detectSmoIp() is only
 * attempted when smoIp is empty.
 *
 * @param deviceIP IPv4 address of the device, in dotted-quad notation.
 * @return The configured or detected IP address.
 * @throws std::runtime_error if smoIp is empty and detection fails.
 */
std::string Device::getSmoIp(const std::string& deviceIP)
{
    if (smoIp.empty())
    {
        const auto detectedIp = detectSmoIp(deviceIP);
        if (!detectedIp.has_value())
        {
            throw std::runtime_error(
                std::string(__func__)
                + ": Failed to detect SMO IP address for device "
                + deviceIP + " with subnet mask /"
                + std::to_string(smoSubnetNbits));
        }
        return *detectedIp;
    }

    // An explicitly provided address is used as-is.
    return smoIp;
}
} // namespace livoxProto1