Bug:BcastListener: Add SpinLock for races around stop()

This commit is contained in:
2025-11-07 20:44:03 -04:00
parent b598ca8594
commit 7d2cb58200
2 changed files with 28 additions and 24 deletions
+20 -18
View File
@@ -1,5 +1,6 @@
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
#include <functional>
#include <opts.h> #include <opts.h>
#include <componentThread.h> #include <componentThread.h>
#include "broadcastListener.h" #include "broadcastListener.h"
@@ -54,7 +55,6 @@ void BroadcastListener::broadcastMsgInd(
return; return;
} }
// Use placement new to construct BroadcastMessage in the buffer // Use placement new to construct BroadcastMessage in the buffer
BroadcastMessage* msg = new (bcastMsgRecvBuffer) BroadcastMessage; BroadcastMessage* msg = new (bcastMsgRecvBuffer) BroadcastMessage;
@@ -94,6 +94,8 @@ void BroadcastListener::broadcastMsgInd(
reinterpret_cast<const char*>(msg->broadcast_code)); reinterpret_cast<const char*>(msg->broadcast_code));
// Early return if device already exists // Early return if device already exists
smo::SpinLock::Guard lock(isListeningLock);
if (deviceExists(broadcastCode)) if (deviceExists(broadcastCode))
{ {
// Device already exists, just log the update // Device already exists, just log the update
@@ -103,21 +105,23 @@ void BroadcastListener::broadcastMsgInd(
<< ": Received broadcast from known device: " << ": Received broadcast from known device: "
<< broadcastCode << " at " << senderIP << "\n"; << broadcastCode << " at " << senderIP << "\n";
} }
return;
} }
else
{
// Create new DiscoveredDevice using conversion constructor // Create new DiscoveredDevice using conversion constructor
auto device = std::make_shared<DiscoveredDevice>(*msg, senderIP); auto device = std::make_shared<DiscoveredDevice>(*msg, senderIP);
discoveredDevices.push_back(device); discoveredDevices.push_back(device);
// Output device information using stringify // Output device information using stringify
std::cout << __func__ << ": Discovered new Livox device: " std::cout << __func__ << ": Discovered new Livox device: "
<< device->stringify() << "\n"; << device->stringify() << "\n";
} }
startReceive();
}
void BroadcastListener::start(void) void BroadcastListener::start(void)
{ {
if (isListening.load()) { return; } if (isListening) { return; }
try try
{ {
@@ -132,7 +136,7 @@ void BroadcastListener::start(void)
socket.open(boost::asio::ip::udp::v4()); socket.open(boost::asio::ip::udp::v4());
socket.bind(listeningEndpoint); socket.bind(listeningEndpoint);
isListening.store(true); isListening = true;
// Start the first async receive operation // Start the first async receive operation
startReceive(); startReceive();
std::cout << __func__ << ": BroadcastListener started on port " std::cout << __func__ << ": BroadcastListener started on port "
@@ -140,7 +144,7 @@ void BroadcastListener::start(void)
} }
catch (const boost::system::system_error& e) catch (const boost::system::system_error& e)
{ {
isListening.store(false); isListening = false;
std::cerr << __func__ << ": Failed to start BroadcastListener: " std::cerr << __func__ << ": Failed to start BroadcastListener: "
<< e.what() << std::endl; << e.what() << std::endl;
throw; throw;
@@ -149,27 +153,25 @@ void BroadcastListener::start(void)
void BroadcastListener::startReceive(void) void BroadcastListener::startReceive(void)
{ {
if (!isListening.load()) { return; } if (!isListening) { return; }
socket.async_receive_from( socket.async_receive_from(
boost::asio::buffer(bcastMsgRecvBuffer, sizeof(bcastMsgRecvBuffer)), boost::asio::buffer(bcastMsgRecvBuffer, sizeof(bcastMsgRecvBuffer)),
senderEndpoint, senderEndpoint,
[this](const boost::system::error_code& ec, std::size_t bytes_received) std::bind(
{ &BroadcastListener::broadcastMsgInd, this,
broadcastMsgInd(ec, bytes_received); std::placeholders::_1, std::placeholders::_2)
// Continue listening for the next packet
if (isListening.load())
{ startReceive(); }
}
); );
} }
void BroadcastListener::stop(void) void BroadcastListener::stop(void)
{ {
if (!isListening.load()) { return; } {
smo::SpinLock::Guard lock(isListeningLock);
if (!isListening) { return; }
isListening.store(false); isListening = false;
}
try try
{ {
+3 -1
View File
@@ -8,6 +8,7 @@
#include <atomic> #include <atomic>
#include <boost/asio/ip/udp.hpp> #include <boost/asio/ip/udp.hpp>
#include <user/senseApiDesc.h> #include <user/senseApiDesc.h>
#include <spinLock.h>
#include "device.h" #include "device.h"
namespace livoxProto1 { namespace livoxProto1 {
@@ -67,7 +68,8 @@ private:
boost::asio::ip::udp::socket socket; boost::asio::ip::udp::socket socket;
boost::asio::ip::udp::endpoint listeningEndpoint, senderEndpoint; boost::asio::ip::udp::endpoint listeningEndpoint, senderEndpoint;
std::atomic<bool> isListening; smo::SpinLock isListeningLock;
bool isListening;
uint8_t bcastMsgRecvBuffer[UDP_BCAST_MSG_BUFFER_NBYTES]; uint8_t bcastMsgRecvBuffer[UDP_BCAST_MSG_BUFFER_NBYTES];
}; };