BroadcastListener: Port to use nurseries and CDaemon pattern

This commit is contained in:
2026-06-10 05:41:39 -04:00
parent 22a4cf283e
commit facb665217
3 changed files with 458 additions and 115 deletions
+281 -105
View File
@@ -1,15 +1,163 @@
#include <boostAsioLinkageFix.h>
#include <array>
#include <algorithm>
#include <cstring>
#include <iostream>
#include <functional>
#include <opts.h>
#include <componentThread.h>
#include <adapters/boostAsio/udpReceiveFromAReq.h>
#include <spinscale/co/nonViralCompletion.h>
#include "broadcastListener.h"
#include "core.h"
#include "protocol.h"
namespace livoxProto1 {
namespace comms {
namespace {
bool isShutdownReceiveError(const boost::system::error_code &ec)
{
return ec == boost::asio::error::operation_aborted
|| ec == boost::asio::error::bad_descriptor;
}
void logEventHandlerException(std::exception_ptr &exceptionPtr)
{
sscl::co::NonViralCompletion nvc(exceptionPtr);
if (!nvc.hasException()) {
return;
}
try {
nvc.checkAndRethrowException();
} catch (const std::exception &e) {
std::cerr << "BroadcastListener: event handler: "
<< e.what() << std::endl;
}
}
void logReceiveDaemonException(std::exception_ptr &exceptionPtr)
{
sscl::co::NonViralCompletion nvc(exceptionPtr);
if (!nvc.hasException()) {
return;
}
try {
nvc.checkAndRethrowException();
} catch (const std::exception &e) {
std::cerr << "BroadcastListener: receive daemon: "
<< e.what() << std::endl;
}
}
bool parseAndValidateBroadcastMessage(
const std::array<uint8_t, UDP_BCAST_MSG_BUFFER_NBYTES> &bytes,
std::size_t nbytes,
BroadcastMessage &msgOut)
{
if (nbytes < sizeof(BroadcastMessage))
{
std::cerr << "broadcastMsgIndCInd"
<< ": Received packet too small: " << nbytes
<< " bytes (expected at least "
<< sizeof(BroadcastMessage) << ")" << std::endl;
return false;
}
std::memcpy(&msgOut, bytes.data(), sizeof(BroadcastMessage));
// Following the clean receiving flow:
// 1. Swap CRC32 to host endianness first
msgOut.footer.swapCrc32ToHostEndianness();
// 2. Validate CRC32 (on whole message excluding footer CRC32 field)
if (!msgOut.validateCrc32())
{
std::cerr << "broadcastMsgIndCInd"
<< ": Broadcast message failed CRC32 validation"
<< std::endl;
return false;
}
// 3. Swap CRC16 to host endianness
msgOut.header.swapCrc16ToHostEndianness();
// 4. Validate CRC16 (on header only)
if (!msgOut.header.validateCrc16())
{
std::cerr << "broadcastMsgIndCInd"
<< ": Broadcast message failed CRC16 validation"
<< std::endl;
return false;
}
// 5. Swap content to host endianness
msgOut.swapContentsToHostEndianness();
// 6. Validate message sanity
if (!msgOut.sanityCheck())
{
std::cerr << "broadcastMsgIndCInd"
<< ": Broadcast message failed sanity check"
<< std::endl;
return false;
}
return true;
}
/** RAII: open eventHandlerNursery at CDaemon entry; drain at exit. */
class EventHandlerNurseryScopeGuard
{
public:
explicit EventHandlerNurseryScopeGuard(
sscl::co::NonViralTaskNursery &nursery)
: nursery(nursery)
{
nursery.openAdmission();
}
~EventHandlerNurseryScopeGuard()
{
nursery.closeAdmission();
nursery.syncAwaitAllSettlements(
sscl::ComponentThread::getSelf()->getIoContext());
}
EventHandlerNurseryScopeGuard(
const EventHandlerNurseryScopeGuard &) = delete;
EventHandlerNurseryScopeGuard &operator=(
const EventHandlerNurseryScopeGuard &) = delete;
private:
sscl::co::NonViralTaskNursery &nursery;
};
} // namespace
BroadcastListener::BroadcastIndPayload::BroadcastIndPayload(
const uint8_t *receiveBytes, std::size_t receiveNbytes,
const boost::asio::ip::udp::endpoint &endpoint)
: nbytes(receiveNbytes),
senderEndpoint(endpoint)
{
if (receiveNbytes > 0) {
std::memcpy(bytes.data(), receiveBytes, receiveNbytes);
}
}
BroadcastListener::BroadcastIndPayload::BroadcastIndPayload(
BroadcastIndPayload &&other) noexcept
: nbytes(other.nbytes),
senderEndpoint(std::move(other.senderEndpoint))
{
if (nbytes > 0) {
std::memcpy(bytes.data(), other.bytes.data(), nbytes);
}
other.nbytes = 0;
}
BroadcastListener::BroadcastListener(
const std::shared_ptr<sscl::ComponentThread>& componentThread,
uint16_t listeningPort, uint16_t connectPort
@@ -19,110 +167,143 @@ listeningPort(listeningPort),
connectPort(connectPort),
deviceGoneAwayCb(nullptr),
socket(componentThread->getIoContext()),
listeningEndpoint(boost::asio::ip::udp::v4(), listeningPort),
isListening(false)
listeningEndpoint(boost::asio::ip::udp::v4(), listeningPort)
{
}
std::shared_ptr<DiscoveredDevice>
BroadcastListener::getDevice(const std::string &deviceIdentifier) const
{
auto it = std::find_if(discoveredDevices.begin(), discoveredDevices.end(),
sscl::SpinLock::Guard lock(discoveredDevices.lock);
auto it = std::find_if(
discoveredDevices.rsrc.devices.begin(),
discoveredDevices.rsrc.devices.end(),
[&deviceIdentifier](const std::shared_ptr<DiscoveredDevice>& device) {
return comms::deviceIdentifiersEqual(
device->deviceIdentifier, deviceIdentifier);
}
);
return it != discoveredDevices.end() ? *it : nullptr;
return it != discoveredDevices.rsrc.devices.end() ? *it : nullptr;
}
void BroadcastListener::broadcastMsgInd(
const boost::system::error_code& ec, std::size_t bytes_received)
void BroadcastListener::registerDiscoveredDevice(
const BroadcastMessage &msg, const std::string &senderIp)
{
if (ec)
{
std::cerr << __func__ << ": Error receiving broadcast message: "
<< ec.message() << std::endl;
return;
}
if (bytes_received < sizeof(BroadcastMessage))
{
std::cerr << __func__
<< ": Received packet too small: " << bytes_received
<< " bytes (expected at least "
<< sizeof(BroadcastMessage) << ")" << std::endl;
return;
}
// Use placement new to construct BroadcastMessage in the buffer
BroadcastMessage* msg = new (bcastMsgRecvBuffer) BroadcastMessage;
// Following the clean receiving flow:
// 1. Swap CRC32 to host endianness first
msg->footer.swapCrc32ToHostEndianness();
// 2. Validate CRC32 (on whole message excluding footer CRC32 field)
if (!msg->validateCrc32())
{
std::cerr << __func__
<< ": Broadcast message failed CRC32 validation" << std::endl;
return;
}
// 3. Swap CRC16 to host endianness
msg->header.swapCrc16ToHostEndianness();
// 4. Validate CRC16 (on header only)
if (!msg->header.validateCrc16())
{
std::cerr << __func__
<< ": Broadcast message failed CRC16 validation" << std::endl;
return;
}
// 5. Swap content to host endianness
msg->swapContentsToHostEndianness();
// 6. Validate message sanity
if (!msg->sanityCheck())
{
std::cerr << __func__
<< ": Broadcast message failed sanity check" << std::endl;
return;
}
// Extract device information
std::string senderIP = senderEndpoint.address().to_string();
std::string broadcastCode(
reinterpret_cast<const char*>(msg->broadcast_code));
reinterpret_cast<const char*>(msg.broadcast_code));
sscl::SpinLock::Guard lock(discoveredDevices.lock);
// Early return if device already exists
sscl::SpinLock::Guard lock(isListeningLock);
const auto existingIt = std::find_if(
discoveredDevices.rsrc.devices.begin(),
discoveredDevices.rsrc.devices.end(),
[&broadcastCode](const std::shared_ptr<DiscoveredDevice> &device) {
return comms::deviceIdentifiersEqual(
device->deviceIdentifier, broadcastCode);
});
if (deviceExists(broadcastCode))
if (existingIt != discoveredDevices.rsrc.devices.end())
{
// Device already exists, just log the update
if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose)
{
std::cout << __func__
std::cout << "broadcastMsgIndCInd"
<< ": Received broadcast from known device: "
<< broadcastCode << " at " << senderIP << "\n";
<< broadcastCode << " at " << senderIp << "\n";
}
}
else
{
// Create new DiscoveredDevice using conversion constructor
auto device = std::make_shared<DiscoveredDevice>(*msg, senderIP);
discoveredDevices.push_back(device);
// Output device information using stringify
std::cout << __func__ << ": Discovered new Livox device: "
<< device->stringify() << "\n";
return;
}
startReceive();
// Create new DiscoveredDevice using conversion constructor
auto device = std::make_shared<DiscoveredDevice>(msg, senderIp);
discoveredDevices.rsrc.devices.push_back(device);
std::cout << "broadcastMsgIndCInd: Discovered new Livox device: "
<< device->stringify() << "\n";
}
sscl::co::NonViralNonPostingInvoker
BroadcastListener::broadcastMsgIndCInd(
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback,
[[maybe_unused]] sscl::SyncCancelerForAsyncWork &canceler,
BroadcastIndPayload payload)
{
BroadcastMessage msg;
if (!parseAndValidateBroadcastMessage(
payload.bytes, payload.nbytes, msg)) {
co_return;
}
const std::string senderIp = payload.senderEndpoint.address().to_string();
registerDiscoveredDevice(msg, senderIp);
co_return;
}
sscl::co::DynamicNonViralPostingInvoker
BroadcastListener::broadcastReceiveCDaemon(
[[maybe_unused]] sscl::co::ExplicitPostTarget postTarget,
[[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback,
sscl::SyncCancelerForAsyncWork &canceler)
{
EventHandlerNurseryScopeGuard eventHandlerScope(eventHandlerNursery);
boost::asio::io_context &ioContext =
sscl::ComponentThread::getSelf()->getIoContext();
std::array<uint8_t, UDP_BCAST_MSG_BUFFER_NBYTES> receiveBuffer{};
boost::asio::ip::udp::endpoint senderEndpoint;
while (!canceler.isCancellationRequested())
{
const adapters::boostAsio::UdpReceiveFromAReq::Result receiveResult =
co_await adapters::boostAsio::getUdpReceiveFromAReqAwaiter(
ioContext, socket,
boost::asio::buffer(receiveBuffer), senderEndpoint);
if (isShutdownReceiveError(receiveResult.ec)) { break; }
if (receiveResult.ec)
{
std::cerr << __func__
<< ": Error receiving broadcast message: "
<< receiveResult.ec.message() << std::endl;
continue;
}
if (receiveResult.nbytes == 0) { continue; }
if (!canceler.execUncancelableSegmentOrAbort([&]()
{
eventHandlerNursery.launch(
[this,
payload = BroadcastIndPayload(
receiveBuffer.data(),
receiveResult.nbytes,
senderEndpoint)](
sscl::co::NonViralTaskNursery::Slot::Lease &lease) mutable
{
return broadcastMsgIndCInd(
lease.getExceptionStorage(),
lease.getCallerLambda(),
lease.getSyncCanceler(),
std::move(payload));
},
logEventHandlerException);
})) {
break;
}
}
co_return;
}
void BroadcastListener::start(void)
{
if (isListening) { return; }
if (daemonNursery.admissionIsOpen()) { return; }
try
{
@@ -134,62 +315,57 @@ void BroadcastListener::start(void)
* We should also set up a timer to check for devices that have gone
* away.
*/
{
sscl::SpinLock::Guard lock(isListeningLock);
socket.open(boost::asio::ip::udp::v4());
socket.bind(listeningEndpoint);
socket.open(boost::asio::ip::udp::v4());
socket.bind(listeningEndpoint);
daemonNursery.openAdmission();
// Launch the receive daemon coroutine on componentThread
daemonNursery.launch(
[this](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return broadcastReceiveCDaemon(
sscl::co::ExplicitPostTarget{
componentThread->getIoContext()},
lease.getExceptionStorage(),
lease.getCallerLambda(),
lease.getSyncCanceler());
},
logReceiveDaemonException);
isListening = true;
}
// Start the first async receive operation
startReceive();
std::cout << __func__ << ": BroadcastListener started on port "
<< listeningPort << std::endl;
}
catch (const boost::system::system_error& e)
{
isListening = false;
std::cerr << __func__ << ": Failed to start BroadcastListener: "
<< e.what() << std::endl;
throw;
}
}
void BroadcastListener::startReceive(void)
{
if (!isListening) { return; }
socket.async_receive_from(
boost::asio::buffer(bcastMsgRecvBuffer, sizeof(bcastMsgRecvBuffer)),
senderEndpoint,
std::bind(
&BroadcastListener::broadcastMsgInd, this,
std::placeholders::_1, std::placeholders::_2)
);
}
void BroadcastListener::stop(void)
{
{
sscl::SpinLock::Guard lock(isListeningLock);
if (!isListening) { return; }
if (!daemonNursery.admissionIsOpen()) { return; }
isListening = false;
}
daemonNursery.requestCancelOnAll();
try
{
socket.cancel();
socket.close();
std::cout << __func__ << ": BroadcastListener stopped" << std::endl;
}
catch (const boost::system::system_error& e)
{
std::cerr << __func__ << ": Error stopping BroadcastListener: " << e.what()
<< std::endl;
std::cerr << __func__ << ": Error stopping BroadcastListener: "
<< e.what() << std::endl;
throw;
}
daemonNursery.closeAdmission();
daemonNursery.syncAwaitAllSettlements(
sscl::ComponentThread::getSelf()->getIoContext());
std::cout << __func__ << ": BroadcastListener stopped" << std::endl;
}
} // namespace comms
+64 -10
View File
@@ -1,13 +1,20 @@
#ifndef BROADCAST_LISTENER_H
#define BROADCAST_LISTENER_H
#include <boostAsioLinkageFix.h>
#include <array>
#include <vector>
#include <string>
#include <memory>
#include <atomic>
#include <functional>
#include <boost/asio/ip/udp.hpp>
#include <user/senseApiDesc.h>
#include <spinscale/spinLock.h>
#include <spinscale/sharedResourceGroup.h>
#include <spinscale/co/dynamicPostingInvoker.h>
#include <spinscale/co/nonViralTaskNursery.h>
#include <spinscale/syncCancelerForAsyncWork.h>
#include "device.h"
namespace livoxProto1 {
@@ -46,11 +53,50 @@ public:
void start(void);
void stop(void);
void broadcastMsgInd(
const boost::system::error_code& ec, std::size_t bytes_received);
private:
void startReceive(void);
/** Per-datagram snapshot; bytes are copied in ctor/move so handlers remain
* safe if broadcastMsgIndCInd is ever posted asynchronously.
*/
struct BroadcastIndPayload
{
std::array<uint8_t, UDP_BCAST_MSG_BUFFER_NBYTES> bytes{};
std::size_t nbytes = 0;
boost::asio::ip::udp::endpoint senderEndpoint;
BroadcastIndPayload() = default;
BroadcastIndPayload(
const uint8_t *receiveBytes, std::size_t receiveNbytes,
const boost::asio::ip::udp::endpoint &endpoint);
BroadcastIndPayload(BroadcastIndPayload &&other) noexcept;
BroadcastIndPayload(const BroadcastIndPayload &) = delete;
BroadcastIndPayload &operator=(const BroadcastIndPayload &) = delete;
BroadcastIndPayload &operator=(BroadcastIndPayload &&) = delete;
};
/** EXPLANATION:
* broadcastReceiveCDaemon is a dynamic posting non-viral coroutine: start()
* passes ExplicitPostTarget{componentThread->getIoContext()} so the daemon
* body always runs on componentThread. Synchronous listener state is
* touched only inside canceler.execUncancelableSegmentOrAbort() so stop()
* cannot tear down *this while the daemon is in a critical section.
*/
sscl::co::DynamicNonViralPostingInvoker broadcastReceiveCDaemon(
sscl::co::ExplicitPostTarget postTarget,
std::exception_ptr &exceptionPtr,
std::function<void()> callback,
sscl::SyncCancelerForAsyncWork &canceler);
sscl::co::NonViralNonPostingInvoker broadcastMsgIndCInd(
std::exception_ptr &exceptionPtr,
std::function<void()> callback,
sscl::SyncCancelerForAsyncWork &canceler,
BroadcastIndPayload payload);
void registerDiscoveredDevice(
const BroadcastMessage &msg, const std::string &senderIp);
private:
std::shared_ptr<sscl::ComponentThread> componentThread;
@@ -63,14 +109,22 @@ private:
*/
uint16_t listeningPort, connectPort;
DeviceGoneAwayCbFn *deviceGoneAwayCb;
std::vector<std::shared_ptr<DiscoveredDevice>> discoveredDevices;
struct DiscoveredDevicesResources
{
std::vector<std::shared_ptr<DiscoveredDevice>> devices;
};
mutable sscl::SharedResourceGroup<
sscl::SpinLock, DiscoveredDevicesResources> discoveredDevices;
boost::asio::ip::udp::socket socket;
boost::asio::ip::udp::endpoint listeningEndpoint, senderEndpoint;
sscl::SpinLock isListeningLock;
bool isListening;
boost::asio::ip::udp::endpoint listeningEndpoint;
uint8_t bcastMsgRecvBuffer[UDP_BCAST_MSG_BUFFER_NBYTES];
/** Hosts broadcastReceiveCDaemon; stop() cancels and syncAwaitAll's it. */
sscl::co::NonViralTaskNursery daemonNursery;
/** Hosts per-datagram broadcastMsgIndCInd; admission/drain RAII inside CDaemon. */
sscl::co::NonViralTaskNursery eventHandlerNursery;
};
} // namespace comms
@@ -0,0 +1,113 @@
#ifndef ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H
#define ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H
#include <boostAsioLinkageFix.h>
#include <atomic>
#include <coroutine>
#include <memory>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/post.hpp>
#include <boost/system/error_code.hpp>
namespace adapters::boostAsio {
/** Coroutine awaiter wrapping boost::asio::ip::udp::socket::async_receive_from.
*
* Resumes on resumeIoContext when the receive completes or fails.
*/
class UdpReceiveFromAReq
{
public:
struct Result
{
boost::system::error_code ec;
std::size_t nbytes = 0;
};
struct AsyncState
{
std::atomic<bool> settled{false};
Result result;
std::coroutine_handle<> callerSchedHandle;
};
UdpReceiveFromAReq(
boost::asio::io_context &resumeIoContext,
boost::asio::ip::udp::socket &socket,
const boost::asio::mutable_buffer &buffer,
boost::asio::ip::udp::endpoint &senderEndpoint)
: asyncState(std::make_shared<AsyncState>()),
resumeIoContext(resumeIoContext)
{
socket.async_receive_from(
buffer, senderEndpoint,
[this](const boost::system::error_code &error, std::size_t nbytes)
{
onReceiveComplete(error, nbytes);
});
}
bool await_ready() const noexcept
{
return asyncState->settled.load(std::memory_order_acquire);
}
bool await_suspend(std::coroutine_handle<> caller) noexcept
{
if (asyncState->settled.load(std::memory_order_acquire)) {
return false;
}
asyncState->callerSchedHandle = caller;
return true;
}
Result await_resume() const noexcept
{
return asyncState->result;
}
private:
void onReceiveComplete(
const boost::system::error_code &error, std::size_t nbytes)
{
if (asyncState->settled.exchange(true)) {
return;
}
asyncState->result.ec = error;
asyncState->result.nbytes = nbytes;
signalSettledAndResumeCaller();
}
void signalSettledAndResumeCaller()
{
std::coroutine_handle<> handle = asyncState->callerSchedHandle;
if (!handle) {
return;
}
boost::asio::post(resumeIoContext, handle);
}
std::shared_ptr<AsyncState> asyncState;
boost::asio::io_context &resumeIoContext;
};
inline auto getUdpReceiveFromAReqAwaiter(
boost::asio::io_context &resumeIoContext,
boost::asio::ip::udp::socket &socket,
const boost::asio::mutable_buffer &buffer,
boost::asio::ip::udp::endpoint &senderEndpoint)
{
return UdpReceiveFromAReq(
resumeIoContext, socket, buffer, senderEndpoint);
}
} // namespace adapters::boostAsio
#endif // ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H