From facb6652177f1cf39d6bf52aeb2416cda22fd04a Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Wed, 10 Jun 2026 05:41:39 -0400 Subject: [PATCH] BroadcastListener: Port to use nurseries and CDaemon pattern --- commonLibs/livoxProto1/broadcastListener.cpp | 386 +++++++++++++----- commonLibs/livoxProto1/broadcastListener.h | 74 +++- .../adapters/boostAsio/udpReceiveFromAReq.h | 113 +++++ 3 files changed, 458 insertions(+), 115 deletions(-) create mode 100644 include/adapters/boostAsio/udpReceiveFromAReq.h diff --git a/commonLibs/livoxProto1/broadcastListener.cpp b/commonLibs/livoxProto1/broadcastListener.cpp index 8163e64..e4e650b 100644 --- a/commonLibs/livoxProto1/broadcastListener.cpp +++ b/commonLibs/livoxProto1/broadcastListener.cpp @@ -1,15 +1,163 @@ +#include +#include #include +#include #include #include #include #include +#include +#include #include "broadcastListener.h" #include "core.h" +#include "protocol.h" namespace livoxProto1 { namespace comms { +namespace { + +bool isShutdownReceiveError(const boost::system::error_code &ec) +{ + return ec == boost::asio::error::operation_aborted + || ec == boost::asio::error::bad_descriptor; +} + +void logEventHandlerException(std::exception_ptr &exceptionPtr) +{ + sscl::co::NonViralCompletion nvc(exceptionPtr); + if (!nvc.hasException()) { + return; + } + + try { + nvc.checkAndRethrowException(); + } catch (const std::exception &e) { + std::cerr << "BroadcastListener: event handler: " + << e.what() << std::endl; + } +} + +void logReceiveDaemonException(std::exception_ptr &exceptionPtr) +{ + sscl::co::NonViralCompletion nvc(exceptionPtr); + if (!nvc.hasException()) { + return; + } + + try { + nvc.checkAndRethrowException(); + } catch (const std::exception &e) { + std::cerr << "BroadcastListener: receive daemon: " + << e.what() << std::endl; + } +} + +bool parseAndValidateBroadcastMessage( + const std::array &bytes, + std::size_t nbytes, + BroadcastMessage &msgOut) +{ + if (nbytes < sizeof(BroadcastMessage)) + { + std::cerr << "broadcastMsgIndCInd" + << ": Received packet too small: " << nbytes + << " bytes (expected at least " + << sizeof(BroadcastMessage) << ")" << std::endl; + return false; + } + + std::memcpy(&msgOut, bytes.data(), sizeof(BroadcastMessage)); + + // Following the clean receiving flow: + // 1. Swap CRC32 to host endianness first + msgOut.footer.swapCrc32ToHostEndianness(); + // 2. Validate CRC32 (on whole message excluding footer CRC32 field) + if (!msgOut.validateCrc32()) + { + std::cerr << "broadcastMsgIndCInd" + << ": Broadcast message failed CRC32 validation" + << std::endl; + return false; + } + + // 3. Swap CRC16 to host endianness + msgOut.header.swapCrc16ToHostEndianness(); + // 4. Validate CRC16 (on header only) + if (!msgOut.header.validateCrc16()) + { + std::cerr << "broadcastMsgIndCInd" + << ": Broadcast message failed CRC16 validation" + << std::endl; + return false; + } + + // 5. Swap content to host endianness + msgOut.swapContentsToHostEndianness(); + // 6. Validate message sanity + if (!msgOut.sanityCheck()) + { + std::cerr << "broadcastMsgIndCInd" + << ": Broadcast message failed sanity check" + << std::endl; + return false; + } + + return true; +} + +/** RAII: open eventHandlerNursery at CDaemon entry; drain at exit. */ +class EventHandlerNurseryScopeGuard +{ +public: + explicit EventHandlerNurseryScopeGuard( + sscl::co::NonViralTaskNursery &nursery) + : nursery(nursery) + { + nursery.openAdmission(); + } + + ~EventHandlerNurseryScopeGuard() + { + nursery.closeAdmission(); + nursery.syncAwaitAllSettlements( + sscl::ComponentThread::getSelf()->getIoContext()); + } + + EventHandlerNurseryScopeGuard( + const EventHandlerNurseryScopeGuard &) = delete; + EventHandlerNurseryScopeGuard &operator=( + const EventHandlerNurseryScopeGuard &) = delete; + +private: + sscl::co::NonViralTaskNursery &nursery; +}; + +} // namespace + +BroadcastListener::BroadcastIndPayload::BroadcastIndPayload( + const uint8_t *receiveBytes, std::size_t receiveNbytes, + const boost::asio::ip::udp::endpoint &endpoint) +: nbytes(receiveNbytes), +senderEndpoint(endpoint) +{ + if (receiveNbytes > 0) { + std::memcpy(bytes.data(), receiveBytes, receiveNbytes); + } +} + +BroadcastListener::BroadcastIndPayload::BroadcastIndPayload( + BroadcastIndPayload &&other) noexcept +: nbytes(other.nbytes), +senderEndpoint(std::move(other.senderEndpoint)) +{ + if (nbytes > 0) { + std::memcpy(bytes.data(), other.bytes.data(), nbytes); + } + other.nbytes = 0; +} + BroadcastListener::BroadcastListener( const std::shared_ptr& componentThread, uint16_t listeningPort, uint16_t connectPort @@ -19,110 +167,143 @@ listeningPort(listeningPort), connectPort(connectPort), deviceGoneAwayCb(nullptr), socket(componentThread->getIoContext()), -listeningEndpoint(boost::asio::ip::udp::v4(), listeningPort), -isListening(false) +listeningEndpoint(boost::asio::ip::udp::v4(), listeningPort) { } std::shared_ptr BroadcastListener::getDevice(const std::string &deviceIdentifier) const { - auto it = std::find_if(discoveredDevices.begin(), discoveredDevices.end(), + sscl::SpinLock::Guard lock(discoveredDevices.lock); + + auto it = std::find_if( + discoveredDevices.rsrc.devices.begin(), + discoveredDevices.rsrc.devices.end(), [&deviceIdentifier](const std::shared_ptr& device) { return comms::deviceIdentifiersEqual( device->deviceIdentifier, deviceIdentifier); } ); - return it != discoveredDevices.end() ? *it : nullptr; + return it != discoveredDevices.rsrc.devices.end() ? *it : nullptr; } -void BroadcastListener::broadcastMsgInd( - const boost::system::error_code& ec, std::size_t bytes_received) +void BroadcastListener::registerDiscoveredDevice( + const BroadcastMessage &msg, const std::string &senderIp) { - if (ec) - { - std::cerr << __func__ << ": Error receiving broadcast message: " - << ec.message() << std::endl; - return; - } - - if (bytes_received < sizeof(BroadcastMessage)) - { - std::cerr << __func__ - << ": Received packet too small: " << bytes_received - << " bytes (expected at least " - << sizeof(BroadcastMessage) << ")" << std::endl; - return; - } - - // Use placement new to construct BroadcastMessage in the buffer - BroadcastMessage* msg = new (bcastMsgRecvBuffer) BroadcastMessage; - - // Following the clean receiving flow: - // 1. Swap CRC32 to host endianness first - msg->footer.swapCrc32ToHostEndianness(); - // 2. Validate CRC32 (on whole message excluding footer CRC32 field) - if (!msg->validateCrc32()) - { - std::cerr << __func__ - << ": Broadcast message failed CRC32 validation" << std::endl; - return; - } - - // 3. Swap CRC16 to host endianness - msg->header.swapCrc16ToHostEndianness(); - // 4. Validate CRC16 (on header only) - if (!msg->header.validateCrc16()) - { - std::cerr << __func__ - << ": Broadcast message failed CRC16 validation" << std::endl; - return; - } - // 5. Swap content to host endianness - msg->swapContentsToHostEndianness(); - // 6. Validate message sanity - if (!msg->sanityCheck()) - { - std::cerr << __func__ - << ": Broadcast message failed sanity check" << std::endl; - return; - } - - // Extract device information - std::string senderIP = senderEndpoint.address().to_string(); std::string broadcastCode( - reinterpret_cast(msg->broadcast_code)); + reinterpret_cast(msg.broadcast_code)); + + sscl::SpinLock::Guard lock(discoveredDevices.lock); // Early return if device already exists - sscl::SpinLock::Guard lock(isListeningLock); + const auto existingIt = std::find_if( + discoveredDevices.rsrc.devices.begin(), + discoveredDevices.rsrc.devices.end(), + [&broadcastCode](const std::shared_ptr &device) { + return comms::deviceIdentifiersEqual( + device->deviceIdentifier, broadcastCode); + }); - if (deviceExists(broadcastCode)) + if (existingIt != discoveredDevices.rsrc.devices.end()) { - // Device already exists, just log the update if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { - std::cout << __func__ + std::cout << "broadcastMsgIndCInd" << ": Received broadcast from known device: " - << broadcastCode << " at " << senderIP << "\n"; + << broadcastCode << " at " << senderIp << "\n"; } - } - else - { - // Create new DiscoveredDevice using conversion constructor - auto device = std::make_shared(*msg, senderIP); - discoveredDevices.push_back(device); - // Output device information using stringify - std::cout << __func__ << ": Discovered new Livox device: " - << device->stringify() << "\n"; + return; } - startReceive(); + // Create new DiscoveredDevice using conversion constructor + auto device = std::make_shared(msg, senderIp); + discoveredDevices.rsrc.devices.push_back(device); + std::cout << "broadcastMsgIndCInd: Discovered new Livox device: " + << device->stringify() << "\n"; +} + +sscl::co::NonViralNonPostingInvoker +BroadcastListener::broadcastMsgIndCInd( + [[maybe_unused]] std::exception_ptr &exceptionPtr, + [[maybe_unused]] std::function callback, + [[maybe_unused]] sscl::SyncCancelerForAsyncWork &canceler, + BroadcastIndPayload payload) +{ + BroadcastMessage msg; + if (!parseAndValidateBroadcastMessage( + payload.bytes, payload.nbytes, msg)) { + co_return; + } + + const std::string senderIp = payload.senderEndpoint.address().to_string(); + registerDiscoveredDevice(msg, senderIp); + + co_return; +} + +sscl::co::DynamicNonViralPostingInvoker +BroadcastListener::broadcastReceiveCDaemon( + [[maybe_unused]] sscl::co::ExplicitPostTarget postTarget, + [[maybe_unused]] std::exception_ptr &exceptionPtr, + [[maybe_unused]] std::function callback, + sscl::SyncCancelerForAsyncWork &canceler) +{ + EventHandlerNurseryScopeGuard eventHandlerScope(eventHandlerNursery); + + boost::asio::io_context &ioContext = + sscl::ComponentThread::getSelf()->getIoContext(); + + std::array receiveBuffer{}; + boost::asio::ip::udp::endpoint senderEndpoint; + + while (!canceler.isCancellationRequested()) + { + const adapters::boostAsio::UdpReceiveFromAReq::Result receiveResult = + co_await adapters::boostAsio::getUdpReceiveFromAReqAwaiter( + ioContext, socket, + boost::asio::buffer(receiveBuffer), senderEndpoint); + + if (isShutdownReceiveError(receiveResult.ec)) { break; } + + if (receiveResult.ec) + { + std::cerr << __func__ + << ": Error receiving broadcast message: " + << receiveResult.ec.message() << std::endl; + continue; + } + + if (receiveResult.nbytes == 0) { continue; } + + if (!canceler.execUncancelableSegmentOrAbort([&]() + { + eventHandlerNursery.launch( + [this, + payload = BroadcastIndPayload( + receiveBuffer.data(), + receiveResult.nbytes, + senderEndpoint)]( + sscl::co::NonViralTaskNursery::Slot::Lease &lease) mutable + { + return broadcastMsgIndCInd( + lease.getExceptionStorage(), + lease.getCallerLambda(), + lease.getSyncCanceler(), + std::move(payload)); + }, + logEventHandlerException); + })) { + break; + } + } + + co_return; } void BroadcastListener::start(void) { - if (isListening) { return; } + if (daemonNursery.admissionIsOpen()) { return; } try { @@ -134,62 +315,57 @@ void BroadcastListener::start(void) * We should also set up a timer to check for devices that have gone * away. */ - { - sscl::SpinLock::Guard lock(isListeningLock); + socket.open(boost::asio::ip::udp::v4()); + socket.bind(listeningEndpoint); - socket.open(boost::asio::ip::udp::v4()); - socket.bind(listeningEndpoint); + daemonNursery.openAdmission(); + // Launch the receive daemon coroutine on componentThread + daemonNursery.launch( + [this](sscl::co::NonViralTaskNursery::Slot::Lease &lease) + { + return broadcastReceiveCDaemon( + sscl::co::ExplicitPostTarget{ + componentThread->getIoContext()}, + lease.getExceptionStorage(), + lease.getCallerLambda(), + lease.getSyncCanceler()); + }, + logReceiveDaemonException); - isListening = true; - } - - // Start the first async receive operation - startReceive(); std::cout << __func__ << ": BroadcastListener started on port " << listeningPort << std::endl; } catch (const boost::system::system_error& e) { - isListening = false; std::cerr << __func__ << ": Failed to start BroadcastListener: " << e.what() << std::endl; throw; } } -void BroadcastListener::startReceive(void) -{ - if (!isListening) { return; } - - socket.async_receive_from( - boost::asio::buffer(bcastMsgRecvBuffer, sizeof(bcastMsgRecvBuffer)), - senderEndpoint, - std::bind( - &BroadcastListener::broadcastMsgInd, this, - std::placeholders::_1, std::placeholders::_2) - ); -} - void BroadcastListener::stop(void) { - { - sscl::SpinLock::Guard lock(isListeningLock); - if (!isListening) { return; } + if (!daemonNursery.admissionIsOpen()) { return; } - isListening = false; - } + daemonNursery.requestCancelOnAll(); try { + socket.cancel(); socket.close(); - std::cout << __func__ << ": BroadcastListener stopped" << std::endl; } catch (const boost::system::system_error& e) { - std::cerr << __func__ << ": Error stopping BroadcastListener: " << e.what() - << std::endl; + std::cerr << __func__ << ": Error stopping BroadcastListener: " + << e.what() << std::endl; throw; } + + daemonNursery.closeAdmission(); + daemonNursery.syncAwaitAllSettlements( + sscl::ComponentThread::getSelf()->getIoContext()); + + std::cout << __func__ << ": BroadcastListener stopped" << std::endl; } } // namespace comms diff --git a/commonLibs/livoxProto1/broadcastListener.h b/commonLibs/livoxProto1/broadcastListener.h index 5796544..a05a4f9 100644 --- a/commonLibs/livoxProto1/broadcastListener.h +++ b/commonLibs/livoxProto1/broadcastListener.h @@ -1,13 +1,20 @@ #ifndef BROADCAST_LISTENER_H #define BROADCAST_LISTENER_H +#include + +#include #include #include #include -#include +#include #include #include #include +#include +#include +#include +#include #include "device.h" namespace livoxProto1 { @@ -46,11 +53,50 @@ public: void start(void); void stop(void); - void broadcastMsgInd( - const boost::system::error_code& ec, std::size_t bytes_received); - private: - void startReceive(void); + /** Per-datagram snapshot; bytes are copied in ctor/move so handlers remain + * safe if broadcastMsgIndCInd is ever posted asynchronously. + */ + struct BroadcastIndPayload + { + std::array bytes{}; + std::size_t nbytes = 0; + boost::asio::ip::udp::endpoint senderEndpoint; + + BroadcastIndPayload() = default; + + BroadcastIndPayload( + const uint8_t *receiveBytes, std::size_t receiveNbytes, + const boost::asio::ip::udp::endpoint &endpoint); + + BroadcastIndPayload(BroadcastIndPayload &&other) noexcept; + + BroadcastIndPayload(const BroadcastIndPayload &) = delete; + BroadcastIndPayload &operator=(const BroadcastIndPayload &) = delete; + BroadcastIndPayload &operator=(BroadcastIndPayload &&) = delete; + }; + + /** EXPLANATION: + * broadcastReceiveCDaemon is a dynamic posting non-viral coroutine: start() + * passes ExplicitPostTarget{componentThread->getIoContext()} so the daemon + * body always runs on componentThread. Synchronous listener state is + * touched only inside canceler.execUncancelableSegmentOrAbort() so stop() + * cannot tear down *this while the daemon is in a critical section. + */ + sscl::co::DynamicNonViralPostingInvoker broadcastReceiveCDaemon( + sscl::co::ExplicitPostTarget postTarget, + std::exception_ptr &exceptionPtr, + std::function callback, + sscl::SyncCancelerForAsyncWork &canceler); + + sscl::co::NonViralNonPostingInvoker broadcastMsgIndCInd( + std::exception_ptr &exceptionPtr, + std::function callback, + sscl::SyncCancelerForAsyncWork &canceler, + BroadcastIndPayload payload); + + void registerDiscoveredDevice( + const BroadcastMessage &msg, const std::string &senderIp); private: std::shared_ptr componentThread; @@ -63,14 +109,22 @@ private: */ uint16_t listeningPort, connectPort; DeviceGoneAwayCbFn *deviceGoneAwayCb; - std::vector> discoveredDevices; + + struct DiscoveredDevicesResources + { + std::vector> devices; + }; + + mutable sscl::SharedResourceGroup< + sscl::SpinLock, DiscoveredDevicesResources> discoveredDevices; boost::asio::ip::udp::socket socket; - boost::asio::ip::udp::endpoint listeningEndpoint, senderEndpoint; - sscl::SpinLock isListeningLock; - bool isListening; + boost::asio::ip::udp::endpoint listeningEndpoint; - uint8_t bcastMsgRecvBuffer[UDP_BCAST_MSG_BUFFER_NBYTES]; + /** Hosts broadcastReceiveCDaemon; stop() cancels and syncAwaitAll's it. */ + sscl::co::NonViralTaskNursery daemonNursery; + /** Hosts per-datagram broadcastMsgIndCInd; admission/drain RAII inside CDaemon. */ + sscl::co::NonViralTaskNursery eventHandlerNursery; }; } // namespace comms diff --git a/include/adapters/boostAsio/udpReceiveFromAReq.h b/include/adapters/boostAsio/udpReceiveFromAReq.h new file mode 100644 index 0000000..eca42f2 --- /dev/null +++ b/include/adapters/boostAsio/udpReceiveFromAReq.h @@ -0,0 +1,113 @@ +#ifndef ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H +#define ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace adapters::boostAsio { + +/** Coroutine awaiter wrapping boost::asio::ip::udp::socket::async_receive_from. + * + * Resumes on resumeIoContext when the receive completes or fails. + */ +class UdpReceiveFromAReq +{ +public: + struct Result + { + boost::system::error_code ec; + std::size_t nbytes = 0; + }; + + struct AsyncState + { + std::atomic settled{false}; + Result result; + std::coroutine_handle<> callerSchedHandle; + }; + + UdpReceiveFromAReq( + boost::asio::io_context &resumeIoContext, + boost::asio::ip::udp::socket &socket, + const boost::asio::mutable_buffer &buffer, + boost::asio::ip::udp::endpoint &senderEndpoint) + : asyncState(std::make_shared()), + resumeIoContext(resumeIoContext) + { + socket.async_receive_from( + buffer, senderEndpoint, + [this](const boost::system::error_code &error, std::size_t nbytes) + { + onReceiveComplete(error, nbytes); + }); + } + + bool await_ready() const noexcept + { + return asyncState->settled.load(std::memory_order_acquire); + } + + bool await_suspend(std::coroutine_handle<> caller) noexcept + { + if (asyncState->settled.load(std::memory_order_acquire)) { + return false; + } + + asyncState->callerSchedHandle = caller; + return true; + } + + Result await_resume() const noexcept + { + return asyncState->result; + } + +private: + void onReceiveComplete( + const boost::system::error_code &error, std::size_t nbytes) + { + if (asyncState->settled.exchange(true)) { + return; + } + + asyncState->result.ec = error; + asyncState->result.nbytes = nbytes; + signalSettledAndResumeCaller(); + } + + void signalSettledAndResumeCaller() + { + std::coroutine_handle<> handle = asyncState->callerSchedHandle; + if (!handle) { + return; + } + + boost::asio::post(resumeIoContext, handle); + } + + std::shared_ptr asyncState; + boost::asio::io_context &resumeIoContext; +}; + +inline auto getUdpReceiveFromAReqAwaiter( + boost::asio::io_context &resumeIoContext, + boost::asio::ip::udp::socket &socket, + const boost::asio::mutable_buffer &buffer, + boost::asio::ip::udp::endpoint &senderEndpoint) +{ + return UdpReceiveFromAReq( + resumeIoContext, socket, buffer, senderEndpoint); +} + +} // namespace adapters::boostAsio + +#endif // ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H