Exceptions: All of smocore likely now uses exceptions

This commit is contained in:
2026-06-07 19:37:50 -04:00
parent 241e8a6798
commit b2644f17c6
8 changed files with 304 additions and 242 deletions
+3 -29
View File
@@ -64,21 +64,8 @@ BodyViralPostingInvoker<void> Body::initializeCReq()
<< '\n'; << '\n';
} }
sscl::MultiOperationResultSet attachResults = co_await co_await device::DeviceManager::getInstance()
device::DeviceManager::getInstance()
.attachAllUnattachedDevicesFromCmdlineCReq(); .attachAllUnattachedDevicesFromCmdlineCReq();
std::cout << "Mrntt: attached "
<< attachResults.nSucceeded << " of " << attachResults.nTotal
<< " sense devices." << "\n";
if (attachResults.nTotal > 0 && attachResults.nSucceeded == 0)
{
throw std::runtime_error(
std::string(__func__)
+ ": Failed to attach any of "
+ std::to_string(attachResults.nTotal)
+ " requested sense devices");
}
co_return; co_return;
} }
@@ -102,21 +89,8 @@ BodyViralPostingInvoker<void> Body::finalizeCReq()
} }
std::cout << "Mrntt: About to detach all sense devices." << "\n"; std::cout << "Mrntt: About to detach all sense devices." << "\n";
sscl::MultiOperationResultSet detachResults = co_await co_await device::DeviceManager::getInstance()
device::DeviceManager::getInstance().detachAllAttachedDeviceRolesCReq(); .detachAllAttachedDeviceRolesCReq();
if (detachResults.nFailed > 0)
{
std::cerr << "Mrntt: Failed to detach "
<< detachResults.nFailed << " of " << detachResults.nTotal
<< " sense devices." << "\n";
}
else
{
std::cout << "Mrntt: Successfully detached "
<< detachResults.nSucceeded << " of " << detachResults.nTotal
<< " sense devices." << "\n";
}
std::cout << "Mrntt: About to finalize all stim buff api libs." << "\n"; std::cout << "Mrntt: About to finalize all stim buff api libs." << "\n";
co_await stim_buff::StimBuffApiManager::getInstance() co_await stim_buff::StimBuffApiManager::getInstance()
+191 -119
View File
@@ -16,6 +16,7 @@
#include <mind.h> #include <mind.h>
#include <spinscale/co/postTarget.h> #include <spinscale/co/postTarget.h>
#include <spinscale/co/group.h> #include <spinscale/co/group.h>
#include <spinscale/asynchronousLoop.h>
namespace smo { namespace smo {
namespace device { namespace device {
@@ -43,6 +44,28 @@ std::shared_ptr<sscl::ComponentThread> threadForDeviceOp(
return mind::globalMind->body.thread; return mind::globalMind->body.thread;
} }
void throwIfStimBuffAttachFailed(
const stim_buff::StimBuffDeviceOpResult &result,
const char *callerFn)
{
if (result.success) { return; }
throw std::runtime_error(
std::string(callerFn) + ": attach failed for "
+ result.deviceSpec->stringify());
}
void throwIfStimBuffDetachFailed(
const stim_buff::StimBuffDeviceOpResult &result,
const char *callerFn)
{
if (result.success) { return; }
throw std::runtime_error(
std::string(callerFn) + ": detach failed for "
+ result.deviceSpec->stringify());
}
} // namespace } // namespace
DeviceManager::~DeviceManager() DeviceManager::~DeviceManager()
@@ -60,38 +83,30 @@ const std::string DeviceManager::stringifyDeviceSpecs(void)
return oss.str(); return oss.str();
} }
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult> mrntt::MrnttViralPostingInvoker<void>
DeviceManager::attachStimBuffDeviceCReq( DeviceManager::attachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec) const std::shared_ptr<DeviceAttachmentSpec>& spec)
{ {
assertMarionetteThread(); assertMarionetteThread();
auto &sbam = stim_buff::StimBuffApiManager::getInstance(); auto &sbam = stim_buff::StimBuffApiManager::getInstance();
auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi); auto &lib = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);
if (!libOpt)
{
std::cerr << "attachStimBuffDeviceCReq: No library found for API '"
<< spec->stimBuffApi << "'" << std::endl;
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
auto &lib = *libOpt.value();
if (lib.isBeingDestroyed.load()) if (lib.isBeingDestroyed.load())
{ {
std::cerr << std::string(__func__) + ": Library is being destroyed" throw std::runtime_error(
<< " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; std::string(__func__) + ": Library is being destroyed"
co_return stim_buff::StimBuffDeviceOpResult{false, spec}; + " for API '" + spec->stimBuffApi + "'");
} }
if (!lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq) if (!lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq)
{ {
std::cerr << std::string(__func__) + ": attachDeviceCReq() is NULL " throw std::runtime_error(
"for library '" << lib.libraryPath << "'" std::string(__func__) + ": attachDeviceCReq() is NULL "
<< std::endl; "for library '" + lib.libraryPath + "'");
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
} }
/* FIXME Locking here makes no sense. */
sscl::co::CoQutex::ReleaseHandle sbamGuard = sscl::co::CoQutex::ReleaseHandle sbamGuard =
co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy(); co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy();
sscl::co::CoQutex::ReleaseHandle libGuard = sscl::co::CoQutex::ReleaseHandle libGuard =
@@ -119,43 +134,38 @@ DeviceManager::attachStimBuffDeviceCReq(
<< spec->deviceIdentifier << " to body thread" << "\n"; << spec->deviceIdentifier << " to body thread" << "\n";
} }
co_return co_await lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq( stim_buff::StimBuffDeviceOpResult result =
co_await lib.stimBuffApiDesc.sal_mgmt_libOps.attachDeviceCReq(
sscl::co::ExplicitPostTarget{targetThread->getIoContext()}, sscl::co::ExplicitPostTarget{targetThread->getIoContext()},
spec, targetThread); spec, targetThread);
throwIfStimBuffAttachFailed(result, __func__);
co_return;
} }
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult> mrntt::MrnttViralPostingInvoker<void>
DeviceManager::detachStimBuffDeviceCReq( DeviceManager::detachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec) const std::shared_ptr<DeviceAttachmentSpec>& spec)
{ {
assertMarionetteThread(); assertMarionetteThread();
auto &sbam = stim_buff::StimBuffApiManager::getInstance(); auto &sbam = stim_buff::StimBuffApiManager::getInstance();
auto libOpt = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi); auto &lib = sbam.getStimBuffApiLibByApiName(spec->stimBuffApi);
if (!libOpt)
{
std::cerr << "detachStimBuffDeviceCReq: No library found for API '"
<< spec->stimBuffApi << "'" << std::endl;
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
}
auto &lib = *libOpt.value();
if (lib.isBeingDestroyed.load()) if (lib.isBeingDestroyed.load())
{ {
std::cerr << std::string(__func__) + ": Library is being destroyed" throw std::runtime_error(
<< " for API '" << spec->stimBuffApi << "'. Bailing out.\n"; std::string(__func__) + ": Library is being destroyed"
co_return stim_buff::StimBuffDeviceOpResult{false, spec}; + " for API '" + spec->stimBuffApi + "'");
} }
if (!lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq) if (!lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq)
{ {
std::cerr << std::string(__func__) + ": detachDeviceCReq() is NULL " throw std::runtime_error(
"for library '" << lib.libraryPath << "'" std::string(__func__) + ": detachDeviceCReq() is NULL "
<< std::endl; "for library '" + lib.libraryPath + "'");
co_return stim_buff::StimBuffDeviceOpResult{false, spec};
} }
/* FIXME Locking here makes no sense. */
sscl::co::CoQutex::ReleaseHandle sbamGuard = sscl::co::CoQutex::ReleaseHandle sbamGuard =
co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy(); co_await sbam.s.lock.getAcquireInvocationAndSuspensionPolicy();
sscl::co::CoQutex::ReleaseHandle libGuard = sscl::co::CoQutex::ReleaseHandle libGuard =
@@ -165,12 +175,15 @@ DeviceManager::detachStimBuffDeviceCReq(
std::shared_ptr<sscl::ComponentThread> targetThread = std::shared_ptr<sscl::ComponentThread> targetThread =
threadForDeviceOp(*spec); threadForDeviceOp(*spec);
co_return co_await lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq( stim_buff::StimBuffDeviceOpResult result =
co_await lib.stimBuffApiDesc.sal_mgmt_libOps.detachDeviceCReq(
sscl::co::ExplicitPostTarget{targetThread->getIoContext()}, sscl::co::ExplicitPostTarget{targetThread->getIoContext()},
spec); spec);
throwIfStimBuffDetachFailed(result, __func__);
co_return;
} }
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult> mrntt::MrnttViralPostingInvoker<std::shared_ptr<DeviceRole>>
DeviceManager::newDeviceAttachmentSpecIndCReq( DeviceManager::newDeviceAttachmentSpecIndCReq(
const DeviceAttachmentSpec &spec) const DeviceAttachmentSpec &spec)
{ {
@@ -242,36 +255,29 @@ DeviceManager::newDeviceAttachmentSpecIndCReq(
", deviceExists=" + std::to_string(deviceExists)); ", deviceExists=" + std::to_string(deviceExists));
} }
// Already attached, return success co_return existingDeviceRole;
co_return DeviceAttachmentIndResult{
true, existingDeviceRole, specPtr};
} }
stim_buff::StimBuffDeviceOpResult attachResult = dmGuard.release();
/* FIXME:
* We should add an unlocked flag to at/detachStimBuffDeviceCReq()
* so we can call it with the devmgr lock held.
*/
co_await attachStimBuffDeviceCReq(specPtr); co_await attachStimBuffDeviceCReq(specPtr);
if (!attachResult.success) sscl::co::CoQutex::ReleaseHandle dmGuardAfterAttach =
{ co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
std::cerr << __func__ << ": Attach failed for device spec " (void)dmGuardAfterAttach;
<< attachResult.deviceSpec->stringify() << std::endl;
co_return DeviceAttachmentIndResult{
false, nullptr, attachResult.deviceSpec};
}
try {
// Create DeviceRole and add it to both DeviceManager's and Device's collections
auto deviceRole = std::make_shared<DeviceRole>(*device, specPtr); auto deviceRole = std::make_shared<DeviceRole>(*device, specPtr);
device->deviceRoles.push_back(deviceRole); device->deviceRoles.push_back(deviceRole);
dm.s.rsrc.attachedDeviceRoles.push_back(deviceRole); dm.s.rsrc.attachedDeviceRoles.push_back(deviceRole);
co_return DeviceAttachmentIndResult{true, deviceRole, specPtr}; co_return deviceRole;
} catch (const std::exception&) {
// Attach failed, return error
co_return DeviceAttachmentIndResult{false, nullptr, specPtr};
}
} }
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult> mrntt::MrnttViralPostingInvoker<void>
DeviceManager::removeDeviceAttachmentSpecCReq( DeviceManager::removeDeviceAttachmentSpecCReq(
const DeviceAttachmentSpec &spec) const DeviceAttachmentSpec &spec)
{ {
@@ -294,23 +300,23 @@ DeviceManager::removeDeviceAttachmentSpecCReq(
if (!specPtr) if (!specPtr)
{ {
// Spec not found, return failure throw std::runtime_error(
co_return DeviceAttachmentIndResult{false, nullptr, nullptr}; std::string(__func__) + ": Device attachment spec not found: "
+ spec.stringify());
} }
// Call detachStimBuffDeviceCReq first - only clean up metadata if this succeeds dmGuard.release();
stim_buff::StimBuffDeviceOpResult detachResult =
/* FIXME:
* We should add an unlocked flag to at/detachStimBuffDeviceCReq()
* so we can call it with the devmgr lock held.
*/
co_await detachStimBuffDeviceCReq(specPtr); co_await detachStimBuffDeviceCReq(specPtr);
if (!detachResult.success) sscl::co::CoQutex::ReleaseHandle dmGuardAfterDetach =
{ co_await dm.s.lock.getAcquireInvocationAndSuspensionPolicy();
// Detach failed, metadata remains intact (void)dmGuardAfterDetach;
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
// Detach succeeded, now find and clean up metadata
try {
// Find the DeviceRole in attachedDeviceRoles // Find the DeviceRole in attachedDeviceRoles
auto deviceRoleIt = std::find_if( auto deviceRoleIt = std::find_if(
dm.s.rsrc.attachedDeviceRoles.begin(), dm.s.rsrc.attachedDeviceRoles.begin(),
@@ -322,9 +328,11 @@ DeviceManager::removeDeviceAttachmentSpecCReq(
if (deviceRoleIt == dm.s.rsrc.attachedDeviceRoles.end()) if (deviceRoleIt == dm.s.rsrc.attachedDeviceRoles.end())
{ {
// DeviceRole not found, return failure throw std::runtime_error(
co_return DeviceAttachmentIndResult{ std::string(__func__) + ": DeviceRole not found for spec (race "
false, nullptr, detachResult.deviceSpec}; "condition)?: "
+ specPtr->stringify() + ", deviceRoles="
+ std::to_string(dm.s.rsrc.attachedDeviceRoles.size()));
} }
auto deviceRole = *deviceRoleIt; auto deviceRole = *deviceRoleIt;
@@ -359,30 +367,25 @@ DeviceManager::removeDeviceAttachmentSpecCReq(
dm.s.rsrc.deviceAttachmentSpecs.erase(specIt); dm.s.rsrc.deviceAttachmentSpecs.erase(specIt);
} }
co_return DeviceAttachmentIndResult{ co_return;
true, deviceRole, detachResult.deviceSpec};
} catch (const std::exception&) {
// Cleanup failed, return error
co_return DeviceAttachmentIndResult{
false, nullptr, detachResult.deviceSpec};
}
} }
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet> mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSetWithException>
DeviceManager::attachAllUnattachedDevicesFromCReq( DeviceManager::attachAllUnattachedDevicesFromCReq(
const std::shared_ptr<std::vector<DeviceAttachmentSpec>> &specs) const std::shared_ptr<std::vector<DeviceAttachmentSpec>> &specs)
{ {
assertMarionetteThread(); assertMarionetteThread();
if (specs->empty()) { const unsigned int nTotal = static_cast<unsigned int>(specs->size());
co_return sscl::MultiOperationResultSet{}; if (nTotal == 0) {
co_return sscl::MultiOperationResultSetWithException{};
} }
sscl::co::Group group; sscl::co::Group group;
std::vector< std::vector<
mrntt::MrnttViralPostingInvoker<DeviceManager::DeviceAttachmentIndResult>> mrntt::MrnttViralPostingInvoker<std::shared_ptr<DeviceRole>>>
invokers; invokers;
invokers.reserve(specs->size()); invokers.reserve(nTotal);
for (const auto &spec : *specs) for (const auto &spec : *specs)
{ {
@@ -390,32 +393,32 @@ DeviceManager::attachAllUnattachedDevicesFromCReq(
group.add(invokers.back()); group.add(invokers.back());
} }
const std::vector<sscl::co::Group::SettlementDescriptor> &settlements =
co_await group.getAwaitAllSettlementsInvoker(); co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
unsigned int nSucceeded = 0; unsigned int nSucceeded = 0;
unsigned int nFailed = 0; unsigned int nFailed = 0;
for (auto &invoker : invokers) using SettlementType = sscl::co::Group::SettlementDescriptor::TypeE;
for (const auto &desc : settlements)
{ {
if (invoker.completedReturnValues().myReturnValue.success) { if (desc.type == SettlementType::EXCEPTION_THROWN) {
nSucceeded++;
} else {
nFailed++; nFailed++;
} else {
nSucceeded++;
} }
} }
if (OptionParser::getOptions().verbose) std::exception_ptr memberFailureException = nullptr;
{ if (nFailed > 0) {
std::cout << __func__ << ": " << nSucceeded memberFailureException = group.captureAggregatedGroupExceptions();
<< " devices attached, "
<< nFailed << " devices failed\n";
} }
co_return sscl::MultiOperationResultSet( co_return sscl::MultiOperationResultSetWithException(
static_cast<unsigned int>(specs->size()), nSucceeded, nFailed); sscl::MultiOperationResultSet(nTotal, nSucceeded, nFailed),
memberFailureException);
} }
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet> mrntt::MrnttViralPostingInvoker<void>
DeviceManager::attachAllUnattachedDevicesFromKnownListCReq() DeviceManager::attachAllUnattachedDevicesFromKnownListCReq()
{ {
assertMarionetteThread(); assertMarionetteThread();
@@ -447,39 +450,91 @@ DeviceManager::attachAllUnattachedDevicesFromKnownListCReq()
dmGuard.release(); dmGuard.release();
co_return co_await attachAllUnattachedDevicesFromCReq(unattachedSpecs); const sscl::MultiOperationResultSetWithException batchResult =
co_await attachAllUnattachedDevicesFromCReq(unattachedSpecs);
if (batchResult.results.nSucceeded > 0)
{
std::cout << "DeviceReattacher: Successfully reattached "
<< batchResult.results.nSucceeded << " of "
<< batchResult.results.nTotal
<< " devices" << std::endl;
} }
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet> if (batchResult.hasMemberFailure())
{
try {
std::rethrow_exception(batchResult.memberFailureException);
} catch (const std::exception &e) {
std::cerr << __func__ << ": " << e.what() << std::endl;
}
}
co_return;
}
mrntt::MrnttViralPostingInvoker<void>
DeviceManager::attachAllUnattachedDevicesFromCmdlineCReq() DeviceManager::attachAllUnattachedDevicesFromCmdlineCReq()
{ {
auto specs = std::make_shared<std::vector<DeviceAttachmentSpec>>( auto specs = std::make_shared<std::vector<DeviceAttachmentSpec>>(
getInstance().s.rsrc.commandLineDASpecs); getInstance().s.rsrc.commandLineDASpecs);
co_return co_await attachAllUnattachedDevicesFromCReq(specs); const sscl::MultiOperationResultSetWithException batchResult =
co_await attachAllUnattachedDevicesFromCReq(specs);
std::cout << "Mrntt: attached "
<< batchResult.results.nSucceeded << " of "
<< batchResult.results.nTotal
<< " sense devices." << "\n";
if (batchResult.results.nTotal > 0
&& batchResult.results.nSucceeded == 0)
{
std::string message =
std::string(__func__)
+ ": Startup policy requires at least one cmdline sense device "
"to attach successfully; 0 of "
+ std::to_string(batchResult.results.nTotal)
+ " requested sense devices attached — aborting startup.";
if (batchResult.hasMemberFailure())
{
try {
std::rethrow_exception(batchResult.memberFailureException);
} catch (const std::exception &e) {
message += "\n";
message += e.what();
} catch (...) {
message += "\n<unknown exception type>";
}
} }
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet> throw std::runtime_error(message);
}
co_return;
}
mrntt::MrnttViralPostingInvoker<void>
DeviceManager::detachAllAttachedDeviceRolesCReq() DeviceManager::detachAllAttachedDeviceRolesCReq()
{ {
assertMarionetteThread(); assertMarionetteThread();
std::vector<std::shared_ptr<DeviceAttachmentSpec>> specsToDetach; std::vector<std::shared_ptr<DeviceAttachmentSpec>> specsToDetach;
specsToDetach.reserve(getInstance().s.rsrc.attachedDeviceRoles.size()); specsToDetach.reserve(getInstance().s.rsrc.attachedDeviceRoles.size());
for (const auto& deviceRole : getInstance().s.rsrc.attachedDeviceRoles) for (const auto& deviceRole : getInstance().s.rsrc.attachedDeviceRoles) {
{
specsToDetach.push_back(deviceRole->deviceAttachmentSpec); specsToDetach.push_back(deviceRole->deviceAttachmentSpec);
} }
if (specsToDetach.empty()) { sscl::AsynchronousLoop loop(
co_return sscl::MultiOperationResultSet{}; static_cast<unsigned int>(specsToDetach.size()));
if (loop.nTotalIsZero()) {
co_return;
} }
sscl::co::Group group; sscl::co::Group group;
std::vector< std::vector<mrntt::MrnttViralPostingInvoker<void>> invokers;
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult>> invokers.reserve(loop.nTotal);
invokers;
invokers.reserve(specsToDetach.size());
for (const auto &spec : specsToDetach) for (const auto &spec : specsToDetach)
{ {
@@ -487,18 +542,37 @@ DeviceManager::detachAllAttachedDeviceRolesCReq()
group.add(invokers.back()); group.add(invokers.back());
} }
const std::vector<sscl::co::Group::SettlementDescriptor> &settlements =
co_await group.getAwaitAllSettlementsInvoker(); co_await group.getAwaitAllSettlementsInvoker();
group.checkForAndReThrowGroupExceptions();
unsigned int nSucceeded = 0; using SettlementType = sscl::co::Group::SettlementDescriptor::TypeE;
unsigned int nFailed = 0; for (const auto &desc : settlements)
for (auto &invoker : invokers)
{ {
if (invoker.completedReturnValues().myReturnValue.success) { loop.incrementSuccessOrFailureDueTo(
nSucceeded++; desc.type != SettlementType::EXCEPTION_THROWN);
} else {
nFailed++;
} }
if (loop.nFailed.load() > 0)
{
try {
group.checkForAndReThrowGroupExceptions();
} catch (const std::exception &e) {
std::cerr << __func__ << ": " << e.what() << std::endl;
}
}
const unsigned int nSucceeded = loop.nSucceeded.load();
const unsigned int nFailed = loop.nFailed.load();
if (nFailed > 0)
{
std::cerr << "Mrntt: Failed to detach "
<< nFailed << " of " << loop.nTotal << " sense devices." << "\n";
}
else
{
std::cout << "Mrntt: Successfully detached "
<< nSucceeded << " of " << loop.nTotal << " sense devices." << "\n";
} }
if (OptionParser::getOptions().verbose) if (OptionParser::getOptions().verbose)
@@ -508,9 +582,7 @@ DeviceManager::detachAllAttachedDeviceRolesCReq()
<< nFailed << " devices failed\n"; << nFailed << " devices failed\n";
} }
co_return sscl::MultiOperationResultSet( co_return;
static_cast<unsigned int>(specsToDetach.size()),
nSucceeded, nFailed);
} }
void DeviceManager::initializeDeviceReattacher() void DeviceManager::initializeDeviceReattacher()
+24 -17
View File
@@ -6,6 +6,7 @@
#include <deviceManager/deviceReattacher.h> #include <deviceManager/deviceReattacher.h>
#include <deviceManager/deviceManager.h> #include <deviceManager/deviceManager.h>
#include <marionette/marionetteThread.h> #include <marionette/marionetteThread.h>
#include <spinscale/co/nonViralCompletion.h>
namespace smo { namespace smo {
namespace device { namespace device {
@@ -20,29 +21,25 @@ DeviceReattacher::DeviceReattacher(
DeviceManager& parent, std::shared_ptr<sscl::ComponentThread> ioThread) DeviceManager& parent, std::shared_ptr<sscl::ComponentThread> ioThread)
: parent(parent), ioThread(ioThread), timer(ioThread->getIoContext()) : parent(parent), ioThread(ioThread), timer(ioThread->getIoContext())
{ {
/** EXPLANATION:
* The thread on which DeviceReattacher runs is whichever thread executes
* the io_context that owns deadline_timer. Timer async_wait handlers
* (onTimeout, holdReattachCReq, reattachKnownListCReq) are dispatched on
* that thread. ioThread selects that io_context here; start() only arms
* the timer on it.
*/
} }
mrntt::MrnttNonViralPostingInvoker DeviceReattacher::reattachKnownListCReq( mrntt::MrnttNonViralNonPostingInvoker DeviceReattacher::reattachKnownListCReq(
[[maybe_unused]] sscl::co::ExplicitPostTarget postTarget,
[[maybe_unused]] std::exception_ptr &exceptionPtr, [[maybe_unused]] std::exception_ptr &exceptionPtr,
[[maybe_unused]] std::function<void()> callback) [[maybe_unused]] std::function<void()> callback)
{ {
/** EXPLANATION: /** EXPLANATION:
* DeviceManager attach APIs require the marionette thread; postTarget * Non-posting: invoked from holdReattachCReq on the timer callback thread
* selects where this coroutine runs (mrntt io_context). Completion still * (see ctor). Nested DeviceManager attach APIs still post to MRNTT as
* posts back to the mrntt timer thread via callerIoContext. * needed via their own viral posting invokers.
*/ */
(void)postTarget; co_await parent.attachAllUnattachedDevicesFromKnownListCReq();
sscl::MultiOperationResultSet results = co_await
parent.attachAllUnattachedDevicesFromKnownListCReq();
if (results.nTotal > 0)
{
std::cout << "DeviceReattacher: Successfully reattached "
<< results.nSucceeded << " of " << results.nTotal
<< " devices" << std::endl;
}
co_return; co_return;
} }
@@ -92,10 +89,20 @@ void DeviceReattacher::holdReattachCReq()
reattachCReqInvoker.reset(); reattachCReqInvoker.reset();
reattachCReqInvoker.emplace(reattachKnownListCReq( reattachCReqInvoker.emplace(reattachKnownListCReq(
sscl::co::ExplicitPostTarget{ioThread->getIoContext()},
reattachLifetimeExceptionPtr, reattachLifetimeExceptionPtr,
[this]() [this]()
{ {
sscl::co::NonViralCompletion nvc(reattachLifetimeExceptionPtr);
if (nvc.hasException())
{
try {
nvc.checkAndRethrowException();
} catch (const std::exception &e) {
std::cerr << "DeviceReattacher: " << e.what()
<< std::endl;
}
}
sscl::SpinLock::Guard guard(deviceReattacherCanceler.s.lock); sscl::SpinLock::Guard guard(deviceReattacherCanceler.s.lock);
reattachOpInFlight = false; reattachOpInFlight = false;
})); }));
+12 -19
View File
@@ -13,8 +13,8 @@
#include <deviceManager/deviceRole.h> #include <deviceManager/deviceRole.h>
#include <deviceManager/deviceReattacher.h> #include <deviceManager/deviceReattacher.h>
#include <marionette/marionetteThread.h> #include <marionette/marionetteThread.h>
#include <spinscale/co/coQutex.h>
#include <spinscale/multiOperationResultSet.h> #include <spinscale/multiOperationResultSet.h>
#include <spinscale/co/coQutex.h>
#include <spinscale/sharedResourceGroup.h> #include <spinscale/sharedResourceGroup.h>
namespace smo { namespace smo {
@@ -25,13 +25,6 @@ class DeviceReattacher;
class DeviceManager class DeviceManager
{ {
public: public:
struct DeviceAttachmentIndResult
{
bool success = false;
std::shared_ptr<DeviceRole> deviceRole;
std::shared_ptr<DeviceAttachmentSpec> deviceSpec;
};
static DeviceManager& getInstance() static DeviceManager& getInstance()
{ {
static DeviceManager instance; static DeviceManager instance;
@@ -52,31 +45,27 @@ public:
static const std::string stringifyDeviceSpecs(void); static const std::string stringifyDeviceSpecs(void);
mrntt::MrnttViralPostingInvoker<DeviceAttachmentIndResult> mrntt::MrnttViralPostingInvoker<std::shared_ptr<DeviceRole>>
newDeviceAttachmentSpecIndCReq(const DeviceAttachmentSpec &spec); newDeviceAttachmentSpecIndCReq(const DeviceAttachmentSpec &spec);
mrntt::MrnttViralPostingInvoker<DeviceAttachmentIndResult> mrntt::MrnttViralPostingInvoker<void>
removeDeviceAttachmentSpecCReq(const DeviceAttachmentSpec &spec); removeDeviceAttachmentSpecCReq(const DeviceAttachmentSpec &spec);
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult> mrntt::MrnttViralPostingInvoker<void>
attachStimBuffDeviceCReq( attachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec); const std::shared_ptr<DeviceAttachmentSpec>& spec);
mrntt::MrnttViralPostingInvoker<stim_buff::StimBuffDeviceOpResult> mrntt::MrnttViralPostingInvoker<void>
detachStimBuffDeviceCReq( detachStimBuffDeviceCReq(
const std::shared_ptr<DeviceAttachmentSpec>& spec); const std::shared_ptr<DeviceAttachmentSpec>& spec);
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet> mrntt::MrnttViralPostingInvoker<void>
attachAllUnattachedDevicesFromCReq(
const std::shared_ptr<std::vector<DeviceAttachmentSpec>> &specs);
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet>
attachAllUnattachedDevicesFromKnownListCReq(); attachAllUnattachedDevicesFromKnownListCReq();
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet> mrntt::MrnttViralPostingInvoker<void>
attachAllUnattachedDevicesFromCmdlineCReq(); attachAllUnattachedDevicesFromCmdlineCReq();
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSet> mrntt::MrnttViralPostingInvoker<void>
detachAllAttachedDeviceRolesCReq(); detachAllAttachedDeviceRolesCReq();
private: private:
@@ -88,6 +77,10 @@ private:
DeviceManager(const DeviceManager&) = delete; DeviceManager(const DeviceManager&) = delete;
DeviceManager& operator=(const DeviceManager&) = delete; DeviceManager& operator=(const DeviceManager&) = delete;
mrntt::MrnttViralPostingInvoker<sscl::MultiOperationResultSetWithException>
attachAllUnattachedDevicesFromCReq(
const std::shared_ptr<std::vector<DeviceAttachmentSpec>> &specs);
public: public:
struct Resources struct Resources
{ {
@@ -9,7 +9,6 @@
#include <optional> #include <optional>
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <marionette/marionetteThread.h> #include <marionette/marionetteThread.h>
#include <spinscale/multiOperationResultSet.h>
#include <spinscale/syncCancelerForAsyncWork.h> #include <spinscale/syncCancelerForAsyncWork.h>
namespace smo { namespace smo {
@@ -37,17 +36,17 @@ private:
void onTimeout(const boost::system::error_code& error); void onTimeout(const boost::system::error_code& error);
void holdReattachCReq(); void holdReattachCReq();
mrntt::MrnttNonViralPostingInvoker reattachKnownListCReq( mrntt::MrnttNonViralNonPostingInvoker reattachKnownListCReq(
sscl::co::ExplicitPostTarget postTarget,
std::exception_ptr &exceptionPtr, std::exception_ptr &exceptionPtr,
std::function<void()> callback); std::function<void()> callback);
DeviceManager &parent; DeviceManager &parent;
// io_context thread for timer and non-posting reattach shell (see ctor).
std::shared_ptr<sscl::ComponentThread> ioThread; std::shared_ptr<sscl::ComponentThread> ioThread;
sscl::SyncCancelerForAsyncWork deviceReattacherCanceler; sscl::SyncCancelerForAsyncWork deviceReattacherCanceler;
boost::asio::deadline_timer timer; boost::asio::deadline_timer timer;
std::exception_ptr reattachLifetimeExceptionPtr; std::exception_ptr reattachLifetimeExceptionPtr;
std::optional<mrntt::MrnttNonViralPostingInvoker> reattachCReqInvoker; std::optional<mrntt::MrnttNonViralNonPostingInvoker> reattachCReqInvoker;
bool reattachOpInFlight = false; bool reattachOpInFlight = false;
std::chrono::steady_clock::time_point lastReattachReqTimestamp{}; std::chrono::steady_clock::time_point lastReattachReqTimestamp{};
}; };
@@ -20,6 +20,9 @@ using MrnttPostingPromise =
using MrnttNonViralPostingInvoker = using MrnttNonViralPostingInvoker =
sscl::co::NonViralPostingInvoker<MrnttPostingPromise>; sscl::co::NonViralPostingInvoker<MrnttPostingPromise>;
using MrnttNonViralNonPostingInvoker =
sscl::co::NonViralNonPostingInvoker;
template <typename T> template <typename T>
using MrnttViralPostingInvoker = using MrnttViralPostingInvoker =
sscl::co::ViralPostingInvoker<MrnttPostingPromise, T>; sscl::co::ViralPostingInvoker<MrnttPostingPromise, T>;
@@ -41,8 +41,9 @@ public:
std::optional<std::shared_ptr<StimBuffApiLib>> getStimBuffApiLib( std::optional<std::shared_ptr<StimBuffApiLib>> getStimBuffApiLib(
const std::string& libraryPath); const std::string& libraryPath);
std::optional<std::shared_ptr<StimBuffApiLib>> getStimBuffApiLibByApiName( std::optional<std::shared_ptr<StimBuffApiLib>> findStimBuffApiLibByApiName(
const std::string& apiName); const std::string& apiName);
StimBuffApiLib &getStimBuffApiLibByApiName(const std::string& apiName);
void unloadStimBuffApiLib(const std::string& libraryPath); void unloadStimBuffApiLib(const std::string& libraryPath);
void loadAllStimBuffApiLibsFromOptions( void loadAllStimBuffApiLibsFromOptions(
+14 -1
View File
@@ -230,7 +230,7 @@ StimBuffApiManager::getStimBuffApiLib(const std::string& libraryPath)
} }
std::optional<std::shared_ptr<StimBuffApiLib>> std::optional<std::shared_ptr<StimBuffApiLib>>
StimBuffApiManager::getStimBuffApiLibByApiName(const std::string& apiName) StimBuffApiManager::findStimBuffApiLibByApiName(const std::string& apiName)
{ {
auto &libs = getInstance().s.rsrc.stimBuffApiLibs; auto &libs = getInstance().s.rsrc.stimBuffApiLibs;
auto it = std::find_if(libs.begin(), libs.end(), auto it = std::find_if(libs.begin(), libs.end(),
@@ -243,6 +243,19 @@ StimBuffApiManager::getStimBuffApiLibByApiName(const std::string& apiName)
return std::nullopt; return std::nullopt;
} }
StimBuffApiLib &StimBuffApiManager::getStimBuffApiLibByApiName(
const std::string& apiName)
{
auto libOpt = findStimBuffApiLibByApiName(apiName);
if (!libOpt)
{
throw std::runtime_error(
std::string(__func__) + ": No library for API '" + apiName + "'");
}
return *libOpt.value();
}
void StimBuffApiManager::unloadStimBuffApiLib(const std::string& libraryPath) void StimBuffApiManager::unloadStimBuffApiLib(const std::string& libraryPath)
{ {
auto &libs = getInstance().s.rsrc.stimBuffApiLibs; auto &libs = getInstance().s.rsrc.stimBuffApiLibs;