From 36c79f3a2e94b711f9705593e3e39f5d6566054e Mon Sep 17 00:00:00 2001 From: Hayodea Hakol Date: Mon, 28 Jul 2025 07:20:44 -0400 Subject: [PATCH] Threading: run all code in PThreads, add JOLTing & exception bubbling This commit significantly restructures the way we setup threading in SMO. We now don't use the CRT main() thread at all. It's only used as a mechanism to ensure that Marionette doesn't execute before global constructors have been executed. JOLTing: This is a simple ASIO post()ed message that makes each thread setup its thread-local data pointer to its own ComponentThread object, and then enter its main ASIO run() loop to await commands from Marionette. Exception bubbling: We now cleanly cause mind threads to report their exceptions to marionette, so that marionette can cleanly shut the mind down in an orderly fashion. Thread Control messaging API: A namespace of asynchronous messages to be post()ed to threads to control them. It enables us to pause and resume threads. This will be very useful for Marionette when we add the ability for it to suspend Salmanoff's running mind, inject new goals, inspect current state, etc; and then resume the mind's execution. --- CMakeLists.txt | 5 +- main.cpp | 125 ++----------- smocore/CMakeLists.txt | 2 +- smocore/componentThread.cpp | 238 ++++++++++++++++++++---- smocore/include/componentThread.h | 115 +++++++++--- smocore/include/marionette/marionette.h | 21 +++ smocore/marionette/CMakeLists.txt | 5 + smocore/marionette/marionette.cpp | 109 ++++++++++- smocore/marionette/salmanoff.cpp | 27 +++ 9 files changed, 469 insertions(+), 178 deletions(-) create mode 100644 smocore/marionette/salmanoff.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a103a5..bbc83e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) # Build type if(NOT CMAKE_BUILD_TYPE) - set(CMAKE_BUILD_TYPE Release) + set(CMAKE_BUILD_TYPE Debug FORCE) endif() # Compiler flags @@ -57,7 +57,8 @@ add_subdirectory(wilzorApis) add_executable(salmanoff main.cpp) target_link_libraries(salmanoff smocore - deviceManager + marionette + deviceManager senseApis ${Boost_LIBRARIES} ${DL_LIBRARY} diff --git a/main.cpp b/main.cpp index 64fe0ff..f8f404f 100644 --- a/main.cpp +++ b/main.cpp @@ -1,111 +1,26 @@ #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "componentThread.h" +#include +#include -namespace smo { - -static int initializeSalmanoff(int argc, char **argv, char **envp); - -} // namespace smo - -int main(int argc, char **argv, char **envp) +int main(int argc, char *argv[], char *envp[]) { - try { - std::cout << __func__ << ": Entering main()" << std::endl; - boost::asio::io_service mrntLoop; - boost::asio::io_service::work work(mrntLoop); - - // Validate thread IDs - smo::ComponentThread::validateThreadIds(); - - // Post initializeSalmanoff to mrntLoop - mrntLoop.post([&]() + /* We don't do anything inside of main() + * Main merely waits for the marionette thread to exit. + */ + std::cout << "CRT:" << __func__ << ": about to JOLT Mrntt with cmdline args" + << '\n'; + smo::mrntt::mrntt->getIoService().post( + [argc, argv, envp]() { - int ret = smo::initializeSalmanoff(argc, argv, envp); - if (ret != 0) - { - std::cerr << "Initialization failed with code: " - << ret << std::endl; - std::exit(ret); - } - }); + std::cout << "Mrntt:" << __func__ << ":JOLTED: setting cmdline args" + << '\n'; + smo::CrtCommandLineArgs::set(argc, argv, envp); + smo::mrntt::mrntt->getIoService().stop(); + } + ); - mrntLoop.run(); - } - catch (const std::exception& e) - { - std::cerr << __func__ << ": Exception occurred: " << e.what() - << std::endl; - return EXIT_FAILURE; - } - catch (...) - { - std::cerr << __func__ << ": Unknown exception occurred" << std::endl; - return EXIT_FAILURE; - } - - std::cout << __func__ << ": Exiting normally" << std::endl; - return 0; + smo::mrntt::mrntt->thread.join(); + std::cout << "CRT:" << __func__ << ": Mrntt exited with code '" + << smo::mrntt::exitCode << "'\n"; + return smo::mrntt::exitCode; } - -namespace smo { - -static int initializeSalmanoff(int argc, char **argv, char **envp) -{ - std::cout << __func__ << ": Entering" << std::endl; - - using namespace smo; - OptionParser &options = OptionParser::getOptions(); - smo::Mind mind; - - std::cout << PACKAGE_NAME << " " << PACKAGE_VERSION << std::endl; - - try { - options.parseArguments(argc, argv, envp); - std::cout << options.stringifyOptions() << std::endl; - } - catch (const std::invalid_argument& e) - { - std::cerr << __func__ << ": Exception occurred: " << e.what() << '\n' - << options.getUsage() << '\n'; - return EXIT_FAILURE; - } - - if (options.printUsage) - { - std::cout << options.getUsage() << std::endl; - return EXIT_SUCCESS; - } - - device::DeviceManager::getInstance().collateAllDeviceSpecs(); - device::DeviceManager::getInstance().parseAllDeviceSpecs(); - std::cout << device::DeviceManager::stringifyDeviceSpecs() << std::endl; - sense_api::SenseApiManager::getInstance().loadAllSenseApiLibsFromOptions(); - std::cout << sense_api::SenseApiManager::getInstance().stringifyLibs() - << std::endl; -std::cerr << "About to initializeAllSenseApiLibs" << std::endl; - sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs(); -std::cerr << "About to attachAllSenseDevicesFromSpecs" << std::endl; - sense_api::SenseApiManager::getInstance().attachAllSenseDevicesFromSpecs(); -std::cerr << "Done attachAllSenseDevicesFromSpecs" << std::endl; - - /* Start the threads */ - for (const auto& [id, componentThread] - : smo::ComponentThread::componentThreads) { - smo::ComponentThread::signalThread(id); - } - - std::cout << __func__ << ": Exiting" << std::endl; - return 0; -} - -} // namespace smo diff --git a/smocore/CMakeLists.txt b/smocore/CMakeLists.txt index 47403e5..297d0e2 100644 --- a/smocore/CMakeLists.txt +++ b/smocore/CMakeLists.txt @@ -9,6 +9,6 @@ target_include_directories(smocore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include ) +add_subdirectory(marionette) add_subdirectory(deviceManager) add_subdirectory(senseApis) -add_subdirectory(marionette) diff --git a/smocore/componentThread.cpp b/smocore/componentThread.cpp index 38fdc93..dc87253 100644 --- a/smocore/componentThread.cpp +++ b/smocore/componentThread.cpp @@ -1,8 +1,25 @@ #include #include +#include namespace smo { +thread_local std::shared_ptr thisComponentThread; + +const std::string ComponentThread::threadNames[N_ITEMS] = +{ + "mrntt", + "director", + "simulator", + "subconscious", + "body", + "world" +}; + +namespace mrntt { +std::shared_ptr mrntt = + std::make_shared(ComponentThread::MRNTT); +} namespace director { /* The director is the seat of volition in Salmanoff. It receives sensor * events from the body and world, and uses them to direct its implexors @@ -10,89 +27,230 @@ namespace director { * and correlation with intrins, in order to form new attrimotions and * menties. */ -ComponentThread director; +std::shared_ptr director = + std::make_shared(ComponentThread::DIRECTOR); } namespace simulator { /* The canvas is the simulation engine in Salmanoff. It receives menties and * simulates them in accordance with the instructions from director. It then * re-renders them into perception for director to get feedback. */ -ComponentThread canvas; +std::shared_ptr canvas = + std::make_shared(ComponentThread::SIMULATOR); } namespace subconscious { /* The subconscious is the seat of memory in Salmanoff. It receives menties * from director and stores them in memory for later recall. */ -ComponentThread subconscious; +std::shared_ptr subconscious = + std::make_shared(ComponentThread::SUBCONSCIOUS); } namespace body { /* The body is a thread that polls, processes, and sends interoceptive sensor * events to director. It enables these events to occur asynchronously, * indepdendent any actions that the other threads are taking. */ -ComponentThread body; +std::shared_ptr body = + std::make_shared(ComponentThread::BODY); } namespace world { /* The world performs the same functions as the body, but for extrospective * sensor events. */ -ComponentThread world; +std::shared_ptr world = + std::make_shared(ComponentThread::WORLD); } - -std::unordered_map +std::array, ComponentThread::N_ITEMS> ComponentThread::componentThreads = { - {director::director.thread.get_id(), director::director}, - {simulator::canvas.thread.get_id(), simulator::canvas}, - {subconscious::subconscious.thread.get_id(), subconscious::subconscious}, - {body::body.thread.get_id(), body::body}, - {world::world.thread.get_id(), world::world} + mrntt::mrntt, + director::director, + simulator::canvas, + subconscious::subconscious, + body::body, + world::world }; -void ComponentThread::signalThread(std::thread::id id) +void ComponentThread::initializeTls(void) { - auto it = componentThreads.find(id); - if (it == componentThreads.end()) + thisComponentThread = shared_from_this(); +} + +const std::shared_ptr ComponentThread::getSelf(void) +{ + if (!thisComponentThread) { throw std::runtime_error(std::string(__func__) - + ": Thread ID not found in componentThreads map"); + + ": TLS not initialized"); } - ComponentThread& componentThread = it->second; - { - std::lock_guard lock(componentThread.startupSync.mutex); - componentThread.startupSync.ready = true; - } - componentThread.startupSync.cv.notify_one(); + return thisComponentThread; } void ComponentThread::main(ComponentThread& self) { - // We sleep on spawn until the main thread tells us to continue. - { - std::unique_lock lock(self.startupSync.mutex); - self.startupSync.cv.wait(lock, [&self]() { - return self.startupSync.ready; - }); - } - - std::cout << __func__ << ": Starting event loop." << std::endl; + std::cout << self.name << ":" << __func__ << ": Waiting for JOLT" <<"\n"; self.getIoService().run(); - std::cout << __func__ << ": Exiting." << std::endl; -} + self.initializeTls(); -void ComponentThread::validateThreadIds(void) -{ - for (const auto& [id, componentThread] : componentThreads) + std::cout << self.name << ":" << __func__ << ": Entering event loop" <<"\n"; + + for (self.keepLooping = true; self.keepLooping;) { - // std::thread::id() is usable as an invalid ID. - if (id == std::thread::id()) + try { + self.getIoService().reset(); + self.getIoService().run(); + } + catch (const std::exception& e) { - throw std::runtime_error( - std::string(__func__) + ": Invalid Thread ID."); + std::cerr << self.name << ":" << __func__ + << ": Exception occurred: " << e.what() << "\n"; + + mrntt::mrntt->exceptionInd(self); + } + catch (...) + { + std::cerr << self.name << ":" << __func__ + << ": Unknown exception occurred" << "\n"; + + mrntt::mrntt->exceptionInd(self); } } + + std::cout << self.name << ":" << __func__ << ": Exiting event loop" << "\n"; +} + +// Thread management method implementations +void ComponentThread::startThreadReq(std::function callback) +{ + this->getIoService().post([this, caller = getSelf(), callback]() + { + std::cout << "Thread '" << name << "': handling startThread." << "\n"; + + // Execute private setup sequence here + // This is where each thread would implement its specific initialization + + if (callback) { + caller->getIoService().post(callback); + } + }); +} + +void ComponentThread::cleanup(void) +{ + this->keepLooping = false; +} + +void ComponentThread::exitThreadReq(std::function callback) +{ + // Post to the main io_service + this->getIoService().post([this, caller = getSelf(), callback]() + { + std::cout << "Thread '" << name << "': handling exitThread " + "(main queue)." << std::endl; + + cleanup(); + + // Stop the main io_service to exit the thread + io_service.stop(); + if (callback) { caller->getIoService().post(callback); } + }); + + // Also post to the pause io_service + this->pause_io_service.post([this, caller = getSelf(), callback]() + { + std::cout << "Thread '" << name << "': handling exitThread " + "(pause queue)." << std::endl; + + cleanup(); + + // Stop both io_services to exit the thread + pause_io_service.stop(); + io_service.stop(); + if (callback) { caller->getIoService().post(callback); } + }); +} + +void ComponentThread::pauseThreadReq(std::function callback) +{ + this->getIoService().post([this, caller = getSelf(), callback]() + { + std::cout << "Thread '" << name << "': handling pauseThread." + << std::endl; + + if (callback) { + caller->getIoService().post(callback); + } + + // Reset the pause io_service before running to ensure it can run again + pause_io_service.reset(); + // Run the pause io_service to block this thread + pause_io_service.run(); + }); +} + +void ComponentThread::resumeThreadReq(std::function callback) +{ + // Post to the pause_io_service to unblock the paused thread + pause_io_service.post([this, caller = getSelf(), callback]() + { + std::cout << "Thread '" << name << "': handling resumeThread." + << std::endl; + + if (callback) { + caller->getIoService().post(callback); + } + + // Stop the pause_io_service to unblock the thread + pause_io_service.stop(); + }); +} + +static int threadsKilledCount; + +void ComponentThread::exceptionInd(ComponentThread& thread) +{ + if (this->id != MRNTT) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on non-mrntt thread " + thread.name); + } + + // Post the exception to the mrntt thread. + this->getIoService().post( + [&thread]() + { + std::cerr << "Mrntt: Exception occurred: in thread " + << thread.name << ". Killing Salmanoff." << "\n"; + + threadsKilledCount = 0; + for (auto &currThread : ComponentThread::componentThreads) + { + if (currThread->id == MRNTT) + { continue; } + + currThread->exitThreadReq( + []() + { + ++threadsKilledCount; + if (threadsKilledCount < ComponentThread::N_ITEMS - 1) + { return; } + + for (auto &currThreadJ + : ComponentThread::componentThreads) + { + if (currThreadJ->id == MRNTT) + { continue; } + + currThreadJ->thread.join(); + } + mrntt::mrntt->getIoService().stop(); + } + ); + } + } + ); } } // namespace smo diff --git a/smocore/include/componentThread.h b/smocore/include/componentThread.h index f141433..8bcf43d 100644 --- a/smocore/include/componentThread.h +++ b/smocore/include/componentThread.h @@ -1,67 +1,130 @@ #ifndef COMPONENT_THREAD_H #define COMPONENT_THREAD_H +#include #include #include -#include #include #include +#include +#include namespace smo { class ComponentThread +: public std::enable_shared_from_this { public: - ComponentThread() - : work(io_service), startupSync(), - thread(ComponentThread::main, std::ref(*this)) + enum ThreadId + { + MRNTT = 0, + DIRECTOR, + SIMULATOR, + SUBCONSCIOUS, + BODY, + WORLD, + N_ITEMS + }; + + ComponentThread(ThreadId id) + : id(id), name(getThreadName(id)), + work(io_service), pause_work(pause_io_service), + thread( + ((id == MRNTT) ? marionetteMain : main), + std::ref(*this)) {} + void cleanup(void); + boost::asio::io_service& getIoService(void) { return io_service; } - static boost::asio::io_service& getEventLoop( - std::thread::id id = std::this_thread::get_id()) + void initializeTls(void); + const std::shared_ptr getSelf(void); + + static std::shared_ptr getComponentThread( + ThreadId id = N_ITEMS) { - auto it = componentThreads.find(id); - if (it == componentThreads.end()) + if (id < 0 || id > N_ITEMS) { throw std::runtime_error(std::string(__func__) - + ": Thread ID not found in componentThreads map"); + + ": Invalid thread ID"); } - - return it->second.getIoService(); + return componentThreads[id]; } - static void main(ComponentThread &self); - static void signalThread(std::thread::id id); - static void validateThreadIds(void); + // Overload: search by name + static std::shared_ptr getComponentThread( + const std::string& name) + { + for (auto& thread : componentThreads) { + if (thread->name == name) { return thread; } + } + throw std::runtime_error(std::string(__func__) + + ": Thread name not found in componentThreads map"); + } + + static boost::asio::io_service& getEventLoop( + ThreadId id = MRNTT) + { + return getComponentThread(id)->getIoService(); + } + + + typedef void (mainFn)(ComponentThread &self); + static mainFn main, marionetteMain; + + // Thread management methods + void startThreadReq(std::function callback = nullptr); + void exitThreadReq(std::function callback = nullptr); + void pauseThreadReq(std::function callback = nullptr); + void resumeThreadReq(std::function callback = nullptr); + void exceptionInd(ComponentThread& thread); public: + ThreadId id; + std::string name; boost::asio::io_service io_service; boost::asio::io_service::work work; - struct StartupSync { - std::mutex mutex; - std::condition_variable cv; - bool ready; - - StartupSync() : ready(false) {} - } startupSync; + boost::asio::io_service pause_io_service; + boost::asio::io_service::work pause_work; + std::atomic keepLooping; /* Always ensure that this is last so that the thread is spawned after - * everything else. + * everything else is constructed. */ std::thread thread; - static std::unordered_map componentThreads; + static std::array, ComponentThread::N_ITEMS> + componentThreads; + + static const std::string threadNames[ComponentThread::N_ITEMS]; + static const std::string getThreadName(ThreadId id) + { + if (id < 0 || id >= ComponentThread::N_ITEMS) + { + throw std::runtime_error(std::string(__func__) + + ": Invalid thread ID"); + } + return threadNames[id]; + } }; +namespace mrntt { +extern std::shared_ptr mrntt; +} namespace director { -extern ComponentThread director; +extern std::shared_ptr director; } namespace simulator { -extern ComponentThread canvas; +extern std::shared_ptr canvas; } namespace subconscious { -extern ComponentThread subconscious; +extern std::shared_ptr subconscious; +} +namespace body { +extern std::shared_ptr body; +} +namespace world { +extern std::shared_ptr world; } } // namespace smo diff --git a/smocore/include/marionette/marionette.h b/smocore/include/marionette/marionette.h index 638c18d..ec0c1e3 100644 --- a/smocore/include/marionette/marionette.h +++ b/smocore/include/marionette/marionette.h @@ -2,13 +2,34 @@ #define _MARIONETTE_H #include +#include +namespace smo { namespace mrntt { +extern std::atomic exitCode; + class Marionette { }; } // namespace mrntt +struct CrtCommandLineArgs +{ + CrtCommandLineArgs(int argc, char *argv[], char *envp[]) + : argc(argc), argv(argv), envp(envp) + {} + + int argc; + char **argv; + char **envp; + + static void set(int argc, char *argv[], char *envp[]); +}; + +int initializeSalmanoff(void); + +} // namespace smo + #endif // _MARIONETTE_H diff --git a/smocore/marionette/CMakeLists.txt b/smocore/marionette/CMakeLists.txt index da7d48a..2fd28f2 100644 --- a/smocore/marionette/CMakeLists.txt +++ b/smocore/marionette/CMakeLists.txt @@ -1,5 +1,10 @@ add_library(marionette STATIC marionette.cpp + salmanoff.cpp +) + +target_link_libraries(marionette + smocore ) target_include_directories(marionette PUBLIC diff --git a/smocore/marionette/marionette.cpp b/smocore/marionette/marionette.cpp index 5e79951..40a69ea 100644 --- a/smocore/marionette/marionette.cpp +++ b/smocore/marionette/marionette.cpp @@ -1,9 +1,110 @@ +#include +#include +#include +#include +#include +#include +#include -namespace mrntt { +namespace smo { -int main(int argc, char *argv[]) +CrtCommandLineArgs crtCommandLineArgs(0, nullptr, nullptr); + +void CrtCommandLineArgs::set(int argc, char *argv[], char *envp[]) { - return 0; + crtCommandLineArgs = CrtCommandLineArgs(argc, argv, envp); } -} // namespace mrntt \ No newline at end of file +namespace mrntt { +std::atomic exitCode; +} + +void ComponentThread::marionetteMain(ComponentThread& self) +{ + // Wait for CRT's main() to post us the command line args. + std::cout << __func__ << ": Waiting for command line JOLT" << std::endl; + self.getIoService().run(); + self.initializeTls(); + mrntt::exitCode = 0; + + try { + OptionParser &options = OptionParser::getOptions(); + + std::cout << __func__ << ": " << PACKAGE_NAME << " " << PACKAGE_VERSION + << std::endl; + + try { + options.parseArguments( + crtCommandLineArgs.argc, crtCommandLineArgs.argv, + crtCommandLineArgs.envp); + + std::cout << __func__ << ": " << options.stringifyOptions() + << std::endl; + } + catch (const std::invalid_argument& e) + { + std::cerr << __func__ << ": Exception occurred: " << e.what() + << '\n' << options.getUsage() << '\n'; + + mrntt::exitCode = EXIT_FAILURE; + return; + } + + if (options.printUsage) + { + std::cout << __func__ << ": " << options.getUsage() << std::endl; + mrntt::exitCode = EXIT_SUCCESS; + return; + } + + int ret = smo::initializeSalmanoff(); + if (ret != 0) + { + std::cerr << __func__ << ": Initialization failed with code: " + << ret << std::endl; + mrntt::exitCode = ret; + return; + } + + /* Start the threads */ + for (auto& componentThread : smo::ComponentThread::componentThreads) + { + // Post startThread() to the event loop of all threads except MRNTT. + if (componentThread->id == ComponentThread::MRNTT) { continue; } + + // JOLT the thread. + componentThread->getIoService().post([componentThread]() + { componentThread->getIoService().stop(); } + ); + + // Now tell it to execute its initialization sequence. + componentThread->startThreadReq(); + } + + body::body->getIoService().post([]{ + throw std::runtime_error("test exception"); + }); + + std::cout << __func__ << ": Entering event loop" << "\n"; + self.getIoService().reset(); + self.getIoService().run(); + } + catch (const std::exception& e) + { + std::cerr << __func__ << ": Exception occurred: " << e.what() + << std::endl; + mrntt::exitCode = EXIT_FAILURE; + return; + } + catch (...) + { + std::cerr << __func__ << ": Unknown exception occurred" << std::endl; + mrntt::exitCode = EXIT_FAILURE; + return; + } + + std::cout << __func__ << ": Exiting normally" << std::endl; +} + + +} // namespace smo diff --git a/smocore/marionette/salmanoff.cpp b/smocore/marionette/salmanoff.cpp new file mode 100644 index 0000000..9a74126 --- /dev/null +++ b/smocore/marionette/salmanoff.cpp @@ -0,0 +1,27 @@ +#include +#include +#include + +namespace smo { + +int initializeSalmanoff(void) +{ + std::cout << __func__ << ": Entered." << std::endl; + + device::DeviceManager::getInstance().collateAllDeviceSpecs(); + device::DeviceManager::getInstance().parseAllDeviceSpecs(); + std::cout << device::DeviceManager::stringifyDeviceSpecs() << std::endl; + sense_api::SenseApiManager::getInstance().loadAllSenseApiLibsFromOptions(); + std::cout << sense_api::SenseApiManager::getInstance().stringifyLibs() + << std::endl; +std::cerr << "About to initializeAllSenseApiLibs" << std::endl; + sense_api::SenseApiManager::getInstance().initializeAllSenseApiLibs(); +std::cerr << "About to attachAllSenseDevicesFromSpecs" << std::endl; + sense_api::SenseApiManager::getInstance().attachAllSenseDevicesFromSpecs(); +std::cerr << "Done attachAllSenseDevicesFromSpecs" << std::endl; + + std::cout << __func__ << ": Done." << std::endl; + return 0; +} + +} // namespace smo