Files
salmanoff/commonLibs/livoxProto1/device.cpp
T

637 lines
18 KiB
C++
Raw Normal View History

#include <sstream>
#include <thread>
#include <chrono>
#include <string>
#include <stdexcept>
#include <memory>
#include <unistd.h>
#include <ifaddrs.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <boost/asio.hpp>
#include "device.h"
#include "protocol.h"
#include "core.h"
namespace livoxProto1 {
namespace comms {
DiscoveredDevice::DiscoveredDevice(
const std::string &deviceIdentifier,
DeviceType deviceType,
const std::string &ipAddr)
: deviceIdentifier(deviceIdentifier),
deviceType(deviceType),
ipAddr(ipAddr)
{
}
DiscoveredDevice::DiscoveredDevice(
const BroadcastMessage &msg, const std::string &ipAddr
)
: DiscoveredDevice(
reinterpret_cast<const char*>(msg.broadcast_code),
static_cast<DeviceType>(msg.dev_type),
ipAddr)
{
}
std::string DiscoveredDevice::stringify(void) const
{
std::ostringstream oss;
oss << "DiscoveredDevice{"
<< "identifier='" << deviceIdentifier << "', "
<< "ipAddr='" << ipAddr << "', "
<< "deviceType=" << (int)deviceType << " (" << getDeviceTypeName() << ")"
<< "}";
return oss.str();
}
std::string DiscoveredDevice::getDeviceTypeName(void) const
{
switch (deviceType)
{
case DeviceType::Hub: return "Hub";
case DeviceType::Mid40: return "Mid-40";
case DeviceType::Tele15: return "Tele-15";
case DeviceType::Horizon: return "Horizon";
case DeviceType::Mid70: return "Mid-70";
case DeviceType::Avia: return "Avia";
default: return "Unknown";
}
}
} // namespace comms
Device::Device(const std::string &deviceIdentifier,
const std::shared_ptr<smo::ComponentThread>& componentThread,
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort)
: discoveredDevice(
deviceIdentifier, comms::DeviceType::Mid40,
// Initialize empty. IP will be set upon successful connection.
""),
componentThread(componentThread),
handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs),
smoIp(smoIp), smoSubnetNbits(smoSubnetNbits),
dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort),
heartbeatActive(false)
{
connect();
}
Device::~Device()
{
if (heartbeatActive.load()) {
heartbeatActive.store(false);
if (heartbeatTimer) {
heartbeatTimer->cancel();
}
}
heartbeatTimer.reset();
heartbeatSocket.reset();
}
void Device::connect()
{
/** EXPLANATION:
* First check the broadcastListener to see if the device is already known.
* * If it is, return the DiscoveredDevice..
* If it is not, attempt to connect to the device by assuming that its IP
* address is the same as the last 2 octets of the deviceIdentifier.
* * If the connection is successful, return the DiscoveredDevice.
* If the connection is not successful, delay by retryDelayMs and check
* the broadcastListener again.
* * If the connection is successful return the DiscoveredDevice.
* If the connection is not successful, throw exception?
*
* If the connection is successful at any point, also set up the heartbeat
* pulse signal to be sent periodically by us to the device over the wire.
*/
// Try connecting to known device first
if (connectToKnownDevice()) {
startHeartbeat();
return;
}
// Try direct connect by device identifier
if (connectByDeviceIdentifier()) {
startHeartbeat();
return;
}
// Wait retry delay, then try known device again
std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs));
if (connectToKnownDevice()) {
startHeartbeat();
return;
}
// All connection attempts failed
throw std::runtime_error(
std::string(__func__) + ": Failed to connect to device: "
+ discoveredDevice.deviceIdentifier);
}
bool Device::connectToKnownDevice()
{
// Get the global DeviceManager instance
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
throw std::runtime_error(
std::string(__func__)
+ ": DeviceManager is not initialized in connectToKnownDevice()");
}
// Check if the device is known to the broadcastListener
if (!protoState.deviceManager->broadcastListener.deviceExists(
discoveredDevice.deviceIdentifier))
{
return false;
}
// Get the device info from broadcastListener
auto deviceInfo = protoState.deviceManager->broadcastListener.getDevice(
discoveredDevice.deviceIdentifier);
if (!deviceInfo)
{ return false; }
// Use the IP address from the broadcast message
std::string deviceIP = deviceInfo->ipAddr;
// Execute handshake with the known device
bool success = executeHandshake(
deviceIP, handshakeTimeoutMs, dataPort, cmdPort, imuPort);
// If successful, update our device's IP address with the one from broadcast
if (success) {
discoveredDevice.ipAddr = deviceInfo->ipAddr;
}
return success;
}
bool Device::connectByDeviceIdentifier()
{
std::string deviceIP = generateClientDeviceIpFromSerialNumber(
discoveredDevice.deviceIdentifier);
bool success = executeHandshake(
deviceIP, handshakeTimeoutMs, dataPort, cmdPort, imuPort);
// If successful, store the calculated IP address
if (success) {
discoveredDevice.ipAddr = deviceIP;
}
return success;
}
bool Device::executeHandshake(
const std::string& deviceIP, int timeoutMs,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort
)
{
try {
2025-09-06 20:46:02 -04:00
/** EXPLANATION:
* This io_context queue here will allow us to async-bridge the
* handshake request and response without having to worry about
* re-enqueueing messages, as we would have to do if we used the
* io_context of a ComponentThread.
*/
boost::asio::io_context io_context;
boost::asio::ip::udp::socket socket(io_context);
socket.open(boost::asio::ip::udp::v4());
// Get the IP addr of the SMO machine's iface that is facing the device.
std::string smoIp = getSmoIp(deviceIP);
comms::HandshakeRequest handshakeReq(smoIp, dataPort, cmdPort, imuPort);
handshakeReq.swapContentsToProtocolEndianness();
handshakeReq.header.setCrc16FromRawBytes();
handshakeReq.header.swapCrc16ToProtocolEndianness();
handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32();
handshakeReq.footer.swapCrc32ToProtocolEndianness();
boost::asio::ip::udp::endpoint deviceEndpoint(
boost::asio::ip::address::from_string(deviceIP), 65000);
socket.send_to(
boost::asio::buffer(&handshakeReq, sizeof(handshakeReq)),
deviceEndpoint);
std::cout << __func__ << ": Sent handshake request to "
<< deviceIP << ":65000" << std::endl;
// Wait for response with timeout using deadline_timer
boost::asio::deadline_timer timer(io_context);
timer.expires_from_now(boost::posix_time::milliseconds(timeoutMs));
uint8_t responseBuffer[1024];
boost::asio::ip::udp::endpoint senderEndpoint;
std::atomic<bool> timeoutOccurred{false};
timer.async_wait(
[&timeoutOccurred](const boost::system::error_code& ec) {
if (!ec) { timeoutOccurred.store(true); }
}
);
size_t bytesReceived = 0;
boost::system::error_code receiveError;
socket.async_receive_from(
boost::asio::buffer(responseBuffer, sizeof(responseBuffer)),
senderEndpoint,
[&bytesReceived, &receiveError](
const boost::system::error_code& ec, size_t bytes)
{
bytesReceived = bytes;
receiveError = ec;
}
);
while (!timeoutOccurred.load() && !receiveError && bytesReceived == 0) {
io_context.run_one();
}
timer.cancel();
if (timeoutOccurred.load())
{
std::cerr << __func__ << ": Handshake timeout with " << deviceIP
<< ":" << deviceEndpoint << std::endl;
return false;
}
if (receiveError)
{
std::cerr << __func__ << ": Handshake error with " << deviceIP
<< ": " << receiveError.message() << ":" << deviceEndpoint
<< std::endl;
return false;
}
if (bytesReceived < sizeof(comms::HandshakeResponse))
{
std::cerr << __func__ << ": Handshake failed - response too small from "
<< deviceIP << ":" << deviceEndpoint << std::endl;
return false;
}
// Parse response as complete frame
comms::HandshakeResponse* resp = reinterpret_cast<
comms::HandshakeResponse*
>(responseBuffer);
// Following the clean receiving flow:
// 1. Swap CRC32 to host endianness first
resp->footer.swapCrc32ToHostEndianness();
// 2. Validate CRC32 (on whole message excluding footer CRC32 field)
if (!resp->validateCrc32())
{
std::cerr << __func__ << ": Handshake failed - CRC32 validation "
"failed from " << deviceIP << ":" << deviceEndpoint
<< std::endl;
return false;
}
// 3. Swap CRC16 to host endianness
resp->header.swapCrc16ToHostEndianness();
// 4. Validate CRC16 (on header only)
if (!resp->header.validateCrc16())
{
std::cerr << __func__ << ": Handshake failed - CRC16 validation "
"failed from " << deviceIP << ":" << deviceEndpoint
<< std::endl;
return false;
}
// 5. Swap content to host endianness
resp->swapContentsToHostEndianness();
if (!resp->sanityCheck() || resp->ret_code != 0x00)
{
std::cerr << __func__ << ": Handshake failed - invalid response from "
<< deviceIP << ":" << deviceEndpoint
<< std::endl;
return false;
}
std::cout << __func__ << ": Handshake successful with " << deviceIP
<< ":" << deviceEndpoint << std::endl;
return true;
} catch (const std::exception& e) {
std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": "
<< e.what() << std::endl;
}
return false;
}
std::string Device::generateClientDeviceIpFromSerialNumber(
const std::string& broadcastCode
)
{
2025-09-06 20:46:02 -04:00
/** EXPLANATION:
* The input string is either a serial number (14 chars) or a broadcast code
* (15 chars). We need to determine which one it is and extract the serial
* number from the broadcast code.
*
* To generate a default IP address, we use the device's subnet: X.X.X.1XX
* where XX = last two digits of serial. We use the smoIp and smoSubnetNbits
* to determine the network prefix.
*/
if (broadcastCode.empty())
{
throw std::invalid_argument(
std::string(__func__) + ": Broadcast code cannot be empty");
}
std::string serialNumber;
if (broadcastCode.length() == 14)
{
// Input is a serial number
serialNumber = broadcastCode;
} else if (broadcastCode.length() == 15)
{
// Input is a broadcast code (serial + selector)
serialNumber = broadcastCode.substr(0, 14);
} else
{
// Invalid length
throw std::invalid_argument(
std::string(__func__) +
": Broadcast code must be 14 or 15 characters long");
}
// Extract last two digits of serial number
if (serialNumber.length() < 2)
{
throw std::invalid_argument(
std::string(__func__) + ": Serial number too short");
}
std::string lastTwoDigits = serialNumber.substr(serialNumber.length() - 2);
// Validate that last two characters are digits
if (lastTwoDigits[0] < '0' || lastTwoDigits[0] > '9' ||
lastTwoDigits[1] < '0' || lastTwoDigits[1] > '9')
{
throw std::invalid_argument(
std::string(__func__) +
": Last two characters of serial number must be digits");
}
/** EXPLANATION:
* Use the device's subnet: X.X.X.1XX where XX = last two digits of serial.
* We use the smoIp and smoSubnetNbits to determine the network prefix.
*/
// Parse smoIp to extract network prefix
auto smoIpOctets = comms::parseIPv4Address(smoIp);
if (!smoIpOctets.has_value()) {
throw std::invalid_argument(
std::string(__func__) + ": Invalid smoIp format: must be X.X.X.X");
}
// Generate subnet mask based on nbits
uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);
uint32_t smoIpAddr = (std::stoi(smoIpOctets->octet1) << 24) |
(std::stoi(smoIpOctets->octet2) << 16) |
(std::stoi(smoIpOctets->octet3) << 8) |
std::stoi(smoIpOctets->octet4);
// Apply subnet mask to get network prefix
uint32_t networkPrefix = smoIpAddr & subnetMask;
// Extract octets from network prefix
uint8_t octet1 = (networkPrefix >> 24) & 0xFF;
uint8_t octet2 = (networkPrefix >> 16) & 0xFF;
uint8_t octet3 = (networkPrefix >> 8) & 0xFF;
// Use the first three octets and append "1" + last two digits
return std::to_string(octet1) + "." + std::to_string(octet2) + "." +
std::to_string(octet3) + ".1" + lastTwoDigits;
}
void Device::startHeartbeat()
{
if (!componentThread || discoveredDevice.ipAddr.empty()) {
return; // Can't start heartbeat without component thread or IP
}
2025-09-06 20:46:02 -04:00
/** EXPLANATION:
* Create heartbeat socket using io_service of the componentThread that was
* given to us for use by this device.
*/
heartbeatSocket = std::make_unique<boost::asio::ip::udp::socket>(
componentThread->getIoService());
heartbeatSocket->open(boost::asio::ip::udp::v4());
// Create heartbeat timer
heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
componentThread->getIoService());
heartbeatActive.store(true);
// Send first heartbeat immediately
sendHeartbeat();
}
void Device::sendHeartbeat()
{
if (!heartbeatActive.load() || !heartbeatSocket
|| discoveredDevice.ipAddr.empty())
{
return;
}
try {
comms::HeartbeatMessage heartbeatMsg;
heartbeatMsg.swapContentsToProtocolEndianness();
heartbeatMsg.header.setCrc16FromRawBytes();
heartbeatMsg.header.swapCrc16ToProtocolEndianness();
heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32();
heartbeatMsg.footer.swapCrc32ToProtocolEndianness();
2025-09-06 20:46:02 -04:00
boost::asio::ip::udp::endpoint deviceEndpoint(
2025-09-06 20:46:02 -04:00
boost::asio::ip::address::from_string(discoveredDevice.ipAddr),
cmdPort);
heartbeatSocket->send_to(
boost::asio::buffer(&heartbeatMsg, sizeof(heartbeatMsg)),
deviceEndpoint);
2025-09-06 20:46:02 -04:00
/** EXPLANATION:
* Schedule next heartbeat in 1 second, per the spec.
*/
heartbeatTimer->expires_from_now(boost::posix_time::seconds(1));
heartbeatTimer->async_wait(
[this](const boost::system::error_code& error) {
onHeartbeatTimer(error);
}
);
}
catch (const std::exception& e)
{
heartbeatActive.store(false);
std::cerr << "[" << __func__ << "] Heartbeat send failed for device "
<< discoveredDevice.deviceIdentifier
<< ": " << e.what() << std::endl;
}
}
void Device::onHeartbeatTimer(const boost::system::error_code& error)
{
// Timer was cancelled, heartbeat stopped
if (error == boost::asio::error::operation_aborted) {
return;
}
if (error)
{
heartbeatActive.store(false);
std::cerr << "[" << __func__ << "] Heartbeat timer error for device "
<< discoveredDevice.deviceIdentifier
<< ": " << error.message() << std::endl;
return;
}
// Send next heartbeat
sendHeartbeat();
}
uint32_t Device::getSubnetMaskFor(uint8_t nbits)
{
if (nbits > 32) {
throw std::invalid_argument(
std::string(__func__) + ": nbits must be between 0 and 32");
}
// Generate subnet mask: set the first nbits to 1, rest to 0
if (nbits == 0) {
return 0x00000000;
} else if (nbits == 32) {
return 0xFFFFFFFF;
} else {
// Create mask with nbits set to 1 from the left
return (0xFFFFFFFF << (32 - nbits));
}
}
std::optional<std::string> Device::detectSmoIp(const std::string& deviceIP)
{
/** EXPLANATION:
* This function detects the SMO IP address of the interface that's facing
* the device by iterating through all network interfaces and checking for
* the interface that has the IP address in the same subnet as the device's
* IP address.
*/
try {
// Parse the device IP to get the network prefix
auto deviceIpOctets = comms::parseIPv4Address(deviceIP);
if (!deviceIpOctets.has_value()) {
return std::nullopt;
}
// Convert device IP octets to integers for bitwise operations
uint32_t deviceIpAddr = (std::stoi(deviceIpOctets->octet1) << 24) |
(std::stoi(deviceIpOctets->octet2) << 16) |
(std::stoi(deviceIpOctets->octet3) << 8) |
std::stoi(deviceIpOctets->octet4);
// Generate subnet mask based on nbits
uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);
/* Get all network interfaces using getifaddrs (Linux/Unix specific)
*
* FIXME: Add Windows support using GetAdaptersAddresses when porting
*/
struct ifaddrs *ifaddr;
if (getifaddrs(&ifaddr) == -1) {
return std::nullopt;
}
// Use unique_ptr for automatic cleanup (RAII) to free ifaddrs
auto ifaddr_deleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); };
std::unique_ptr<struct ifaddrs, decltype(ifaddr_deleter)> ifaddr_ptr(
ifaddr, ifaddr_deleter);
std::string found_ip;
2025-09-06 20:46:02 -04:00
/** EXPLANATION:
* Iterate through all network interfaces and check if the IP address is
* in the same subnet as the device's IP address.
*/
for (struct ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == nullptr) continue;
// Check if it's IPv4
if (ifa->ifa_addr->sa_family != AF_INET) { continue; }
// Get the IPv4 address
struct sockaddr_in* addr_in = (struct sockaddr_in*)ifa->ifa_addr;
char ip_str[INET_ADDRSTRLEN];
if (inet_ntop(
AF_INET, &addr_in->sin_addr, ip_str, INET_ADDRSTRLEN)
== nullptr)
{
continue;
}
std::string ip = ip_str;
// Check if this IP is in the same subnet
auto ipOctets = comms::parseIPv4Address(ip);
if (!ipOctets.has_value()) { continue; }
// Convert IP octets to integer
uint32_t ipAddr = (std::stoi(ipOctets->octet1) << 24) |
(std::stoi(ipOctets->octet2) << 16) |
(std::stoi(ipOctets->octet3) << 8) |
std::stoi(ipOctets->octet4);
/* Check if this iface's IP is in the same subnet as the device's IP
* using the calculated mask. Only compare the bits that are set in
* the subnet mask.
*/
if ((ipAddr & subnetMask) == (deviceIpAddr & subnetMask))
{
found_ip = ip;
2025-09-06 20:46:02 -04:00
break;
}
}
// Return the found IP (empty string if none found)
if (!found_ip.empty()) {
return found_ip;
}
return std::nullopt;
} catch (const std::exception& e) {
std::cerr << "Error detecting SMO IP: " << e.what() << std::endl;
return std::nullopt;
}
}
std::string Device::getSmoIp(const std::string& deviceIP)
{
2025-09-06 20:46:02 -04:00
/** EXPLANATION:
* If smo-ip was provided, return it.
* Otherwise, try to detect it based on the client device's IP address.
* If detection failed, throw an exception.
*/
if (!smoIp.empty()) {
return smoIp;
}
auto detectedIp = detectSmoIp(deviceIP);
if (detectedIp.has_value()) {
return detectedIp.value();
}
throw std::runtime_error(
std::string(__func__) + ": Failed to detect SMO IP address for device "
+ deviceIP + " with subnet mask /" + std::to_string(smoSubnetNbits));
}
} // namespace livoxProto1