Files
salmanoff/include/adapters/boostAsio/udpReceiveFromAReq.h
T

114 lines
2.5 KiB
C++
Raw Normal View History

#ifndef ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H
#define ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H
#include <boostAsioLinkageFix.h>
#include <atomic>
#include <coroutine>
#include <memory>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/post.hpp>
#include <boost/system/error_code.hpp>
namespace adapters::boostAsio {
/** Coroutine awaiter wrapping boost::asio::ip::udp::socket::async_receive_from.
*
* Resumes on resumeIoContext when the receive completes or fails.
*/
class UdpReceiveFromAReq
{
public:
struct Result
{
boost::system::error_code ec;
std::size_t nbytes = 0;
};
struct AsyncState
{
std::atomic<bool> settled{false};
Result result;
std::coroutine_handle<> callerSchedHandle;
};
UdpReceiveFromAReq(
boost::asio::io_context &resumeIoContext,
boost::asio::ip::udp::socket &socket,
const boost::asio::mutable_buffer &buffer,
boost::asio::ip::udp::endpoint &senderEndpoint)
: asyncState(std::make_shared<AsyncState>()),
resumeIoContext(resumeIoContext)
{
socket.async_receive_from(
buffer, senderEndpoint,
[this](const boost::system::error_code &error, std::size_t nbytes)
{
onReceiveComplete(error, nbytes);
});
}
bool await_ready() const noexcept
{
return asyncState->settled.load(std::memory_order_acquire);
}
bool await_suspend(std::coroutine_handle<> caller) noexcept
{
if (asyncState->settled.load(std::memory_order_acquire)) {
return false;
}
asyncState->callerSchedHandle = caller;
return true;
}
Result await_resume() const noexcept
{
return asyncState->result;
}
private:
void onReceiveComplete(
const boost::system::error_code &error, std::size_t nbytes)
{
if (asyncState->settled.exchange(true)) {
return;
}
asyncState->result.ec = error;
asyncState->result.nbytes = nbytes;
signalSettledAndResumeCaller();
}
void signalSettledAndResumeCaller()
{
std::coroutine_handle<> handle = asyncState->callerSchedHandle;
if (!handle) {
return;
}
boost::asio::post(resumeIoContext, handle);
}
std::shared_ptr<AsyncState> asyncState;
boost::asio::io_context &resumeIoContext;
};
/** Factory helper that builds a UdpReceiveFromAReq awaiter.
 *
 * The four arguments are handed, unchanged and in the same order, to the
 * UdpReceiveFromAReq constructor (which issues the async_receive_from call),
 * and the constructed awaiter is returned by value.
 */
inline auto getUdpReceiveFromAReqAwaiter(
    boost::asio::io_context &resumeIoContext,
    boost::asio::ip::udp::socket &socket,
    const boost::asio::mutable_buffer &buffer,
    boost::asio::ip::udp::endpoint &senderEndpoint)
{
    return UdpReceiveFromAReq{
        resumeIoContext, socket, buffer, senderEndpoint};
}
} // namespace adapters::boostAsio
#endif // ADAPTERS_BOOST_ASIO_UDP_RECEIVE_FROM_AREQ_H