diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index f212d63..e40c58d 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -96,7 +96,9 @@ handshakeTimeoutMs(handshakeTimeoutMs), retryDelayMs(retryDelayMs), smoIp(smoIp), detectedSmoListeningIp(""), smoSubnetNbits(smoSubnetNbits), dataPort(dataPort), cmdPort(cmdPort), imuPort(imuPort), heartbeatFd(-1), -heartbeatActive(false) +heartbeatActive(false), +pcloudDataActive(false), +pcloudDataFd(-1) { } @@ -109,11 +111,23 @@ Device::~Device() } } + if (pcloudDataActive.load()) { + pcloudDataActive.store(false); + if (pcloudDataSocketDesc) { + pcloudDataSocketDesc->cancel(); + } + } + heartbeatTimer.reset(); + pcloudDataSocketDesc.reset(); if (heartbeatFd >= 0) { close(heartbeatFd); heartbeatFd = -1; } + if (pcloudDataFd >= 0) { + close(pcloudDataFd); + pcloudDataFd = -1; + } } /** @@ -1207,4 +1221,511 @@ std::optional Device::getSmoIp(const std::string& deviceIP) return std::nullopt; } +// Base class for both enable and disable pcloud data requests +template +class EnDisablePcloudDataReq +: public smo::NonPostedAsynchronousContinuation +{ +public: + enum class SocketState + { + SOCKET_STILL_WAITING = 0, + SOCKET_ERROR, + SOCKET_RECV_SUCCESS, + SOCKET_RECV_ERROR + }; + +public: + Device& device; + + // Atomic state flags for async coordination + std::atomic timerFired{false}; + std::atomic socketState{SocketState::SOCKET_STILL_WAITING}; + std::atomic handlerExecuted{false}; + + // The timeout timer. + boost::asio::deadline_timer timeoutTimer; + /* This wrapper is just to enable us to use boost::stream_descriptor for its + * convenient API when waiting for the enable/disable ACK dgram. + */ + boost::asio::posix::stream_descriptor cmdResponseBoostFdWrapper; + + // Received data storage + uint8_t responseBuffer[1024]{}; + ssize_t bytesReceived = -1; + struct sockaddr_in senderAddr; + socklen_t senderAddrLen = sizeof(senderAddr); + +protected: + EnDisablePcloudDataReq( + Device& dev, + smo::Callback cb) + : smo::NonPostedAsynchronousContinuation(std::move(cb)), + device(dev), + timeoutTimer(device.componentThread->getIoService()), + cmdResponseBoostFdWrapper(device.componentThread->getIoService()) + {} + +public: + virtual ~EnDisablePcloudDataReq() + { + cleanup(); + } + + // Public accessor for the original callback + void callOriginalCallback(bool success) + { this->callOriginalCb(success); } + + void callOriginalCallbackWithFailure() + { + /** + * EXPLANATION: + * We have to call cleanupCmdResponseFdBoostWrapper() here, specifically + * because there are self-references within this class that need to be + * cleaned up. + * + * The cmdResponseBoostFdWrapper holds a reference to the heartbeat + * socket for async operations. When the sequence fails, we need to + * break this reference to allow proper cleanup. + * + * Hence, we call cleanupCmdResponseFdBoostWrapper() at the point of + * failure. + */ + cleanupCmdResponseFdBoostWrapper(); + callOriginalCallback(false); + } + + void cleanupCmdResponseFdBoostWrapper() + { + if (cmdResponseBoostFdWrapper.is_open()) { + cmdResponseBoostFdWrapper.release(); // Don't close heartbeat socket + } + } + +protected: + bool setupSocket() + { + // Use the existing heartbeat socket for sending commands and receiving responses + if (device.heartbeatFd < 0) + { + std::cerr << __func__ << ": No heartbeat socket available" + << std::endl; + return false; + } + return true; + } + + void setupAsyncCallbacks( + const std::shared_ptr> &request + ) + { + cmdResponseBoostFdWrapper.assign(device.heartbeatFd); + + // Setup timeout timer + timeoutTimer.expires_from_now( + boost::posix_time::milliseconds(device.handshakeTimeoutMs)); + + timeoutTimer.async_wait( + std::bind( + &EnDisablePcloudDataReq::enDisablePcloudDataReq1_1, + this, request, + std::placeholders::_1)); + + // Setup async wait for read-ready + cmdResponseBoostFdWrapper.async_wait( + boost::asio::posix::stream_descriptor::wait_read, + std::bind( + &EnDisablePcloudDataReq::enDisablePcloudDataReq1_2, + this, request, + std::placeholders::_1)); + } + + void enDisablePcloudDataReq1_1( + std::shared_ptr> context, + const boost::system::error_code& error + ) + { + (void)error; // Suppress unused parameter warning + context->timerFired = true; + context->enDisablePcloudDataReq2(context); + } + + void enDisablePcloudDataReq1_2( + std::shared_ptr> context, + const boost::system::error_code& error + ) + { + if (!error) + { + // Data is available for reading, perform the actual read + context->bytesReceived = recvfrom( + context->device.heartbeatFd, + context->responseBuffer, sizeof(context->responseBuffer), 0, + (struct sockaddr*)&context->senderAddr, &context->senderAddrLen); + + if (context->bytesReceived > 0) + { context->socketState = SocketState::SOCKET_RECV_SUCCESS; } + else + { context->socketState = SocketState::SOCKET_RECV_ERROR; } + } + else + { context->socketState = SocketState::SOCKET_RECV_ERROR; } + + context->enDisablePcloudDataReq2(context); + } + + void enDisablePcloudDataReq2( + std::shared_ptr> context + ) + { + // Only execute once + if (context->handlerExecuted.exchange(true)) { return; } + + SocketState finalSocketState = context->socketState.load(); + bool finalTimerFired = context->timerFired.load(); + + context->timeoutTimer.cancel(); + + if (finalTimerFired && + finalSocketState == SocketState::SOCKET_STILL_WAITING) + { + std::cerr << __func__ << ": Command timeout for device " + << context->device.discoveredDevice.deviceIdentifier + << std::endl; + context->callOriginalCallbackWithFailure(); + return; + } + + if (finalSocketState == SocketState::SOCKET_ERROR) + { + std::cerr << __func__ << ": Socket error during command for device " + << context->device.discoveredDevice.deviceIdentifier + << std::endl; + context->callOriginalCallbackWithFailure(); + return; + } + + if (finalSocketState == SocketState::SOCKET_RECV_ERROR) + { + std::cerr + << __func__ << ": Receive error during command for device " + << context->device.discoveredDevice.deviceIdentifier + << std::endl; + context->callOriginalCallbackWithFailure(); + return; + } + + // Result must have been RECV_SUCCESS state if we reach here + if (context->bytesReceived + < (ssize_t)sizeof(livoxProto1::comms::SamplingResponse)) + { + std::cerr << __func__ << ": Response of size " + << context->bytesReceived + << " is too small for sampling response (expected " + << sizeof(livoxProto1::comms::SamplingResponse) << ")" + << std::endl; + context->callOriginalCallbackWithFailure(); + return; + } + + // Parse response using protocol structure + livoxProto1::comms::SamplingResponse* response = + reinterpret_cast( + context->responseBuffer); + + response->swapContentsToHostEndianness(); + if (!response->sanityCheck()) + { + std::cerr << __func__ << ": Invalid sampling response structure.\n"; + context->callOriginalCallbackWithFailure(); + return; + } + + // Check if response indicates success + if (response->command.cmd_set == 0x00 && + response->command.cmd_id == 0x04 && + response->ret_code == 0x00) + { + // Set the appropriate pcloud data active state based on command type + context->setPcloudDataActiveState(); + context->callOriginalCallback(true); + return; + } + + // If we get here, the command failed + context->callOriginalCallbackWithFailure(); + } + + void cleanup() + { + timeoutTimer.cancel(); + cleanupCmdResponseFdBoostWrapper(); + } + + // Pure virtual methods that derived classes must implement + virtual uint8_t getEnableFlag() const = 0; + virtual const char* getCommandName() const = 0; + virtual void setPcloudDataActiveState() = 0; + + // Common sendCommand implementation + bool sendCommand() + { + // Create start/stop sampling message using protocol structure + livoxProto1::comms::StartStopSamplingMessage message; + + // Set enable flag based on derived class implementation + message.enable = getEnableFlag(); + + // Calculate and set CRC32 + message.footer.crc_32 = message.calculateCrc32(); + message.swapContentsToProtocolEndianness(); + + struct sockaddr_in deviceAddr; + memset(&deviceAddr, 0, sizeof(deviceAddr)); + deviceAddr.sin_family = AF_INET; + deviceAddr.sin_addr.s_addr = + inet_addr(device.discoveredDevice.ipAddr.c_str()); + deviceAddr.sin_port = htons(65000); // Command port + + // Send command directly (synchronous) + ssize_t bytesSent = sendto( + device.heartbeatFd, + &message, sizeof(message), 0, + (struct sockaddr*)&deviceAddr, sizeof(deviceAddr)); + + if (bytesSent < 0) + { + std::cerr << __func__ << ": Failed to send " << getCommandName() + << " command: " << strerror(errno) << std::endl; + return false; + } + + return true; + } +}; + +class Device::EnablePcloudDataReq +: public EnDisablePcloudDataReq +{ +public: + friend void Device::enablePcloudDataReq( + smo::Callback callback); + + EnablePcloudDataReq( + Device& dev, + smo::Callback cb) + : EnDisablePcloudDataReq(dev, std::move(cb)) + {} + + ~EnablePcloudDataReq() + { + cleanup(); + } + +private: + uint8_t getEnableFlag() const override + { + return 0x01; // Start sampling + } + + const char* getCommandName() const override + { + return "enable pcloud data"; + } + + void setPcloudDataActiveState() override + { + device.pcloudDataActive.store(true); + } +}; + +class Device::DisablePcloudDataReq +: public EnDisablePcloudDataReq +{ +public: + friend void Device::disablePcloudDataReq( + smo::Callback callback); + + DisablePcloudDataReq( + Device& dev, + smo::Callback cb) + : EnDisablePcloudDataReq(dev, std::move(cb)) + {} + + ~DisablePcloudDataReq() + { + cleanup(); + } + +private: + uint8_t getEnableFlag() const override + { + return 0x00; // Stop sampling + } + + const char* getCommandName() const override + { + return "disable pcloud data"; + } + + void setPcloudDataActiveState() override + { + device.pcloudDataActive.store(false); + } +}; + +void Device::enablePcloudDataReq( + smo::Callback callback + ) +{ + auto request = std::make_shared( + *this, std::move(callback)); + + // Check if heartbeat socket is available + if (heartbeatFd < 0) + { + std::cerr << __func__ << ": No heartbeat socket available for device " + << discoveredDevice.deviceIdentifier << std::endl; + request->callOriginalCallbackWithFailure(); + return; + } + + // Setup socket for async operations + if (!request->setupSocket()) + { + request->callOriginalCallbackWithFailure(); + return; + } + + // Set up the point cloud data socket for actual data reception + if (!setupPcloudDataSocket()) + { + std::cerr << __func__ << ": Failed to set up point cloud data socket" + << std::endl; + // Don't fail the command, but log the issue + } + + // Send the start sampling command + if (!request->sendCommand()) + { + request->callOriginalCallbackWithFailure(); + return; + } + + // Setup async callbacks + request->setupAsyncCallbacks(request); +} + +void Device::disablePcloudDataReq( + smo::Callback callback + ) +{ + auto request = std::make_shared( + *this, std::move(callback)); + + // Check if heartbeat socket is available + if (heartbeatFd < 0) + { + std::cerr << __func__ << ": No heartbeat socket available for device " + << discoveredDevice.deviceIdentifier << std::endl; + request->callOriginalCallbackWithFailure(); + return; + } + + /* Unconditionally close the pcloud data socket early since there's no good + * reason to only close it if the command packet succeeds and ACKs. We want + * to stop receiving data immediately when disable is requested. + */ + cleanupPcloudDataSocket(); + + // Setup socket for async operations + if (!request->setupSocket()) + { + request->callOriginalCallbackWithFailure(); + return; + } + + // Send the stop sampling command + if (!request->sendCommand()) + { + request->callOriginalCallbackWithFailure(); + return; + } + + // Setup async callbacks + request->setupAsyncCallbacks(request); +} + +bool Device::setupPcloudDataSocket() +{ + // RAII class to manage socket file descriptor + struct SocketRAII + { + int fd; + SocketRAII(int socketFd) : fd(socketFd) {} + ~SocketRAII() { if (fd >= 0) close(fd); } + void commit() { fd = -1; } // Transfer ownership, prevent close + int getFd() const { return fd; } + bool isValid() const { return fd >= 0; } + }; + + // Create UDP socket for point cloud data reception + SocketRAII socketGuard(socket(AF_INET, SOCK_DGRAM, 0)); + if (!socketGuard.isValid()) + { + std::cerr << __func__ << ": Failed to create socket: " + << strerror(errno) << std::endl; + return false; + } + + // Set socket to non-blocking mode + int flags = fcntl(socketGuard.getFd(), F_GETFL, 0); + if (flags < 0 || + fcntl(socketGuard.getFd(), F_SETFL, flags | O_NONBLOCK) < 0) + { + std::cerr << __func__ << ": Failed to set non-blocking mode: " + << strerror(errno) << std::endl; + return false; + } + + // Bind to the data port (65001) + struct sockaddr_in localAddr; + memset(&localAddr, 0, sizeof(localAddr)); + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = INADDR_ANY; + localAddr.sin_port = htons(65001); // Data port + + if (bind( + socketGuard.getFd(), (struct sockaddr *)&localAddr, + sizeof(localAddr)) < 0) + { + std::cerr << __func__ << ": Failed to bind to data port: " + << strerror(errno) << std::endl; + return false; + } + + // Create boost wrapper for async operations + pcloudDataSocketDesc = + std::make_unique( + componentThread->getIoService(), socketGuard.getFd()); + + pcloudDataFd = socketGuard.getFd(); + // Transfer ownership, prevent auto-close + socketGuard.commit(); + return true; +} + +void Device::cleanupPcloudDataSocket() +{ + if (pcloudDataSocketDesc) { + pcloudDataSocketDesc->cancel(); + pcloudDataSocketDesc.reset(); + } + if (pcloudDataFd >= 0) { + close(pcloudDataFd); + pcloudDataFd = -1; + } + pcloudDataActive.store(false); +} + } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index 23c45e4..6ae872d 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -12,6 +12,7 @@ #include #include #include +#include #include "protocol.h" #include @@ -95,6 +96,8 @@ private: class ConnectByDeviceIdentifierReq; class ExecuteHandshakeReq; class DisconnectReq; + class EnablePcloudDataReq; + class DisablePcloudDataReq; public: // Utility methods @@ -110,6 +113,8 @@ public: connectByDeviceIdentifierReqCbFn; typedef std::function executeHandshakeReqCbFn; typedef std::function disconnectReqCbFn; + typedef std::function enablePcloudDataReqCbFn; + typedef std::function disablePcloudDataReqCbFn; // Async connection methods void connectReq(smo::Callback callback); @@ -121,11 +126,24 @@ public: const std::string& deviceIP, smo::Callback callback); void disconnectReq(smo::Callback callback); + void enablePcloudDataReq(smo::Callback callback); + void disablePcloudDataReq(smo::Callback callback); // Heartbeat state std::unique_ptr heartbeatTimer; + // FIXME: Might be useful to rename this to commandAndHeartbeatFd. int heartbeatFd; // Socket file descriptor used for heartbeat std::atomic heartbeatActive; + + // Point cloud data state + std::unique_ptr pcloudDataSocketDesc; + std::atomic pcloudDataActive; + int pcloudDataFd; // Socket file descriptor for point cloud data reception + +private: + // Point cloud data setup + bool setupPcloudDataSocket(); + void cleanupPcloudDataSocket(); }; } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/protocol.cpp b/commonLibs/livoxProto1/protocol.cpp index 01ccc72..7148e89 100644 --- a/commonLibs/livoxProto1/protocol.cpp +++ b/commonLibs/livoxProto1/protocol.cpp @@ -708,5 +708,98 @@ bool DisconnectMessage::validateCrc32() const return isValid; } +// StartStopSamplingMessage methods +StartStopSamplingMessage::StartStopSamplingMessage() +{ + // Initialize header + header.sof = 0xAA; + header.version = 1; + header.length = sizeof(StartStopSamplingMessage) - sizeof(Header) - sizeof(Footer); + header.cmd_type = 0x02; // MSG type + header.seq_num = 0; // Will be set by caller if needed + header.crc_16 = 0; // Will be calculated + + // Initialize command + command.cmd_set = 0x00; // General command set + command.cmd_id = 0x04; // Sampling command ID + + // Initialize data - enable flag will be set manually by caller + enable = 0x00; // Default to stop, caller will override + + // Initialize footer + footer.crc_32 = 0; // Will be calculated +} + +uint32_t StartStopSamplingMessage::calculateCrc32() const +{ + // Calculate CRC32 for the entire message excluding the footer CRC32 field + const uint8_t* messageData = reinterpret_cast(this); + size_t messageSize = sizeof(StartStopSamplingMessage) - sizeof(footer.crc_32); + + return comms::calculateCrc32(messageData, messageSize); +} + +void StartStopSamplingMessage::swapContentsToProtocolEndianness() +{ + header.swapToProtocolEndianness(); + command.swapToProtocolEndianness(); + footer.swapToProtocolEndianness(); +} + + +bool StartStopSamplingMessage::sanityCheck() const +{ + return header.sanityCheck() && command.sanityCheck() && footer.sanityCheck(); +} + +bool StartStopSamplingMessage::validateCrc32() const +{ + uint32_t calculatedCrc = calculateCrc32(); + bool isValid = (calculatedCrc == footer.crc_32); + + // Debug output only if validation fails + if (!isValid) + { + std::cout << "StartStopSamplingMessage CRC32 Debug: calculated=0x" + << std::hex << calculatedCrc + << ", received=0x" << footer.crc_32 << std::dec << std::endl; + } + + return isValid; +} + +// SamplingResponse methods +void SamplingResponse::swapContentsToHostEndianness() +{ + header.swapToHostEndianness(); + command.swapToHostEndianness(); + footer.swapToHostEndianness(); +} + +bool SamplingResponse::sanityCheck() const +{ + return header.sanityCheck() && command.sanityCheck() && footer.sanityCheck(); +} + +bool SamplingResponse::validateCrc32() const +{ + // Calculate CRC32 for the entire message excluding the footer CRC32 field + const uint8_t* messageData = reinterpret_cast(this); + size_t messageSize = sizeof(SamplingResponse) - sizeof(footer.crc_32); + + uint32_t calculatedCrc = comms::calculateCrc32(messageData, messageSize); + bool isValid = (calculatedCrc == footer.crc_32); + + // Debug output only if validation fails + if (!isValid) + { + std::cout << "SamplingResponse CRC32 Debug: calculated=0x" + << std::hex << calculatedCrc + << ", received=0x" << footer.crc_32 << std::dec << std::endl; + } + + return isValid; +} + } // namespace comms } // namespace livoxProto1 diff --git a/commonLibs/livoxProto1/protocol.h b/commonLibs/livoxProto1/protocol.h index 34cebc1..d564d16 100644 --- a/commonLibs/livoxProto1/protocol.h +++ b/commonLibs/livoxProto1/protocol.h @@ -246,6 +246,40 @@ struct DisconnectMessage bool validateCrc32() const; } __attribute__((packed)); +/** EXPLANATION: + * Complete start/stop sampling command frame for enabling/disabling point cloud data from Livox devices. + * This is the complete wire format including header, command fields, data, and footer. + */ +struct StartStopSamplingMessage +{ + Header header; // 0-8: Protocol frame header + Command command; // 9-10: Command identification + uint8_t enable; // 11: Enable flag (0x01 = Start, 0x00 = Stop) + Footer footer; // 12-15: Protocol frame footer + + StartStopSamplingMessage(); + uint32_t calculateCrc32() const; + void swapContentsToProtocolEndianness(); + bool sanityCheck() const; + bool validateCrc32() const; +} __attribute__((packed)); + +/** EXPLANATION: + * Complete sampling response frame from Livox devices. + * This is the complete wire format including header, command fields, data, and footer. + */ +struct SamplingResponse +{ + Header header; // 0-8: Protocol frame header + Command command; // 9-10: Command identification + uint8_t ret_code; // 11: Return Code (0x00 = Success, 0x01 = Fail) + Footer footer; // 12-15: Protocol frame footer + + void swapContentsToHostEndianness(); + bool sanityCheck() const; + bool validateCrc32() const; +} __attribute__((packed)); + } // namespace comms } // namespace livoxProto1