Mind: Distribute and pin Mind threads to CPUs

At startup, Marionette will distribute and pin the Mind
threads across the available CPUs, warning if it couldn't
do so.
This commit is contained in:
2025-08-03 09:18:45 -04:00
parent 1deb92a416
commit 285b63b618
4 changed files with 97 additions and 0 deletions
+4
View File
@@ -9,6 +9,10 @@ target_include_directories(smocore PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/include
)
# Link against pthread for CPU affinity functions
find_package(Threads REQUIRED)
target_link_libraries(smocore PRIVATE Threads::Threads)
add_subdirectory(marionette)
add_subdirectory(deviceManager)
add_subdirectory(senseApis)
+71
View File
@@ -1,6 +1,9 @@
#include <iostream>
#include <componentThread.h>
#include <boost/asio.hpp>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
namespace smo {
@@ -382,4 +385,72 @@ void ComponentThread::exceptionInd(ComponentThread& thread)
});
}
// CPU management method implementations
int ComponentThread::getAvailableCpuCount()
{
int cpuCount = sysconf(_SC_NPROCESSORS_ONLN);
if (cpuCount <= 0)
{
throw std::runtime_error(std::string(__func__)
+ ": Failed to determine CPU count");
}
// Check if std::thread::hardware_concurrency() matches sysconf result
unsigned int hwConcurrency = std::thread::hardware_concurrency();
if (hwConcurrency != static_cast<unsigned int>(cpuCount))
{
std::cerr << "Warning: CPU count mismatch - "
"std::thread::hardware_concurrency() = "
<< hwConcurrency << ", sysconf(_SC_NPROCESSORS_ONLN) = "
<< cpuCount << "\n";
}
return cpuCount;
}
void ComponentThread::pinToCpu(int cpuId)
{
if (cpuId < 0)
{
throw std::runtime_error(std::string(__func__)
+ ": Invalid CPU ID: " + std::to_string(cpuId));
}
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpuId, &cpuset);
int result = pthread_setaffinity_np(
thread.native_handle(), sizeof(cpu_set_t), &cpuset);
if (result != 0)
{
throw std::runtime_error(std::string(__func__)
+ ": Failed to pin thread to CPU " + std::to_string(cpuId)
+ ": " + std::strerror(result));
}
pinnedCpuId = cpuId;
std::cout << name << ": Pinned to CPU " << cpuId << "\n";
}
void ComponentThread::distributeAndPinThreadsAcrossCpus()
{
int cpuCount = getAvailableCpuCount();
std::cout << "Available CPUs: " << cpuCount << "\n";
// Skip the marionette thread (MRNTT) as it's the control thread
int threadIndex = 0;
for (auto& thread : componentThreads)
{
if (thread->id == MRNTT) { continue; }
int targetCpu = threadIndex % cpuCount;
thread->pinToCpu(targetCpu);
++threadIndex;
}
std::cout << "Distributed " << (threadIndex) << " threads across "
<< cpuCount << " CPUs\n";
}
} // namespace smo
+10
View File
@@ -8,6 +8,9 @@
#include <stdexcept>
#include <queue>
#include <functional>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
namespace smo {
@@ -29,6 +32,7 @@ public:
ComponentThread(ThreadId id)
: id(id), name(getThreadName(id)),
work(io_service), pause_work(pause_io_service),
pinnedCpuId(-1),
thread(
((id == MRNTT) ? marionetteMain : main),
std::ref(*this))
@@ -87,6 +91,11 @@ public:
static void exitAllMindThreadsReq(std::function<void()> callback = nullptr);
static void joltAllMindThreadsReq(std::function<void()> callback = nullptr);
// CPU management methods
static int getAvailableCpuCount();
void pinToCpu(int cpuId);
static void distributeAndPinThreadsAcrossCpus();
enum class ThreadOp
{
START,
@@ -110,6 +119,7 @@ public:
boost::asio::io_service pause_io_service;
boost::asio::io_service::work pause_work;
std::atomic<bool> keepLooping;
int pinnedCpuId;
/* Always ensure that this is last so that the thread is spawned after
* everything else is constructed.
+12
View File
@@ -6,6 +6,18 @@ namespace smo {
void Mind::initialize()
{
/* Distribute threads across available CPUs */
try
{
ComponentThread::distributeAndPinThreadsAcrossCpus();
}
catch (const std::exception& e)
{
std::cerr << "Salmanoff couldn't distribute the mind threads across "
"the CPUs, so performance may be suboptimal.\n"
"Error: " << e.what() << "\n";
}
/* Jolt the threads, then start them */
ComponentThread::joltAllMindThreadsReq(
[]()