From cde2737876b5edc241a9024d074ff20132fba0eb Mon Sep 17 00:00:00 2001 From: Hayodea Hekol Date: Sun, 24 May 2026 16:12:29 -0400 Subject: [PATCH] Libspinscale: Initial top-level SMO port to coroutine framework We haven't ported everything. Just the top-level methods. We'll dig in to the leaf stuff later. Surprisingly, this all went without any real difficulties. Runs like a charm on first try. --- commonLibs/livoxProto1/core.cpp | 20 +- commonLibs/livoxProto1/core.h | 6 +- commonLibs/livoxProto1/device.cpp | 78 +- commonLibs/livoxProto1/device.h | 20 +- commonLibs/livoxProto1/livoxProto1.cpp | 12 +- commonLibs/livoxProto1/livoxProto1.h | 12 +- docs/prompts/new-dap-spec-negtrins.md | 35 + include/user/senseApiDesc.h | 6 +- libspinscale | 2 +- smocore/CMakeLists.txt | 1 + smocore/body/body.cpp | 235 +--- smocore/componentThreadTags.cpp | 85 ++ .../deviceAttachmentPipeSpecParser.cpp | 14 +- .../deviceAttachmentPipeSpecp.yy | 4 +- smocore/deviceManager/deviceManager.cpp | 1156 ++++++----------- smocore/deviceManager/deviceReattacher.cpp | 80 +- smocore/include/body/body.h | 14 +- smocore/include/body/bodyThread.h | 30 + smocore/include/deviceManager/device.h | 29 +- smocore/include/deviceManager/deviceManager.h | 125 +- .../include/deviceManager/deviceReattacher.h | 44 +- smocore/include/director/directorThread.h | 27 + smocore/include/marionette/marionette.h | 28 +- smocore/include/marionette/marionetteThread.h | 37 + smocore/include/mind.h | 11 +- smocore/include/simulator/simulatorThread.h | 27 + smocore/include/stimBuffApis/stimBuffApiLib.h | 14 +- .../include/stimBuffApis/stimBuffApiManager.h | 17 +- smocore/include/subconsciousThread.h | 25 + smocore/include/world/worldThread.h | 25 + smocore/marionette/lifetime.cpp | 253 ++-- smocore/marionette/main.cpp | 21 +- smocore/mind.cpp | 194 +-- smocore/opts.cpp | 4 +- smocore/stimBuffApis/stimBuffApiManager.cpp | 33 +- .../livoxGen1/ioUringAssemblyEngine.cpp | 18 +- .../livoxGen1/ioUringAssemblyEngine.h | 6 +- stimBuffApis/livoxGen1/livoxGen1.cpp | 20 +- .../openClCollatingAndMeshingEngine.cpp | 16 +- .../openClCollatingAndMeshingEngine.h | 4 +- .../livoxGen1/pcloudStimulusProducer.cpp | 8 +- .../livoxGen1/pcloudStimulusProducer.h | 6 +- stimBuffApis/xcbWindow/xcbWindow.cpp | 6 +- tests/smocore/qutex_tests.cpp | 18 +- 44 files changed, 1296 insertions(+), 1530 deletions(-) create mode 100644 docs/prompts/new-dap-spec-negtrins.md create mode 100644 smocore/componentThreadTags.cpp create mode 100644 smocore/include/body/bodyThread.h create mode 100644 smocore/include/director/directorThread.h create mode 100644 smocore/include/marionette/marionetteThread.h create mode 100644 smocore/include/simulator/simulatorThread.h create mode 100644 smocore/include/subconsciousThread.h create mode 100644 smocore/include/world/worldThread.h diff --git a/commonLibs/livoxProto1/core.cpp b/commonLibs/livoxProto1/core.cpp index 03837ed..ce2907d 100644 --- a/commonLibs/livoxProto1/core.cpp +++ b/commonLibs/livoxProto1/core.cpp @@ -3,8 +3,8 @@ #include #include #include -#include -#include +#include +#include #include #include "protocol.h" #include "core.h" @@ -74,7 +74,7 @@ std::optional> DeviceManager::getDevice( // GetOrCreateDeviceReq nested class implementation class DeviceManager::GetOrCreateDeviceReq -: public sscl::NonPostedAsynchronousContinuation< +: public sscl::cps::NonPostedAsynchronousContinuation< livoxProto1_getOrCreateDeviceReqCbFn> { public: @@ -86,8 +86,8 @@ public: GetOrCreateDeviceReq( DeviceManager& mgr, std::shared_ptr device, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation< + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation< livoxProto1_getOrCreateDeviceReqCbFn>(std::move(cb)), deviceManager(mgr), pendingDevice(device) {} @@ -132,7 +132,7 @@ void DeviceManager::getOrCreateDeviceReq( int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::Callback callback) + sscl::cps::Callback callback) { // Validate smoIp format using Boost.Asio IPv4 validation if (!smoIp.empty() && !comms::isValidIPv4(smoIp)) @@ -179,7 +179,7 @@ void DeviceManager::getOrCreateDeviceReq( } class DeviceManager::DestroyDeviceReq -: public sscl::NonPostedAsynchronousContinuation< +: public sscl::cps::NonPostedAsynchronousContinuation< livoxProto1_destroyDeviceReqCbFn> { public: @@ -190,8 +190,8 @@ public: DestroyDeviceReq( DeviceManager& mgr, std::shared_ptr device, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation< + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation< livoxProto1_destroyDeviceReqCbFn>(std::move(cb)), deviceManager(mgr), pendingDevice(device) {} @@ -220,7 +220,7 @@ public: void DeviceManager::destroyDeviceReq( std::shared_ptr dev, - sscl::Callback callback + sscl::cps::Callback callback ) { /** EXPLANATION: diff --git a/commonLibs/livoxProto1/core.h b/commonLibs/livoxProto1/core.h index a430826..22e2375 100644 --- a/commonLibs/livoxProto1/core.h +++ b/commonLibs/livoxProto1/core.h @@ -11,7 +11,7 @@ #include "broadcastListener.h" #include "udpCommandDemuxer.h" #include "livoxProto1.h" -#include +#include namespace livoxProto1 { @@ -29,11 +29,11 @@ public: int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::Callback callback); + sscl::cps::Callback callback); void destroyDeviceReq( std::shared_ptr device, - sscl::Callback callback); + sscl::cps::Callback callback); std::optional> getDevice( const std::string &deviceIdentifier); diff --git a/commonLibs/livoxProto1/device.cpp b/commonLibs/livoxProto1/device.cpp index b947046..0ac074f 100644 --- a/commonLibs/livoxProto1/device.cpp +++ b/commonLibs/livoxProto1/device.cpp @@ -17,8 +17,8 @@ #include #include #include -#include -#include +#include +#include #include "device.h" #include "protocol.h" #include "core.h" @@ -124,15 +124,15 @@ Device::~Device() * This class manages the overall device connection process including handshake and heartbeat setup */ class Device::ConnectReq -: public sscl::NonPostedAsynchronousContinuation +: public sscl::cps::NonPostedAsynchronousContinuation { private: Device& device; boost::asio::deadline_timer delayTimer; public: - ConnectReq(Device& dev, sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation( + ConnectReq(Device& dev, sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), delayTimer(dev.componentThread->getIoService()) {} @@ -216,7 +216,7 @@ public: { callOriginalCallback(false); } }; -void Device::connectReq(sscl::Callback callback) +void Device::connectReq(sscl::cps::Callback callback) { // Create the connection request object to hold state and callbacks auto request = std::make_shared(*this, std::move(callback)); @@ -233,7 +233,7 @@ void Device::connectReq(sscl::Callback callback) } class Device::ConnectToKnownDeviceReq -: public sscl::NonPostedAsynchronousContinuation< +: public sscl::cps::NonPostedAsynchronousContinuation< Device::connectToKnownDeviceReqCbFn> { public: @@ -241,8 +241,8 @@ public: std::string deviceIP; std::shared_ptr deviceInfo; - ConnectToKnownDeviceReq(Device& dev, sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation< + ConnectToKnownDeviceReq(Device& dev, sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation< Device::connectToKnownDeviceReqCbFn>(std::move(cb)), device(dev) {} @@ -269,7 +269,7 @@ public: * broadcastListener. */ void Device::connectToKnownDeviceReq( - sscl::Callback callback + sscl::cps::Callback callback ) { // Create the connection request object to hold state and callbacks @@ -336,7 +336,7 @@ void Device::connectToKnownDeviceReq( } class Device::ConnectByDeviceIdentifierReq -: public sscl::NonPostedAsynchronousContinuation< +: public sscl::cps::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn> { public: @@ -344,8 +344,8 @@ public: std::string deviceIP; ConnectByDeviceIdentifierReq( - Device& dev, sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation< + Device& dev, sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation< Device::connectByDeviceIdentifierReqCbFn>( std::move(cb)), device(dev) {} @@ -370,7 +370,7 @@ public: }; void Device::connectByDeviceIdentifierReq( - sscl::Callback callback + sscl::cps::Callback callback ) { /** EXPLANATION: @@ -419,13 +419,13 @@ void Device::connectByDeviceIdentifierReq( } class Device::ExecuteHandshakeReq -: public sscl::NonPostedAsynchronousContinuation< +: public sscl::cps::NonPostedAsynchronousContinuation< Device::executeHandshakeReqCbFn> { public: friend void Device::executeHandshakeReq( const std::string& deviceIP, - sscl::Callback callback); + sscl::cps::Callback callback); enum class SocketState { @@ -459,8 +459,8 @@ public: Device& dev, const std::string& deviceIP, std::shared_ptr &cmdEndpointFdDesc, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation( + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), deviceIP(deviceIP), cmdEndpointFdDesc(cmdEndpointFdDesc), @@ -753,7 +753,7 @@ private: void Device::executeHandshakeReq( const std::string& deviceIP, - sscl::Callback callback + sscl::cps::Callback callback ) { // Get the command endpoint from the UdpCommandDemuxer @@ -803,7 +803,7 @@ void Device::executeHandshakeReq( } } -void Device::disconnectReq(sscl::Callback callback) +void Device::disconnectReq(sscl::cps::Callback callback) { // Stop heartbeat first stopHeartbeat(); @@ -1331,7 +1331,7 @@ std::optional Device::getSmoIp(const std::string& deviceIP) // Base class for both enable and disable pcloud data requests template class EnDisablePcloudDataReq -: public sscl::NonPostedAsynchronousContinuation +: public sscl::cps::NonPostedAsynchronousContinuation { public: enum class SocketState @@ -1362,8 +1362,8 @@ public: protected: EnDisablePcloudDataReq( Device& dev, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation(std::move(cb)), + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation(std::move(cb)), device(dev), timeoutTimer(device.componentThread->getIoService()) {} @@ -1608,11 +1608,11 @@ class Device::EnablePcloudDataReq { public: friend void Device::enablePcloudDataReq( - sscl::Callback callback); + sscl::cps::Callback callback); EnablePcloudDataReq( Device& dev, - sscl::Callback cb) + sscl::cps::Callback cb) : EnDisablePcloudDataReq(dev, std::move(cb)) {} @@ -1643,11 +1643,11 @@ class Device::DisablePcloudDataReq { public: friend void Device::disablePcloudDataReq( - sscl::Callback callback); + sscl::cps::Callback callback); DisablePcloudDataReq( Device& dev, - sscl::Callback cb) + sscl::cps::Callback cb) : EnDisablePcloudDataReq( dev, std::move(cb)) {} @@ -1675,7 +1675,7 @@ private: }; void Device::enablePcloudDataReq( - sscl::Callback callback + sscl::cps::Callback callback ) { auto request = std::make_shared( @@ -1702,7 +1702,7 @@ void Device::enablePcloudDataReq( } void Device::disablePcloudDataReq( - sscl::Callback callback + sscl::cps::Callback callback ) { auto request = std::make_shared( @@ -1858,7 +1858,7 @@ void Device::unregisterUdpCommandHandler( // SetReturnModeReq continuation class class Device::SetReturnModeReq -: public sscl::NonPostedAsynchronousContinuation +: public sscl::cps::NonPostedAsynchronousContinuation { public: enum class SocketState @@ -1890,12 +1890,12 @@ public: public: friend void Device::setReturnModeReq( uint8_t returnMode, - sscl::Callback callback); + sscl::cps::Callback callback); SetReturnModeReq( Device& dev, uint8_t mode, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation( + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), returnMode(mode), timeoutTimer(device.componentThread->getIoService()) @@ -2107,7 +2107,7 @@ public: // GetReturnModeReq continuation class class Device::GetReturnModeReq -: public sscl::NonPostedAsynchronousContinuation +: public sscl::cps::NonPostedAsynchronousContinuation { public: enum class SocketState @@ -2137,12 +2137,12 @@ public: public: friend void Device::getReturnModeReq( - sscl::Callback callback); + sscl::cps::Callback callback); GetReturnModeReq( Device& dev, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation( + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation( std::move(cb)), device(dev), timeoutTimer(device.componentThread->getIoService()) @@ -2351,7 +2351,7 @@ public: void Device::setReturnModeReq( uint8_t returnMode, - sscl::Callback callback + sscl::cps::Callback callback ) { auto request = std::make_shared( @@ -2378,7 +2378,7 @@ void Device::setReturnModeReq( } void Device::getReturnModeReq( - sscl::Callback callback + sscl::cps::Callback callback ) { auto request = std::make_shared( diff --git a/commonLibs/livoxProto1/device.h b/commonLibs/livoxProto1/device.h index 6646cec..d409f5a 100644 --- a/commonLibs/livoxProto1/device.h +++ b/commonLibs/livoxProto1/device.h @@ -18,7 +18,7 @@ #include #include #include "protocol.h" -#include +#include #include // Custom hash function for std::pair @@ -163,20 +163,20 @@ public: getReturnModeReqCbFn; // Async connection methods - void connectReq(sscl::Callback callback); + void connectReq(sscl::cps::Callback callback); void connectToKnownDeviceReq( - sscl::Callback callback); + sscl::cps::Callback callback); void connectByDeviceIdentifierReq( - sscl::Callback callback); + sscl::cps::Callback callback); void executeHandshakeReq( const std::string& deviceIP, - sscl::Callback callback); - void disconnectReq(sscl::Callback callback); - void enablePcloudDataReq(sscl::Callback callback); - void disablePcloudDataReq(sscl::Callback callback); + sscl::cps::Callback callback); + void disconnectReq(sscl::cps::Callback callback); + void enablePcloudDataReq(sscl::cps::Callback callback); + void disablePcloudDataReq(sscl::cps::Callback callback); void setReturnModeReq( - uint8_t returnMode, sscl::Callback callback); - void getReturnModeReq(sscl::Callback callback); + uint8_t returnMode, sscl::cps::Callback callback); + void getReturnModeReq(sscl::cps::Callback callback); public: comms::DiscoveredDevice discoveredDevice; diff --git a/commonLibs/livoxProto1/livoxProto1.cpp b/commonLibs/livoxProto1/livoxProto1.cpp index 11a5407..41b1c46 100644 --- a/commonLibs/livoxProto1/livoxProto1.cpp +++ b/commonLibs/livoxProto1/livoxProto1.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include "livoxProto1.h" #include "device.h" @@ -16,7 +16,7 @@ void livoxProto1_getOrCreateDeviceReq( int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::Callback callback + sscl::cps::Callback callback ) { // Get the global DeviceManager instance @@ -39,7 +39,7 @@ void livoxProto1_getOrCreateDeviceReq( void livoxProto1_destroyDeviceReq( std::shared_ptr device, - sscl::Callback callback + sscl::cps::Callback callback ) { auto& protoState = livoxProto1::getProtoState(); @@ -67,7 +67,7 @@ void livoxProto1_exit(void) void livoxProto1_device_enablePcloudDataReq( std::shared_ptr device, - sscl::Callback callback + sscl::cps::Callback callback ) { if (!device) @@ -81,7 +81,7 @@ void livoxProto1_device_enablePcloudDataReq( void livoxProto1_device_disablePcloudDataReq( std::shared_ptr device, - sscl::Callback callback + sscl::cps::Callback callback ) { if (!device) @@ -95,7 +95,7 @@ void livoxProto1_device_disablePcloudDataReq( void livoxProto1_device_getReturnModeReq( std::shared_ptr device, - sscl::Callback callback + sscl::cps::Callback callback ) { if (!device) diff --git a/commonLibs/livoxProto1/livoxProto1.h b/commonLibs/livoxProto1/livoxProto1.h index 834b58d..8c203a2 100644 --- a/commonLibs/livoxProto1/livoxProto1.h +++ b/commonLibs/livoxProto1/livoxProto1.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include // Forward declarations @@ -64,30 +64,30 @@ typedef void livoxProto1_getOrCreateDeviceReqFn( int commandTimeoutMs, int retryDelayMs, const std::string& smoIp, uint8_t smoSubnetNbits, uint16_t dataPort, uint16_t cmdPort, uint16_t imuPort, - sscl::Callback callback); + sscl::cps::Callback callback); typedef std::function livoxProto1_destroyDeviceReqCbFn; typedef void livoxProto1_destroyDeviceReqFn( std::shared_ptr device, - sscl::Callback callback); + sscl::cps::Callback callback); typedef std::function livoxProto1_device_enablePcloudDataReqCbFn; typedef void livoxProto1_device_enablePcloudDataReqFn( std::shared_ptr device, - sscl::Callback callback); + sscl::cps::Callback callback); typedef std::function livoxProto1_device_disablePcloudDataReqCbFn; typedef void livoxProto1_device_disablePcloudDataReqFn( std::shared_ptr device, - sscl::Callback callback); + sscl::cps::Callback callback); typedef std::function livoxProto1_device_getReturnModeReqCbFn; typedef void livoxProto1_device_getReturnModeReqFn( std::shared_ptr device, - sscl::Callback callback); + sscl::cps::Callback callback); typedef std::shared_ptr livoxProto1_getPcloudDataFdDescFn(void); diff --git a/docs/prompts/new-dap-spec-negtrins.md b/docs/prompts/new-dap-spec-negtrins.md new file mode 100644 index 0000000..2230acc --- /dev/null +++ b/docs/prompts/new-dap-spec-negtrins.md @@ -0,0 +1,35 @@ +Ok. We're extending the DAPS language to support specifying at most 1 postrin and at most 1 negtrin for each DAP spec. I.e: we'll no longer treat negtrins/postrins as their own qualeifaceapis attached to a deviceselector, but rather now negtrins and postrins will be attached to the nontrins they are triggered by. So looking at avia0.dapss: + +``` +// This line will be entirely deleted: ++edev|avia0|postrin( + from-stimbuff=pcloudAmbience| + interest-pc=85| + passband-count-lt-val=8 + ) +|livoxGen1()|livoxProto1(SMO_IP)|3JEDK380010Z39|| + +// And this one will be deleted too: ++edev|avia0|negtrin( + from-stimbuff=pcloudAmbience| + interest-pc=85|distraction-pc=90|intolerable-pc=95| + passband-count-gt-val=120 + ) + |livoxGen1()|livoxProto1(SMO_IP)|3JEDK380010Z39 + +// And this line will be modified from: ++edev|avia0|pcloudAmbience()|livoxGen1()|livoxProto1(SMO_IP)|3JEDK380010Z39|| + +// To: ++edev|avia0| postrin(interest-pc=85)| negtrin(interest-pc=85) |pcloudAmbience(passband-count-lt-val=8|passband-count-gt-val=120)|livoxGen1()|livoxProto1(SMO_IP)|3JEDK380010Z39|| +``` + +So we're adding the ability to spec intrins on nontrin stimbuffs. Any DAP spec can declare 0 or 1 negtrin and 0 or 1 postrin. These must be declared before the first qualeIfaceApi. + +Modify the current DAP spec lex/yacc files to support thissyntax; as well as the current DeviceAttachmentSpec struct type too. And update the current stimbuff devices if they use the current postrin/negtrin parameters. Let them look for these new postrin and negtrin specs and their params instead. For the official language-level spec, you can use these names for the new specifiers: + +``` +sensor-type|dev-identifier|postrin-spec(postrin-params)|negtrin-spec(negtrin-params)|quale-iface-api(quale-iface-api-params)|stim-buff-api(api-params)|provider(provider-params)|dev-selector +``` + +NB: The postrin and negtrin spec have no ordering requirement with respect to one another. They are only ordered with respect to the other specifiers in the DAP specs -- i.e: again, they must precede the qualeIfaceApi specifier. So the negtrin may appear before the postrin; and vice versa. \ No newline at end of file diff --git a/include/user/senseApiDesc.h b/include/user/senseApiDesc.h index 6b84598..f0460a9 100644 --- a/include/user/senseApiDesc.h +++ b/include/user/senseApiDesc.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #define CL_TARGET_OPENCL_VERSION 120 #include @@ -57,10 +57,10 @@ typedef int (sal_mlo_finalizeIndFn)(void); typedef void (sal_mlo_attachDeviceReqFn)( const std::shared_ptr& desc, const std::shared_ptr& componentThread, - sscl::Callback cb); + sscl::cps::Callback cb); typedef void (sal_mlo_detachDeviceReqFn)( const std::shared_ptr& desc, - sscl::Callback cb); + sscl::cps::Callback cb); /** * @brief Hooks provided by Salmanoff to senseApi libraries. diff --git a/libspinscale b/libspinscale index b6eb502..a14d622 160000 --- a/libspinscale +++ b/libspinscale @@ -1 +1 @@ -Subproject commit b6eb502e56747b6a16b1ee83587e90e88137e4ce +Subproject commit a14d622eaf2c2e6a40c958e7d735ccd47d6e6d51 diff --git a/smocore/CMakeLists.txt b/smocore/CMakeLists.txt index a1f3018..3df91d0 100644 --- a/smocore/CMakeLists.txt +++ b/smocore/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(smocore STATIC mind.cpp mindComponent.cpp componentThread.cpp + componentThreadTags.cpp opts.cpp # Body diff --git a/smocore/body/body.cpp b/smocore/body/body.cpp index a00935d..23d67ed 100644 --- a/smocore/body/body.cpp +++ b/smocore/body/body.cpp @@ -1,15 +1,12 @@ #include +#include #include -#include -#include -#include -#include -#include #include +#include #include +#include #include #include -#include namespace smo { namespace body { @@ -19,165 +16,71 @@ Body::Body(Mind &parent, const std::shared_ptr &thread) { } -class Body::InitializeReq -: public sscl::PostedAsynchronousContinuation +BodyViralPostingInvoker Body::initializeCReq() { -public: - InitializeReq( - sscl::PuppetApplication &parent, - const std::shared_ptr &caller, - sscl::Callback callback) - : sscl::PostedAsynchronousContinuation(caller, callback), - parent(parent) - {} - -private: - sscl::PuppetApplication &parent; - -public: - void initializeReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) - { - auto self = sscl::ComponentThread::getSelf(); - if (self->id != SmoThreadId::BODY) - { - throw std::runtime_error(std::string(__func__) - + ": Must be executed on Body thread"); - } - - /** EXPLANATION: - * The ComponentThread instance we pass in here is the one that will be - * used by Senseapi libs to perform device-independent background - * operations. - * For example, liblivoxProto1's BroadcastListener will use this thread - * to listen for UDP broadcast dgrams from Livox devices. - * - * We used to use Marionette, but there's a strong argument for using - * Body instead since it's meant to handle device-management operations. - */ - // Upcast to Mind to access Mind-specific members - Mind &mind = static_cast(context->parent); - stim_buff::StimBuffApiManager::getInstance() - .loadAllStimBuffApiLibsFromOptions(mind.body.thread); - - /** EXPLANATION: - * Consider body::initializeReq to have been called if even one of its - * operations was executed at all, whether successfully or - * unsuccessfully. - */ - mind.bodyComponentInitialized = true; - - std::cout << stim_buff::StimBuffApiManager::getInstance().stringifyLibs() - << std::endl; - - if (OptionParser::getOptions().verbose) - { - std::cout << __func__ << ": About to initializeAllStimBuffApiLibs" - << '\n'; - } - stim_buff::StimBuffApiManager::getInstance() - .initializeAllStimBuffApiLibs(); - - if (OptionParser::getOptions().verbose) - { - std::cout << __func__ << ": About to attachAllSenseDevicesFromSpecs" - << '\n'; - } - device::DeviceManager::getInstance() - .attachAllUnattachedDevicesFromCmdlineReq( - {context, std::bind( - &InitializeReq::initializeReq2, - context.get(), context, - std::placeholders::_1)}); - } - - void initializeReq2( - [[maybe_unused]] std::shared_ptr context, - sscl::AsynchronousLoop &results - ) - { - std::cout << "Mrntt: attached " - << results.nSucceeded << " of " << results.nTotal - << " sense devices." << "\n"; - - callOriginalCb(results.nSucceeded > 0); - } -}; - -class Body::FinalizeReq -: public InitializeReq -{ -public: - using InitializeReq::InitializeReq; - -public: - void finalizeReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) - { - auto self = sscl::ComponentThread::getSelf(); - if (self->id != SmoThreadId::BODY) - { - throw std::runtime_error(std::string(__func__) - + ": Must be executed on Body thread"); - } - - std::cout << "Mrntt: About to detach all sense devices." << "\n"; - device::DeviceManager::getInstance().detachAllAttachedDeviceRoles( - {context, std::bind( - &FinalizeReq::finalizeReq2, - context.get(), context, - std::placeholders::_1)}); - } - - void finalizeReq2( - [[maybe_unused]] std::shared_ptr context, - sscl::AsynchronousLoop &results - ) - { - std::cout << "Mrntt: Successfully detached " - << results.nSucceeded << " of " << results.nTotal - << " sense devices." << "\n"; - - std::cout << "Mrntt: About to finalize all stim buff api libs." << "\n"; - stim_buff::StimBuffApiManager::getInstance().finalizeAllStimBuffApiLibs(); - - std::cout << "Mrntt: About to unload all stim buff api libs." << "\n"; - stim_buff::StimBuffApiManager::getInstance().unloadAllStimBuffApiLibs(); - callOriginalCb(results.nSucceeded == results.nTotal); - } -}; - -void Body::initializeReq(sscl::Callback callback) -{ - auto mrntt = sscl::ComponentThread::getSelf(); - - if (mrntt->id != SmoThreadId::MRNTT) + auto self = sscl::ComponentThread::getSelf(); + if (self->id != SmoThreadId::BODY) { throw std::runtime_error(std::string(__func__) - + ": Must be invoked by Mrntt thread"); + + ": Must be executed on Body thread"); } - auto request = std::make_shared( - parent, mrntt, callback); + /** EXPLANATION: + * The ComponentThread instance we pass in here is the one that will be + * used by Senseapi libs to perform device-independent background + * operations. + * For example, liblivoxProto1's BroadcastListener will use this thread + * to listen for UDP broadcast dgrams from Livox devices. + * + * We used to use Marionette, but there's a strong argument for using + * Body instead since it's meant to handle device-management operations. + */ + // Upcast to Mind to access Mind-specific members + Mind &mind = static_cast(parent); + stim_buff::StimBuffApiManager::getInstance() + .loadAllStimBuffApiLibsFromOptions(mind.body.thread); - thread->getIoService().post( - STC(std::bind( - &InitializeReq::initializeReq1_posted, - request.get(), request))); + /** EXPLANATION: + * Consider body::initializeCReq to have been called if even one of its + * operations was executed at all, whether successfully or + * unsuccessfully. + */ + mind.bodyComponentInitialized = true; + + std::cout << stim_buff::StimBuffApiManager::getInstance().stringifyLibs() + << std::endl; + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": About to initializeAllStimBuffApiLibs" + << '\n'; + } + stim_buff::StimBuffApiManager::getInstance() + .initializeAllStimBuffApiLibs(); + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": About to attachAllUnattachedDevicesFromCmdline" + << '\n'; + } + + sscl::MultiOperationResultSet attachResults = co_await + device::DeviceManager::getInstance() + .attachAllUnattachedDevicesFromCmdlineCReq(); + std::cout << "Mrntt: attached " + << attachResults.nSucceeded << " of " << attachResults.nTotal + << " sense devices." << "\n"; + + co_return attachResults.nSucceeded > 0; } -void Body::finalizeReq(sscl::Callback callback) +BodyViralPostingInvoker Body::finalizeCReq() { - auto mrntt = sscl::ComponentThread::getSelf(); - - if (mrntt->id != SmoThreadId::MRNTT) + auto self = sscl::ComponentThread::getSelf(); + if (self->id != SmoThreadId::BODY) { - std::cerr << __func__ << ": Must be invoked by Mrntt thread" - << std::endl; - callback.callbackFn(false); - return; + throw std::runtime_error(std::string(__func__) + + ": Must be executed on Body thread"); } // Upcast to Mind to access Mind-specific members @@ -186,17 +89,23 @@ void Body::finalizeReq(sscl::Callback callback) { std::cout << "Mrntt: Body component not initialized. " << "Skipping finalization." << "\n"; - callback.callbackFn(true); - return; + co_return true; } - auto request = std::make_shared( - parent, mrntt, callback); + std::cout << "Mrntt: About to detach all sense devices." << "\n"; + sscl::MultiOperationResultSet detachResults = co_await + device::DeviceManager::getInstance().detachAllAttachedDeviceRolesCReq(); + std::cout << "Mrntt: Successfully detached " + << detachResults.nSucceeded << " of " << detachResults.nTotal + << " sense devices." << "\n"; - thread->getIoService().post( - STC(std::bind( - &FinalizeReq::finalizeReq1_posted, - request.get(), request))); + std::cout << "Mrntt: About to finalize all stim buff api libs." << "\n"; + stim_buff::StimBuffApiManager::getInstance().finalizeAllStimBuffApiLibs(); + + std::cout << "Mrntt: About to unload all stim buff api libs." << "\n"; + stim_buff::StimBuffApiManager::getInstance().unloadAllStimBuffApiLibs(); + + co_return detachResults.nSucceeded == detachResults.nTotal; } } // namespace body diff --git a/smocore/componentThreadTags.cpp b/smocore/componentThreadTags.cpp new file mode 100644 index 0000000..20b4771 --- /dev/null +++ b/smocore/componentThreadTags.cpp @@ -0,0 +1,85 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace smo { +namespace mrntt { + +boost::asio::io_service &MrnttThreadTag::io_service() noexcept +{ + return thread->getIoService(); +} + +} // namespace mrntt + +namespace body { + +boost::asio::io_service &BodyThreadTag::io_service() +{ + if (!mind::globalMind) { + throw std::runtime_error( + "BodyThreadTag: globalMind not initialized"); + } + + return mind::globalMind->body.thread->getIoService(); +} + +} // namespace body + +namespace director { + +boost::asio::io_service &DirectorThreadTag::io_service() +{ + if (!mind::globalMind) { + throw std::runtime_error( + "DirectorThreadTag: globalMind not initialized"); + } + + return mind::globalMind->director.thread->getIoService(); +} + +} // namespace director + +namespace simulator { + +boost::asio::io_service &SimulatorThreadTag::io_service() +{ + if (!mind::globalMind) { + throw std::runtime_error( + "SimulatorThreadTag: globalMind not initialized"); + } + + return mind::globalMind->canvas.thread->getIoService(); +} + +} // namespace simulator + +boost::asio::io_service &SubconsciousThreadTag::io_service() +{ + if (!mind::globalMind) { + throw std::runtime_error( + "SubconsciousThreadTag: globalMind not initialized"); + } + + return mind::globalMind->subconscious.thread->getIoService(); +} + +boost::asio::io_service &WorldThreadTag::io_service() +{ + if (!mind::globalMind) { + throw std::runtime_error( + "WorldThreadTag: globalMind not initialized"); + } + + return mind::globalMind->world.thread->getIoService(); +} + +} // namespace smo diff --git a/smocore/deviceManager/deviceAttachmentPipeSpecParser.cpp b/smocore/deviceManager/deviceAttachmentPipeSpecParser.cpp index 990a605..faa4ad5 100644 --- a/smocore/deviceManager/deviceAttachmentPipeSpecParser.cpp +++ b/smocore/deviceManager/deviceAttachmentPipeSpecParser.cpp @@ -33,23 +33,27 @@ std::string DeviceManager::readDapSpecFile(const std::string& filename) void DeviceManager::collateAllDapSpecs(void) { OptionParser &options = OptionParser::getOptions(); - allDapSpecs = options.dapSpecs; + DeviceManager &dm = getInstance(); + dm.s.rsrc.allDapSpecs = options.dapSpecs; for (const auto& file : options.dapSpecFiles) { std::string fileContent = readDapSpecFile(file); - if (!allDapSpecs.empty()) { - allDapSpecs += "||"; + if (!dm.s.rsrc.allDapSpecs.empty()) { + dm.s.rsrc.allDapSpecs += "||"; } - allDapSpecs += fileContent; + dm.s.rsrc.allDapSpecs += fileContent; } } void DeviceManager::parseAllDapSpecs(void) { + DeviceManager &dm = getInstance(); auto file_deleter = [](FILE* f) { if (f) fclose(f); }; std::unique_ptr input( - fmemopen((void*)allDapSpecs.c_str(), allDapSpecs.size(), "r"), + fmemopen( + (void*)dm.s.rsrc.allDapSpecs.c_str(), + dm.s.rsrc.allDapSpecs.size(), "r"), file_deleter); if (!input) diff --git a/smocore/deviceManager/deviceAttachmentPipeSpecp.yy b/smocore/deviceManager/deviceAttachmentPipeSpecp.yy index 1d8093e..5f85ead 100644 --- a/smocore/deviceManager/deviceAttachmentPipeSpecp.yy +++ b/smocore/deviceManager/deviceAttachmentPipeSpecp.yy @@ -91,7 +91,7 @@ interoceptor_spec: *static_cast($3)); spec->sensorType = $1; - smo::device::DeviceManager::commandLineDASpecs.push_back(*spec); + smo::device::DeviceManager::getInstance().s.rsrc.commandLineDASpecs.push_back(*spec); delete $3; } @@ -103,7 +103,7 @@ extrospector_spec: *static_cast($3)); spec->sensorType = $1; - smo::device::DeviceManager::commandLineDASpecs.push_back(*spec); + smo::device::DeviceManager::getInstance().s.rsrc.commandLineDASpecs.push_back(*spec); delete $3; } diff --git a/smocore/deviceManager/deviceManager.cpp b/smocore/deviceManager/deviceManager.cpp index 45964c3..ecc8ed5 100644 --- a/smocore/deviceManager/deviceManager.cpp +++ b/smocore/deviceManager/deviceManager.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -6,28 +7,38 @@ #include #include #include -#include -#include -#include -#include #include +#include #include #include #include #include +#include #include +#include namespace smo { namespace device { -std::vector> - DeviceManager::deviceAttachmentSpecs; -std::vector> - DeviceManager::devices; -std::vector> - DeviceManager::attachedDeviceRoles; -std::vector - DeviceManager::commandLineDASpecs; +namespace { + +void assertMarionetteThread() +{ + auto self = sscl::ComponentThread::getSelf(); + if (self->id != SmoThreadId::MRNTT) + { + throw std::runtime_error( + std::string(__func__) + + ": Must be executed on Marionette thread"); + } +} + +boost::asio::io_service &marionetteIoService() +{ + return mrntt::MrnttThreadTag::io_service(); +} + +} // namespace DeviceManager::~DeviceManager() { @@ -37,780 +48,465 @@ const std::string DeviceManager::stringifyDeviceSpecs(void) { std::ostringstream oss; - for (const auto& spec : DeviceManager::deviceAttachmentSpecs) { + for (const auto& spec : getInstance().s.rsrc.deviceAttachmentSpecs) { oss << "Device Attachment Spec: " << spec->stringify(); } return oss.str(); } -class DeviceManager::NewDeviceAttachmentSpecInd -: public sscl::SerializedAsynchronousContinuation +mrntt::MrnttViralPostingInvoker +DeviceManager::attachStimBuffDeviceCReq( + const std::shared_ptr& spec) { -public: - NewDeviceAttachmentSpecInd( - const DeviceAttachmentSpec &spec, - const std::shared_ptr &caller, - sscl::Callback cb, - std::vector> requiredLocks) - : sscl::SerializedAsynchronousContinuation( - caller, cb, requiredLocks), - spec(spec) - {} + assertMarionetteThread(); -public: - DeviceAttachmentSpec spec; - std::shared_ptr specPtr; - std::shared_ptr device; - -public: - void newDeviceAttachmentSpecInd1_posted( - [[maybe_unused]] std::shared_ptr context - ) + auto &sbam = stim_buff::StimBuffApiManager::getInstance(); + auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi); + if (!libOpt) { - // First, add the spec to deviceAttachmentSpecs if it's not already there - bool specExists = false; - for (const auto& existingSpec : DeviceManager::deviceAttachmentSpecs) - { - if (*existingSpec == spec) - { - specExists = true; - specPtr = existingSpec; - break; - } - } + std::cerr << "attachStimBuffDeviceCReq: No library found for API '" + << spec->stimBuffApi << "'" << std::endl; + co_return cpsBoundary::StimBuffDeviceOpResult{false, spec}; + } - if (!specExists) - { - specPtr = std::make_shared(spec); - DeviceManager::deviceAttachmentSpecs.push_back(specPtr); - } + auto &lib = *libOpt.value(); - bool deviceExists = false; - for (const auto& existingDevice : DeviceManager::devices) - { - if (existingDevice->deviceIdentifier != spec.deviceIdentifier) - { continue; } + if (lib.isBeingDestroyed.load()) + { + std::cerr << std::string(__func__) + ": Library is being destroyed" + << " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; + co_return cpsBoundary::StimBuffDeviceOpResult{false, spec}; + } - device = existingDevice; - deviceExists = true; + if (!lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceReq) + { + std::cerr << std::string(__func__) + ": attachDeviceReq() is NULL " + "for library '" << lib.libraryPath << "'" + << std::endl; + co_return cpsBoundary::StimBuffDeviceOpResult{false, spec}; + } + + sscl::co::CoQutex::ReleaseHandle sbamGuard = + co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy(); + sscl::co::CoQutex::ReleaseHandle libGuard = + co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy(); + sbamGuard.release(); + + /** EXPLANATION: + * We pass in either the body or world thread here, depending on whether + * the device is an introspector (idev) or extrospector (edev). + * + * Introspectors are attached to the body thread; extrospectors are + * attached to the world thread. + */ + std::shared_ptr threadForAttachment; + if (spec->sensorType == 'e') + { + threadForAttachment = mind::globalMind->world.thread; + std::cout << __func__ << ": Attaching edev " + << spec->deviceIdentifier << " to world thread" << "\n"; + } + else + { + threadForAttachment = mind::globalMind->body.thread; + std::cout << __func__ << ": Attaching non-edev " + << spec->deviceIdentifier << " to body thread" << "\n"; + } + + cpsBoundary::StimBuffDeviceOpResult result = co_await + cpsBoundary::AttachStimBuffDeviceAReq( + spec, lib, threadForAttachment, marionetteIoService()); + + co_return result; +} + +mrntt::MrnttViralPostingInvoker +DeviceManager::detachStimBuffDeviceCReq( + const std::shared_ptr& spec) +{ + assertMarionetteThread(); + + auto &sbam = stim_buff::StimBuffApiManager::getInstance(); + auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi); + if (!libOpt) + { + std::cerr << "detachStimBuffDeviceCReq: No library found for API '" + << spec->stimBuffApi << "'" << std::endl; + co_return cpsBoundary::StimBuffDeviceOpResult{false, spec}; + } + + auto &lib = *libOpt.value(); + + if (lib.isBeingDestroyed.load()) + { + std::cerr << std::string(__func__) + ": Library is being destroyed" + << " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; + co_return cpsBoundary::StimBuffDeviceOpResult{false, spec}; + } + + if (!lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceReq) + { + std::cerr << std::string(__func__) + ": detachDeviceReq() is NULL " + "for library '" << lib.libraryPath << "'" + << std::endl; + co_return cpsBoundary::StimBuffDeviceOpResult{false, spec}; + } + + sscl::co::CoQutex::ReleaseHandle sbamGuard = + co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy(); + sscl::co::CoQutex::ReleaseHandle libGuard = + co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy(); + sbamGuard.release(); + + cpsBoundary::StimBuffDeviceOpResult result = co_await + cpsBoundary::DetachStimBuffDeviceAReq( + spec, lib, marionetteIoService()); + + co_return result; +} + +mrntt::MrnttViralPostingInvoker +DeviceManager::newDeviceAttachmentSpecIndCReq( + const DeviceAttachmentSpec &spec) +{ + assertMarionetteThread(); + + DeviceManager &dm = getInstance(); + sscl::co::CoQutex::ReleaseHandle dmGuard = + co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy(); + + // First, add the spec to deviceAttachmentSpecs if it's not already there + std::shared_ptr specPtr; + bool specExists = false; + for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs) + { + if (*existingSpec == spec) + { + specExists = true; + specPtr = existingSpec; break; } - - // If device doesn't exist, create a new one and add it - if (!device) - { - device = std::make_shared(spec.deviceIdentifier); - DeviceManager::devices.push_back(device); - } - - // Check if a DeviceRole w/ this spec already exists in attachedDeviceRoles - bool deviceRoleExists = false; - std::shared_ptr existingDeviceRole = nullptr; - for (const auto& role : DeviceManager::attachedDeviceRoles) - { - if (*role->deviceAttachmentSpec == spec) - { - deviceRoleExists = true; - existingDeviceRole = role; - break; - } - } - - // If DeviceRole exists, both spec and device must also exist - if (deviceRoleExists) - { - if (!specExists || !deviceExists) - { - throw std::runtime_error( - "Program error: DeviceRole exists but spec or device doesn't " - "pre-exist. specExists=" + std::to_string(specExists) + - ", deviceExists=" + std::to_string(deviceExists)); - } - - // Already attached, callback with success and return - callOriginalCb(true, existingDeviceRole, specPtr); - return; - } - - DeviceManager::getInstance().attachStimBuffDeviceReq( - specPtr, - {context, std::bind( - &NewDeviceAttachmentSpecInd::newDeviceAttachmentSpecInd2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); } - void newDeviceAttachmentSpecInd2( - [[maybe_unused]] std::shared_ptr context, - bool success, - std::shared_ptr deviceSpec - ) + if (!specExists) { - if (!success) + specPtr = std::make_shared(spec); + dm.s.rsrc.deviceAttachmentSpecs.push_back(specPtr); + } + + std::shared_ptr device; + bool deviceExists = false; + for (const auto& existingDevice : dm.s.rsrc.devices) + { + if (existingDevice->deviceIdentifier != spec.deviceIdentifier) + { continue; } + + device = existingDevice; + deviceExists = true; + break; + } + + // If device doesn't exist, create a new one and add it + if (!device) + { + device = std::make_shared(spec.deviceIdentifier); + dm.s.rsrc.devices.push_back(device); + } + + // Check if a DeviceRole w/ this spec already exists in attachedDeviceRoles + std::shared_ptr existingDeviceRole; + bool deviceRoleExists = false; + for (const auto& role : dm.s.rsrc.attachedDeviceRoles) + { + if (*role->deviceAttachmentSpec == spec) { - std::cerr << __func__ << ": Attach failed for device spec " - << deviceSpec->stringify() << std::endl; - callOriginalCb(false, nullptr, deviceSpec); - return; - } - - try { - // Create DeviceRole and add it to both DeviceManager's and Device's collections - auto deviceRole = std::make_shared(*device, specPtr); - device->deviceRoles.push_back(deviceRole); - DeviceManager::attachedDeviceRoles.push_back(deviceRole); - - // Callback with success - callOriginalCb(true, deviceRole, specPtr); - } catch (const std::exception& e) { - // Attach failed, callback with error - callOriginalCb(false, nullptr, specPtr); + deviceRoleExists = true; + existingDeviceRole = role; + break; } } -}; -class DeviceManager::RemoveDeviceAttachmentSpecReq -: public sscl::SerializedAsynchronousContinuation + // If DeviceRole exists, both spec and device must also exist + if (deviceRoleExists) + { + if (!specExists || !deviceExists) + { + throw std::runtime_error( + "Program error: DeviceRole exists but spec or device doesn't " + "pre-exist. specExists=" + std::to_string(specExists) + + ", deviceExists=" + std::to_string(deviceExists)); + } + + // Already attached, return success + co_return DeviceAttachmentIndResult{ + true, existingDeviceRole, specPtr}; + } + + cpsBoundary::StimBuffDeviceOpResult attachResult = + co_await attachStimBuffDeviceCReq(specPtr); + + if (!attachResult.success) + { + std::cerr << __func__ << ": Attach failed for device spec " + << attachResult.deviceSpec->stringify() << std::endl; + co_return DeviceAttachmentIndResult{ + false, nullptr, attachResult.deviceSpec}; + } + + try { + // Create DeviceRole and add it to both DeviceManager's and Device's collections + auto deviceRole = std::make_shared(*device, specPtr); + device->deviceRoles.push_back(deviceRole); + dm.s.rsrc.attachedDeviceRoles.push_back(deviceRole); + + co_return DeviceAttachmentIndResult{true, deviceRole, specPtr}; + } catch (const std::exception&) { + // Attach failed, return error + co_return DeviceAttachmentIndResult{false, nullptr, specPtr}; + } +} + +mrntt::MrnttViralPostingInvoker +DeviceManager::removeDeviceAttachmentSpecCReq( + const DeviceAttachmentSpec &spec) { -public: - RemoveDeviceAttachmentSpecReq( - const DeviceAttachmentSpec &spec, - const std::shared_ptr &caller, - sscl::Callback cb, - std::vector> requiredLocks) - : sscl::SerializedAsynchronousContinuation( - caller, cb, requiredLocks), - spec(spec) - {} + assertMarionetteThread(); -public: - DeviceAttachmentSpec spec; + DeviceManager &dm = getInstance(); + sscl::co::CoQutex::ReleaseHandle dmGuard = + co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy(); + + // Find the shared_ptr to the spec in the collection std::shared_ptr specPtr; - -public: - void removeDeviceAttachmentSpecReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) + for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs) { - // Find the shared_ptr to the spec in the collection - for (const auto& existingSpec : DeviceManager::deviceAttachmentSpecs) + if (*existingSpec == spec) { - if (*existingSpec == spec) + specPtr = existingSpec; + break; + } + } + + if (!specPtr) + { + // Spec not found, return failure + co_return DeviceAttachmentIndResult{false, nullptr, nullptr}; + } + + // Call detachStimBuffDeviceCReq first - only clean up metadata if this succeeds + cpsBoundary::StimBuffDeviceOpResult detachResult = + co_await detachStimBuffDeviceCReq(specPtr); + + if (!detachResult.success) + { + // Detach failed, metadata remains intact + co_return DeviceAttachmentIndResult{ + false, nullptr, detachResult.deviceSpec}; + } + + // Detach succeeded, now find and clean up metadata + try { + // Find the DeviceRole in attachedDeviceRoles + auto deviceRoleIt = std::find_if( + dm.s.rsrc.attachedDeviceRoles.begin(), + dm.s.rsrc.attachedDeviceRoles.end(), + [&specPtr](const std::shared_ptr &role) { + return *role->deviceAttachmentSpec == *specPtr; + } + ); + + if (deviceRoleIt == dm.s.rsrc.attachedDeviceRoles.end()) + { + // DeviceRole not found, return failure + co_return DeviceAttachmentIndResult{ + false, nullptr, detachResult.deviceSpec}; + } + + auto deviceRole = *deviceRoleIt; + auto& device = deviceRole->parentDevice; + + // Remove DeviceRole from DeviceManager's collection + dm.s.rsrc.attachedDeviceRoles.erase(deviceRoleIt); + + // Remove DeviceRole from Device's collection + auto deviceRoleIt2 = std::find( + device.deviceRoles.begin(), + device.deviceRoles.end(), + deviceRole); + if (deviceRoleIt2 != device.deviceRoles.end()) + { + device.deviceRoles.erase(deviceRoleIt2); + } + + // Remove DeviceAttachmentSpec from deviceAttachmentSpecs collection + auto specIt = std::find_if( + dm.s.rsrc.deviceAttachmentSpecs.begin(), + dm.s.rsrc.deviceAttachmentSpecs.end(), + [&specPtr]( + const std::shared_ptr &existingSpec) { - specPtr = existingSpec; + return *existingSpec == *specPtr; + } + ); + + if (specIt != dm.s.rsrc.deviceAttachmentSpecs.end()) + { + dm.s.rsrc.deviceAttachmentSpecs.erase(specIt); + } + + co_return DeviceAttachmentIndResult{ + true, deviceRole, detachResult.deviceSpec}; + } catch (const std::exception&) { + // Cleanup failed, return error + co_return DeviceAttachmentIndResult{ + false, nullptr, detachResult.deviceSpec}; + } +} + +mrntt::MrnttViralPostingInvoker +DeviceManager::attachAllUnattachedDevicesFromCReq( + const std::shared_ptr> &specs) +{ + assertMarionetteThread(); + + if (specs->empty()) { + co_return sscl::MultiOperationResultSet{}; + } + + sscl::co::Group group; + std::vector< + mrntt::MrnttViralPostingInvoker> + invokers; + invokers.reserve(specs->size()); + + for (const auto &spec : *specs) + { + invokers.emplace_back(newDeviceAttachmentSpecIndCReq(spec)); + group.add(invokers.back()); + } + + co_await group.getAwaitAllSettlementsInvoker(); + group.checkForAndReThrowGroupExceptions(); + + unsigned int nSucceeded = 0; + unsigned int nFailed = 0; + for (auto &invoker : invokers) + { + if (invoker.completedReturnValues().myReturnValue.success) { + nSucceeded++; + } else { + nFailed++; + } + } + + if (OptionParser::getOptions().verbose) + { + std::cout << __func__ << ": " << nSucceeded + << " devices attached, " + << nFailed << " devices failed\n"; + } + + co_return sscl::MultiOperationResultSet( + static_cast(specs->size()), nSucceeded, nFailed); +} + +mrntt::MrnttViralPostingInvoker +DeviceManager::attachAllUnattachedDevicesFromKnownListCReq() +{ + assertMarionetteThread(); + + DeviceManager &dm = getInstance(); + sscl::co::CoQutex::ReleaseHandle dmGuard = + co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy(); + + auto unattachedSpecs = std::make_shared< + std::vector>(); + + for (const auto& spec : dm.s.rsrc.deviceAttachmentSpecs) + { + bool isAttached = false; + + for (const auto& role : dm.s.rsrc.attachedDeviceRoles) + { + if (*role->deviceAttachmentSpec == *spec) + { + isAttached = true; break; } } - if (!specPtr) - { - // Spec not found, callback with failure and return - callOriginalCb(false, nullptr); - return; - } - - // Call detachStimBuffDeviceReq first - only clean up metadata if this succeeds - DeviceManager::getInstance().detachStimBuffDeviceReq( - specPtr, - {context, std::bind( - &RemoveDeviceAttachmentSpecReq::removeDeviceAttachmentSpecReq2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); - } - - void removeDeviceAttachmentSpecReq2( - [[maybe_unused]] std::shared_ptr context, - bool success, - std::shared_ptr deviceSpec - ) - { - if (!success) - { - // Detach failed, callback with failure (metadata remains intact) - callOriginalCb(false, deviceSpec); - return; - } - - // Detach succeeded, now find and clean up metadata - try { - // Find the DeviceRole in attachedDeviceRoles - auto deviceRoleIt = std::find_if( - DeviceManager::attachedDeviceRoles.begin(), - DeviceManager::attachedDeviceRoles.end(), - [&specPtr = specPtr](const std::shared_ptr &role) { - return *role->deviceAttachmentSpec == *specPtr; - } - ); - - if (deviceRoleIt == DeviceManager::attachedDeviceRoles.end()) - { - // DeviceRole not found, callback with failure - callOriginalCb(false, deviceSpec); - return; - } - - auto deviceRole = *deviceRoleIt; - auto& device = deviceRole->parentDevice; - - // Remove DeviceRole from DeviceManager's collection - DeviceManager::attachedDeviceRoles.erase(deviceRoleIt); - - // Remove DeviceRole from Device's collection - auto deviceRoleIt2 = std::find( - device.deviceRoles.begin(), - device.deviceRoles.end(), - deviceRole); - if (deviceRoleIt2 != device.deviceRoles.end()) - { - device.deviceRoles.erase(deviceRoleIt2); - } - - // Remove DeviceAttachmentSpec from deviceAttachmentSpecs collection - auto specIt = std::find_if( - DeviceManager::deviceAttachmentSpecs.begin(), - DeviceManager::deviceAttachmentSpecs.end(), - [&specPtr = specPtr]( - const std::shared_ptr &existingSpec) - { - return *existingSpec == *specPtr; - } - ); - - if (specIt != DeviceManager::deviceAttachmentSpecs.end()) - { - DeviceManager::deviceAttachmentSpecs.erase(specIt); - } - - // Callback with success - callOriginalCb(true, deviceSpec); - } catch (const std::exception& e) { - // Cleanup failed, callback with error - callOriginalCb(false, deviceSpec); + if (!isAttached) { + unattachedSpecs->push_back(*spec); } } -}; -void DeviceManager::newDeviceAttachmentSpecInd( - const DeviceAttachmentSpec &spec, - sscl::Callback callback) -{ - const auto& caller = sscl::ComponentThread::getSelf(); + dmGuard.release(); - auto request = std::make_shared( - spec, caller, callback, - sscl::LockSet::Set{ - std::ref(DeviceManager::getInstance().qutex) - }); - - NewDeviceAttachmentSpecInd::LockerAndInvoker lockvoker( - *request, mrntt::mrntt.thread, - std::bind( - &NewDeviceAttachmentSpecInd::newDeviceAttachmentSpecInd1_posted, - request.get(), request)); + co_return co_await attachAllUnattachedDevicesFromCReq(unattachedSpecs); } -void DeviceManager::removeDeviceAttachmentSpecReq( - const DeviceAttachmentSpec &spec, - sscl::Callback callback) -{ - const auto& caller = sscl::ComponentThread::getSelf(); - - auto request = std::make_shared( - spec, caller, callback, - sscl::LockSet::Set{ - std::ref(DeviceManager::getInstance().qutex) - }); - - RemoveDeviceAttachmentSpecReq::LockerAndInvoker lockvoker( - *request, mrntt::mrntt.thread, - std::bind( - &RemoveDeviceAttachmentSpecReq - ::removeDeviceAttachmentSpecReq1_posted, - request.get(), request)); -} - -class DeviceManager::AttachStimBuffDeviceReq -: public sscl::SerializedAsynchronousContinuation< - DeviceManager::attachStimBuffDeviceReqCbFn> -{ -public: - AttachStimBuffDeviceReq( - const std::shared_ptr& spec, - const std::shared_ptr &caller, - sscl::Callback cb, - std::shared_ptr &stimBuffApiLib, - std::vector> requiredLocks) - : sscl::SerializedAsynchronousContinuation( - caller, cb, requiredLocks), - spec(spec), stimBuffApiLib(stimBuffApiLib) - {} - -public: - void attachStimBuffDeviceReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) - { - if (caller->id != SmoThreadId::MRNTT) - { - std::cerr << std::string(__func__) - << ": executed on non-mrntt thread: " - << caller->name << std::endl; - callOriginalCb(false, spec); - return; - } - - if (stimBuffApiLib->isBeingDestroyed.load()) - { - std::cerr << std::string(__func__) + ": Library is being destroyed" - << " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; - callOriginalCb(false, spec); - return; - } - - if (!stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.attachDeviceReq) - { - std::cerr << std::string(__func__) + ": attachDeviceReq() is NULL " - "for library '" << stimBuffApiLib->libraryPath << "'" - << std::endl; - callOriginalCb(false, spec); - return; - } - - releaseQutexEarly(stim_buff::StimBuffApiManager::getInstance().qutex); - - /** EXPLANATION: - * We pass in either the body or world thread here, depending on whether - * the device is an introspector (idev) or extrospector (edev). - * - * Introspectors are attached to the body thread; extrospectors are - * attached to the world thread. - */ - std::shared_ptr threadForAttachment; - if (spec->sensorType == 'e') - { - threadForAttachment = mind::globalMind->world.thread; - std::cout << __func__ << ": Attaching edev " - << spec->deviceIdentifier << " to world thread" << "\n"; - } - else - { - threadForAttachment = mind::globalMind->body.thread; - std::cout << __func__ << ": Attaching non-edev " - << spec->deviceIdentifier << " to body thread" << "\n"; - } - - stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.attachDeviceReq( - spec, threadForAttachment, - {context, std::bind( - &AttachStimBuffDeviceReq::attachStimBuffDeviceReq2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); - } - - void attachStimBuffDeviceReq2( - [[maybe_unused]] std::shared_ptr context, - bool success, - std::shared_ptr deviceSpec - ) - { - callOriginalCb(success, deviceSpec); - } - - void detachStimBuffDeviceReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) - { - if (caller->id != SmoThreadId::MRNTT) - { - std::cerr << std::string(__func__) - << ": executed on non-mrntt thread: " - << caller->name << std::endl; - callOriginalCb(false, spec); - return; - } - - if (stimBuffApiLib->isBeingDestroyed.load()) - { - std::cerr << std::string(__func__) + ": Library is being destroyed" - << " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; - callOriginalCb(false, spec); - return; - } - - if (!stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.detachDeviceReq) - { - std::cerr << std::string(__func__) + ": detachDeviceReq() is NULL " - "for library '" << stimBuffApiLib->libraryPath << "'" - << std::endl; - callOriginalCb(false, spec); - return; - } - - releaseQutexEarly(stim_buff::StimBuffApiManager::getInstance().qutex); - - stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.detachDeviceReq( - spec, - {context, std::bind( - &AttachStimBuffDeviceReq::detachStimBuffDeviceReq2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); - } - - void detachStimBuffDeviceReq2( - [[maybe_unused]] std::shared_ptr context, - bool success, - std::shared_ptr deviceSpec - ) - { - callOriginalCb(success, deviceSpec); - } - -public: - std::shared_ptr spec; - std::shared_ptr stimBuffApiLib; -}; - -void DeviceManager::attachStimBuffDeviceReq( - const std::shared_ptr& spec, - sscl::Callback cb - ) -{ - const auto& caller = sscl::ComponentThread::getSelf(); - - // Get the stim buff API lib's qutex - auto libOpt = stim_buff::StimBuffApiManager::getInstance() - .getStimBuffApiLibByApiName(spec->stimBuffApi); - - if (!libOpt) - { - std::cerr << "attachStimBuffDeviceReq: No library found for API '" - << spec->stimBuffApi << "'" << std::endl; - cb.callbackFn(false, spec); - return; - } - - auto& lib = *libOpt.value(); - - auto request = std::make_shared( - spec, caller, cb, libOpt.value(), - sscl::LockSet::Set{ - std::ref(stim_buff::StimBuffApiManager::getInstance().qutex), - std::ref(lib.qutex) - }); - - AttachStimBuffDeviceReq::LockerAndInvoker lockvoker( - *request, mrntt::mrntt.thread, - std::bind( - &AttachStimBuffDeviceReq::attachStimBuffDeviceReq1_posted, - request.get(), request)); -} - -void DeviceManager::detachStimBuffDeviceReq( - const std::shared_ptr& spec, - sscl::Callback cb - ) -{ - const auto& caller = sscl::ComponentThread::getSelf(); - - // Get the stim buff API lib's qutex - auto libOpt = stim_buff::StimBuffApiManager::getInstance() - .getStimBuffApiLibByApiName(spec->stimBuffApi); - - if (!libOpt) - { - std::cerr << "detachStimBuffDeviceReq: No library found for API '" - << spec->stimBuffApi << "'" << std::endl; - cb.callbackFn(false, spec); - return; - } - - auto& lib = *libOpt.value(); - - auto request = std::make_shared( - spec, caller, cb, libOpt.value(), - sscl::LockSet::Set{ - std::ref(stim_buff::StimBuffApiManager::getInstance().qutex), - std::ref(lib.qutex) - }); - - DetachStimBuffDeviceReq::LockerAndInvoker lockvoker( - *request, mrntt::mrntt.thread, - std::bind( - &DetachStimBuffDeviceReq::detachStimBuffDeviceReq1_posted, - request.get(), request)); -} - -class DeviceManager::AttachAllUnattachedDevicesFromReq -: public sscl::PostedAsynchronousContinuation< - attachAllUnattachedDevicesFromReqCbFn> -{ -public: - AttachAllUnattachedDevicesFromReq( - const unsigned int totalNSpecs, - const std::shared_ptr>& specs, - const std::shared_ptr& caller, - sscl::Callback cb) - : sscl::PostedAsynchronousContinuation( - caller, cb), - loop(totalNSpecs), specs(specs) - {} - -public: - void attachAllUnattachedDevicesFromReq1_posted( - [[maybe_unused]] std::shared_ptr - context - ) - { - for (const auto& spec : *specs) - { - DeviceManager::getInstance().newDeviceAttachmentSpecInd( - spec, - {context, std::bind( - &AttachAllUnattachedDevicesFromReq - ::attachAllUnattachedDevicesFromReq2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)}); - } - } - - // Callback methods for the attachment sequence - void attachAllUnattachedDevicesFromReq2( - std::shared_ptr context, - bool success, [[maybe_unused]] std::shared_ptr deviceRole, - std::shared_ptr spec - ) - { - if (!success) - { - std::cerr << __func__ << ": Failed to attach device: " - << spec->deviceIdentifier << "\n"; - // Fallthrough. - } - - if (!context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( - success)) - { - return; - } - - if (OptionParser::getOptions().verbose) - { - std::cout << __func__ << ": " << context->loop.nSucceeded.load() - << " devices attached, " - << context->loop.nFailed.load() << " devices failed\n"; - } - - context->callOriginalCb(loop); - } - -public: - sscl::AsynchronousLoop loop; - std::shared_ptr> specs; -}; - -void DeviceManager::attachAllUnattachedDevicesFromReq( - const std::shared_ptr> &specs, - sscl::Callback cb - ) -{ - if (specs->size() == 0) - { - sscl::AsynchronousLoop tmp(0); - cb.callbackFn(tmp); - return; - } - - const auto& caller = sscl::ComponentThread::getSelf(); - auto request = std::make_shared( - specs->size(), specs, caller, std::move(cb)); - - mrntt::mrntt.thread->getIoService().post( - STC(std::bind( - &AttachAllUnattachedDevicesFromReq - ::attachAllUnattachedDevicesFromReq1_posted, - request.get(), request))); -} - -void DeviceManager::attachAllUnattachedDevicesFromCmdlineReq( - sscl::Callback cb - ) +mrntt::MrnttViralPostingInvoker +DeviceManager::attachAllUnattachedDevicesFromCmdlineCReq() { auto specs = std::make_shared>( - commandLineDASpecs); - attachAllUnattachedDevicesFromReq(specs, std::move(cb)); + getInstance().s.rsrc.commandLineDASpecs); + + co_return co_await attachAllUnattachedDevicesFromCReq(specs); } -class DeviceManager::AttachAllUnattachedDevicesFromKnownListReq -: public sscl::SerializedAsynchronousContinuation< - attachAllUnattachedDevicesFromReqCbFn> +mrntt::MrnttViralPostingInvoker +DeviceManager::detachAllAttachedDeviceRolesCReq() { -public: - AttachAllUnattachedDevicesFromKnownListReq( - const std::shared_ptr &caller, - sscl::Callback cb, - std::vector> requiredLocks) - : sscl::SerializedAsynchronousContinuation< - attachAllUnattachedDevicesFromReqCbFn>( - caller, cb, requiredLocks) - {} + assertMarionetteThread(); -public: - void attachAllUnattachedDevicesFromKnownListReq1_posted( - [[maybe_unused]] - std::shared_ptr context - ) + std::vector> specsToDetach; + specsToDetach.reserve(getInstance().s.rsrc.attachedDeviceRoles.size()); + for (const auto& deviceRole : getInstance().s.rsrc.attachedDeviceRoles) { - // Create a vector to hold unattached device specs - auto unattachedSpecs = std::make_shared< - std::vector>(); - - // Cycle through all DA specs in deviceAttachmentSpecs - for (const auto& spec : DeviceManager::deviceAttachmentSpecs) - { - bool isAttached = false; - - // Cross reference with attachedDeviceRoles - for (const auto& role : DeviceManager::attachedDeviceRoles) - { - if (*role->deviceAttachmentSpec == *spec) - { - isAttached = true; - break; - } - } - - // If spec doesn't appear in attachedDeviceRoles, add it to vector - if (!isAttached) { - unattachedSpecs->push_back(*spec); - } - } - - // Release the DeviceManager qutex early before calling the inner method - releaseQutexEarly(DeviceManager::getInstance().qutex); - - // Pass the vector to the existing function - DeviceManager::getInstance().attachAllUnattachedDevicesFromReq( - unattachedSpecs, - {context, std::bind( - &AttachAllUnattachedDevicesFromKnownListReq - ::attachAllUnattachedDevicesFromKnownListReq2, - context.get(), context, - std::placeholders::_1)}); + specsToDetach.push_back(deviceRole->deviceAttachmentSpec); } - void attachAllUnattachedDevicesFromKnownListReq2( - [[maybe_unused]] - std::shared_ptr context, - sscl::AsynchronousLoop loop - ) - { - callOriginalCb(loop); + if (specsToDetach.empty()) { + co_return sscl::MultiOperationResultSet{}; } -}; -void DeviceManager::attachAllUnattachedDevicesFromKnownListReq( - sscl::Callback cb - ) -{ - const auto& caller = sscl::ComponentThread::getSelf(); + sscl::co::Group group; + std::vector< + mrntt::MrnttViralPostingInvoker> + invokers; + invokers.reserve(specsToDetach.size()); - auto request = std::make_shared( - caller, cb, - sscl::LockSet::Set{ - std::ref(DeviceManager::getInstance().qutex) - }); - - AttachAllUnattachedDevicesFromKnownListReq::LockerAndInvoker lockvoker( - *request, mrntt::mrntt.thread, - std::bind( - &AttachAllUnattachedDevicesFromKnownListReq - ::attachAllUnattachedDevicesFromKnownListReq1_posted, - request.get(), request)); -} - -class DeviceManager::DetachAllAttachedDeviceRoles -: public sscl::PostedAsynchronousContinuation< - detachAllAttachedDeviceRolesCbFn> -{ -public: - DetachAllAttachedDeviceRoles( - const unsigned int totalNSpecs, - const std::shared_ptr& caller, - sscl::Callback cb) - : sscl::PostedAsynchronousContinuation( - caller, cb), - loop(totalNSpecs) - {} - - void detachAllAttachedDeviceRoles1_posted( - [[maybe_unused]] std::shared_ptr context - ) + for (const auto &spec : specsToDetach) { - for (const auto& deviceRole : DeviceManager::attachedDeviceRoles) - { - DeviceManager::getInstance().detachStimBuffDeviceReq( - deviceRole->deviceAttachmentSpec, - {context, std::bind( - &DetachAllAttachedDeviceRoles::detachAllAttachedDeviceRoles2, - context.get(), context, - std::placeholders::_1, std::placeholders::_2)}); + invokers.emplace_back(detachStimBuffDeviceCReq(spec)); + group.add(invokers.back()); + } + + co_await group.getAwaitAllSettlementsInvoker(); + group.checkForAndReThrowGroupExceptions(); + + unsigned int nSucceeded = 0; + unsigned int nFailed = 0; + for (auto &invoker : invokers) + { + if (invoker.completedReturnValues().myReturnValue.success) { + nSucceeded++; + } else { + nFailed++; } } - void detachAllAttachedDeviceRoles2( - std::shared_ptr context, - bool success, std::shared_ptr spec - ) + if (OptionParser::getOptions().verbose) { - if (!success) - { - std::cerr << __func__ << ": Failed to detach device: " - << spec->deviceIdentifier << "\n"; - // Fallthrough. - } - - if (!context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo( - success)) - { - return; - } - - if (OptionParser::getOptions().verbose) - { - std::cout << __func__ << ": " << context->loop.nSucceeded.load() - << " devices detached, " - << context->loop.nFailed.load() << " devices failed\n"; - } - - context->callOriginalCb(loop); + std::cout << __func__ << ": " << nSucceeded + << " devices detached, " + << nFailed << " devices failed\n"; } -public: - sscl::AsynchronousLoop loop; -}; - -void DeviceManager::detachAllAttachedDeviceRoles( - sscl::Callback cb - ) -{ - if (DeviceManager::getInstance().attachedDeviceRoles.size() == 0) - { - sscl::AsynchronousLoop tmp(0); - cb.callbackFn(tmp); - return; - } - - const auto& caller = sscl::ComponentThread::getSelf(); - auto request = std::make_shared( - DeviceManager::getInstance().attachedDeviceRoles.size(), - caller, std::move(cb)); - - mrntt::mrntt.thread->getIoService().post( - STC(std::bind( - &DetachAllAttachedDeviceRoles::detachAllAttachedDeviceRoles1_posted, - request.get(), request))); + co_return sscl::MultiOperationResultSet( + static_cast(specsToDetach.size()), + nSucceeded, nFailed); } void DeviceManager::initializeDeviceReattacher() diff --git a/smocore/deviceManager/deviceReattacher.cpp b/smocore/deviceManager/deviceReattacher.cpp index 8e742d1..d1eae41 100644 --- a/smocore/deviceManager/deviceReattacher.cpp +++ b/smocore/deviceManager/deviceReattacher.cpp @@ -1,22 +1,20 @@ #include +#include #include #include #include -#include #include #include +#include namespace smo { namespace device { -static void reattachmentCb(sscl::AsynchronousLoop& results) -{ - if (results.nTotal == 0) { return; } +namespace { - std::cout << "DeviceReattacher: Successfully reattached " - << results.nSucceeded << " of " << results.nTotal << " devices" - << std::endl; -} +constexpr unsigned int reattachInFlightStaleThresholdMultiplier = 4; + +} // namespace DeviceReattacher::DeviceReattacher( DeviceManager& parent, std::shared_ptr ioThread) @@ -25,6 +23,22 @@ timer(ioThread->getIoService()) { } +mrntt::MrnttNonViralPostingInvoker DeviceReattacher::reattachKnownListCReq( + [[maybe_unused]] std::exception_ptr &exceptionPtr, + [[maybe_unused]] std::function callback) +{ + sscl::MultiOperationResultSet results = co_await + parent.attachAllUnattachedDevicesFromKnownListCReq(); + if (results.nTotal > 0) + { + std::cout << "DeviceReattacher: Successfully reattached " + << results.nSucceeded << " of " << results.nTotal + << " devices" << std::endl; + } + + co_return; +} + void DeviceReattacher::start() { shouldContinue = true; @@ -36,6 +50,14 @@ void DeviceReattacher::stop() { sscl::SpinLock::Guard lock(shouldContinueLock); shouldContinue = false; + reattachOpInFlight = false; + /** EXPLANATION: + * Do not call reattachCReqInvoker.reset() here. Forcibly destroying + * the invoker would tear down an in-flight reattach coroutine frame + * mid-operation. During normal program teardown the optional (and + * its invoker) are destroyed with the rest of the binary anyway; leave + * a running reattach time to finish if shutdown races with it. + */ } timer.cancel(); @@ -56,6 +78,21 @@ void DeviceReattacher::scheduleNextTimeout() std::bind(&DeviceReattacher::onTimeout, this, std::placeholders::_1)); } +void DeviceReattacher::holdReattachCReq() +{ + reattachOpInFlight = true; + lastReattachReqTimestamp = std::chrono::steady_clock::now(); + + reattachCReqInvoker.reset(); + reattachCReqInvoker.emplace(reattachKnownListCReq( + reattachLifetimeExceptionPtr, + [this]() + { + sscl::SpinLock::Guard lock(shouldContinueLock); + reattachOpInFlight = false; + })); +} + void DeviceReattacher::onTimeout(const boost::system::error_code& error) { // Timer was cancelled, which is expected when stopping @@ -75,9 +112,32 @@ void DeviceReattacher::onTimeout(const boost::system::error_code& error) return; } + const auto staleThreshold = std::chrono::milliseconds( + reattachInFlightStaleThresholdMultiplier + * CONFIG_MRNTT_DEVMGR_REATTACHER_PERIOD_MS); + // Attempt to reattach all unattached devices from the known list - parent.attachAllUnattachedDevicesFromKnownListReq( - { nullptr, reattachmentCb}); + if (!reattachOpInFlight) + { + holdReattachCReq(); + } + else + { + const auto elapsedSinceLastReattachReq = + std::chrono::steady_clock::now() - lastReattachReqTimestamp; + + if (elapsedSinceLastReattachReq >= staleThreshold) + { + std::cerr << "DeviceReattacher: Reattach op still in flight after " + << std::chrono::duration_cast( + elapsedSinceLastReattachReq).count() + << "ms (threshold " + << staleThreshold.count() + << "ms); forcing a new reattach request." + << std::endl; + holdReattachCReq(); + } + } // Schedule the next timeout scheduleNextTimeout(); diff --git a/smocore/include/body/body.h b/smocore/include/body/body.h index 78fb6df..add63e6 100644 --- a/smocore/include/body/body.h +++ b/smocore/include/body/body.h @@ -4,8 +4,7 @@ #include #include #include -#include -#include +#include namespace smo { @@ -20,16 +19,11 @@ public: Body(Mind &parent, const std::shared_ptr &thread); ~Body() = default; - typedef std::function bodyLifetimeMgmtOpCbFn; - void initializeReq(sscl::Callback callback); - void finalizeReq(sscl::Callback callback); - -private: - class InitializeReq; - class FinalizeReq; + BodyViralPostingInvoker initializeCReq(); + BodyViralPostingInvoker finalizeCReq(); }; } // namespace body } // namespace smo -#endif // _BODY_COMPONENT_H \ No newline at end of file +#endif // _BODY_COMPONENT_H diff --git a/smocore/include/body/bodyThread.h b/smocore/include/body/bodyThread.h new file mode 100644 index 0000000..42b2b5e --- /dev/null +++ b/smocore/include/body/bodyThread.h @@ -0,0 +1,30 @@ +#ifndef SMO_BODY_THREAD_H +#define SMO_BODY_THREAD_H + +#include +#include +#include + +namespace smo { +namespace body { + +struct BodyThreadTag +{ + static boost::asio::io_service &io_service(); +}; + +template +using BodyPostingPromise = + sscl::co::TaggedPostingPromise; + +using BodyNonViralPostingInvoker = + sscl::co::NonViralPostingInvoker; + +template +using BodyViralPostingInvoker = + sscl::co::ViralPostingInvoker; + +} // namespace body +} // namespace smo + +#endif // SMO_BODY_THREAD_H diff --git a/smocore/include/deviceManager/device.h b/smocore/include/deviceManager/device.h index 6a1e1c7..53de719 100644 --- a/smocore/include/deviceManager/device.h +++ b/smocore/include/deviceManager/device.h @@ -7,7 +7,7 @@ #include #include #include -#include +#include namespace smo { namespace device { @@ -15,25 +15,26 @@ namespace device { class Device { public: - Device(const std::string& identifier) - : deviceIdentifier(identifier), qutex("Device-" + identifier) + explicit Device(const std::string& identifier) + : deviceIdentifier(identifier), + qutex("Device-" + identifier) {} - std::string stringify() const - { - std::ostringstream os; - os << "Device Identifier: " << deviceIdentifier - << ", Device Roles: " << deviceRoles.size() << std::endl; - for (const auto& deviceRole : deviceRoles) { - os << " " << deviceRole->deviceAttachmentSpec->stringify(); - } - return os.str(); - } + std::string stringify() const + { + std::ostringstream os; + os << "Device Identifier: " << deviceIdentifier + << ", Device Roles: " << deviceRoles.size() << std::endl; + for (const auto& deviceRole : deviceRoles) { + os << " " << deviceRole->deviceAttachmentSpec->stringify(); + } + return os.str(); + } public: std::string deviceIdentifier; std::vector> deviceRoles; - sscl::Qutex qutex; + sscl::co::CoQutex qutex; }; } // namespace device diff --git a/smocore/include/deviceManager/deviceManager.h b/smocore/include/deviceManager/deviceManager.h index 85f87d1..56a09a7 100644 --- a/smocore/include/deviceManager/deviceManager.h +++ b/smocore/include/deviceManager/deviceManager.h @@ -7,15 +7,16 @@ #include #include #include -#include #include #include -#include #include #include #include -#include -#include +#include +#include +#include +#include +#include namespace smo { namespace device { @@ -25,11 +26,18 @@ class DeviceReattacher; class DeviceManager { public: - static DeviceManager& getInstance() - { - static DeviceManager instance; - return instance; - } + struct DeviceAttachmentIndResult + { + bool success = false; + std::shared_ptr deviceRole; + std::shared_ptr deviceSpec; + }; + + static DeviceManager& getInstance() + { + static DeviceManager instance; + return instance; + } void initialize(void) {}; @@ -39,80 +47,61 @@ public: void initializeDeviceReattacher(); void finalizeDeviceReattacher(); - std::string readDapSpecFile(const std::string& filename); - void collateAllDapSpecs(void); - void parseAllDapSpecs(void); + std::string readDapSpecFile(const std::string& filename); + void collateAllDapSpecs(void); + void parseAllDapSpecs(void); - static const std::string stringifyDeviceSpecs(void); + static const std::string stringifyDeviceSpecs(void); - typedef std::function deviceRole, - std::shared_ptr deviceSpec)> - newDeviceAttachmentSpecIndCbFn; - typedef std::function deviceSpec)> - removeDeviceAttachmentSpecReqCbFn; + mrntt::MrnttViralPostingInvoker + newDeviceAttachmentSpecIndCReq(const DeviceAttachmentSpec &spec); - void newDeviceAttachmentSpecInd( - const DeviceAttachmentSpec &spec, - sscl::Callback callback); - void removeDeviceAttachmentSpecReq( - const DeviceAttachmentSpec &spec, - sscl::Callback callback); + mrntt::MrnttViralPostingInvoker + removeDeviceAttachmentSpecCReq(const DeviceAttachmentSpec &spec); - // Device attachment/detachment methods moved from SenseApiManager - typedef stim_buff::sal_mlo_attachDeviceReqCbFn attachStimBuffDeviceReqCbFn; - typedef stim_buff::sal_mlo_detachDeviceReqCbFn detachStimBuffDeviceReqCbFn; + mrntt::MrnttViralPostingInvoker + attachStimBuffDeviceCReq( + const std::shared_ptr& spec); - void attachStimBuffDeviceReq( - const std::shared_ptr& spec, - sscl::Callback cb); - void detachStimBuffDeviceReq( - const std::shared_ptr& spec, - sscl::Callback cb); + mrntt::MrnttViralPostingInvoker + detachStimBuffDeviceCReq( + const std::shared_ptr& spec); - typedef std::function - attachAllUnattachedDevicesFromReqCbFn; - typedef std::function - detachAllAttachedDeviceRolesCbFn; + mrntt::MrnttViralPostingInvoker + attachAllUnattachedDevicesFromCReq( + const std::shared_ptr> &specs); - void attachAllUnattachedDevicesFromReq( - const std::shared_ptr> &specs, - sscl::Callback cb); - void attachAllUnattachedDevicesFromKnownListReq( - sscl::Callback cb); - void attachAllUnattachedDevicesFromCmdlineReq( - sscl::Callback cb); - void detachAllAttachedDeviceRoles( - sscl::Callback cb); + mrntt::MrnttViralPostingInvoker + attachAllUnattachedDevicesFromKnownListCReq(); + + mrntt::MrnttViralPostingInvoker + attachAllUnattachedDevicesFromCmdlineCReq(); + + mrntt::MrnttViralPostingInvoker + detachAllAttachedDeviceRolesCReq(); private: - DeviceManager() - : qutex("DeviceManager"), deviceReattacher(nullptr) + DeviceManager() + : s("DeviceManager") {} - ~DeviceManager(); - DeviceManager(const DeviceManager&) = delete; - DeviceManager& operator=(const DeviceManager&) = delete; + + ~DeviceManager(); + DeviceManager(const DeviceManager&) = delete; + DeviceManager& operator=(const DeviceManager&) = delete; public: - sscl::Qutex qutex; - std::string allDapSpecs; - static std::vector> - deviceAttachmentSpecs; - static std::vector> devices; - static std::vector> attachedDeviceRoles; - static std::vector commandLineDASpecs; + struct Resources + { + std::vector> deviceAttachmentSpecs; + std::vector> devices; + std::vector> attachedDeviceRoles; + std::vector commandLineDASpecs; + std::string allDapSpecs; + }; + sscl::SharedResourceGroup s; private: std::unique_ptr deviceReattacher; - - class NewDeviceAttachmentSpecInd; - class RemoveDeviceAttachmentSpecReq; - class AttachStimBuffDeviceReq; - typedef AttachStimBuffDeviceReq DetachStimBuffDeviceReq; - class AttachAllUnattachedDevicesFromReq; - class AttachAllUnattachedDevicesFromKnownListReq; - class DetachAllAttachedDeviceRoles; }; } // namespace device diff --git a/smocore/include/deviceManager/deviceReattacher.h b/smocore/include/deviceManager/deviceReattacher.h index 08efc81..3d54d84 100644 --- a/smocore/include/deviceManager/deviceReattacher.h +++ b/smocore/include/deviceManager/deviceReattacher.h @@ -3,8 +3,14 @@ #include #include +#include +#include +#include #include +#include #include +#include +#include #include namespace smo { @@ -16,27 +22,35 @@ class DeviceManager; class DeviceReattacher { public: - DeviceReattacher( + DeviceReattacher( DeviceManager& parent, std::shared_ptr ioThread); - ~DeviceReattacher() = default; + ~DeviceReattacher() = default; - // Non-copyable - DeviceReattacher(const DeviceReattacher&) = delete; - DeviceReattacher& operator=(const DeviceReattacher&) = delete; + DeviceReattacher(const DeviceReattacher&) = delete; + DeviceReattacher& operator=(const DeviceReattacher&) = delete; - // Control methods - void start(); - void stop(); + // Control methods + void start(); + void stop(); private: - void scheduleNextTimeout(); - void onTimeout(const boost::system::error_code& error); + void scheduleNextTimeout(); + void onTimeout(const boost::system::error_code& error); + void holdReattachCReq(); - DeviceManager &parent; - std::shared_ptr ioThread; - sscl::SpinLock shouldContinueLock; - bool shouldContinue; - boost::asio::deadline_timer timer; + mrntt::MrnttNonViralPostingInvoker reattachKnownListCReq( + std::exception_ptr &exceptionPtr, + std::function callback); + + DeviceManager &parent; + std::shared_ptr ioThread; + sscl::SpinLock shouldContinueLock; + bool shouldContinue; + boost::asio::deadline_timer timer; + std::exception_ptr reattachLifetimeExceptionPtr; + std::optional reattachCReqInvoker; + bool reattachOpInFlight = false; + std::chrono::steady_clock::time_point lastReattachReqTimestamp{}; }; } // namespace device diff --git a/smocore/include/director/directorThread.h b/smocore/include/director/directorThread.h new file mode 100644 index 0000000..a4f8ddd --- /dev/null +++ b/smocore/include/director/directorThread.h @@ -0,0 +1,27 @@ +#ifndef SMO_DIRECTOR_THREAD_H +#define SMO_DIRECTOR_THREAD_H + +#include +#include +#include + +namespace smo { +namespace director { + +struct DirectorThreadTag +{ + static boost::asio::io_service &io_service(); +}; + +template +using DirectorPostingPromise = + sscl::co::TaggedPostingPromise; + +template +using DirectorViralPostingInvoker = + sscl::co::ViralPostingInvoker; + +} // namespace director +} // namespace smo + +#endif // SMO_DIRECTOR_THREAD_H diff --git a/smocore/include/marionette/marionette.h b/smocore/include/marionette/marionette.h index 56098f4..5d98292 100644 --- a/smocore/include/marionette/marionette.h +++ b/smocore/include/marionette/marionette.h @@ -4,9 +4,12 @@ #include #include #include +#include #include #include +#include #include +#include namespace sscl { @@ -26,11 +29,17 @@ public: {} ~MarionetteComponent() = default; -public: - typedef std::function mrnttLifetimeMgmtOpCbFn; - void initializeReq(sscl::Callback callback); - void finalizeReq(sscl::Callback callback); - // Intentionally doesn't take a callback. + void holdInitializeCReq(std::function completion); + void holdFinalizeCReq(std::function completion); + + MrnttNonViralPostingInvoker initializeCReq( + std::exception_ptr &exceptionPtr, + std::function callback); + + MrnttNonViralPostingInvoker finalizeCReq( + std::exception_ptr &exceptionPtr, + std::function callback); + void exceptionInd(); void handleLoopExceptionHook() override; @@ -47,11 +56,14 @@ protected: void handleTryBlock1UnknownException() override; private: - class MrnttLifetimeMgmtOp; - class TerminationEvent; - std::unique_ptr signals; bool callShutdownSalmanoff = false; + std::optional initializeCReqInvoker; + std::optional finalizeCReqInvoker; + +public: + std::exception_ptr initializeLifetimeExceptionPtr; + std::exception_ptr finalizeLifetimeExceptionPtr; }; extern std::shared_ptr thread; diff --git a/smocore/include/marionette/marionetteThread.h b/smocore/include/marionette/marionetteThread.h new file mode 100644 index 0000000..845175e --- /dev/null +++ b/smocore/include/marionette/marionetteThread.h @@ -0,0 +1,37 @@ +#ifndef SMO_MARIONETTE_THREAD_H +#define SMO_MARIONETTE_THREAD_H + +#include +#include +#include + +namespace smo { +namespace mrntt { + +struct MrnttThreadTag +{ + static boost::asio::io_service &io_service() noexcept; +}; + +template +using MrnttPostingPromise = + sscl::co::TaggedPostingPromise; + +using MrnttNonViralPostingInvoker = + sscl::co::NonViralPostingInvoker; + +template +using MrnttViralPostingInvoker = + sscl::co::ViralPostingInvoker; + +using MrnttViralNonPostingInvoker = + sscl::co::ViralNonPostingInvoker; + +template +using MrnttViralNonPostingInvokerT = + sscl::co::ViralNonPostingInvoker; + +} // namespace mrntt +} // namespace smo + +#endif // SMO_MARIONETTE_THREAD_H diff --git a/smocore/include/mind.h b/smocore/include/mind.h index b3fdb7c..25bdab6 100644 --- a/smocore/include/mind.h +++ b/smocore/include/mind.h @@ -2,16 +2,15 @@ #define _MIND_H #include -#include #include #include -#include #include #include #include #include #include +#include #include #include #include @@ -25,9 +24,8 @@ public: Mind(void); ~Mind(void) = default; - typedef std::function mindLifetimeMgmtOpCbFn; - void initializeReq(sscl::Callback callback); - void finalizeReq(sscl::Callback callback); + mrntt::MrnttViralNonPostingInvokerT initializeCReq(); + mrntt::MrnttViralNonPostingInvokerT finalizeCReq(); // ComponentThread access methods std::shared_ptr getComponentThread(sscl::ThreadId id) const; @@ -46,9 +44,6 @@ public: private: friend class body::Body; bool bodyComponentInitialized = false; - -private: - class MindLifetimeMgmtOp; }; namespace mind { diff --git a/smocore/include/simulator/simulatorThread.h b/smocore/include/simulator/simulatorThread.h new file mode 100644 index 0000000..d091730 --- /dev/null +++ b/smocore/include/simulator/simulatorThread.h @@ -0,0 +1,27 @@ +#ifndef SMO_SIMULATOR_THREAD_H +#define SMO_SIMULATOR_THREAD_H + +#include +#include +#include + +namespace smo { +namespace simulator { + +struct SimulatorThreadTag +{ + static boost::asio::io_service &io_service(); +}; + +template +using SimulatorPostingPromise = + sscl::co::TaggedPostingPromise; + +template +using SimulatorViralPostingInvoker = + sscl::co::ViralPostingInvoker; + +} // namespace simulator +} // namespace smo + +#endif // SMO_SIMULATOR_THREAD_H diff --git a/smocore/include/stimBuffApis/stimBuffApiLib.h b/smocore/include/stimBuffApis/stimBuffApiLib.h index 0c0541d..98f7eb2 100644 --- a/smocore/include/stimBuffApis/stimBuffApiLib.h +++ b/smocore/include/stimBuffApis/stimBuffApiLib.h @@ -8,7 +8,8 @@ #include #include #include -#include +#include +#include namespace smo { namespace stim_buff { @@ -31,10 +32,11 @@ public: StimBuffApiLib( const std::string& path, void *_dlopen_handle, SMO_GET_STIM_BUFF_API_DESC_FN_TYPEDEF *descFn) - : libraryPath(path), qutex("StimBuffApiLib-" + path), + : libraryPath(path), isBeingDestroyed(false), dlopen_handle(_dlopen_handle, DlCloser()), - SMO_GET_STIM_BUFF_API_DESC_FN_NAME(descFn) + SMO_GET_STIM_BUFF_API_DESC_FN_NAME(descFn), + s("StimBuffApiLib-" + path) {} void setStimBuffApiDesc(const StimBuffApiDesc &desc) @@ -51,7 +53,6 @@ public: public: std::string libraryPath; - sscl::Qutex qutex; std::atomic isBeingDestroyed; std::unique_ptr dlopen_handle; /* UNIMPLEMENTED: API-specific cmdline options. These affect this specific @@ -77,6 +78,11 @@ public: */ StimBuffApiDesc stimBuffApiDesc; + struct StimBuffApiLibResources + {}; + + sscl::SharedResourceGroup s; + std::string stringify() const { std::string result = "Library Path: " + libraryPath + "\n"; result += "Stim Buff API Descriptor: " + stimBuffApiDesc.stringify() + "\n"; diff --git a/smocore/include/stimBuffApis/stimBuffApiManager.h b/smocore/include/stimBuffApis/stimBuffApiManager.h index c7ddb0b..071a565 100644 --- a/smocore/include/stimBuffApis/stimBuffApiManager.h +++ b/smocore/include/stimBuffApis/stimBuffApiManager.h @@ -7,11 +7,10 @@ #include #include #include -#include #include #include -#include -#include +#include +#include namespace smo { namespace stim_buff { @@ -19,6 +18,11 @@ namespace stim_buff { class StimBuffApiManager { public: + struct Resources + { + std::vector> stimBuffApiLibs; + }; + static StimBuffApiManager& getInstance() { static StimBuffApiManager instance; @@ -54,17 +58,16 @@ public: private: StimBuffApiManager() - : qutex("StimBuffApiManager") + : s("StimBuffApiManager") {} + ~StimBuffApiManager() = default; StimBuffApiManager(const StimBuffApiManager&) = delete; StimBuffApiManager& operator=(const StimBuffApiManager&) = delete; - std::vector> stimBuffApiLibs; - public: - sscl::Qutex qutex; + sscl::SharedResourceGroup s; public: static std::optional searchForLibInSmoSearchPaths( diff --git a/smocore/include/subconsciousThread.h b/smocore/include/subconsciousThread.h new file mode 100644 index 0000000..01059f4 --- /dev/null +++ b/smocore/include/subconsciousThread.h @@ -0,0 +1,25 @@ +#ifndef SMO_SUBCONSCIOUS_THREAD_H +#define SMO_SUBCONSCIOUS_THREAD_H + +#include +#include +#include + +namespace smo { + +struct SubconsciousThreadTag +{ + static boost::asio::io_service &io_service(); +}; + +template +using SubconsciousPostingPromise = + sscl::co::TaggedPostingPromise; + +template +using SubconsciousViralPostingInvoker = + sscl::co::ViralPostingInvoker; + +} // namespace smo + +#endif // SMO_SUBCONSCIOUS_THREAD_H diff --git a/smocore/include/world/worldThread.h b/smocore/include/world/worldThread.h new file mode 100644 index 0000000..dfc8313 --- /dev/null +++ b/smocore/include/world/worldThread.h @@ -0,0 +1,25 @@ +#ifndef SMO_WORLD_THREAD_H +#define SMO_WORLD_THREAD_H + +#include +#include +#include + +namespace smo { + +struct WorldThreadTag +{ + static boost::asio::io_service &io_service(); +}; + +template +using WorldPostingPromise = + sscl::co::TaggedPostingPromise; + +template +using WorldViralPostingInvoker = + sscl::co::ViralPostingInvoker; + +} // namespace smo + +#endif // SMO_WORLD_THREAD_H diff --git a/smocore/marionette/lifetime.cpp b/smocore/marionette/lifetime.cpp index 89c84f8..2198d4a 100644 --- a/smocore/marionette/lifetime.cpp +++ b/smocore/marionette/lifetime.cpp @@ -1,190 +1,92 @@ #include #include -#include -#include -#include -#include -#include -#include +#include + +#include #include #include +#include +#include #include +#include namespace smo { namespace mrntt { -class MarionetteComponent::MrnttLifetimeMgmtOp -: public sscl::PostedAsynchronousContinuation +namespace { + +void assertMarionetteThread() { -public: - MrnttLifetimeMgmtOp( - MarionetteComponent &parent, - const std::shared_ptr &caller, - sscl::Callback callback) - : sscl::PostedAsynchronousContinuation( - caller, callback), - parent(parent) - {} - -private: - MarionetteComponent &parent; - -public: - void initializeReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) + auto self = sscl::ComponentThread::getSelf(); + if (self->id != SmoThreadId::MRNTT) { - auto self = sscl::ComponentThread::getSelf(); - if (self->id != smo::SmoThreadId::MRNTT) - { - throw std::runtime_error(std::string(__func__) - + ": Must be executed on Marionette thread"); - } - - smo::mind::globalMind = std::make_shared(); - smo::mind::globalMind->initializeReq({context, std::bind( - &MrnttLifetimeMgmtOp::initializeReq2, - this, context, std::placeholders::_1)}); - } - - void initializeReq2( - std::shared_ptr context, - bool success - ) - { - if (!success) - { - std::cerr << __func__ << ": Failed to initialize globalMind" - << std::endl; - context->callOriginalCb(false); - return; - } - - smo::device::DeviceManager::getInstance().initializeDeviceReattacher(); - - // Call negtrinEventInd on the Director in the final callback - smo::mind::globalMind->director.negtrinEventInd(); - - context->callOriginalCb(success); - } - - void finalizeReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) - { - auto self = sscl::ComponentThread::getSelf(); - if (self->id != smo::SmoThreadId::MRNTT) - { - throw std::runtime_error(std::string(__func__) - + ": Must be executed on Marionette thread"); - } - - smo::device::DeviceManager::getInstance().finalizeDeviceReattacher(); - - /** FIXME: - * It may be necessary to add a delay here to ensure that all in-flight - * timer timeouts have finished executing? Or some other mechanism. - * - * We need some way to ensure that in-flight timeouts don't get fired - * during the finalize sequence. This is because they may depend on - * state that is being finalized or has been finalized at the point - * when they timeout. - * - * This seems to be actually happening with the delayed calls to - * AttachDeviceReq::attachDeviceReq2() inside of livoxGen1.cpp. - * - * One tactic might be to shut down device reattacher before finalizing - * and pause for a bit before continuing to shutdown other components. - */ - - smo::mind::globalMind->finalizeReq({context, std::bind( - &MrnttLifetimeMgmtOp::finalizeReq2, - this, context, std::placeholders::_1)}); - } - - void finalizeReq2( - std::shared_ptr context, - bool success - ) - { - if (!success) - { - std::cerr << __func__ << ": globalMind finalization failed" - << std::endl; - context->callOriginalCb(false); - return; - } - - context->callOriginalCb(success); - } -}; - -class MarionetteComponent::TerminationEvent -: public sscl::PostedAsynchronousContinuation -{ -public: - TerminationEvent( - const std::shared_ptr &caller) - : sscl::PostedAsynchronousContinuation( - caller, {nullptr, nullptr}) - {} - -public: - void exceptionInd1_posted( - [[maybe_unused]] std::shared_ptr context - ) - { - auto self = sscl::ComponentThread::getSelf(); - if (self->id != smo::SmoThreadId::MRNTT) - { - throw std::runtime_error(std::string(__func__) - + ": Must be executed on Marionette thread"); - } - - smo::mrntt::mrntt.finalizeReq({nullptr, std::bind( - &smo::mrntt::marionetteFinalizeReqCb, - std::placeholders::_1)}); - } -}; - -void MarionetteComponent::initializeReq( - sscl::Callback callback) -{ - auto mrntt = sscl::ComponentThread::getSelf(); - - if (mrntt->id != smo::SmoThreadId::MRNTT) - { - throw std::runtime_error(std::string(__func__) + throw std::runtime_error( + std::string(__func__) + ": Must be executed on Marionette thread"); } - - auto request = std::make_shared( - *this, mrntt, callback); - - mrntt->getIoService().post( - STC(std::bind( - &MrnttLifetimeMgmtOp::initializeReq1_posted, - request.get(), request))); } -void MarionetteComponent::finalizeReq( - sscl::Callback callback) -{ - auto mrntt = sscl::ComponentThread::getSelf(); +} // namespace - if (mrntt->id != smo::SmoThreadId::MRNTT) +void MarionetteComponent::holdInitializeCReq( + std::function completion) +{ + initializeCReqInvoker.emplace(initializeCReq( + initializeLifetimeExceptionPtr, std::move(completion))); +} + +void MarionetteComponent::holdFinalizeCReq( + std::function completion) +{ + finalizeCReqInvoker.emplace(finalizeCReq( + finalizeLifetimeExceptionPtr, std::move(completion))); +} + +MrnttNonViralPostingInvoker MarionetteComponent::initializeCReq( + [[maybe_unused]] std::exception_ptr &exceptionPtr, + [[maybe_unused]] std::function callback) +{ + assertMarionetteThread(); + + smo::mind::globalMind = std::make_shared(); + + bool mindInitialized = co_await smo::mind::globalMind->initializeCReq(); + if (!mindInitialized) { - throw std::runtime_error(std::string(__func__) - + ": Must be executed on Marionette thread"); + std::cerr << __func__ << ": Failed to initialize globalMind" + << std::endl; + co_return; } - auto request = std::make_shared( - *this, mrntt, callback); + smo::device::DeviceManager::getInstance().initializeDeviceReattacher(); - mrntt->getIoService().post( - STC(std::bind( - &MrnttLifetimeMgmtOp::finalizeReq1_posted, - request.get(), request))); + // Call negtrinEventInd on the Director in the final callback + smo::mind::globalMind->director.negtrinEventInd(); + + co_return; +} + +MrnttNonViralPostingInvoker MarionetteComponent::finalizeCReq( + [[maybe_unused]] std::exception_ptr &exceptionPtr, + [[maybe_unused]] std::function callback) +{ + assertMarionetteThread(); + + smo::device::DeviceManager::getInstance().finalizeDeviceReattacher(); + + if (!smo::mind::globalMind) + { + co_return; + } + + bool mindFinalized = co_await smo::mind::globalMind->finalizeCReq(); + if (!mindFinalized) + { + std::cerr << __func__ << ": globalMind finalization failed" + << std::endl; + } + + co_return; } void MarionetteComponent::handleLoopExceptionHook() @@ -195,16 +97,15 @@ void MarionetteComponent::handleLoopExceptionHook() void MarionetteComponent::exceptionInd() { - auto faultyThread = sscl::ComponentThread::getSelf(); - auto mrntt = sscl::ComponentThread::getPptr(); + auto puppeteer = sscl::ComponentThread::getPptr(); - auto request = std::make_shared( - faultyThread); - - mrntt->getIoService().post( - STC(std::bind( - &TerminationEvent::exceptionInd1_posted, - request.get(), request))); + boost::asio::post( + puppeteer->getIoService(), + [] + { + mrntt.holdFinalizeCReq( + []() { marionetteFinalizeReqCb(true); }); + }); } } // namespace mrntt diff --git a/smocore/marionette/main.cpp b/smocore/marionette/main.cpp index 6a14683..0a00136 100644 --- a/smocore/marionette/main.cpp +++ b/smocore/marionette/main.cpp @@ -33,9 +33,8 @@ void marionetteInitializeReqCb(bool success) std::cerr << __func__ << ": Failed to initialize Marionette. Shutting down." << '\n'; - mrntt.finalizeReq({nullptr, std::bind( - &smo::mrntt::marionetteFinalizeReqCb, - std::placeholders::_1)}); + mrntt.holdFinalizeCReq( + []() { marionetteFinalizeReqCb(true); }); } void marionetteFinalizeReqCb(bool success) @@ -83,9 +82,8 @@ void MarionetteComponent::postJoltHook() default: break; } - mrntt.finalizeReq({nullptr, std::bind( - &marionetteFinalizeReqCb, - std::placeholders::_1)}); + mrntt.holdFinalizeCReq( + []() { marionetteFinalizeReqCb(true); }); }); } @@ -110,7 +108,7 @@ void MarionetteComponent::tryBlock1Hook() void MarionetteComponent::preLoopHook() { - /** EXPLANATION: + /** EXPLANATION: * Initialize Salmanoff's Manager classes first. * Manager classes' initialization is synchronous in nature, so we * don't need the minds to be running to initialize them. @@ -131,9 +129,12 @@ void MarionetteComponent::preLoopHook() smo::initializeSalmanoff(); callShutdownSalmanoff = true; - initializeReq(sscl::Callback{ - nullptr, - std::bind(&marionetteInitializeReqCb, std::placeholders::_1)}); + holdInitializeCReq( + [] + { + marionetteInitializeReqCb( + !mrntt.initializeLifetimeExceptionPtr); + }); std::cout << "PuppeteerThread::main: Entering event loop" << "\n"; } diff --git a/smocore/mind.cpp b/smocore/mind.cpp index 61b5f97..f629b7e 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -2,11 +2,6 @@ #include #include #include -#include -#include -#include -#include -#include #include #include #include @@ -63,17 +58,17 @@ Mind::getComponentThread(sscl::ThreadId id) const "getComponentThread"); } - // Search through the vector for the thread with matching id - for (auto& thread : componentThreads) + // Search through the vector for the thread with matching id + for (auto& thread : componentThreads) { - if (thread->id == id) { + if (thread->id == id) { return std::static_pointer_cast(thread); } - } + } - // Throw exception if no thread found - throw std::runtime_error(std::string(__func__) + - ": No MindThread found with ID " + // Throw exception if no thread found + throw std::runtime_error(std::string(__func__) + + ": No MindThread found with ID " + std::to_string(static_cast(id))); } @@ -88,16 +83,16 @@ Mind::getComponentThread(const std::string& name) const "getComponentThread"); } - for (auto& thread : componentThreads) + for (auto& thread : componentThreads) { - if (thread->name == name) { + if (thread->name == name) { return std::static_pointer_cast(thread); } } - // Throw exception if no thread found - throw std::runtime_error(std::string(__func__) + - ": No MindThread found with name '" + name + "'"); + // Throw exception if no thread found + throw std::runtime_error(std::string(__func__) + + ": No MindThread found with name '" + name + "'"); } std::vector> @@ -111,152 +106,47 @@ Mind::getMindThreads() const return mindThreads; } -class Mind::MindLifetimeMgmtOp -: public sscl::PostedAsynchronousContinuation +mrntt::MrnttViralNonPostingInvokerT Mind::initializeCReq() { -public: - MindLifetimeMgmtOp( - Mind &parent, const std::shared_ptr &caller, - sscl::Callback callback) - : sscl::PostedAsynchronousContinuation( - caller, callback), - parent(parent) - {} - -public: - Mind &parent; - -public: - void initializeReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) + try { - /* Jolt the threads, then start them */ - parent.joltAllPuppetThreadsReq( - {context, std::bind( - &MindLifetimeMgmtOp::initializeReq2, - context.get(), context)}); + distributeAndPinThreadsAcrossCpus(); + } + catch (const std::exception& e) + { + std::cerr << "Salmanoff couldn't distribute the mind threads across " + "the CPUs, so performance may be suboptimal.\n" + "Error: " << e.what() << "\n"; } - void initializeReq2( - [[maybe_unused]] std::shared_ptr context - ) - { - std::cout << "Mrntt: All mind threads JOLTed." << "\n"; + co_await joltAllPuppetThreadsCReq(); + std::cout << "Mrntt: All mind threads JOLTed." << "\n"; - parent.startAllPuppetThreadsReq( - {context, std::bind( - &MindLifetimeMgmtOp::initializeReq3, - context.get(), context)}); - } + co_await startAllPuppetThreadsCReq(); + std::cout << "Mrntt: All mind threads started." << "\n"; - void initializeReq3( - [[maybe_unused]] std::shared_ptr context - ) - { - std::cout << "Mrntt: All mind threads started." << "\n"; + bool bodyInitialized = co_await body.initializeCReq(); + std::cout << "Mrntt: Body component initialized." << "\n"; - parent.body.initializeReq( - {context, std::bind( - &MindLifetimeMgmtOp::initializeReq4, - context.get(), context, std::placeholders::_1)}); - } - - void initializeReq4( - [[maybe_unused]] std::shared_ptr context, - bool success - ) - { - std::cout << "Mrntt: Body component initialized." << "\n"; - callOriginalCb(success); - } - - void finalizeReq1_posted( - [[maybe_unused]] std::shared_ptr context - ) - { - parent.body.finalizeReq( - {context, std::bind( - &MindLifetimeMgmtOp::finalizeReq2, - context.get(), context, std::placeholders::_1)}); - } - - void finalizeReq2( - [[maybe_unused]] std::shared_ptr context, - bool success - ) - { - if (!success) { - std::cerr << "Mrntt: Body component failed to finalize." << "\n"; - } else { - std::cout << "Mrntt: Body component finalized." << "\n"; - } - - /* If the threads haven't been jolted, we need to do that first, because - * otherwise they'll just enter their main loops and wait for control - * messages from mrntt after processing the exit request. - */ - parent.joltAllPuppetThreadsReq( - {context, std::bind( - &MindLifetimeMgmtOp::finalizeReq3, - context.get(), context)}); - } - - void finalizeReq3( - [[maybe_unused]] std::shared_ptr context - ) - { - std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n"; - - parent.exitAllPuppetThreadsReq( - {context, std::bind( - &MindLifetimeMgmtOp::finalizeReq4, - context.get(), context)}); - } - - void finalizeReq4( - [[maybe_unused]] std::shared_ptr context - ) - { - std::cout << "Mrntt: All mind threads exited." << "\n"; - callOriginalCb(true); - } -}; - -void Mind::initializeReq(sscl::Callback callback) -{ - /* Distribute threads across available CPUs */ - try - { - distributeAndPinThreadsAcrossCpus(); - } - catch (const std::exception& e) - { - std::cerr << "Salmanoff couldn't distribute the mind threads across " - "the CPUs, so performance may be suboptimal.\n" - "Error: " << e.what() << "\n"; - } - - const auto& caller = sscl::ComponentThread::getSelf(); - auto request = std::make_shared( - *this, caller, callback); - - mrntt::mrntt.thread->getIoService().post( - STC(std::bind( - &MindLifetimeMgmtOp::initializeReq1_posted, - request.get(), request))); + co_return bodyInitialized; } -void Mind::finalizeReq(sscl::Callback callback) +mrntt::MrnttViralNonPostingInvokerT Mind::finalizeCReq() { - const auto& caller = sscl::ComponentThread::getSelf(); - auto request = std::make_shared( - *this, caller, callback); + bool bodyFinalized = co_await body.finalizeCReq(); + if (!bodyFinalized) { + std::cerr << "Mrntt: Body component failed to finalize." << "\n"; + } else { + std::cout << "Mrntt: Body component finalized." << "\n"; + } - mrntt::mrntt.thread->getIoService().post( - STC(std::bind( - &MindLifetimeMgmtOp::finalizeReq1_posted, - request.get(), request))); + co_await joltAllPuppetThreadsCReq(); + std::cout << "Mrntt: All mind threads JOLTed for finalization." << "\n"; + + co_await exitAllPuppetThreadsCReq(); + std::cout << "Mrntt: All mind threads exited." << "\n"; + + co_return bodyFinalized; } } // namespace smo diff --git a/smocore/opts.cpp b/smocore/opts.cpp index ea928a6..008e7b2 100644 --- a/smocore/opts.cpp +++ b/smocore/opts.cpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include OptionParser::Exception::Exception(std::string message_) @@ -107,7 +107,7 @@ void OptionParser::parseArguments(int argc, char *argv[], char **envp) } } - sscl::CallableTracer::optTraceCallables = traceCallables; + sscl::cps::CallableTracer::optTraceCallables = traceCallables; } std::string OptionParser::getUsage() const diff --git a/smocore/stimBuffApis/stimBuffApiManager.cpp b/smocore/stimBuffApis/stimBuffApiManager.cpp index 2bcf991..3cfcbc2 100644 --- a/smocore/stimBuffApis/stimBuffApiManager.cpp +++ b/smocore/stimBuffApis/stimBuffApiManager.cpp @@ -5,10 +5,6 @@ #include #include #include -#include -#include -#include -#include #include #include #include @@ -198,47 +194,50 @@ StimBuffApiLib& StimBuffApiManager::loadStimBuffApiLib( auto lib = std::make_shared( libraryPath, dlopen_handle.release(), func); lib->setStimBuffApiDesc(libApiDesc); - stimBuffApiLibs.push_back(lib); - return *stimBuffApiLibs.back(); + getInstance().s.rsrc.stimBuffApiLibs.push_back(lib); + return *getInstance().s.rsrc.stimBuffApiLibs.back(); } std::optional> StimBuffApiManager::getStimBuffApiLib(const std::string& libraryPath) { - auto it = std::find_if(stimBuffApiLibs.begin(), stimBuffApiLibs.end(), + auto &libs = getInstance().s.rsrc.stimBuffApiLibs; + auto it = std::find_if(libs.begin(), libs.end(), [&libPath = libraryPath](const std::shared_ptr& lib) { return lib->libraryPath == libPath; } ); - if (it != stimBuffApiLibs.end()) { return *it; } + if (it != libs.end()) { return *it; } return std::nullopt; } std::optional> StimBuffApiManager::getStimBuffApiLibByApiName(const std::string& apiName) { - auto it = std::find_if(stimBuffApiLibs.begin(), stimBuffApiLibs.end(), + auto &libs = getInstance().s.rsrc.stimBuffApiLibs; + auto it = std::find_if(libs.begin(), libs.end(), [&apiName](const std::shared_ptr& lib) { return lib->stimBuffApiDesc.name == apiName; } ); - if (it != stimBuffApiLibs.end()) { return *it; } + if (it != libs.end()) { return *it; } return std::nullopt; } void StimBuffApiManager::unloadStimBuffApiLib(const std::string& libraryPath) { - auto it = std::find_if(stimBuffApiLibs.begin(), stimBuffApiLibs.end(), + auto &libs = getInstance().s.rsrc.stimBuffApiLibs; + auto it = std::find_if(libs.begin(), libs.end(), [&lpath = libraryPath](const std::shared_ptr& lib) { return lib->libraryPath == lpath; } ); - if (it != stimBuffApiLibs.end()) + if (it != libs.end()) { - stimBuffApiLibs.erase(it); + libs.erase(it); return; } @@ -248,7 +247,7 @@ void StimBuffApiManager::unloadStimBuffApiLib(const std::string& libraryPath) void StimBuffApiManager::unloadAllStimBuffApiLibs(void) { - stimBuffApiLibs.clear(); + getInstance().s.rsrc.stimBuffApiLibs.clear(); } void StimBuffApiManager::loadAllStimBuffApiLibsFromOptions( @@ -264,7 +263,7 @@ void StimBuffApiManager::loadAllStimBuffApiLibsFromOptions( std::string StimBuffApiManager::stringifyLibs() const { std::string result; - for (const auto& lib : stimBuffApiLibs) { + for (const auto& lib : getInstance().s.rsrc.stimBuffApiLibs) { result += lib->stringify() + "\n"; } return result; @@ -303,14 +302,14 @@ void StimBuffApiManager::finalizeStimBuffApiLib(StimBuffApiLib& lib) void StimBuffApiManager::initializeAllStimBuffApiLibs(void) { - for (auto& lib : stimBuffApiLibs) { + for (auto& lib : getInstance().s.rsrc.stimBuffApiLibs) { initializeStimBuffApiLib(*lib); } } void StimBuffApiManager::finalizeAllStimBuffApiLibs(void) { - for (auto& lib : stimBuffApiLibs) { + for (auto& lib : getInstance().s.rsrc.stimBuffApiLibs) { finalizeStimBuffApiLib(*lib); } } diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp index 9732fa3..f9d73f4 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.cpp @@ -17,11 +17,11 @@ #include #include #include -#include +#include #include -#include -#include -#include +#include +#include +#include #include #include "ioUringAssemblyEngine.h" #include "pcloudStimulusProducer.h" @@ -175,7 +175,7 @@ void IoUringAssemblyEngine::finalize() { auto& ioService = smoHooksPtr->ComponentThread_getSelf()->getIoService(); - sscl::AsynchronousBridge bridge(ioService); + sscl::cps::AsynchronousBridge bridge(ioService); boost::asio::deadline_timer timeoutTimer(ioService); /** EXPLANATION: @@ -420,15 +420,15 @@ cleanup_eventfd: // Continuation class for assembleFrameReq class IoUringAssemblyEngine::AssembleFrameReq -: public sscl::PostedAsynchronousContinuation< +: public sscl::cps::PostedAsynchronousContinuation< IoUringAssemblyEngine::assembleFrameReqCbFn> { public: AssembleFrameReq( IoUringAssemblyEngine& engine_, const std::shared_ptr& caller, - sscl::Callback cb) - : sscl::PostedAsynchronousContinuation< + sscl::cps::Callback cb) + : sscl::cps::PostedAsynchronousContinuation< IoUringAssemblyEngine::assembleFrameReqCbFn>(caller, cb), engine(engine_), loop(engine_.frameAssemblyDesc->numSlots), @@ -635,7 +635,7 @@ public: }; void IoUringAssemblyEngine::assembleFrameReq( - sscl::Callback cb) + sscl::cps::Callback cb) { { sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); diff --git a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h index 2003498..82ec0ac 100644 --- a/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h +++ b/stimBuffApis/livoxGen1/ioUringAssemblyEngine.h @@ -16,9 +16,9 @@ #include #include #include -#include +#include #include -#include +#include #include #include #include @@ -42,7 +42,7 @@ public: void finalize(); typedef std::function assembleFrameReqCbFn; - void assembleFrameReq(sscl::Callback cb); + void assembleFrameReq(sscl::cps::Callback cb); // Telemetry helpers static size_t computePointsPerFrame(int returnMode, size_t nDgramsPerFrame) diff --git a/stimBuffApis/livoxGen1/livoxGen1.cpp b/stimBuffApis/livoxGen1/livoxGen1.cpp index b50471c..425cddb 100644 --- a/stimBuffApis/livoxGen1/livoxGen1.cpp +++ b/stimBuffApis/livoxGen1/livoxGen1.cpp @@ -11,11 +11,11 @@ #include #include #include -#include +#include #include #include #include -#include +#include #include #include "pcloudStimulusProducer.h" #include "livoxGen1.h" @@ -91,13 +91,13 @@ LivoxProto1DllState livoxProto1; // Continuation classes for async operations class AttachDeviceReq -: public sscl::NonPostedAsynchronousContinuation +: public sscl::cps::NonPostedAsynchronousContinuation { public: AttachDeviceReq( const std::shared_ptr& spec, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation( + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation( std::move(cb)), spec(spec) {} @@ -360,14 +360,14 @@ public: }; class DetachDeviceReq -: public sscl::NonPostedAsynchronousContinuation +: public sscl::cps::NonPostedAsynchronousContinuation { public: DetachDeviceReq( const std::shared_ptr& spec, const std::shared_ptr& stimBuffer, - sscl::Callback cb) - : sscl::NonPostedAsynchronousContinuation( + sscl::cps::Callback cb) + : sscl::cps::NonPostedAsynchronousContinuation( std::move(cb)), spec(spec), stimBuffer(stimBuffer) {} @@ -625,7 +625,7 @@ extern "C" int livoxGen1_finalizeInd(void) extern "C" void livoxGen1_attachDeviceReq( const std::shared_ptr& desc, const std::shared_ptr& componentThread, - sscl::Callback cb + sscl::cps::Callback cb ) { if (!livoxProto1.livoxProto1_getOrCreateDeviceReq) @@ -825,7 +825,7 @@ extern "C" void livoxGen1_attachDeviceReq( extern "C" void livoxGen1_detachDeviceReq( const std::shared_ptr& desc, - sscl::Callback cb + sscl::cps::Callback cb ) { // Case 1: Check if StimBuffer doesn't exist (early return) diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp index 5508964..c0661fa 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.cpp @@ -9,9 +9,9 @@ #include #include #include -#include -#include -#include +#include +#include +#include #include #include #include @@ -229,7 +229,7 @@ void OpenClCollatingAndMeshingEngine::finalize() int delayMs = std::max(OCLCOLLMESH_ENGN_FINALIZE_DELAY_MS, 0); auto& ioService = smoHooksPtr->ComponentThread_getSelf()->getIoService(); - sscl::AsynchronousBridge bridge(ioService); + sscl::cps::AsynchronousBridge bridge(ioService); boost::asio::deadline_timer timeoutTimer(ioService); /** EXPLANATION: @@ -1008,7 +1008,7 @@ void OpenClCollatingAndMeshingEngine::produceAmbienceStimulusFrame( } class OpenClCollatingAndMeshingEngine::CompactCollateAndMeshFrameReq -: public sscl::PostedAsynchronousContinuation +: public sscl::cps::PostedAsynchronousContinuation { private: OpenClCollatingAndMeshingEngine& engine; @@ -1027,8 +1027,8 @@ public: std::optional lightAmbienceProductionDesc_, std::optional darkAmbienceProductionDesc_, const std::shared_ptr& caller, - sscl::Callback cb) - : sscl::PostedAsynchronousContinuation( + sscl::cps::Callback cb) + : sscl::cps::PostedAsynchronousContinuation( caller, cb), engine(engine_), frameAssemblyResult(asyncLoop), stimulusFrame(stimulusFrame_), @@ -1253,7 +1253,7 @@ void OpenClCollatingAndMeshingEngine::compactCollateAndMeshFrameReq( std::optional> intensityStimFrame, std::optional lightAmbienceProductionDesc, std::optional darkAmbienceProductionDesc, - sscl::Callback callback) + sscl::cps::Callback callback) { { sscl::SpinLock::Guard lock(shouldAcceptRequestsLock); diff --git a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h index 563307c..3ef79c9 100644 --- a/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h +++ b/stimBuffApis/livoxGen1/openClCollatingAndMeshingEngine.h @@ -14,7 +14,7 @@ #define CL_TARGET_OPENCL_VERSION 120 #include #include -#include +#include #include #include #include @@ -94,7 +94,7 @@ public: std::optional> intensityStimFrame, std::optional lightAmbienceProductionDesc, std::optional darkAmbienceProductionDesc, - sscl::Callback callback); + sscl::cps::Callback callback); private: // Callback function types diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp index cb6700e..f663831 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.cpp @@ -426,7 +426,7 @@ void PcloudStimulusProducer::stimFrameProductionTimesliceInd() } class PcloudStimulusProducer::ProduceFrameReq -: public sscl::PostedAsynchronousContinuation +: public sscl::cps::PostedAsynchronousContinuation { private: PcloudStimulusProducer& pcloudProducer; @@ -440,8 +440,8 @@ public: ProduceFrameReq( PcloudStimulusProducer& producer, const std::shared_ptr& caller, - sscl::Callback cb) - : sscl::PostedAsynchronousContinuation(caller, cb), + sscl::cps::Callback cb) + : sscl::cps::PostedAsynchronousContinuation(caller, cb), pcloudProducer(producer), frameAssemblyResult(0), stimulusFrame(producer.tempStimulusFrame) @@ -724,7 +724,7 @@ public: }; void PcloudStimulusProducer::produceFrameReq( - sscl::Callback callback) + sscl::cps::Callback callback) { /** EXPLANATION: * We shouldn't acquire the StimulusProducer::shouldContinueLock here because diff --git a/stimBuffApis/livoxGen1/pcloudStimulusProducer.h b/stimBuffApis/livoxGen1/pcloudStimulusProducer.h index 04c3f57..43acdb2 100644 --- a/stimBuffApis/livoxGen1/pcloudStimulusProducer.h +++ b/stimBuffApis/livoxGen1/pcloudStimulusProducer.h @@ -6,8 +6,8 @@ #include #include #include -#include -#include +#include +#include #include #include "ioUringAssemblyEngine.h" #include "livoxPcloudFrameDumper.h" @@ -87,7 +87,7 @@ protected: typedef std::function produceFrameReqCbFn; public: - void produceFrameReq(sscl::Callback callback); + void produceFrameReq(sscl::cps::Callback callback); size_t nDgramsPerStagingBufferFrame; std::shared_ptr device; diff --git a/stimBuffApis/xcbWindow/xcbWindow.cpp b/stimBuffApis/xcbWindow/xcbWindow.cpp index 0c752c9..34923ff 100644 --- a/stimBuffApis/xcbWindow/xcbWindow.cpp +++ b/stimBuffApis/xcbWindow/xcbWindow.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include "xcbWindow.h" @@ -277,7 +277,7 @@ static int xcbWindow_finalizeInd(void) static void xcbWindow_attachDeviceReq( const std::shared_ptr& desc, const std::shared_ptr& componentThread, - sscl::Callback cb + sscl::cps::Callback cb ) { // Not used yet, but may be used later. @@ -302,7 +302,7 @@ static void xcbWindow_attachDeviceReq( static void xcbWindow_detachDeviceReq( const std::shared_ptr& spec, - sscl::Callback cb + sscl::cps::Callback cb ) { auto it = std::find_if(g_attachedWindows.begin(), g_attachedWindows.end(), diff --git a/tests/smocore/qutex_tests.cpp b/tests/smocore/qutex_tests.cpp index 8bacd27..b66dd2c 100644 --- a/tests/smocore/qutex_tests.cpp +++ b/tests/smocore/qutex_tests.cpp @@ -1,6 +1,6 @@ #include -#include -#include +#include +#include #include #include #include @@ -9,18 +9,18 @@ namespace smo { // Mock implementation of LockerAndInvokerBase for testing -class MockLockerAndInvoker : public LockerAndInvokerBase { +class MockLockerAndInvoker : public sscl::cps::LockerAndInvokerBase { public: explicit MockLockerAndInvoker(const void* addr) - : LockerAndInvokerBase(addr), awakened(false) {} + : sscl::cps::LockerAndInvokerBase(addr), awakened(false) {} bool awakened; - sscl::Qutex* registeredQutex = nullptr; - List::iterator queueIterator; + sscl::cps::Qutex* registeredQutex = nullptr; + sscl::cps::LockerAndInvokerBase::List::iterator queueIterator; - List::iterator getLockvokerIteratorForQutex(sscl::Qutex& qutex) override { + sscl::cps::LockerAndInvokerBase::List::iterator getLockvokerIteratorForQutex(sscl::cps::Qutex& qutex) override { registeredQutex = &qutex; - queueIterator = qutex.registerInQueue(std::shared_ptr(this)); + queueIterator = qutex.registerInQueue(std::shared_ptr(this)); return queueIterator; } @@ -44,7 +44,7 @@ protected: // Clean up } - sscl::Qutex qutex; + sscl::cps::Qutex qutex; std::shared_ptr mock1, mock2, mock3, mock4, mock5; // Unique addresses for testing