#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace smo {
namespace device {

namespace {

void assertMarionetteThread()
{
	auto self = sscl::ComponentThread::getSelf();
	if (self->id != SmoThreadId::MRNTT) {
		throw std::runtime_error(
			std::string(__func__)
			+ ": Must be executed on Marionette thread");
	}
}

std::shared_ptr threadForDeviceOp(
	const DeviceAttachmentSpec& spec)
{
	if (spec.sensorType == 'e') {
		return mind::globalMind->world.thread;
	}
	return mind::globalMind->body.thread;
}

void throwIfStimBuffAttachFailed(
	const stim_buff::StimBuffDeviceOpResult &result,
	const char *callerFn)
{
	if (result.success) {
		return;
	}
	throw std::runtime_error(
		std::string(callerFn) + ": attach failed for "
		+ result.deviceSpec->stringify());
}

void throwIfStimBuffDetachFailed(
	const stim_buff::StimBuffDeviceOpResult &result,
	const char *callerFn)
{
	if (result.success) {
		return;
	}
	throw std::runtime_error(
		std::string(callerFn) + ": detach failed for "
		+ result.deviceSpec->stringify());
}

} // namespace

DeviceManager::~DeviceManager()
{
}

const std::string DeviceManager::stringifyDeviceSpecs(void)
{
	std::ostringstream oss;
	for (const auto& spec : getInstance().s.rsrc.deviceAttachmentSpecs) {
		oss << "Device Attachment Spec: " << spec->stringify() << "\n";
	}
	return oss.str();
}

mrntt::MrnttViralPostingInvoker
DeviceManager::attachStimBuffDeviceCReq(
	const std::shared_ptr& spec)
{
	assertMarionetteThread();

	auto &sbam = stim_buff::StimBuffApiManager::getInstance();
	auto &lib = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);

	if (lib.loadedSharedLibrary->isBeingDestroyed.load()) {
		throw std::runtime_error(
			std::string(__func__) + ": Library is being destroyed"
			+ " for API '" + spec->stimBuffApi + "'");
	}
	if (!lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq) {
		throw std::runtime_error(
			std::string(__func__) + ": attachDeviceCReq() is NULL "
			"for library '" + lib.loadedSharedLibrary->libraryPath + "'");
	}

	/* FIXME Locking here makes no sense. */
	sscl::co::CoQutex::ReleaseHandle sbamGuard =
		co_await sbam.s.lock
			.getAcquireInvocationAndSuspensionPolicy();
	sscl::co::CoQutex::ReleaseHandle libGuard =
		co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy();
	sbamGuard.release();

	/** EXPLANATION:
	 * We pass in either the body or world thread here, depending on whether
	 * the device is an introspector (idev) or extrospector (edev).
	 *
	 * Introspectors are attached to the body thread; extrospectors are
	 * attached to the world thread.
	 */
	std::shared_ptr targetThread = threadForDeviceOp(*spec);

	if (spec->sensorType == 'e') {
		std::cout << __func__ << ": Attaching edev "
			<< spec->deviceIdentifier << " to world thread" << "\n";
	} else {
		std::cout << __func__ << ": Attaching non-edev "
			<< spec->deviceIdentifier << " to body thread" << "\n";
	}

	stim_buff::StimBuffDeviceOpResult result =
		co_await lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq(
			sscl::co::ExplicitPostTarget{targetThread->getIoContext()},
			spec, targetThread);

	throwIfStimBuffAttachFailed(result, __func__);
	co_return;
}

mrntt::MrnttViralPostingInvoker
DeviceManager::detachStimBuffDeviceCReq(
	const std::shared_ptr& spec)
{
	assertMarionetteThread();

	auto &sbam = stim_buff::StimBuffApiManager::getInstance();
	auto &lib = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);

	if (lib.loadedSharedLibrary->isBeingDestroyed.load()) {
		throw std::runtime_error(
			std::string(__func__) + ": Library is being destroyed"
			+ " for API '" + spec->stimBuffApi + "'");
	}
	if (!lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq) {
		throw std::runtime_error(
			std::string(__func__) + ": detachDeviceCReq() is NULL "
			"for library '" + lib.loadedSharedLibrary->libraryPath + "'");
	}

	/* FIXME Locking here makes no sense. */
	sscl::co::CoQutex::ReleaseHandle sbamGuard =
		co_await sbam.s.lock
			.getAcquireInvocationAndSuspensionPolicy();
	sscl::co::CoQutex::ReleaseHandle libGuard =
		co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy();
	sbamGuard.release();

	std::shared_ptr targetThread = threadForDeviceOp(*spec);

	stim_buff::StimBuffDeviceOpResult result =
		co_await lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq(
			sscl::co::ExplicitPostTarget{targetThread->getIoContext()},
			spec);

	throwIfStimBuffDetachFailed(result, __func__);
	co_return;
}

mrntt::MrnttViralPostingInvoker>
DeviceManager::newDeviceAttachmentSpecIndCReq(
	const DeviceAttachmentSpec &spec)
{
	assertMarionetteThread();

	DeviceManager &dm = getInstance();
	sscl::co::CoQutex::ReleaseHandle dmGuard =
		co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();

	// First, add the spec to deviceAttachmentSpecs if it's not already there
	std::shared_ptr specPtr;
	bool specExists = false;
	for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs) {
		if (*existingSpec == spec) {
			specExists = true;
			specPtr = existingSpec;
			break;
		}
	}

	if (!specExists) {
		specPtr = std::make_shared(spec);
		dm.s.rsrc.deviceAttachmentSpecs.push_back(specPtr);
	}

	std::shared_ptr device;
	bool deviceExists = false;
	for (const auto& existingDevice : dm.s.rsrc.devices) {
		if (existingDevice->deviceIdentifier != spec.deviceIdentifier) {
			continue;
		}

		device = existingDevice;
		deviceExists = true;
		break;
	}

	// If device doesn't exist, create a new one and add it
	if (!device) {
		device = std::make_shared(spec.deviceIdentifier);
		dm.s.rsrc.devices.push_back(device);
	}

	// Check if a DeviceRole w/ this spec already exists in attachedDeviceRoles
	std::shared_ptr existingDeviceRole;
	bool deviceRoleExists = false;
	for (const auto& role : dm.s.rsrc.attachedDeviceRoles) {
		if (*role->deviceAttachmentSpec == spec) {
			deviceRoleExists = true;
			existingDeviceRole = role;
			break;
		}
	}

	// If DeviceRole exists, both spec and device must also exist
	if (deviceRoleExists) {
		if (!specExists
			|| !deviceExists) {
			throw std::runtime_error(
				"Program error: DeviceRole exists but spec or device doesn't "
				"pre-exist. specExists=" + std::to_string(specExists)
				+ ", deviceExists=" + std::to_string(deviceExists));
		}

		co_return existingDeviceRole;
	}

	dmGuard.release();

	/* FIXME:
	 * We should add an unlocked flag to at/detachStimBuffDeviceCReq()
	 * so we can call it with the devmgr lock held.
	 */
	co_await attachStimBuffDeviceCReq(specPtr);

	sscl::co::CoQutex::ReleaseHandle dmGuardAfterAttach =
		co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
	(void)dmGuardAfterAttach;

	auto deviceRole = std::make_shared(*device, specPtr);
	device->deviceRoles.push_back(deviceRole);
	dm.s.rsrc.attachedDeviceRoles.push_back(deviceRole);

	co_return deviceRole;
}

mrntt::MrnttViralPostingInvoker
DeviceManager::removeDeviceAttachmentSpecCReq(
	const DeviceAttachmentSpec &spec)
{
	assertMarionetteThread();

	DeviceManager &dm = getInstance();
	sscl::co::CoQutex::ReleaseHandle dmGuard =
		co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();

	// Find the shared_ptr to the spec in the collection
	std::shared_ptr specPtr;
	for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs) {
		if (*existingSpec == spec) {
			specPtr = existingSpec;
			break;
		}
	}

	if (!specPtr) {
		throw std::runtime_error(
			std::string(__func__) + ": Device attachment spec not found: "
			+ spec.stringify());
	}

	dmGuard.release();

	/* FIXME:
	 * We should add an unlocked flag to at/detachStimBuffDeviceCReq()
	 * so we can call it with the devmgr lock held.
	 */
	co_await detachStimBuffDeviceCReq(specPtr);

	sscl::co::CoQutex::ReleaseHandle dmGuardAfterDetach =
		co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
	(void)dmGuardAfterDetach;

	// Find the DeviceRole in attachedDeviceRoles
	auto deviceRoleIt = std::find_if(
		dm.s.rsrc.attachedDeviceRoles.begin(),
		dm.s.rsrc.attachedDeviceRoles.end(),
		[&specPtr](const std::shared_ptr &role) {
			return *role->deviceAttachmentSpec == *specPtr;
		}
	);

	if (deviceRoleIt == dm.s.rsrc.attachedDeviceRoles.end()) {
		throw std::runtime_error(
			std::string(__func__) + ": DeviceRole not found for spec (race "
			"condition)?: " + specPtr->stringify()
			+ ", deviceRoles="
			+ std::to_string(dm.s.rsrc.attachedDeviceRoles.size()));
	}

	auto deviceRole = *deviceRoleIt;
	auto& device = deviceRole->parentDevice;

	// Remove DeviceRole from DeviceManager's collection
	dm.s.rsrc.attachedDeviceRoles.erase(deviceRoleIt);

	// Remove DeviceRole from Device's collection
	auto deviceRoleIt2 = std::find(
		device.deviceRoles.begin(), device.deviceRoles.end(),
		deviceRole);

	if (deviceRoleIt2 != device.deviceRoles.end()) {
		device.deviceRoles.erase(deviceRoleIt2);
	}

	// Remove DeviceAttachmentSpec from deviceAttachmentSpecs collection
	auto specIt = std::find_if(
		dm.s.rsrc.deviceAttachmentSpecs.begin(),
		dm.s.rsrc.deviceAttachmentSpecs.end(),
		[&specPtr](
			const std::shared_ptr &existingSpec) {
			return *existingSpec == *specPtr;
		}
	);

	if (specIt != dm.s.rsrc.deviceAttachmentSpecs.end()) {
		dm.s.rsrc.deviceAttachmentSpecs.erase(specIt);
	}

	co_return;
}

mrntt::MrnttViralPostingInvoker
DeviceManager::attachAllUnattachedDevicesFromCReq(
	const std::shared_ptr> &specs)
{
	assertMarionetteThread();

	const unsigned int nTotal = static_cast(specs->size());
	if (nTotal == 0) {
		co_return sscl::MultiOperationResultSetWithException{};
	}

	sscl::co::Group group;
	std::vector<
		mrntt::MrnttViralPostingInvoker>> invokers;
	invokers.reserve(nTotal);

	for (const auto &spec : *specs) {
		invokers.emplace_back(newDeviceAttachmentSpecIndCReq(spec));
		group.add(invokers.back());
	}

	const std::vector &settlements =
		co_await group.getAwaitAllSettlementsInvoker();

	unsigned int nSucceeded = 0;
	unsigned int nFailed = 0;
	using SettlementType = sscl::co::Group::SettlementDescriptor::TypeE;
	for (const auto &desc : settlements) {
		if (desc.type == SettlementType::EXCEPTION_THROWN) {
			nFailed++;
		} else {
			nSucceeded++;
		}
	}

	std::exception_ptr memberFailureException = nullptr;
	if (nFailed > 0) {
		memberFailureException = group.captureAggregatedGroupExceptions();
	}

	co_return sscl::MultiOperationResultSetWithException(
		sscl::MultiOperationResultSet(nTotal, nSucceeded, nFailed),
		memberFailureException);
}

mrntt::MrnttViralPostingInvoker
DeviceManager::attachAllUnattachedDevicesFromKnownListCReq()
{
	assertMarionetteThread();

	DeviceManager &dm = getInstance();
	sscl::co::CoQutex::ReleaseHandle dmGuard =
		co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();

	auto unattachedSpecs = std::make_shared<
		std::vector>();

	for (const auto& spec : dm.s.rsrc.deviceAttachmentSpecs) {
		bool isAttached = false;
		for (const auto& role : dm.s.rsrc.attachedDeviceRoles) {
			if (*role->deviceAttachmentSpec == *spec) {
				isAttached = true;
				break;
			}
		}

		if (!isAttached) {
			unattachedSpecs->push_back(*spec);
		}
	}

	dmGuard.release();

	const sscl::MultiOperationResultSetWithException batchResult =
		co_await attachAllUnattachedDevicesFromCReq(unattachedSpecs);

	if (batchResult.results.nSucceeded > 0) {
		std::cout << "DeviceReattacher: Successfully reattached "
			<< batchResult.results.nSucceeded << " of "
			<< batchResult.results.nTotal << " devices" << std::endl;
	}

	if (batchResult.hasMemberFailure()) {
		try {
			std::rethrow_exception(batchResult.memberFailureException);
		} catch (const std::exception &e) {
			std::cerr << __func__ << ": " << e.what() << std::endl;
		}
	}

	co_return;
}

mrntt::MrnttViralPostingInvoker
DeviceManager::attachAllUnattachedDevicesFromCmdlineCReq()
{
	auto specs = std::make_shared>(
		getInstance().s.rsrc.commandLineDASpecs);

	const sscl::MultiOperationResultSetWithException batchResult =
		co_await attachAllUnattachedDevicesFromCReq(specs);

	std::cout << "Mrntt: attached " << batchResult.results.nSucceeded
		<< " of " << batchResult.results.nTotal << " sense devices."
		<< "\n";

	if (batchResult.results.nTotal > 0
		&& batchResult.results.nSucceeded == 0) {
		std::string message =
			std::string(__func__)
			+ ": Startup policy requires at least one cmdline sense device "
			"to attach successfully; 0 of "
			+ std::to_string(batchResult.results.nTotal)
			+ " requested sense devices attached — aborting startup.";

		if (batchResult.hasMemberFailure()) {
			try {
				std::rethrow_exception(batchResult.memberFailureException);
			} catch (const std::exception &e) {
				message += "\n";
				message += e.what();
			} catch (...) {
				message += "\n";
			}
		}

		throw std::runtime_error(message);
	}

	co_return;
}

mrntt::MrnttViralPostingInvoker
DeviceManager::detachAllAttachedDeviceRolesCReq()
{
	assertMarionetteThread();

	std::vector> specsToDetach;
	specsToDetach.reserve(getInstance().s.rsrc.attachedDeviceRoles.size());
	for (const auto& deviceRole : getInstance().s.rsrc.attachedDeviceRoles) {
		specsToDetach.push_back(deviceRole->deviceAttachmentSpec);
	}

	sscl::AsynchronousLoop loop(
		static_cast(specsToDetach.size()));

	if (loop.nTotalIsZero()) {
		co_return;
	}

	sscl::co::Group group;
	std::vector> invokers;
	invokers.reserve(loop.nTotal);

	for (const auto &spec : specsToDetach) {
		invokers.emplace_back(detachStimBuffDeviceCReq(spec));
		group.add(invokers.back());
	}

	const std::vector &settlements =
		co_await group.getAwaitAllSettlementsInvoker();

	using SettlementType = sscl::co::Group::SettlementDescriptor::TypeE;
	for (const auto &desc : settlements) {
		loop.incrementSuccessOrFailureDueTo(
			desc.type != SettlementType::EXCEPTION_THROWN);
	}

	if (loop.nFailed.load() > 0) {
		try {
			group.checkForAndReThrowGroupExceptions();
		} catch (const std::exception &e) {
			std::cerr << __func__ << ": " << e.what() << std::endl;
		}
	}

	const unsigned int nSucceeded = loop.nSucceeded.load();
	const unsigned int nFailed = loop.nFailed.load();

	if (nFailed > 0) {
		std::cerr << "Mrntt: Failed to detach " << nFailed << " of "
			<< loop.nTotal << " sense devices." << "\n";
	} else {
		std::cout << "Mrntt: Successfully detached " << nSucceeded
			<< " of " << loop.nTotal << " sense devices." << "\n";
	}

	if (OptionParser::getOptions().verbose) {
		std::cout << __func__ << ": " << nSucceeded
			<< " devices detached, " << nFailed
			<< " devices failed\n";
	}

	co_return;
}

void DeviceManager::initializeDeviceReattacher()
{
	deviceReattacher = std::make_unique(
		*this, mrntt::mrntt.thread);
	deviceReattacher->start();
}

void DeviceManager::finalizeDeviceReattacher()
{
	if (!deviceReattacher) {
		return;
	}

	deviceReattacher->stop();
	deviceReattacher.reset();
}

} // namespace device
} // namespace smo