SenseApis: Make attachDeviceReq async in drivers and SenseApiMgr

Slowly retrogressively making these sequences async
This commit is contained in:
2025-09-10 06:51:55 -04:00
parent 5b5a701c69
commit 1b6b12256d
9 changed files with 282 additions and 116 deletions
+9 -4
View File
@@ -34,12 +34,17 @@ struct SmoThreadingModelDesc
std::shared_ptr<ComponentThread> componentThread; std::shared_ptr<ComponentThread> componentThread;
}; };
typedef std::function<void(bool)> sal_mlo_attachDeviceReqCbFn;
typedef std::function<void(bool)> sal_mlo_detachDeviceReqCbFn;
typedef int (sal_mlo_initializeIndFn)(void); typedef int (sal_mlo_initializeIndFn)(void);
typedef int (sal_mlo_finalizeIndFn)(void); typedef int (sal_mlo_finalizeIndFn)(void);
typedef int (sal_mlo_attachDeviceReqFn)( typedef void (sal_mlo_attachDeviceReqFn)(
const std::shared_ptr<device::DeviceAttachmentSpec>& desc); const std::shared_ptr<device::DeviceAttachmentSpec>& desc,
typedef int (sal_mlo_detachDeviceReqFn)( sal_mlo_attachDeviceReqCbFn cb);
const std::shared_ptr<device::DeviceAttachmentSpec>& desc); typedef void (sal_mlo_detachDeviceReqFn)(
const std::shared_ptr<device::DeviceAttachmentSpec>& desc,
sal_mlo_detachDeviceReqCbFn cb);
/** /**
* @brief Hooks provided by Salmanoff to senseApi libraries. * @brief Hooks provided by Salmanoff to senseApi libraries.
+74 -64
View File
@@ -53,10 +53,12 @@ static std::vector<std::shared_ptr<livoxProto1::Device>> g_attachedDevices;
// Callback function declarations // Callback function declarations
extern "C" int livoxGen1_initializeInd(void); extern "C" int livoxGen1_initializeInd(void);
extern "C" int livoxGen1_finalizeInd(void); extern "C" int livoxGen1_finalizeInd(void);
extern "C" int livoxGen1_attachDeviceReq( extern "C" void livoxGen1_attachDeviceReq(
const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc); const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc,
extern "C" int livoxGen1_detachDeviceReq( smo::sense_api::sal_mlo_attachDeviceReqCbFn cb);
const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc); extern "C" void livoxGen1_detachDeviceReq(
const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc,
smo::sense_api::sal_mlo_detachDeviceReqCbFn cb);
// Sense API descriptor // Sense API descriptor
static const SenseApiDesc livoxGen1ApiDesc = { static const SenseApiDesc livoxGen1ApiDesc = {
@@ -151,8 +153,9 @@ extern "C" int livoxGen1_finalizeInd(void)
return 0; // Success return 0; // Success
} }
extern "C" int livoxGen1_attachDeviceReq( extern "C" void livoxGen1_attachDeviceReq(
const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc,
smo::sense_api::sal_mlo_attachDeviceReqCbFn cb
) )
{ {
if (!livoxProto1.livoxProto1_getOrCreateDeviceReq) if (!livoxProto1.livoxProto1_getOrCreateDeviceReq)
@@ -162,6 +165,17 @@ extern "C" int livoxGen1_attachDeviceReq(
"not available"); "not available");
} }
/** FIXME:
* We should acquire a spinlock here to ensure that the device isn't added
* in the interim while the async op executes.
*/
for (const auto& dev : g_attachedDevices)
{
if (dev->discoveredDevice.deviceIdentifier == desc->deviceIdentifier)
{ return; }
}
// Parse integer parameters from provider params with defaults // Parse integer parameters from provider params with defaults
/* The Livox Avia will generally respond to a handshake request within /* The Livox Avia will generally respond to a handshake request within
* 50ms. So we set the handshake timeout to 300ms to be safe. * 50ms. So we set the handshake timeout to 300ms to be safe.
@@ -233,59 +247,47 @@ extern "C" int livoxGen1_attachDeviceReq(
} }
} }
std::atomic<bool> callbackCalled{false};
std::shared_ptr<livoxProto1::Device> device = nullptr;
std::shared_ptr<ComponentThread> self = smoHooksPtr->
ComponentThread_getSelf();
(*livoxProto1.livoxProto1_getOrCreateDeviceReq)( (*livoxProto1.livoxProto1_getOrCreateDeviceReq)(
desc->deviceSelector, // deviceIdentifier (broadcast code) desc->deviceSelector, // deviceIdentifier (broadcast code)
smoThreadingModelDesc.componentThread, smoThreadingModelDesc.componentThread,
handshakeTimeoutMs, retryDelayMs, handshakeTimeoutMs, retryDelayMs,
smoIp, smoSubnetNbits, smoIp, smoSubnetNbits,
dataPort, cmdPort, imuPort, dataPort, cmdPort, imuPort,
[&callbackCalled, &device, self]( [desc, cb](
bool success, std::shared_ptr<livoxProto1::Device> dev) -> void bool success, std::shared_ptr<livoxProto1::Device> dev) -> void
{ {
callbackCalled.store(true); if (!dev)
device = ((success) ? dev : nullptr);
// Ensure that the bridging loop below will get awakened.
self->getIoService().post([]{});
}
);
/** EXPLANATION:
* Bridge the async call by dequeueing until callbackCalled is true.
*/
for (;;)
{ {
self->getIoService().run_one(); std::cerr << __func__ << ": Failed to create Livox device: "
if (callbackCalled.load() || self->getIoService().stopped()) << desc->deviceSelector << std::endl;
{ break; } cb(false);
return;
} }
if (!device) g_attachedDevices.push_back(dev);
{
throw std::runtime_error(
std::string(__func__) + ": Failed to create Livox device: "
+ desc->deviceSelector);
}
g_attachedDevices.push_back(device);
if (1 || OptionParser::getOptions().verbose) if (1 || OptionParser::getOptions().verbose)
{ {
std::cout << __func__ << ": Successfully attached Livox device: " std::cout << __func__ << ": Successfully attached Livox "
<< desc->deviceSelector << " (ID: " << desc->deviceIdentifier "device: " << desc->deviceSelector << " (ID: "
<< ")\n"; << desc->deviceIdentifier << ")\n";
} }
return 0; // Success cb(success);
}
);
} }
extern "C" int livoxGen1_detachDeviceReq( extern "C" void livoxGen1_detachDeviceReq(
const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc,
smo::sense_api::sal_mlo_detachDeviceReqCbFn cb
) )
{ {
/** FIXME:
* We should acquire a spinlock here to ensure that iterator doesn't become
* invalid in the interim while the async op executes. In the meantime,
* we'll repeat the search in the callback.
*/
// Find and remove the device from our collection // Find and remove the device from our collection
auto it = std::find_if(g_attachedDevices.begin(), g_attachedDevices.end(), auto it = std::find_if(g_attachedDevices.begin(), g_attachedDevices.end(),
[&desc](const std::shared_ptr<livoxProto1::Device>& dev) { [&desc](const std::shared_ptr<livoxProto1::Device>& dev) {
@@ -304,44 +306,52 @@ extern "C" int livoxGen1_detachDeviceReq(
if (it == g_attachedDevices.end()) if (it == g_attachedDevices.end())
{ {
std::cerr << __func__ << ": Device not found for detachment: " throw std::runtime_error(
<< desc->deviceIdentifier << "\n"; std::string(__func__) +
return -1; // Device not found ": Device not found for detachment: " + desc->deviceIdentifier);
} }
std::atomic<bool> callbackCalled{false};
bool retVal = false;
std::shared_ptr<ComponentThread> self = smoHooksPtr->
ComponentThread_getSelf();
(*livoxProto1.livoxProto1_destroyDeviceReq)( (*livoxProto1.livoxProto1_destroyDeviceReq)(
*it, *it,
[&callbackCalled, &retVal, self](bool success) [cb, desc](bool success)
{ {
callbackCalled.store(true); if (!success)
retVal = success; {
self->getIoService().post([]{}); std::cerr << __func__ << ": Failed to destroy Livox device: "
<< desc->deviceIdentifier << "\n";
cb(false);
return;
}
// Find the device in g_attachedDevices and remove it.
auto eraseIt = std::find_if(
g_attachedDevices.begin(), g_attachedDevices.end(),
[desc](const std::shared_ptr<livoxProto1::Device>& dev)
{
const std::string& devId = dev->discoveredDevice.deviceIdentifier;
std::string devIdPrefix = devId.substr(
0, std::min<size_t>(14, devId.size()));
return devIdPrefix == desc->deviceSelector.substr(
0, std::min<size_t>(14, desc->deviceSelector.size()));
} }
); );
for (;;) if (eraseIt == g_attachedDevices.end())
{ {
self->getIoService().run_one(); std::cerr << __func__ << ": Race condition: device not found "
if (callbackCalled.load() || self->getIoService().stopped()) "in g_attachedDevices for detachment: "
{ break; } << desc->deviceIdentifier << "\n";
cb(false);
return;
} }
if (!retVal) g_attachedDevices.erase(eraseIt);
{
std::cerr << __func__ << ": Failed to destroy Livox device: "
<< desc->deviceIdentifier << std::endl;
}
g_attachedDevices.erase(it);
std::cout << __func__ << ": Successfully detached Livox device: " std::cout << __func__ << ": Successfully detached Livox device: "
<< desc->deviceIdentifier << "\n"; << desc->deviceIdentifier << "\n";
return 0; cb(success);
}
);
} }
// Exported function // Exported function
+13 -7
View File
@@ -273,8 +273,9 @@ static int xcbWindow_finalizeInd(void)
return 0; return 0;
} }
static int xcbWindow_attachDeviceReq( static void xcbWindow_attachDeviceReq(
const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc const std::shared_ptr<smo::device::DeviceAttachmentSpec>& desc,
smo::sense_api::sal_mlo_attachDeviceReqCbFn cb
) )
{ {
g_attachedWindows.emplace_back( g_attachedWindows.emplace_back(
@@ -283,11 +284,13 @@ static int xcbWindow_attachDeviceReq(
std::cout << __func__ << ": Attached X11 window:\n " std::cout << __func__ << ": Attached X11 window:\n "
<< g_attachedWindows.back()->stringify() << g_attachedWindows.back()->stringify()
<< "\n"; << "\n";
return 0;
cb(true);
} }
static int xcbWindow_detachDeviceReq( static void xcbWindow_detachDeviceReq(
const std::shared_ptr<smo::device::DeviceAttachmentSpec>& spec const std::shared_ptr<smo::device::DeviceAttachmentSpec>& spec,
smo::sense_api::sal_mlo_detachDeviceReqCbFn cb
) )
{ {
auto it = std::find_if(g_attachedWindows.begin(), g_attachedWindows.end(), auto it = std::find_if(g_attachedWindows.begin(), g_attachedWindows.end(),
@@ -300,13 +303,16 @@ static int xcbWindow_detachDeviceReq(
{ {
std::cerr << __func__ << ": Device not found for detachment:\n" std::cerr << __func__ << ": Device not found for detachment:\n"
<< spec->stringify() << "\n"; << spec->stringify() << "\n";
return -1;
cb(false);
return;
} }
g_attachedWindows.erase(it); g_attachedWindows.erase(it);
std::cout << __func__ << ": Detached X11 window device:\n" std::cout << __func__ << ": Detached X11 window device:\n"
<< spec->stringify() << "\n"; << spec->stringify() << "\n";
return 0;
cb(true);
} }
// SenseApi descriptor // SenseApi descriptor
+2
View File
@@ -54,6 +54,7 @@ const std::string DeviceManager::stringifyDeviceSpecs(void)
return oss.str(); return oss.str();
} }
#if 0
void DeviceManager::newDeviceAttachmentSpecInd( void DeviceManager::newDeviceAttachmentSpecInd(
std::shared_ptr<DeviceAttachmentSpec> spec, std::shared_ptr<DeviceAttachmentSpec> spec,
std::function<void( std::function<void(
@@ -109,6 +110,7 @@ void DeviceManager::newDeviceAttachmentSpecInd(
callback(false, nullptr, nullptr); callback(false, nullptr, nullptr);
} }
} }
#endif
} // namespace device } // namespace device
} // namespace smo } // namespace smo
@@ -30,13 +30,13 @@ public:
static const std::string stringifyDeviceSpecs(void); static const std::string stringifyDeviceSpecs(void);
// New async function for device attachment // New async function for device attachment
void newDeviceAttachmentSpecInd( typedef std::function<void(
std::shared_ptr<DeviceAttachmentSpec> spec,
std::function<
void(
bool success, std::shared_ptr<Device> device, bool success, std::shared_ptr<Device> device,
std::shared_ptr<DeviceAttachmentSpec> deviceSpec)> std::shared_ptr<DeviceAttachmentSpec> deviceSpec)>
callback); deviceAttachmentSpecIndCbFn;
void newDeviceAttachmentSpecInd(
std::shared_ptr<DeviceAttachmentSpec> spec,
deviceAttachmentSpecIndCbFn callback);
private: private:
DeviceManager() = default; DeviceManager() = default;
+13 -4
View File
@@ -43,12 +43,18 @@ public:
void initializeAllSenseApiLibs(void); void initializeAllSenseApiLibs(void);
void finalizeAllSenseApiLibs(void); void finalizeAllSenseApiLibs(void);
typedef sal_mlo_attachDeviceReqCbFn attachSenseDeviceReqCbFn;
typedef sal_mlo_detachDeviceReqCbFn detachSenseDeviceReqCbFn;
void attachSenseDeviceReq(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
attachSenseDeviceReqCbFn cb);
void detachSenseDeviceReq(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
detachSenseDeviceReqCbFn cb);
void attachAllSenseDevicesFromSpecs(void); void attachAllSenseDevicesFromSpecs(void);
void attachSenseDevice(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec);
void detachSenseDevice(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec);
void detachAllSenseDevices(void); void detachAllSenseDevices(void);
void detachAllSenseDevicesReq(void);
std::string stringifyLibs() const; std::string stringifyLibs() const;
@@ -61,6 +67,9 @@ private:
std::vector<std::shared_ptr<SenseApiLib>> senseApiLibs; std::vector<std::shared_ptr<SenseApiLib>> senseApiLibs;
class AttachSenseDeviceReq;
class DetachSenseDeviceReq;
public: public:
static std::optional<std::string> searchForLibInSmoSearchPaths( static std::optional<std::string> searchForLibInSmoSearchPaths(
const std::string& libraryPath); const std::string& libraryPath);
View File
+1 -1
View File
@@ -28,7 +28,7 @@ void shutdownSalmanoff(void)
{ {
std::cout << __func__ << ": Entered." << std::endl; std::cout << __func__ << ": Entered." << std::endl;
sense_api::SenseApiManager::getInstance().detachAllSenseDevices(); sense_api::SenseApiManager::getInstance().detachAllSenseDevicesReq();
sense_api::SenseApiManager::getInstance().finalizeAllSenseApiLibs(); sense_api::SenseApiManager::getInstance().finalizeAllSenseApiLibs();
std::cout << __func__ << ": Done." << std::endl; std::cout << __func__ << ": Done." << std::endl;
+145 -11
View File
@@ -255,10 +255,18 @@ void SenseApiManager::finalizeAllSenseApiLibs(void)
} }
} }
void SenseApiManager::attachSenseDevice(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec
void SenseApiManager::attachSenseDeviceReq(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
attachSenseDeviceReqCbFn cb
) )
{ {
/** FIXME:
* We should acquire a spinlock here to ensure that the device isn't added
* in the interim while the async op executes.
*/
auto libOpt = getSenseApiLibByApiName(spec->api); auto libOpt = getSenseApiLibByApiName(spec->api);
if (!libOpt) if (!libOpt)
{ {
@@ -273,13 +281,19 @@ void SenseApiManager::attachSenseDevice(
std::string(__func__) + ": attachDeviceReq() is NULL for library '" std::string(__func__) + ": attachDeviceReq() is NULL for library '"
+ lib.libraryPath + "'"); + lib.libraryPath + "'");
} }
lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq(spec); lib.senseApiDesc.sal_mgmt_libOps.attachDeviceReq(spec, cb);
} }
void SenseApiManager::detachSenseDevice( void SenseApiManager::detachSenseDeviceReq(
const std::shared_ptr<device::DeviceAttachmentSpec>& spec const std::shared_ptr<device::DeviceAttachmentSpec>& spec,
detachSenseDeviceReqCbFn cb
) )
{ {
/** FIXME:
* We should acquire a spinlock here to ensure that the device isn't removed
* in the interim while the async op executes.
*/
auto libOpt = getSenseApiLibByApiName(spec->api); auto libOpt = getSenseApiLibByApiName(spec->api);
if (!libOpt) if (!libOpt)
{ {
@@ -294,21 +308,141 @@ void SenseApiManager::detachSenseDevice(
std::string(__func__) + ": detachDeviceReq() is NULL for library '" std::string(__func__) + ": detachDeviceReq() is NULL for library '"
+ lib.libraryPath + "'"); + lib.libraryPath + "'");
} }
lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq(spec); lib.senseApiDesc.sal_mgmt_libOps.detachDeviceReq(spec, cb);
} }
void SenseApiManager::attachAllSenseDevicesFromSpecs(void) void SenseApiManager::attachAllSenseDevicesFromSpecs(void)
{ {
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) { std::atomic<int> nTotal = device::DeviceManager::deviceAttachmentSpecs
attachSenseDevice(spec); .size();
std::atomic<int> nSucceeded = 0, nFailed = 0;
auto self = ComponentThread::getSelf();
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs)
{
try {
attachSenseDeviceReq(spec,
[spec, &nTotal, &nSucceeded, &nFailed, caller = self](bool success) -> void
{
if (!success)
{
++nFailed;
std::cerr << __func__ << ": Failed to attach device: "
<< spec->deviceIdentifier << "\n";
caller->getIoService().post([]{});
return;
} }
++nSucceeded;
if (nSucceeded.load() + nFailed.load() != nTotal.load()) {
return;
}
std::cout << __func__ << ": " << nSucceeded.load()
<< " devices attached, "
<< nFailed.load() << " devices failed\n";
caller->getIoService().post([]{});
});
} catch (const std::exception& e) {
std::cerr << __func__ << ": Exception: " << e.what() << "\n";
++nFailed;
}
}
/* Bridge the async op here. */
for (;;)
{
self->getIoService().run_one();
if ((nSucceeded.load() + nFailed.load() == nTotal.load())
|| self->getIoService().stopped())
{
break;
}
}
if (self->getIoService().stopped())
{
/* Return early because the io_service is stopped. */
return;
}
if (nTotal.load() != nSucceeded.load() + nFailed.load())
{
throw std::runtime_error(
std::string(__func__) + ": Failed to get through all devices");
}
std::cout << __func__ << ": " << nSucceeded.load() << "/" << nTotal.load()
<< " devices attached, "
<< nFailed.load() << "/" << nTotal.load() << " devices failed\n";
} }
void SenseApiManager::detachAllSenseDevices(void) void SenseApiManager::detachAllSenseDevicesReq(void)
{ {
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs) { std::atomic<int> nTotal = device::DeviceManager::deviceAttachmentSpecs
detachSenseDevice(spec); .size();
std::atomic<int> nSucceeded = 0, nFailed = 0;
auto self = ComponentThread::getSelf();
for (const auto& spec : device::DeviceManager::deviceAttachmentSpecs)
{
try {
detachSenseDeviceReq(spec,
[spec, &nTotal, &nSucceeded, &nFailed, caller = self](bool success) -> void
{
if (!success)
{
++nFailed;
std::cerr << __func__ << ": Failed to detach device: "
<< spec->deviceIdentifier << "\n";
caller->getIoService().post([]{});
return;
} }
++nSucceeded;
if (nSucceeded.load() + nFailed.load() != nTotal.load()) {
return;
}
std::cout << __func__ << ": " << nSucceeded.load()
<< " devices detached, "
<< nFailed.load() << " devices failed\n";
caller->getIoService().post([]{});
});
} catch (const std::exception& e) {
std::cerr << __func__ << ": Exception: " << e.what() << "\n";
++nFailed;
}
}
/* Bridge the async op here. */
for (;;)
{
self->getIoService().run_one();
if ((nSucceeded.load() + nFailed.load() == nTotal.load())
|| self->getIoService().stopped())
{
break;
}
}
if (self->getIoService().stopped())
{
/* Return early because the io_service is stopped. */
return;
}
if (nTotal.load() != nSucceeded.load() + nFailed.load())
{
throw std::runtime_error(
std::string(__func__) + ": Failed to get through all devices");
}
std::cout << __func__ << ": " << nSucceeded.load() << "/" << nTotal.load()
<< " devices detached, "
<< nFailed.load() << "/" << nTotal.load() << " devices failed\n";
} }
} // namespace sense_api } // namespace sense_api