From af33b7f0971ee4caaace2c730e622aafde9f2bf0 Mon Sep 17 00:00:00 2001 From: Hayodea Hakol Date: Tue, 16 Sep 2025 18:38:06 -0400 Subject: [PATCH] SenseApiMgr: Make at/detachSenseDev & at/detachAllSenseDevs posted They are posted to Marionette. * We also fixed callOriginCb invocations; * Also made posted CBs use std::bind instead of greedily early-invoking the CB on the servicing thread's stack. --- smocore/include/senseApis/senseApiManager.h | 2 +- smocore/senseApis/senseApiManager.cpp | 373 +++++++++++++------- 2 files changed, 250 insertions(+), 125 deletions(-) diff --git a/smocore/include/senseApis/senseApiManager.h b/smocore/include/senseApis/senseApiManager.h index 78a621c..3b46971 100644 --- a/smocore/include/senseApis/senseApiManager.h +++ b/smocore/include/senseApis/senseApiManager.h @@ -81,7 +81,7 @@ private: std::vector> senseApiLibs; class AttachSenseDeviceReq; - class DetachSenseDeviceReq; + typedef AttachSenseDeviceReq DetachSenseDeviceReq; class AttachAllSenseDevicesFromSpecsReq; class DetachAllSenseDevicesReq; diff --git a/smocore/senseApis/senseApiManager.cpp b/smocore/senseApis/senseApiManager.cpp index e814386..a8c632a 100644 --- a/smocore/senseApis/senseApiManager.cpp +++ b/smocore/senseApis/senseApiManager.cpp @@ -11,6 +11,8 @@ #include #include #include +#include + namespace fs = std::filesystem; @@ -255,58 +257,179 @@ void SenseApiManager::initializeAllSenseApiLibs(void) void SenseApiManager::finalizeAllSenseApiLibs(void) { for (auto& lib : senseApiLibs) { - finalizeSenseApiLib(*lib); + finalizeSenseApiLib(*lib); } } +class SenseApiManager::AttachSenseDeviceReq +: public TargetedAsynchronousContinuation +{ +public: + AttachSenseDeviceReq( + const std::shared_ptr& spec, + const std::shared_ptr &caller, + attachSenseDeviceReqCbFn cb) + : TargetedAsynchronousContinuation( + caller, cb), + spec(spec) + {} + + void callOriginalCb( + bool success, std::shared_ptr deviceSpec + ) + { + if (originalCbFn) + { + caller->getIoService().post( + std::bind( + originalCbFn, success, deviceSpec)); + } + } + +public: + void attachSenseDeviceReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + if (caller->id != ComponentThread::MRNTT) + { + std::cerr << std::string(__func__) + << ": executed on non-mrntt thread: " + << caller->name << std::endl; + callOriginalCb(false, spec); + return; + } + + /** FIXME: + * We should acquire a spinlock here to ensure that the device isn't + * added in the interim while the async op executes. + */ + auto libOpt = SenseApiManager::getInstance().getSenseApiLibByApiName( + spec->api); + + if (!libOpt) + { + std::cerr << std::string(__func__) + ": No library found for API '" + << spec->api << "'" << std::endl; + callOriginalCb(false, spec); + return; + } + + auto& lib = *libOpt.value(); + if (!lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq) + { + std::cerr << std::string(__func__) + ": attachDeviceReq() is NULL " + "for library '" << lib.libraryPath << "'" << std::endl; + callOriginalCb(false, spec); + return; + } + + /** EXPLANATION: + * We pass in either the body or world thread here, depending on whether + * the device is an introspector (idev) or extrospector (edev). + * + * Introspectors are attached to the body thread; extrospectors are + * attached to the world thread. + */ + std::shared_ptr threadForAttachment; + if (spec->sensorType == 'e') + { + threadForAttachment = mind::globalMind->world.thread; + std::cout << __func__ << ": Attaching edev " + << spec->deviceIdentifier << " to world thread" << "\n"; + } + else + { + threadForAttachment = mind::globalMind->body.thread; + std::cout << __func__ << ": Attaching non-edev " + << spec->deviceIdentifier << " to body thread" << "\n"; + } + + lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq( + spec, threadForAttachment, + std::bind( + &AttachSenseDeviceReq::attachSenseDeviceReq2, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)); + } + + void attachSenseDeviceReq2( + [[maybe_unused]] std::shared_ptr context, + bool success, + std::shared_ptr deviceSpec + ) + { + callOriginalCb(success, deviceSpec); + } + + void detachSenseDeviceReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + if (caller->id != ComponentThread::MRNTT) + { + std::cerr << std::string(__func__) + << ": executed on non-mrntt thread: " + << caller->name << std::endl; + callOriginalCb(false, spec); + return; + } + + /** FIXME: + * We should acquire a spinlock here to ensure that the device isn't + * removed in the interim while the async op executes. + */ + auto libOpt = SenseApiManager::getInstance().getSenseApiLibByApiName( + spec->api); + if (!libOpt) + { + std::cerr << std::string(__func__) + ": No library found for API '" + << spec->api << "'" << std::endl; + callOriginalCb(false, spec); + return; + } + auto& lib = *libOpt.value(); + if (!lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq) + { + std::cerr << std::string(__func__) + ": detachDeviceReq() is NULL " + "for library '" << lib.libraryPath << "'" << std::endl; + callOriginalCb(false, spec); + return; + } + lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq( + spec, + std::bind( + &DetachSenseDeviceReq::detachSenseDeviceReq2, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)); + } + + void detachSenseDeviceReq2( + [[maybe_unused]] std::shared_ptr context, + bool success, + std::shared_ptr deviceSpec + ) + { + callOriginalCb(success, deviceSpec); + } + +public: + std::shared_ptr spec; +}; + void SenseApiManager::attachSenseDeviceReq( const std::shared_ptr& spec, attachSenseDeviceReqCbFn cb ) { - /** FIXME: - * We should acquire a spinlock here to ensure that the device isn't added - * in the interim while the async op executes. - */ + const auto& caller = ComponentThread::getSelf(); + auto request = std::make_shared( + spec, caller, cb); - auto libOpt = getSenseApiLibByApiName(spec->api); - if (!libOpt) - { - throw std::runtime_error( - std::string(__func__) + ": No library found for API '" - + spec->api + "'"); - } - auto& lib = *libOpt.value(); - if (!lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq) - { - throw std::runtime_error( - std::string(__func__) + ": attachDeviceReq() is NULL for library '" - + lib.libraryPath + "'"); - } - - /** EXPLANATION: - * We pass in either the body or world thread here, depending on whether - * the device is an introspector (idev) or extrospector (edev). - * - * Introspectors are attached to the body thread; extrospectors are attached - * to the world thread. - */ - std::shared_ptr threadForAttachment; - if (spec->sensorType == 'e') - { - threadForAttachment = mind::globalMind->world.thread; - std::cout << __func__ << ": Attaching edev " << spec->deviceIdentifier - << " to world thread" << "\n"; - } - else - { - threadForAttachment = mind::globalMind->body.thread; - std::cout << __func__ << ": Attaching non-edev " - << spec->deviceIdentifier << " to body thread" << "\n"; - } - - lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq( - spec, threadForAttachment, cb); + mrntt::mrntt.thread->getIoService().post( + std::bind( + &AttachSenseDeviceReq::attachSenseDeviceReq1_posted, + request.get(), request)); } void SenseApiManager::detachSenseDeviceReq( @@ -314,41 +437,58 @@ void SenseApiManager::detachSenseDeviceReq( detachSenseDeviceReqCbFn cb ) { - /** FIXME: - * We should acquire a spinlock here to ensure that the device isn't removed - * in the interim while the async op executes. - */ + const auto& caller = ComponentThread::getSelf(); + auto request = std::make_shared( + spec, caller, cb); - auto libOpt = getSenseApiLibByApiName(spec->api); - if (!libOpt) - { - throw std::runtime_error( - std::string(__func__) + ": No library found for API '" - + spec->api + "'"); - } - auto& lib = *libOpt.value(); - if (!lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq) - { - throw std::runtime_error( - std::string(__func__) + ": detachDeviceReq() is NULL for library '" - + lib.libraryPath + "'"); - } - lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq(spec, cb); + mrntt::mrntt.thread->getIoService().post( + std::bind( + &DetachSenseDeviceReq::detachSenseDeviceReq1_posted, + request.get(), request)); } class SenseApiManager::AttachAllSenseDevicesFromSpecsReq -: public AsynchronousContinuation +: public TargetedAsynchronousContinuation< + attachAllSenseDevicesFromSpecsReqCbFn> { public: AttachAllSenseDevicesFromSpecsReq( const unsigned int totalNSpecs, + const std::shared_ptr& caller, attachAllSenseDevicesFromSpecsReqCbFn cb) - : AsynchronousContinuation(std::move(cb)), + : TargetedAsynchronousContinuation( + caller, cb), loop(totalNSpecs) {} + void callOriginalCallback() + { + if (originalCbFn) + { + caller->getIoService().post( + std::bind( + originalCbFn, loop)); + } + } + +public: + void attachAllSenseDevicesFromSpecsReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) + { + SenseApiManager::getInstance().attachSenseDeviceReq( + spec, + std::bind( + &AttachAllSenseDevicesFromSpecsReq::attachAllSenseDevicesFromSpecsReq2, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)); + } + } + // Callback methods for the attachment sequence - void attachAllSenseDevicesFromSpecsReq1( + void attachAllSenseDevicesFromSpecsReq2( std::shared_ptr context, bool success, std::shared_ptr spec ) @@ -373,12 +513,7 @@ public: << context->loop.nFailed.load() << " devices failed\n"; } - context->originalCbFn(context->loop); - } - - void callOriginalCallback() - { - originalCbFn(loop); + context->callOriginalCallback(); } public: @@ -389,31 +524,22 @@ void SenseApiManager::attachAllSenseDevicesFromSpecsReq( attachAllSenseDevicesFromSpecsReqCbFn cb ) { - // Create the attachment request object to hold state and callbacks - auto request = std::make_shared( - device::DeviceManager::deviceAttachmentSpecs.size(), std::move(cb)); - - if (request->loop.nTotalIsZero()) + if (device::DeviceManager::getInstance().deviceAttachmentSpecs.size() == 0) { - request->callOriginalCallback(); + AsynchronousLoop tmp(0); + cb(tmp); return; } - for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) - { - try { - attachSenseDeviceReq( - spec, - std::bind( - &AttachAllSenseDevicesFromSpecsReq::attachAllSenseDevicesFromSpecsReq1, - request.get(), request, - std::placeholders::_1, std::placeholders::_2)); - } catch (const std::exception& e) { - std::cerr << __func__ << ": Exception: " << e.what() << "\n"; - if (request->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false)) - { request->callOriginalCallback(); } - } - } + const auto& caller = ComponentThread::getSelf(); + auto request = std::make_shared( + device::DeviceManager::getInstance().deviceAttachmentSpecs.size(), + caller, std::move(cb)); + + mrntt::mrntt.thread->getIoService().post( + std::bind( + &AttachAllSenseDevicesFromSpecsReq::attachAllSenseDevicesFromSpecsReq1_posted, + request.get(), request)); } class SenseApiManager::DetachAllSenseDevicesReq @@ -422,7 +548,22 @@ class SenseApiManager::DetachAllSenseDevicesReq public: using AttachAllSenseDevicesFromSpecsReq::AttachAllSenseDevicesFromSpecsReq; - void detachAllSenseDevicesReq1( + void detachAllSenseDevicesReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) + { + SenseApiManager::getInstance().detachSenseDeviceReq( + spec, + std::bind( + &DetachAllSenseDevicesReq::detachAllSenseDevicesReq2, + context.get(), context, + std::placeholders::_1, std::placeholders::_2)); + } + } + + void detachAllSenseDevicesReq2( std::shared_ptr context, bool success, std::shared_ptr spec ) @@ -443,16 +584,11 @@ public: if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": " << context->loop.nSucceeded.load() - << " devices detached, " - << context->loop.nFailed.load() << " devices failed\n"; + << " devices detached, " + << context->loop.nFailed.load() << " devices failed\n"; } - context->originalCbFn(context->loop); - } - - void callOriginalCallback() - { - originalCbFn(loop); + context->callOriginalCallback(); } }; @@ -460,33 +596,22 @@ void SenseApiManager::detachAllSenseDevicesReq( detachAllSenseDevicesReqCbFn cb ) { - auto request = std::make_shared( - device::DeviceManager::deviceAttachmentSpecs.size(), std::move(cb)); - - if (request->loop.nTotalIsZero()) + if (device::DeviceManager::getInstance().deviceAttachmentSpecs.size() == 0) { - request->callOriginalCallback(); + AsynchronousLoop tmp(0); + cb(tmp); return; } - for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) - { - try { - detachSenseDeviceReq( - spec, - std::bind( - &DetachAllSenseDevicesReq::detachAllSenseDevicesReq1, - request.get(), request, - std::placeholders::_1, std::placeholders::_2)); - } catch (const std::exception& e) { - std::cerr << __func__ << ": Exception: " << e.what() << "\n"; - if (request->loop - .incrementSuccessOrFailureAndTestForCompletionDueTo(false)) - { - request->callOriginalCallback(); - } - } - } + const auto& caller = ComponentThread::getSelf(); + auto request = std::make_shared( + device::DeviceManager::getInstance().deviceAttachmentSpecs.size(), + caller, std::move(cb)); + + mrntt::mrntt.thread->getIoService().post( + std::bind( + &DetachAllSenseDevicesReq::detachAllSenseDevicesReq1_posted, + request.get(), request)); } } // namespace sense_api