#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace smo { namespace device { namespace { void assertMarionetteThread() { auto self = sscl::ComponentThread::getSelf(); if (self->id != SmoThreadId::MRNTT) { throw std::runtime_error( std::string(__func__) + ": Must be executed on Marionette thread"); } } std::shared_ptr threadForDeviceOp( const DeviceAttachmentSpec& spec) { if (spec.sensorType == 'e') { return mind::globalMind->world.thread; } return mind::globalMind->body.thread; } } // namespace DeviceManager::~DeviceManager() { } const std::string DeviceManager::stringifyDeviceSpecs(void) { std::ostringstream oss; for (const auto& spec : getInstance().s.rsrc.deviceAttachmentSpecs) { oss << "Device Attachment Spec: " << spec->stringify(); } return oss.str(); } mrntt::MrnttViralPostingInvoker DeviceManager::attachStimBuffDeviceCReq( const std::shared_ptr& spec) { assertMarionetteThread(); auto &sbam = stim_buff::StimBuffApiManager::getInstance(); auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi); if (!libOpt) { std::cerr << "attachStimBuffDeviceCReq: No library found for API '" << spec->stimBuffApi << "'" << std::endl; co_return stim_buff::StimBuffDeviceOpResult{false, spec}; } auto &lib = *libOpt.value(); if (lib.isBeingDestroyed.load()) { std::cerr << std::string(__func__) + ": Library is being destroyed" << " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; co_return stim_buff::StimBuffDeviceOpResult{false, spec}; } if (!lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq) { std::cerr << std::string(__func__) + ": attachDeviceCReq() is NULL " "for library '" << lib.libraryPath << "'" << std::endl; co_return stim_buff::StimBuffDeviceOpResult{false, spec}; } sscl::co::CoQutex::ReleaseHandle sbamGuard = co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy(); sscl::co::CoQutex::ReleaseHandle libGuard = co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy(); sbamGuard.release(); /** EXPLANATION: * We pass in either the body or world thread here, depending on whether * the device is an introspector (idev) or extrospector (edev). * * Introspectors are attached to the body thread; extrospectors are * attached to the world thread. */ std::shared_ptr targetThread = threadForDeviceOp(*spec); if (spec->sensorType == 'e') { std::cout << __func__ << ": Attaching edev " << spec->deviceIdentifier << " to world thread" << "\n"; } else { std::cout << __func__ << ": Attaching non-edev " << spec->deviceIdentifier << " to body thread" << "\n"; } co_return co_await lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq( sscl::co::ExplicitPostTarget{targetThread->getIoContext()}, spec, targetThread); } mrntt::MrnttViralPostingInvoker DeviceManager::detachStimBuffDeviceCReq( const std::shared_ptr& spec) { assertMarionetteThread(); auto &sbam = stim_buff::StimBuffApiManager::getInstance(); auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi); if (!libOpt) { std::cerr << "detachStimBuffDeviceCReq: No library found for API '" << spec->stimBuffApi << "'" << std::endl; co_return stim_buff::StimBuffDeviceOpResult{false, spec}; } auto &lib = *libOpt.value(); if (lib.isBeingDestroyed.load()) { std::cerr << std::string(__func__) + ": Library is being destroyed" << " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; co_return stim_buff::StimBuffDeviceOpResult{false, spec}; } if (!lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq) { std::cerr << std::string(__func__) + ": detachDeviceCReq() is NULL " "for library '" << lib.libraryPath << "'" << std::endl; co_return stim_buff::StimBuffDeviceOpResult{false, spec}; } sscl::co::CoQutex::ReleaseHandle sbamGuard = co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy(); sscl::co::CoQutex::ReleaseHandle libGuard = co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy(); sbamGuard.release(); std::shared_ptr targetThread = threadForDeviceOp(*spec); co_return co_await lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq( sscl::co::ExplicitPostTarget{targetThread->getIoContext()}, spec); } mrntt::MrnttViralPostingInvoker DeviceManager::newDeviceAttachmentSpecIndCReq( const DeviceAttachmentSpec &spec) { assertMarionetteThread(); DeviceManager &dm = getInstance(); sscl::co::CoQutex::ReleaseHandle dmGuard = co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy(); // First, add the spec to deviceAttachmentSpecs if it's not already there std::shared_ptr specPtr; bool specExists = false; for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs) { if (*existingSpec == spec) { specExists = true; specPtr = existingSpec; break; } } if (!specExists) { specPtr = std::make_shared(spec); dm.s.rsrc.deviceAttachmentSpecs.push_back(specPtr); } std::shared_ptr device; bool deviceExists = false; for (const auto& existingDevice : dm.s.rsrc.devices) { if (existingDevice->deviceIdentifier != spec.deviceIdentifier) { continue; } device = existingDevice; deviceExists = true; break; } // If device doesn't exist, create a new one and add it if (!device) { device = std::make_shared(spec.deviceIdentifier); dm.s.rsrc.devices.push_back(device); } // Check if a DeviceRole w/ this spec already exists in attachedDeviceRoles std::shared_ptr existingDeviceRole; bool deviceRoleExists = false; for (const auto& role : dm.s.rsrc.attachedDeviceRoles) { if (*role->deviceAttachmentSpec == spec) { deviceRoleExists = true; existingDeviceRole = role; break; } } // If DeviceRole exists, both spec and device must also exist if (deviceRoleExists) { if (!specExists || !deviceExists) { throw std::runtime_error( "Program error: DeviceRole exists but spec or device doesn't " "pre-exist. specExists=" + std::to_string(specExists) + ", deviceExists=" + std::to_string(deviceExists)); } // Already attached, return success co_return DeviceAttachmentIndResult{ true, existingDeviceRole, specPtr}; } stim_buff::StimBuffDeviceOpResult attachResult = co_await attachStimBuffDeviceCReq(specPtr); if (!attachResult.success) { std::cerr << __func__ << ": Attach failed for device spec " << attachResult.deviceSpec->stringify() << std::endl; co_return DeviceAttachmentIndResult{ false, nullptr, attachResult.deviceSpec}; } try { // Create DeviceRole and add it to both DeviceManager's and Device's collections auto deviceRole = std::make_shared(*device, specPtr); device->deviceRoles.push_back(deviceRole); dm.s.rsrc.attachedDeviceRoles.push_back(deviceRole); co_return DeviceAttachmentIndResult{true, deviceRole, specPtr}; } catch (const std::exception&) { // Attach failed, return error co_return DeviceAttachmentIndResult{false, nullptr, specPtr}; } } mrntt::MrnttViralPostingInvoker DeviceManager::removeDeviceAttachmentSpecCReq( const DeviceAttachmentSpec &spec) { assertMarionetteThread(); DeviceManager &dm = getInstance(); sscl::co::CoQutex::ReleaseHandle dmGuard = co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy(); // Find the shared_ptr to the spec in the collection std::shared_ptr specPtr; for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs) { if (*existingSpec == spec) { specPtr = existingSpec; break; } } if (!specPtr) { // Spec not found, return failure co_return DeviceAttachmentIndResult{false, nullptr, nullptr}; } // Call detachStimBuffDeviceCReq first - only clean up metadata if this succeeds stim_buff::StimBuffDeviceOpResult detachResult = co_await detachStimBuffDeviceCReq(specPtr); if (!detachResult.success) { // Detach failed, metadata remains intact co_return DeviceAttachmentIndResult{ false, nullptr, detachResult.deviceSpec}; } // Detach succeeded, now find and clean up metadata try { // Find the DeviceRole in attachedDeviceRoles auto deviceRoleIt = std::find_if( dm.s.rsrc.attachedDeviceRoles.begin(), dm.s.rsrc.attachedDeviceRoles.end(), [&specPtr](const std::shared_ptr &role) { return *role->deviceAttachmentSpec == *specPtr; } ); if (deviceRoleIt == dm.s.rsrc.attachedDeviceRoles.end()) { // DeviceRole not found, return failure co_return DeviceAttachmentIndResult{ false, nullptr, detachResult.deviceSpec}; } auto deviceRole = *deviceRoleIt; auto& device = deviceRole->parentDevice; // Remove DeviceRole from DeviceManager's collection dm.s.rsrc.attachedDeviceRoles.erase(deviceRoleIt); // Remove DeviceRole from Device's collection auto deviceRoleIt2 = std::find( device.deviceRoles.begin(), device.deviceRoles.end(), deviceRole); if (deviceRoleIt2 != device.deviceRoles.end()) { device.deviceRoles.erase(deviceRoleIt2); } // Remove DeviceAttachmentSpec from deviceAttachmentSpecs collection auto specIt = std::find_if( dm.s.rsrc.deviceAttachmentSpecs.begin(), dm.s.rsrc.deviceAttachmentSpecs.end(), [&specPtr]( const std::shared_ptr &existingSpec) { return *existingSpec == *specPtr; } ); if (specIt != dm.s.rsrc.deviceAttachmentSpecs.end()) { dm.s.rsrc.deviceAttachmentSpecs.erase(specIt); } co_return DeviceAttachmentIndResult{ true, deviceRole, detachResult.deviceSpec}; } catch (const std::exception&) { // Cleanup failed, return error co_return DeviceAttachmentIndResult{ false, nullptr, detachResult.deviceSpec}; } } mrntt::MrnttViralPostingInvoker DeviceManager::attachAllUnattachedDevicesFromCReq( const std::shared_ptr> &specs) { assertMarionetteThread(); if (specs->empty()) { co_return sscl::MultiOperationResultSet{}; } sscl::co::Group group; std::vector< mrntt::MrnttViralPostingInvoker> invokers; invokers.reserve(specs->size()); for (const auto &spec : *specs) { invokers.emplace_back(newDeviceAttachmentSpecIndCReq(spec)); group.add(invokers.back()); } co_await group.getAwaitAllSettlementsInvoker(); group.checkForAndReThrowGroupExceptions(); unsigned int nSucceeded = 0; unsigned int nFailed = 0; for (auto &invoker : invokers) { if (invoker.completedReturnValues().myReturnValue.success) { nSucceeded++; } else { nFailed++; } } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": " << nSucceeded << " devices attached, " << nFailed << " devices failed\n"; } co_return sscl::MultiOperationResultSet( static_cast(specs->size()), nSucceeded, nFailed); } mrntt::MrnttViralPostingInvoker DeviceManager::attachAllUnattachedDevicesFromKnownListCReq() { assertMarionetteThread(); DeviceManager &dm = getInstance(); sscl::co::CoQutex::ReleaseHandle dmGuard = co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy(); auto unattachedSpecs = std::make_shared< std::vector>(); for (const auto& spec : dm.s.rsrc.deviceAttachmentSpecs) { bool isAttached = false; for (const auto& role : dm.s.rsrc.attachedDeviceRoles) { if (*role->deviceAttachmentSpec == *spec) { isAttached = true; break; } } if (!isAttached) { unattachedSpecs->push_back(*spec); } } dmGuard.release(); co_return co_await attachAllUnattachedDevicesFromCReq(unattachedSpecs); } mrntt::MrnttViralPostingInvoker DeviceManager::attachAllUnattachedDevicesFromCmdlineCReq() { auto specs = std::make_shared>( getInstance().s.rsrc.commandLineDASpecs); co_return co_await attachAllUnattachedDevicesFromCReq(specs); } mrntt::MrnttViralPostingInvoker DeviceManager::detachAllAttachedDeviceRolesCReq() { assertMarionetteThread(); std::vector> specsToDetach; specsToDetach.reserve(getInstance().s.rsrc.attachedDeviceRoles.size()); for (const auto& deviceRole : getInstance().s.rsrc.attachedDeviceRoles) { specsToDetach.push_back(deviceRole->deviceAttachmentSpec); } if (specsToDetach.empty()) { co_return sscl::MultiOperationResultSet{}; } sscl::co::Group group; std::vector< mrntt::MrnttViralPostingInvoker> invokers; invokers.reserve(specsToDetach.size()); for (const auto &spec : specsToDetach) { invokers.emplace_back(detachStimBuffDeviceCReq(spec)); group.add(invokers.back()); } co_await group.getAwaitAllSettlementsInvoker(); group.checkForAndReThrowGroupExceptions(); unsigned int nSucceeded = 0; unsigned int nFailed = 0; for (auto &invoker : invokers) { if (invoker.completedReturnValues().myReturnValue.success) { nSucceeded++; } else { nFailed++; } } if (OptionParser::getOptions().verbose) { std::cout << __func__ << ": " << nSucceeded << " devices detached, " << nFailed << " devices failed\n"; } co_return sscl::MultiOperationResultSet( static_cast(specsToDetach.size()), nSucceeded, nFailed); } void DeviceManager::initializeDeviceReattacher() { deviceReattacher = std::make_unique( *this, mrntt::mrntt.thread); deviceReattacher->start(); } void DeviceManager::finalizeDeviceReattacher() { if (!deviceReattacher) { return; } deviceReattacher->stop(); deviceReattacher.reset(); } } // namespace device } // namespace smo