LivoxProto1: port to sscl::co framework

Code now actually looks a lot cleaner, tbh.
This commit is contained in:
2026-05-28 20:13:12 -04:00
parent bbc16dc4c4
commit 25efccf6c5
20 changed files with 1275 additions and 2145 deletions
@@ -1,6 +1,10 @@
#include <boostAsioLinkageFix.h>
#include <iostream>
#include <cstring>
#include <coroutine>
#include <functional>
#include <optional>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
@@ -8,7 +12,11 @@
#include <fcntl.h>
#include <errno.h>
#include <adapters/boostAsio/deadlineTimerAReq.h>
#include <boost/asio/post.hpp>
#include <spinscale/co/group.h>
#include "udpCommandDemuxer.h"
#include "protocol.h"
#include "core.h"
#include "device.h"
@@ -330,6 +338,19 @@ void UdpCommandDemuxer::processIncomingData()
char sourceIP[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN);
if (bytesReceived >= static_cast<ssize_t>(
sizeof(Header) + sizeof(Command)))
{
const uint8_t cmdSet = receiveBuffer[sizeof(Header)];
const uint8_t cmdId = receiveBuffer[sizeof(Header) + 1];
if (tryCompletePendingCommandWait(
sourceIP, cmdSet, cmdId, receiveBuffer, bytesReceived))
{
return;
}
}
// First, find device with matching IP address in DeviceManager collection
for (const auto &device : deviceManager.devices)
{
@@ -395,5 +416,208 @@ void UdpCommandDemuxer::processIncomingData()
<< sourceIP << ", discarding datagram" << std::endl;
}
struct UdpCommandDemuxer::PendingCommandWaitDesc
{
CommandWaitKey key;
boost::asio::io_service &resumeIoService;
std::atomic<bool> settled{false};
UdpCommandResponseResult result{};
std::coroutine_handle<> callerSchedHandle;
PendingCommandWaitDesc(
CommandWaitKey keyIn,
boost::asio::io_service &resumeIoServiceIn)
: key(std::move(keyIn)),
resumeIoService(resumeIoServiceIn)
{}
};
void UdpCommandDemuxer::settlePendingCommandWait(
const std::shared_ptr<PendingCommandWaitDesc> &wait,
UdpCommandResponseResult::Outcome outcome,
const uint8_t *data, ssize_t bytesReceived)
{
if (wait->settled.exchange(true)) {
return;
}
wait->result.outcome = outcome;
wait->result.bytesReceived = bytesReceived;
if (outcome == UdpCommandResponseResult::Outcome::Response
&& data != nullptr
&& bytesReceived > 0
&& bytesReceived
<= static_cast<ssize_t>(sizeof(wait->result.buffer)))
{
memcpy(wait->result.buffer, data, bytesReceived);
}
std::coroutine_handle<> handle = wait->callerSchedHandle;
if (!handle) {
return;
}
boost::asio::post(wait->resumeIoService, handle);
}
std::shared_ptr<UdpCommandDemuxer::PendingCommandWaitDesc>
UdpCommandDemuxer::findAndRemovePendingCommandWait(const CommandWaitKey &key)
{
sscl::SpinLock::Guard guard(pendingWaits.lock);
const auto iterator = pendingWaits.rsrc.pendingWaits.find(key);
if (iterator == pendingWaits.rsrc.pendingWaits.end()) {
return nullptr;
}
std::shared_ptr<PendingCommandWaitDesc> wait = iterator->second;
pendingWaits.rsrc.pendingWaits.erase(iterator);
return wait;
}
void UdpCommandDemuxer::cancelPendingCommandWait(
uint8_t cmdSet, uint8_t cmdId,
const std::string &deviceIp)
{
std::shared_ptr<PendingCommandWaitDesc> wait = findAndRemovePendingCommandWait(
{deviceIp, cmdSet, cmdId});
if (!wait) { return; }
settlePendingCommandWait(
wait,
UdpCommandResponseResult::Outcome::Timeout,
nullptr, -1);
}
bool UdpCommandDemuxer::tryCompletePendingCommandWait(
const char *sourceIp,
uint8_t cmdSet, uint8_t cmdId,
const uint8_t *data, ssize_t bytesReceived)
{
std::shared_ptr<PendingCommandWaitDesc> wait = findAndRemovePendingCommandWait(
{sourceIp, cmdSet, cmdId});
if (!wait) { return false; }
const UdpCommandResponseResult::Outcome outcome =
(bytesReceived > 0
&& bytesReceived
<= static_cast<ssize_t>(sizeof(wait->result.buffer)))
? UdpCommandResponseResult::Outcome::Response
: UdpCommandResponseResult::Outcome::RecvError;
settlePendingCommandWait(wait, outcome, data, bytesReceived);
return true;
}
sscl::co::ViralNonPostingInvoker<UdpCommandResponseResult>
UdpCommandDemuxer::waitForCommandResponseCReq(
uint8_t cmdSet, uint8_t cmdId,
const std::string &deviceIp)
{
const CommandWaitKey key{deviceIp, cmdSet, cmdId};
auto wait = std::make_shared<PendingCommandWaitDesc>(
key, componentThread->getIoService());
{
sscl::SpinLock::Guard guard(pendingWaits.lock);
pendingWaits.rsrc.pendingWaits[key] = wait;
}
struct PendingCommandWaitDescAwaiter
{
std::shared_ptr<PendingCommandWaitDesc> wait;
bool await_ready() const noexcept
{
return wait->settled.load(std::memory_order_acquire);
}
bool await_suspend(std::coroutine_handle<> caller) noexcept
{
if (wait->settled.load(std::memory_order_acquire)) {
return false;
}
wait->callerSchedHandle = caller;
return true;
}
UdpCommandResponseResult await_resume() const noexcept
{
return wait->result;
}
};
const UdpCommandResponseResult result =
co_await PendingCommandWaitDescAwaiter{wait};
if (findAndRemovePendingCommandWait(key))
{
std::cerr << __func__ << ": pending wait still registered after "
"settle for device " << deviceIp << " (cmd_set="
<< static_cast<int>(cmdSet) << ", cmd_id="
<< static_cast<int>(cmdId) << "); program error"
<< std::endl;
}
co_return result;
}
sscl::co::ViralNonPostingInvoker<UdpCommandResponseResult>
UdpCommandDemuxer::waitForCommandResponseCReq(
uint8_t cmdSet, uint8_t cmdId,
const std::string &deviceIp,
int timeoutMs)
{
/** EXPLANATION:
* We setup an async timer event to detect timeout, and register a UDP
* command handler to wait for the device to respond to the incoming command
* request. If the device does not respond within the timeout period,
* we will consider the command to have failed.
*/
boost::asio::io_service &ioService = componentThread->getIoService();
std::optional<std::shared_ptr<boost::asio::deadline_timer>> raceTimer;
auto timerAwaiter = adapters::boostAsio::getDeadlineTimerAReqAwaiter(
ioService,
boost::posix_time::milliseconds(timeoutMs),
raceTimer);
auto responseInvoker = waitForCommandResponseCReq(cmdSet, cmdId, deviceIp);
static constexpr int timerMemberSettlementIndex = 0;
sscl::co::Group group;
group.add(timerAwaiter);
group.add(responseInvoker);
co_await group.getAwaitFirstSettlementInvoker();
group.checkForAndReThrowGroupExceptions();
const bool timerWonFirst =
group.s.rsrc.firstSettledInvokerIdx == timerMemberSettlementIndex;
if (timerWonFirst) {
cancelPendingCommandWait(cmdSet, cmdId, deviceIp);
} else if (raceTimer) {
(*raceTimer)->cancel();
}
/** Group member adapter coros are fire-and-forget; keep group alive until
* both members settle so the loser adapter does not touch freed state.
*/
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
if (timerWonFirst)
{
UdpCommandResponseResult timeoutResult;
timeoutResult.outcome = UdpCommandResponseResult::Outcome::Timeout;
co_return timeoutResult;
}
co_return responseInvoker.completedReturnValues().myReturnValue;
}
} // namespace comms
} // namespace livoxProto1