From f9c64cf363d53bd3c18285ee0cc741569ceb0aa9 Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Wed, 10 Jun 2026 07:02:07 -0400 Subject: [PATCH] livoxProto1: Convert heartbeat sender into daemon coro --- commonLibs/livoxProto1/device.cpp | 295 +++++++++++++++++------------- commonLibs/livoxProto1/device.h | 29 ++- 2 files changed, 194 insertions(+), 130 deletions(-) diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index 3074d1f..a50adc8 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -1,6 +1,9 @@ +#include + #include #include #include +#include #include #include #include @@ -15,7 +18,10 @@ #include #include #include +#include +#include #include +#include #include "device.h" #include "protocol.h" #include "core.h" @@ -100,7 +106,7 @@ componentThread(componentThread), commandTimeoutMs(commandTimeoutMs), retryDelayMs(retryDelayMs), smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), -heartbeatActive(false), +heartbeatTimer(componentThread->getIoContext()), pcloudDataActive(false) { } @@ -112,8 +118,6 @@ Device::~Device() if (pcloudDataActive.load()) { pcloudDataActive.store(false); } - - heartbeatTimer.reset(); } namespace { @@ -1080,6 +1084,148 @@ static void discardHeartbeatAck( << std::hex << ack.ack_msg << std::dec << std::endl; } +namespace { + +constexpr long DEVICE_HEARTBEAT_PERIOD_MS = 1000; + +long computeTimesliceResidueMs(long workDurationMs, long periodMs) +{ + if (workDurationMs >= periodMs) { + return 0; + } + return periodMs - workDurationMs; +} + +long durationMsSince( + const std::chrono::high_resolution_clock::time_point &startStamp, + const std::chrono::high_resolution_clock::time_point &endStamp) +{ + const auto duration = endStamp - startStamp; + return std::chrono::duration_cast( + duration).count(); +} + +void logDeviceCDaemonException(std::exception_ptr &exceptionPtr) +{ + sscl::co::NonViralCompletion nvc(exceptionPtr); + if (!nvc.hasException()) { + return; + } + + try { + nvc.checkAndRethrowException(); + } catch (const std::exception &e) { + std::cerr << "Device: deviceCDaemon: " + << e.what() << std::endl; + } +} + +} // namespace + +void Device::sendHeartbeatOnce() +{ + if (discoveredDevice.ipAddr.empty()) + { + throw std::runtime_error( + std::string(__func__) + + ": Ending heartbeat loop due to " + "discoveredDevice.ipAddr.empty()."); + } + + // Get the command endpoint from the UdpCommandDemuxer + auto& protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { + throw std::runtime_error( + std::string(__func__) + ": No device manager available"); + } + + auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer + .getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) + { + throw std::runtime_error( + std::string(__func__) + ": No command endpoint available"); + } + + comms::HeartbeatMessage heartbeatMsg; + heartbeatMsg.swapContentsToProtocolEndianness(); + heartbeatMsg.header.setCrc16FromRawBytes(); + heartbeatMsg.header.swapCrc16ToProtocolEndianness(); + heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32(); + heartbeatMsg.footer.swapCrc32ToProtocolEndianness(); + + // Set up destination address for raw socket + struct sockaddr_in deviceAddr; + memset(&deviceAddr, 0, sizeof(deviceAddr)); + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); + // Heartbeats and commands go to port 65000 + deviceAddr.sin_port = htons(65000); + + ssize_t bytesSent = sendto( + cmdEndpointFdDesc->native_handle(), + &heartbeatMsg, sizeof(heartbeatMsg), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); + + if (bytesSent < 0) + { + throw std::runtime_error( + std::string("[") + __func__ + "] Failed to send heartbeat: " + + strerror(errno)); + } +} + +sscl::co::DynamicNonViralPostingInvoker +Device::deviceCDaemon( + sscl::co::ExplicitPostTarget, std::exception_ptr &, std::function, + sscl::SyncCancelerForAsyncWork &canceler) +{ + const long heartbeatPeriodMs = DEVICE_HEARTBEAT_PERIOD_MS; + + while (!canceler.isCancellationRequested()) + { + const auto workStartStamp = + std::chrono::high_resolution_clock::now(); + + try { + if (!canceler.execUncancelableSegmentOrAbort([&]() { + sendHeartbeatOnce(); + })) { + break; + } + } catch (const std::exception &e) { + std::cerr << "deviceCDaemon: heartbeat failed for device " + << discoveredDevice.deviceIdentifier << ": " + << e.what() << std::endl; + } + + const auto workEndStamp = + std::chrono::high_resolution_clock::now(); + const long workDurationMs = durationMsSince( + workStartStamp, workEndStamp); + + /** EXPLANATION: + * Schedule next heartbeat in 1 second, per the spec. + */ + const long residueMs = computeTimesliceResidueMs( + workDurationMs, heartbeatPeriodMs); + + // Timer was cancelled, which is expected when stopping + const bool expiredNormally = co_await + adapters::boostAsio::getDeadlineTimerAReqAwaiter( + sscl::ComponentThread::getSelf()->getIoContext(), + heartbeatTimer, + boost::posix_time::milliseconds(residueMs)); + + if (!expiredNormally) { + break; + } + } + + co_return; +} + void Device::startHeartbeat() { if (!componentThread || discoveredDevice.ipAddr.empty()) @@ -1089,139 +1235,42 @@ void Device::startHeartbeat() ": Can't start heartbeat without component thread or IP"); } - // Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03) - sscl::SpinLock::Guard lock(heartbeatActiveLock); + if (daemonNursery.admissionIsOpen()) { + return; + } + // Register heartbeat ACK handler (cmd_set=0x00, cmd_id=0x03) registerUdpCommandHandler( 0x00, 0x03, discardHeartbeatAck, discoveredDevice.ipAddr); - // Create heartbeat timer - heartbeatTimer = std::make_unique( - componentThread->getIoContext()); - - heartbeatActive.store(true); - + daemonNursery.openAdmission(); // Send first heartbeat immediately - sendHeartbeat(); + daemonNursery.launch( + [this](sscl::co::NonViralTaskNursery::Slot::Lease &lease) + { + return deviceCDaemon( + sscl::co::ExplicitPostTarget{ + componentThread->getIoContext()}, + lease.getExceptionStorage(), + lease.getCallerLambda(), + lease.getSyncCanceler()); + }, + logDeviceCDaemonException); } void Device::stopHeartbeat() { - { - sscl::SpinLock::Guard lock(heartbeatActiveLock); + unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr); - heartbeatActive.store(false); - unregisterUdpCommandHandler(0x00, 0x03, discoveredDevice.ipAddr); - } - - if (heartbeatTimer) { - heartbeatTimer->cancel(); - heartbeatTimer.reset(); - } -} - -void Device::sendHeartbeat() -{ - if (!heartbeatActive.load()) - { - std::cerr << __func__ << ": Ending heartbeat loop due to " - "heartbeatActive==false.\n"; + if (!daemonNursery.admissionIsOpen()) { return; } - if (discoveredDevice.ipAddr.empty()) - { - std::cerr << __func__ << ": Ending heartbeat loop due to " - "discoveredDevice.ipAddr.empty().\n"; - return; - } - - // Get the command endpoint from the UdpCommandDemuxer - auto& protoState = livoxProto1::getProtoState(); - if (!protoState.deviceManager) - { - std::cerr << __func__ << ": No device manager available\n"; - return; - } - - auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer - .getCmdEndpointFdDesc(); - if (!cmdEndpointFdDesc) - { - std::cerr << __func__ << ": No command endpoint available\n"; - return; - } - - try { - comms::HeartbeatMessage heartbeatMsg; - heartbeatMsg.swapContentsToProtocolEndianness(); - heartbeatMsg.header.setCrc16FromRawBytes(); - heartbeatMsg.header.swapCrc16ToProtocolEndianness(); - heartbeatMsg.footer.crc_32 = heartbeatMsg.calculateCrc32(); - heartbeatMsg.footer.swapCrc32ToProtocolEndianness(); - - // Set up destination address for raw socket - struct sockaddr_in deviceAddr; - memset(&deviceAddr, 0, sizeof(deviceAddr)); - deviceAddr.sin_family = AF_INET; - deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); - // Heartbeats and commands go to port 65000 - deviceAddr.sin_port = htons(65000); - - ssize_t bytesSent = sendto( - cmdEndpointFdDesc->native_handle(), - &heartbeatMsg, sizeof(heartbeatMsg), 0, - (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); - - if (bytesSent < 0) - { - std::cerr << "[" << __func__ << "] Failed to send heartbeat: " - << strerror(errno) << std::endl; - return; - } - - /** EXPLANATION: - * Schedule next heartbeat in 1 second, per the spec. - */ - heartbeatTimer->expires_from_now(boost::posix_time::seconds(1)); - heartbeatTimer->async_wait( - [this](const boost::system::error_code& error) { - onHeartbeatTimer(error); - } - ); - } - catch (const std::exception& e) - { - std::cerr << __func__ << ": Heartbeat send failed for device " - << discoveredDevice.deviceIdentifier - << ": " << e.what() << std::endl; - } -} - -void Device::onHeartbeatTimer(const boost::system::error_code& error) -{ - // Timer was cancelled, heartbeat stopped - if (error == boost::asio::error::operation_aborted) { - return; - } - - if (error) - { - std::cerr << "[" << __func__ << "] Heartbeat timer error for device " - << discoveredDevice.deviceIdentifier - << ": " << error.message() << std::endl; - - return; - } - - // Send next heartbeat - { - sscl::SpinLock::Guard lock(heartbeatActiveLock); - if (!heartbeatActive.load()) - { return; } - - sendHeartbeat(); - } + heartbeatTimer.cancel(); + daemonNursery.requestCancelOnAll(); + daemonNursery.closeAdmission(); + daemonNursery.syncAwaitAllSettlements( + sscl::ComponentThread::getSelf()->getIoContext()); } uint32_t Device::getSubnetMaskFor(uint8_t nbits) diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index 2ac291f..271570f 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -1,6 +1,8 @@ #ifndef LIVOX_PROTO1_DEVICE_H #define LIVOX_PROTO1_DEVICE_H +#include + #include #include #include @@ -17,8 +19,10 @@ #include #include #include "protocol.h" +#include #include -#include +#include +#include // Custom hash function for std::pair namespace std { @@ -86,8 +90,20 @@ private: // Heartbeat mechanism void startHeartbeat(); void stopHeartbeat(); - void sendHeartbeat(); - void onHeartbeatTimer(const boost::system::error_code& error); + void sendHeartbeatOnce(); + + /** EXPLANATION: + * deviceCDaemon is a dynamic posting non-viral coroutine: startHeartbeat() + * passes ExplicitPostTarget{componentThread->getIoContext()} so the daemon + * body always runs on componentThread. Extensible for future per-device + * background work beyond heartbeats. + */ + sscl::co::DynamicNonViralPostingInvoker deviceCDaemon( + sscl::co::ExplicitPostTarget postTarget, + std::exception_ptr &exceptionPtr, + std::function callback, + sscl::SyncCancelerForAsyncWork &canceler); + std::string generateClientDeviceIpFromSerialNumber( const std::string& broadcastCode); @@ -172,10 +188,9 @@ public: uint8_t smoSubnetNbits; uint16_t dataPort, cmdPort, imuPort; - // Heartbeat state - std::unique_ptr heartbeatTimer; - std::atomic heartbeatActive; - sscl::SpinLock heartbeatActiveLock; + // Heartbeat state (timer lifetime tied to Device ctor/dtor) + boost::asio::deadline_timer heartbeatTimer; + sscl::co::NonViralTaskNursery daemonNursery; // Point cloud data state std::atomic pcloudDataActive;