diff --git a/AGENTS.md b/AGENTS.md index 9f1ee0d..33507c9 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -11,6 +11,7 @@ - Aggressively isolate, split off, deduplicate and reuse code which can be made into common library code. Do the same with UI elements. Do this both when implementing new features and opportunistically while refactoring or changing old code/UI elements. - Names of files, functions, classes, abstractions, database fields, etc should be aimed at disambiguating purpose and function, rather than at brevity. - Any source or header file that includes a Boost header must include `` first (at the top of the file, or immediately after the include guard in headers), before all other includes, so Boost.Asio is used as a non-header-only library correctly. +- When refactoring code, moving code around or splitting code into new files, don't omit or remove source code comments. Preserve source code comments across refactors. ## Style: @@ -18,7 +19,8 @@ * UpperCamelCase for class/struct names, lowerCamelCase for var and member var names; underscores_between_words for namespace names. No hungarian notation. * Differentiate overshadowing local arg var names from class member var names - by prefixing local arg var names with underscore (_). + by prefixing local arg var names with underscore (_). Don't do things like + postfixing the local arg var name with "In", etc. * Single-line blocks after a selection/iteration statement must always be enclosed in braces, but you can choose whether opening brace is on same line, versus whether you put both braces on same line. I.e: diff --git a/commonLibs/livoxProto1/CMakeLists.txt b/commonLibs/livoxProto1/CMakeLists.txt index ae42203..9f53c6c 100644 --- a/commonLibs/livoxProto1/CMakeLists.txt +++ b/commonLibs/livoxProto1/CMakeLists.txt @@ -17,10 +17,17 @@ if(ENABLE_LIB_livoxProto1) # Set config define for header generation add_compile_definitions(CONFIG_LIB_LIVOXPROTO1_ENABLED) - target_include_directories(livoxProto1 PUBLIC ${Boost_INCLUDE_DIRS}) + target_include_directories(livoxProto1 PUBLIC + ${Boost_INCLUDE_DIRS} + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/smocore/include + ${CMAKE_SOURCE_DIR}/commonLibs + ) target_link_libraries(livoxProto1 PUBLIC Boost::system Boost::log - attachmentSupport) + attachmentSupport + spinscale + ) # Verify Boost dynamic dependencies after build add_custom_command(TARGET livoxProto1 POST_BUILD diff --git a/commonLibs/livoxProto1/broadcastListener.cpp b/commonLibs/livoxProto1/broadcastListener.cpp index 00714b7..61993eb 100644 --- a/commonLibs/livoxProto1/broadcastListener.cpp +++ b/commonLibs/livoxProto1/broadcastListener.cpp @@ -1,3 +1,5 @@ +#include + #include #include #include diff --git a/commonLibs/livoxProto1/core.cpp b/commonLibs/livoxProto1/core.cpp index ce2907d..7ee8723 100644 --- a/commonLibs/livoxProto1/core.cpp +++ b/commonLibs/livoxProto1/core.cpp @@ -1,10 +1,10 @@ +#include + #include #include #include #include #include -#include -#include #include #include "protocol.h" #include "core.h" @@ -72,68 +72,16 @@ std::optional> DeviceManager::getDevice( return std::nullopt; } -// GetOrCreateDeviceReq nested class implementation -class DeviceManager::GetOrCreateDeviceReq -: public sscl::cps::NonPostedAsynchronousContinuation< - livoxProto1_getOrCreateDeviceReqCbFn> -{ -public: - DeviceManager& deviceManager; - // The device we're trying to connect (holds all connection parameters) - std::shared_ptr pendingDevice; - -public: - GetOrCreateDeviceReq( - DeviceManager& mgr, - std::shared_ptr device, - sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation< - livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)), - deviceManager(mgr), pendingDevice(device) - {} - - // Public accessor for the original callback - void callOriginalCallback(bool success, std::shared_ptr device) - { callOriginalCb(success, device); } - - void callOriginalCallbackWithFailure() - { callOriginalCallback(false, nullptr); } - - void getOrCreateDeviceReq1( - std::shared_ptr context, bool connectSuccess - ) - { - if (!connectSuccess) - { - std::cerr << __func__ << ": Connection failed for device " - << context->pendingDevice->discoveredDevice.deviceIdentifier - << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - - // Connection successful, add device to collection - context->deviceManager.devices.push_back(context->pendingDevice); - if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) - { - std::cout << __func__ << ": Successfully connected and added device " - << context->pendingDevice->discoveredDevice.deviceIdentifier - << std::endl; - } - - // Return success with the connected device - context->callOriginalCallback(true, context->pendingDevice); - } -}; - -void DeviceManager::getOrCreateDeviceReq( +sscl::co::ViralNonPostingInvoker +DeviceManager::getOrCreateDeviceCReq( const std::string &deviceIdentifier, const std::shared_ptr& componentThread, int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::cps::Callback callback) + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort) { + LivoxProto1GetOrCreateDeviceResult result; + // Validate smoIp format using Boost.Asio IPv4 validation if (!smoIp.empty() && !comms::isValidIPv4(smoIp)) { @@ -155,9 +103,9 @@ void DeviceManager::getOrCreateDeviceReq( auto existingDevice = getDevice(deviceIdentifier); if (existingDevice) { - // Device already exists and is connected, return it - callback.callbackFn(true, existingDevice.value()); - return; + result.success = true; + result.device = existingDevice.value(); + co_return result; } // Device doesn't exist, create a new one but don't add it to collection yet @@ -167,82 +115,50 @@ void DeviceManager::getOrCreateDeviceReq( smoIp, smoSubnetNbits, dataPort, cmdPort, imuPort); - // Create the continuation request object to hold state and callbacks - auto request = std::make_shared( - *this, newDevice, std::move(callback)); - // Start the connection process - only add to collection on success - request->pendingDevice->connectReq( - {request, std::bind( - &DeviceManager::GetOrCreateDeviceReq::getOrCreateDeviceReq1, - request.get(), request, std::placeholders::_1)}); + const bool connectSuccess = co_await newDevice->connectCReq(); + if (!connectSuccess) + { + std::cerr << __func__ << ": Connection failed for device " + << newDevice->discoveredDevice.deviceIdentifier + << std::endl; + co_return result; + } + + devices.push_back(newDevice); + if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) + { + std::cout << __func__ << ": Successfully connected and added device " + << newDevice->discoveredDevice.deviceIdentifier + << std::endl; + } + + result.success = true; + result.device = newDevice; + co_return result; } -class DeviceManager::DestroyDeviceReq -: public sscl::cps::NonPostedAsynchronousContinuation< - livoxProto1_destroyDeviceReqCbFn> -{ -public: - DeviceManager& deviceManager; - std::shared_ptr pendingDevice; - -public: - DestroyDeviceReq( - DeviceManager& mgr, - std::shared_ptr device, - sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation< - livoxProto1_destroyDeviceReqCbFn>(std::move(cb)), - deviceManager(mgr), pendingDevice(device) - {} - - // Public accessor for the original callback - void callOriginalCallback(bool success) - { callOriginalCb(success); } - - void callOriginalCallbackWithFailure() - { callOriginalCallback(false); } - - void destroyDeviceReq1( - std::shared_ptr context, bool success - ) - { - context->deviceManager.devices.erase( - std::remove( - context->deviceManager.devices.begin(), - context->deviceManager.devices.end(), - context->pendingDevice), - context->deviceManager.devices.end()); - - context->callOriginalCallback(success); - } -}; - -void DeviceManager::destroyDeviceReq( - std::shared_ptr dev, - sscl::cps::Callback callback -) +sscl::co::ViralNonPostingInvoker DeviceManager::destroyDeviceCReq( + std::shared_ptr dev) { /** EXPLANATION: * Check to see if the device is in our collection. If so, call - * disconnectReq and then remove it. + * disconnectCReq and then remove it. */ std::shared_ptr device = getDevice(dev->discoveredDevice). value_or(nullptr); - if (!device || device->nAttachedStimulusProducers > 0) - { - callback.callbackFn(false); - return; + if (!device || device->nAttachedStimulusProducers > 0) { + co_return false; } - auto request = std::make_shared( - *this, device, std::move(callback)); + const bool success = co_await device->disconnectCReq(); - device->disconnectReq( - {request, std::bind( - &DeviceManager::DestroyDeviceReq::destroyDeviceReq1, - request.get(), request, std::placeholders::_1)}); + devices.erase( + std::remove(devices.begin(), devices.end(), device), + devices.end()); + + co_return success; } void main(const std::shared_ptr &componentThread, diff --git a/commonLibs/livoxProto1/core.h b/commonLibs/livoxProto1/core.h index 22e2375..f6bdb22 100644 --- a/commonLibs/livoxProto1/core.h +++ b/commonLibs/livoxProto1/core.h @@ -11,7 +11,7 @@ #include "broadcastListener.h" #include "udpCommandDemuxer.h" #include "livoxProto1.h" -#include +#include namespace livoxProto1 { @@ -23,17 +23,16 @@ public: static void deviceGoneAwayInd(const comms::DiscoveredDevice &device); - void getOrCreateDeviceReq( + sscl::co::ViralNonPostingInvoker + getOrCreateDeviceCReq( const std::string &deviceIdentifier, const std::shared_ptr& componentThread, int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::cps::Callback callback); + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort); - void destroyDeviceReq( - std::shared_ptr device, - sscl::cps::Callback callback); + sscl::co::ViralNonPostingInvoker destroyDeviceCReq( + std::shared_ptr device); std::optional> getDevice( const std::string &deviceIdentifier); @@ -52,10 +51,6 @@ public: std::vector> devices; comms::BroadcastListener broadcastListener; comms::UdpCommandDemuxer udpCommandDemuxer; - - // Nested continuation class for async device creation - class GetOrCreateDeviceReq; - class DestroyDeviceReq; }; void main( diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index 0ac074f..26757dc 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -17,8 +17,6 @@ #include #include #include -#include -#include #include "device.h" #include "protocol.h" #include "core.h" @@ -119,288 +117,563 @@ Device::~Device() heartbeatTimer.reset(); } -/** - * Device::ConnectReq - Encapsulates all state and resources for async connection sequence - * This class manages the overall device connection process including handshake and heartbeat setup - */ -class Device::ConnectReq -: public sscl::cps::NonPostedAsynchronousContinuation +namespace { + +constexpr uint8_t CMD_SET_GENERAL = 0x00; +constexpr uint8_t CMD_ID_HANDSHAKE_ACK = 0x01; +constexpr uint8_t CMD_ID_SAMPLING_RESPONSE = 0x04; +constexpr uint8_t CMD_SET_LIDAR = 0x01; +constexpr uint8_t CMD_ID_SET_RETURN_MODE_RESPONSE = 0x06; +constexpr uint8_t CMD_ID_GET_RETURN_MODE_RESPONSE = 0x07; +constexpr uint16_t LIVOX_COMMAND_PORT = 65000; + +using UdpCommandResponseResult = comms::UdpCommandResponseResult; + +comms::UdpCommandDemuxer *getUdpCommandDemuxer() { -private: - Device& device; - boost::asio::deadline_timer delayTimer; + auto &protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) { + return nullptr; + } -public: - ConnectReq(Device& dev, sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation( - std::move(cb)), device(dev), - delayTimer(dev.componentThread->getIoService()) - {} + return &protoState.deviceManager->udpCommandDemuxer; +} +std::shared_ptr getCmdEndpointFdDesc() +{ + auto &protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) { + return nullptr; + } + + return protoState.deviceManager->udpCommandDemuxer.getCmdEndpointFdDesc(); +} + +bool sendHandshakeRequest( + Device &device, + const std::string &deviceIP, + int cmdSocketFd) +{ + /** EXPLANATION: + * Prepare handshake request. + */ + comms::HandshakeRequest handshakeReq( + device.detectedSmoListeningIp, + device.dataPort, device.cmdPort, device.imuPort); + handshakeReq.swapContentsToProtocolEndianness(); + handshakeReq.header.setCrc16FromRawBytes(); + handshakeReq.header.swapCrc16ToProtocolEndianness(); + handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32(); + handshakeReq.footer.swapCrc32ToProtocolEndianness(); + + struct sockaddr_in deviceAddr; + memset(&deviceAddr, 0, sizeof(deviceAddr)); + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = inet_addr(deviceIP.c_str()); + deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT); + + ssize_t bytesSent = sendto( + cmdSocketFd, + &handshakeReq, sizeof(comms::HandshakeRequest), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); + + if (bytesSent < 0) + { + std::cerr << __func__ << ": Failed to send handshake request: " + << strerror(errno) << std::endl; + return false; + } + + return true; +} + +bool processHandshakeResponse( + const UdpCommandResponseResult &responseResult, + Device &device, + const std::string &deviceIP) +{ + if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout) + { + std::cerr << __func__ << ": Command timeout with " + << deviceIP << "(" << device.discoveredDevice.deviceIdentifier + << ")" << std::endl; + return false; + } + + if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError) + { + std::cerr << __func__ << ": Receive error during handshake with " + << deviceIP << std::endl; + return false; + } + + const ssize_t bytesReceived = responseResult.bytesReceived; + if (bytesReceived < static_cast(sizeof(comms::HandshakeResponse))) + { + std::cerr << __func__ << ": Response of size " << bytesReceived + << " too small from " << deviceIP << std::endl; + return false; + } + + comms::HandshakeResponse *resp = reinterpret_cast( + const_cast(responseResult.buffer)); + + /** EXPLANATION: + * Following the clean receiving flow: + * 1. Swap CRC32 to host endianness first + * 2. Validate CRC32 + * 3. Swap CRC16 to host endianness + * 4. Validate CRC16 + * 5. Swap content to host endianness + */ + resp->footer.swapCrc32ToHostEndianness(); + if (!resp->validateCrc32()) + { + std::cerr << __func__ << ": CRC32 validation failed from " + << deviceIP << std::endl; + return false; + } + resp->header.swapCrc16ToHostEndianness(); + if (!resp->header.validateCrc16()) + { + std::cerr << __func__ << ": CRC16 validation failed from " + << deviceIP << std::endl; + return false; + } + resp->swapContentsToHostEndianness(); + if (!resp->sanityCheck() || resp->ret_code != 0x00) + { + std::cerr << __func__ << ": Invalid response from " + << deviceIP << std::endl; + return false; + } + + if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) + { + std::cout << __func__ << ": Handshake successful with " + << deviceIP << "(" + << device.discoveredDevice.deviceIdentifier + << ")" << "\n"; + } + + return true; +} + +bool sendEnDisablePcloudCommand( + Device &device, uint8_t enableFlag, const char *commandName) +{ + livoxProto1::comms::StartStopSamplingMessage message; + message.enable = enableFlag; + + message.swapContentsToProtocolEndianness(); + message.header.setCrc16FromRawBytes(); + message.header.swapCrc16ToProtocolEndianness(); + message.footer.crc_32 = message.calculateCrc32(); + message.footer.swapCrc32ToProtocolEndianness(); + + struct sockaddr_in deviceAddr; + memset(&deviceAddr, 0, sizeof(deviceAddr)); + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = + inet_addr(device.discoveredDevice.ipAddr.c_str()); + deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT); + + auto &protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { + std::cerr << __func__ << ": No device manager available" << std::endl; + return false; + } + + auto cmdEndpointFdDesc = getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) + { + std::cerr << __func__ << ": No command endpoint available" << std::endl; + return false; + } + + ssize_t bytesSent = sendto( + cmdEndpointFdDesc->native_handle(), + &message, sizeof(message), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); + + if (bytesSent < 0) + { + std::cerr << __func__ << ": Failed to send " << commandName + << " command: " << strerror(errno) << std::endl; + return false; + } + + return true; +} + +bool processSamplingResponse( + const UdpCommandResponseResult &responseResult, + Device &device, + const char *failureContext) +{ + (void)failureContext; + + if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout) + { + std::cerr << __func__ << ": Command timeout for device " + << device.discoveredDevice.deviceIdentifier + << std::endl; + return false; + } + + if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError) + { + std::cerr + << __func__ << ": Receive error during command for device " + << device.discoveredDevice.deviceIdentifier + << std::endl; + return false; + } + + const ssize_t bytesReceived = responseResult.bytesReceived; + if (bytesReceived < static_cast(sizeof(comms::SamplingResponse))) + { + std::cerr << __func__ << ": Response of size " + << bytesReceived + << " is too small for sampling response (expected " + << sizeof(comms::SamplingResponse) << ")" + << std::endl; + return false; + } + + comms::SamplingResponse *response = + reinterpret_cast( + const_cast(responseResult.buffer)); + + response->swapContentsToHostEndianness(); + if (!response->sanityCheck()) + { + std::cerr << __func__ << ": Invalid sampling response structure.\n"; + return false; + } + + if (!(response->command.cmd_set == CMD_SET_GENERAL && + response->command.cmd_id == CMD_ID_SAMPLING_RESPONSE && + response->ret_code == 0x00)) + { + if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) + { + std::cout << __func__ << ": Failed to en/disable pcloud data " + "for device " + "(" << device.discoveredDevice.deviceIdentifier + << ") @(" << device.discoveredDevice.ipAddr << "). " + << "cmd_set: " << (int)response->command.cmd_set + << ", cmd_id: " << (int)response->command.cmd_id + << ", ret_code: " << (int)response->ret_code << "\n"; + } + return false; + } + + return true; +} + +bool sendSetReturnModeCommand(Device &device, uint8_t returnMode) +{ + auto cmdEndpointFdDesc = getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) + { + std::cerr << __func__ << ": No command endpoint available.\n"; + return false; + } + + comms::SetLiDARReturnMode setReturnModeMsg; + setReturnModeMsg.mode = returnMode; + setReturnModeMsg.swapContentsToProtocolEndianness(); + setReturnModeMsg.header.setCrc16FromRawBytes(); + setReturnModeMsg.header.swapCrc16ToProtocolEndianness(); + setReturnModeMsg.footer.crc_32 = setReturnModeMsg.calculateCrc32(); + setReturnModeMsg.footer.swapCrc32ToProtocolEndianness(); + + struct sockaddr_in deviceAddr; + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = inet_addr( + device.discoveredDevice.ipAddr.c_str()); + deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT); + + ssize_t bytesSent = sendto( + cmdEndpointFdDesc->native_handle(), + &setReturnModeMsg, sizeof(setReturnModeMsg), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); + + if (bytesSent < 0) + { + std::cerr << __func__ << ": Failed to send set return mode message: " + << strerror(errno) << std::endl; + return false; + } + + std::cout << __func__ << ": Sent set return mode message to " + << device.discoveredDevice.ipAddr << ":" << LIVOX_COMMAND_PORT + << std::endl; + return true; +} + +bool processSetReturnModeResponse( + const UdpCommandResponseResult &responseResult, + Device &device, + uint8_t returnMode) +{ + if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout) + { + std::cerr << __func__ << ": Set return mode timeout with " + << device.discoveredDevice.ipAddr << "(" + << device.discoveredDevice.deviceIdentifier << ")" << "\n"; + return false; + } + + if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError) + { + std::cerr << __func__ << ": Receive error during set return mode with " + << device.discoveredDevice.ipAddr << "\n"; + return false; + } + + const ssize_t bytesReceived = responseResult.bytesReceived; + if (bytesReceived + < static_cast(sizeof(comms::SetLiDARReturnModeResponse))) + { + std::cerr << __func__ << ": Response of size " + << bytesReceived << " too small from " + << device.discoveredDevice.ipAddr << "\n"; + return false; + } + + comms::SetLiDARReturnModeResponse *response = + reinterpret_cast( + const_cast(responseResult.buffer)); + response->swapContentsToHostEndianness(); + + if (response->command.cmd_set != CMD_SET_LIDAR || + response->command.cmd_id != CMD_ID_SET_RETURN_MODE_RESPONSE || + response->ret_code != 0x00) + { + return false; + } + + device.currentReturnMode = Device::ReturnMode(returnMode); + return true; +} + +bool sendGetReturnModeCommand(Device &device) +{ + auto cmdEndpointFdDesc = getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) + { + std::cerr << __func__ << ": No command endpoint available.\n"; + return false; + } + + comms::GetLiDARReturnMode getReturnModeMsg; + getReturnModeMsg.swapContentsToProtocolEndianness(); + getReturnModeMsg.header.setCrc16FromRawBytes(); + getReturnModeMsg.header.swapCrc16ToProtocolEndianness(); + getReturnModeMsg.footer.crc_32 = getReturnModeMsg.calculateCrc32(); + getReturnModeMsg.footer.swapCrc32ToProtocolEndianness(); + + struct sockaddr_in deviceAddr; + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = inet_addr(device.discoveredDevice.ipAddr.c_str()); + deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT); + + ssize_t bytesSent = sendto( + cmdEndpointFdDesc->native_handle(), + &getReturnModeMsg, sizeof(getReturnModeMsg), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); + + if (bytesSent < 0) + { + std::cerr << __func__ << ": Failed to send get return mode message: " + << strerror(errno) << std::endl; + return false; + } + + std::cout << __func__ << ": Sent get return mode message to " + << device.discoveredDevice.ipAddr << ":" << LIVOX_COMMAND_PORT + << std::endl; + return true; +} + +bool processGetReturnModeResponse( + const UdpCommandResponseResult &responseResult, + Device &device, + uint8_t &outReturnMode) +{ + if (responseResult.outcome == UdpCommandResponseResult::Outcome::Timeout) + { + std::cerr << __func__ << ": Get return mode timeout with " + << device.discoveredDevice.ipAddr << "(" + << device.discoveredDevice.deviceIdentifier + << ")" << "\n"; + return false; + } + + if (responseResult.outcome == UdpCommandResponseResult::Outcome::RecvError) + { + std::cerr << __func__ << ": Receive error during get return mode with " + << device.discoveredDevice.ipAddr << std::endl; + return false; + } + + const ssize_t bytesReceived = responseResult.bytesReceived; + if (bytesReceived + < static_cast(sizeof(comms::GetLiDARReturnModeResponse))) + { + std::cerr << __func__ << ": Response of size " + << bytesReceived << " too small from " + << device.discoveredDevice.ipAddr << std::endl; + return false; + } + + comms::GetLiDARReturnModeResponse *response = + reinterpret_cast( + const_cast(responseResult.buffer)); + response->swapContentsToHostEndianness(); + + if (!(response->command.cmd_set == CMD_SET_LIDAR && + response->command.cmd_id == CMD_ID_GET_RETURN_MODE_RESPONSE && + response->ret_code == 0x00)) + { + return false; + } + + device.currentReturnMode = Device::ReturnMode(response->mode); + outReturnMode = response->mode; + return true; +} + +} // namespace + +sscl::co::ViralNonPostingInvoker Device::connectCReq() +{ /** FIXME: * WE need to assign the ipAddr to the Device being connected up. */ - // Callback methods for the connection sequence - void connectReq1( - std::shared_ptr context, - bool success, const std::string& ipAddr - ) - { - // Fail early - if handshake failed, try next method - if (!success) - { - if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) - { - std::cout << __func__ << ": Trying to connect to device by " - << "identifier" << "\n"; - } - - // Try direct connect by device identifier - context->device.connectByDeviceIdentifierReq( - {context, std::bind(&ConnectReq::connectReq2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); - - return; - } - - // Success - store connection info and proceed to next step - context->device.discoveredDevice.ipAddr = ipAddr; - context->device.startHeartbeat(); - - context->connectReq3(context, success); - } - - void connectReq2( - std::shared_ptr context, - bool success, const std::string& ipAddr - ) - { - // Fail early - if this also failed, all connection attempts failed - if (!success) - { - context->callOriginalCallbackWithFailure(); - return; - } - - // Success - store connection info and proceed to next step - context->device.discoveredDevice.ipAddr = ipAddr; - context->device.startHeartbeat(); - - context->connectReq3(context, success); - } - - void connectReq3( - std::shared_ptr context, bool success - ) - { - if (!success) - { - std::cerr << __func__ << ": Failed to connect to device " - "(" << context->device.discoveredDevice.deviceIdentifier - << ") @(" << context->device.discoveredDevice.ipAddr << ").\n"; - context->callOriginalCallbackWithFailure(); - return; - } - - context->callOriginalCallback(success); - } - - // Public accessor for the original callback - void callOriginalCallback(bool success) - { callOriginalCb(success); } - - // Wrapper for failure cases - void callOriginalCallbackWithFailure() - { callOriginalCallback(false); } -}; - -void Device::connectReq(sscl::cps::Callback callback) -{ - // Create the connection request object to hold state and callbacks - auto request = std::make_shared(*this, std::move(callback)); - - // Try connecting to known device first if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": Trying to connect to known device" << "\n"; } - connectToKnownDeviceReq( - {request, std::bind( - &ConnectReq::connectReq1, request.get(), request, - std::placeholders::_1, std::placeholders::_2)}); + Device::ConnectIpResult knownResult = co_await connectToKnownDeviceCReq(); + if (!knownResult.success) + { + if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) + { + std::cout << __func__ << ": Trying to connect to device by " + << "identifier" << "\n"; + } + + Device::ConnectIpResult idResult = co_await + connectByDeviceIdentifierCReq(); + + if (!idResult.success) + { co_return false; } + + discoveredDevice.ipAddr = idResult.ipAddr; + startHeartbeat(); + } + else + { + discoveredDevice.ipAddr = knownResult.ipAddr; + startHeartbeat(); + } + + const bool success = true; + if (!success) + { + std::cerr << __func__ << ": Failed to connect to device " + "(" << discoveredDevice.deviceIdentifier + << ") @(" << discoveredDevice.ipAddr << ").\n"; + co_return false; + } + + co_return success; } -class Device::ConnectToKnownDeviceReq -: public sscl::cps::NonPostedAsynchronousContinuation< - Device::connectToKnownDeviceReqCbFn> +sscl::co::ViralNonPostingInvoker +Device::connectToKnownDeviceCReq() { -public: - Device& device; - std::string deviceIP; - std::shared_ptr deviceInfo; + /** EXPLANATION: + * This function is used to connect to a device that is already known to the + * broadcastListener. + */ + Device::ConnectIpResult result; - ConnectToKnownDeviceReq(Device& dev, sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation< - Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev) - {} - - // Public accessor for the original callback - void callOriginalCallback(bool success, const std::string& ipAddr) - { callOriginalCb(success, ipAddr); } - - // Wrapper for failure cases - void callOriginalCallbackWithFailure() - { callOriginalCallback(false, ""); } - - // Callback methods for the connection sequence - void connectToKnownDeviceReq1( - std::shared_ptr context, bool success - ) - { - // Return the IP address to the caller - context->callOriginalCallback(success, context->deviceIP); - } -}; - -/** EXPLANATION: - * This function is used to connect to a device that is already known to the - * broadcastListener. - */ -void Device::connectToKnownDeviceReq( - sscl::cps::Callback callback - ) -{ - // Create the connection request object to hold state and callbacks - auto request = std::make_shared( - *this, std::move(callback)); - - auto& protoState = livoxProto1::getProtoState(); + auto &protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) { - request->callOriginalCallbackWithFailure(); - return; + co_return result; } - // Check if the device is known to the broadcastListener if (!protoState.deviceManager->broadcastListener.deviceExists( - request->device.discoveredDevice.deviceIdentifier)) + discoveredDevice.deviceIdentifier)) { - request->callOriginalCallbackWithFailure(); - return; + co_return result; } - request->deviceInfo = protoState.deviceManager->broadcastListener.getDevice( - request->device.discoveredDevice.deviceIdentifier); - if (!request->deviceInfo) + std::shared_ptr deviceInfo = + protoState.deviceManager->broadcastListener.getDevice( + discoveredDevice.deviceIdentifier); + if (!deviceInfo) { - request->callOriginalCallbackWithFailure(); - return; + co_return result; } - // Use the IP address from the broadcast message - request->deviceIP = request->deviceInfo->ipAddr; + const std::string deviceIP = deviceInfo->ipAddr; - // Determine the final listening IP address - auto smoIpResult = request->device.getSmoIp(request->deviceIP); + auto smoIpResult = getSmoIp(deviceIP); if (!smoIpResult.has_value()) { - // Auto-detection failed, fail early std::cerr << __func__ << ": Failed to detect SMO listening IP for " << "known device (" - << request->device.discoveredDevice.deviceIdentifier << ")" - << " @(" << request->deviceIP << ").\n"; + << discoveredDevice.deviceIdentifier << ")" + << " @(" << deviceIP << ").\n"; - request->callOriginalCallbackWithFailure(); - return; + co_return result; } if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { std::cout << __func__ << ": Detected SMO listening IP for known device " - << request->device.discoveredDevice.deviceIdentifier - << " @(" << request->deviceIP << ") is " + << discoveredDevice.deviceIdentifier + << " @(" << deviceIP << ") is " << smoIpResult.value() << ". About to try to handshake.\n"; } - request->device.detectedSmoListeningIp = smoIpResult.value(); + detectedSmoListeningIp = smoIpResult.value(); - // Execute handshake with the known device using async method - request->device.executeHandshakeReq( - request->deviceIP, - {request, std::bind( - &ConnectToKnownDeviceReq::connectToKnownDeviceReq1, - request.get(), request, - std::placeholders::_1)}); + const bool handshakeSuccess = co_await executeHandshakeCReq(deviceIP); + result.success = handshakeSuccess; + result.ipAddr = deviceIP; + co_return result; } -class Device::ConnectByDeviceIdentifierReq -: public sscl::cps::NonPostedAsynchronousContinuation< - Device::connectByDeviceIdentifierReqCbFn> -{ -public: - Device& device; - std::string deviceIP; - - ConnectByDeviceIdentifierReq( - Device& dev, sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation< - Device::connectByDeviceIdentifierReqCbFn>( - std::move(cb)), device(dev) - {} - - // Public accessor for the original callback - void callOriginalCallback(bool success, const std::string& ipAddr) - { callOriginalCb(success, ipAddr); } - - // Wrapper for failure cases - void callOriginalCallbackWithFailure() - { callOriginalCallback(false, ""); } - - // Callback methods for the connection sequence - void connectByDeviceIdentifierReq1( - std::shared_ptr context, - bool success - ) - { - // Return the IP address to the caller - context->callOriginalCallback(success, context->deviceIP); - } -}; - -void Device::connectByDeviceIdentifierReq( - sscl::cps::Callback callback - ) +sscl::co::ViralNonPostingInvoker +Device::connectByDeviceIdentifierCReq() { /** EXPLANATION: * This method uses heuristic device IP construction from the serial number. * This requires smoIp to be provided because: * 1. We need the network prefix to generate a valid device IP address - * 2. Without a target device IP, we cannot detect which interface faces the device + * 2. Without a target device subnet, we cannot detect which interface faces the device * 3. Therefore, if smoIp is omitted, heuristic construction is impossible * * If smoIp is not provided, the driver must rely only on broadcast advertisements * from the device (handled by connectToKnownDeviceReq). */ + Device::ConnectIpResult result; - // Check if smoIp is provided - required for heuristic construction - if (smoIp.empty()) - { - callback.callbackFn(false, ""); - return; - } + if (smoIp.empty()) { co_return result; } - // Create the connection request object to hold state and callbacks - auto request = std::make_shared( - *this, std::move(callback)); + const std::string deviceIP = generateClientDeviceIpFromSerialNumber( + discoveredDevice.deviceIdentifier); - // Generate device IP from serial number - request->deviceIP = generateClientDeviceIpFromSerialNumber( - request->device.discoveredDevice.deviceIdentifier); - - // For heuristic construction, always use the provided smoIp. - request->device.detectedSmoListeningIp = request->device.smoIp; + detectedSmoListeningIp = smoIp; if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) { @@ -409,172 +682,49 @@ void Device::connectByDeviceIdentifierReq( << " at IP (" << smoIp << ").\n"; } - // Execute handshake using async method - request->device.executeHandshakeReq( - request->deviceIP, - {request, std::bind( - &ConnectByDeviceIdentifierReq::connectByDeviceIdentifierReq1, - request.get(), request, - std::placeholders::_1)}); + const bool handshakeSuccess = co_await executeHandshakeCReq(deviceIP); + result.success = handshakeSuccess; + result.ipAddr = deviceIP; + co_return result; } -class Device::ExecuteHandshakeReq -: public sscl::cps::NonPostedAsynchronousContinuation< - Device::executeHandshakeReqCbFn> +sscl::co::ViralNonPostingInvoker Device::executeHandshakeCReq( + const std::string &deviceIP) { -public: - friend void Device::executeHandshakeReq( - const std::string& deviceIP, - sscl::cps::Callback callback); + auto &protoState = livoxProto1::getProtoState(); + if (!protoState.deviceManager) + { co_return false; } - enum class SocketState - { - SOCKET_STILL_WAITING = 0, - SOCKET_ERROR, - SOCKET_RECV_SUCCESS, - SOCKET_RECV_ERROR - }; - -public: - Device& device; - std::string deviceIP; - - // Atomic state flags for async coordination - std::atomic timerFired{false}; - std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; - std::atomic handlerExecuted{false}; - - // Cmd fd desc. Will be returned to caller (shared with UdpCommandDemuxer) - std::shared_ptr cmdEndpointFdDesc; - boost::asio::deadline_timer timeoutTimer; - - // Received data storage - uint8_t responseBuffer[1024]{}; - ssize_t bytesReceived = -1; - struct sockaddr_in senderAddr; - socklen_t senderAddrLen = sizeof(senderAddr); - -public: - ExecuteHandshakeReq( - Device& dev, const std::string& deviceIP, - std::shared_ptr - &cmdEndpointFdDesc, - sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation( - std::move(cb)), - device(dev), deviceIP(deviceIP), - cmdEndpointFdDesc(cmdEndpointFdDesc), - timeoutTimer(device.componentThread->getIoService()) + auto cmdEndpointFdDesc = getCmdEndpointFdDesc(); + if (!cmdEndpointFdDesc) { + std::cerr << __func__ << ": UdpCommandDemuxer not started or no " + "command endpoint available." << std::endl; + co_return false; } - ~ExecuteHandshakeReq() + if (detectedSmoListeningIp.empty()) { co_return false; } + + if (!cmdEndpointFdDesc->is_open()) { - cleanup(); + throw std::runtime_error( + std::string(__func__) + + ": cmdEndpointFdDesc is null; cannot set up async callbacks " + "for device " + deviceIP + "(" + + discoveredDevice.deviceIdentifier + ")" + " handshake." + "Check UdpCommandDemuxer initialization." + ); } - // Public accessor for the original callback - void callOriginalCallback(bool success) - { callOriginalCb(success); } - - void callOriginalCallbackWithFailure() - { - /** EXPLANATION: - * We have to call cleanupHandshakeSocket() here, specifically because - * there are 5 references we need to clean up, and 2 of them are - * actually recursive self-references within this class. - * - * The timer and the handshakeFdDesc are both given recursive - * self-referencing shared_ptr's to this class when we eventually set up - * the async callbacks for them. - * - * So merely letting the async continuation sequences go out of scope - * won't cause this class to be destroyed. Rather, since the class - * references itelf, we have to first break those references. Then and - * only then will the shared_ptr's refcount go to 0 and the class will - * be destroyed. - * - * We always unconditionally break the timer's reference inside of - * the branch-unifying segment (executeHandshakeReq2), but we generally - * only want to break the handshakeFdDesc's reference at the exact - * moment when the sequence fails, because if it succeeds, we want to - * commit the FD to the Device object being created (for heartbeats). - * - * Hence, we call cleanupHandshakeSocket() at the point of failure. - * To break the self-reference when the sequence is successful, we - * manually do that at the end of the sequence. - */ - cleanupHandshakeSocket(); - callOriginalCallback(false); - } - -private: - - bool sendHandshakeRequest() - { - /** EXPLANATION: - * Prepare handshake request. - */ - comms::HandshakeRequest handshakeReq( - device.detectedSmoListeningIp, - device.dataPort, device.cmdPort, device.imuPort); - handshakeReq.swapContentsToProtocolEndianness(); - handshakeReq.header.setCrc16FromRawBytes(); - handshakeReq.header.swapCrc16ToProtocolEndianness(); - handshakeReq.footer.crc_32 = handshakeReq.calculateCrc32(); - handshakeReq.footer.swapCrc32ToProtocolEndianness(); - - // Prepare device endpoint - struct sockaddr_in deviceAddr; - memset(&deviceAddr, 0, sizeof(deviceAddr)); - deviceAddr.sin_family = AF_INET; - deviceAddr.sin_addr.s_addr = inet_addr(deviceIP.c_str()); - deviceAddr.sin_port = htons(65000); - - // Send handshake request directly (synchronous) - ssize_t bytesSent = sendto( - cmdEndpointFdDesc->native_handle(), - &handshakeReq, sizeof(comms::HandshakeRequest), 0, - (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); - - if (bytesSent < 0) + try { + if (!sendHandshakeRequest( + *this, deviceIP, cmdEndpointFdDesc->native_handle())) { - std::cerr << __func__ << ": Failed to send handshake request: " - << strerror(errno) << std::endl; - return false; + co_return false; } - return true; - } - - void setupAsyncCallbacks( - const std::shared_ptr &request - ) - { - if (!cmdEndpointFdDesc || !cmdEndpointFdDesc->is_open()) - { - throw std::runtime_error( - std::string(__func__) + - ": cmdEndpointFdDesc is null; cannot set up async callbacks " - "for device " + deviceIP + "(" - + device.discoveredDevice.deviceIdentifier + ")" + " handshake." - "Check UdpCommandDemuxer initialization." - ); - } - - /** EXPLANATION: - * We setup an async timer event to detect timeout, and register a UDP - * command handler to wait for the device to respond to the handshake - * request. If the device does not respond within the timeout period, - * we will consider the handshake to have failed. - */ - timeoutTimer.expires_from_now( - boost::posix_time::milliseconds(device.commandTimeoutMs)); - - timeoutTimer.async_wait( - std::bind( - &ExecuteHandshakeReq::executeHandshakeReq1_1, this, request, - std::placeholders::_1)); + comms::UdpCommandDemuxer *demuxer = getUdpCommandDemuxer(); + if (!demuxer) { co_return false; } /** EXPLANATION: * Register a UDP command handler for handshake ACK @@ -582,261 +732,39 @@ private: * The handler will be called by the UdpCommandDemuxer when a handshake * response is received. */ - // Add device to temporary collection for devices under construction - device.registerUdpCommandHandler( - 0x00, 0x01, - std::bind(&ExecuteHandshakeReq::executeHandshakeReq1_2, - this, request, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3), - deviceIP); - } + const UdpCommandResponseResult responseResult = co_await + demuxer->waitForCommandResponseCReq( + CMD_SET_GENERAL, CMD_ID_HANDSHAKE_ACK, + deviceIP, + commandTimeoutMs); - void executeHandshakeReq1_1( - std::shared_ptr, - const boost::system::error_code& error) - { - // This is called from the timer callback - if (!error) // Timeout occurred (not cancelled) - { - timerFired.store(true); - executeHandshakeReq2(); - } - } - - void executeHandshakeReq1_2( - std::shared_ptr context, - const uint8_t* data, ssize_t bytesReceived, - const struct sockaddr_in& senderAddr - ) - { - // Store the received data - context->bytesReceived = bytesReceived; - context->senderAddr = senderAddr; - context->senderAddrLen = sizeof(senderAddr); - - // Copy the data to our buffer - if (bytesReceived > 0 - && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) - { - memcpy(context->responseBuffer, data, bytesReceived); - context->socketState.store(SocketState::SOCKET_RECV_SUCCESS); - } else - { - context->socketState.store(SocketState::SOCKET_RECV_ERROR); - std::cerr << __func__ << ": Invalid data size: " << bytesReceived - << std::endl; - } - - executeHandshakeReq2(); - } - - void executeHandshakeReq2() - { - // Ensure we only execute once using atomic exchange - if (handlerExecuted.exchange(true) == true) { return; } - - // Cancel timer if still running - timeoutTimer.cancel(); - device.unregisterUdpCommandHandler(0x00, 0x01, deviceIP); - - // Examine the flags and decide what happened - SocketState finalSocketState = socketState.load(); - bool finalTimerFired = timerFired.load(); - - // Check for timeout only if there was no socket activity - if (finalTimerFired - && finalSocketState == SocketState::SOCKET_STILL_WAITING) - { - std::cerr << __func__ << ": Command timeout with " - << deviceIP << "(" << device.discoveredDevice.deviceIdentifier - << ")" << std::endl; - - callOriginalCallbackWithFailure(); - return; - } - - // Socket error from boost::asio - if (finalSocketState == SocketState::SOCKET_ERROR) - { - std::cerr << __func__ << ": Socket error during handshake with " - << deviceIP << std::endl; - callOriginalCallbackWithFailure(); - return; - } - - if (finalSocketState == SocketState::SOCKET_RECV_ERROR) - { - std::cerr << __func__ << ": Receive error during handshake with " - << deviceIP << std::endl; - callOriginalCallbackWithFailure(); - return; - } - - /* Result must have been RECV_SUCCESS state if we reach here. - * Data was already read in the async callback, just validate it - */ - if (bytesReceived < (ssize_t)sizeof(comms::HandshakeResponse)) - { - std::cerr << __func__ << ": Response of size " << bytesReceived - << " too small from " << deviceIP << std::endl; - callOriginalCallbackWithFailure(); - return; - } - - comms::HandshakeResponse* resp = - reinterpret_cast(responseBuffer); - - /** EXPLANATION: - * Following the clean receiving flow: - * 1. Swap CRC32 to host endianness first - * 2. Validate CRC32 - * 3. Swap CRC16 to host endianness - * 4. Validate CRC16 - * 5. Swap content to host endianness - */ - resp->footer.swapCrc32ToHostEndianness(); - if (!resp->validateCrc32()) - { - std::cerr << __func__ << ": CRC32 validation failed from " - << deviceIP << std::endl; - callOriginalCallbackWithFailure(); - return; - } - resp->header.swapCrc16ToHostEndianness(); - if (!resp->header.validateCrc16()) - { - std::cerr << __func__ << ": CRC16 validation failed from " - << deviceIP << std::endl; - callOriginalCallbackWithFailure(); - return; - } - resp->swapContentsToHostEndianness(); - if (!resp->sanityCheck() || resp->ret_code != 0x00) - { - std::cerr << __func__ << ": Invalid response from " - << deviceIP << std::endl; - callOriginalCallbackWithFailure(); - return; - } - - if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) - { - std::cout << __func__ << ": Handshake successful with " - << deviceIP << "(" - << device.discoveredDevice.deviceIdentifier - << ")" << "\n"; - } - - // Transfer any successful state to Device - commit(); - callOriginalCallback(true); - } - - void commit() // Transfer successful state to Device object - { - // Clean up resources (timer) but not the socket FD - // The socket FD is returned to the caller, not transferred to the device - timeoutTimer.cancel(); - } - - void cleanupHandshakeSocket() - { - // Obsolete - socket is managed by shared_ptr in UdpCommandDemuxer - } - void cleanup() // Clean up transient resources - { - timeoutTimer.cancel(); - cleanupHandshakeSocket(); - } -}; - -void Device::executeHandshakeReq( - const std::string& deviceIP, - sscl::cps::Callback callback - ) -{ - // Get the command endpoint from the UdpCommandDemuxer - auto& protoState = livoxProto1::getProtoState(); - if (!protoState.deviceManager) - { - callback.callbackFn(false); - return; - } - - auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer - .getCmdEndpointFdDesc(); - - if (!cmdEndpointFdDesc) - { - std::cerr << __func__ << ": UdpCommandDemuxer not started or no " - "command endpoint available." << std::endl; - callback.callbackFn(false); - return; - } - - // Create the handshake request object to hold state and callbacks - auto request = std::make_shared( - *this, deviceIP, cmdEndpointFdDesc, std::move(callback)); - - // Check if detectedSmoListeningIp is empty - this should not happen - if (detectedSmoListeningIp.empty()) - { - // This should not happen as it should be set by the calling method - request->callOriginalCallbackWithFailure(); - return; - } - - try { - if (!request->sendHandshakeRequest()) - { - request->callOriginalCallbackWithFailure(); - return; - } - - request->setupAsyncCallbacks(request); - - } catch (const std::exception& e) { + co_return processHandshakeResponse(responseResult, *this, deviceIP); + } catch (const std::exception &e) { std::cerr << __func__ << ": Handshake failed with " << deviceIP << ": " << e.what() << std::endl; - request->callOriginalCallbackWithFailure(); + co_return false; } } -void Device::disconnectReq(sscl::cps::Callback callback) +sscl::co::ViralNonPostingInvoker Device::disconnectCReq() { - // Stop heartbeat first stopHeartbeat(); if (discoveredDevice.ipAddr.empty()) { std::cout << __func__ << ": No device IP available, skipping " "disconnect message" << std::endl; - callback.callbackFn(true); - return; + co_return true; } - // Get the command endpoint from the UdpCommandDemuxer - auto& protoState = livoxProto1::getProtoState(); - if (!protoState.deviceManager) - { - std::cout << __func__ << ": No device manager available, skipping " - "disconnect message" << std::endl; - callback.callbackFn(true); - return; - } - - auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer - .getCmdEndpointFdDesc(); + auto cmdEndpointFdDesc = getCmdEndpointFdDesc(); if (!cmdEndpointFdDesc) { std::cout << __func__ << ": No command endpoint available, skipping " "disconnect message" << std::endl; - callback.callbackFn(true); - return; + co_return true; } - // Create disconnect message comms::DisconnectMessage disconnectMsg; disconnectMsg.swapContentsToProtocolEndianness(); disconnectMsg.header.setCrc16FromRawBytes(); @@ -844,13 +772,11 @@ void Device::disconnectReq(sscl::cps::Callback callba disconnectMsg.footer.crc_32 = disconnectMsg.calculateCrc32(); disconnectMsg.footer.swapCrc32ToProtocolEndianness(); - // Set up destination address struct sockaddr_in deviceAddr; deviceAddr.sin_family = AF_INET; deviceAddr.sin_addr.s_addr = inet_addr(discoveredDevice.ipAddr.c_str()); - deviceAddr.sin_port = htons(65000); // Commands go to port 65000 + deviceAddr.sin_port = htons(LIVOX_COMMAND_PORT); - // Send disconnect message ssize_t bytesSent = sendto( cmdEndpointFdDesc->native_handle(), &disconnectMsg, sizeof(disconnectMsg), 0, @@ -860,13 +786,136 @@ void Device::disconnectReq(sscl::cps::Callback callba { std::cerr << __func__ << ": Failed to send disconnect message: " << strerror(errno) << std::endl; - // Continue with disconnect even if message send fails } std::cout << __func__ << ": Sent disconnect message to " - << discoveredDevice.ipAddr << ":" << 65000 << std::endl; + << discoveredDevice.ipAddr << ":" << LIVOX_COMMAND_PORT << std::endl; - callback.callbackFn(true); + co_return true; +} + +sscl::co::ViralNonPostingInvoker Device::enablePcloudDataCReq() +{ + if (discoveredDevice.ipAddr.empty()) + { + std::cerr << __func__ << ": No device IP available for device " + << discoveredDevice.deviceIdentifier << std::endl; + co_return false; + } + + if (!sendEnDisablePcloudCommand(*this, 0x01, "enable pcloud data")) + { co_return false; } + + comms::UdpCommandDemuxer *demuxer = getUdpCommandDemuxer(); + if (!demuxer) + { co_return false; } + + const UdpCommandResponseResult responseResult = co_await + demuxer->waitForCommandResponseCReq( + CMD_SET_GENERAL, CMD_ID_SAMPLING_RESPONSE, + discoveredDevice.ipAddr, + commandTimeoutMs); + + if (!processSamplingResponse(responseResult, *this, "enable")) + { co_return false; } + + pcloudDataActive.store(true); + co_return true; +} + +sscl::co::ViralNonPostingInvoker Device::disablePcloudDataCReq() +{ + if (discoveredDevice.ipAddr.empty()) + { + std::cerr << __func__ << ": No device IP available for device " + << discoveredDevice.deviceIdentifier << std::endl; + co_return false; + } + + /* Unconditionally close the pcloud data socket early since there's no good + * reason to only close it if the command packet succeeds and ACKs. We want + * to stop receiving data immediately when disable is requested. + */ + cleanupPcloudDataSocket(); + + if (!sendEnDisablePcloudCommand(*this, 0x00, "disable pcloud data")) + { co_return false; } + + comms::UdpCommandDemuxer *demuxer = getUdpCommandDemuxer(); + if (!demuxer) + { co_return false; } + + const UdpCommandResponseResult responseResult = co_await + demuxer->waitForCommandResponseCReq( + CMD_SET_GENERAL, + CMD_ID_SAMPLING_RESPONSE, + discoveredDevice.ipAddr, + commandTimeoutMs); + + if (!processSamplingResponse(responseResult, *this, "disable")) + { co_return false; } + + pcloudDataActive.store(false); + co_return true; +} + +sscl::co::ViralNonPostingInvoker +Device::setReturnModeCReq(uint8_t returnMode) +{ + if (discoveredDevice.ipAddr.empty()) + { + std::cerr << __func__ << ": No device IP available for device " + << discoveredDevice.deviceIdentifier << std::endl; + co_return false; + } + + if (!sendSetReturnModeCommand(*this, returnMode)) { co_return false; } + + comms::UdpCommandDemuxer *demuxer = getUdpCommandDemuxer(); + if (!demuxer) { co_return false; } + + const UdpCommandResponseResult responseResult = co_await + demuxer->waitForCommandResponseCReq( + CMD_SET_LIDAR, + CMD_ID_SET_RETURN_MODE_RESPONSE, + discoveredDevice.ipAddr, + commandTimeoutMs); + + co_return processSetReturnModeResponse( + responseResult, *this, returnMode); +} + +sscl::co::ViralNonPostingInvoker +Device::getReturnModeCReq() +{ + Device::GetReturnModeResult result; + + if (discoveredDevice.ipAddr.empty()) + { + std::cerr << __func__ << ": No device IP available for device " + << discoveredDevice.deviceIdentifier << std::endl; + co_return result; + } + + if (!sendGetReturnModeCommand(*this)) { co_return result; } + + comms::UdpCommandDemuxer *demuxer = getUdpCommandDemuxer(); + if (!demuxer) { co_return result; } + + const UdpCommandResponseResult responseResult = co_await + demuxer->waitForCommandResponseCReq( + CMD_SET_LIDAR, + CMD_ID_GET_RETURN_MODE_RESPONSE, + discoveredDevice.ipAddr, + commandTimeoutMs); + + uint8_t returnMode = 0; + if (!processGetReturnModeResponse(responseResult, *this, returnMode)) + { co_return result; } + + result.success = true; + result.returnMode = returnMode; + co_return result; } std::string Device::generateClientDeviceIpFromSerialNumber( @@ -1328,412 +1377,6 @@ std::optional Device::getSmoIp(const std::string& deviceIP) return std::nullopt; } -// Base class for both enable and disable pcloud data requests -template -class EnDisablePcloudDataReq -: public sscl::cps::NonPostedAsynchronousContinuation -{ -public: - enum class SocketState - { - SOCKET_STILL_WAITING = 0, - SOCKET_ERROR, - SOCKET_RECV_SUCCESS, - SOCKET_RECV_ERROR - }; - -public: - Device& device; - - // Atomic state flags for async coordination - std::atomic timerFired{false}; - std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; - std::atomic handlerExecuted{false}; - - // The timeout timer. - boost::asio::deadline_timer timeoutTimer; - - // Received data storage - uint8_t responseBuffer[1024]{}; - ssize_t bytesReceived = -1; - struct sockaddr_in senderAddr; - socklen_t senderAddrLen = sizeof(senderAddr); - -protected: - EnDisablePcloudDataReq( - Device& dev, - sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation(std::move(cb)), - device(dev), - timeoutTimer(device.componentThread->getIoService()) - {} - -public: - virtual ~EnDisablePcloudDataReq() - { - cleanup(); - } - - // Public accessor for the original callback - void callOriginalCallback(bool success) - { this->callOriginalCb(success); } - - void callOriginalCallbackWithFailure() - { - callOriginalCallback(false); - } - -protected: - - void setupAsyncCallbacks( - const std::shared_ptr> &request - ) - { - // Setup timeout timer - timeoutTimer.expires_from_now( - boost::posix_time::milliseconds(device.commandTimeoutMs)); - - timeoutTimer.async_wait( - std::bind( - &EnDisablePcloudDataReq::enDisablePcloudDataReq1_1, - this, request, - std::placeholders::_1)); - - // Register UDP command handler for sampling response (cmd_set=0x00, cmd_id=0x04) - device.registerUdpCommandHandler( - 0x00, 0x04, - std::bind( - &EnDisablePcloudDataReq::enDisablePcloudDataReq1_2, - this, request, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3), - device.discoveredDevice.ipAddr); - } - - void enDisablePcloudDataReq1_1( - std::shared_ptr> context, - const boost::system::error_code& error - ) - { - (void)error; // Suppress unused parameter warning - context->timerFired = true; - context->enDisablePcloudDataReq2(context); - } - - void enDisablePcloudDataReq1_2( - std::shared_ptr> context, - const uint8_t* data, ssize_t bytesReceived, - const struct sockaddr_in& senderAddr - ) - { - // Store the received data - context->bytesReceived = bytesReceived; - context->senderAddr = senderAddr; - context->senderAddrLen = sizeof(senderAddr); - - // Copy the data to our buffer - if (bytesReceived > 0 - && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) - { - memcpy(context->responseBuffer, data, bytesReceived); - context->socketState = SocketState::SOCKET_RECV_SUCCESS; - } else - { - context->socketState = SocketState::SOCKET_RECV_ERROR; - std::cerr << __func__ << ": Invalid data size: " << bytesReceived - << std::endl; - } - - context->enDisablePcloudDataReq2(context); - } - - void enDisablePcloudDataReq2( - std::shared_ptr> context - ) - { - // Only execute once - if (context->handlerExecuted.exchange(true)) { return; } - - context->timeoutTimer.cancel(); - device.unregisterUdpCommandHandler( - 0x00, 0x04, device.discoveredDevice.ipAddr); - - SocketState finalSocketState = context->socketState.load(); - bool finalTimerFired = context->timerFired.load(); - - if (finalTimerFired && - finalSocketState == SocketState::SOCKET_STILL_WAITING) - { - std::cerr << __func__ << ": Command timeout for device " - << context->device.discoveredDevice.deviceIdentifier - << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - - if (finalSocketState == SocketState::SOCKET_ERROR) - { - std::cerr << __func__ << ": Socket error during command for device " - << context->device.discoveredDevice.deviceIdentifier - << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - - if (finalSocketState == SocketState::SOCKET_RECV_ERROR) - { - std::cerr - << __func__ << ": Receive error during command for device " - << context->device.discoveredDevice.deviceIdentifier - << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - - // Result must have been RECV_SUCCESS state if we reach here - if (context->bytesReceived - < (ssize_t)sizeof(livoxProto1::comms::SamplingResponse)) - { - std::cerr << __func__ << ": Response of size " - << context->bytesReceived - << " is too small for sampling response (expected " - << sizeof(livoxProto1::comms::SamplingResponse) << ")" - << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - - // Parse response using protocol structure - livoxProto1::comms::SamplingResponse* response = - reinterpret_cast( - context->responseBuffer); - - response->swapContentsToHostEndianness(); - if (!response->sanityCheck()) - { - std::cerr << __func__ << ": Invalid sampling response structure.\n"; - context->callOriginalCallbackWithFailure(); - return; - } - - // Check for error first, return early on failure - if (!(response->command.cmd_set == 0x00 && - response->command.cmd_id == 0x04 && - response->ret_code == 0x00)) - { - if (getProtoState().smoCallbacks.OptionParser_getOptions().verbose) - { - std::cout << __func__ << ": Failed to en/disable pcloud data " - "for device " - "(" << context->device.discoveredDevice.deviceIdentifier - << ") @(" << context->device.discoveredDevice.ipAddr << "). " - << "cmd_set: " << (int)response->command.cmd_set - << ", cmd_id: " << (int)response->command.cmd_id - << ", ret_code: " << (int)response->ret_code << "\n"; - } - context->callOriginalCallbackWithFailure(); - return; - } - - context->setPcloudDataActiveState(); - context->callOriginalCallback(true); - } - - void cleanup() - { - timeoutTimer.cancel(); - } - - // Pure virtual methods that derived classes must implement - virtual uint8_t getEnableFlag() const = 0; - virtual const char* getCommandName() const = 0; - virtual void setPcloudDataActiveState() = 0; - - // Common sendCommand implementation - bool sendCommand() - { - // Create start/stop sampling message using protocol structure - livoxProto1::comms::StartStopSamplingMessage message; - // Set enable flag based on derived class implementation - message.enable = getEnableFlag(); - - message.swapContentsToProtocolEndianness(); - message.header.setCrc16FromRawBytes(); - message.header.swapCrc16ToProtocolEndianness(); - message.footer.crc_32 = message.calculateCrc32(); - message.footer.swapCrc32ToProtocolEndianness(); - - struct sockaddr_in deviceAddr; - memset(&deviceAddr, 0, sizeof(deviceAddr)); - deviceAddr.sin_family = AF_INET; - deviceAddr.sin_addr.s_addr = - inet_addr(device.discoveredDevice.ipAddr.c_str()); - deviceAddr.sin_port = htons(65000); // Command port - - // Send command directly (synchronous) - // Get the command endpoint from the UdpCommandDemuxer - auto& protoState = livoxProto1::getProtoState(); - if (!protoState.deviceManager) - { - std::cerr << __func__ << ": No device manager available" << std::endl; - return false; - } - - auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer - .getCmdEndpointFdDesc(); - if (!cmdEndpointFdDesc) - { - std::cerr << __func__ << ": No command endpoint available" << std::endl; - return false; - } - - ssize_t bytesSent = sendto( - cmdEndpointFdDesc->native_handle(), - &message, sizeof(message), 0, - (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); - - if (bytesSent < 0) - { - std::cerr << __func__ << ": Failed to send " << getCommandName() - << " command: " << strerror(errno) << std::endl; - return false; - } - - return true; - } -}; - -class Device::EnablePcloudDataReq -: public EnDisablePcloudDataReq -{ -public: - friend void Device::enablePcloudDataReq( - sscl::cps::Callback callback); - - EnablePcloudDataReq( - Device& dev, - sscl::cps::Callback cb) - : EnDisablePcloudDataReq(dev, std::move(cb)) - {} - - ~EnablePcloudDataReq() - { - cleanup(); - } - -private: - uint8_t getEnableFlag() const override - { - return 0x01; // Start sampling - } - - const char* getCommandName() const override - { - return "enable pcloud data"; - } - - void setPcloudDataActiveState() override - { - device.pcloudDataActive.store(true); - } -}; - -class Device::DisablePcloudDataReq -: public EnDisablePcloudDataReq -{ -public: - friend void Device::disablePcloudDataReq( - sscl::cps::Callback callback); - - DisablePcloudDataReq( - Device& dev, - sscl::cps::Callback cb) - : EnDisablePcloudDataReq( - dev, std::move(cb)) - {} - - ~DisablePcloudDataReq() - { - cleanup(); - } - -private: - uint8_t getEnableFlag() const override - { - return 0x00; // Stop sampling - } - - const char* getCommandName() const override - { - return "disable pcloud data"; - } - - void setPcloudDataActiveState() override - { - device.pcloudDataActive.store(false); - } -}; - -void Device::enablePcloudDataReq( - sscl::cps::Callback callback - ) -{ - auto request = std::make_shared( - *this, std::move(callback)); - - // Check if heartbeat socket is available - if (discoveredDevice.ipAddr.empty()) - { - std::cerr << __func__ << ": No device IP available for device " - << discoveredDevice.deviceIdentifier << std::endl; - request->callOriginalCallbackWithFailure(); - return; - } - - // Send the start sampling command - if (!request->sendCommand()) - { - request->callOriginalCallbackWithFailure(); - return; - } - - // Setup async callbacks - request->setupAsyncCallbacks(request); -} - -void Device::disablePcloudDataReq( - sscl::cps::Callback callback - ) -{ - auto request = std::make_shared( - *this, std::move(callback)); - - // Check if heartbeat socket is available - if (discoveredDevice.ipAddr.empty()) - { - std::cerr << __func__ << ": No device IP available for device " - << discoveredDevice.deviceIdentifier << std::endl; - request->callOriginalCallbackWithFailure(); - return; - } - - /* Unconditionally close the pcloud data socket early since there's no good - * reason to only close it if the command packet succeeds and ACKs. We want - * to stop receiving data immediately when disable is requested. - */ - cleanupPcloudDataSocket(); - - // Send the stop sampling command - if (!request->sendCommand()) - { - request->callOriginalCallbackWithFailure(); - return; - } - - // Setup async callbacks - request->setupAsyncCallbacks(request); -} - void Device::cleanupPcloudDataSocket() { pcloudDataActive.store(false); @@ -1856,552 +1499,4 @@ void Device::unregisterUdpCommandHandler( } } -// SetReturnModeReq continuation class -class Device::SetReturnModeReq -: public sscl::cps::NonPostedAsynchronousContinuation -{ -public: - enum class SocketState - { - SOCKET_STILL_WAITING = 0, - SOCKET_ERROR, - SOCKET_RECV_SUCCESS, - SOCKET_RECV_ERROR - }; - -public: - Device& device; - uint8_t returnMode; - - // Atomic state flags for async coordination - std::atomic timerFired{false}; - std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; - std::atomic handlerExecuted{false}; - - // The timeout timer. - boost::asio::deadline_timer timeoutTimer; - - // Received data storage - uint8_t responseBuffer[1024]{}; - ssize_t bytesReceived = -1; - struct sockaddr_in senderAddr; - socklen_t senderAddrLen = sizeof(senderAddr); - -public: - friend void Device::setReturnModeReq( - uint8_t returnMode, - sscl::cps::Callback callback); - - SetReturnModeReq( - Device& dev, uint8_t mode, - sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation( - std::move(cb)), - device(dev), returnMode(mode), - timeoutTimer(device.componentThread->getIoService()) - {} - - virtual ~SetReturnModeReq() - { - cleanup(); - } - - // Public accessor for the original callback - void callOriginalCallback(bool success) - { this->callOriginalCb(success); } - - void callOriginalCallbackWithFailure() - { this->callOriginalCb(false); } - - void setupAsyncCallbacks(std::shared_ptr request) - { - // Set up timeout timer - timeoutTimer.expires_from_now(boost::posix_time::milliseconds( - device.commandTimeoutMs)); - timeoutTimer.async_wait( - std::bind(&SetReturnModeReq::setReturnModeReq1_1, - this, request, - std::placeholders::_1)); - - // Register UDP command handler for set return mode response (cmd_set=0x01, cmd_id=0x06) - device.registerUdpCommandHandler( - 0x01, 0x06, - std::bind(&SetReturnModeReq::setReturnModeReq1_2, - this, request, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3), - device.discoveredDevice.ipAddr); - } - - void setReturnModeReq1_1( - std::shared_ptr context, - const boost::system::error_code& error - ) - { - if (error == boost::asio::error::operation_aborted) { - // Timer was cancelled, ignore - return; - } - - context->timerFired.store(true); - context->setReturnModeReq2(context); - } - - void setReturnModeReq1_2( - std::shared_ptr context, - const uint8_t* data, ssize_t bytesReceived, - const struct sockaddr_in& senderAddr - ) - { - // Store the received data - context->bytesReceived = bytesReceived; - context->senderAddr = senderAddr; - context->senderAddrLen = sizeof(senderAddr); - - // Copy the data to our buffer - if (bytesReceived > 0 - && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) - { - memcpy(context->responseBuffer, data, bytesReceived); - context->socketState = SocketState::SOCKET_RECV_SUCCESS; - } else - { - context->socketState = SocketState::SOCKET_RECV_ERROR; - std::cerr << __func__ << ": Invalid data size: " << bytesReceived - << std::endl; - } - - context->setReturnModeReq2(context); - } - - void setReturnModeReq2( - std::shared_ptr context - ) - { - // Only execute once - if (context->handlerExecuted.exchange(true)) { return; } - - context->timeoutTimer.cancel(); - device.unregisterUdpCommandHandler( - 0x01, 0x06, device.discoveredDevice.ipAddr); - - SocketState finalSocketState = context->socketState.load(); - bool finalTimerFired = context->timerFired.load(); - - // Check for timeout only if there was no socket activity - if (finalTimerFired - && finalSocketState == SocketState::SOCKET_STILL_WAITING) - { - std::cerr << __func__ << ": Set return mode timeout with " - << device.discoveredDevice.ipAddr << "(" - << device.discoveredDevice.deviceIdentifier << ")" << "\n"; - context->callOriginalCallbackWithFailure(); - return; - } - - // Socket error from boost::asio - if (finalSocketState == SocketState::SOCKET_ERROR) - { - std::cerr << __func__ << ": Socket error during set return mode with " - << device.discoveredDevice.ipAddr << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - - if (finalSocketState == SocketState::SOCKET_RECV_ERROR) - { - std::cerr << __func__ << ": Receive error during set return mode with " - << device.discoveredDevice.ipAddr << "\n"; - context->callOriginalCallbackWithFailure(); - return; - } - - /* Result must have been RECV_SUCCESS state if we reach here. - * Data was already read in the async callback, just validate it - */ - if (context->bytesReceived - < static_cast(sizeof(comms::SetLiDARReturnModeResponse))) - { - std::cerr << __func__ << ": Response of size " - << context->bytesReceived << " too small from " - << device.discoveredDevice.ipAddr << "\n"; - context->callOriginalCallbackWithFailure(); - return; - } - - comms::SetLiDARReturnModeResponse* response = - reinterpret_cast( - context->responseBuffer); - response->swapContentsToHostEndianness(); - - // Early callback return on error; success path only if all checks pass - if (response->command.cmd_set != 0x01 || - response->command.cmd_id != 0x06 || - response->ret_code != 0x00) - { - context->callOriginalCallbackWithFailure(); - return; - } - - device.currentReturnMode = Device::ReturnMode(context->returnMode); - context->callOriginalCallback(true); - } - - void cleanup() - { - timeoutTimer.cancel(); - } - - bool sendCommand() - { - // Get the command endpoint from the UdpCommandDemuxer - auto& protoState = livoxProto1::getProtoState(); - if (!protoState.deviceManager) - { - std::cerr << __func__ << ": No device manager available.\n"; - return false; - } - - auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer - .getCmdEndpointFdDesc(); - if (!cmdEndpointFdDesc) - { - std::cerr << __func__ << ": No command endpoint available.\n"; - return false; - } - - // Create set return mode message - comms::SetLiDARReturnMode setReturnModeMsg; - setReturnModeMsg.mode = returnMode; - setReturnModeMsg.swapContentsToProtocolEndianness(); - setReturnModeMsg.header.setCrc16FromRawBytes(); - setReturnModeMsg.header.swapCrc16ToProtocolEndianness(); - setReturnModeMsg.footer.crc_32 = setReturnModeMsg.calculateCrc32(); - setReturnModeMsg.footer.swapCrc32ToProtocolEndianness(); - - // Set up destination address - struct sockaddr_in deviceAddr; - deviceAddr.sin_family = AF_INET; - deviceAddr.sin_addr.s_addr = inet_addr( - device.discoveredDevice.ipAddr.c_str()); - deviceAddr.sin_port = htons(65000); // Commands go to port 65000 - - // Send set return mode message - ssize_t bytesSent = sendto( - cmdEndpointFdDesc->native_handle(), - &setReturnModeMsg, sizeof(setReturnModeMsg), 0, - (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); - - if (bytesSent < 0) - { - std::cerr << __func__ << ": Failed to send set return mode message: " - << strerror(errno) << std::endl; - return false; - } - - std::cout << __func__ << ": Sent set return mode message to " - << device.discoveredDevice.ipAddr << ":" << 65000 << std::endl; - return true; - } -}; - -// GetReturnModeReq continuation class -class Device::GetReturnModeReq -: public sscl::cps::NonPostedAsynchronousContinuation -{ -public: - enum class SocketState - { - SOCKET_STILL_WAITING = 0, - SOCKET_ERROR, - SOCKET_RECV_SUCCESS, - SOCKET_RECV_ERROR - }; - -public: - Device& device; - - // Atomic state flags for async coordination - std::atomic timerFired{false}; - std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; - std::atomic handlerExecuted{false}; - - // The timeout timer. - boost::asio::deadline_timer timeoutTimer; - - // Received data storage - uint8_t responseBuffer[1024]{}; - ssize_t bytesReceived = -1; - struct sockaddr_in senderAddr; - socklen_t senderAddrLen = sizeof(senderAddr); - -public: - friend void Device::getReturnModeReq( - sscl::cps::Callback callback); - - GetReturnModeReq( - Device& dev, - sscl::cps::Callback cb) - : sscl::cps::NonPostedAsynchronousContinuation( - std::move(cb)), - device(dev), - timeoutTimer(device.componentThread->getIoService()) - {} - - virtual ~GetReturnModeReq() - { - cleanup(); - } - - // Public accessor for the original callback - void callOriginalCallback(bool success, uint8_t returnMode) - { this->callOriginalCb(success, returnMode); } - - void callOriginalCallbackWithFailure() - { this->callOriginalCb(false, 0); } - - void setupAsyncCallbacks(std::shared_ptr request) - { - // Set up timeout timer - timeoutTimer.expires_from_now(boost::posix_time::milliseconds( - device.commandTimeoutMs)); - - timeoutTimer.async_wait( - std::bind(&GetReturnModeReq::getReturnModeReq1_1, - this, request, - std::placeholders::_1)); - - // Register UDP command handler for get return mode response (cmd_set=0x01, cmd_id=0x07) - device.registerUdpCommandHandler( - 0x01, 0x07, - std::bind(&GetReturnModeReq::getReturnModeReq1_2, - this, request, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3), - device.discoveredDevice.ipAddr); - } - - void getReturnModeReq1_1( - std::shared_ptr context, - const boost::system::error_code& error - ) - { - if (error == boost::asio::error::operation_aborted) { - // Timer was cancelled, ignore - return; - } - - context->timerFired.store(true); - context->getReturnModeReq2(context); - } - - void getReturnModeReq1_2( - std::shared_ptr context, - const uint8_t* data, ssize_t bytesReceived, - const struct sockaddr_in& senderAddr - ) - { - // Store the received data - context->bytesReceived = bytesReceived; - context->senderAddr = senderAddr; - context->senderAddrLen = sizeof(senderAddr); - - // Copy the data to our buffer - if (bytesReceived > 0 - && bytesReceived <= (ssize_t)sizeof(context->responseBuffer)) - { - memcpy(context->responseBuffer, data, bytesReceived); - context->socketState = SocketState::SOCKET_RECV_SUCCESS; - } else - { - context->socketState = SocketState::SOCKET_RECV_ERROR; - std::cerr << __func__ << ": Invalid data size: " << bytesReceived - << std::endl; - } - - context->getReturnModeReq2(context); - } - - void getReturnModeReq2( - std::shared_ptr context - ) - { - // Only execute once - if (context->handlerExecuted.exchange(true)) { return; } - - context->timeoutTimer.cancel(); - device.unregisterUdpCommandHandler( - 0x01, 0x07, device.discoveredDevice.ipAddr); - - SocketState finalSocketState = context->socketState.load(); - bool finalTimerFired = context->timerFired.load(); - - // Check for timeout only if there was no socket activity - if (finalTimerFired - && finalSocketState == SocketState::SOCKET_STILL_WAITING) - { - std::cerr << __func__ << ": Get return mode timeout with " - << device.discoveredDevice.ipAddr << "(" - << device.discoveredDevice.deviceIdentifier - << ")" << "\n"; - context->callOriginalCallbackWithFailure(); - return; - } - // Socket error from boost::asio - if (finalSocketState == SocketState::SOCKET_ERROR) - { - std::cerr << __func__ << ": Socket error during get return mode with " - << device.discoveredDevice.ipAddr << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - if (finalSocketState == SocketState::SOCKET_RECV_ERROR) - { - std::cerr << __func__ << ": Receive error during get return mode with " - << device.discoveredDevice.ipAddr << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - /* Result must have been RECV_SUCCESS state if we reach here. - * Data was already read in the async callback, just validate it - */ - if (context->bytesReceived - < static_cast(sizeof(comms::GetLiDARReturnModeResponse))) - { - std::cerr << __func__ << ": Response of size " - << context->bytesReceived << " too small from " - << device.discoveredDevice.ipAddr << std::endl; - context->callOriginalCallbackWithFailure(); - return; - } - - comms::GetLiDARReturnModeResponse* response = - reinterpret_cast( - context->responseBuffer); - response->swapContentsToHostEndianness(); - - // Check if response indicates success - if (!(response->command.cmd_set == 0x01 && - response->command.cmd_id == 0x07 && - response->ret_code == 0x00)) - { - context->callOriginalCallbackWithFailure(); - return; - } - - device.currentReturnMode = Device::ReturnMode(response->mode); - context->callOriginalCallback(true, response->mode); - } - - void cleanup() - { - timeoutTimer.cancel(); - } - - bool sendCommand() - { - // Get the command endpoint from the UdpCommandDemuxer - auto& protoState = livoxProto1::getProtoState(); - if (!protoState.deviceManager) - { - std::cerr << __func__ << ": No device manager available.\n"; - return false; - } - - auto cmdEndpointFdDesc = protoState.deviceManager->udpCommandDemuxer - .getCmdEndpointFdDesc(); - if (!cmdEndpointFdDesc) - { - std::cerr << __func__ << ": No command endpoint available.\n"; - return false; - } - - // Create get return mode message - comms::GetLiDARReturnMode getReturnModeMsg; - getReturnModeMsg.swapContentsToProtocolEndianness(); - getReturnModeMsg.header.setCrc16FromRawBytes(); - getReturnModeMsg.header.swapCrc16ToProtocolEndianness(); - getReturnModeMsg.footer.crc_32 = getReturnModeMsg.calculateCrc32(); - getReturnModeMsg.footer.swapCrc32ToProtocolEndianness(); - - // Set up destination address - struct sockaddr_in deviceAddr; - deviceAddr.sin_family = AF_INET; - deviceAddr.sin_addr.s_addr = inet_addr(device.discoveredDevice.ipAddr.c_str()); - deviceAddr.sin_port = htons(65000); // Commands go to port 65000 - - // Send get return mode message - ssize_t bytesSent = sendto( - cmdEndpointFdDesc->native_handle(), - &getReturnModeMsg, sizeof(getReturnModeMsg), 0, - (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); - - if (bytesSent < 0) - { - std::cerr << __func__ << ": Failed to send get return mode message: " - << strerror(errno) << std::endl; - return false; - } - - std::cout << __func__ << ": Sent get return mode message to " - << device.discoveredDevice.ipAddr << ":" << 65000 << std::endl; - return true; - } -}; - -void Device::setReturnModeReq( - uint8_t returnMode, - sscl::cps::Callback callback - ) -{ - auto request = std::make_shared( - *this, returnMode, std::move(callback)); - - // Check if device IP is available - if (discoveredDevice.ipAddr.empty()) - { - std::cerr << __func__ << ": No device IP available for device " - << discoveredDevice.deviceIdentifier << std::endl; - request->callOriginalCallbackWithFailure(); - return; - } - - // Send the set return mode command - if (!request->sendCommand()) - { - request->callOriginalCallbackWithFailure(); - return; - } - - // Setup async callbacks - request->setupAsyncCallbacks(request); -} - -void Device::getReturnModeReq( - sscl::cps::Callback callback - ) -{ - auto request = std::make_shared( - *this, std::move(callback)); - - // Check if device IP is available - if (discoveredDevice.ipAddr.empty()) - { - std::cerr << __func__ << ": No device IP available for device " - << discoveredDevice.deviceIdentifier << std::endl; - request->callOriginalCallbackWithFailure(); - return; - } - - // Send the get return mode command - if (!request->sendCommand()) - { - request->callOriginalCallbackWithFailure(); - return; - } - - // Setup async callbacks - request->setupAsyncCallbacks(request); -} - } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index d409f5a..afc948a 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -18,7 +18,7 @@ #include #include #include "protocol.h" -#include +#include #include // Custom hash function for std::pair @@ -96,16 +96,6 @@ private: std::optional detectSmoIp(const std::string& deviceIP); uint32_t getSubnetMaskFor(uint8_t nbits); - class ConnectReq; - class ConnectToKnownDeviceReq; - class ConnectByDeviceIdentifierReq; - class ExecuteHandshakeReq; - class DisconnectReq; - class EnablePcloudDataReq; - class DisablePcloudDataReq; - class SetReturnModeReq; - class GetReturnModeReq; - public: enum class ReturnMode : uint8_t { @@ -146,37 +136,30 @@ public: // Utility methods std::optional getSmoIp(const std::string& deviceIP); - // Callback function type definitions for async methods - typedef std::function connectReqCbFn; - typedef std::function< - void(bool success, const std::string& ipAddr)> - connectToKnownDeviceReqCbFn; - typedef std::function< - void(bool success, const std::string& ipAddr)> - connectByDeviceIdentifierReqCbFn; - typedef std::function executeHandshakeReqCbFn; - typedef std::function disconnectReqCbFn; - typedef std::function enablePcloudDataReqCbFn; - typedef std::function disablePcloudDataReqCbFn; - typedef std::function setReturnModeReqCbFn; - typedef std::function - getReturnModeReqCbFn; + struct ConnectIpResult + { + bool success = false; + std::string ipAddr; + }; // Async connection methods - void connectReq(sscl::cps::Callback callback); - void connectToKnownDeviceReq( - sscl::cps::Callback callback); - void connectByDeviceIdentifierReq( - sscl::cps::Callback callback); - void executeHandshakeReq( - const std::string& deviceIP, - sscl::cps::Callback callback); - void disconnectReq(sscl::cps::Callback callback); - void enablePcloudDataReq(sscl::cps::Callback callback); - void disablePcloudDataReq(sscl::cps::Callback callback); - void setReturnModeReq( - uint8_t returnMode, sscl::cps::Callback callback); - void getReturnModeReq(sscl::cps::Callback callback); + sscl::co::ViralNonPostingInvoker connectCReq(); + sscl::co::ViralNonPostingInvoker connectToKnownDeviceCReq(); + sscl::co::ViralNonPostingInvoker connectByDeviceIdentifierCReq(); + sscl::co::ViralNonPostingInvoker executeHandshakeCReq( + const std::string& deviceIP); + sscl::co::ViralNonPostingInvoker disconnectCReq(); + sscl::co::ViralNonPostingInvoker enablePcloudDataCReq(); + sscl::co::ViralNonPostingInvoker disablePcloudDataCReq(); + + struct GetReturnModeResult + { + bool success = false; + uint8_t returnMode = 0; + }; + + sscl::co::ViralNonPostingInvoker setReturnModeCReq(uint8_t returnMode); + sscl::co::ViralNonPostingInvoker getReturnModeCReq(); public: comms::DiscoveredDevice discoveredDevice; diff --git a/commonLibs/livoxProto1/livoxProto1.cpp b/commonLibs/livoxProto1/livoxProto1.cpp index 41b1c46..f830ba4 100644 --- a/commonLibs/livoxProto1/livoxProto1.cpp +++ b/commonLibs/livoxProto1/livoxProto1.cpp @@ -1,6 +1,5 @@ #include #include -#include #include #include "livoxProto1.h" #include "device.h" @@ -10,14 +9,13 @@ extern "C" { -void livoxProto1_getOrCreateDeviceReq( +sscl::co::ViralNonPostingInvoker +livoxProto1_getOrCreateDeviceCReq( const std::string& deviceIdentifier, const std::shared_ptr& componentThread, int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::cps::Callback callback -) + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort) { // Get the global DeviceManager instance auto& protoState = livoxProto1::getProtoState(); @@ -28,19 +26,16 @@ void livoxProto1_getOrCreateDeviceReq( "livoxProto1_main first"); } - // Delegate to DeviceManager - protoState.deviceManager->getOrCreateDeviceReq( + // Delegate to the DeviceManager to create the device + co_return co_await protoState.deviceManager->getOrCreateDeviceCReq( deviceIdentifier, componentThread, commandTimeoutMs, retryDelayMs, smoIp, smoSubnetNbits, - dataPort, cmdPort, imuPort, - callback); + dataPort, cmdPort, imuPort); } -void livoxProto1_destroyDeviceReq( - std::shared_ptr device, - sscl::cps::Callback callback -) +sscl::co::ViralNonPostingInvoker livoxProto1_destroyDeviceCReq( + std::shared_ptr device) { auto& protoState = livoxProto1::getProtoState(); if (!protoState.deviceManager) @@ -49,8 +44,7 @@ void livoxProto1_destroyDeviceReq( + ": DeviceManager not initialized"); } - protoState.deviceManager->destroyDeviceReq( - device, callback); + co_return co_await protoState.deviceManager->destroyDeviceCReq(device); } void livoxProto1_main( @@ -65,10 +59,8 @@ void livoxProto1_exit(void) livoxProto1::exit(); } -void livoxProto1_device_enablePcloudDataReq( - std::shared_ptr device, - sscl::cps::Callback callback -) +sscl::co::ViralNonPostingInvoker livoxProto1_device_enablePcloudDataCReq( + std::shared_ptr device) { if (!device) { @@ -76,13 +68,11 @@ void livoxProto1_device_enablePcloudDataReq( + ": Device pointer is null"); } - device->enablePcloudDataReq(callback); + co_return co_await device->enablePcloudDataCReq(); } -void livoxProto1_device_disablePcloudDataReq( - std::shared_ptr device, - sscl::cps::Callback callback -) +sscl::co::ViralNonPostingInvoker livoxProto1_device_disablePcloudDataCReq( + std::shared_ptr device) { if (!device) { @@ -90,13 +80,12 @@ void livoxProto1_device_disablePcloudDataReq( + ": Device pointer is null"); } - device->disablePcloudDataReq(callback); + co_return co_await device->disablePcloudDataCReq(); } -void livoxProto1_device_getReturnModeReq( - std::shared_ptr device, - sscl::cps::Callback callback -) +sscl::co::ViralNonPostingInvoker +livoxProto1_device_getReturnModeCReq( + std::shared_ptr device) { if (!device) { @@ -104,7 +93,12 @@ void livoxProto1_device_getReturnModeReq( + ": Device pointer is null"); } - device->getReturnModeReq(callback); + livoxProto1::Device::GetReturnModeResult deviceResult = + co_await device->getReturnModeCReq(); + LivoxProto1GetReturnModeResult result; + result.success = deviceResult.success; + result.returnMode = deviceResult.returnMode; + co_return result; } std::shared_ptr diff --git a/commonLibs/livoxProto1/livoxProto1.h b/commonLibs/livoxProto1/livoxProto1.h index 8c203a2..dcb2767 100644 --- a/commonLibs/livoxProto1/livoxProto1.h +++ b/commonLibs/livoxProto1/livoxProto1.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include // Forward declarations @@ -23,6 +23,18 @@ namespace livoxProto1 { class Device; } +struct LivoxProto1GetOrCreateDeviceResult +{ + bool success = false; + std::shared_ptr device; +}; + +struct LivoxProto1GetReturnModeResult +{ + bool success = false; + uint8_t returnMode = 0; +}; + #ifdef __cplusplus extern "C" { #endif @@ -52,54 +64,43 @@ typedef void livoxProto1_exitFn(void); * @param dataPort Data port for point cloud (default: 56000) * @param cmdPort Command port (default: 56001) * @param imuPort IMU port (default: 56002) - * @return Device pointer on success, nullptr on failure + * @return LivoxProto1GetOrCreateDeviceResult (success + device on success, + * null device on failure) */ -typedef std::function< - void(bool success, std::shared_ptr device)> - livoxProto1_getOrCreateDeviceReqCbFn; - -typedef void livoxProto1_getOrCreateDeviceReqFn( +typedef sscl::co::ViralNonPostingInvoker + livoxProto1_getOrCreateDeviceCReqFn( const std::string& deviceIdentifier, const std::shared_ptr& componentThread, int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, - uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::cps::Callback callback); + uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort); -typedef std::function livoxProto1_destroyDeviceReqCbFn; -typedef void livoxProto1_destroyDeviceReqFn( - std::shared_ptr device, - sscl::cps::Callback callback); +typedef sscl::co::ViralNonPostingInvoker livoxProto1_destroyDeviceCReqFn( + std::shared_ptr device); -typedef std::function - livoxProto1_device_enablePcloudDataReqCbFn; -typedef void livoxProto1_device_enablePcloudDataReqFn( - std::shared_ptr device, - sscl::cps::Callback callback); +typedef sscl::co::ViralNonPostingInvoker + livoxProto1_device_enablePcloudDataCReqFn( + std::shared_ptr device); -typedef std::function - livoxProto1_device_disablePcloudDataReqCbFn; -typedef void livoxProto1_device_disablePcloudDataReqFn( - std::shared_ptr device, - sscl::cps::Callback callback); +typedef sscl::co::ViralNonPostingInvoker + livoxProto1_device_disablePcloudDataCReqFn( + std::shared_ptr device); -typedef std::function - livoxProto1_device_getReturnModeReqCbFn; -typedef void livoxProto1_device_getReturnModeReqFn( - std::shared_ptr device, - sscl::cps::Callback callback); +typedef sscl::co::ViralNonPostingInvoker + livoxProto1_device_getReturnModeCReqFn( + std::shared_ptr device); typedef std::shared_ptr livoxProto1_getPcloudDataFdDescFn(void); livoxProto1_mainFn livoxProto1_main; livoxProto1_exitFn livoxProto1_exit; -livoxProto1_getOrCreateDeviceReqFn livoxProto1_getOrCreateDeviceReq; -livoxProto1_destroyDeviceReqFn livoxProto1_destroyDeviceReq; -livoxProto1_device_enablePcloudDataReqFn livoxProto1_device_enablePcloudDataReq; -livoxProto1_device_disablePcloudDataReqFn - livoxProto1_device_disablePcloudDataReq; -livoxProto1_device_getReturnModeReqFn livoxProto1_device_getReturnModeReq; +livoxProto1_getOrCreateDeviceCReqFn livoxProto1_getOrCreateDeviceCReq; +livoxProto1_destroyDeviceCReqFn livoxProto1_destroyDeviceCReq; +livoxProto1_device_enablePcloudDataCReqFn livoxProto1_device_enablePcloudDataCReq; +livoxProto1_device_disablePcloudDataCReqFn + livoxProto1_device_disablePcloudDataCReq; +livoxProto1_device_getReturnModeCReqFn livoxProto1_device_getReturnModeCReq; livoxProto1_getPcloudDataFdDescFn livoxProto1_getPcloudDataFdDesc; #ifdef __cplusplus diff --git a/commonLibs/livoxProto1/udpCommandDemuxer.cpp b/commonLibs/livoxProto1/udpCommandDemuxer.cpp index 38fba57..011360c 100644 --- a/commonLibs/livoxProto1/udpCommandDemuxer.cpp +++ b/commonLibs/livoxProto1/udpCommandDemuxer.cpp @@ -1,6 +1,10 @@ +#include + #include #include +#include #include +#include #include #include #include @@ -8,7 +12,11 @@ #include #include +#include +#include +#include #include "udpCommandDemuxer.h" +#include "protocol.h" #include "core.h" #include "device.h" @@ -330,6 +338,19 @@ void UdpCommandDemuxer::processIncomingData() char sourceIP[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &senderAddr.sin_addr, sourceIP, INET_ADDRSTRLEN); + if (bytesReceived >= static_cast( + sizeof(Header) + sizeof(Command))) + { + const uint8_t cmdSet = receiveBuffer[sizeof(Header)]; + const uint8_t cmdId = receiveBuffer[sizeof(Header) + 1]; + + if (tryCompletePendingCommandWait( + sourceIP, cmdSet, cmdId, receiveBuffer, bytesReceived)) + { + return; + } + } + // First, find device with matching IP address in DeviceManager collection for (const auto &device : deviceManager.devices) { @@ -395,5 +416,208 @@ void UdpCommandDemuxer::processIncomingData() << sourceIP << ", discarding datagram" << std::endl; } +struct UdpCommandDemuxer::PendingCommandWaitDesc +{ + CommandWaitKey key; + boost::asio::io_service &resumeIoService; + std::atomic settled{false}; + UdpCommandResponseResult result{}; + std::coroutine_handle<> callerSchedHandle; + + PendingCommandWaitDesc( + CommandWaitKey keyIn, + boost::asio::io_service &resumeIoServiceIn) + : key(std::move(keyIn)), + resumeIoService(resumeIoServiceIn) + {} +}; + +void UdpCommandDemuxer::settlePendingCommandWait( + const std::shared_ptr &wait, + UdpCommandResponseResult::Outcome outcome, + const uint8_t *data, ssize_t bytesReceived) +{ + if (wait->settled.exchange(true)) { + return; + } + + wait->result.outcome = outcome; + wait->result.bytesReceived = bytesReceived; + + if (outcome == UdpCommandResponseResult::Outcome::Response + && data != nullptr + && bytesReceived > 0 + && bytesReceived + <= static_cast(sizeof(wait->result.buffer))) + { + memcpy(wait->result.buffer, data, bytesReceived); + } + + std::coroutine_handle<> handle = wait->callerSchedHandle; + if (!handle) { + return; + } + + boost::asio::post(wait->resumeIoService, handle); +} + +std::shared_ptr +UdpCommandDemuxer::findAndRemovePendingCommandWait(const CommandWaitKey &key) +{ + sscl::SpinLock::Guard guard(pendingWaits.lock); + const auto iterator = pendingWaits.rsrc.pendingWaits.find(key); + if (iterator == pendingWaits.rsrc.pendingWaits.end()) { + return nullptr; + } + + std::shared_ptr wait = iterator->second; + pendingWaits.rsrc.pendingWaits.erase(iterator); + return wait; +} + +void UdpCommandDemuxer::cancelPendingCommandWait( + uint8_t cmdSet, uint8_t cmdId, + const std::string &deviceIp) +{ + std::shared_ptr wait = findAndRemovePendingCommandWait( + {deviceIp, cmdSet, cmdId}); + + if (!wait) { return; } + + settlePendingCommandWait( + wait, + UdpCommandResponseResult::Outcome::Timeout, + nullptr, -1); +} + +bool UdpCommandDemuxer::tryCompletePendingCommandWait( + const char *sourceIp, + uint8_t cmdSet, uint8_t cmdId, + const uint8_t *data, ssize_t bytesReceived) +{ + std::shared_ptr wait = findAndRemovePendingCommandWait( + {sourceIp, cmdSet, cmdId}); + + if (!wait) { return false; } + + const UdpCommandResponseResult::Outcome outcome = + (bytesReceived > 0 + && bytesReceived + <= static_cast(sizeof(wait->result.buffer))) + ? UdpCommandResponseResult::Outcome::Response + : UdpCommandResponseResult::Outcome::RecvError; + + settlePendingCommandWait(wait, outcome, data, bytesReceived); + return true; +} + +sscl::co::ViralNonPostingInvoker +UdpCommandDemuxer::waitForCommandResponseCReq( + uint8_t cmdSet, uint8_t cmdId, + const std::string &deviceIp) +{ + const CommandWaitKey key{deviceIp, cmdSet, cmdId}; + auto wait = std::make_shared( + key, componentThread->getIoService()); + + { + sscl::SpinLock::Guard guard(pendingWaits.lock); + pendingWaits.rsrc.pendingWaits[key] = wait; + } + + struct PendingCommandWaitDescAwaiter + { + std::shared_ptr wait; + + bool await_ready() const noexcept + { + return wait->settled.load(std::memory_order_acquire); + } + + bool await_suspend(std::coroutine_handle<> caller) noexcept + { + if (wait->settled.load(std::memory_order_acquire)) { + return false; + } + + wait->callerSchedHandle = caller; + return true; + } + + UdpCommandResponseResult await_resume() const noexcept + { + return wait->result; + } + }; + + const UdpCommandResponseResult result = + co_await PendingCommandWaitDescAwaiter{wait}; + + if (findAndRemovePendingCommandWait(key)) + { + std::cerr << __func__ << ": pending wait still registered after " + "settle for device " << deviceIp << " (cmd_set=" + << static_cast(cmdSet) << ", cmd_id=" + << static_cast(cmdId) << "); program error" + << std::endl; + } + + co_return result; +} + +sscl::co::ViralNonPostingInvoker +UdpCommandDemuxer::waitForCommandResponseCReq( + uint8_t cmdSet, uint8_t cmdId, + const std::string &deviceIp, + int timeoutMs) +{ + /** EXPLANATION: + * We setup an async timer event to detect timeout, and register a UDP + * command handler to wait for the device to respond to the incoming command + * request. If the device does not respond within the timeout period, + * we will consider the command to have failed. + */ + boost::asio::io_service &ioService = componentThread->getIoService(); + std::optional> raceTimer; + auto timerAwaiter = adapters::boostAsio::getDeadlineTimerAReqAwaiter( + ioService, + boost::posix_time::milliseconds(timeoutMs), + raceTimer); + auto responseInvoker = waitForCommandResponseCReq(cmdSet, cmdId, deviceIp); + + static constexpr int timerMemberSettlementIndex = 0; + + sscl::co::Group group; + group.add(timerAwaiter); + group.add(responseInvoker); + + co_await group.getAwaitFirstSettlementInvoker(); + group.checkForAndReThrowGroupExceptions(); + + const bool timerWonFirst = + group.s.rsrc.firstSettledInvokerIdx == timerMemberSettlementIndex; + + if (timerWonFirst) { + cancelPendingCommandWait(cmdSet, cmdId, deviceIp); + } else if (raceTimer) { + (*raceTimer)->cancel(); + } + + /** Group member adapter coros are fire-and-forget; keep group alive until + * both members settle so the loser adapter does not touch freed state. + */ + co_await group.getAwaitAllSettlementsInvoker(); + group.checkForAndReThrowGroupExceptions(); + + if (timerWonFirst) + { + UdpCommandResponseResult timeoutResult; + timeoutResult.outcome = UdpCommandResponseResult::Outcome::Timeout; + co_return timeoutResult; + } + + co_return responseInvoker.completedReturnValues().myReturnValue; +} + } // namespace comms } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/udpCommandDemuxer.h b/commonLibs/livoxProto1/udpCommandDemuxer.h index 75e3da3..347fc67 100644 --- a/commonLibs/livoxProto1/udpCommandDemuxer.h +++ b/commonLibs/livoxProto1/udpCommandDemuxer.h @@ -3,10 +3,16 @@ #include #include +#include +#include #include +#include +#include #include #include #include +#include +#include namespace livoxProto1 { @@ -15,6 +21,45 @@ class DeviceManager; namespace comms { +struct UdpCommandResponseResult +{ + enum class Outcome + { + Timeout, + Response, + RecvError + }; + + Outcome outcome = Outcome::Timeout; + uint8_t buffer[1024]{}; + ssize_t bytesReceived = -1; +}; + +struct CommandWaitKey +{ + std::string deviceIp; + uint8_t cmdSet; + uint8_t cmdId; + + bool operator==(const CommandWaitKey &other) const + { + return deviceIp == other.deviceIp + && cmdSet == other.cmdSet + && cmdId == other.cmdId; + } +}; + +struct CommandWaitKeyHash +{ + std::size_t operator()(const CommandWaitKey &key) const + { + std::size_t hash = std::hash{}(key.deviceIp); + hash ^= (static_cast(key.cmdSet) << 8) + | static_cast(key.cmdId); + return hash; + } +}; + /** * UdpCommandDemuxer - Routes UDP command datagrams to appropriate devices * @@ -62,13 +107,20 @@ public: return pcloudDataFdDesc; } -private: - // Socket and async objects - std::shared_ptr pcloudDataFdDesc; - // Socket and async objects - std::shared_ptr cmdEndpointFdDesc; + sscl::co::ViralNonPostingInvoker + waitForCommandResponseCReq( + uint8_t cmdSet, uint8_t cmdId, + const std::string &deviceIp, + int timeoutMs); private: + struct PendingCommandWaitDesc; + + sscl::co::ViralNonPostingInvoker + waitForCommandResponseCReq( + uint8_t cmdSet, uint8_t cmdId, + const std::string &deviceIp); + void setupSockets(); void setupCommandSocket(); void setupPcloudDataSocket(); @@ -76,6 +128,23 @@ private: void onDataReady(const boost::system::error_code& error); void processIncomingData(); + bool tryCompletePendingCommandWait( + const char *sourceIp, + uint8_t cmdSet, uint8_t cmdId, + const uint8_t *data, ssize_t bytesReceived); + + void cancelPendingCommandWait( + uint8_t cmdSet, uint8_t cmdId, + const std::string &deviceIp); + + std::shared_ptr findAndRemovePendingCommandWait( + const CommandWaitKey &key); + + void settlePendingCommandWait( + const std::shared_ptr &wait, + UdpCommandResponseResult::Outcome outcome, + const uint8_t *data, ssize_t bytesReceived); + std::shared_ptr componentThread; DeviceManager& deviceManager; uint16_t commandPort; @@ -86,7 +155,21 @@ private: std::atomic isActive{false}; std::atomic shouldStop{false}; - // Receive buffer + struct PendingWaitsResources + { + std::unordered_map< + CommandWaitKey, + std::shared_ptr, + CommandWaitKeyHash> + pendingWaits; + }; + + sscl::SharedResourceGroup + pendingWaits; + + std::shared_ptr pcloudDataFdDesc; + std::shared_ptr cmdEndpointFdDesc; + uint8_t receiveBuffer[1024]; struct sockaddr_in senderAddr; socklen_t senderAddrLen; diff --git a/include/adapters/README.md b/include/adapters/README.md index 7685d59..2488657 100644 --- a/include/adapters/README.md +++ b/include/adapters/README.md @@ -18,12 +18,11 @@ or event-driven APIs. include/adapters/ README.md boostAsio/ - + deadlineTimerAReq.h opencl/ smo/ cpsCallbackAReq.h - livoxProto1CpsAwaiters.h ``` @@ -32,7 +31,7 @@ include/adapters/ - Name adapter awaiter wrapper functions `getAReqAwaiter()`, where `` is the wrapped CPS/API request symbol with its library prefix removed and each `_`-delimited segment Pascal-cased (e.g. - `livoxProto1_getOrCreateDeviceReq` → `getGetOrCreateDeviceReqAReqAwaiter()`). + `someLib_someOperationReq` → `getSomeOperationReqAReqAwaiter()`). - Keep adapters small and single-purpose; but unify where possible to reduce code duplication. - Make result types explicit for multi-argument callbacks. diff --git a/include/adapters/boostAsio/deadlineTimerAReq.h b/include/adapters/boostAsio/deadlineTimerAReq.h index de01084..b7096f2 100644 --- a/include/adapters/boostAsio/deadlineTimerAReq.h +++ b/include/adapters/boostAsio/deadlineTimerAReq.h @@ -2,31 +2,110 @@ #define ADAPTERS_BOOST_ASIO_DEADLINE_TIMER_AREQ_H #include -#include + +#include +#include +#include +#include + #include +#include +#include #include -#include +#include namespace adapters::boostAsio { -using TimerWaitCbFn = std::function; +/** Coroutine awaiter: true if the delay elapsed, false if cancelled/aborted. */ +class DeadlineTimerAReq +{ +public: + struct AsyncState + { + std::atomic settled{false}; + bool timerExpiredNormally = false; + std::coroutine_handle<> callerSchedHandle; + std::shared_ptr timer; + }; -inline auto deadlineTimerWaitAReq( + DeadlineTimerAReq( + boost::asio::io_service &resumeIoService, + const boost::posix_time::milliseconds delay, + std::optional> &timerOut) + : asyncState(std::make_shared()), + resumeIoService(resumeIoService) + { + asyncState->timer = + std::make_shared(resumeIoService); + timerOut = asyncState->timer; + + asyncState->timer->expires_from_now(delay); + asyncState->timer->async_wait( + [this](const boost::system::error_code &error) + { + onTimer(error); + }); + } + + bool await_ready() const noexcept + { + return asyncState->settled.load(std::memory_order_acquire); + } + + bool await_suspend(std::coroutine_handle<> caller) noexcept + { + if (asyncState->settled.load(std::memory_order_acquire)) { + return false; + } + + asyncState->callerSchedHandle = caller; + return true; + } + + bool await_resume() const noexcept + { + return asyncState->timerExpiredNormally; + } + +private: + void onTimer(const boost::system::error_code &error) + { + if (asyncState->settled.exchange(true)) { + return; + } + + asyncState->timerExpiredNormally = !error; + signalSettledAndResumeCaller(); + } + + void signalSettledAndResumeCaller() + { + std::coroutine_handle<> handle = asyncState->callerSchedHandle; + if (!handle) { + return; + } + + boost::asio::post(resumeIoService, handle); + } + + std::shared_ptr asyncState; + boost::asio::io_service &resumeIoService; +}; + +inline auto getDeadlineTimerAReqAwaiter( boost::asio::io_service &ioService, const boost::posix_time::milliseconds delay) { - return smo::cpsBoundary::CpsCallbackAReq)>>( - ioService, - [&ioService, delay](sscl::cps::Callback cb) - { - auto timer = std::make_shared(ioService); - timer->expires_from_now(delay); - timer->async_wait( - [timer, cb](const boost::system::error_code &error) mutable - { - cb.callbackFn(!error); - }); - }); + std::optional> timerOut; + return DeadlineTimerAReq(ioService, delay, timerOut); +} + +inline auto getDeadlineTimerAReqAwaiter( + boost::asio::io_service &ioService, + const boost::posix_time::milliseconds delay, + std::optional> &timerOut) +{ + return DeadlineTimerAReq(ioService, delay, timerOut); } } // namespace adapters::boostAsio diff --git a/include/adapters/smo/cpsCallbackAReq.h b/include/adapters/smo/cpsCallbackAReq.h index 0db47bb..5e90ad6 100644 --- a/include/adapters/smo/cpsCallbackAReq.h +++ b/include/adapters/smo/cpsCallbackAReq.h @@ -80,9 +80,7 @@ private: return; } - boost::asio::post( - resumeIoService, - [handle]() { handle.resume(); }); + boost::asio::post(resumeIoService, handle); } std::shared_ptr asyncState; diff --git a/include/adapters/smo/livoxProto1CpsAwaiters.h b/include/adapters/smo/livoxProto1CpsAwaiters.h deleted file mode 100644 index 5b1030b..0000000 --- a/include/adapters/smo/livoxProto1CpsAwaiters.h +++ /dev/null @@ -1,121 +0,0 @@ -#ifndef ADAPTERS_SMO_LIVOX_PROTO1_CPS_AWAITERS_H -#define ADAPTERS_SMO_LIVOX_PROTO1_CPS_AWAITERS_H - -#include -#include - -#include -#include -#include - -namespace adapters::smo { - -struct GetOrCreateDeviceResult -{ - bool success = false; - std::shared_ptr device; -}; - -struct GetReturnModeResult -{ - bool success = false; - uint8_t returnMode = 0; -}; - -inline auto getGetOrCreateDeviceReqAReqAwaiter( - boost::asio::io_service &resumeIoService, - livoxProto1_getOrCreateDeviceReqFn *fn, - const std::string &deviceIdentifier, - const std::shared_ptr &componentThread, - int commandTimeoutMs, - int retryDelayMs, - const std::string &smoIp, - uint8_t smoSubnetNbits, - uint16_t dataPort, - uint16_t cmdPort, - uint16_t imuPort) -{ - return ::smo::cpsBoundary::CpsCallbackAReq< - GetOrCreateDeviceResult, - livoxProto1_getOrCreateDeviceReqCbFn, - std::function)>>( - resumeIoService, - [=](sscl::cps::Callback cb) - { - (*fn)( - deviceIdentifier, - componentThread, - commandTimeoutMs, retryDelayMs, - smoIp, smoSubnetNbits, - dataPort, cmdPort, imuPort, - std::move(cb)); - }); -} - -inline auto getDeviceGetReturnModeReqAReqAwaiter( - boost::asio::io_service &resumeIoService, - livoxProto1_device_getReturnModeReqFn *fn, - std::shared_ptr device) -{ - return ::smo::cpsBoundary::CpsCallbackAReq< - GetReturnModeResult, - livoxProto1_device_getReturnModeReqCbFn, - std::function)>>( - resumeIoService, - [=](sscl::cps::Callback cb) - { - (*fn)(device, std::move(cb)); - }); -} - -inline auto getDeviceEnablePcloudDataReqAReqAwaiter( - boost::asio::io_service &resumeIoService, - livoxProto1_device_enablePcloudDataReqFn *fn, - std::shared_ptr device) -{ - return ::smo::cpsBoundary::CpsCallbackAReq< - bool, - livoxProto1_device_enablePcloudDataReqCbFn, - std::function)>>( - resumeIoService, - [=](sscl::cps::Callback cb) - { - (*fn)(device, std::move(cb)); - }); -} - -inline auto getDeviceDisablePcloudDataReqAReqAwaiter( - boost::asio::io_service &resumeIoService, - livoxProto1_device_disablePcloudDataReqFn *fn, - std::shared_ptr device) -{ - return ::smo::cpsBoundary::CpsCallbackAReq< - bool, - livoxProto1_device_disablePcloudDataReqCbFn, - std::function)>>( - resumeIoService, - [=](sscl::cps::Callback cb) - { - (*fn)(device, std::move(cb)); - }); -} - -inline auto getDestroyDeviceReqAReqAwaiter( - boost::asio::io_service &resumeIoService, - livoxProto1_destroyDeviceReqFn *fn, - std::shared_ptr device) -{ - return ::smo::cpsBoundary::CpsCallbackAReq< - bool, - livoxProto1_destroyDeviceReqCbFn, - std::function)>>( - resumeIoService, - [=](sscl::cps::Callback cb) - { - (*fn)(device, std::move(cb)); - }); -} - -} // namespace adapters::smo - -#endif // ADAPTERS_SMO_LIVOX_PROTO1_CPS_AWAITERS_H diff --git a/stimBuffApis/livoxGen1/CMakeLists.txt b/stimBuffApis/livoxGen1/CMakeLists.txt index 70200e3..550d60c 100644 --- a/stimBuffApis/livoxGen1/CMakeLists.txt +++ b/stimBuffApis/livoxGen1/CMakeLists.txt @@ -14,7 +14,6 @@ if(ENABLE_STIMBUFFAPI_livoxGen1) add_library(livoxGen1 SHARED livoxGen1.cpp - livoxGen1Proto1CpsBridge.cpp pcloudStimulusProducer.cpp livoxPcloudFrameDumper.cpp ioUringAssemblyEngine.cpp diff --git a/stimBuffApis/livoxGen1/livoxGen1.cpp b/stimBuffApis/livoxGen1/livoxGen1.cpp index 3cd73a2..aafce3c 100644 --- a/stimBuffApis/livoxGen1/livoxGen1.cpp +++ b/stimBuffApis/livoxGen1/livoxGen1.cpp @@ -11,7 +11,6 @@ #include #include "livoxGen1Internal.h" -#include "livoxGen1Proto1CpsBridge.h" namespace smo::stim_buff { @@ -28,11 +27,11 @@ LivoxProto1DllState::LivoxProto1DllState() : dlopenHandle(nullptr, DlCloser), livoxProto1_main(nullptr), livoxProto1_exit(nullptr), - livoxProto1_getOrCreateDeviceReq(nullptr), - livoxProto1_destroyDeviceReq(nullptr), - livoxProto1_device_enablePcloudDataReq(nullptr), - livoxProto1_device_disablePcloudDataReq(nullptr), - livoxProto1_device_getReturnModeReq(nullptr), + livoxProto1_getOrCreateDeviceCReq(nullptr), + livoxProto1_destroyDeviceCReq(nullptr), + livoxProto1_device_enablePcloudDataCReq(nullptr), + livoxProto1_device_disablePcloudDataCReq(nullptr), + livoxProto1_device_getReturnModeCReq(nullptr), livoxProto1_getPcloudDataFdDesc(nullptr) {} @@ -221,14 +220,14 @@ bool validateAttachRequest( sscl::co::ViralNonPostingInvoker enablePcloudDataForAttach( const std::shared_ptr &desc, - const std::shared_ptr &componentThread, + const std::shared_ptr &/*componentThread*/, const std::shared_ptr &device) { /* Enable pcloud data. Don't need delay since no commands were * sent to device prior to us reaching here (or delay already handled). */ - const bool enabled = co_await coAwaitEnablePcloudData( - componentThread, device); + const bool enabled = co_await (*livoxProto1.livoxProto1_device_enablePcloudDataCReq)( + device); if (!enabled) { @@ -329,10 +328,13 @@ attachByCreatingProducer( * Generally, it will resume sending them within 1-2 seconds. */ const LivoxProviderParams params = parseLivoxProviderParams(desc); - adapters::smo::GetOrCreateDeviceResult deviceResult = - co_await coAwaitGetOrCreateDevice( - componentThread, desc->deviceSelector, - params); + LivoxProto1GetOrCreateDeviceResult deviceResult = + co_await (*livoxProto1.livoxProto1_getOrCreateDeviceCReq)( + desc->deviceSelector, + componentThread, + params.commandTimeoutMs, params.retryDelayMs, + params.smoIp, params.smoSubnetNbits, + params.dataPort, params.cmdPort, params.imuPort); if (!deviceResult.success || !deviceResult.device) { @@ -354,7 +356,7 @@ attachByCreatingProducer( * may not yet be ready for another command. */ // Initialize timer with LivoxGen1 metadata io_service - const bool delayOk = co_await adapters::boostAsio::deadlineTimerWaitAReq( + const bool delayOk = co_await adapters::boostAsio::getDeadlineTimerAReqAwaiter( componentThread->getIoService(), boost::posix_time::milliseconds(LIVOX_GEN1_DEVICE_COMMAND_DELAY_MS)); @@ -364,8 +366,9 @@ attachByCreatingProducer( co_return StimBuffDeviceOpResult{false, desc}; } - auto returnModeResult = co_await coAwaitGetReturnMode( - componentThread, deviceResult.device); + LivoxProto1GetReturnModeResult returnModeResult = + co_await (*livoxProto1.livoxProto1_device_getReturnModeCReq)( + deviceResult.device); if (!returnModeResult.success) { @@ -455,7 +458,7 @@ livoxGen1_attachDeviceCReq( const std::shared_ptr &desc, const std::shared_ptr &componentThread) { - if (!livoxProto1.livoxProto1_getOrCreateDeviceReq) + if (!livoxProto1.livoxProto1_getOrCreateDeviceCReq) { throw std::runtime_error( std::string(__func__) + ": LivoxProto1 getOrCreateDevice function " @@ -522,8 +525,8 @@ livoxGen1_detachDeviceCReq( // Last buffer on producer: disable pcloud before tearing down device // Disable point cloud data first - const bool disabled = co_await coAwaitDisablePcloudData( - requestComponentThread, stimProducer->device); + const bool disabled = co_await (*livoxProto1.livoxProto1_device_disablePcloudDataCReq)( + stimProducer->device); if (!disabled) { @@ -536,7 +539,7 @@ livoxGen1_detachDeviceCReq( // Helper method to delay and then call destroyDeviceReq // Initialize timer with LivoxGen1 metadata io_service - co_await adapters::boostAsio::deadlineTimerWaitAReq( + co_await adapters::boostAsio::getDeadlineTimerAReqAwaiter( requestComponentThread->getIoService(), boost::posix_time::milliseconds(LIVOX_GEN1_DEVICE_COMMAND_DELAY_MS)); @@ -563,8 +566,7 @@ livoxGen1_detachDeviceCReq( attachedStimulusProducers.erase(it); } - const bool destroyed = co_await coAwaitDestroyDevice( - requestComponentThread, + const bool destroyed = co_await (*livoxProto1.livoxProto1_destroyDeviceCReq)( stimProducer->device); if (!destroyed) { std::cerr << __func__ << ": Failed to destroy dev " @@ -611,31 +613,31 @@ sscl::co::ViralNonPostingInvoker livoxGen1_initializeCInd() dlsym(livoxProto1.dlopenHandle.get(), "livoxProto1_main")); livoxProto1.livoxProto1_exit = reinterpret_cast( dlsym(livoxProto1.dlopenHandle.get(), "livoxProto1_exit")); - livoxProto1.livoxProto1_getOrCreateDeviceReq = reinterpret_cast< - livoxProto1_getOrCreateDeviceReqFn *>( + livoxProto1.livoxProto1_getOrCreateDeviceCReq = reinterpret_cast< + livoxProto1_getOrCreateDeviceCReqFn *>( dlsym( livoxProto1.dlopenHandle.get(), - "livoxProto1_getOrCreateDeviceReq")); - livoxProto1.livoxProto1_destroyDeviceReq = reinterpret_cast< - livoxProto1_destroyDeviceReqFn *>( + "livoxProto1_getOrCreateDeviceCReq")); + livoxProto1.livoxProto1_destroyDeviceCReq = reinterpret_cast< + livoxProto1_destroyDeviceCReqFn *>( dlsym( livoxProto1.dlopenHandle.get(), - "livoxProto1_destroyDeviceReq")); - livoxProto1.livoxProto1_device_enablePcloudDataReq = reinterpret_cast< - livoxProto1_device_enablePcloudDataReqFn *>( + "livoxProto1_destroyDeviceCReq")); + livoxProto1.livoxProto1_device_enablePcloudDataCReq = reinterpret_cast< + livoxProto1_device_enablePcloudDataCReqFn *>( dlsym( livoxProto1.dlopenHandle.get(), - "livoxProto1_device_enablePcloudDataReq")); - livoxProto1.livoxProto1_device_disablePcloudDataReq = reinterpret_cast< - livoxProto1_device_disablePcloudDataReqFn *>( + "livoxProto1_device_enablePcloudDataCReq")); + livoxProto1.livoxProto1_device_disablePcloudDataCReq = reinterpret_cast< + livoxProto1_device_disablePcloudDataCReqFn *>( dlsym( livoxProto1.dlopenHandle.get(), - "livoxProto1_device_disablePcloudDataReq")); - livoxProto1.livoxProto1_device_getReturnModeReq = reinterpret_cast< - livoxProto1_device_getReturnModeReqFn *>( + "livoxProto1_device_disablePcloudDataCReq")); + livoxProto1.livoxProto1_device_getReturnModeCReq = reinterpret_cast< + livoxProto1_device_getReturnModeCReqFn *>( dlsym( livoxProto1.dlopenHandle.get(), - "livoxProto1_device_getReturnModeReq")); + "livoxProto1_device_getReturnModeCReq")); livoxProto1.livoxProto1_getPcloudDataFdDesc = reinterpret_cast< livoxProto1_getPcloudDataFdDescFn *>( dlsym( @@ -644,11 +646,11 @@ sscl::co::ViralNonPostingInvoker livoxGen1_initializeCInd() if (!livoxProto1.livoxProto1_main || !livoxProto1.livoxProto1_exit - || !livoxProto1.livoxProto1_getOrCreateDeviceReq - || !livoxProto1.livoxProto1_destroyDeviceReq - || !livoxProto1.livoxProto1_device_enablePcloudDataReq - || !livoxProto1.livoxProto1_device_disablePcloudDataReq - || !livoxProto1.livoxProto1_device_getReturnModeReq + || !livoxProto1.livoxProto1_getOrCreateDeviceCReq + || !livoxProto1.livoxProto1_destroyDeviceCReq + || !livoxProto1.livoxProto1_device_enablePcloudDataCReq + || !livoxProto1.livoxProto1_device_disablePcloudDataCReq + || !livoxProto1.livoxProto1_device_getReturnModeCReq || !livoxProto1.livoxProto1_getPcloudDataFdDesc) { throw std::runtime_error( diff --git a/stimBuffApis/livoxGen1/livoxGen1.h b/stimBuffApis/livoxGen1/livoxGen1.h index b08b395..cc0faab 100644 --- a/stimBuffApis/livoxGen1/livoxGen1.h +++ b/stimBuffApis/livoxGen1/livoxGen1.h @@ -18,13 +18,13 @@ struct LivoxProto1DllState std::unique_ptr dlopenHandle; livoxProto1_mainFn *livoxProto1_main; livoxProto1_exitFn *livoxProto1_exit; - livoxProto1_getOrCreateDeviceReqFn *livoxProto1_getOrCreateDeviceReq; - livoxProto1_destroyDeviceReqFn *livoxProto1_destroyDeviceReq; - livoxProto1_device_enablePcloudDataReqFn - *livoxProto1_device_enablePcloudDataReq; - livoxProto1_device_disablePcloudDataReqFn - *livoxProto1_device_disablePcloudDataReq; - livoxProto1_device_getReturnModeReqFn *livoxProto1_device_getReturnModeReq; + livoxProto1_getOrCreateDeviceCReqFn *livoxProto1_getOrCreateDeviceCReq; + livoxProto1_destroyDeviceCReqFn *livoxProto1_destroyDeviceCReq; + livoxProto1_device_enablePcloudDataCReqFn + *livoxProto1_device_enablePcloudDataCReq; + livoxProto1_device_disablePcloudDataCReqFn + *livoxProto1_device_disablePcloudDataCReq; + livoxProto1_device_getReturnModeCReqFn *livoxProto1_device_getReturnModeCReq; livoxProto1_getPcloudDataFdDescFn *livoxProto1_getPcloudDataFdDesc; }; diff --git a/stimBuffApis/livoxGen1/livoxGen1Proto1CpsBridge.cpp b/stimBuffApis/livoxGen1/livoxGen1Proto1CpsBridge.cpp deleted file mode 100644 index 59b5d3a..0000000 --- a/stimBuffApis/livoxGen1/livoxGen1Proto1CpsBridge.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include "livoxGen1Proto1CpsBridge.h" - -#include - -namespace smo::stim_buff { - -sscl::co::ViralNonPostingInvoker -coAwaitGetOrCreateDevice( - const std::shared_ptr &componentThread, - const std::string &deviceIdentifier, - const LivoxProviderParams ¶ms) -{ - if (!livoxProto1.livoxProto1_getOrCreateDeviceReq) { - throw std::runtime_error("coAwaitGetOrCreateDevice: proto1 function missing"); - } - - auto result = co_await adapters::smo::getGetOrCreateDeviceReqAReqAwaiter( - componentThread->getIoService(), - livoxProto1.livoxProto1_getOrCreateDeviceReq, - deviceIdentifier, - componentThread, - params.commandTimeoutMs, - params.retryDelayMs, - params.smoIp, - params.smoSubnetNbits, - params.dataPort, - params.cmdPort, - params.imuPort); - co_return result; -} - -sscl::co::ViralNonPostingInvoker -coAwaitGetReturnMode( - const std::shared_ptr &componentThread, - const std::shared_ptr &device) -{ - if (!livoxProto1.livoxProto1_device_getReturnModeReq) { - throw std::runtime_error("coAwaitGetReturnMode: proto1 function missing"); - } - - co_return co_await adapters::smo::getDeviceGetReturnModeReqAReqAwaiter( - componentThread->getIoService(), - livoxProto1.livoxProto1_device_getReturnModeReq, - device); -} - -sscl::co::ViralNonPostingInvoker coAwaitEnablePcloudData( - const std::shared_ptr &componentThread, - const std::shared_ptr &device) -{ - if (!livoxProto1.livoxProto1_device_enablePcloudDataReq) { - throw std::runtime_error("coAwaitEnablePcloudData: proto1 function missing"); - } - - co_return co_await adapters::smo::getDeviceEnablePcloudDataReqAReqAwaiter( - componentThread->getIoService(), - livoxProto1.livoxProto1_device_enablePcloudDataReq, - device); -} - -sscl::co::ViralNonPostingInvoker coAwaitDisablePcloudData( - const std::shared_ptr &componentThread, - const std::shared_ptr &device) -{ - if (!livoxProto1.livoxProto1_device_disablePcloudDataReq) { - throw std::runtime_error("coAwaitDisablePcloudData: proto1 function missing"); - } - - co_return co_await adapters::smo::getDeviceDisablePcloudDataReqAReqAwaiter( - componentThread->getIoService(), - livoxProto1.livoxProto1_device_disablePcloudDataReq, - device); -} - -sscl::co::ViralNonPostingInvoker coAwaitDestroyDevice( - const std::shared_ptr &componentThread, - const std::shared_ptr &device) -{ - if (!livoxProto1.livoxProto1_destroyDeviceReq) { - throw std::runtime_error("coAwaitDestroyDevice: proto1 function missing"); - } - - co_return co_await adapters::smo::getDestroyDeviceReqAReqAwaiter( - componentThread->getIoService(), - livoxProto1.livoxProto1_destroyDeviceReq, - device); -} - -} // namespace smo::stim_buff diff --git a/stimBuffApis/livoxGen1/livoxGen1Proto1CpsBridge.h b/stimBuffApis/livoxGen1/livoxGen1Proto1CpsBridge.h deleted file mode 100644 index 394d053..0000000 --- a/stimBuffApis/livoxGen1/livoxGen1Proto1CpsBridge.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef LIVOX_GEN1_PROTO1_CPS_BRIDGE_H -#define LIVOX_GEN1_PROTO1_CPS_BRIDGE_H - -#include - -#include -#include -#include - -#include "livoxGen1Internal.h" - -namespace smo::stim_buff { - -sscl::co::ViralNonPostingInvoker -coAwaitGetOrCreateDevice( - const std::shared_ptr &componentThread, - const std::string &deviceIdentifier, - const LivoxProviderParams ¶ms); - -sscl::co::ViralNonPostingInvoker -coAwaitGetReturnMode( - const std::shared_ptr &componentThread, - const std::shared_ptr &device); - -sscl::co::ViralNonPostingInvoker coAwaitEnablePcloudData( - const std::shared_ptr &componentThread, - const std::shared_ptr &device); - -sscl::co::ViralNonPostingInvoker coAwaitDisablePcloudData( - const std::shared_ptr &componentThread, - const std::shared_ptr &device); - -sscl::co::ViralNonPostingInvoker coAwaitDestroyDevice( - const std::shared_ptr &componentThread, - const std::shared_ptr &device); - -} // namespace smo::stim_buff - -#endif // LIVOX_GEN1_PROTO1_CPS_BRIDGE_H