#ifndef UDP_COMMAND_DEMUXER_H #define UDP_COMMAND_DEMUXER_H #include #include #include #include #include #include #include #include #include #include #include namespace livoxProto1 { // Forward declarations class DeviceManager; namespace comms { struct UdpCommandResponseResult { enum class Outcome { Timeout, Response, RecvError }; Outcome outcome = Outcome::Timeout; uint8_t buffer[1024]{}; ssize_t bytesReceived = -1; }; struct CommandWaitKey { std::string deviceIp; uint8_t cmdSet; uint8_t cmdId; bool operator==(const CommandWaitKey &other) const { return deviceIp == other.deviceIp && cmdSet == other.cmdSet && cmdId == other.cmdId; } }; struct CommandWaitKeyHash { std::size_t operator()(const CommandWaitKey &key) const { std::size_t hash = std::hash{}(key.deviceIp); hash ^= (static_cast(key.cmdSet) << 8) | static_cast(key.cmdId); return hash; } }; /** * UdpCommandDemuxer - Routes UDP command datagrams to appropriate devices * * This class listens on the command port (65000) for incoming UDP datagrams * from Livox devices and routes them to the appropriate Device based on * the source IP address. * * The reason we need a whole class for this is because we use the same port * numbers for all connected devices, so we have no way to distinguish between * devices except based on the devices' IP addrs. Since all commands are sent * over UDP, our sockets don't have built-in binding to a specific source IP. * * So we need to discriminate between source IPs manually, and demultiplex * the dgrams received from different devices manually. * * We'll prolly also have to do the same thing for point cloud and IMU data, so * we'll prolly end up renaming this class to UdpResponseDemuxer. */ class UdpCommandDemuxer { public: UdpCommandDemuxer( const std::shared_ptr& componentThread, DeviceManager& deviceManager, uint16_t commandPort = 56001, uint16_t dataPort = 56000); ~UdpCommandDemuxer(); void start(); void stop(); bool isRunning() const { return isActive.load(); } // Get shared pointer to command endpoint for handshake use std::shared_ptr getCmdEndpointFdDesc() const { return cmdEndpointFdDesc; } // Get shared pointer to pcloud data fd for use in IoUringAssemblyEngine std::shared_ptr getPcloudDataFdDesc() const { return pcloudDataFdDesc; } sscl::co::ViralNonPostingInvoker waitForCommandResponseCReq( uint8_t cmdSet, uint8_t cmdId, const std::string &deviceIp, int timeoutMs); private: struct PendingCommandWaitDesc; sscl::co::ViralNonPostingInvoker waitForCommandResponseCReq( uint8_t cmdSet, uint8_t cmdId, const std::string &deviceIp); void setupSockets(); void setupCommandSocket(); void setupPcloudDataSocket(); void startAsyncReceive(); void onDataReady(const boost::system::error_code& error); void processIncomingData(); bool tryCompletePendingCommandWait( const char *sourceIp, uint8_t cmdSet, uint8_t cmdId, const uint8_t *data, ssize_t bytesReceived); void cancelPendingCommandWait( uint8_t cmdSet, uint8_t cmdId, const std::string &deviceIp); std::shared_ptr findAndRemovePendingCommandWait( const CommandWaitKey &key); void settlePendingCommandWait( const std::shared_ptr &wait, UdpCommandResponseResult::Outcome outcome, const uint8_t *data, ssize_t bytesReceived); std::shared_ptr componentThread; DeviceManager& deviceManager; uint16_t commandPort; uint16_t dataPort; // State management sscl::SpinLock isActiveAndShouldStopLock; std::atomic isActive{false}; std::atomic shouldStop{false}; struct PendingWaitsResources { std::unordered_map< CommandWaitKey, std::shared_ptr, CommandWaitKeyHash> pendingWaits; }; sscl::SharedResourceGroup pendingWaits; std::shared_ptr pcloudDataFdDesc; std::shared_ptr cmdEndpointFdDesc; uint8_t receiveBuffer[1024]; struct sockaddr_in senderAddr; socklen_t senderAddrLen; ssize_t bytesReceived; }; } // namespace comms } // namespace livoxProto1 #endif // UDP_COMMAND_DEMUXER_H