diff --git a/smocore/CMakeLists.txt b/smocore/CMakeLists.txt index 297d0e2..c4c7358 100644 --- a/smocore/CMakeLists.txt +++ b/smocore/CMakeLists.txt @@ -9,6 +9,10 @@ target_include_directories(smocore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include ) +# Link against pthread for CPU affinity functions +find_package(Threads REQUIRED) +target_link_libraries(smocore PRIVATE Threads::Threads) + add_subdirectory(marionette) add_subdirectory(deviceManager) add_subdirectory(senseApis) diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index d684675..9b5c8f3 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include namespace smo { @@ -382,4 +385,72 @@ void ComponentThread::exceptionInd(ComponentThread& thread) }); } +// CPU management method implementations +int ComponentThread::getAvailableCpuCount() +{ + int cpuCount = sysconf(_SC_NPROCESSORS_ONLN); + if (cpuCount <= 0) + { + throw std::runtime_error(std::string(__func__) + + ": Failed to determine CPU count"); + } + + // Check if std::thread::hardware_concurrency() matches sysconf result + unsigned int hwConcurrency = std::thread::hardware_concurrency(); + if (hwConcurrency != static_cast(cpuCount)) + { + std::cerr << "Warning: CPU count mismatch - " + "std::thread::hardware_concurrency() = " + << hwConcurrency << ", sysconf(_SC_NPROCESSORS_ONLN) = " + << cpuCount << "\n"; + } + + return cpuCount; +} + +void ComponentThread::pinToCpu(int cpuId) +{ + if (cpuId < 0) + { + throw std::runtime_error(std::string(__func__) + + ": Invalid CPU ID: " + std::to_string(cpuId)); + } + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(cpuId, &cpuset); + + int result = pthread_setaffinity_np( + thread.native_handle(), sizeof(cpu_set_t), &cpuset); + if (result != 0) + { + throw std::runtime_error(std::string(__func__) + + ": Failed to pin thread to CPU " + std::to_string(cpuId) + + ": " + std::strerror(result)); + } + + pinnedCpuId = cpuId; + std::cout << name << ": Pinned to CPU " << cpuId << "\n"; +} + +void ComponentThread::distributeAndPinThreadsAcrossCpus() +{ + int cpuCount = getAvailableCpuCount(); + std::cout << "Available CPUs: " << cpuCount << "\n"; + + // Skip the marionette thread (MRNTT) as it's the control thread + int threadIndex = 0; + for (auto& thread : componentThreads) + { + if (thread->id == MRNTT) { continue; } + + int targetCpu = threadIndex % cpuCount; + thread->pinToCpu(targetCpu); + ++threadIndex; + } + + std::cout << "Distributed " << (threadIndex) << " threads across " + << cpuCount << " CPUs\n"; +} + } // namespace smo diff --git a/smocore/include/componentThread.h b/smocore/include/componentThread.h index e676334..0e0ca19 100644 --- a/smocore/include/componentThread.h +++ b/smocore/include/componentThread.h @@ -8,6 +8,9 @@ #include #include #include +#include +#include +#include namespace smo { @@ -29,6 +32,7 @@ public: ComponentThread(ThreadId id) : id(id), name(getThreadName(id)), work(io_service), pause_work(pause_io_service), + pinnedCpuId(-1), thread( ((id == MRNTT) ? marionetteMain : main), std::ref(*this)) @@ -87,6 +91,11 @@ public: static void exitAllMindThreadsReq(std::function callback = nullptr); static void joltAllMindThreadsReq(std::function callback = nullptr); + // CPU management methods + static int getAvailableCpuCount(); + void pinToCpu(int cpuId); + static void distributeAndPinThreadsAcrossCpus(); + enum class ThreadOp { START, @@ -110,6 +119,7 @@ public: boost::asio::io_service pause_io_service; boost::asio::io_service::work pause_work; std::atomic keepLooping; + int pinnedCpuId; /* Always ensure that this is last so that the thread is spawned after * everything else is constructed. diff --git a/smocore/mind.cpp b/smocore/mind.cpp index da1eca9..4bf686b 100644 --- a/smocore/mind.cpp +++ b/smocore/mind.cpp @@ -6,6 +6,18 @@ namespace smo { void Mind::initialize() { + /* Distribute threads across available CPUs */ + try + { + ComponentThread::distributeAndPinThreadsAcrossCpus(); + } + catch (const std::exception& e) + { + std::cerr << "Salmanoff couldn't distribute the mind threads across " + "the CPUs, so performance may be suboptimal.\n" + "Error: " << e.what() << "\n"; + } + /* Jolt the threads, then start them */ ComponentThread::joltAllMindThreadsReq( []()