Files
salmanoff/commonLibs/livoxProto1/device.cpp
T
hayodea 816a047920 Async: new hierachy; manages reply posting and unlocking
Async: Use new [Non]PostedAsyncCont and callOriginalCb

This new hierarchy of classes gives us a central mechanism for
managing both reply-posting and lockSpec unlocking.

* callOriginalCb: Now uses a modern C++ variadic template design
  enabling it to handle both direct calling and std::bind()
  re-binding of an arbitrary number of arguments from the caller.

This enables us to mostly eliminate the repeated, bespoke
definitions of callOriginalCb littered throughout the codebase.

We've also propagated these changes throughout the codebase in
this patch.
2025-09-17 16:38:48 -04:00

1265 lines
36 KiB
C++

#include <sstream>
#include <thread>
#include <chrono>
#include <string>
#include <stdexcept>
#include <memory>
#include <unistd.h>
#include <ifaddrs.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <errno.h>
#include <cstring>
#include <netinet/in.h>
#include <optional>
#include <boost/asio.hpp>
#include <opts.h>
#include <asynchronousContinuation.h>
#include "device.h"
#include "protocol.h"
#include "core.h"
/** EXPLANATION:
* This file contains the implementation of the Device class.
*
* FIXME:
* We may need to check how smo-subnet-nbits is used in here because we didn't
* actually check to see under what conditions it's required vs optional. Hence
* we don't currently enforce correct usage of it, and we just assume that the
* livoxGen1's policy of supplying a default value of 24 is correct.
*/
namespace livoxProto1 {
namespace comms {
DiscoveredDevice::DiscoveredDevice(
const std::string &deviceIdentifier,
DeviceType deviceType,
const std::string &ipAddr)
: deviceIdentifier(deviceIdentifier),
deviceType(deviceType),
ipAddr(ipAddr)
{
}
DiscoveredDevice::DiscoveredDevice(
const BroadcastMessage &msg, const std::string &ipAddr
)
: DiscoveredDevice(
reinterpret_cast<const char*>(msg.broadcast_code),
static_cast<DeviceType>(msg.dev_type),
ipAddr)
{
}
std::string DiscoveredDevice::stringify(void) const
{
std::ostringstream oss;
oss << "DiscoveredDevice{"
<< "identifier='" << deviceIdentifier << "', "
<< "ipAddr='" << ipAddr << "', "
<< "deviceType=" << (int)deviceType << " (" << getDeviceTypeName() << ")"
<< "}";
return oss.str();
}
std::string DiscoveredDevice::getDeviceTypeName(void) const
{
switch (deviceType)
{
case DeviceType::Hub: return "Hub";
case DeviceType::Mid40: return "Mid-40";
case DeviceType::Tele15: return "Tele-15";
case DeviceType::Horizon: return "Horizon";
case DeviceType::Mid70: return "Mid-70";
case DeviceType::Avia: return "Avia";
default: return "Unknown";
}
}
} // namespace comms
Device::Device(const std::string &deviceIdentifier,
const std::shared_ptr<smo::ComponentThread>& componentThread,
int handshakeTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort)
: discoveredDevice(
deviceIdentifier, comms::DeviceType::Mid40,
// Initialize empty. IP will be set upon successful connection.
""),
componentThread(componentThread),
handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs),
smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits),
dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort),
heartbeatFd(-1),
heartbeatActive(false)
{
}
Device::~Device()
{
if (heartbeatActive.load()) {
heartbeatActive.store(false);
if (heartbeatTimer) {
heartbeatTimer->cancel();
}
}
heartbeatTimer.reset();
if (heartbeatFd >= 0) {
close(heartbeatFd);
heartbeatFd = -1;
}
}
/**
* Device::ConnectReq - Encapsulates all state and resources for async connection sequence
* This class manages the overall device connection process including handshake and heartbeat setup
*/
class Device::ConnectReq
: public smo::NonPostedAsynchronousContinuation<Device::connectReqCbFn>
{
private:
Device& device;
std::unique_ptr<boost::asio::deadline_timer> retryTimer;
public:
ConnectReq(Device& dev, Device::connectReqCbFn cb)
: smo::NonPostedAsynchronousContinuation<Device::connectReqCbFn>(
std::move(cb)), device(dev)
{}
/** FIXME:
* WE need to assign the ipAddr to the Device being connected up.
*/
// Callback methods for the connection sequence
void connectReq1(
std::shared_ptr<ConnectReq> context,
bool success, const std::string& ipAddr, int fd
)
{
if (success)
{
// Store the IP address in the device
context->device.discoveredDevice.ipAddr = ipAddr;
// Store the handshake FD for heartbeats
context->device.heartbeatFd = fd;
context->device.startHeartbeat();
context->callOriginalCb(true);
return;
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": Trying to connect to device by "
<< "identifier" << "\n";
}
// Try direct connect by device identifier
context->device.connectByDeviceIdentifierReq(
std::bind(&ConnectReq::connectReq2, context.get(), context,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
}
void connectReq2(
std::shared_ptr<ConnectReq> context,
bool success, const std::string& ipAddr, int fd
)
{
if (success)
{
// Store the IP address in the device
context->device.discoveredDevice.ipAddr = ipAddr;
// Store the handshake FD for heartbeats
context->device.heartbeatFd = fd;
context->device.startHeartbeat();
context->callOriginalCb(true);
return;
}
// Start retry timer
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": Starting retry delay ("
<< context->device.retryDelayMs << "ms), then trying known "
<< "device again" << "\n";
}
context->retryTimer = std::make_unique<boost::asio::deadline_timer>(
context->device.componentThread->getIoService());
context->retryTimer->expires_from_now(
boost::posix_time::milliseconds(context->device.retryDelayMs));
context->retryTimer->async_wait(
std::bind(
&ConnectReq::connectReq3, context.get(), context,
std::placeholders::_1));
}
void connectReq3(
std::shared_ptr<ConnectReq> context,
const boost::system::error_code& error
)
{
if (error)
{
context->callOriginalCb(false);
return;
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": Trying to connect to known device "
<< "again" << "\n";
}
context->device.connectToKnownDeviceReq(
std::bind(&ConnectReq::connectReq4, context.get(), context,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
}
void connectReq4(
std::shared_ptr<ConnectReq> context,
bool success, const std::string& ipAddr, int fd
)
{
if (success)
{
context->device.discoveredDevice.ipAddr = ipAddr;
context->device.heartbeatFd = fd;
context->device.startHeartbeat();
context->callOriginalCb(true);
return;
}
// All connection attempts failed
context->callOriginalCb(false);
}
};
void Device::connectReq(Device::connectReqCbFn callback)
{
// Create the connection request object to hold state and callbacks
auto request = std::make_shared<ConnectReq>(*this, std::move(callback));
// Try connecting to known device first
if (OptionParser::getOptions().verbose) {
std::cout << __func__ << ": Trying to connect to known device" << "\n";
}
connectToKnownDeviceReq(
std::bind(
&ConnectReq::connectReq1, request.get(), request,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
}
class Device::ConnectToKnownDeviceReq
: public smo::NonPostedAsynchronousContinuation<
Device::connectToKnownDeviceReqCbFn>
{
public:
Device& device;
std::string deviceIP;
std::shared_ptr<livoxProto1::comms::DiscoveredDevice> deviceInfo;
ConnectToKnownDeviceReq(Device& dev, Device::connectToKnownDeviceReqCbFn cb)
: smo::NonPostedAsynchronousContinuation<
Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev)
{}
// Public accessor for the original callback
void callOriginalCallback(bool success, const std::string& ipAddr, int fd)
{ callOriginalCb(success, ipAddr, fd); }
// Wrapper for failure cases
void callOriginalCallbackWithFailure()
{ callOriginalCallback(false, "", -1); }
// Callback methods for the connection sequence
void connectToKnownDeviceReq1(
std::shared_ptr<ConnectToKnownDeviceReq> context, bool success, int fd
)
{
// Return the IP address and raw FD to the caller
context->callOriginalCallback(success, context->deviceIP, fd);
}
};
/** EXPLANATION:
* This function is used to connect to a device that is already known to the
* broadcastListener.
*/
void Device::connectToKnownDeviceReq(
Device::connectToKnownDeviceReqCbFn callback
)
{
// Create the connection request object to hold state and callbacks
auto request = std::make_shared<ConnectToKnownDeviceReq>(
*this, std::move(callback));
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
request->callOriginalCallbackWithFailure();
return;
}
// Check if the device is known to the broadcastListener
if (!protoState.deviceManager->broadcastListener.deviceExists(
request->device.discoveredDevice.deviceIdentifier))
{
request->callOriginalCallbackWithFailure();
return;
}
request->deviceInfo = protoState.deviceManager->broadcastListener.getDevice(
request->device.discoveredDevice.deviceIdentifier);
if (!request->deviceInfo)
{
request->callOriginalCallbackWithFailure();
return;
}
// Use the IP address from the broadcast message
request->deviceIP = request->deviceInfo->ipAddr;
// Determine the final listening IP address
auto smoIpResult = request->device.getSmoIp(request->deviceIP);
if (!smoIpResult.has_value())
{
// Auto-detection failed, fail early
std::cerr << __func__ << ": Failed to detect SMO listening IP for "
<< "known device ("
<< request->device.discoveredDevice.deviceIdentifier << ")"
<< " @(" << request->deviceIP << ").\n";
request->callOriginalCallbackWithFailure();
return;
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": Detected SMO listening IP for known device "
<< request->device.discoveredDevice.deviceIdentifier
<< " @(" << request->deviceIP << ") is "
<< smoIpResult.value() << ". About to try to handshake.\n";
}
request->device.detectedSmoListeningIp = smoIpResult.value();
// Execute handshake with the known device using async method
request->device.executeHandshakeReq(
request->deviceIP,
std::bind(
&ConnectToKnownDeviceReq::connectToKnownDeviceReq1,
request.get(), request,
std::placeholders::_1, std::placeholders::_2));
}
class Device::ConnectByDeviceIdentifierReq
: public smo::NonPostedAsynchronousContinuation<
Device::connectByDeviceIdentifierReqCbFn>
{
public:
Device& device;
std::string deviceIP;
ConnectByDeviceIdentifierReq(
Device& dev, Device::connectByDeviceIdentifierReqCbFn cb)
: smo::NonPostedAsynchronousContinuation<
Device::connectByDeviceIdentifierReqCbFn>(
std::move(cb)), device(dev)
{}
// Public accessor for the original callback
void callOriginalCallback(bool success, const std::string& ipAddr, int fd)
{ callOriginalCb(success, ipAddr, fd); }
// Wrapper for failure cases
void callOriginalCallbackWithFailure()
{ callOriginalCallback(false, "", -1); }
// Callback methods for the connection sequence
void connectByDeviceIdentifierReq1(
std::shared_ptr<ConnectByDeviceIdentifierReq> context,
bool success, int fd
)
{
// Return the IP address and raw FD to the caller
context->callOriginalCallback(success, context->deviceIP, fd);
}
};
void Device::connectByDeviceIdentifierReq(
Device::connectByDeviceIdentifierReqCbFn callback
)
{
/** EXPLANATION:
* This method uses heuristic device IP construction from the serial number.
* This requires smoIp to be provided because:
* 1. We need the network prefix to generate a valid device IP address
* 2. Without a target device IP, we cannot detect which interface faces the device
* 3. Therefore, if smoIp is omitted, heuristic construction is impossible
*
* If smoIp is not provided, the driver must rely only on broadcast advertisements
* from the device (handled by connectToKnownDeviceReq).
*/
// Check if smoIp is provided - required for heuristic construction
if (smoIp.empty())
{
callback(false, "", -1);
return;
}
// Create the connection request object to hold state and callbacks
auto request = std::make_shared<ConnectByDeviceIdentifierReq>(
*this, std::move(callback));
// Generate device IP from serial number
request->deviceIP = generateClientDeviceIpFromSerialNumber(
request->device.discoveredDevice.deviceIdentifier);
// For heuristic construction, always use the provided smoIp.
request->device.detectedSmoListeningIp = request->device.smoIp;
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": About to try to connect to device by "
<< "identifier (" << discoveredDevice.deviceIdentifier << ")"
<< " at IP (" << smoIp << ").\n";
}
// Execute handshake using async method
request->device.executeHandshakeReq(
request->deviceIP,
std::bind(
&ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1,
request.get(), request,
std::placeholders::_1, std::placeholders::_2));
}
class Device::ExecuteHandshakeReq
: public smo::NonPostedAsynchronousContinuation<
Device::executeHandshakeReqCbFn>
{
public:
friend void Device::executeHandshakeReq(
const std::string& deviceIP, Device::executeHandshakeReqCbFn callback);
enum class SocketState
{
SOCKET_STILL_WAITING = 0,
SOCKET_ERROR,
SOCKET_RECV_SUCCESS,
SOCKET_RECV_ERROR
};
public:
Device& device;
std::string deviceIP;
// Atomic state flags for async coordination
std::atomic<bool> timerFired{false};
std::atomic<SocketState> socketState{SocketState::SOCKET_STILL_WAITING};
std::atomic<bool> handlerExecuted{false};
// The stream descriptor that will be returned to the caller
boost::asio::posix::stream_descriptor handshakeFdDesc;
boost::asio::deadline_timer timeoutTimer;
// Received data storage
uint8_t responseBuffer[1024]{};
ssize_t bytesReceived = -1;
struct sockaddr_in senderAddr;
socklen_t senderAddrLen = sizeof(senderAddr);
public:
ExecuteHandshakeReq(
Device& dev, const std::string& deviceIP,
Device::executeHandshakeReqCbFn cb)
: smo::NonPostedAsynchronousContinuation<Device::executeHandshakeReqCbFn>(
std::move(cb)),
device(dev), deviceIP(deviceIP),
handshakeFdDesc(device.componentThread->getIoService()),
timeoutTimer(device.componentThread->getIoService())
{
}
~ExecuteHandshakeReq()
{
cleanup();
}
// Public accessor for the original callback
void callOriginalCallback(bool success, int fd)
{ callOriginalCb(success, fd); }
void callOriginalCallbackWithFailure()
{
/** EXPLANATION:
* We have to call cleanupHandshakeSocket() here, specifically because
* there are 5 references we need to clean up, and 2 of them are
* actually recursive self-references within this class.
*
* The timer and the handshakeFdDesc are both given recursive
* self-referencing shared_ptr's to this class when we eventually set up
* the async callbacks for them.
*
* So merely letting the async continuation sequences go out of scope
* won't cause this class to be destroyed. Rather, since the class
* references itelf, we have to first break those references. Then and
* only then will the shared_ptr's refcount go to 0 and the class will
* be destroyed.
*
* We always unconditionally break the timer's reference inside of
* the branch-unifying segment (executeHandshakeReq2), but we generally
* only want to break the handshakeFdDesc's reference at the exact
* moment when the sequence fails, because if it succeeds, we want to
* commit the FD to the Device object being created (for heartbeats).
*
* Hence, we call cleanupHandshakeSocket() at the point of failure.
* To break the self-reference when the sequence is successful, we
* manually do that at the end of the sequence.
*/
cleanupHandshakeSocket();
callOriginalCallback(false, -1);
}
private:
bool setupSocket()
{
/** EXPLANATION:
* Create non-blocking UDP socket for handshake. We can't use
* boost::asio::socket because it causes a segfault if associated with
* an io_service from the main program (it's a boost bug).
*/
int socketFd = socket(AF_INET, SOCK_DGRAM, 0);
if (socketFd < 0)
{
std::cerr << __func__ << ": Failed to create socket: "
<< strerror(errno) << std::endl;
return false;
}
int flags = fcntl(socketFd, F_GETFL, 0);
if (flags < 0 || fcntl(socketFd, F_SETFL, flags | O_NONBLOCK) < 0)
{
std::cerr << __func__ << ": Failed to set socket non-blocking: "
<< strerror(errno) << std::endl;
return false;
}
// Bind socket to cmdPort so we can receive the handshake response
struct sockaddr_in localAddr;
memset(&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(device.cmdPort);
if (bind(socketFd, (struct sockaddr*)&localAddr, sizeof(localAddr)) < 0)
{
std::cerr << __func__ << ": Failed to bind socket to port "
<< device.cmdPort << ": " << strerror(errno) << std::endl;
return false;
}
// Assign the socket FD to the stream descriptor
handshakeFdDesc.assign(socketFd);
return true;
}
bool sendHandshakeRequest()
{
/** EXPLANATION:
* Prepare handshake request.
*/
comms::HandshakeRequest handshakeReq(
device.detectedSmoListeningIp,
device.dataPort, device.cmdPort, device.imuPort);
handshakeReq.swapContentsToProtocolEndianness();
handshakeReq.header.setCrc16FromRawBytes();
handshakeReq.header.swapCrc16ToProtocolEndianness();
handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32();
handshakeReq.footer.swapCrc32ToProtocolEndianness();
// Prepare device endpoint
struct sockaddr_in deviceAddr;
memset(&deviceAddr, 0, sizeof(deviceAddr));
deviceAddr.sin_family = AF_INET;
deviceAddr.sin_addr.s_addr = inet_addr(deviceIP.c_str());
deviceAddr.sin_port = htons(65000);
// Send handshake request directly (synchronous)
ssize_t bytesSent = sendto(
handshakeFdDesc.native_handle(),
&handshakeReq, sizeof(comms::HandshakeRequest), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0)
{
std::cerr << __func__ << ": Failed to send handshake request: "
<< strerror(errno) << std::endl;
return false;
}
return true;
}
void setupAsyncCallbacks(
const std::shared_ptr<ExecuteHandshakeReq> &request
)
{
if (!handshakeFdDesc.is_open())
{
throw std::runtime_error(
std::string(__func__) +
": handshakeFdDesc is not open; cannot set up async callbacks "
"for device " + deviceIP + "("
+ device.discoveredDevice.deviceIdentifier + ")" + " handshake."
"Check socket initialization and bining."
);
}
/** EXPLANATION:
* We setup an async timer event to detect timeout, and wait for the
* device to respond to the handshake request. If the device does not
* respond within the timeout period, we will consider the handshake
* to have failed.
*/
timeoutTimer.expires_from_now(
boost::posix_time::milliseconds(device.handshakeTimeoutMs));
timeoutTimer.async_wait(
std::bind(
&ExecuteHandshakeReq::executeHandshakeReq1_1, this, request,
std::placeholders::_1));
/** EXPLANATION:
* Since we're using POSIX sockets calls on the underlying
* native_handle, Let's use async_wait with POLLIN to detect when data
* is available for reading.
*/
handshakeFdDesc.async_wait(
boost::asio::posix::stream_descriptor::wait_read,
std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2, this,
request,
std::placeholders::_1));
}
void executeHandshakeReq1_1(
std::shared_ptr<ExecuteHandshakeReq>,
const boost::system::error_code& error)
{
// This is called from the timer callback
if (!error) // Timeout occurred (not cancelled)
{
timerFired.store(true);
executeHandshakeReq2();
}
}
void executeHandshakeReq1_2(
std::shared_ptr<ExecuteHandshakeReq>,
const boost::system::error_code& error
)
{
// This is called from the socket read callback
if (error)
{
socketState.store(SocketState::SOCKET_ERROR);
std::cerr << __func__ << ": Socket read error: " << error.message()
<< std::endl;
} else
{
// Socket is readable, now actually read the data
bytesReceived = recvfrom(
handshakeFdDesc.native_handle(),
responseBuffer, sizeof(responseBuffer), 0,
(struct sockaddr*)&senderAddr, &senderAddrLen);
if (bytesReceived > 0)
{
socketState.store(SocketState::SOCKET_RECV_SUCCESS);
} else if (bytesReceived == 0)
{
socketState.store(SocketState::SOCKET_RECV_ERROR);
std::cerr << __func__ << ": Received 0 bytes from recvfrom"
<< std::endl;
} else
{
socketState.store(SocketState::SOCKET_ERROR);
std::cerr << __func__ << ": recvfrom failed: "
<< strerror(errno) << " (errno: " << errno << ")"
<< std::endl;
}
}
executeHandshakeReq2();
}
void executeHandshakeReq2()
{
// Ensure we only execute once using atomic exchange
if (handlerExecuted.exchange(true) == true) { return; }
// Examine the flags and decide what happened
SocketState finalSocketState = socketState.load();
bool finalTimerFired = timerFired.load();
// Cancel timer if still running
timeoutTimer.cancel();
// Check for timeout only if there was no socket activity
if (finalTimerFired
&& finalSocketState == SocketState::SOCKET_STILL_WAITING)
{
std::cerr << __func__ << ": Handshake timeout with "
<< deviceIP << "(" << device.discoveredDevice.deviceIdentifier
<< ")" << std::endl;
callOriginalCallbackWithFailure();
return;
}
// Socket error from boost::asio
if (finalSocketState == SocketState::SOCKET_ERROR)
{
std::cerr << __func__ << ": Socket error during handshake with "
<< deviceIP << std::endl;
callOriginalCallbackWithFailure();
return;
}
if (finalSocketState == SocketState::SOCKET_RECV_ERROR)
{
std::cerr << __func__ << ": Receive error during handshake with "
<< deviceIP << std::endl;
callOriginalCallbackWithFailure();
return;
}
/* Result must have been RECV_SUCCESS state if we reach here.
* Data was already read in the async callback, just validate it
*/
if (bytesReceived < (ssize_t)sizeof(comms::HandshakeResponse))
{
std::cerr << __func__ << ": Response of size " << bytesReceived
<< " too small from " << deviceIP << std::endl;
callOriginalCallbackWithFailure();
return;
}
comms::HandshakeResponse* resp =
reinterpret_cast<comms::HandshakeResponse*>(responseBuffer);
/** EXPLANATION:
* Following the clean receiving flow:
* 1. Swap CRC32 to host endianness first
* 2. Validate CRC32
* 3. Swap CRC16 to host endianness
* 4. Validate CRC16
* 5. Swap content to host endianness
*/
resp->footer.swapCrc32ToHostEndianness();
if (!resp->validateCrc32())
{
std::cerr << __func__ << ": CRC32 validation failed from "
<< deviceIP << std::endl;
callOriginalCallbackWithFailure();
return;
}
resp->header.swapCrc16ToHostEndianness();
if (!resp->header.validateCrc16())
{
std::cerr << __func__ << ": CRC16 validation failed from "
<< deviceIP << std::endl;
callOriginalCallbackWithFailure();
return;
}
resp->swapContentsToHostEndianness();
if (!resp->sanityCheck() || resp->ret_code != 0x00)
{
std::cerr << __func__ << ": Invalid response from "
<< deviceIP << std::endl;
callOriginalCallbackWithFailure();
return;
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": Handshake successful with "
<< deviceIP << "("
<< device.discoveredDevice.deviceIdentifier
<< ")" << "\n";
}
// Transfer any successful state to Device
commit();
int rawFd = handshakeFdDesc.release();
callOriginalCallback(true, rawFd);
}
void commit() // Transfer successful state to Device object
{
// Clean up resources (timer) but not the socket FD
// The socket FD is returned to the caller, not transferred to the device
timeoutTimer.cancel();
}
void cleanupHandshakeSocket()
{
int fd = handshakeFdDesc.release();
if (fd != -1) {
close(fd);
}
}
void cleanup() // Clean up transient resources
{
timeoutTimer.cancel();
cleanupHandshakeSocket();
}
};
void Device::executeHandshakeReq(
const std::string& deviceIP, Device::executeHandshakeReqCbFn callback
)
{
// Create the handshake request object to hold state and callbacks
auto request = std::make_shared<ExecuteHandshakeReq>(
*this, deviceIP, std::move(callback));
// Check if detectedSmoListeningIp is empty - this should not happen
if (detectedSmoListeningIp.empty())
{
// This should not happen as it should be set by the calling method
request->callOriginalCallbackWithFailure();
return;
}
try {
if (!request->setupSocket())
{
request->callOriginalCallbackWithFailure();
return;
}
if (!request->sendHandshakeRequest())
{
request->callOriginalCallbackWithFailure();
return;
}
request->setupAsyncCallbacks(request);
} catch (const std::exception& e) {
std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": "
<< e.what() << std::endl;
request->callOriginalCallbackWithFailure();
}
}
void Device::disconnectReq(Device::disconnectReqCbFn callback)
{
// Stop heartbeat first
heartbeatActive.store(false);
if (heartbeatFd == -1)
{
std::cout << __func__ << ": No heartbeat socket available, skipping "
"disconnect message" << std::endl;
callback(true);
return;
}
// Create disconnect message
comms::DisconnectMessage disconnectMsg;
disconnectMsg.swapContentsToProtocolEndianness();
disconnectMsg.header.setCrc16FromRawBytes();
disconnectMsg.header.swapCrc16ToProtocolEndianness();
disconnectMsg.footer.crc_32 = disconnectMsg.calculateCrc32();
disconnectMsg.footer.swapCrc32ToProtocolEndianness();
// Set up destination address
struct sockaddr_in deviceAddr;
deviceAddr.sin_family = AF_INET;
deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str());
deviceAddr.sin_port = htons(65000); // Commands go to port 65000
// Send disconnect message
ssize_t bytesSent = sendto(
heartbeatFd, &disconnectMsg, sizeof(disconnectMsg), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0)
{
std::cerr << __func__ << ": Failed to send disconnect message: "
<< strerror(errno) << std::endl;
// Continue with disconnect even if message send fails
}
std::cout << __func__ << ": Sent disconnect message to "
<< discoveredDevice.ipAddr << ":" << 65000 << std::endl;
// Close the heartbeat socket
close(heartbeatFd);
heartbeatFd = -1;
callback(true);
}
std::string Device::generateClientDeviceIpFromSerialNumber(
const std::string& broadcastCode
)
{
/** EXPLANATION:
* The input string is either a serial number (14 chars) or a broadcast code
* (15 chars). We need to determine which one it is and extract the serial
* number from the broadcast code.
*
* To generate a default IP address, we use the device's subnet: X.X.X.1XX
* where XX = last two digits of serial. We use the smoIp and smoSubnetNbits
* to determine the network prefix.
*/
if (broadcastCode.empty())
{
throw std::invalid_argument(
std::string(__func__) + ": Broadcast code cannot be empty");
}
std::string serialNumber;
if (broadcastCode.length() == 14)
{
// Input is a serial number
serialNumber = broadcastCode;
} else if (broadcastCode.length() == 15)
{
// Input is a broadcast code (serial + selector)
serialNumber = broadcastCode.substr(0, 14);
} else
{
// Invalid length
throw std::invalid_argument(
std::string(__func__) +
": Broadcast code must be 14 or 15 characters long");
}
// Extract last two digits of serial number
if (serialNumber.length() < 2)
{
throw std::invalid_argument(
std::string(__func__) + ": Serial number too short");
}
std::string lastTwoDigits = serialNumber.substr(serialNumber.length() - 2);
// Validate that last two characters are digits
if (lastTwoDigits[0] < '0' || lastTwoDigits[0] > '9' ||
lastTwoDigits[1] < '0' || lastTwoDigits[1] > '9')
{
throw std::invalid_argument(
std::string(__func__) +
": Last two characters of serial number must be digits");
}
/** EXPLANATION:
* Use the device's subnet: X.X.X.1XX where XX = last two digits of serial.
* We use the smoIp and smoSubnetNbits to determine the network prefix.
*/
// Parse smoIp to extract network prefix
auto smoIpOctets = comms::parseIPv4Address(smoIp);
if (!smoIpOctets.has_value())
{
throw std::invalid_argument(
std::string(__func__) + ": Invalid smoIp format: must be X.X.X.X");
}
// Generate subnet mask based on nbits
uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);
uint32_t smoIpAddr = (std::stoi(smoIpOctets->octet1) << 24) |
(std::stoi(smoIpOctets->octet2) << 16) |
(std::stoi(smoIpOctets->octet3) << 8) |
std::stoi(smoIpOctets->octet4);
// Apply subnet mask to get network prefix
uint32_t networkPrefix = smoIpAddr & subnetMask;
// Extract octets from network prefix
uint8_t octet1 = (networkPrefix >> 24) & 0xFF;
uint8_t octet2 = (networkPrefix >> 16) & 0xFF;
uint8_t octet3 = (networkPrefix >> 8) & 0xFF;
// Use the first three octets and append "1" + last two digits
return std::to_string(octet1) + "." + std::to_string(octet2) + "." +
std::to_string(octet3) + ".1" + lastTwoDigits;
}
void Device::startHeartbeat()
{
if (!componentThread || discoveredDevice.ipAddr.empty())
{
throw std::runtime_error(
std::string(__func__) +
": Can't start heartbeat without component thread or IP");
}
// Check if we have the handshake socket available for heartbeat use
if (heartbeatFd < 0)
{
throw std::runtime_error(
std::string(__func__) +
": Expected to find handshake socket present but didn't find it");
}
// Create heartbeat timer
heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
componentThread->getIoService());
heartbeatActive.store(true);
// Send first heartbeat immediately
sendHeartbeat();
}
void Device::sendHeartbeat()
{
if (!heartbeatActive.load())
{
std::cerr << __func__ << ": Ending heartbeat loop due to "
"heartbeatActive==false.\n";
return;
}
if (heartbeatFd < 0 || discoveredDevice.ipAddr.empty())
{
std::cerr << __func__ << ": Ending heartbeat loop due to "
"heartbeatFd==-1 or discoveredDevice.ipAddr.empty().\n";
return;
}
try {
comms::HeartbeatMessage heartbeatMsg;
heartbeatMsg.swapContentsToProtocolEndianness();
heartbeatMsg.header.setCrc16FromRawBytes();
heartbeatMsg.header.swapCrc16ToProtocolEndianness();
heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32();
heartbeatMsg.footer.swapCrc32ToProtocolEndianness();
// Set up destination address for raw socket
struct sockaddr_in deviceAddr;
memset(&deviceAddr, 0, sizeof(deviceAddr));
deviceAddr.sin_family = AF_INET;
deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str());
// Heartbeats and commands go to port 65000
deviceAddr.sin_port = htons(65000);
ssize_t bytesSent = sendto(
heartbeatFd, &heartbeatMsg, sizeof(heartbeatMsg), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0)
{
std::cerr << "[" << __func__ << "] Failed to send heartbeat: "
<< strerror(errno) << std::endl;
return;
}
/** EXPLANATION:
* Schedule next heartbeat in 1 second, per the spec.
*/
heartbeatTimer->expires_from_now(boost::posix_time::seconds(1));
heartbeatTimer->async_wait(
[this](const boost::system::error_code& error) {
onHeartbeatTimer(error);
}
);
}
catch (const std::exception& e)
{
heartbeatActive.store(false);
std::cerr << __func__ << ": Heartbeat send failed for device "
<< discoveredDevice.deviceIdentifier
<< ": " << e.what() << std::endl;
}
}
void Device::onHeartbeatTimer(const boost::system::error_code& error)
{
// Timer was cancelled, heartbeat stopped
if (error == boost::asio::error::operation_aborted) {
return;
}
if (error)
{
heartbeatActive.store(false);
std::cerr << "[" << __func__ << "] Heartbeat timer error for device "
<< discoveredDevice.deviceIdentifier
<< ": " << error.message() << std::endl;
return;
}
// Send next heartbeat
sendHeartbeat();
}
uint32_t Device::getSubnetMaskFor(uint8_t nbits)
{
if (nbits > 32) {
throw std::invalid_argument(
std::string(__func__) + ": nbits must be between 0 and 32");
}
// Generate subnet mask: set the first nbits to 1, rest to 0
if (nbits == 0) {
return 0x00000000;
} else if (nbits == 32) {
return 0xFFFFFFFF;
} else {
// Create mask with nbits set to 1 from the left
return (0xFFFFFFFF << (32 - nbits));
}
}
std::optional<std::string> Device::detectSmoIp(const std::string& deviceIP)
{
/** EXPLANATION:
* This function detects the SMO IP address of the interface that's facing
* the device by iterating through all network interfaces and checking for
* the interface that has the IP address in the same subnet as the device's
* IP address.
*/
try {
// Parse the device IP to get the network prefix
auto deviceIpOctets = comms::parseIPv4Address(deviceIP);
if (!deviceIpOctets.has_value()) {
return std::nullopt;
}
// Convert device IP octets to integers for bitwise operations
uint32_t deviceIpAddr = (std::stoi(deviceIpOctets->octet1) << 24) |
(std::stoi(deviceIpOctets->octet2) << 16) |
(std::stoi(deviceIpOctets->octet3) << 8) |
std::stoi(deviceIpOctets->octet4);
// Generate subnet mask based on nbits
uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);
/* Get all network interfaces using getifaddrs (Linux/Unix specific)
*
* FIXME: Add Windows support using GetAdaptersAddresses when porting
*/
struct ifaddrs *ifaddr;
if (getifaddrs(&ifaddr) == -1) {
return std::nullopt;
}
// Use unique_ptr for automatic cleanup (RAII) to free ifaddrs
auto ifaddr_deleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); };
std::unique_ptr<struct ifaddrs, decltype(ifaddr_deleter)> ifaddr_ptr(
ifaddr, ifaddr_deleter);
std::string found_ip;
/** EXPLANATION:
* Iterate through all network interfaces and check if the IP address is
* in the same subnet as the device's IP address.
*/
for (struct ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == nullptr) continue;
// Check if it's IPv4
if (ifa->ifa_addr->sa_family != AF_INET) { continue; }
// Get the IPv4 address
struct sockaddr_in* addr_in = (struct sockaddr_in*)ifa->ifa_addr;
char ip_str[INET_ADDRSTRLEN];
if (inet_ntop(
AF_INET, &addr_in->sin_addr, ip_str, INET_ADDRSTRLEN)
== nullptr)
{
continue;
}
std::string ip = ip_str;
// Check if this IP is in the same subnet
auto ipOctets = comms::parseIPv4Address(ip);
if (!ipOctets.has_value()) { continue; }
// Convert IP octets to integer
uint32_t ipAddr = (std::stoi(ipOctets->octet1) << 24) |
(std::stoi(ipOctets->octet2) << 16) |
(std::stoi(ipOctets->octet3) << 8) |
std::stoi(ipOctets->octet4);
/* Check if this iface's IP is in the same subnet as the device's IP
* using the calculated mask. Only compare the bits that are set in
* the subnet mask.
*/
if ((ipAddr & subnetMask) == (deviceIpAddr & subnetMask))
{
found_ip = ip;
break;
}
}
// Return the found IP (empty string if none found)
if (!found_ip.empty()) {
return found_ip;
}
return std::nullopt;
} catch (const std::exception& e) {
std::cerr << "Error detecting SMO IP: " << e.what() << std::endl;
return std::nullopt;
}
}
std::optional<std::string> Device::getSmoIp(const std::string& deviceIP)
{
/** EXPLANATION:
* This is only used when connecting to a device that's already known to
* the broadcastListener.
* It is NOT and SHOULD not be used when connecting by heuristic IP
* construction using the client device's serial number.
*
* Determines the SMO listening IP address for this device.
* If smoIp is provided, validate it against detected IP and return it.
* If smoIp is empty, attempt auto-detection based on device IP.
* Returns std::optional<std::string> - empty if detection fails.
*/
auto detectedIp = detectSmoIp(deviceIP);
if (!smoIp.empty())
{
// smoIp was provided, validate it against detected IP
if (detectedIp.has_value() && detectedIp.value() != smoIp)
{
// Print warning if provided smoIp doesn't match detected IP
std::cerr << "Warning: Provided smo-ip (" << smoIp
<< ") doesn't match detected IP (" << detectedIp.value()
<< ") for device " << discoveredDevice.deviceIdentifier
<< " @(" << deviceIP << ")"
<< ". Using provided smo-ip anyway." << std::endl;
}
return smoIp;
}
if (detectedIp.has_value()) {
return detectedIp.value();
}
// Auto-detection failed
return std::nullopt;
}
} // namespace livoxProto1