Files
salmanoff/commonLibs/livoxProto1/device.cpp
T
hayodea f587b45b38 livoxProto1: Connecting to bcast-advertised device works :)
We tested it.
It's important to note that between test runs, we need to take
into account the fact that the Avia stops sending bcast adverts
when it's been handshaken.

So the retry-delay-ms may be longer due to the fact that the Avia
may not be sending adverts for a good portion of that retry-delay-ms
time.
2025-09-07 11:42:32 -04:00

695 lines
20 KiB
C++

#include <sstream>
#include <thread>
#include <chrono>
#include <string>
#include <stdexcept>
#include <memory>
#include <unistd.h>
#include <ifaddrs.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <errno.h>
#include <cstring>
#include <netinet/in.h>
#include <boost/asio.hpp>
#include "device.h"
#include "protocol.h"
#include "core.h"
namespace livoxProto1 {
namespace comms {
/** EXPLANATION:
 * Field-wise constructor: each argument is copied into the member of the
 * same name. No validation is performed on any of the values.
 */
DiscoveredDevice::DiscoveredDevice(
    const std::string &deviceIdentifier,
    DeviceType deviceType,
    const std::string &ipAddr
    )
:
deviceIdentifier(deviceIdentifier), deviceType(deviceType), ipAddr(ipAddr)
{
}
/** EXPLANATION:
 * Convenience constructor for a received broadcast advert. It delegates to
 * the field-wise constructor, using the advert's broadcast code as the
 * device identifier and its dev_type as the device type.
 *
 * NOTE(review): broadcast_code is read as a C string, so this assumes the
 * field is NUL-terminated inside the message -- TODO confirm.
 */
DiscoveredDevice::DiscoveredDevice(
    const BroadcastMessage &msg, const std::string &ipAddr
    )
:
DiscoveredDevice(
    reinterpret_cast<const char*>(msg.broadcast_code),
    static_cast<DeviceType>(msg.dev_type),
    ipAddr)
{
}
/** EXPLANATION:
 * Render this record as a single-line, human-readable string of the form
 *   DiscoveredDevice{identifier='..', ipAddr='..', deviceType=N (Name)}
 * where N is the numeric value of deviceType and Name comes from
 * getDeviceTypeName().
 */
std::string DiscoveredDevice::stringify(void) const
{
    const int typeCode = static_cast<int>(deviceType);

    std::ostringstream out;
    out << "DiscoveredDevice{";
    out << "identifier='" << deviceIdentifier << "', ";
    out << "ipAddr='" << ipAddr << "', ";
    out << "deviceType=" << typeCode << " (" << getDeviceTypeName() << ")";
    out << "}";
    return out.str();
}
/** EXPLANATION:
 * Map deviceType to its display name. Any value not listed below yields
 * "Unknown".
 */
std::string DiscoveredDevice::getDeviceTypeName(void) const
{
    const char *typeName = "Unknown";

    switch (deviceType)
    {
    case DeviceType::Hub:     typeName = "Hub";     break;
    case DeviceType::Mid40:   typeName = "Mid-40";  break;
    case DeviceType::Tele15:  typeName = "Tele-15"; break;
    case DeviceType::Horizon: typeName = "Horizon"; break;
    case DeviceType::Mid70:   typeName = "Mid-70";  break;
    case DeviceType::Avia:    typeName = "Avia";    break;
    default:                  break;
    }

    return typeName;
}
} // namespace comms
/** EXPLANATION:
 * Store the connection parameters and immediately try to connect.
 *
 * discoveredDevice starts out with the given identifier, DeviceType::Mid40
 * and an empty IP address; the IP is filled in by whichever connection
 * attempt succeeds. The heartbeat starts disabled (fd == -1, inactive).
 *
 * connect() is called from here, so construction blocks for as long as the
 * connection sequence takes and throws if connect() throws.
 */
Device::Device(
    const std::string &deviceIdentifier,
    const std::shared_ptr<smo::ComponentThread>& componentThread,
    int handshakeTimeoutMs, int retryDelayMs,
    const std::string& smoIp, uint8_t smoSubnetNbits,
    uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort
    )
:
discoveredDevice(
    deviceIdentifier,
    comms::DeviceType::Mid40,
    // Initialize empty. IP will be set upon successful connection.
    ""),
componentThread(componentThread),
handshakeTimeoutMs(handshakeTimeoutMs),
retryDelayMs(retryDelayMs),
smoIp(smoIp),
smoSubnetNbits(smoSubnetNbits),
dataPort(dataPort),
cmdPort(cmdPort),
imuPort(imuPort),
heartbeatSocketFd(-1),
heartbeatActive(false)
{
    connect();
}
/** EXPLANATION:
 * Tear down the heartbeat: mark it inactive, cancel any pending timer wait,
 * destroy the timer, then close the heartbeat socket if one is open.
 */
Device::~Device()
{
    const bool wasBeating = heartbeatActive.load();
    if (wasBeating)
    {
        heartbeatActive.store(false);
        if (heartbeatTimer) { heartbeatTimer->cancel(); }
    }

    heartbeatTimer.reset();

    if (heartbeatSocketFd < 0) { return; }
    close(heartbeatSocketFd);
    heartbeatSocketFd = -1;
}
void Device::connect()
{
/** EXPLANATION:
* First check the broadcastListener to see if the device is already known.
* * If it is, return the DiscoveredDevice..
* If it is not, attempt to connect to the device by assuming that its IP
* address is the same as the last 2 octets of the deviceIdentifier.
* * If the connection is successful, return the DiscoveredDevice.
* If the connection is not successful, delay by retryDelayMs and check
* the broadcastListener again.
* * If the connection is successful return the DiscoveredDevice.
* If the connection is not successful, throw exception?
*
* If the connection is successful at any point, also set up the heartbeat
* pulse signal to be sent periodically by us to the device over the wire.
*/
// Try connecting to known device first
if (connectToKnownDevice()) {
startHeartbeat();
return;
}
// Try direct connect by device identifier
if (connectByDeviceIdentifier()) {
startHeartbeat();
return;
}
/** FIXME:
* This won't work unless we return control to the ComponentThread's run()
* loop during the retry-delay time so that the ComponentThread can actually
* run the BroadcastListener's recv() calls and actually receive the
* device's broadcast messages.
* Think about ways to perhaps make all of this asynchronous. The main issue
* is that this whole sequence is synchronous.
*/
// Wait retry delay, then try known device again
std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs));
if (connectToKnownDevice()) {
startHeartbeat();
return;
}
// All connection attempts failed
throw std::runtime_error(
std::string(__func__) + ": Failed to connect to device: "
+ discoveredDevice.deviceIdentifier);
}
/** EXPLANATION:
 * Try to connect using what the broadcastListener has learned about this
 * device from its broadcast adverts.
 *
 * Returns false when the listener has no entry for our deviceIdentifier or
 * when the handshake with the advertised IP fails. On success the advertised
 * IP address AND device type are copied into discoveredDevice.
 *
 * Throws std::runtime_error if the global DeviceManager is not initialized.
 */
bool Device::connectToKnownDevice()
{
    // Get the global DeviceManager instance
    auto& protoState = livoxProto1::getProtoState();
    if (!protoState.deviceManager)
    {
        throw std::runtime_error(
            std::string(__func__)
            + ": DeviceManager is not initialized in connectToKnownDevice()");
    }

    auto& listener = protoState.deviceManager->broadcastListener;
    const std::string& identifier = discoveredDevice.deviceIdentifier;

    // Check if the device is known to the broadcastListener
    if (!listener.deviceExists(identifier)) { return false; }

    // Get the device info from broadcastListener
    auto deviceInfo = listener.getDevice(identifier);
    if (!deviceInfo) { return false; }

    // Handshake with the IP address taken from the broadcast message
    if (!executeHandshake(
        deviceInfo->ipAddr, handshakeTimeoutMs, dataPort, cmdPort, imuPort))
    {
        return false;
    }

    /* The constructor seeds discoveredDevice with DeviceType::Mid40 and an
     * empty IP. Previously only the IP was refreshed here, so the type stayed
     * Mid40 no matter what the device advertised. Take both from the advert.
     *
     * NOTE(review): assumes the listener's record exposes `deviceType` like
     * comms::DiscoveredDevice does -- confirm against BroadcastListener.
     */
    discoveredDevice.ipAddr = deviceInfo->ipAddr;
    discoveredDevice.deviceType = deviceInfo->deviceType;
    return true;
}
bool Device::connectByDeviceIdentifier()
{
std::string deviceIP = generateClientDeviceIpFromSerialNumber(
discoveredDevice.deviceIdentifier);
bool success = executeHandshake(
deviceIP, handshakeTimeoutMs, dataPort, cmdPort, imuPort);
// If successful, store the calculated IP address
if (success) {
discoveredDevice.ipAddr = deviceIP;
}
return success;
}
/**
 * Perform a single handshake attempt with the device at deviceIP.
 *
 * A UDP socket is opened on a function-local io_context and bound to
 * cmdPort. A HandshakeRequest is sent to <deviceIP>:65000, and the function
 * then waits for at most timeoutMs for one datagram to arrive on that socket.
 * The datagram is accepted only if it is at least
 * sizeof(comms::HandshakeResponse) bytes long, passes the CRC32 check, then
 * the header CRC16 check, then sanityCheck(), and carries ret_code == 0x00.
 *
 * @param deviceIP  IPv4 address of the device, in text form.
 * @param timeoutMs Maximum time to wait for the response, in milliseconds.
 * @param dataPort  Passed to the HandshakeRequest constructor.
 * @param cmdPort   Passed to the HandshakeRequest constructor; also the local
 *                  port this function binds its socket to.
 * @param imuPort   Passed to the HandshakeRequest constructor.
 * @return true if a valid, successful response was received. false on
 *         timeout, receive error, a short or invalid response, or if any
 *         std::exception is raised inside (it is caught and logged).
 *
 * NOTE(review): senderEndpoint is filled in by the receive but is never
 * compared against deviceEndpoint, so a datagram from any peer that reaches
 * cmdPort is evaluated as the response.
 */
bool Device::executeHandshake(
const std::string& deviceIP, int timeoutMs,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort
)
{
try {
/** EXPLANATION:
* This io_context queue here will allow us to async-bridge the
* handshake request and response without having to worry about
* re-enqueueing messages, as we would have to do if we used the
* io_context of a ComponentThread.
*/
boost::asio::io_context io_context;
boost::asio::ip::udp::socket socket(io_context);
socket.open(boost::asio::ip::udp::v4());
// Bind socket to cmdPort so we can receive the handshake response
boost::asio::ip::udp::endpoint localEndpoint(
boost::asio::ip::address_v4::any(), cmdPort);
socket.bind(localEndpoint);
std::cout << __func__ << ": Bound socket to local port " << cmdPort
<< " for handshake response" << std::endl;
// Get the IP addr of the SMO machine's iface that is facing the device.
// Note: this local deliberately shadows the smoIp member; getSmoIp() may
// throw, which is handled by the catch block at the end of this function.
std::string smoIp = getSmoIp(deviceIP);
comms::HandshakeRequest handshakeReq(smoIp, dataPort, cmdPort, imuPort);
// Sending flow: swap contents, then compute and swap the header CRC16, then
// compute and swap the footer CRC32.
handshakeReq.swapContentsToProtocolEndianness();
handshakeReq.header.setCrc16FromRawBytes();
handshakeReq.header.swapCrc16ToProtocolEndianness();
handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32();
handshakeReq.footer.swapCrc32ToProtocolEndianness();
boost::asio::ip::udp::endpoint deviceEndpoint(
boost::asio::ip::address::from_string(deviceIP), 65000);
// The request object is sent as-is: its in-memory bytes are the wire bytes.
socket.send_to(
boost::asio::buffer(&handshakeReq, sizeof(handshakeReq)),
deviceEndpoint);
std::cout << __func__ << ": Sent handshake request to "
<< deviceIP << ":65000" << std::endl;
// Wait for response with timeout using deadline_timer
boost::asio::deadline_timer timer(io_context);
timer.expires_from_now(boost::posix_time::milliseconds(timeoutMs));
uint8_t responseBuffer[1024];
boost::asio::ip::udp::endpoint senderEndpoint;
std::atomic<bool> timeoutOccurred{false};
// The timer handler only flags a timeout when the wait completed without
// an error code (i.e. it was not cancelled).
timer.async_wait(
[&timeoutOccurred](const boost::system::error_code& ec) {
if (!ec) { timeoutOccurred.store(true); }
}
);
size_t bytesReceived = 0;
boost::system::error_code receiveError;
// The receive handler just records the outcome; it is evaluated below.
socket.async_receive_from(
boost::asio::buffer(responseBuffer, sizeof(responseBuffer)),
senderEndpoint,
[&bytesReceived, &receiveError](
const boost::system::error_code& ec, size_t bytes)
{
bytesReceived = bytes;
receiveError = ec;
}
);
// Run one handler at a time until the timer fires or the receive completes
// (with data or with an error).
// NOTE(review): a zero-length datagram received without error satisfies
// none of the exit conditions and no further receive is posted, so the loop
// then ends only when the timer fires and the attempt is reported as a
// timeout.
while (!timeoutOccurred.load() && !receiveError && bytesReceived == 0) {
io_context.run_one();
}
timer.cancel();
if (timeoutOccurred.load())
{
std::cerr << __func__ << ": Handshake timeout with " << deviceIP
<< "(" << deviceEndpoint << ")" << std::endl;
return false;
}
if (receiveError)
{
std::cerr << __func__ << ": Handshake error with " << deviceIP
<< ": " << receiveError.message() << "(" << deviceEndpoint << ")"
<< std::endl;
return false;
}
if (bytesReceived < sizeof(comms::HandshakeResponse))
{
std::cerr << __func__ << ": Handshake failed - response too small from "
<< deviceIP << "(" << deviceEndpoint << ")" << std::endl;
return false;
}
// Parse response as complete frame
// The response is interpreted directly inside responseBuffer; no copy is
// made, so the calls below operate on the receive buffer itself.
comms::HandshakeResponse* resp = reinterpret_cast<
comms::HandshakeResponse*
>(responseBuffer);
// Following the clean receiving flow:
// 1. Swap CRC32 to host endianness first
resp->footer.swapCrc32ToHostEndianness();
// 2. Validate CRC32 (on whole message excluding footer CRC32 field)
if (!resp->validateCrc32())
{
std::cerr << __func__ << ": Handshake failed - CRC32 validation "
"failed from " << deviceIP << "(" << deviceEndpoint << ")"
<< std::endl;
return false;
}
// 3. Swap CRC16 to host endianness
resp->header.swapCrc16ToHostEndianness();
// 4. Validate CRC16 (on header only)
if (!resp->header.validateCrc16())
{
std::cerr << __func__ << ": Handshake failed - CRC16 validation "
"failed from " << deviceIP << "(" << deviceEndpoint << ")"
<< std::endl;
return false;
}
// 5. Swap content to host endianness
resp->swapContentsToHostEndianness();
// A ret_code of 0x00 is the only value treated as success here.
if (!resp->sanityCheck() || resp->ret_code != 0x00)
{
std::cerr << __func__ << ": Handshake failed - invalid response from "
<< deviceIP << "(" << deviceEndpoint << ")"
<< std::endl;
return false;
}
std::cout << __func__ << ": Handshake successful with " << deviceIP
<< "(" << deviceEndpoint << ")" << std::endl;
return true;
} catch (const std::exception& e) {
// Any failure raised above (socket open/bind/send, address parsing,
// getSmoIp(), ...) is reported as a failed handshake rather than propagated.
std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": "
<< e.what() << std::endl;
}
return false;
}
/** EXPLANATION:
 * Derive a default IPv4 address for a device from its serial number or
 * broadcast code.
 *
 * The input is either a serial number (14 chars) or a broadcast code
 * (15 chars: the serial number followed by one extra selector character).
 * The result is X.X.X.1NN where X.X.X are the first three octets of
 * (smoIp & subnet mask for smoSubnetNbits) and NN are the last two
 * characters of the serial number, which must be decimal digits.
 *
 * @param broadcastCode Serial number or broadcast code of the device.
 * @return The derived address in dotted-decimal text form.
 * @throws std::invalid_argument if the input is empty, is not 14 or 15
 *         characters long, does not end its serial part in two digits, or if
 *         smoIp cannot be parsed as an IPv4 address.
 */
std::string Device::generateClientDeviceIpFromSerialNumber(
    const std::string& broadcastCode
    )
{
    if (broadcastCode.empty())
    {
        throw std::invalid_argument(
            std::string(__func__) + ": Broadcast code cannot be empty");
    }

    const std::string::size_type serialLength = 14;
    if (broadcastCode.length() != serialLength
        && broadcastCode.length() != serialLength + 1)
    {
        throw std::invalid_argument(
            std::string(__func__) +
            ": Broadcast code must be 14 or 15 characters long");
    }

    /* For a broadcast code, drop the trailing selector character. The serial
     * number is exactly 14 characters from here on, so no further length
     * check is needed before taking its last two characters.
     */
    const std::string serialNumber = broadcastCode.substr(0, serialLength);
    const std::string lastTwoDigits = serialNumber.substr(serialLength - 2);

    // Validate that last two characters are digits
    const auto isAsciiDigit = [](char c) { return c >= '0' && c <= '9'; };
    if (!isAsciiDigit(lastTwoDigits[0]) || !isAsciiDigit(lastTwoDigits[1]))
    {
        throw std::invalid_argument(
            std::string(__func__) +
            ": Last two characters of serial number must be digits");
    }

    // Parse smoIp to extract network prefix
    const auto smoIpOctets = comms::parseIPv4Address(smoIp);
    if (!smoIpOctets.has_value())
    {
        throw std::invalid_argument(
            std::string(__func__) + ": Invalid smoIp format: must be X.X.X.X");
    }

    /* Assemble the address as an unsigned 32-bit value. The octets are
     * widened to uint32_t BEFORE shifting: shifting a plain int left by 24
     * overflows for any first octet >= 128.
     */
    const uint32_t smoIpAddr =
        (static_cast<uint32_t>(std::stoi(smoIpOctets->octet1)) << 24) |
        (static_cast<uint32_t>(std::stoi(smoIpOctets->octet2)) << 16) |
        (static_cast<uint32_t>(std::stoi(smoIpOctets->octet3)) << 8) |
        static_cast<uint32_t>(std::stoi(smoIpOctets->octet4));

    // Apply subnet mask to get network prefix
    const uint32_t networkPrefix = smoIpAddr & getSubnetMaskFor(smoSubnetNbits);

    // Extract the first three octets from the network prefix
    const uint8_t octet1 = (networkPrefix >> 24) & 0xFF;
    const uint8_t octet2 = (networkPrefix >> 16) & 0xFF;
    const uint8_t octet3 = (networkPrefix >> 8) & 0xFF;

    // Use the first three octets and append "1" + last two digits
    return std::to_string(octet1) + "." + std::to_string(octet2) + "." +
        std::to_string(octet3) + ".1" + lastTwoDigits;
}
/** EXPLANATION:
 * Set up and start the periodic heartbeat to the connected device.
 *
 * Does nothing when there is no componentThread or no device IP yet. On a
 * socket or bind failure the error is logged and the heartbeat is simply not
 * started; no exception is raised for those cases.
 */
void Device::startHeartbeat()
{
    // Can't start heartbeat without component thread or IP
    if (!componentThread) { return; }
    if (discoveredDevice.ipAddr.empty()) { return; }

    /** EXPLANATION:
     * Create raw UDP socket for heartbeat sending to avoid boost::asio dlopen
     * issues.
     */
    heartbeatSocketFd = socket(AF_INET, SOCK_DGRAM, 0);
    if (heartbeatSocketFd < 0)
    {
        std::cerr << "[" << __func__ << "] Failed to create heartbeat socket: "
            << strerror(errno) << std::endl;
        return;
    }

    /** EXPLANATION:
     * Bind heartbeat socket to cmdPort so heartbeats come from the same port
     * as handshake.
     */
    struct sockaddr_in bindAddr{};
    bindAddr.sin_family = AF_INET;
    bindAddr.sin_addr.s_addr = INADDR_ANY;
    bindAddr.sin_port = htons(cmdPort);

    const int bindResult = bind(
        heartbeatSocketFd,
        reinterpret_cast<struct sockaddr*>(&bindAddr), sizeof(bindAddr));
    if (bindResult < 0)
    {
        std::cerr << "[" << __func__ << "] Failed to bind heartbeat socket to "
            "port " << cmdPort << ": " << strerror(errno) << std::endl;
        close(heartbeatSocketFd);
        heartbeatSocketFd = -1;
        return;
    }

    // The timer runs on the component thread's io service
    heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
        componentThread->getIoService());
    heartbeatActive.store(true);

    // Send first heartbeat immediately; it schedules the following ones
    sendHeartbeat();
}
/** EXPLANATION:
 * Send one heartbeat datagram to the device on port 65000 and, if that
 * worked, arm heartbeatTimer so the next one goes out a second later.
 *
 * Does nothing if the heartbeat is inactive or not fully set up. Every
 * failure path (unparsable device IP, sendto() error, exception) logs the
 * problem and clears heartbeatActive, so the flag always reflects whether
 * another heartbeat is actually scheduled.
 */
void Device::sendHeartbeat()
{
    if (!heartbeatActive.load() || heartbeatSocketFd < 0
        || !heartbeatTimer || discoveredDevice.ipAddr.empty())
    {
        return;
    }

    try {
        comms::HeartbeatMessage heartbeatMsg;
        heartbeatMsg.swapContentsToProtocolEndianness();
        heartbeatMsg.header.setCrc16FromRawBytes();
        heartbeatMsg.header.swapCrc16ToProtocolEndianness();
        heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32();
        heartbeatMsg.footer.swapCrc32ToProtocolEndianness();

        // Set up destination address for raw socket
        struct sockaddr_in deviceAddr{};
        deviceAddr.sin_family = AF_INET;
        // Heartbeats and commands go to port 65000
        deviceAddr.sin_port = htons(65000);

        /* inet_pton() reports a malformed address explicitly, unlike
         * inet_addr(), whose error value is indistinguishable from
         * 255.255.255.255.
         */
        if (inet_pton(
            AF_INET, discoveredDevice.ipAddr.c_str(), &deviceAddr.sin_addr)
            != 1)
        {
            heartbeatActive.store(false);
            std::cerr << "[" << __func__ << "] Invalid device IP address '"
                << discoveredDevice.ipAddr << "'; heartbeat stopped"
                << std::endl;
            return;
        }

        const ssize_t bytesSent = sendto(
            heartbeatSocketFd, &heartbeatMsg, sizeof(heartbeatMsg), 0,
            reinterpret_cast<const struct sockaddr*>(&deviceAddr),
            sizeof(deviceAddr));
        if (bytesSent < 0)
        {
            // Capture errno before anything else can overwrite it
            const int sendErrno = errno;
            /* No further heartbeat gets scheduled from here, so the flag must
             * say so, exactly as the other error paths do.
             */
            heartbeatActive.store(false);
            std::cerr << "[" << __func__ << "] Failed to send heartbeat: "
                << strerror(sendErrno) << std::endl;
            return;
        }

        /** EXPLANATION:
         * Schedule next heartbeat in 1 second, per the spec.
         */
        heartbeatTimer->expires_from_now(boost::posix_time::seconds(1));
        heartbeatTimer->async_wait(
            [this](const boost::system::error_code& error) {
                onHeartbeatTimer(error);
            }
        );
    }
    catch (const std::exception& e)
    {
        heartbeatActive.store(false);
        std::cerr << "[" << __func__ << "] Heartbeat send failed for device "
            << discoveredDevice.deviceIdentifier
            << ": " << e.what() << std::endl;
    }
}
/** EXPLANATION:
 * Completion handler for heartbeatTimer.
 *  - No error: the interval elapsed, so send the next heartbeat.
 *  - operation_aborted: the wait was cancelled; stop silently.
 *  - Any other error: mark the heartbeat inactive and log it.
 */
void Device::onHeartbeatTimer(const boost::system::error_code& error)
{
    if (!error)
    {
        // Send next heartbeat
        sendHeartbeat();
        return;
    }

    // Timer was cancelled, heartbeat stopped
    const bool wasCancelled =
        (error == boost::asio::error::operation_aborted);
    if (wasCancelled) { return; }

    heartbeatActive.store(false);
    std::cerr << "[" << __func__ << "] Heartbeat timer error for device "
        << discoveredDevice.deviceIdentifier
        << ": " << error.message() << std::endl;
}
/** EXPLANATION:
 * Build the 32-bit IPv4 subnet mask whose top `nbits` bits are set, e.g.
 * 24 -> 0xFFFFFF00. 0 yields an all-zero mask and 32 an all-one mask.
 *
 * Throws std::invalid_argument when nbits is greater than 32.
 */
uint32_t Device::getSubnetMaskFor(uint8_t nbits)
{
    if (nbits > 32)
    {
        throw std::invalid_argument(
            std::string(__func__) + ": nbits must be between 0 and 32");
    }

    /* A zero-bit prefix must be handled on its own: it would otherwise need a
     * shift by the full width of the value.
     */
    if (nbits == 0) { return 0x00000000; }

    // Shift the host bits out on the right; nbits == 32 shifts by zero
    const unsigned hostBits = 32u - nbits;
    return 0xFFFFFFFFu << hostBits;
}
/** EXPLANATION:
 * Find the IPv4 address of the local network interface that faces the
 * device: the first interface reported by getifaddrs() whose address lies in
 * the same subnet as deviceIP, using the mask for smoSubnetNbits.
 *
 * @param deviceIP IPv4 address of the device, in text form.
 * @return The matching local address in dotted-decimal form, or std::nullopt
 *         if deviceIP cannot be parsed, the interface list cannot be read,
 *         no interface matches, or an exception occurs (it is logged).
 */
std::optional<std::string> Device::detectSmoIp(const std::string& deviceIP)
{
    try {
        // Parse the device IP to get the network prefix
        const auto deviceIpOctets = comms::parseIPv4Address(deviceIP);
        if (!deviceIpOctets.has_value()) { return std::nullopt; }

        /* Widen each octet to uint32_t BEFORE shifting: shifting a plain int
         * left by 24 overflows for any first octet >= 128.
         */
        const uint32_t deviceIpAddr =
            (static_cast<uint32_t>(std::stoi(deviceIpOctets->octet1)) << 24) |
            (static_cast<uint32_t>(std::stoi(deviceIpOctets->octet2)) << 16) |
            (static_cast<uint32_t>(std::stoi(deviceIpOctets->octet3)) << 8) |
            static_cast<uint32_t>(std::stoi(deviceIpOctets->octet4));

        // Generate subnet mask based on nbits
        const uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);
        const uint32_t deviceNetwork = deviceIpAddr & subnetMask;

        /* Get all network interfaces using getifaddrs (Linux/Unix specific)
         *
         * FIXME: Add Windows support using GetAdaptersAddresses when porting
         */
        struct ifaddrs *ifaddr = nullptr;
        if (getifaddrs(&ifaddr) == -1) { return std::nullopt; }

        // Free the interface list on every exit path, including exceptions
        auto ifaddr_deleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); };
        std::unique_ptr<struct ifaddrs, decltype(ifaddr_deleter)> ifaddr_ptr(
            ifaddr, ifaddr_deleter);

        for (const struct ifaddrs *ifa = ifaddr; ifa != nullptr;
            ifa = ifa->ifa_next)
        {
            // Only interfaces that carry an IPv4 address are of interest
            if (ifa->ifa_addr == nullptr) { continue; }
            if (ifa->ifa_addr->sa_family != AF_INET) { continue; }

            const auto *addr_in =
                reinterpret_cast<const struct sockaddr_in*>(ifa->ifa_addr);

            /* Compare in host byte order straight from the binary address;
             * there is no need to format it as text and parse it back. Only
             * the bits selected by the subnet mask take part.
             */
            const uint32_t ifaceIpAddr = ntohl(addr_in->sin_addr.s_addr);
            if ((ifaceIpAddr & subnetMask) != deviceNetwork) { continue; }

            // Matching interface: convert its address to text for the caller
            char ip_str[INET_ADDRSTRLEN];
            if (inet_ntop(
                AF_INET, &addr_in->sin_addr, ip_str, INET_ADDRSTRLEN)
                == nullptr)
            {
                continue;
            }
            return std::string(ip_str);
        }

        // No interface shares a subnet with the device
        return std::nullopt;
    } catch (const std::exception& e) {
        std::cerr << "Error detecting SMO IP: " << e.what() << std::endl;
        return std::nullopt;
    }
}
/** EXPLANATION:
 * Resolve the SMO-side IP address to use when talking to deviceIP.
 * A configured smo-ip always wins. Without one, the address is detected
 * from the local interfaces via detectSmoIp(). If that fails as well, a
 * std::runtime_error is thrown.
 */
std::string Device::getSmoIp(const std::string& deviceIP)
{
    const bool isConfigured = !smoIp.empty();
    if (isConfigured) { return smoIp; }

    if (auto detectedIp = detectSmoIp(deviceIP))
    {
        return *detectedIp;
    }

    throw std::runtime_error(
        std::string(__func__) + ": Failed to detect SMO IP address for device "
        + deviceIP + " with subnet mask /" + std::to_string(smoSubnetNbits));
}
} // namespace livoxProto1