Files
salmanoff/smocore/deviceManager/deviceManager.cpp
T
hayodea cde2737876 Libspinscale: Initial top-level SMO port to coroutine framework
We haven't ported everything. Just the top-level methods. We'll
dig in to the leaf stuff later. Surprisingly, this all went without
any real difficulties.

Runs like a charm on first try.
2026-05-24 23:26:18 -04:00

530 lines
14 KiB
C++

#include <algorithm>
#include <iostream>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>
#include <sstream>
#include <memory>
#include <opts.h>
#include <componentThread.h>
#include <cpsBoundary/stimBuffDeviceAReq.h>
#include <deviceManager/deviceManager.h>
#include <deviceManager/deviceReattacher.h>
#include <stimBuffApis/stimBuffApiManager.h>
#include <marionette/marionette.h>
#include <marionette/marionetteThread.h>
#include <mind.h>
#include <spinscale/co/group.h>
namespace smo {
namespace device {
namespace {
void assertMarionetteThread()
{
auto self = sscl::ComponentThread::getSelf();
if (self->id != SmoThreadId::MRNTT)
{
throw std::runtime_error(
std::string(__func__)
+ ": Must be executed on Marionette thread");
}
}
boost::asio::io_service &marionetteIoService()
{
return mrntt::MrnttThreadTag::io_service();
}
} // namespace
DeviceManager::~DeviceManager()
{
}
const std::string DeviceManager::stringifyDeviceSpecs(void)
{
std::ostringstream oss;
for (const auto& spec : getInstance().s.rsrc.deviceAttachmentSpecs) {
oss << "Device Attachment Spec: " << spec->stringify();
}
return oss.str();
}
mrntt::MrnttViralPostingInvoker<cpsBoundary::StimBuffDeviceOpResult>
DeviceManager::attachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec)
{
assertMarionetteThread();
auto &sbam = stim_buff::StimBuffApiManager::getInstance();
auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);
if (!libOpt)
{
std::cerr << "attachStimBuffDeviceCReq: No library found for API '"
<< spec->stimBuffApi << "'" << std::endl;
co_return cpsBoundary::StimBuffDeviceOpResult{false, spec};
}
auto &lib = *libOpt.value();
if (lib.isBeingDestroyed.load())
{
std::cerr << std::string(__func__) + ": Library is being destroyed"
<< " for API '" << spec->stimBuffApi << "'. Bailing out.\n";
co_return cpsBoundary::StimBuffDeviceOpResult{false, spec};
}
if (!lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceReq)
{
std::cerr << std::string(__func__) + ": attachDeviceReq() is NULL "
"for library '" << lib.libraryPath << "'"
<< std::endl;
co_return cpsBoundary::StimBuffDeviceOpResult{false, spec};
}
sscl::co::CoQutex::ReleaseHandle sbamGuard =
co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy();
sscl::co::CoQutex::ReleaseHandle libGuard =
co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy();
sbamGuard.release();
/** EXPLANATION:
* We pass in either the body or world thread here, depending on whether
* the device is an introspector (idev) or extrospector (edev).
*
* Introspectors are attached to the body thread; extrospectors are
* attached to the world thread.
*/
std::shared_ptr<sscl::ComponentThread> threadForAttachment;
if (spec->sensorType == 'e')
{
threadForAttachment = mind::globalMind->world.thread;
std::cout << __func__ << ": Attaching edev "
<< spec->deviceIdentifier << " to world thread" << "\n";
}
else
{
threadForAttachment = mind::globalMind->body.thread;
std::cout << __func__ << ": Attaching non-edev "
<< spec->deviceIdentifier << " to body thread" << "\n";
}
cpsBoundary::StimBuffDeviceOpResult result = co_await
cpsBoundary::AttachStimBuffDeviceAReq(
spec, lib, threadForAttachment, marionetteIoService());
co_return result;
}
mrntt::MrnttViralPostingInvoker<cpsBoundary::StimBuffDeviceOpResult>
DeviceManager::detachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec)
{
assertMarionetteThread();
auto &sbam = stim_buff::StimBuffApiManager::getInstance();
auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);
if (!libOpt)
{
std::cerr << "detachStimBuffDeviceCReq: No library found for API '"
<< spec->stimBuffApi << "'" << std::endl;
co_return cpsBoundary::StimBuffDeviceOpResult{false, spec};
}
auto &lib = *libOpt.value();
if (lib.isBeingDestroyed.load())
{
std::cerr << std::string(__func__) + ": Library is being destroyed"
<< " for API '" << spec->stimBuffApi << "'. Bailing out.\n";
co_return cpsBoundary::StimBuffDeviceOpResult{false, spec};
}
if (!lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceReq)
{
std::cerr << std::string(__func__) + ": detachDeviceReq() is NULL "
"for library '" << lib.libraryPath << "'"
<< std::endl;
co_return cpsBoundary::StimBuffDeviceOpResult{false, spec};
}
sscl::co::CoQutex::ReleaseHandle sbamGuard =
co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy();
sscl::co::CoQutex::ReleaseHandle libGuard =
co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy();
sbamGuard.release();
cpsBoundary::StimBuffDeviceOpResult result = co_await
cpsBoundary::DetachStimBuffDeviceAReq(
spec, lib, marionetteIoService());
co_return result;
}
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult>
DeviceManager::newDeviceAttachmentSpecIndCReq(
const DeviceAttachmentSpec &spec)
{
assertMarionetteThread();
DeviceManager &dm = getInstance();
sscl::co::CoQutex::ReleaseHandle dmGuard =
co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
// First, add the spec to deviceAttachmentSpecs if it's not already there
std::shared_ptr<DeviceAttachmentSpec> specPtr;
bool specExists = false;
for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs)
{
if (*existingSpec == spec)
{
specExists = true;
specPtr = existingSpec;
break;
}
}
if (!specExists)
{
specPtr = std::make_shared<DeviceAttachmentSpec>(spec);
dm.s.rsrc.deviceAttachmentSpecs.push_back(specPtr);
}
std::shared_ptr<Device> device;
bool deviceExists = false;
for (const auto& existingDevice : dm.s.rsrc.devices)
{
if (existingDevice->deviceIdentifier != spec.deviceIdentifier)
{ continue; }
device = existingDevice;
deviceExists = true;
break;
}
// If device doesn't exist, create a new one and add it
if (!device)
{
device = std::make_shared<Device>(spec.deviceIdentifier);
dm.s.rsrc.devices.push_back(device);
}
// Check if a DeviceRole w/ this spec already exists in attachedDeviceRoles
std::shared_ptr<DeviceRole> existingDeviceRole;
bool deviceRoleExists = false;
for (const auto& role : dm.s.rsrc.attachedDeviceRoles)
{
if (*role->deviceAttachmentSpec == spec)
{
deviceRoleExists = true;
existingDeviceRole = role;
break;
}
}
// If DeviceRole exists, both spec and device must also exist
if (deviceRoleExists)
{
if (!specExists || !deviceExists)
{
throw std::runtime_error(
"Program error: DeviceRole exists but spec or device doesn't "
"pre-exist. specExists=" + std::to_string(specExists) +
", deviceExists=" + std::to_string(deviceExists));
}
// Already attached, return success
co_return DeviceAttachmentIndResult{
true, existingDeviceRole, specPtr};
}
cpsBoundary::StimBuffDeviceOpResult attachResult =
co_await attachStimBuffDeviceCReq(specPtr);
if (!attachResult.success)
{
std::cerr << __func__ << ": Attach failed for device spec "
<< attachResult.deviceSpec->stringify() << std::endl;
co_return DeviceAttachmentIndResult{
false, nullptr, attachResult.deviceSpec};
}
try {
// Create DeviceRole and add it to both DeviceManager's and Device's collections
auto deviceRole = std::make_shared<DeviceRole>(*device, specPtr);
device->deviceRoles.push_back(deviceRole);
dm.s.rsrc.attachedDeviceRoles.push_back(deviceRole);
co_return DeviceAttachmentIndResult{true, deviceRole, specPtr};
} catch (const std::exception&) {
// Attach failed, return error
co_return DeviceAttachmentIndResult{false, nullptr, specPtr};
}
}
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult>
DeviceManager::removeDeviceAttachmentSpecCReq(
const DeviceAttachmentSpec &spec)
{
assertMarionetteThread();
DeviceManager &dm = getInstance();
sscl::co::CoQutex::ReleaseHandle dmGuard =
co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
// Find the shared_ptr to the spec in the collection
std::shared_ptr<DeviceAttachmentSpec> specPtr;
for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs)
{
if (*existingSpec == spec)
{
specPtr = existingSpec;
break;
}
}
if (!specPtr)
{
// Spec not found, return failure
co_return DeviceAttachmentIndResult{false, nullptr, nullptr};
}
// Call detachStimBuffDeviceCReq first - only clean up metadata if this succeeds
cpsBoundary::StimBuffDeviceOpResult detachResult =
co_await detachStimBuffDeviceCReq(specPtr);
if (!detachResult.success)
{
// Detach failed, metadata remains intact
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
// Detach succeeded, now find and clean up metadata
try {
// Find the DeviceRole in attachedDeviceRoles
auto deviceRoleIt = std::find_if(
dm.s.rsrc.attachedDeviceRoles.begin(),
dm.s.rsrc.attachedDeviceRoles.end(),
[&specPtr](const std::shared_ptr<DeviceRole> &role) {
return *role->deviceAttachmentSpec == *specPtr;
}
);
if (deviceRoleIt == dm.s.rsrc.attachedDeviceRoles.end())
{
// DeviceRole not found, return failure
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
auto deviceRole = *deviceRoleIt;
auto& device = deviceRole->parentDevice;
// Remove DeviceRole from DeviceManager's collection
dm.s.rsrc.attachedDeviceRoles.erase(deviceRoleIt);
// Remove DeviceRole from Device's collection
auto deviceRoleIt2 = std::find(
device.deviceRoles.begin(),
device.deviceRoles.end(),
deviceRole);
if (deviceRoleIt2 != device.deviceRoles.end())
{
device.deviceRoles.erase(deviceRoleIt2);
}
// Remove DeviceAttachmentSpec from deviceAttachmentSpecs collection
auto specIt = std::find_if(
dm.s.rsrc.deviceAttachmentSpecs.begin(),
dm.s.rsrc.deviceAttachmentSpecs.end(),
[&specPtr](
const std::shared_ptr<DeviceAttachmentSpec> &existingSpec)
{
return *existingSpec == *specPtr;
}
);
if (specIt != dm.s.rsrc.deviceAttachmentSpecs.end())
{
dm.s.rsrc.deviceAttachmentSpecs.erase(specIt);
}
co_return DeviceAttachmentIndResult{
true, deviceRole, detachResult.deviceSpec};
} catch (const std::exception&) {
// Cleanup failed, return error
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::attachAllUnattachedDevicesFromCReq(
const std::shared_ptr<std::vector<DeviceAttachmentSpec>> &specs)
{
assertMarionetteThread();
if (specs->empty()) {
co_return sscl::MultiOperationResultSet{};
}
sscl::co::Group group;
std::vector<
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult>>
invokers;
invokers.reserve(specs->size());
for (const auto &spec : *specs)
{
invokers.emplace_back(newDeviceAttachmentSpecIndCReq(spec));
group.add(invokers.back());
}
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
unsigned int nSucceeded = 0;
unsigned int nFailed = 0;
for (auto &invoker : invokers)
{
if (invoker.completedReturnValues().myReturnValue.success) {
nSucceeded++;
} else {
nFailed++;
}
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": " << nSucceeded
<< " devices attached, "
<< nFailed << " devices failed\n";
}
co_return sscl::MultiOperationResultSet(
static_cast<unsigned int>(specs->size()), nSucceeded, nFailed);
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::attachAllUnattachedDevicesFromKnownListCReq()
{
assertMarionetteThread();
DeviceManager &dm = getInstance();
sscl::co::CoQutex::ReleaseHandle dmGuard =
co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
auto unattachedSpecs = std::make_shared<
std::vector<DeviceAttachmentSpec>>();
for (const auto& spec : dm.s.rsrc.deviceAttachmentSpecs)
{
bool isAttached = false;
for (const auto& role : dm.s.rsrc.attachedDeviceRoles)
{
if (*role->deviceAttachmentSpec == *spec)
{
isAttached = true;
break;
}
}
if (!isAttached) {
unattachedSpecs->push_back(*spec);
}
}
dmGuard.release();
co_return co_await attachAllUnattachedDevicesFromCReq(unattachedSpecs);
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::attachAllUnattachedDevicesFromCmdlineCReq()
{
auto specs = std::make_shared<std::vector<DeviceAttachmentSpec>>(
getInstance().s.rsrc.commandLineDASpecs);
co_return co_await attachAllUnattachedDevicesFromCReq(specs);
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::detachAllAttachedDeviceRolesCReq()
{
assertMarionetteThread();
std::vector<std::shared_ptr<DeviceAttachmentSpec>> specsToDetach;
specsToDetach.reserve(getInstance().s.rsrc.attachedDeviceRoles.size());
for (const auto& deviceRole : getInstance().s.rsrc.attachedDeviceRoles)
{
specsToDetach.push_back(deviceRole->deviceAttachmentSpec);
}
if (specsToDetach.empty()) {
co_return sscl::MultiOperationResultSet{};
}
sscl::co::Group group;
std::vector<
mrntt::MrnttViralPostingInvoker<cpsBoundary::StimBuffDeviceOpResult>>
invokers;
invokers.reserve(specsToDetach.size());
for (const auto &spec : specsToDetach)
{
invokers.emplace_back(detachStimBuffDeviceCReq(spec));
group.add(invokers.back());
}
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
unsigned int nSucceeded = 0;
unsigned int nFailed = 0;
for (auto &invoker : invokers)
{
if (invoker.completedReturnValues().myReturnValue.success) {
nSucceeded++;
} else {
nFailed++;
}
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": " << nSucceeded
<< " devices detached, "
<< nFailed << " devices failed\n";
}
co_return sscl::MultiOperationResultSet(
static_cast<unsigned int>(specsToDetach.size()),
nSucceeded, nFailed);
}
void DeviceManager::initializeDeviceReattacher()
{
deviceReattacher = std::make_unique<DeviceReattacher>(
*this, mrntt::mrntt.thread);
deviceReattacher->start();
}
void DeviceManager::finalizeDeviceReattacher()
{
if (!deviceReattacher) { return; }
deviceReattacher->stop();
deviceReattacher.reset();
}
} // namespace device
} // namespace smo