// NOTE(review): in this view of the file the include directives below have
// lost their header names, and angle-bracketed template arguments are missing
// throughout (e.g. `std::vector>`, `std::make_shared(`, `LockSet::Set{`).
// Every code token is kept exactly as found; restore the missing text from
// the original source before building.
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include

namespace smo {
namespace device {

/*
 * Definitions of DeviceManager's static containers.
 *
 * - deviceAttachmentSpecs: specs known to the manager. Entries are added by
 *   NewDeviceAttachmentSpecInd (step 1) and erased by
 *   RemoveDeviceAttachmentSpecReq (step 2).
 * - devices: device objects, looked up by deviceIdentifier. Entries are
 *   added by NewDeviceAttachmentSpecInd (step 1); nothing in this file
 *   erases from it.
 * - attachedDeviceRoles: one role per successfully attached spec. Entries
 *   are added by NewDeviceAttachmentSpecInd (step 2) and erased by
 *   RemoveDeviceAttachmentSpecReq (step 2).
 * - commandLineDASpecs: source list copied by
 *   attachAllUnattachedDevicesFromCmdlineReq(); it is not populated in this
 *   file.
 */
std::vector> DeviceManager::deviceAttachmentSpecs;
std::vector> DeviceManager::devices;
std::vector> DeviceManager::attachedDeviceRoles;
std::vector DeviceManager::commandLineDASpecs;

/** Destructor; performs no work of its own. */
DeviceManager::~DeviceManager()
{
}

/**
 * Build one string describing every entry of deviceAttachmentSpecs.
 *
 * Each entry contributes the literal prefix "Device Attachment Spec: "
 * followed by the result of spec->stringify(). This function itself inserts
 * no separator or newline between entries.
 *
 * @return The concatenated text; empty when deviceAttachmentSpecs is empty.
 */
const std::string DeviceManager::stringifyDeviceSpecs(void)
{
    std::ostringstream oss;
    for (const auto& spec : DeviceManager::deviceAttachmentSpecs) {
        oss << "Device Attachment Spec: " << spec->stringify();
    }
    return oss.str();
}

/**
 * Continuation that registers a device attachment spec and attaches it.
 *
 * Sequence:
 *   1. newDeviceAttachmentSpecInd1_posted(): record the spec and its device
 *      in DeviceManager's static lists; if a role for the spec is already
 *      attached, report success immediately; otherwise call
 *      DeviceManager::attachStimBuffDeviceReq().
 *   2. newDeviceAttachmentSpecInd2(): on a successful attach, create the role
 *      object and link it into both the device and attachedDeviceRoles.
 *
 * The original callback is invoked with (success, role-or-nullptr, spec
 * pointer).
 */
class DeviceManager::NewDeviceAttachmentSpecInd
: public SerializedAsynchronousContinuation
{
public:
    /**
     * @param spec          Spec to register; copied into this object.
     * @param caller        Forwarded to the base class.
     * @param cb            Original callback; forwarded to the base class.
     * @param requiredLocks Forwarded to the base class.
     */
    NewDeviceAttachmentSpecInd(
        const DeviceAttachmentSpec &spec,
        const std::shared_ptr &caller,
        Callback cb,
        std::vector> requiredLocks)
    : SerializedAsynchronousContinuation(
        caller, cb, requiredLocks),
    spec(spec)
    {}

public:
    // By-value copy of the spec passed to the constructor.
    DeviceAttachmentSpec spec;
    // Entry of deviceAttachmentSpecs equal to `spec`; assigned in step 1.
    std::shared_ptr specPtr;
    // Entry of devices whose deviceIdentifier matches; assigned in step 1.
    std::shared_ptr device;

public:
    /**
     * Step 1: update the bookkeeping lists and start the attach.
     *
     * @param context Shared pointer to this continuation; handed on to
     *                attachStimBuffDeviceReq() as part of the step-2 callback.
     * @throws std::runtime_error if a role for the spec exists but the spec
     *         or the device was not already present in the lists. The
     *         original callback is not invoked on that path.
     */
    void newDeviceAttachmentSpecInd1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        // First, add the spec to deviceAttachmentSpecs if it's not already there
        bool specExists = false;
        for (const auto& existingSpec : DeviceManager::deviceAttachmentSpecs) {
            if (*existingSpec == spec) {
                specExists = true;
                specPtr = existingSpec;
                break;
            }
        }

        if (!specExists) {
            specPtr = std::make_shared(spec);
            DeviceManager::deviceAttachmentSpecs.push_back(specPtr);
        }

        // Look for an existing device with the same identifier as the spec.
        bool deviceExists = false;
        for (const auto& existingDevice : DeviceManager::devices) {
            if (existingDevice->deviceIdentifier != spec.deviceIdentifier) {
                continue;
            }
            device = existingDevice;
            deviceExists = true;
            break;
        }

        // If device doesn't exist, create a new one and add it.
        // `device` is not set by the constructor, so it is still null here
        // unless the loop above found a match.
        if (!device) {
            device = std::make_shared(spec.deviceIdentifier);
            DeviceManager::devices.push_back(device);
        }

        // Check if a DeviceRole w/ this spec already exists in attachedDeviceRoles
        bool deviceRoleExists = false;
        std::shared_ptr existingDeviceRole = nullptr;
        for (const auto& role : DeviceManager::attachedDeviceRoles) {
            if (*role->deviceAttachmentSpec == spec) {
                deviceRoleExists = true;
                existingDeviceRole = role;
                break;
            }
        }

        // If DeviceRole exists, both spec and device must also exist
        if (deviceRoleExists) {
            if (!specExists || !deviceExists) {
                // Bookkeeping inconsistency. The two flags are bools, so
                // std::to_string() renders them as "0" or "1".
                throw std::runtime_error(
                    "Program error: DeviceRole exists but spec or device doesn't "
                    "pre-exist. specExists=" + std::to_string(specExists) +
                    ", deviceExists=" + std::to_string(deviceExists));
            }

            // Already attached, callback with success and return
            callOriginalCb(true, existingDeviceRole, specPtr);
            return;
        }

        // No role yet: request the attach and continue in step 2.
        // The member function is bound to the raw pointer, and `context` is
        // also passed as its first argument.
        DeviceManager::getInstance().attachStimBuffDeviceReq(
            specPtr,
            {context, std::bind(
                &NewDeviceAttachmentSpecInd::newDeviceAttachmentSpecInd2,
                context.get(), context,
                std::placeholders::_1, std::placeholders::_2)});
    }

    /**
     * Step 2: callback from attachStimBuffDeviceReq().
     *
     * @param context    Shared pointer to this continuation (unused here).
     * @param success    Result reported by the attach request.
     * @param deviceSpec Spec pointer reported by the attach request.
     *
     * On failure the spec and device recorded in step 1 are left in the
     * static lists; such specs are the ones
     * attachAllUnattachedDevicesFromKnownListReq1_posted() collects.
     */
    void newDeviceAttachmentSpecInd2(
        [[maybe_unused]] std::shared_ptr context,
        bool success,
        std::shared_ptr deviceSpec
        )
    {
        if (!success) {
            std::cerr << __func__ << ": Attach failed for device spec "
                << deviceSpec->stringify() << std::endl;
            callOriginalCb(false, nullptr, deviceSpec);
            return;
        }

        try {
            // Create DeviceRole and add it to both DeviceManager's and Device's collections
            auto deviceRole = std::make_shared(*device, specPtr);
            device->deviceRoles.push_back(deviceRole);
            DeviceManager::attachedDeviceRoles.push_back(deviceRole);

            // Callback with success
            callOriginalCb(true, deviceRole, specPtr);
        } catch (const std::exception& e) {
            // Attach failed, callback with error.
            // NOTE(review): the attach request itself already reported
            // success at this point and is not undone here.
            callOriginalCb(false, nullptr, specPtr);
        }
    }
};

/**
 * Continuation that detaches a spec and then removes its bookkeeping.
 *
 * Sequence:
 *   1. removeDeviceAttachmentSpecReq1_posted(): find the stored spec and call
 *      DeviceManager::detachStimBuffDeviceReq().
 *   2. removeDeviceAttachmentSpecReq2(): if the detach succeeded, erase the
 *      role from attachedDeviceRoles and from its device, and erase the spec
 *      from deviceAttachmentSpecs.
 *
 * The original callback is invoked with (success, spec pointer).
 */
class DeviceManager::RemoveDeviceAttachmentSpecReq
: public SerializedAsynchronousContinuation
{
public:
    /**
     * @param spec          Spec to remove; copied into this object.
     * @param caller        Forwarded to the base class.
     * @param cb            Original callback; forwarded to the base class.
     * @param requiredLocks Forwarded to the base class.
     */
    RemoveDeviceAttachmentSpecReq(
        const DeviceAttachmentSpec &spec,
        const std::shared_ptr &caller,
        Callback cb,
        std::vector> requiredLocks)
    : SerializedAsynchronousContinuation(
        caller, cb, requiredLocks),
    spec(spec)
    {}

public:
    // By-value copy of the spec passed to the constructor.
    DeviceAttachmentSpec spec;
    // Entry of deviceAttachmentSpecs equal to `spec`; assigned in step 1.
    std::shared_ptr specPtr;

public:
    /**
     * Step 1: locate the stored spec and start the detach.
     *
     * @param context Shared pointer to this continuation; handed on to
     *                detachStimBuffDeviceReq() as part of the step-2 callback.
     */
    void removeDeviceAttachmentSpecReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        // Find the shared_ptr to the spec in the collection
        for (const auto& existingSpec : DeviceManager::deviceAttachmentSpecs) {
            if (*existingSpec == spec) {
                specPtr = existingSpec;
                break;
            }
        }

        if (!specPtr) {
            // Spec not found, callback with failure and return
            callOriginalCb(false, nullptr);
            return;
        }

        // Call detachStimBuffDeviceReq first - only clean up metadata if this succeeds
        DeviceManager::getInstance().detachStimBuffDeviceReq(
            specPtr,
            {context, std::bind(
                &RemoveDeviceAttachmentSpecReq::removeDeviceAttachmentSpecReq2,
                context.get(), context,
                std::placeholders::_1, std::placeholders::_2)});
    }

    /**
     * Step 2: callback from detachStimBuffDeviceReq().
     *
     * @param context    Shared pointer to this continuation (unused here).
     * @param success    Result reported by the detach request.
     * @param deviceSpec Spec pointer reported by the detach request; passed
     *                   on to the original callback on every path.
     *
     * The device object itself is not erased from DeviceManager::devices.
     */
    void removeDeviceAttachmentSpecReq2(
        [[maybe_unused]] std::shared_ptr context,
        bool success,
        std::shared_ptr deviceSpec
        )
    {
        if (!success) {
            // Detach failed, callback with failure (metadata remains intact)
            callOriginalCb(false, deviceSpec);
            return;
        }

        // Detach succeeded, now find and clean up metadata
        try {
            // Find the DeviceRole in attachedDeviceRoles
            auto deviceRoleIt = std::find_if(
                DeviceManager::attachedDeviceRoles.begin(),
                DeviceManager::attachedDeviceRoles.end(),
                [&specPtr = specPtr](const std::shared_ptr &role) {
                    return *role->deviceAttachmentSpec == *specPtr;
                }
            );

            if (deviceRoleIt == DeviceManager::attachedDeviceRoles.end()) {
                // DeviceRole not found, callback with failure
                callOriginalCb(false, deviceSpec);
                return;
            }

            // Copy the shared_ptr before erasing so the role stays alive for
            // the rest of this scope; `device` refers to the role's
            // parentDevice member.
            auto deviceRole = *deviceRoleIt;
            auto& device = deviceRole->parentDevice;

            // Remove DeviceRole from DeviceManager's collection
            DeviceManager::attachedDeviceRoles.erase(deviceRoleIt);

            // Remove DeviceRole from Device's collection
            auto deviceRoleIt2 = std::find(
                device.deviceRoles.begin(),
                device.deviceRoles.end(),
                deviceRole);
            if (deviceRoleIt2 != device.deviceRoles.end()) {
                device.deviceRoles.erase(deviceRoleIt2);
            }

            // Remove DeviceAttachmentSpec from deviceAttachmentSpecs collection
            auto specIt = std::find_if(
                DeviceManager::deviceAttachmentSpecs.begin(),
                DeviceManager::deviceAttachmentSpecs.end(),
                [&specPtr = specPtr](
                    const std::shared_ptr &existingSpec) {
                    return *existingSpec == *specPtr;
                }
            );
            if (specIt != DeviceManager::deviceAttachmentSpecs.end()) {
                DeviceManager::deviceAttachmentSpecs.erase(specIt);
            }

            // Callback with success
            callOriginalCb(true, deviceSpec);
        } catch (const std::exception& e) {
            // Cleanup failed, callback with error
            callOriginalCb(false, deviceSpec);
        }
    }
};

/**
 * Start a NewDeviceAttachmentSpecInd sequence for `spec`.
 *
 * Creates the continuation with a lock set containing DeviceManager's qutex,
 * then constructs a LockerAndInvoker for it with mrntt::mrntt.thread and the
 * bound step-1 method. (LockerAndInvoker is declared elsewhere; presumably it
 * acquires the locks and runs step 1 on that thread -- confirm.)
 *
 * @param spec     Spec to register and attach.
 * @param callback Invoked by the continuation with the outcome.
 */
void DeviceManager::newDeviceAttachmentSpecInd(
    const DeviceAttachmentSpec &spec,
    Callback callback)
{
    const auto& caller = ComponentThread::getSelf();
    auto request = std::make_shared(
        spec, caller, callback,
        LockSet::Set{
            std::ref(DeviceManager::getInstance().qutex)
        });

    NewDeviceAttachmentSpecInd::LockerAndInvoker lockvoker(
        *request, mrntt::mrntt.thread,
        std::bind(
            &NewDeviceAttachmentSpecInd::newDeviceAttachmentSpecInd1_posted,
            request.get(), request));
}

/**
 * Start a RemoveDeviceAttachmentSpecReq sequence for `spec`.
 *
 * Mirrors newDeviceAttachmentSpecInd(): same lock set (DeviceManager's
 * qutex), same target thread, step 1 of the remove continuation.
 *
 * @param spec     Spec to detach and forget.
 * @param callback Invoked by the continuation with the outcome.
 */
void DeviceManager::removeDeviceAttachmentSpecReq(
    const DeviceAttachmentSpec &spec,
    Callback callback)
{
    const auto& caller = ComponentThread::getSelf();
    auto request = std::make_shared(
        spec, caller, callback,
        LockSet::Set{
            std::ref(DeviceManager::getInstance().qutex)
        });

    RemoveDeviceAttachmentSpecReq::LockerAndInvoker lockvoker(
        *request, mrntt::mrntt.thread,
        std::bind(
            &RemoveDeviceAttachmentSpecReq
                ::removeDeviceAttachmentSpecReq1_posted,
            request.get(), request));
}

/**
 * Continuation that forwards an attach or detach request to a stim-buff API
 * library.
 *
 * Despite its name, this class holds both sequences:
 *   attachStimBuffDeviceReq1_posted() -> attachStimBuffDeviceReq2()
 *   detachStimBuffDeviceReq1_posted() -> detachStimBuffDeviceReq2()
 *
 * The original callback is invoked with (success, spec pointer).
 */
class DeviceManager::AttachStimBuffDeviceReq
: public SerializedAsynchronousContinuation<
    DeviceManager::attachStimBuffDeviceReqCbFn>
{
public:
    /**
     * @param spec           Spec to attach or detach; stored in `spec`.
     * @param caller         Forwarded to the base class.
     * @param cb             Original callback; forwarded to the base class.
     * @param stimBuffApiLib Library that will service the request; stored.
     * @param requiredLocks  Forwarded to the base class.
     */
    AttachStimBuffDeviceReq(
        const std::shared_ptr& spec,
        const std::shared_ptr &caller,
        Callback cb,
        std::shared_ptr &stimBuffApiLib,
        std::vector> requiredLocks)
    : SerializedAsynchronousContinuation(
        caller, cb, requiredLocks),
    spec(spec), stimBuffApiLib(stimBuffApiLib)
    {}

public:
    /**
     * Attach step 1: validate, pick the target thread, call the library.
     *
     * Reports failure through the original callback when `caller` is not the
     * MRNTT thread, when the library is being destroyed, or when the library
     * exposes no attachDeviceReq operation.
     *
     * @param context Shared pointer to this continuation; handed to the
     *                library as part of the step-2 callback.
     */
    void attachStimBuffDeviceReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        // NOTE(review): this tests `caller` (presumably the thread that
        // issued the request), not the thread running this step -- confirm
        // that matches the intent of the log message.
        if (caller->id != ComponentThread::MRNTT) {
            std::cerr << std::string(__func__)
                << ": executed on non-mrntt thread: "
                << caller->name << std::endl;
            callOriginalCb(false, spec);
            return;
        }

        if (stimBuffApiLib->isBeingDestroyed.load()) {
            std::cerr << std::string(__func__) + ": Library is being destroyed"
                << " for API '" << spec->stimBuffApi << "'. Bailing out.\n";
            callOriginalCb(false, spec);
            return;
        }

        if (!stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.attachDeviceReq) {
            std::cerr << std::string(__func__) + ": attachDeviceReq() is NULL "
                "for library '" << stimBuffApiLib->libraryPath << "'"
                << std::endl;
            callOriginalCb(false, spec);
            return;
        }

        // Only the StimBuffApiManager qutex is released here. The library's
        // own qutex, which attachStimBuffDeviceReq() also puts in the lock
        // set, is not.
        releaseQutexEarly(stim_buff::StimBuffApiManager::getInstance().qutex);

        /** EXPLANATION:
         * We pass in either the body or world thread here, depending on whether
         * the device is an introspector (idev) or extrospector (edev).
         *
         * Introspectors are attached to the body thread; extrospectors are
         * attached to the world thread.
         */
        std::shared_ptr threadForAttachment;
        // A sensorType of 'e' selects the world thread; any other value
        // selects the body thread.
        if (spec->sensorType == 'e') {
            threadForAttachment = mind::globalMind->world.thread;
            std::cout << __func__ << ": Attaching edev "
                << spec->deviceIdentifier << " to world thread" << "\n";
        } else {
            threadForAttachment = mind::globalMind->body.thread;
            std::cout << __func__ << ": Attaching non-edev "
                << spec->deviceIdentifier << " to body thread" << "\n";
        }

        stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.attachDeviceReq(
            spec, threadForAttachment,
            {context, std::bind(
                &AttachStimBuffDeviceReq::attachStimBuffDeviceReq2,
                context.get(), context,
                std::placeholders::_1, std::placeholders::_2)});
    }

    /**
     * Attach step 2: pass the library's result straight to the original
     * callback.
     */
    void attachStimBuffDeviceReq2(
        [[maybe_unused]] std::shared_ptr context,
        bool success,
        std::shared_ptr deviceSpec
        )
    {
        callOriginalCb(success, deviceSpec);
    }

    /**
     * Detach step 1: same validation as the attach path, then call the
     * library's detachDeviceReq operation. No thread is selected or passed.
     *
     * @param context Shared pointer to this continuation; handed to the
     *                library as part of the step-2 callback.
     */
    void detachStimBuffDeviceReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        // NOTE(review): as in the attach path, this tests `caller`, not the
        // thread running this step.
        if (caller->id != ComponentThread::MRNTT) {
            std::cerr << std::string(__func__)
                << ": executed on non-mrntt thread: "
                << caller->name << std::endl;
            callOriginalCb(false, spec);
            return;
        }

        if (stimBuffApiLib->isBeingDestroyed.load()) {
            std::cerr << std::string(__func__) + ": Library is being destroyed"
                << " for API '" << spec->stimBuffApi << "'. Bailing out.\n";
            callOriginalCb(false, spec);
            return;
        }

        if (!stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.detachDeviceReq) {
            std::cerr << std::string(__func__) + ": detachDeviceReq() is NULL "
                "for library '" << stimBuffApiLib->libraryPath << "'"
                << std::endl;
            callOriginalCb(false, spec);
            return;
        }

        // Release the manager-level qutex before calling into the library.
        releaseQutexEarly(stim_buff::StimBuffApiManager::getInstance().qutex);

        stimBuffApiLib->stimBuffApiDesc.sal_mgmt_libOps.detachDeviceReq(
            spec,
            {context, std::bind(
                &AttachStimBuffDeviceReq::detachStimBuffDeviceReq2,
                context.get(), context,
                std::placeholders::_1, std::placeholders::_2)});
    }

    /**
     * Detach step 2: pass the library's result straight to the original
     * callback.
     */
    void detachStimBuffDeviceReq2(
        [[maybe_unused]] std::shared_ptr context,
        bool success,
        std::shared_ptr deviceSpec
        )
    {
        callOriginalCb(success, deviceSpec);
    }

public:
    // Spec being attached or detached.
    std::shared_ptr spec;
    // Library whose sal_mgmt_libOps table services the request.
    std::shared_ptr stimBuffApiLib;
};

/**
 * Ask the stim-buff API library named by spec->stimBuffApi to attach `spec`.
 *
 * If no library is registered under that name, logs to std::cerr and invokes
 * cb.callbackFn(false, spec) directly from this function. Otherwise creates
 * an AttachStimBuffDeviceReq whose lock set holds the StimBuffApiManager
 * qutex and the library's qutex, and starts its attach step 1 through a
 * LockerAndInvoker on mrntt::mrntt.thread.
 *
 * @param spec Spec to attach.
 * @param cb   Callback receiving (success, spec pointer).
 */
void DeviceManager::attachStimBuffDeviceReq(
    const std::shared_ptr& spec,
    Callback cb
    )
{
    const auto& caller = ComponentThread::getSelf();

    // Look up the stim buff API lib; its qutex goes into the lock set below.
    auto libOpt = stim_buff::StimBuffApiManager::getInstance()
        .getStimBuffApiLibByApiName(spec->stimBuffApi);

    if (!libOpt) {
        std::cerr << "attachStimBuffDeviceReq: No library found for API '"
            << spec->stimBuffApi << "'" << std::endl;
        cb.callbackFn(false, spec);
        return;
    }

    auto& lib = *libOpt.value();
    auto request = std::make_shared(
        spec, caller, cb, libOpt.value(),
        LockSet::Set{
            std::ref(stim_buff::StimBuffApiManager::getInstance().qutex),
            std::ref(lib.qutex)
        });

    AttachStimBuffDeviceReq::LockerAndInvoker lockvoker(
        *request, mrntt::mrntt.thread,
        std::bind(
            &AttachStimBuffDeviceReq::attachStimBuffDeviceReq1_posted,
            request.get(), request));
}

/**
 * Ask the stim-buff API library named by spec->stimBuffApi to detach `spec`.
 *
 * Same structure as attachStimBuffDeviceReq(), but starts the detach step 1.
 *
 * NOTE(review): `DetachStimBuffDeviceReq` is not defined in this file; the
 * detach step methods are members of AttachStimBuffDeviceReq. Presumably the
 * name is an alias declared elsewhere -- confirm.
 *
 * @param spec Spec to detach.
 * @param cb   Callback receiving (success, spec pointer).
 */
void DeviceManager::detachStimBuffDeviceReq(
    const std::shared_ptr& spec,
    Callback cb
    )
{
    const auto& caller = ComponentThread::getSelf();

    // Look up the stim buff API lib; its qutex goes into the lock set below.
    auto libOpt = stim_buff::StimBuffApiManager::getInstance()
        .getStimBuffApiLibByApiName(spec->stimBuffApi);

    if (!libOpt) {
        std::cerr << "detachStimBuffDeviceReq: No library found for API '"
            << spec->stimBuffApi << "'" << std::endl;
        cb.callbackFn(false, spec);
        return;
    }

    auto& lib = *libOpt.value();
    auto request = std::make_shared(
        spec, caller, cb, libOpt.value(),
        LockSet::Set{
            std::ref(stim_buff::StimBuffApiManager::getInstance().qutex),
            std::ref(lib.qutex)
        });

    DetachStimBuffDeviceReq::LockerAndInvoker lockvoker(
        *request, mrntt::mrntt.thread,
        std::bind(
            &DetachStimBuffDeviceReq::detachStimBuffDeviceReq1_posted,
            request.get(), request));
}

/**
 * Continuation that issues newDeviceAttachmentSpecInd() for every spec in a
 * list and reports once all of them have called back.
 *
 * `loop` is constructed with the number of specs and is updated in step 2
 * with each result. The original callback receives `loop`.
 */
class DeviceManager::AttachAllUnattachedDevicesFromReq
: public PostedAsynchronousContinuation<
    attachAllUnattachedDevicesFromReqCbFn>
{
public:
    /**
     * @param totalNSpecs Number of results to wait for; initializes `loop`.
     * @param specs       Specs to attach; the shared pointer is stored.
     * @param caller      Forwarded to the base class.
     * @param cb          Original callback; forwarded to the base class.
     */
    AttachAllUnattachedDevicesFromReq(
        const unsigned int totalNSpecs,
        const std::shared_ptr>& specs,
        const std::shared_ptr& caller,
        Callback cb)
    : PostedAsynchronousContinuation(
        caller, cb),
    loop(totalNSpecs), specs(specs)
    {}

public:
    /**
     * Step 1: issue one newDeviceAttachmentSpecInd() per spec, each with
     * step 2 as its callback.
     */
    void attachAllUnattachedDevicesFromReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        for (const auto& spec : *specs) {
            DeviceManager::getInstance().newDeviceAttachmentSpecInd(
                spec,
                {context, std::bind(
                    &AttachAllUnattachedDevicesFromReq
                        ::attachAllUnattachedDevicesFromReq2,
                    context.get(), context,
                    std::placeholders::_1, std::placeholders::_2,
                    std::placeholders::_3)});
        }
    }

    /**
     * Step 2: per-spec callback for the attachment sequence.
     *
     * Logs a failure, records the result in `loop`, and returns unless the
     * loop reports completion. On completion it optionally logs the totals
     * and invokes the original callback.
     *
     * @param context    Shared pointer to this continuation.
     * @param success    Result of this spec's attach.
     * @param deviceRole Role reported by the attach (unused here).
     * @param spec       Spec this result belongs to.
     */
    void attachAllUnattachedDevicesFromReq2(
        std::shared_ptr context,
        bool success,
        [[maybe_unused]] std::shared_ptr deviceRole,
        std::shared_ptr spec
        )
    {
        if (!success) {
            std::cerr << __func__ << ": Failed to attach device: "
                << spec->deviceIdentifier << "\n";
            // Fallthrough.
        }

        // A false return means the loop is not yet complete
        // (AsynchronousLoop is declared elsewhere -- presumably it compares
        // the recorded results with the constructor total; confirm).
        if (!context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(
            success)) {
            return;
        }

        if (OptionParser::getOptions().verbose) {
            std::cout << __func__ << ": "
                << context->loop.nSucceeded.load() << " devices attached, "
                << context->loop.nFailed.load() << " devices failed\n";
        }

        // `loop` is this object's member; step 1 binds this method to
        // context.get(), so it is the same object as context->loop.
        context->callOriginalCb(loop);
    }

public:
    // Completion tracker; exposes nSucceeded and nFailed.
    AsynchronousLoop loop;
    // Specs being attached.
    std::shared_ptr> specs;
};

/**
 * Attach every spec in `specs` via newDeviceAttachmentSpecInd().
 *
 * For an empty list the callback is invoked immediately, from this function,
 * with an AsynchronousLoop constructed with 0. Otherwise step 1 of an
 * AttachAllUnattachedDevicesFromReq is posted to mrntt::mrntt.thread's io
 * service.
 *
 * @param specs Specs to attach.
 * @param cb    Callback receiving the AsynchronousLoop with the tallies.
 */
void DeviceManager::attachAllUnattachedDevicesFromReq(
    const std::shared_ptr> &specs,
    Callback cb
    )
{
    if (specs->size() == 0) {
        AsynchronousLoop tmp(0);
        cb.callbackFn(tmp);
        return;
    }

    const auto& caller = ComponentThread::getSelf();
    auto request = std::make_shared(
        specs->size(), specs, caller, std::move(cb));

    mrntt::mrntt.thread->getIoService().post(
        std::bind(
            &AttachAllUnattachedDevicesFromReq
                ::attachAllUnattachedDevicesFromReq1_posted,
            request.get(), request));
}

/**
 * Attach every spec in commandLineDASpecs.
 *
 * Copies the static list into a new shared object and delegates to
 * attachAllUnattachedDevicesFromReq().
 *
 * @param cb Callback receiving the AsynchronousLoop with the tallies.
 */
void DeviceManager::attachAllUnattachedDevicesFromCmdlineReq(
    Callback cb
    )
{
    auto specs = std::make_shared>(
        commandLineDASpecs);
    attachAllUnattachedDevicesFromReq(specs, std::move(cb));
}

/**
 * Continuation that retries every known spec that has no attached role.
 *
 * Step 1 gathers the specs from deviceAttachmentSpecs that have no matching
 * entry in attachedDeviceRoles and hands them to
 * attachAllUnattachedDevicesFromReq(); step 2 forwards the result.
 */
class DeviceManager::AttachAllUnattachedDevicesFromKnownListReq
: public SerializedAsynchronousContinuation<
    attachAllUnattachedDevicesFromReqCbFn>
{
public:
    /**
     * @param caller        Forwarded to the base class.
     * @param cb            Original callback; forwarded to the base class.
     * @param requiredLocks Forwarded to the base class.
     */
    AttachAllUnattachedDevicesFromKnownListReq(
        const std::shared_ptr &caller,
        Callback cb,
        std::vector> requiredLocks)
    : SerializedAsynchronousContinuation<
        attachAllUnattachedDevicesFromReqCbFn>(
        caller, cb, requiredLocks)
    {}

public:
    /**
     * Step 1: collect unattached specs and delegate the attach work.
     *
     * @param context Shared pointer to this continuation; handed on as part
     *                of the step-2 callback.
     */
    void attachAllUnattachedDevicesFromKnownListReq1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        // Create a vector to hold unattached device specs
        auto unattachedSpecs = std::make_shared<
            std::vector>();

        // Cycle through all DA specs in deviceAttachmentSpecs
        for (const auto& spec : DeviceManager::deviceAttachmentSpecs) {
            bool isAttached = false;

            // Cross reference with attachedDeviceRoles
            for (const auto& role : DeviceManager::attachedDeviceRoles) {
                if (*role->deviceAttachmentSpec == *spec) {
                    isAttached = true;
                    break;
                }
            }

            // If spec doesn't appear in attachedDeviceRoles, add it to vector
            // (the spec is copied by value).
            if (!isAttached) {
                unattachedSpecs->push_back(*spec);
            }
        }

        // Release the DeviceManager qutex early before calling the inner method
        releaseQutexEarly(DeviceManager::getInstance().qutex);

        // Pass the vector to the existing function
        DeviceManager::getInstance().attachAllUnattachedDevicesFromReq(
            unattachedSpecs,
            {context, std::bind(
                &AttachAllUnattachedDevicesFromKnownListReq
                    ::attachAllUnattachedDevicesFromKnownListReq2,
                context.get(), context,
                std::placeholders::_1)});
    }

    /**
     * Step 2: forward the loop result to the original callback.
     */
    void attachAllUnattachedDevicesFromKnownListReq2(
        [[maybe_unused]] std::shared_ptr context,
        AsynchronousLoop loop
        )
    {
        callOriginalCb(loop);
    }
};

/**
 * Try to attach every known spec that currently has no attached role.
 *
 * Creates an AttachAllUnattachedDevicesFromKnownListReq with a lock set
 * containing DeviceManager's qutex and starts its step 1 through a
 * LockerAndInvoker on mrntt::mrntt.thread.
 *
 * @param cb Callback receiving the AsynchronousLoop with the tallies.
 */
void DeviceManager::attachAllUnattachedDevicesFromKnownListReq(
    Callback cb
    )
{
    const auto& caller = ComponentThread::getSelf();
    auto request = std::make_shared(
        caller, cb,
        LockSet::Set{
            std::ref(DeviceManager::getInstance().qutex)
        });

    AttachAllUnattachedDevicesFromKnownListReq::LockerAndInvoker lockvoker(
        *request, mrntt::mrntt.thread,
        std::bind(
            &AttachAllUnattachedDevicesFromKnownListReq
                ::attachAllUnattachedDevicesFromKnownListReq1_posted,
            request.get(), request));
}

/**
 * Continuation that issues detachStimBuffDeviceReq() for every attached role
 * and reports once all of them have called back.
 *
 * It calls detachStimBuffDeviceReq() directly, so nothing in this class
 * erases entries from attachedDeviceRoles, device role lists or
 * deviceAttachmentSpecs.
 */
class DeviceManager::DetachAllAttachedDeviceRoles
: public PostedAsynchronousContinuation<
    detachAllAttachedDeviceRolesCbFn>
{
public:
    /**
     * @param totalNSpecs Number of results to wait for; initializes `loop`.
     * @param caller      Forwarded to the base class.
     * @param cb          Original callback; forwarded to the base class.
     */
    DetachAllAttachedDeviceRoles(
        const unsigned int totalNSpecs,
        const std::shared_ptr& caller,
        Callback cb)
    : PostedAsynchronousContinuation(
        caller, cb),
    loop(totalNSpecs)
    {}

    /**
     * Step 1: issue one detach request per entry of attachedDeviceRoles,
     * each with step 2 as its callback.
     */
    void detachAllAttachedDeviceRoles1_posted(
        [[maybe_unused]] std::shared_ptr context
        )
    {
        for (const auto& deviceRole : DeviceManager::attachedDeviceRoles) {
            DeviceManager::getInstance().detachStimBuffDeviceReq(
                deviceRole->deviceAttachmentSpec,
                {context, std::bind(
                    &DetachAllAttachedDeviceRoles::detachAllAttachedDeviceRoles2,
                    context.get(), context,
                    std::placeholders::_1, std::placeholders::_2)});
        }
    }

    /**
     * Step 2: per-role callback.
     *
     * Logs a failure, records the result in `loop`, and returns unless the
     * loop reports completion. On completion it optionally logs the totals
     * and invokes the original callback.
     *
     * @param context Shared pointer to this continuation.
     * @param success Result of this role's detach.
     * @param spec    Spec this result belongs to.
     */
    void detachAllAttachedDeviceRoles2(
        std::shared_ptr context,
        bool success,
        std::shared_ptr spec
        )
    {
        if (!success) {
            std::cerr << __func__ << ": Failed to detach device: "
                << spec->deviceIdentifier << "\n";
            // Fallthrough.
        }

        // A false return means the loop is not yet complete.
        if (!context->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(
            success)) {
            return;
        }

        if (OptionParser::getOptions().verbose) {
            std::cout << __func__ << ": "
                << context->loop.nSucceeded.load() << " devices detached, "
                << context->loop.nFailed.load() << " devices failed\n";
        }

        // `loop` is this object's member; step 1 binds this method to
        // context.get(), so it is the same object as context->loop.
        context->callOriginalCb(loop);
    }

public:
    // Completion tracker; exposes nSucceeded and nFailed.
    AsynchronousLoop loop;
};

/**
 * Detach every role currently listed in attachedDeviceRoles.
 *
 * For an empty list the callback is invoked immediately, from this function,
 * with an AsynchronousLoop constructed with 0. Otherwise step 1 of a
 * DetachAllAttachedDeviceRoles is posted to mrntt::mrntt.thread's io service.
 *
 * NOTE(review): the expected result count is read from attachedDeviceRoles
 * here, while the list is iterated later in the posted step, and no lock set
 * is passed to this continuation. Confirm the list cannot change in between.
 *
 * @param cb Callback receiving the AsynchronousLoop with the tallies.
 */
void DeviceManager::detachAllAttachedDeviceRoles(
    Callback cb
    )
{
    if (DeviceManager::getInstance().attachedDeviceRoles.size() == 0) {
        AsynchronousLoop tmp(0);
        cb.callbackFn(tmp);
        return;
    }

    const auto& caller = ComponentThread::getSelf();
    auto request = std::make_shared(
        DeviceManager::getInstance().attachedDeviceRoles.size(),
        caller, std::move(cb));

    mrntt::mrntt.thread->getIoService().post(
        std::bind(
            &DetachAllAttachedDeviceRoles::detachAllAttachedDeviceRoles1_posted,
            request.get(), request));
}

/**
 * Create the device reattacher, giving it this manager and
 * mrntt::mrntt.thread, and start it.
 */
void DeviceManager::initializeDeviceReattacher()
{
    deviceReattacher = std::make_unique(
        *this, mrntt::mrntt.thread);
    deviceReattacher->start();
}

/**
 * Stop and destroy the device reattacher; does nothing if none exists.
 */
void DeviceManager::finalizeDeviceReattacher()
{
    if (!deviceReattacher) {
        return;
    }
    deviceReattacher->stop();
    deviceReattacher.reset();
}

} // namespace device
} // namespace smo