SenseApiMgr: Make at/detachSenseDev & at/detachAllSenseDevs posted

They are posted to Marionette.

* We also fixed callOriginCb invocations;
* Also made posted CBs use std::bind instead of greedily
  early-invoking the CB on the servicing thread's stack.
This commit is contained in:
2025-09-16 18:38:06 -04:00
parent 92e55641a0
commit af33b7f097
2 changed files with 250 additions and 125 deletions
+1 -1
View File
@@ -81,7 +81,7 @@ private:
std::vector<std::shared_ptr<SenseApiLib>> senseApiLibs; std::vector<std::shared_ptr<SenseApiLib>> senseApiLibs;
class AttachSenseDeviceReq; class AttachSenseDeviceReq;
class DetachSenseDeviceReq; typedef AttachSenseDeviceReq DetachSenseDeviceReq;
class AttachAllSenseDevicesFromSpecsReq; class AttachAllSenseDevicesFromSpecsReq;
class DetachAllSenseDevicesReq; class DetachAllSenseDevicesReq;
+222 -97
View File
@@ -11,6 +11,8 @@
#include <user/senseApiDesc.h> #include <user/senseApiDesc.h>
#include <mind.h> #include <mind.h>
#include <deviceManager/deviceManager.h> #include <deviceManager/deviceManager.h>
#include <marionette/marionette.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@@ -259,44 +261,82 @@ void SenseApiManager::finalizeAllSenseApiLibs(void)
} }
} }
void SenseApiManager::attachSenseDeviceReq( class SenseApiManager::AttachSenseDeviceReq
const std::shared_ptr<device::DeviceAttachmentSpec>& spec, : public TargetedAsynchronousContinuation<attachSenseDeviceReqCbFn>
attachSenseDeviceReqCbFn cb
)
{ {
/** FIXME: public:
* We should acquire a spinlock here to ensure that the device isn't added AttachSenseDeviceReq(
* in the interim while the async op executes. const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
*/ const std::shared_ptr<ComponentThread> &caller,
attachSenseDeviceReqCbFn cb)
: TargetedAsynchronousContinuation<attachSenseDeviceReqCbFn>(
caller, cb),
spec(spec)
{}
void callOriginalCb(
bool success, std::shared_ptr<device::DeviceAttachmentSpec> deviceSpec
)
{
if (originalCbFn)
{
caller->getIoService().post(
std::bind(
originalCbFn, success, deviceSpec));
}
}
public:
void attachSenseDeviceReq1_posted(
[[maybe_unused]] std::shared_ptr<AttachSenseDeviceReq> context
)
{
if (caller->id != ComponentThread::MRNTT)
{
std::cerr << std::string(__func__)
<< ": executed on non-mrntt thread: "
<< caller->name << std::endl;
callOriginalCb(false, spec);
return;
}
/** FIXME:
* We should acquire a spinlock here to ensure that the device isn't
* added in the interim while the async op executes.
*/
auto libOpt = SenseApiManager::getInstance().getSenseApiLibByApiName(
spec->api);
auto libOpt = getSenseApiLibByApiName(spec->api);
if (!libOpt) if (!libOpt)
{ {
throw std::runtime_error( std::cerr << std::string(__func__) + ": No library found for API '"
std::string(__func__) + ": No library found for API '" << spec->api << "'" << std::endl;
+ spec->api + "'"); callOriginalCb(false, spec);
return;
} }
auto& lib = *libOpt.value(); auto& lib = *libOpt.value();
if (!lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq) if (!lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq)
{ {
throw std::runtime_error( std::cerr << std::string(__func__) + ": attachDeviceReq() is NULL "
std::string(__func__) + ": attachDeviceReq() is NULL for library '" "for library '" << lib.libraryPath << "'" << std::endl;
+ lib.libraryPath + "'"); callOriginalCb(false, spec);
return;
} }
/** EXPLANATION: /** EXPLANATION:
* We pass in either the body or world thread here, depending on whether * We pass in either the body or world thread here, depending on whether
* the device is an introspector (idev) or extrospector (edev). * the device is an introspector (idev) or extrospector (edev).
* *
* Introspectors are attached to the body thread; extrospectors are attached * Introspectors are attached to the body thread; extrospectors are
* to the world thread. * attached to the world thread.
*/ */
std::shared_ptr<ComponentThread> threadForAttachment; std::shared_ptr<ComponentThread> threadForAttachment;
if (spec->sensorType == 'e') if (spec->sensorType == 'e')
{ {
threadForAttachment = mind::globalMind->world.thread; threadForAttachment = mind::globalMind->world.thread;
std::cout << __func__ << ": Attaching edev " << spec->deviceIdentifier std::cout << __func__ << ": Attaching edev "
<< " to world thread" << "\n"; << spec->deviceIdentifier << " to world thread" << "\n";
} }
else else
{ {
@@ -306,7 +346,90 @@ void SenseApiManager::attachSenseDeviceReq(
} }
lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq( lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq(
spec, threadForAttachment, cb); spec, threadForAttachment,
std::bind(
&AttachSenseDeviceReq::attachSenseDeviceReq2,
context.get(), context,
std::placeholders::_1, std::placeholders::_2));
}
void attachSenseDeviceReq2(
[[maybe_unused]] std::shared_ptr<AttachSenseDeviceReq> context,
bool success,
std::shared_ptr<device::DeviceAttachmentSpec> deviceSpec
)
{
callOriginalCb(success, deviceSpec);
}
void detachSenseDeviceReq1_posted(
[[maybe_unused]] std::shared_ptr<DetachSenseDeviceReq> context
)
{
if (caller->id != ComponentThread::MRNTT)
{
std::cerr << std::string(__func__)
<< ": executed on non-mrntt thread: "
<< caller->name << std::endl;
callOriginalCb(false, spec);
return;
}
/** FIXME:
* We should acquire a spinlock here to ensure that the device isn't
* removed in the interim while the async op executes.
*/
auto libOpt = SenseApiManager::getInstance().getSenseApiLibByApiName(
spec->api);
if (!libOpt)
{
std::cerr << std::string(__func__) + ": No library found for API '"
<< spec->api << "'" << std::endl;
callOriginalCb(false, spec);
return;
}
auto& lib = *libOpt.value();
if (!lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq)
{
std::cerr << std::string(__func__) + ": detachDeviceReq() is NULL "
"for library '" << lib.libraryPath << "'" << std::endl;
callOriginalCb(false, spec);
return;
}
lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq(
spec,
std::bind(
&DetachSenseDeviceReq::detachSenseDeviceReq2,
context.get(), context,
std::placeholders::_1, std::placeholders::_2));
}
void detachSenseDeviceReq2(
[[maybe_unused]] std::shared_ptr<DetachSenseDeviceReq> context,
bool success,
std::shared_ptr<device::DeviceAttachmentSpec> deviceSpec
)
{
callOriginalCb(success, deviceSpec);
}
public:
std::shared_ptr<device::DeviceAttachmentSpec> spec;
};
void SenseApiManager::attachSenseDeviceReq(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
attachSenseDeviceReqCbFn cb
)
{
const auto& caller = ComponentThread::getSelf();
auto request = std::make_shared<AttachSenseDeviceReq>(
spec, caller, cb);
mrntt::mrntt.thread->getIoService().post(
std::bind(
&AttachSenseDeviceReq::attachSenseDeviceReq1_posted,
request.get(), request));
} }
void SenseApiManager::detachSenseDeviceReq( void SenseApiManager::detachSenseDeviceReq(
@@ -314,41 +437,58 @@ void SenseApiManager::detachSenseDeviceReq(
detachSenseDeviceReqCbFn cb detachSenseDeviceReqCbFn cb
) )
{ {
/** FIXME: const auto& caller = ComponentThread::getSelf();
* We should acquire a spinlock here to ensure that the device isn't removed auto request = std::make_shared<DetachSenseDeviceReq>(
* in the interim while the async op executes. spec, caller, cb);
*/
auto libOpt = getSenseApiLibByApiName(spec->api); mrntt::mrntt.thread->getIoService().post(
if (!libOpt) std::bind(
{ &DetachSenseDeviceReq::detachSenseDeviceReq1_posted,
throw std::runtime_error( request.get(), request));
std::string(__func__) + ": No library found for API '"
+ spec->api + "'");
}
auto& lib = *libOpt.value();
if (!lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq)
{
throw std::runtime_error(
std::string(__func__) + ": detachDeviceReq() is NULL for library '"
+ lib.libraryPath + "'");
}
lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq(spec, cb);
} }
class SenseApiManager::AttachAllSenseDevicesFromSpecsReq class SenseApiManager::AttachAllSenseDevicesFromSpecsReq
: public AsynchronousContinuation<attachAllSenseDevicesFromSpecsReqCbFn> : public TargetedAsynchronousContinuation<
attachAllSenseDevicesFromSpecsReqCbFn>
{ {
public: public:
AttachAllSenseDevicesFromSpecsReq( AttachAllSenseDevicesFromSpecsReq(
const unsigned int totalNSpecs, const unsigned int totalNSpecs,
const std::shared_ptr<ComponentThread>& caller,
attachAllSenseDevicesFromSpecsReqCbFn cb) attachAllSenseDevicesFromSpecsReqCbFn cb)
: AsynchronousContinuation(std::move(cb)), : TargetedAsynchronousContinuation<attachAllSenseDevicesFromSpecsReqCbFn>(
caller, cb),
loop(totalNSpecs) loop(totalNSpecs)
{} {}
void callOriginalCallback()
{
if (originalCbFn)
{
caller->getIoService().post(
std::bind(
originalCbFn, loop));
}
}
public:
void attachAllSenseDevicesFromSpecsReq1_posted(
[[maybe_unused]] std::shared_ptr<AttachAllSenseDevicesFromSpecsReq> context
)
{
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs)
{
SenseApiManager::getInstance().attachSenseDeviceReq(
spec,
std::bind(
&AttachAllSenseDevicesFromSpecsReq::attachAllSenseDevicesFromSpecsReq2,
context.get(), context,
std::placeholders::_1, std::placeholders::_2));
}
}
// Callback methods for the attachment sequence // Callback methods for the attachment sequence
void attachAllSenseDevicesFromSpecsReq1( void attachAllSenseDevicesFromSpecsReq2(
std::shared_ptr<AttachAllSenseDevicesFromSpecsReq> context, std::shared_ptr<AttachAllSenseDevicesFromSpecsReq> context,
bool success, std::shared_ptr<device::DeviceAttachmentSpec> spec bool success, std::shared_ptr<device::DeviceAttachmentSpec> spec
) )
@@ -373,12 +513,7 @@ public:
<< context->loop.nFailed.load() << " devices failed\n"; << context->loop.nFailed.load() << " devices failed\n";
} }
context->originalCbFn(context->loop); context->callOriginalCallback();
}
void callOriginalCallback()
{
originalCbFn(loop);
} }
public: public:
@@ -389,31 +524,22 @@ void SenseApiManager::attachAllSenseDevicesFromSpecsReq(
attachAllSenseDevicesFromSpecsReqCbFn cb attachAllSenseDevicesFromSpecsReqCbFn cb
) )
{ {
// Create the attachment request object to hold state and callbacks if (device::DeviceManager::getInstance().deviceAttachmentSpecs.size() == 0)
auto request = std::make_shared<AttachAllSenseDevicesFromSpecsReq>(
device::DeviceManager::deviceAttachmentSpecs.size(), std::move(cb));
if (request->loop.nTotalIsZero())
{ {
request->callOriginalCallback(); AsynchronousLoop tmp(0);
cb(tmp);
return; return;
} }
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) const auto& caller = ComponentThread::getSelf();
{ auto request = std::make_shared<AttachAllSenseDevicesFromSpecsReq>(
try { device::DeviceManager::getInstance().deviceAttachmentSpecs.size(),
attachSenseDeviceReq( caller, std::move(cb));
spec,
mrntt::mrntt.thread->getIoService().post(
std::bind( std::bind(
&AttachAllSenseDevicesFromSpecsReq::attachAllSenseDevicesFromSpecsReq1, &AttachAllSenseDevicesFromSpecsReq::attachAllSenseDevicesFromSpecsReq1_posted,
request.get(), request, request.get(), request));
std::placeholders::_1, std::placeholders::_2));
} catch (const std::exception& e) {
std::cerr << __func__ << ": Exception: " << e.what() << "\n";
if (request->loop.incrementSuccessOrFailureAndTestForCompletionDueTo(false))
{ request->callOriginalCallback(); }
}
}
} }
class SenseApiManager::DetachAllSenseDevicesReq class SenseApiManager::DetachAllSenseDevicesReq
@@ -422,7 +548,22 @@ class SenseApiManager::DetachAllSenseDevicesReq
public: public:
using AttachAllSenseDevicesFromSpecsReq::AttachAllSenseDevicesFromSpecsReq; using AttachAllSenseDevicesFromSpecsReq::AttachAllSenseDevicesFromSpecsReq;
void detachAllSenseDevicesReq1( void detachAllSenseDevicesReq1_posted(
[[maybe_unused]] std::shared_ptr<DetachAllSenseDevicesReq> context
)
{
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs)
{
SenseApiManager::getInstance().detachSenseDeviceReq(
spec,
std::bind(
&DetachAllSenseDevicesReq::detachAllSenseDevicesReq2,
context.get(), context,
std::placeholders::_1, std::placeholders::_2));
}
}
void detachAllSenseDevicesReq2(
std::shared_ptr<DetachAllSenseDevicesReq> context, std::shared_ptr<DetachAllSenseDevicesReq> context,
bool success, std::shared_ptr<device::DeviceAttachmentSpec> spec bool success, std::shared_ptr<device::DeviceAttachmentSpec> spec
) )
@@ -447,12 +588,7 @@ public:
<< context->loop.nFailed.load() << " devices failed\n"; << context->loop.nFailed.load() << " devices failed\n";
} }
context->originalCbFn(context->loop); context->callOriginalCallback();
}
void callOriginalCallback()
{
originalCbFn(loop);
} }
}; };
@@ -460,33 +596,22 @@ void SenseApiManager::detachAllSenseDevicesReq(
detachAllSenseDevicesReqCbFn cb detachAllSenseDevicesReqCbFn cb
) )
{ {
auto request = std::make_shared<DetachAllSenseDevicesReq>( if (device::DeviceManager::getInstance().deviceAttachmentSpecs.size() == 0)
device::DeviceManager::deviceAttachmentSpecs.size(), std::move(cb));
if (request->loop.nTotalIsZero())
{ {
request->callOriginalCallback(); AsynchronousLoop tmp(0);
cb(tmp);
return; return;
} }
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) const auto& caller = ComponentThread::getSelf();
{ auto request = std::make_shared<DetachAllSenseDevicesReq>(
try { device::DeviceManager::getInstance().deviceAttachmentSpecs.size(),
detachSenseDeviceReq( caller, std::move(cb));
spec,
mrntt::mrntt.thread->getIoService().post(
std::bind( std::bind(
&DetachAllSenseDevicesReq::detachAllSenseDevicesReq1, &DetachAllSenseDevicesReq::detachAllSenseDevicesReq1_posted,
request.get(), request, request.get(), request));
std::placeholders::_1, std::placeholders::_2));
} catch (const std::exception& e) {
std::cerr << __func__ << ": Exception: " << e.what() << "\n";
if (request->loop
.incrementSuccessOrFailureAndTestForCompletionDueTo(false))
{
request->callOriginalCallback();
}
}
}
} }
} // namespace sense_api } // namespace sense_api