livoxProto1: Convert heartbeat sender into daemon coro

This commit is contained in:
2026-06-10 07:02:07 -04:00
parent facb665217
commit f9c64cf363
2 changed files with 194 additions and 130 deletions
+172 -123
View File
@@ -1,6 +1,9 @@
#include <boostAsioLinkageFix.h>
#include <sstream>
#include <thread>
#include <chrono>
#include <iostream>
#include <string>
#include <stdexcept>
#include <memory>
@@ -15,7 +18,10 @@
#include <optional>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <componentThread.h>
#include <adapters/boostAsio/deadlineTimerAReq.h>
#include <opts.h>
#include <spinscale/co/nonViralCompletion.h>
#include "device.h"
#include "protocol.h"
#include "core.h"
@@ -100,7 +106,7 @@ componentThread(componentThread),
commandTimeoutMs(commandTimeoutMs), retryDelayMs(retryDelayMs),
smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits),
dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort),
heartbeatActive(false),
heartbeatTimer(componentThread->getIoContext()),
pcloudDataActive(false)
{
}
@@ -112,8 +118,6 @@ Device::~Device()
if (pcloudDataActive.load()) {
pcloudDataActive.store(false);
}
heartbeatTimer.reset();
}
namespace {
@@ -1080,6 +1084,148 @@ static void discardHeartbeatAck(
<< std::hex << ack.ack_msg << std::dec << std::endl;
}
namespace {
constexpr long DEVICE_HEARTBEAT_PERIOD_MS = 1000;
long computeTimesliceResidueMs(long workDurationMs, long periodMs)
{
if (workDurationMs >= periodMs) {
return 0;
}
return periodMs - workDurationMs;
}
long durationMsSince(
const std::chrono::high_resolution_clock::time_point &startStamp,
const std::chrono::high_resolution_clock::time_point &endStamp)
{
const auto duration = endStamp - startStamp;
return std::chrono::duration_cast<std::chrono::milliseconds>(
duration).count();
}
void logDeviceCDaemonException(std::exception_ptr &exceptionPtr)
{
sscl::co::NonViralCompletion nvc(exceptionPtr);
if (!nvc.hasException()) {
return;
}
try {
nvc.checkAndRethrowException();
} catch (const std::exception &e) {
std::cerr << "Device: deviceCDaemon: "
<< e.what() << std::endl;
}
}
} // namespace
void Device::sendHeartbeatOnce()
{
if (discoveredDevice.ipAddr.empty())
{
throw std::runtime_error(
std::string(__func__)
+ ": Ending heartbeat loop due to "
"discoveredDevice.ipAddr.empty().");
}
// Get the command endpoint from the UdpCommandDemuxer
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
throw std::runtime_error(
std::string(__func__) + ": No device manager available");
}
auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
.getCmdEndpointFdDesc();
if (!cmdEndpointFdDesc)
{
throw std::runtime_error(
std::string(__func__) + ": No command endpoint available");
}
comms::HeartbeatMessage heartbeatMsg;
heartbeatMsg.swapContentsToProtocolEndianness();
heartbeatMsg.header.setCrc16FromRawBytes();
heartbeatMsg.header.swapCrc16ToProtocolEndianness();
heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32();
heartbeatMsg.footer.swapCrc32ToProtocolEndianness();
// Set up destination address for raw socket
struct sockaddr_in deviceAddr;
memset(&deviceAddr, 0, sizeof(deviceAddr));
deviceAddr.sin_family = AF_INET;
deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str());
// Heartbeats and commands go to port 65000
deviceAddr.sin_port = htons(65000);
ssize_t bytesSent = sendto(
cmdEndpointFdDesc->native_handle(),
&heartbeatMsg, sizeof(heartbeatMsg), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0)
{
throw std::runtime_error(
std::string("[") + __func__ + "] Failed to send heartbeat: "
+ strerror(errno));
}
}
sscl::co::DynamicNonViralPostingInvoker
Device::deviceCDaemon(
sscl::co::ExplicitPostTarget, std::exception_ptr &, std::function<void()>,
sscl::SyncCancelerForAsyncWork &canceler)
{
const long heartbeatPeriodMs = DEVICE_HEARTBEAT_PERIOD_MS;
while (!canceler.isCancellationRequested())
{
const auto workStartStamp =
std::chrono::high_resolution_clock::now();
try {
if (!canceler.execUncancelableSegmentOrAbort([&]() {
sendHeartbeatOnce();
})) {
break;
}
} catch (const std::exception &e) {
std::cerr << "deviceCDaemon: heartbeat failed for device "
<< discoveredDevice.deviceIdentifier << ": "
<< e.what() << std::endl;
}
const auto workEndStamp =
std::chrono::high_resolution_clock::now();
const long workDurationMs = durationMsSince(
workStartStamp, workEndStamp);
/** EXPLANATION:
* Schedule next heartbeat in 1 second, per the spec.
*/
const long residueMs = computeTimesliceResidueMs(
workDurationMs, heartbeatPeriodMs);
// Timer was cancelled, which is expected when stopping
const bool expiredNormally = co_await
adapters::boostAsio::getDeadlineTimerAReqAwaiter(
sscl::ComponentThread::getSelf()->getIoContext(),
heartbeatTimer,
boost::posix_time::milliseconds(residueMs));
if (!expiredNormally) {
break;
}
}
co_return;
}
void Device::startHeartbeat()
{
if (!componentThread || discoveredDevice.ipAddr.empty())
@@ -1089,139 +1235,42 @@ void Device::startHeartbeat()
": Can't start heartbeat without component thread or IP");
}
// Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03)
sscl::SpinLock::Guard lock(heartbeatActiveLock);
if (daemonNursery.admissionIsOpen()) {
return;
}
// Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03)
registerUdpCommandHandler(
0x00, 0x03, discardHeartbeatAck, discoveredDevice.ipAddr);
// Create heartbeat timer
heartbeatTimer = std::make_unique<boost::asio::deadline_timer>(
componentThread->getIoContext());
heartbeatActive.store(true);
daemonNursery.openAdmission();
// Send first heartbeat immediately
sendHeartbeat();
daemonNursery.launch(
[this](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return deviceCDaemon(
sscl::co::ExplicitPostTarget{
componentThread->getIoContext()},
lease.getExceptionStorage(),
lease.getCallerLambda(),
lease.getSyncCanceler());
},
logDeviceCDaemonException);
}
void Device::stopHeartbeat()
{
{
sscl::SpinLock::Guard lock(heartbeatActiveLock);
unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr);
heartbeatActive.store(false);
unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr);
}
if (heartbeatTimer) {
heartbeatTimer->cancel();
heartbeatTimer.reset();
}
}
void Device::sendHeartbeat()
{
if (!heartbeatActive.load())
{
std::cerr << __func__ << ": Ending heartbeat loop due to "
"heartbeatActive==false.\n";
if (!daemonNursery.admissionIsOpen()) {
return;
}
if (discoveredDevice.ipAddr.empty())
{
std::cerr << __func__ << ": Ending heartbeat loop due to "
"discoveredDevice.ipAddr.empty().\n";
return;
}
// Get the command endpoint from the UdpCommandDemuxer
auto& protoState = livoxProto1::getProtoState();
if (!protoState.deviceManager)
{
std::cerr << __func__ << ": No device manager available\n";
return;
}
auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer
.getCmdEndpointFdDesc();
if (!cmdEndpointFdDesc)
{
std::cerr << __func__ << ": No command endpoint available\n";
return;
}
try {
comms::HeartbeatMessage heartbeatMsg;
heartbeatMsg.swapContentsToProtocolEndianness();
heartbeatMsg.header.setCrc16FromRawBytes();
heartbeatMsg.header.swapCrc16ToProtocolEndianness();
heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32();
heartbeatMsg.footer.swapCrc32ToProtocolEndianness();
// Set up destination address for raw socket
struct sockaddr_in deviceAddr;
memset(&deviceAddr, 0, sizeof(deviceAddr));
deviceAddr.sin_family = AF_INET;
deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str());
// Heartbeats and commands go to port 65000
deviceAddr.sin_port = htons(65000);
ssize_t bytesSent = sendto(
cmdEndpointFdDesc->native_handle(),
&heartbeatMsg, sizeof(heartbeatMsg), 0,
(struct sockaddr*)&deviceAddr, sizeof(deviceAddr));
if (bytesSent < 0)
{
std::cerr << "[" << __func__ << "] Failed to send heartbeat: "
<< strerror(errno) << std::endl;
return;
}
/** EXPLANATION:
* Schedule next heartbeat in 1 second, per the spec.
*/
heartbeatTimer->expires_from_now(boost::posix_time::seconds(1));
heartbeatTimer->async_wait(
[this](const boost::system::error_code& error) {
onHeartbeatTimer(error);
}
);
}
catch (const std::exception& e)
{
std::cerr << __func__ << ": Heartbeat send failed for device "
<< discoveredDevice.deviceIdentifier
<< ": " << e.what() << std::endl;
}
}
void Device::onHeartbeatTimer(const boost::system::error_code& error)
{
// Timer was cancelled, heartbeat stopped
if (error == boost::asio::error::operation_aborted) {
return;
}
if (error)
{
std::cerr << "[" << __func__ << "] Heartbeat timer error for device "
<< discoveredDevice.deviceIdentifier
<< ": " << error.message() << std::endl;
return;
}
// Send next heartbeat
{
sscl::SpinLock::Guard lock(heartbeatActiveLock);
if (!heartbeatActive.load())
{ return; }
sendHeartbeat();
}
heartbeatTimer.cancel();
daemonNursery.requestCancelOnAll();
daemonNursery.closeAdmission();
daemonNursery.syncAwaitAllSettlements(
sscl::ComponentThread::getSelf()->getIoContext());
}
uint32_t Device::getSubnetMaskFor(uint8_t nbits)