Files
salmanoff/commonLibs/livoxProto1/device.cpp
T

1503 lines
41 KiB
C++

#include <boostAsioLinkageFix.h>
#include <sstream>
#include <thread>
#include <chrono>
#include <string>
#include <stdexcept>
#include <memory>
#include <unistd.h>
#include <ifaddrs.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <errno.h>
#include <cstring>
#include <netinet/in.h>
#include <optional>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <opts.h>
#include "device.h"
#include "protocol.h"
#include "core.h"
/** EXPLANATION:
* This file contains the implementation of the Device class.
*
* FIXME:
* We may need to check how smo-subnet-nbits is used in here because we didn't
* actually check to see under what conditions it's required vs optional. Hence
* we don't currently enforce correct usage of it, and we just assume that the
* livoxGen1's policy of supplying a default value of 24 is correct.
*/
namespace livoxProto1 {
/** EXPLANATION:
 * Out-of-class definition of the static member
 * Device::devicesUnderConstruction: a map keyed by std::string whose values
 * are vectors of Device::CommandHandler.
 *
 * NOTE(review): presumably the key identifies a device that is still being
 * constructed (identifier or IP) -- confirm against device.h.
 */
std::unordered_map<std::string, std::vector<Device::CommandHandler>>
Device::devicesUnderConstruction;
namespace comms {
DiscoveredDevice::DiscoveredDevice(
const std::string &deviceIdentifier,
DeviceType deviceType,
const std::string &ipAddr)
: deviceIdentifier(deviceIdentifier),
deviceType(deviceType),
ipAddr(ipAddr)
{
}
DiscoveredDevice::DiscoveredDevice(
const BroadcastMessage &msg, const std::string &ipAddr
)
: DiscoveredDevice(
reinterpret_cast<const char*>(msg.broadcast_code),
static_cast<DeviceType>(msg.dev_type),
ipAddr)
{
}
/** EXPLANATION:
 * Renders this record as a single-line, human-readable string of the form
 *   DiscoveredDevice{identifier='..', ipAddr='..', deviceType=N (Name)}
 * where N is the numeric value of deviceType and Name comes from
 * getDeviceTypeName().
 *
 * @return The formatted description.
 */
std::string DiscoveredDevice::stringify(void) const
{
    const int numericType = static_cast<int>(deviceType);

    std::ostringstream text;
    text << "DiscoveredDevice{";
    text << "identifier='" << deviceIdentifier << "', ";
    text << "ipAddr='" << ipAddr << "', ";
    text << "deviceType=" << numericType << " (" << getDeviceTypeName() << ")";
    text << "}";
    return text.str();
}
/** EXPLANATION:
 * Maps deviceType to a display name. Any enumerator not listed below yields
 * "Unknown".
 *
 * @return The display name for this device's type.
 */
std::string DiscoveredDevice::getDeviceTypeName(void) const
{
    // Start from the fallback and overwrite it for every recognised type.
    const char *label = "Unknown";
    switch (deviceType)
    {
    case DeviceType::Hub:
        label = "Hub";
        break;
    case DeviceType::Mid40:
        label = "Mid-40";
        break;
    case DeviceType::Tele15:
        label = "Tele-15";
        break;
    case DeviceType::Horizon:
        label = "Horizon";
        break;
    case DeviceType::Mid70:
        label = "Mid-70";
        break;
    case DeviceType::Avia:
        label = "Avia";
        break;
    default:
        break;
    }
    return label;
}
} // namespace comms
/** EXPLANATION:
 * Constructs a Device in its not-yet-connected state. All arguments are
 * stored in the members of the same names; no network activity happens here.
 *
 * The embedded discoveredDevice record is created with the given identifier,
 * an empty IP address (filled in by connectCReq() on success) and a device
 * type of Mid40.
 * NOTE(review): Mid40 looks like a placeholder since the real type is not
 * known at this point -- confirm.
 *
 * Heartbeating and point-cloud streaming both start out inactive.
 *
 * @param deviceIdentifier Identifier of the device to manage.
 * @param componentThread  Thread whose io_context later hosts the heartbeat
 *                         timer (see startHeartbeat()).
 * @param commandTimeoutMs Timeout passed to command-response waits.
 * @param retryDelayMs     Stored in retryDelayMs; not used in this chunk.
 * @param smoIp            Configured SMO IP; may be empty.
 * @param smoSubnetNbits   Subnet prefix length used with smoIp.
 * @param dataPort         Port advertised to the device in the handshake.
 * @param cmdPort          Port advertised to the device in the handshake.
 * @param imuPort          Port advertised to the device in the handshake.
 */
Device::Device(const std::string &deviceIdentifier,
const std::shared_ptr<sscl::ComponentThread>& componentThread,
int commandTimeoutMs, int retryDelayMs,
const std::string& smoIp, uint8_t smoSubnetNbits,
uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort)
: discoveredDevice(
deviceIdentifier, comms::DeviceType::Mid40,
// Initialize empty. IP will be set upon successful connection.
""),
nAttachedStimulusProducers(0),
componentThread(componentThread),
commandTimeoutMs(commandTimeoutMs), retryDelayMs(retryDelayMs),
smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits),
dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort),
heartbeatActive(false),
pcloudDataActive(false)
{
}
/** EXPLANATION:
 * Tears the device down: stops heartbeating, clears the point-cloud-active
 * flag if it is still set, and releases the heartbeat timer.
 *
 * stopHeartbeat() already cancels and resets the timer, so the final reset
 * only matters if the timer was re-created in between.
 */
Device::~Device()
{
    stopHeartbeat();

    const bool stillStreaming = pcloudDataActive.load();
    if (stillStreaming)
    {
        pcloudDataActive.store(false);
    }

    heartbeatTimer.reset();
}
namespace {
/** EXPLANATION:
 * Protocol identifiers used by the helpers in this file. A response is
 * selected by the pair (cmd_set, cmd_id) when waiting on the
 * UdpCommandDemuxer.
 */
// "General" command set: handshake ACK and sampling (start/stop) response.
constexpr uint8_t CMD_SET_GENERAL = 0x00;
constexpr uint8_t CMD_ID_HANDSHAKE_ACK = 0x01;
constexpr uint8_t CMD_ID_SAMPLING_RESPONSE = 0x04;
// "LiDAR" command set: set/get return mode responses.
constexpr uint8_t CMD_SET_LIDAR = 0x01;
constexpr uint8_t CMD_ID_SET_RETURN_MODE_RESPONSE = 0x06;
constexpr uint8_t CMD_ID_GET_RETURN_MODE_RESPONSE = 0x07;
// UDP port on the device that command packets are sent to.
constexpr uint16_t LIVOX_COMMAND_PORT = 65000;
// Shorthand for the result type returned by command-response waits.
using UdpCommandResponseResult = comms::UdpCommandResponseResult;
/** EXPLANATION:
 * Looks up the UdpCommandDemuxer owned by the global device manager.
 *
 * @return Pointer to the demuxer, or nullptr when no device manager exists.
 */
comms::UdpCommandDemuxer *getUdpCommandDemuxer()
{
    auto &manager = livoxProto1::getProtoState().deviceManager;
    return manager ? &manager->udpCommandDemuxer : nullptr;
}
/** EXPLANATION:
 * Fetches the command-endpoint descriptor from the global device manager's
 * UdpCommandDemuxer.
 *
 * @return Whatever the demuxer's getCmdEndpointFdDesc() returns, or nullptr
 *         when no device manager exists.
 */
std::shared_ptr<boost::asio::posix::stream_descriptor> getCmdEndpointFdDesc()
{
    auto &manager = livoxProto1::getProtoState().deviceManager;
    if (manager)
    {
        return manager->udpCommandDemuxer.getCmdEndpointFdDesc();
    }
    return nullptr;
}
/** EXPLANATION:
 * Builds a HandshakeRequest advertising the device's detected SMO listening
 * IP and its data/cmd/imu ports, finalises it for the wire (content swap,
 * header CRC16, footer CRC32) and sends it over UDP to deviceIP on
 * LIVOX_COMMAND_PORT.
 *
 * Only the send is performed here; the reply is awaited by the caller.
 *
 * @param device      Device whose listening IP and ports are advertised.
 * @param deviceIP    Dotted-quad address of the device.
 * @param cmdSocketFd Socket file descriptor to send on.
 * @return true if sendto() did not report an error, false otherwise (the
 *         errno text is logged to stderr).
 */
bool sendHandshakeRequest(
    Device &device,
    const std::string &deviceIP,
    int cmdSocketFd)
{
    // Destination: the device's command port.
    struct sockaddr_in target = {};
    target.sin_family = AF_INET;
    target.sin_port = htons(LIVOX_COMMAND_PORT);
    target.sin_addr.s_addr = inet_addr(deviceIP.c_str());

    // Payload, converted to protocol byte order with both CRCs filled in.
    comms::HandshakeRequest request(
        device.detectedSmoListeningIp,
        device.dataPort, device.cmdPort, device.imuPort);
    request.swapContentsToProtocolEndianness();
    request.header.setCrc16FromRawBytes();
    request.header.swapCrc16ToProtocolEndianness();
    request.footer.crc_32 = request.calculateCrc32();
    request.footer.swapCrc32ToProtocolEndianness();

    const ssize_t sentCount = sendto(
        cmdSocketFd,
        &request, sizeof(comms::HandshakeRequest), 0,
        reinterpret_cast<const struct sockaddr*>(&target), sizeof(target));
    if (sentCount >= 0)
    {
        return true;
    }

    std::cerr << __func__ << ": Failed to send handshake request: "
        << strerror(errno) << std::endl;
    return false;
}
bool processHandshakeResponse(
const UdpCommandResponseResult &responseResult,
Device &device,
const std::string &deviceIP)
{
if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout)
{
std::cerr << __func__ << ": Command timeout with "
<< deviceIP << "(" << device.discoveredDevice.deviceIdentifier
<< ")" << std::endl;
return false;
}
if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError)
{
std::cerr << __func__ << ": Receive error during handshake with "
<< deviceIP << std::endl;
return false;
}
const ssize_t bytesReceived = responseResult.bytesReceived;
if (bytesReceived < static_cast<ssize_t>(sizeof(comms::HandshakeResponse)))
{
std::cerr << __func__ << ": Response of size " << bytesReceived
<< " too small from " << deviceIP << std::endl;
return false;
}
comms::HandshakeResponse *resp = reinterpret_cast<comms::HandshakeResponse*>(
const_cast<uint8_t*>(responseResult.buffer));
/** EXPLANATION:
* Following the clean receiving flow:
* 1. Swap CRC32 to host endianness first
* 2. Validate CRC32
* 3. Swap CRC16 to host endianness
* 4. Validate CRC16
* 5. Swap content to host endianness
*/
resp->footer.swapCrc32ToHostEndianness();
if (!resp->validateCrc32())
{
std::cerr << __func__ << ": CRC32 validation failed from "
<< deviceIP << std::endl;
return false;
}
resp->header.swapCrc16ToHostEndianness();
if (!resp->header.validateCrc16())
{
std::cerr << __func__ << ": CRC16 validation failed from "
<< deviceIP << std::endl;
return false;
}
resp->swapContentsToHostEndianness();
if (!resp->sanityCheck() || resp->ret_code != 0x00)
{
std::cerr << __func__ << ": Invalid response from "
<< deviceIP << std::endl;
return false;
}
if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose)
{
std::cout << __func__ << ": Handshake successful with "
<< deviceIP << "("
<< device.discoveredDevice.deviceIdentifier
<< ")" << "\n";
}
return true;
}
/** EXPLANATION:
 * Sends a StartStopSamplingMessage with its `enable` field set to
 * enableFlag to the device's command port. Callers in this file pass 0x01
 * to enable and 0x00 to disable point-cloud data.
 *
 * The message is finalised for the wire (content swap, header CRC16, footer
 * CRC32) before the device manager and command endpoint are looked up.
 *
 * @param device      Target device; its discoveredDevice.ipAddr is used.
 * @param enableFlag  Value written to the message's `enable` field.
 * @param commandName Human-readable command name for the failure log.
 * @return true if the packet was handed to sendto() without error; false if
 *         there is no device manager, no command endpoint, or sendto()
 *         failed.
 */
bool sendEnDisablePcloudCommand(
    Device &device, uint8_t enableFlag, const char *commandName)
{
    livoxProto1::comms::StartStopSamplingMessage command;
    command.enable = enableFlag;
    command.swapContentsToProtocolEndianness();
    command.header.setCrc16FromRawBytes();
    command.header.swapCrc16ToProtocolEndianness();
    command.footer.crc_32 = command.calculateCrc32();
    command.footer.swapCrc32ToProtocolEndianness();

    struct sockaddr_in target = {};
    target.sin_family = AF_INET;
    target.sin_addr.s_addr =
        inet_addr(device.discoveredDevice.ipAddr.c_str());
    target.sin_port = htons(LIVOX_COMMAND_PORT);

    if (!livoxProto1::getProtoState().deviceManager)
    {
        std::cerr << __func__ << ": No device manager available" << std::endl;
        return false;
    }

    const auto endpoint = getCmdEndpointFdDesc();
    if (!endpoint)
    {
        std::cerr << __func__ << ": No command endpoint available" << std::endl;
        return false;
    }

    const ssize_t sentCount = sendto(
        endpoint->native_handle(),
        &command, sizeof(command), 0,
        reinterpret_cast<const struct sockaddr*>(&target), sizeof(target));
    if (sentCount >= 0)
    {
        return true;
    }

    std::cerr << __func__ << ": Failed to send " << commandName
        << " command: " << strerror(errno) << std::endl;
    return false;
}
bool processSamplingResponse(
const UdpCommandResponseResult &responseResult,
Device &device,
const char *failureContext)
{
(void)failureContext;
if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout)
{
std::cerr << __func__ << ": Command timeout for device "
<< device.discoveredDevice.deviceIdentifier
<< std::endl;
return false;
}
if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError)
{
std::cerr
<< __func__ << ": Receive error during command for device "
<< device.discoveredDevice.deviceIdentifier
<< std::endl;
return false;
}
const ssize_t bytesReceived = responseResult.bytesReceived;
if (bytesReceived < static_cast<ssize_t>(sizeof(comms::SamplingResponse)))
{
std::cerr << __func__ << ": Response of size "
<< bytesReceived
<< " is too small for sampling response (expected "
<< sizeof(comms::SamplingResponse) << ")"
<< std::endl;
return false;
}
comms::SamplingResponse *response =
reinterpret_cast<comms::SamplingResponse*>(
const_cast<uint8_t*>(responseResult.buffer));
response->swapContentsToHostEndianness();
if (!response->sanityCheck())
{
std::cerr << __func__ << ": Invalid sampling response structure.\n";
return false;
}
if (!(response->command.cmd_set == CMD_SET_GENERAL &&
response->command.cmd_id == CMD_ID_SAMPLING_RESPONSE &&
response->ret_code == 0x00))
{
if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose)
{
std::cout << __func__ << ": Failed to en/disable pcloud data "
"for device "
"(" << device.discoveredDevice.deviceIdentifier
<< ") @(" << device.discoveredDevice.ipAddr << "). "
<< "cmd_set: " << (int)response->command.cmd_set
<< ", cmd_id: " << (int)response->command.cmd_id
<< ", ret_code: " << (int)response->ret_code << "\n";
}
return false;
}
return true;
}
/** EXPLANATION:
 * Sends a SetLiDARReturnMode message carrying returnMode to the device's
 * command port. The message is finalised for the wire (content swap, header
 * CRC16, footer CRC32) before sending. Only the send is performed here; the
 * reply is awaited by the caller.
 *
 * @param device     Target device; its discoveredDevice.ipAddr is used.
 * @param returnMode Value written to the message's `mode` field.
 * @return true if sendto() did not report an error; false if there is no
 *         command endpoint or the send failed (both are logged to stderr).
 */
bool sendSetReturnModeCommand(Device &device, uint8_t returnMode)
{
    auto cmdEndpointFdDesc = getCmdEndpointFdDesc();
    if (!cmdEndpointFdDesc)
    {
        std::cerr << __func__ << ": No command endpoint available.\n";
        return false;
    }

    comms::SetLiDARReturnMode setReturnModeMsg;
    setReturnModeMsg.mode = returnMode;
    setReturnModeMsg.swapContentsToProtocolEndianness();
    setReturnModeMsg.header.setCrc16FromRawBytes();
    setReturnModeMsg.header.swapCrc16ToProtocolEndianness();
    setReturnModeMsg.footer.crc_32 = setReturnModeMsg.calculateCrc32();
    setReturnModeMsg.footer.swapCrc32ToProtocolEndianness();

    // Zero-initialise so sin_zero (and any padding) is not left
    // indeterminate, matching the other senders in this file.
    struct sockaddr_in deviceAddr = {};
    deviceAddr.sin_family = AF_INET;
    deviceAddr.sin_addr.s_addr = inet_addr(
        device.discoveredDevice.ipAddr.c_str());
    deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT);

    ssize_t bytesSent = sendto(
        cmdEndpointFdDesc->native_handle(),
        &setReturnModeMsg, sizeof(setReturnModeMsg), 0,
        (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
    if (bytesSent < 0)
    {
        std::cerr << __func__ << ": Failed to send set return mode message: "
            << strerror(errno) << std::endl;
        return false;
    }

    std::cout << __func__ << ": Sent set return mode message to "
        << device.discoveredDevice.ipAddr << ":" << LIVOX_COMMAND_PORT
        << std::endl;
    return true;
}
bool processSetReturnModeResponse(
const UdpCommandResponseResult &responseResult,
Device &device,
uint8_t returnMode)
{
if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout)
{
std::cerr << __func__ << ": Set return mode timeout with "
<< device.discoveredDevice.ipAddr << "("
<< device.discoveredDevice.deviceIdentifier << ")" << "\n";
return false;
}
if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError)
{
std::cerr << __func__ << ": Receive error during set return mode with "
<< device.discoveredDevice.ipAddr << "\n";
return false;
}
const ssize_t bytesReceived = responseResult.bytesReceived;
if (bytesReceived
< static_cast<ssize_t>(sizeof(comms::SetLiDARReturnModeResponse)))
{
std::cerr << __func__ << ": Response of size "
<< bytesReceived << " too small from "
<< device.discoveredDevice.ipAddr << "\n";
return false;
}
comms::SetLiDARReturnModeResponse *response =
reinterpret_cast<comms::SetLiDARReturnModeResponse*>(
const_cast<uint8_t*>(responseResult.buffer));
response->swapContentsToHostEndianness();
if (response->command.cmd_set != CMD_SET_LIDAR ||
response->command.cmd_id != CMD_ID_SET_RETURN_MODE_RESPONSE ||
response->ret_code != 0x00)
{
return false;
}
device.currentReturnMode = Device::ReturnMode(returnMode);
return true;
}
/** EXPLANATION:
 * Sends a GetLiDARReturnMode query to the device's command port. The
 * message is finalised for the wire (content swap, header CRC16, footer
 * CRC32) before sending. Only the send is performed here; the reply is
 * awaited by the caller.
 *
 * @param device Target device; its discoveredDevice.ipAddr is used.
 * @return true if sendto() did not report an error; false if there is no
 *         command endpoint or the send failed (both are logged to stderr).
 */
bool sendGetReturnModeCommand(Device &device)
{
    auto cmdEndpointFdDesc = getCmdEndpointFdDesc();
    if (!cmdEndpointFdDesc)
    {
        std::cerr << __func__ << ": No command endpoint available.\n";
        return false;
    }

    comms::GetLiDARReturnMode getReturnModeMsg;
    getReturnModeMsg.swapContentsToProtocolEndianness();
    getReturnModeMsg.header.setCrc16FromRawBytes();
    getReturnModeMsg.header.swapCrc16ToProtocolEndianness();
    getReturnModeMsg.footer.crc_32 = getReturnModeMsg.calculateCrc32();
    getReturnModeMsg.footer.swapCrc32ToProtocolEndianness();

    // Zero-initialise so sin_zero (and any padding) is not left
    // indeterminate, matching the other senders in this file.
    struct sockaddr_in deviceAddr = {};
    deviceAddr.sin_family = AF_INET;
    deviceAddr.sin_addr.s_addr = inet_addr(device.discoveredDevice.ipAddr.c_str());
    deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT);

    ssize_t bytesSent = sendto(
        cmdEndpointFdDesc->native_handle(),
        &getReturnModeMsg, sizeof(getReturnModeMsg), 0,
        (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
    if (bytesSent < 0)
    {
        std::cerr << __func__ << ": Failed to send get return mode message: "
            << strerror(errno) << std::endl;
        return false;
    }

    std::cout << __func__ << ": Sent get return mode message to "
        << device.discoveredDevice.ipAddr << ":" << LIVOX_COMMAND_PORT
        << std::endl;
    return true;
}
bool processGetReturnModeResponse(
const UdpCommandResponseResult &responseResult,
Device &device,
uint8_t &outReturnMode)
{
if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout)
{
std::cerr << __func__ << ": Get return mode timeout with "
<< device.discoveredDevice.ipAddr << "("
<< device.discoveredDevice.deviceIdentifier
<< ")" << "\n";
return false;
}
if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError)
{
std::cerr << __func__ << ": Receive error during get return mode with "
<< device.discoveredDevice.ipAddr << std::endl;
return false;
}
const ssize_t bytesReceived = responseResult.bytesReceived;
if (bytesReceived
< static_cast<ssize_t>(sizeof(comms::GetLiDARReturnModeResponse)))
{
std::cerr << __func__ << ": Response of size "
<< bytesReceived << " too small from "
<< device.discoveredDevice.ipAddr << std::endl;
return false;
}
comms::GetLiDARReturnModeResponse *response =
reinterpret_cast<comms::GetLiDARReturnModeResponse*>(
const_cast<uint8_t*>(responseResult.buffer));
response->swapContentsToHostEndianness();
if (!(response->command.cmd_set == CMD_SET_LIDAR &&
response->command.cmd_id == CMD_ID_GET_RETURN_MODE_RESPONSE &&
response->ret_code == 0x00))
{
return false;
}
device.currentReturnMode = Device::ReturnMode(response->mode);
outReturnMode = response->mode;
return true;
}
} // namespace
/** EXPLANATION:
 * Connects to the device using two strategies in order:
 *   1. connectToKnownDeviceCReq(): use the address the broadcast listener
 *      already knows for this identifier.
 *   2. connectByDeviceIdentifierCReq(): fall back to an address derived
 *      from the identifier.
 *
 * On the first strategy that succeeds, the resulting IP is stored in
 * discoveredDevice.ipAddr and heartbeating is started.
 *
 * startHeartbeat() may throw std::runtime_error; that is not caught here.
 *
 * @return true when one of the strategies completed a handshake, false when
 *         both failed.
 */
sscl::co::ViralNonPostingInvoker<bool> Device::connectCReq()
{
    if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) {
        std::cout << __func__ << ": Trying to connect to known device" << "\n";
    }

    Device::ConnectIpResult connectResult =
        co_await connectToKnownDeviceCReq();
    if (!connectResult.success)
    {
        if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose)
        {
            std::cout << __func__ << ": Trying to connect to device by "
                << "identifier" << "\n";
        }

        connectResult = co_await connectByDeviceIdentifierCReq();
        if (!connectResult.success)
            { co_return false; }
    }

    // Either strategy succeeded: remember where the device lives and keep
    // the connection alive.
    discoveredDevice.ipAddr = connectResult.ipAddr;
    startHeartbeat();
    co_return true;
}
/** EXPLANATION:
 * Tries to connect to a device that the broadcast listener has already seen.
 *
 * Steps: look the identifier up in the broadcast listener, take the IP it
 * recorded, determine the local SMO listening IP for that device address via
 * getSmoIp(), store it in detectedSmoListeningIp, then run the handshake.
 *
 * Every early exit returns a default-constructed ConnectIpResult.
 * NOTE(review): this assumes a default ConnectIpResult has success == false
 * -- confirm in device.h.
 *
 * @return Result whose `success` is the handshake outcome and whose
 *         `ipAddr` is the device IP that was tried (set even when the
 *         handshake failed).
 */
sscl::co::ViralNonPostingInvoker<Device::ConnectIpResult>
Device::connectToKnownDeviceCReq()
{
    Device::ConnectIpResult outcome;

    auto &protoState = livoxProto1::getProtoState();
    if (!protoState.deviceManager)
    {
        co_return outcome;
    }

    const std::string &wantedId = discoveredDevice.deviceIdentifier;
    if (!protoState.deviceManager->broadcastListener.deviceExists(wantedId))
    {
        co_return outcome;
    }

    // The entry may have disappeared between the two calls, hence the
    // second null check.
    const std::shared_ptr<livoxProto1::comms::DiscoveredDevice> knownDevice =
        protoState.deviceManager->broadcastListener.getDevice(wantedId);
    if (!knownDevice)
    {
        co_return outcome;
    }

    const std::string deviceIP = knownDevice->ipAddr;
    auto listeningIp = getSmoIp(deviceIP);
    if (!listeningIp.has_value())
    {
        std::cerr << __func__ << ": Failed to detect SMO listening IP for "
            << "known device ("
            << discoveredDevice.deviceIdentifier << ")"
            << " @(" << deviceIP << ").\n";
        co_return outcome;
    }

    if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose)
    {
        std::cout << __func__ << ": Detected SMO listening IP for known device "
            << discoveredDevice.deviceIdentifier
            << " @(" << deviceIP << ") is "
            << listeningIp.value() << ". About to try to handshake.\n";
    }

    detectedSmoListeningIp = listeningIp.value();
    outcome.success = co_await executeHandshakeCReq(deviceIP);
    outcome.ipAddr = deviceIP;
    co_return outcome;
}
/** EXPLANATION:
 * Connects using a device IP constructed heuristically from the device
 * identifier (see generateClientDeviceIpFromSerialNumber()).
 *
 * This requires smoIp to be provided because:
 *   1. The network prefix of smoIp is needed to build the device address.
 *   2. Without a target subnet we cannot tell which interface faces the
 *      device, so smoIp itself is used as the listening IP.
 * If smoIp is empty this strategy is skipped, and the driver relies solely
 * on broadcast advertisements (handled by connectToKnownDeviceCReq()).
 *
 * generateClientDeviceIpFromSerialNumber() throws std::invalid_argument for
 * a malformed identifier or smoIp; that exception is not caught here.
 *
 * NOTE(review): the skipped path returns a default-constructed
 * ConnectIpResult, assumed to have success == false -- confirm in device.h.
 *
 * @return Result whose `success` is the handshake outcome and whose
 *         `ipAddr` is the generated device IP.
 */
sscl::co::ViralNonPostingInvoker<Device::ConnectIpResult>
Device::connectByDeviceIdentifierCReq()
{
    Device::ConnectIpResult result;
    if (smoIp.empty()) { co_return result; }

    const std::string deviceIP = generateClientDeviceIpFromSerialNumber(
        discoveredDevice.deviceIdentifier);
    detectedSmoListeningIp = smoIp;

    if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose)
    {
        // Report the address being contacted (the generated device IP), not
        // our own SMO IP.
        std::cout << __func__ << ": About to try to connect to device by "
            << "identifier (" << discoveredDevice.deviceIdentifier << ")"
            << " at IP (" << deviceIP << ").\n";
    }

    const bool handshakeSuccess = co_await executeHandshakeCReq(deviceIP);
    result.success = handshakeSuccess;
    result.ipAddr = deviceIP;
    co_return result;
}
/** EXPLANATION:
 * Performs one handshake round-trip with the device at deviceIP: sends the
 * handshake request, waits up to commandTimeoutMs for the handshake ACK
 * (cmd_set=0x00, cmd_id=0x01) from that IP via the UdpCommandDemuxer, and
 * validates the reply.
 *
 * Preconditions checked here: a device manager and command endpoint exist,
 * detectedSmoListeningIp is non-empty, and the endpoint descriptor is open.
 *
 * @param deviceIP Dotted-quad address of the device to handshake with.
 * @return true when the device acknowledged the handshake; false on any
 *         missing precondition, send failure, timeout, invalid reply, or
 *         std::exception raised during the exchange.
 * @throws std::runtime_error when the command endpoint descriptor exists
 *         but is not open (raised before the try block, so it propagates).
 */
sscl::co::ViralNonPostingInvoker<bool> Device::executeHandshakeCReq(
    const std::string &deviceIP)
{
    auto &protoState = livoxProto1::getProtoState();
    if (!protoState.deviceManager)
        { co_return false; }

    auto cmdEndpointFdDesc = getCmdEndpointFdDesc();
    if (!cmdEndpointFdDesc)
    {
        std::cerr << __func__ << ": UdpCommandDemuxer not started or no "
            "command endpoint available." << std::endl;
        co_return false;
    }

    if (detectedSmoListeningIp.empty()) { co_return false; }

    if (!cmdEndpointFdDesc->is_open())
    {
        throw std::runtime_error(
            std::string(__func__) +
            ": cmdEndpointFdDesc is not open; cannot set up async callbacks "
            "for device " + deviceIP + "("
            + discoveredDevice.deviceIdentifier + ")" + " handshake. "
            "Check UdpCommandDemuxer initialization."
        );
    }

    try {
        if (!sendHandshakeRequest(
            *this, deviceIP, cmdEndpointFdDesc->native_handle()))
        {
            co_return false;
        }

        comms::UdpCommandDemuxer *demuxer = getUdpCommandDemuxer();
        if (!demuxer) { co_return false; }

        // Suspend until the demuxer delivers the handshake ACK from this
        // device, or the timeout expires.
        const UdpCommandResponseResult responseResult = co_await
            demuxer->waitForCommandResponseCReq(
                CMD_SET_GENERAL, CMD_ID_HANDSHAKE_ACK,
                deviceIP,
                commandTimeoutMs);

        co_return processHandshakeResponse(responseResult, *this, deviceIP);
    } catch (const std::exception &e) {
        std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": "
            << e.what() << std::endl;
        co_return false;
    }
}
/** EXPLANATION:
 * Stops heartbeating and sends a best-effort DisconnectMessage to the
 * device's command port. No reply is awaited.
 *
 * The disconnect packet is skipped (with a log line) when no device IP is
 * known or no command endpoint is available. A failed send is logged but
 * does not change the return value.
 *
 * @return Always true.
 */
sscl::co::ViralNonPostingInvoker<bool> Device::disconnectCReq()
{
    stopHeartbeat();

    if (discoveredDevice.ipAddr.empty())
    {
        std::cout << __func__ << ": No device IP available, skipping "
            "disconnect message" << std::endl;
        co_return true;
    }

    auto cmdEndpointFdDesc = getCmdEndpointFdDesc();
    if (!cmdEndpointFdDesc)
    {
        std::cout << __func__ << ": No command endpoint available, skipping "
            "disconnect message" << std::endl;
        co_return true;
    }

    comms::DisconnectMessage disconnectMsg;
    disconnectMsg.swapContentsToProtocolEndianness();
    disconnectMsg.header.setCrc16FromRawBytes();
    disconnectMsg.header.swapCrc16ToProtocolEndianness();
    disconnectMsg.footer.crc_32 = disconnectMsg.calculateCrc32();
    disconnectMsg.footer.swapCrc32ToProtocolEndianness();

    // Zero-initialise so sin_zero (and any padding) is not left
    // indeterminate, matching the other senders in this file.
    struct sockaddr_in deviceAddr = {};
    deviceAddr.sin_family = AF_INET;
    deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str());
    deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT);

    ssize_t bytesSent = sendto(
        cmdEndpointFdDesc->native_handle(),
        &disconnectMsg, sizeof(disconnectMsg), 0,
        (struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
    if (bytesSent < 0)
    {
        std::cerr << __func__ << ": Failed to send disconnect message: "
            << strerror(errno) << std::endl;
    }
    else
    {
        // Only claim success when the packet was actually handed off.
        std::cout << __func__ << ": Sent disconnect message to "
            << discoveredDevice.ipAddr << ":" << LIVOX_COMMAND_PORT
            << std::endl;
    }
    co_return true;
}
/** EXPLANATION:
 * Asks the device to start sending point-cloud data and waits up to
 * commandTimeoutMs for its sampling response. pcloudDataActive is set to
 * true only when the device accepted the command.
 *
 * @return true when the device acknowledged the enable command; false when
 *         no device IP is known, the send failed, no demuxer exists, or the
 *         reply was missing or invalid.
 */
sscl::co::ViralNonPostingInvoker<bool> Device::enablePcloudDataCReq()
{
    if (discoveredDevice.ipAddr.empty())
    {
        std::cerr << __func__ << ": No device IP available for device "
            << discoveredDevice.deviceIdentifier << std::endl;
        co_return false;
    }

    const bool commandSent =
        sendEnDisablePcloudCommand(*this, 0x01, "enable pcloud data");
    if (!commandSent)
    {
        co_return false;
    }

    comms::UdpCommandDemuxer *const responseDemuxer = getUdpCommandDemuxer();
    if (responseDemuxer == nullptr)
    {
        co_return false;
    }

    const UdpCommandResponseResult reply = co_await
        responseDemuxer->waitForCommandResponseCReq(
            CMD_SET_GENERAL, CMD_ID_SAMPLING_RESPONSE,
            discoveredDevice.ipAddr,
            commandTimeoutMs);

    const bool accepted = processSamplingResponse(reply, *this, "enable");
    if (accepted)
    {
        pcloudDataActive.store(true);
    }
    co_return accepted;
}
/** EXPLANATION:
 * Asks the device to stop sending point-cloud data and waits up to
 * commandTimeoutMs for its sampling response.
 *
 * The local point-cloud data socket is cleaned up unconditionally before
 * the command is sent: we want to stop receiving immediately when disable
 * is requested, regardless of whether the device ACKs.
 *
 * pcloudDataActive is cleared only when the device accepted the command; on
 * any failure after the socket cleanup the flag keeps its previous value.
 *
 * @return true when the device acknowledged the disable command; false when
 *         no device IP is known, the send failed, no demuxer exists, or the
 *         reply was missing or invalid.
 */
sscl::co::ViralNonPostingInvoker<bool> Device::disablePcloudDataCReq()
{
    if (discoveredDevice.ipAddr.empty())
    {
        std::cerr << __func__ << ": No device IP available for device "
            << discoveredDevice.deviceIdentifier << std::endl;
        co_return false;
    }

    cleanupPcloudDataSocket();

    const bool commandSent =
        sendEnDisablePcloudCommand(*this, 0x00, "disable pcloud data");
    if (!commandSent)
    {
        co_return false;
    }

    comms::UdpCommandDemuxer *const responseDemuxer = getUdpCommandDemuxer();
    if (responseDemuxer == nullptr)
    {
        co_return false;
    }

    const UdpCommandResponseResult reply = co_await
        responseDemuxer->waitForCommandResponseCReq(
            CMD_SET_GENERAL,
            CMD_ID_SAMPLING_RESPONSE,
            discoveredDevice.ipAddr,
            commandTimeoutMs);

    const bool accepted = processSamplingResponse(reply, *this, "disable");
    if (accepted)
    {
        pcloudDataActive.store(false);
    }
    co_return accepted;
}
/** EXPLANATION:
 * Sends a set-return-mode command and waits up to commandTimeoutMs for the
 * device's reply. On success currentReturnMode is updated by
 * processSetReturnModeResponse().
 *
 * @param returnMode Mode value to request from the device.
 * @return true when the device acknowledged the new mode; false when no
 *         device IP is known, the send failed, no demuxer exists, or the
 *         reply was missing or invalid.
 */
sscl::co::ViralNonPostingInvoker<bool>
Device::setReturnModeCReq(uint8_t returnMode)
{
    if (discoveredDevice.ipAddr.empty())
    {
        std::cerr << __func__ << ": No device IP available for device "
            << discoveredDevice.deviceIdentifier << std::endl;
        co_return false;
    }

    const bool commandSent = sendSetReturnModeCommand(*this, returnMode);
    if (!commandSent)
    {
        co_return false;
    }

    comms::UdpCommandDemuxer *const responseDemuxer = getUdpCommandDemuxer();
    if (responseDemuxer == nullptr)
    {
        co_return false;
    }

    const UdpCommandResponseResult reply = co_await
        responseDemuxer->waitForCommandResponseCReq(
            CMD_SET_LIDAR,
            CMD_ID_SET_RETURN_MODE_RESPONSE,
            discoveredDevice.ipAddr,
            commandTimeoutMs);

    const bool acknowledged =
        processSetReturnModeResponse(reply, *this, returnMode);
    co_return acknowledged;
}
/** EXPLANATION:
 * Queries the device's current return mode and waits up to
 * commandTimeoutMs for the reply. On success currentReturnMode is also
 * updated by processGetReturnModeResponse().
 *
 * Every failure path returns the default-constructed result unchanged.
 * NOTE(review): this assumes a default GetReturnModeResult has
 * success == false -- confirm in device.h.
 *
 * @return Result with success == true and returnMode filled in when the
 *         device answered with a valid mode.
 */
sscl::co::ViralNonPostingInvoker<Device::GetReturnModeResult>
Device::getReturnModeCReq()
{
    Device::GetReturnModeResult outcome;

    if (discoveredDevice.ipAddr.empty())
    {
        std::cerr << __func__ << ": No device IP available for device "
            << discoveredDevice.deviceIdentifier << std::endl;
        co_return outcome;
    }

    const bool commandSent = sendGetReturnModeCommand(*this);
    if (!commandSent)
    {
        co_return outcome;
    }

    comms::UdpCommandDemuxer *const responseDemuxer = getUdpCommandDemuxer();
    if (responseDemuxer == nullptr)
    {
        co_return outcome;
    }

    const UdpCommandResponseResult reply = co_await
        responseDemuxer->waitForCommandResponseCReq(
            CMD_SET_LIDAR,
            CMD_ID_GET_RETURN_MODE_RESPONSE,
            discoveredDevice.ipAddr,
            commandTimeoutMs);

    uint8_t reportedMode = 0;
    const bool parsed =
        processGetReturnModeResponse(reply, *this, reportedMode);
    if (parsed)
    {
        outcome.success = true;
        outcome.returnMode = reportedMode;
    }
    co_return outcome;
}
/** EXPLANATION:
 * Derives the device's default IP address from its identifier.
 *
 * The input is either a serial number (14 chars) or a broadcast code
 * (15 chars: serial number + one selector character). The generated address
 * is X.X.X.1NN where NN are the last two digits of the serial number and
 * X.X.X are the first three octets of (smoIp & subnet mask), the mask being
 * built from smoSubnetNbits.
 *
 * Note that the host part is always written into the fourth octet, so with
 * smoSubnetNbits < 24 the third (and possibly second) octet comes out
 * partially or fully zeroed by the mask.
 *
 * @param broadcastCode Serial number (14 chars) or broadcast code (15).
 * @return The generated dotted-quad device address.
 * @throws std::invalid_argument if broadcastCode is empty or not 14/15
 *         characters long, if the serial number does not end in two digits,
 *         if smoIp is not a valid X.X.X.X address, or if smoSubnetNbits is
 *         rejected by getSubnetMaskFor().
 */
std::string Device::generateClientDeviceIpFromSerialNumber(
    const std::string& broadcastCode
    )
{
    if (broadcastCode.empty())
    {
        throw std::invalid_argument(
            std::string(__func__) + ": Broadcast code cannot be empty");
    }

    const std::string::size_type codeLength = broadcastCode.length();
    if (codeLength != 14 && codeLength != 15)
    {
        throw std::invalid_argument(
            std::string(__func__) +
            ": Broadcast code must be 14 or 15 characters long");
    }

    // For a 15-char broadcast code this drops the trailing selector; for a
    // 14-char serial number it is a plain copy. The result is always 14
    // characters, so taking the last two below is always in range.
    const std::string serialNumber = broadcastCode.substr(0, 14);
    const std::string lastTwoDigits =
        serialNumber.substr(serialNumber.length() - 2);

    if (lastTwoDigits[0] < '0' || lastTwoDigits[0] > '9' ||
        lastTwoDigits[1] < '0' || lastTwoDigits[1] > '9')
    {
        throw std::invalid_argument(
            std::string(__func__) +
            ": Last two characters of serial number must be digits");
    }

    // Parse smoIp to extract the network prefix.
    auto smoIpOctets = comms::parseIPv4Address(smoIp);
    if (!smoIpOctets.has_value())
    {
        throw std::invalid_argument(
            std::string(__func__) + ": Invalid smoIp format: must be X.X.X.X");
    }

    const uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);

    // Assemble the address in unsigned arithmetic: shifting an int octet
    // >= 128 left by 24 would overflow a signed 32-bit int.
    const uint32_t smoIpAddr =
        (static_cast<uint32_t>(std::stoi(smoIpOctets->octet1)) << 24) |
        (static_cast<uint32_t>(std::stoi(smoIpOctets->octet2)) << 16) |
        (static_cast<uint32_t>(std::stoi(smoIpOctets->octet3)) << 8) |
        static_cast<uint32_t>(std::stoi(smoIpOctets->octet4));

    const uint32_t networkPrefix = smoIpAddr & subnetMask;
    const uint8_t octet1 = (networkPrefix >> 24) & 0xFF;
    const uint8_t octet2 = (networkPrefix >> 16) & 0xFF;
    const uint8_t octet3 = (networkPrefix >> 8) & 0xFF;

    // First three octets of the prefix, then "1" + last two serial digits.
    return std::to_string(octet1) + "." + std::to_string(octet2) + "." +
        std::to_string(octet3) + ".1" + lastTwoDigits;
}
static void discardHeartbeatAck(
const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr
)
{
(void)senderAddr;
// Check if we have enough data for a HeartbeatACK message
if (bytesReceived
< static_cast<ssize_t>(sizeof(livoxProto1::comms::HeartbeatACK)))
{
std::cout << __func__ << ": Received heartbeat ACK with insufficient "
"data (" << bytesReceived << " bytes, expected "
<< sizeof(livoxProto1::comms::HeartbeatACK) << ")" << std::endl;
return;
}
// Directly use a non-const reference to HeartbeatACK structure
livoxProto1::comms::HeartbeatACK& ack =
*reinterpret_cast<livoxProto1::comms::HeartbeatACK*>(
const_cast<uint8_t*>(data));
ack.swapContentsToHostEndianness();
if (!ack.validateCrc32())
{
std::cerr << __func__ << ": Discarded heartbeat ACK - CRC32 validation "
"failed" << std::endl;
return;
}
if (!ack.header.validateCrc16())
{
std::cerr << __func__ << ": Discarded heartbeat ACK - CRC16 validation "
"failed" << std::endl;
return;
}
if (!ack.sanityCheck())
{
std::cerr << __func__ << ": Discarded heartbeat ACK - sanity check "
"failed" << std::endl;
return;
}
if (ack.work_state == 0x01) { return; }
// Print work_state with human-readable description
std::string workStateStr;
switch (ack.work_state)
{
case 0x00:
workStateStr = "Initializing";
break;
case 0x01:
workStateStr = "Normal";
break;
case 0x02:
workStateStr = "Power-Saving";
break;
case 0x03:
workStateStr = "Standby";
break;
case 0x04:
workStateStr = "Error";
break;
default:
workStateStr = "Unknown";
break;
}
std::cerr << __func__ << ": Lidar not ready for operation: work_state: 0x"
<< std::hex << static_cast<int>(ack.work_state) << std::dec
<< " (" << workStateStr << "), ack_msg: 0x"
<< std::hex << ack.ack_msg << std::dec << std::endl;
}
/** EXPLANATION:
 * Starts the periodic heartbeat towards the device.
 *
 * Under heartbeatActiveLock this registers discardHeartbeatAck as the
 * handler for heartbeat ACKs (cmd_set=0x00, cmd_id=0x03) from the device's
 * IP, creates the heartbeat timer on the component thread's io_context,
 * marks heartbeating active, and sends the first heartbeat immediately.
 * sendHeartbeat() schedules each following one.
 *
 * @throws std::runtime_error if there is no component thread or the device
 *         IP is not yet known.
 */
void Device::startHeartbeat()
{
    const bool canStart =
        componentThread && !discoveredDevice.ipAddr.empty();
    if (!canStart)
    {
        throw std::runtime_error(
            std::string(__func__) +
            ": Can't start heartbeat without component thread or IP");
    }

    sscl::SpinLock::Guard lock(heartbeatActiveLock);

    registerUdpCommandHandler(
        0x00, 0x03, discardHeartbeatAck, discoveredDevice.ipAddr);

    heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
        componentThread->getIoContext());
    heartbeatActive.store(true);

    sendHeartbeat();
}
/** EXPLANATION:
 * Stops the periodic heartbeat.
 *
 * Under heartbeatActiveLock the active flag is cleared and the heartbeat
 * ACK handler (cmd_set=0x00, cmd_id=0x03) for the device's IP is
 * unregistered. After the lock is released, any existing timer is cancelled
 * and destroyed. Calling this when no heartbeat is running still performs
 * the flag clear and unregister.
 */
void Device::stopHeartbeat()
{
    {
        sscl::SpinLock::Guard lock(heartbeatActiveLock);
        heartbeatActive.store(false);
        unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr);
    }

    if (!heartbeatTimer)
    {
        return;
    }
    heartbeatTimer->cancel();
    heartbeatTimer.reset();
}
void Device::sendHeartbeat()
{
if (!heartbeatActive.load())
{
std::cerr << __func__ << ": Ending heartbeat loop due to "
"heartbeatActive==false.\n";
return;
}
if (discoveredDevice.ipAddr.empty())
{
std::cerr << __func__ << ": Ending heartbeat loop due to "
"discoveredDevice.ipAddr.empty().\n";
return;
}
// Get the command endpoint from the UdpCommandDemuxer
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
std::cerr << __func__ << ": No device manager available\n";
return;
}
auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
.getCmdEndpointFdDesc();
if (!cmdEndpointFdDesc)
{
std::cerr << __func__ << ": No command endpoint available\n";
return;
}
try {
comms::HeartbeatMessage heartbeatMsg;
heartbeatMsg.swapContentsToProtocolEndianness();
heartbeatMsg.header.setCrc16FromRawBytes();
heartbeatMsg.header.swapCrc16ToProtocolEndianness();
heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32();
heartbeatMsg.footer.swapCrc32ToProtocolEndianness();
// Set up destination address for raw socket
struct sockaddr_in deviceAddr;
memset(&deviceAddr, 0, sizeof(deviceAddr));
deviceAddr.sin_family = AF_INET;
deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str());
// Heartbeats and commands go to port 65000
deviceAddr.sin_port = htons(65000);
ssize_t bytesSent = sendto(
cmdEndpointFdDesc->native_handle(),
&heartbeatMsg, sizeof(heartbeatMsg), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0)
{
std::cerr << "[" << __func__ << "] Failed to send heartbeat: "
<< strerror(errno) << std::endl;
return;
}
/** EXPLANATION:
* Schedule next heartbeat in 1 second, per the spec.
*/
heartbeatTimer->expires_from_now(boost::posix_time::seconds(1));
heartbeatTimer->async_wait(
[this](const boost::system::error_code& error) {
onHeartbeatTimer(error);
}
);
}
catch (const std::exception& e)
{
std::cerr << __func__ << ": Heartbeat send failed for device "
<< discoveredDevice.deviceIdentifier
<< ": " << e.what() << std::endl;
}
}
/** EXPLANATION:
 * Completion handler for heartbeatTimer.
 *
 * - operation_aborted: the timer was cancelled, so return silently.
 * - any other error: log it and return without sending.
 * - no error: send the next heartbeat, provided heartbeatActive is still set.
 */
void Device::onHeartbeatTimer(const boost::system::error_code& error)
{
	if (error)
	{
		// A cancelled timer means the heartbeat was stopped; not worth a log.
		if (error == boost::asio::error::operation_aborted) { return; }

		std::cerr << "[" << __func__ << "] Heartbeat timer error for device "
			<< discoveredDevice.deviceIdentifier
			<< ": " << error.message() << std::endl;
		return;
	}

	/* The guard is held across both the heartbeatActive check and the
	 * sendHeartbeat() call, and is released when this function returns.
	 */
	sscl::SpinLock::Guard activeGuard(heartbeatActiveLock);
	if (heartbeatActive.load()) {
		sendHeartbeat();
	}
}
/** EXPLANATION:
 * Builds an IPv4 subnet mask whose top @nbits bits are 1 and whose remaining
 * low bits are 0 (e.g. 24 => 0xFFFFFF00).
 *
 * Throws std::invalid_argument if @nbits is greater than 32.
 */
uint32_t Device::getSubnetMaskFor(uint8_t nbits)
{
	if (nbits > 32) {
		throw std::invalid_argument(
			std::string(__func__) + ": nbits must be between 0 and 32");
	}

	/* A /0 prefix cannot go through the shift below: shifting a 32-bit value
	 * by 32 is undefined, so it is answered directly.
	 */
	if (nbits == 0) { return 0x00000000; }

	// For nbits == 32 this is a shift by 0, which yields the all-ones mask.
	const unsigned hostBits = 32u - nbits;
	return (0xFFFFFFFFu << hostBits);
}
/** EXPLANATION:
 * This function detects the SMO IP address of the interface that's facing
 * the device by iterating through all network interfaces and returning the
 * first IPv4 interface address that lies in the same subnet as the device's
 * IP address. The subnet is defined by the smoSubnetNbits prefix length.
 *
 * Returns std::nullopt if:
 *  - deviceIP can't be parsed,
 *  - getifaddrs() fails,
 *  - no interface address matches, or
 *  - any std::exception is thrown along the way (this is logged to stderr).
 */
std::optional<std::string> Device::detectSmoIp(const std::string& deviceIP)
{
	try {
		/** EXPLANATION:
		 * Packs the four parsed octets into one host-order 32-bit value.
		 *
		 * Each octet is widened to uint32_t BEFORE shifting. Shifting the
		 * signed int returned by std::stoi() left by 24 goes past INT_MAX for
		 * any first octet >= 128 (e.g. 192.168.x.x), so the arithmetic is
		 * kept in unsigned where it is well defined.
		 */
		const auto packOctets = [](const auto& octets) -> uint32_t {
			return (static_cast<uint32_t>(std::stoi(octets.octet1)) << 24) |
				(static_cast<uint32_t>(std::stoi(octets.octet2)) << 16) |
				(static_cast<uint32_t>(std::stoi(octets.octet3)) << 8) |
				static_cast<uint32_t>(std::stoi(octets.octet4));
		};

		// Parse the device IP to get the network prefix
		auto deviceIpOctets = comms::parseIPv4Address(deviceIP);
		if (!deviceIpOctets.has_value()) {
			return std::nullopt;
		}
		const uint32_t deviceIpAddr = packOctets(*deviceIpOctets);

		// Generate subnet mask based on nbits
		const uint32_t subnetMask = getSubnetMaskFor(smoSubnetNbits);
		const uint32_t deviceNetwork = deviceIpAddr & subnetMask;

		/* Get all network interfaces using getifaddrs (Linux/Unix specific)
		 *
		 * FIXME: Add Windows support using GetAdaptersAddresses when porting
		 */
		struct ifaddrs *ifaddr;
		if (getifaddrs(&ifaddr) == -1) {
			return std::nullopt;
		}

		// Use unique_ptr for automatic cleanup (RAII) to free ifaddrs
		auto ifaddr_deleter = [](struct ifaddrs* ptr) { freeifaddrs(ptr); };
		std::unique_ptr<struct ifaddrs, decltype(ifaddr_deleter)> ifaddr_ptr(
			ifaddr, ifaddr_deleter);

		/** EXPLANATION:
		 * Iterate through all network interfaces and check if the IP address
		 * is in the same subnet as the device's IP address.
		 */
		for (struct ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next)
		{
			// Skip entries without an address, and anything that isn't IPv4
			if (ifa->ifa_addr == nullptr) { continue; }
			if (ifa->ifa_addr->sa_family != AF_INET) { continue; }

			// Render the IPv4 address as text; this is also what we return
			const auto* addr_in =
				reinterpret_cast<const struct sockaddr_in*>(ifa->ifa_addr);
			char ip_str[INET_ADDRSTRLEN];
			if (inet_ntop(
				AF_INET, &addr_in->sin_addr, ip_str, INET_ADDRSTRLEN)
				== nullptr)
			{
				continue;
			}
			std::string ip = ip_str;

			auto ipOctets = comms::parseIPv4Address(ip);
			if (!ipOctets.has_value()) { continue; }
			const uint32_t ipAddr = packOctets(*ipOctets);

			/* Check if this iface's IP is in the same subnet as the device's
			 * IP using the calculated mask. Only compare the bits that are
			 * set in the subnet mask.
			 */
			if ((ipAddr & subnetMask) == deviceNetwork) {
				// First match wins; ifaddr_ptr frees the list on return
				return ip;
			}
		}

		// No interface shares a subnet with the device
		return std::nullopt;
	} catch (const std::exception& e) {
		std::cerr << "Error detecting SMO IP: " << e.what() << std::endl;
		return std::nullopt;
	}
}
std::optional<std::string> Device::getSmoIp(const std::string& deviceIP)
{
/** EXPLANATION:
* This is only used when connecting to a device that's already known to
* the broadcastListener.
* It is NOT and SHOULD not be used when connecting by heuristic IP
* construction using the client device's serial number.
*
* Determines the SMO listening IP address for this device.
* If smoIp is provided, validate it against detected IP and return it.
* If smoIp is empty, attempt auto-detection based on device IP.
* Returns std::optional<std::string> - empty if detection fails.
*/
auto detectedIp = detectSmoIp(deviceIP);
if (!smoIp.empty())
{
// smoIp was provided, validate it against detected IP
if (detectedIp.has_value() && detectedIp.value() != smoIp)
{
// Print warning if provided smoIp doesn't match detected IP
std::cerr << "Warning: Provided smo-ip (" << smoIp
<< ") doesn't match detected IP (" << detectedIp.value()
<< ") for device " << discoveredDevice.deviceIdentifier
<< " @(" << deviceIP << ")"
<< ". Using provided smo-ip anyway." << std::endl;
}
return smoIp;
}
if (detectedIp.has_value()) {
return detectedIp.value();
}
// Auto-detection failed
return std::nullopt;
}
/** EXPLANATION:
 * Marks the point-cloud data path as inactive by clearing pcloudDataActive.
 *
 * Despite the name, this function does not close or release any socket
 * itself; the flag store below is all it does.
 */
void Device::cleanupPcloudDataSocket()
{
pcloudDataActive.store(false);
}
/** EXPLANATION:
 * Dispatches one received UDP datagram to the handler registered for its
 * (cmd_set, cmd_id) pair in udpCommandHandlers.
 *
 * Datagrams shorter than a Header plus a Command are dropped, as are
 * datagrams for which no handler is registered; both cases are silent.
 * A std::exception thrown by the handler is caught and logged here.
 */
void Device::handleUdpDgram(
	const uint8_t *data, ssize_t bytesReceived,
	const struct sockaddr_in &senderAddr
	)
{
	// Anything shorter than header + command can't be a valid message
	const ssize_t minDgramSize = static_cast<ssize_t>(
		sizeof(comms::Header) + sizeof(comms::Command));
	if (bytesReceived < minDgramSize) { return; }

	// The command set and command ID are the first two bytes after the header
	const uint8_t *afterHeader = data + sizeof(comms::Header);
	uint8_t cmdSet = afterHeader[0];
	uint8_t cmdId = afterHeader[1];

	auto registered = udpCommandHandlers.find(std::make_pair(cmdSet, cmdId));
	if (registered == udpCommandHandlers.end())
	{
		// Unknown command types are silently ignored
		return;
	}

	try
	{
		registered->second(data, bytesReceived, senderAddr);
	}
	catch (const std::exception& e)
	{
		std::cerr << __func__ << ": Exception in command handler for "
			<< discoveredDevice.deviceIdentifier
			<< " cmd_set=" << static_cast<int>(cmdSet)
			<< " cmd_id=" << static_cast<int>(cmdId)
			<< ": " << e.what() << std::endl;
	}
}
void Device::registerUdpCommandHandler(
uint8_t cmd_set, uint8_t cmd_id,
std::function<void(
const uint8_t* data, ssize_t bytesReceived,
const struct sockaddr_in& senderAddr)> handler,
const std::string& deviceIP
)
{
/** EXPLANATION:
* Register a UDP command handler for the given cmd_set and cmd_id.
* If the handler already exists for the given device IP, replace it.
* If the handler does not exist, add it to the temporary collection.
*
* Adding a handler to a cmd_set+cmd_id pair which already has a handler
* results in the new handler replacing the old one.
*/
auto key = std::make_pair(cmd_set, cmd_id);
udpCommandHandlers[key] = handler; // Don't move, we need to copy
/** EXPLANATION:
* Add to temporary collection if deviceIP is provided (not empty)
*/
if (!deviceIP.empty())
{
auto& handlers = devicesUnderConstruction[deviceIP];
auto it = std::find_if(handlers.begin(), handlers.end(),
[cmd_set, cmd_id](const CommandHandler& existing) {
return existing.cmd_set == cmd_set && existing.cmd_id == cmd_id;
});
// Create the new command handler
CommandHandler cmdHandler;
cmdHandler.cmd_set = cmd_set;
cmdHandler.cmd_id = cmd_id;
cmdHandler.handler = std::move(handler);
if (it != handlers.end()) {
// Replace existing handler
*it = std::move(cmdHandler);
} else {
// Add new handler
handlers.push_back(std::move(cmdHandler));
}
}
}
void Device::unregisterUdpCommandHandler(
uint8_t cmd_set, uint8_t cmd_id, const std::string& deviceIP
)
{
auto key = std::make_pair(cmd_set, cmd_id);
udpCommandHandlers.erase(key);
/** EXPLANATION:
* Remove from temporary collection if deviceIP is provided (not empty)
*/
if (!deviceIP.empty())
{
auto it = devicesUnderConstruction.find(deviceIP);
if (it != devicesUnderConstruction.end()) {
// Remove the specific command handler for this cmd_set/cmd_id
auto& handlers = it->second;
handlers.erase(
std::remove_if(handlers.begin(), handlers.end(),
[cmd_set, cmd_id](const CommandHandler& h) {
return h.cmd_set == cmd_set && h.cmd_id == cmd_id;
}),
handlers.end());
// If no handlers left for this IP, remove the entire entry
if (handlers.empty()) {
devicesUnderConstruction.erase(it);
}
}
}
}
} // namespace livoxProto1