Files
salmanoff/smocore/deviceManager/deviceManager.cpp
T
hayodea 3e19d39853 SenseApiDesc,xcbWindow: port to sscl coro framework
SenseApiDesc's exported API now uses coro pointers instead of
CPS fn pointers.
* Do not build this version of SMO with the Livox drivers enabled,
  because SMO has been changed at the smocore level to use coros
  when calling into stimbuffAPI libs. But the Livox drivers
  haven't yet been ported from CPS to coros.

xcbWindow has been ported to expose coros to SMO in its
senseApiDesc exported iface.
2026-05-25 08:58:36 -04:00

518 lines
14 KiB
C++

#include <algorithm>
#include <iostream>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>
#include <sstream>
#include <memory>
#include <opts.h>
#include <componentThread.h>
#include <deviceManager/deviceManager.h>
#include <deviceManager/deviceReattacher.h>
#include <stimBuffApis/stimBuffApiManager.h>
#include <marionette/marionette.h>
#include <marionette/marionetteThread.h>
#include <mind.h>
#include <spinscale/co/group.h>
namespace smo {
namespace device {
namespace {
void assertMarionetteThread()
{
auto self = sscl::ComponentThread::getSelf();
if (self->id != SmoThreadId::MRNTT)
{
throw std::runtime_error(
std::string(__func__)
+ ": Must be executed on Marionette thread");
}
}
} // namespace
DeviceManager::~DeviceManager()
{
}
const std::string DeviceManager::stringifyDeviceSpecs(void)
{
std::ostringstream oss;
for (const auto& spec : getInstance().s.rsrc.deviceAttachmentSpecs) {
oss << "Device Attachment Spec: " << spec->stringify();
}
return oss.str();
}
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult>
DeviceManager::attachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec)
{
assertMarionetteThread();
auto &sbam = stim_buff::StimBuffApiManager::getInstance();
auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);
if (!libOpt)
{
std::cerr << "attachStimBuffDeviceCReq: No library found for API '"
<< spec->stimBuffApi << "'" << std::endl;
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
auto &lib = *libOpt.value();
if (lib.isBeingDestroyed.load())
{
std::cerr << std::string(__func__) + ": Library is being destroyed"
<< " for API '" << spec->stimBuffApi << "'. Bailing out.\n";
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
if (!lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq)
{
std::cerr << std::string(__func__) + ": attachDeviceCReq() is NULL "
"for library '" << lib.libraryPath << "'"
<< std::endl;
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
sscl::co::CoQutex::ReleaseHandle sbamGuard =
co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy();
sscl::co::CoQutex::ReleaseHandle libGuard =
co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy();
sbamGuard.release();
/** EXPLANATION:
* We pass in either the body or world thread here, depending on whether
* the device is an introspector (idev) or extrospector (edev).
*
* Introspectors are attached to the body thread; extrospectors are
* attached to the world thread.
*/
std::shared_ptr<sscl::ComponentThread> threadForAttachment;
if (spec->sensorType == 'e')
{
threadForAttachment = mind::globalMind->world.thread;
std::cout << __func__ << ": Attaching edev "
<< spec->deviceIdentifier << " to world thread" << "\n";
}
else
{
threadForAttachment = mind::globalMind->body.thread;
std::cout << __func__ << ": Attaching non-edev "
<< spec->deviceIdentifier << " to body thread" << "\n";
}
co_return co_await lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq(
spec, threadForAttachment);
}
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult>
DeviceManager::detachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec)
{
assertMarionetteThread();
auto &sbam = stim_buff::StimBuffApiManager::getInstance();
auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);
if (!libOpt)
{
std::cerr << "detachStimBuffDeviceCReq: No library found for API '"
<< spec->stimBuffApi << "'" << std::endl;
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
auto &lib = *libOpt.value();
if (lib.isBeingDestroyed.load())
{
std::cerr << std::string(__func__) + ": Library is being destroyed"
<< " for API '" << spec->stimBuffApi << "'. Bailing out.\n";
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
if (!lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq)
{
std::cerr << std::string(__func__) + ": detachDeviceCReq() is NULL "
"for library '" << lib.libraryPath << "'"
<< std::endl;
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
sscl::co::CoQutex::ReleaseHandle sbamGuard =
co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy();
sscl::co::CoQutex::ReleaseHandle libGuard =
co_await lib.s.lock.getAcquireInvocationAndSuspensionPolicy();
sbamGuard.release();
co_return co_await lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq(
spec);
}
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult>
DeviceManager::newDeviceAttachmentSpecIndCReq(
const DeviceAttachmentSpec &spec)
{
assertMarionetteThread();
DeviceManager &dm = getInstance();
sscl::co::CoQutex::ReleaseHandle dmGuard =
co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
// First, add the spec to deviceAttachmentSpecs if it's not already there
std::shared_ptr<DeviceAttachmentSpec> specPtr;
bool specExists = false;
for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs)
{
if (*existingSpec == spec)
{
specExists = true;
specPtr = existingSpec;
break;
}
}
if (!specExists)
{
specPtr = std::make_shared<DeviceAttachmentSpec>(spec);
dm.s.rsrc.deviceAttachmentSpecs.push_back(specPtr);
}
std::shared_ptr<Device> device;
bool deviceExists = false;
for (const auto& existingDevice : dm.s.rsrc.devices)
{
if (existingDevice->deviceIdentifier != spec.deviceIdentifier)
{ continue; }
device = existingDevice;
deviceExists = true;
break;
}
// If device doesn't exist, create a new one and add it
if (!device)
{
device = std::make_shared<Device>(spec.deviceIdentifier);
dm.s.rsrc.devices.push_back(device);
}
// Check if a DeviceRole w/ this spec already exists in attachedDeviceRoles
std::shared_ptr<DeviceRole> existingDeviceRole;
bool deviceRoleExists = false;
for (const auto& role : dm.s.rsrc.attachedDeviceRoles)
{
if (*role->deviceAttachmentSpec == spec)
{
deviceRoleExists = true;
existingDeviceRole = role;
break;
}
}
// If DeviceRole exists, both spec and device must also exist
if (deviceRoleExists)
{
if (!specExists || !deviceExists)
{
throw std::runtime_error(
"Program error: DeviceRole exists but spec or device doesn't "
"pre-exist. specExists=" + std::to_string(specExists) +
", deviceExists=" + std::to_string(deviceExists));
}
// Already attached, return success
co_return DeviceAttachmentIndResult{
true, existingDeviceRole, specPtr};
}
stim_buff::StimBuffDeviceOpResult attachResult =
co_await attachStimBuffDeviceCReq(specPtr);
if (!attachResult.success)
{
std::cerr << __func__ << ": Attach failed for device spec "
<< attachResult.deviceSpec->stringify() << std::endl;
co_return DeviceAttachmentIndResult{
false, nullptr, attachResult.deviceSpec};
}
try {
// Create DeviceRole and add it to both DeviceManager's and Device's collections
auto deviceRole = std::make_shared<DeviceRole>(*device, specPtr);
device->deviceRoles.push_back(deviceRole);
dm.s.rsrc.attachedDeviceRoles.push_back(deviceRole);
co_return DeviceAttachmentIndResult{true, deviceRole, specPtr};
} catch (const std::exception&) {
// Attach failed, return error
co_return DeviceAttachmentIndResult{false, nullptr, specPtr};
}
}
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult>
DeviceManager::removeDeviceAttachmentSpecCReq(
const DeviceAttachmentSpec &spec)
{
assertMarionetteThread();
DeviceManager &dm = getInstance();
sscl::co::CoQutex::ReleaseHandle dmGuard =
co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
// Find the shared_ptr to the spec in the collection
std::shared_ptr<DeviceAttachmentSpec> specPtr;
for (const auto& existingSpec : dm.s.rsrc.deviceAttachmentSpecs)
{
if (*existingSpec == spec)
{
specPtr = existingSpec;
break;
}
}
if (!specPtr)
{
// Spec not found, return failure
co_return DeviceAttachmentIndResult{false, nullptr, nullptr};
}
// Call detachStimBuffDeviceCReq first - only clean up metadata if this succeeds
stim_buff::StimBuffDeviceOpResult detachResult =
co_await detachStimBuffDeviceCReq(specPtr);
if (!detachResult.success)
{
// Detach failed, metadata remains intact
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
// Detach succeeded, now find and clean up metadata
try {
// Find the DeviceRole in attachedDeviceRoles
auto deviceRoleIt = std::find_if(
dm.s.rsrc.attachedDeviceRoles.begin(),
dm.s.rsrc.attachedDeviceRoles.end(),
[&specPtr](const std::shared_ptr<DeviceRole> &role) {
return *role->deviceAttachmentSpec == *specPtr;
}
);
if (deviceRoleIt == dm.s.rsrc.attachedDeviceRoles.end())
{
// DeviceRole not found, return failure
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
auto deviceRole = *deviceRoleIt;
auto& device = deviceRole->parentDevice;
// Remove DeviceRole from DeviceManager's collection
dm.s.rsrc.attachedDeviceRoles.erase(deviceRoleIt);
// Remove DeviceRole from Device's collection
auto deviceRoleIt2 = std::find(
device.deviceRoles.begin(),
device.deviceRoles.end(),
deviceRole);
if (deviceRoleIt2 != device.deviceRoles.end())
{
device.deviceRoles.erase(deviceRoleIt2);
}
// Remove DeviceAttachmentSpec from deviceAttachmentSpecs collection
auto specIt = std::find_if(
dm.s.rsrc.deviceAttachmentSpecs.begin(),
dm.s.rsrc.deviceAttachmentSpecs.end(),
[&specPtr](
const std::shared_ptr<DeviceAttachmentSpec> &existingSpec)
{
return *existingSpec == *specPtr;
}
);
if (specIt != dm.s.rsrc.deviceAttachmentSpecs.end())
{
dm.s.rsrc.deviceAttachmentSpecs.erase(specIt);
}
co_return DeviceAttachmentIndResult{
true, deviceRole, detachResult.deviceSpec};
} catch (const std::exception&) {
// Cleanup failed, return error
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::attachAllUnattachedDevicesFromCReq(
const std::shared_ptr<std::vector<DeviceAttachmentSpec>> &specs)
{
assertMarionetteThread();
if (specs->empty()) {
co_return sscl::MultiOperationResultSet{};
}
sscl::co::Group group;
std::vector<
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult>>
invokers;
invokers.reserve(specs->size());
for (const auto &spec : *specs)
{
invokers.emplace_back(newDeviceAttachmentSpecIndCReq(spec));
group.add(invokers.back());
}
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
unsigned int nSucceeded = 0;
unsigned int nFailed = 0;
for (auto &invoker : invokers)
{
if (invoker.completedReturnValues().myReturnValue.success) {
nSucceeded++;
} else {
nFailed++;
}
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": " << nSucceeded
<< " devices attached, "
<< nFailed << " devices failed\n";
}
co_return sscl::MultiOperationResultSet(
static_cast<unsigned int>(specs->size()), nSucceeded, nFailed);
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::attachAllUnattachedDevicesFromKnownListCReq()
{
assertMarionetteThread();
DeviceManager &dm = getInstance();
sscl::co::CoQutex::ReleaseHandle dmGuard =
co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
auto unattachedSpecs = std::make_shared<
std::vector<DeviceAttachmentSpec>>();
for (const auto& spec : dm.s.rsrc.deviceAttachmentSpecs)
{
bool isAttached = false;
for (const auto& role : dm.s.rsrc.attachedDeviceRoles)
{
if (*role->deviceAttachmentSpec == *spec)
{
isAttached = true;
break;
}
}
if (!isAttached) {
unattachedSpecs->push_back(*spec);
}
}
dmGuard.release();
co_return co_await attachAllUnattachedDevicesFromCReq(unattachedSpecs);
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::attachAllUnattachedDevicesFromCmdlineCReq()
{
auto specs = std::make_shared<std::vector<DeviceAttachmentSpec>>(
getInstance().s.rsrc.commandLineDASpecs);
co_return co_await attachAllUnattachedDevicesFromCReq(specs);
}
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
DeviceManager::detachAllAttachedDeviceRolesCReq()
{
assertMarionetteThread();
std::vector<std::shared_ptr<DeviceAttachmentSpec>> specsToDetach;
specsToDetach.reserve(getInstance().s.rsrc.attachedDeviceRoles.size());
for (const auto& deviceRole : getInstance().s.rsrc.attachedDeviceRoles)
{
specsToDetach.push_back(deviceRole->deviceAttachmentSpec);
}
if (specsToDetach.empty()) {
co_return sscl::MultiOperationResultSet{};
}
sscl::co::Group group;
std::vector<
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult>>
invokers;
invokers.reserve(specsToDetach.size());
for (const auto &spec : specsToDetach)
{
invokers.emplace_back(detachStimBuffDeviceCReq(spec));
group.add(invokers.back());
}
co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
unsigned int nSucceeded = 0;
unsigned int nFailed = 0;
for (auto &invoker : invokers)
{
if (invoker.completedReturnValues().myReturnValue.success) {
nSucceeded++;
} else {
nFailed++;
}
}
if (OptionParser::getOptions().verbose)
{
std::cout << __func__ << ": " << nSucceeded
<< " devices detached, "
<< nFailed << " devices failed\n";
}
co_return sscl::MultiOperationResultSet(
static_cast<unsigned int>(specsToDetach.size()),
nSucceeded, nFailed);
}
void DeviceManager::initializeDeviceReattacher()
{
deviceReattacher = std::make_unique<DeviceReattacher>(
*this, mrntt::mrntt.thread);
deviceReattacher->start();
}
void DeviceManager::finalizeDeviceReattacher()
{
if (!deviceReattacher) { return; }
deviceReattacher->stop();
deviceReattacher.reset();
}
} // namespace device
} // namespace smo