diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..3bf77d2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+build-test
+b-*
+build
+b
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..af04caf
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,171 @@
+cmake_minimum_required(VERSION 3.16)
+project(libspinscale VERSION 0.1.0 LANGUAGES CXX)
+
+# Set C++ standard
+set(CMAKE_CXX_STANDARD 20)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+# Build type (FORCE is only valid on cache entries, so declare it as one)
+if(NOT CMAKE_BUILD_TYPE)
+    set(CMAKE_BUILD_TYPE Debug CACHE STRING "Build type" FORCE)
+endif()
+
+# Compiler flags
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
+
+# Debug options - allow parent to override when used as a subdirectory.
+# option() respects existing cache values, so a parent can set them before
+# add_subdirectory().
+option(ENABLE_DEBUG_LOCKS "Enable debug features for locking system" OFF)
+option(ENABLE_DEBUG_TRACE_CALLABLES
+    "Enable callable tracing for debugging boost::asio post operations" OFF)
+
+# Qutex deadlock detection configuration
+if(NOT DEFINED DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS)
+    set(DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS 500 CACHE STRING
+        "Timeout in milliseconds for deadlock detection in qutex system")
+endif()
+
+if(ENABLE_DEBUG_LOCKS)
+    # Validate the timeout value
+    if(NOT DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS OR DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS STREQUAL "")
+        message(FATAL_ERROR "DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS must be a positive integer > 0")
+    endif()
+
+    # Convert to integer and validate
+    math(EXPR timeout_int "${DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS}")
+    if(timeout_int LESS_EQUAL 0)
+        message(FATAL_ERROR "DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS must be a positive integer > 0")
+    endif()
+endif()
+
+# Set config variables for config.h
+if(ENABLE_DEBUG_LOCKS)
+    set(CONFIG_ENABLE_DEBUG_LOCKS TRUE)
+endif()
+
+if(ENABLE_DEBUG_TRACE_CALLABLES)
+    set(CONFIG_DEBUG_TRACE_CALLABLES TRUE)
+    # Suppress frame-address warnings when using __builtin_return_address()
+    # with values above 0 (See callableTracer.h).
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-frame-address")
+endif()
+
+set(CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS ${DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS})
+
+# Configure config.h
+configure_file(
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/config.h.in
+    ${CMAKE_CURRENT_BINARY_DIR}/include/config.h
+    @ONLY
+)
+
+# Find dependencies
+# Tell CMake we're linking against the shared library (not header-only)
+set(Boost_USE_STATIC_LIBS OFF)
+set(Boost_USE_HEADER_ONLY OFF)
+find_package(Boost REQUIRED COMPONENTS system log)
+# Define BOOST_ALL_DYN_LINK project-wide to ensure all Boost libraries use dynamic linking
+add_compile_definitions(BOOST_ALL_DYN_LINK)
+
+find_package(Threads REQUIRED)
+
+# Create the library
+add_library(spinscale SHARED
+    src/qutex.cpp
+    src/lockerAndInvokerBase.cpp
+    src/componentThread.cpp
+    src/component.cpp
+    src/puppetApplication.cpp
+)
+
+# Conditionally add qutexAcquisitionHistoryTracker.cpp only when debug locks
+# are enabled, since the tracker is only referenced under CONFIG_ENABLE_DEBUG_LOCKS.
+if(ENABLE_DEBUG_LOCKS)
+    target_sources(spinscale PRIVATE src/qutexAcquisitionHistoryTracker.cpp)
+endif()
+
+# Set compile features
+target_compile_features(spinscale PUBLIC cxx_std_20)
+
+# Include directories
+target_include_directories(spinscale PUBLIC
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/include>
+    $<INSTALL_INTERFACE:include>
+)
+
+# Link against required dependencies for shared library
+# Boost::system is PUBLIC because componentThread.h exposes Boost.Asio types
+target_link_libraries(spinscale PUBLIC
+    Threads::Threads
+    Boost::system
+    Boost::log
+)
+
+# Verify Boost dynamic dependencies after build
+# Prefer parent project's script when used as subdirectory, fall back to our own for standalone builds
+set(VERIFY_SCRIPT "")
+if(EXISTS ${CMAKE_SOURCE_DIR}/cmake/VerifyBoostDynamic.cmake)
+    set(VERIFY_SCRIPT ${CMAKE_SOURCE_DIR}/cmake/VerifyBoostDynamic.cmake)
+elseif(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/cmake/VerifyBoostDynamic.cmake)
+    set(VERIFY_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/cmake/VerifyBoostDynamic.cmake)
+endif()
+
+if(VERIFY_SCRIPT)
+    add_custom_command(TARGET spinscale POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -DVERIFY_FILE="$<TARGET_FILE:spinscale>"
+            -P ${VERIFY_SCRIPT}
+        COMMENT "Verifying Boost dynamic dependencies for spinscale"
+    )
+else()
+    message(WARNING "VerifyBoostDynamic.cmake not found - cannot verify Boost dependencies for spinscale")
+endif()
+
+# Install rules
+install(TARGETS spinscale
+    EXPORT spinscaleTargets
+    LIBRARY DESTINATION lib
+    ARCHIVE DESTINATION lib
+    RUNTIME DESTINATION bin
+)
+
+install(DIRECTORY include/spinscale
+    DESTINATION include
+    FILES_MATCHING PATTERN "*.h"
+)
+
+install(FILES include/boostAsioLinkageFix.h
+    DESTINATION include
+)
+
+install(FILES ${CMAKE_CURRENT_BINARY_DIR}/include/config.h
+    DESTINATION include
+)
+
+# Install CMake config files for find_package() support
+install(EXPORT spinscaleTargets
+    FILE spinscaleTargets.cmake
+    NAMESPACE spinscale::
+    DESTINATION lib/cmake/spinscale
+)
+
+# Create config file for find_package()
+include(CMakePackageConfigHelpers)
+
+configure_package_config_file(
+    ${CMAKE_CURRENT_SOURCE_DIR}/cmake/spinscaleConfig.cmake.in
+    ${CMAKE_CURRENT_BINARY_DIR}/spinscaleConfig.cmake
+    INSTALL_DESTINATION lib/cmake/spinscale
+)
+
+write_basic_package_version_file(
+    ${CMAKE_CURRENT_BINARY_DIR}/spinscaleConfigVersion.cmake
+    VERSION ${PROJECT_VERSION}
+    COMPATIBILITY SameMajorVersion
+)
+
+install(FILES
+    ${CMAKE_CURRENT_BINARY_DIR}/spinscaleConfig.cmake
+    ${CMAKE_CURRENT_BINARY_DIR}/spinscaleConfigVersion.cmake
+    DESTINATION lib/cmake/spinscale
+)
diff --git a/cmake/VerifyBoostDynamic.cmake b/cmake/VerifyBoostDynamic.cmake
new file mode 100644
index 0000000..fd71bf7
--- /dev/null
+++ b/cmake/VerifyBoostDynamic.cmake
@@ -0,0 +1,63 @@
+# SMO_VERIFY_BOOST_DYNAMIC_DEPENDENCY
+# Verifies that a target file (executable or shared library) has Boost libraries
+# in its dynamic dependency list via ldd.
+#
+# Usage as function:
+#   SMO_VERIFY_BOOST_DYNAMIC_DEPENDENCY(<target_file>)
+#
+# Usage as script (with -P):
+#   cmake -DVERIFY_FILE=<target_file> -P VerifyBoostDynamic.cmake
+#
+# This function/script:
+#   1. Runs ldd on the target file
+#   2. Checks for boost libraries in the dependency list
+#   3. Reports success or failure with appropriate messages
+#
+function(SMO_VERIFY_BOOST_DYNAMIC_DEPENDENCY target_file)
+    _verify_boost_dynamic_dependency("${target_file}")
+endfunction()
+
+# Internal implementation that can be called from script mode or function mode
+function(_verify_boost_dynamic_dependency target_file)
+    if(NOT EXISTS "${target_file}")
+        message(WARNING "SMO_VERIFY_BOOST_DYNAMIC_DEPENDENCY: Target file '${target_file}' does not exist")
+        return()
+    endif()
+
+    # Run ldd on the target file
+    execute_process(
+        COMMAND ldd "${target_file}"
+        OUTPUT_VARIABLE ldd_output
+        ERROR_VARIABLE ldd_error
+        RESULT_VARIABLE ldd_result
+    )
+
+    if(ldd_result)
+        message(WARNING "SMO_VERIFY_BOOST_DYNAMIC_DEPENDENCY: Failed to run ldd on '${target_file}': ${ldd_error}")
+        return()
+    endif()
+
+    # Check if output contains boost libraries
+    string(TOLOWER "${ldd_output}" ldd_output_lower)
+    string(FIND "${ldd_output_lower}" "libboost" boost_found)
+
+    if(boost_found EQUAL -1)
+        message(STATUS "SMO_VERIFY_BOOST_DYNAMIC_DEPENDENCY: WARNING - No Boost libraries found in dependencies of '${target_file}'")
+        message(STATUS "ldd output:")
+        message(STATUS "${ldd_output}")
+    else()
+        # Extract boost library lines
+        string(REGEX MATCHALL "libboost[^\n]*" boost_libs "${ldd_output}")
+        message(STATUS "SMO_VERIFY_BOOST_DYNAMIC_DEPENDENCY: SUCCESS - Boost libraries found in '${target_file}':")
+        foreach(boost_lib ${boost_libs})
+            string(STRIP "${boost_lib}" boost_lib_stripped)
+            message(STATUS "    ${boost_lib_stripped}")
+        endforeach()
+    endif()
+endfunction()
+
+# Script mode: if VERIFY_FILE is defined, run the verification
+if(VERIFY_FILE)
+    _verify_boost_dynamic_dependency("${VERIFY_FILE}")
+endif()
diff --git a/cmake/spinscaleConfig.cmake.in b/cmake/spinscaleConfig.cmake.in
new file mode 100644
index 0000000..42cde11
--- /dev/null
+++ b/cmake/spinscaleConfig.cmake.in
@@ -0,0 +1,5 @@
+@PACKAGE_INIT@
+
+include("${CMAKE_CURRENT_LIST_DIR}/spinscaleTargets.cmake")
+
+check_required_components(spinscale)
diff --git a/include/boostAsioLinkageFix.h b/include/boostAsioLinkageFix.h
new file mode 100644
index 0000000..8756b5c
--- /dev/null
+++ b/include/boostAsioLinkageFix.h
@@ -0,0 +1,25 @@
+#ifndef BOOST_ASIO_LINKAGE_FIX_H
+#define BOOST_ASIO_LINKAGE_FIX_H
+
+#include <boost/asio/detail/call_stack.hpp>
+#include <boost/asio/detail/tss_ptr.hpp>
+#include <boost/asio/detail/thread_context.hpp>
+
+namespace boost {
+namespace asio {
+namespace detail {
+
+/** EXPLANATION:
+ * Extern declaration of the template instantiation
+ * This ensures that the .o translation units don't have their
+ * own copies of `call_stack<>::top_` defined in them.
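+ *
+ * If each translation unit or shared object kept its own thread-specific
+ * `top_`, asio's call_stack<>::contains() could disagree across library
+ * boundaries about whether the current thread is running a given io_service;
+ * that is the failure mode this header guards against, as we understand it.
+ * The matching explicit instantiation is assumed to live in exactly one .cpp
+ * of this library (that file is not part of this diff), roughly:
+ *
+ *   template
+ *   tss_ptr<call_stack<thread_context, thread_info_base>::context>
+ *   call_stack<thread_context, thread_info_base>::top_;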
+ */
+extern template
+tss_ptr<call_stack<thread_context, thread_info_base>::context>
+call_stack<thread_context, thread_info_base>::top_;
+
+} // namespace detail
+} // namespace asio
+} // namespace boost
+
+#endif // BOOST_ASIO_LINKAGE_FIX_H
diff --git a/include/config.h.in b/include/config.h.in
new file mode 100644
index 0000000..0c93e99
--- /dev/null
+++ b/include/config.h.in
@@ -0,0 +1,11 @@
+#ifndef _CONFIG_H
+#define _CONFIG_H
+
+/* Debug locking configuration */
+#cmakedefine CONFIG_ENABLE_DEBUG_LOCKS
+#cmakedefine CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS @DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS@
+
+/* Debug callable tracing configuration */
+#cmakedefine CONFIG_DEBUG_TRACE_CALLABLES
+
+#endif /* _CONFIG_H */
diff --git a/include/spinscale/asynchronousBridge.h b/include/spinscale/asynchronousBridge.h
new file mode 100644
index 0000000..99d050d
--- /dev/null
+++ b/include/spinscale/asynchronousBridge.h
@@ -0,0 +1,58 @@
+#ifndef ASYNCHRONOUS_BRIDGE_H
+#define ASYNCHRONOUS_BRIDGE_H
+
+#include <atomic>
+#include <functional>
+#include <boost/asio/io_service.hpp>
+
+namespace sscl {
+
+class AsynchronousBridge
+{
+public:
+    AsynchronousBridge(boost::asio::io_service &io_service)
+    : isAsyncOperationComplete(false), io_service(io_service)
+    {}
+
+    void setAsyncOperationComplete(void)
+    {
+        /** EXPLANATION:
+         * This empty post()ed message is necessary to ensure that the thread
+         * that's waiting on the io_service is signaled to wake up and check
+         * the io_service's queue.
+         */
+        isAsyncOperationComplete.store(true);
+        io_service.post([]{});
+    }
+
+    void waitForAsyncOperationCompleteOrIoServiceStopped(void)
+    {
+        for (;;)
+        {
+            io_service.run_one();
+            if (isAsyncOperationComplete.load() || io_service.stopped())
+            { break; }
+
+            /** EXPLANATION:
+             * In the mrntt and mind thread loops we call checkException() after
+             * run() returns, but we don't have to do that here because
+             * setException() calls stop.
+             *
+             * So if an exception is set on our thread, we'll break out of this
+             * loop due to the check for stopped() above, and that'll take us
+             * back out to the main loop, where we'll catch the exception.
+             */
+        }
+    }
+
+    bool exitedBecauseIoServiceStopped(void) const
+    { return io_service.stopped(); }
+
+private:
+    std::atomic<bool> isAsyncOperationComplete;
+    boost::asio::io_service &io_service;
+};
+
+} // namespace sscl
+
+#endif // ASYNCHRONOUS_BRIDGE_H
diff --git a/include/spinscale/asynchronousContinuation.h b/include/spinscale/asynchronousContinuation.h
new file mode 100644
index 0000000..cb7fcee
--- /dev/null
+++ b/include/spinscale/asynchronousContinuation.h
@@ -0,0 +1,158 @@
+#ifndef ASYNCHRONOUS_CONTINUATION_H
+#define ASYNCHRONOUS_CONTINUATION_H
+
+#include <memory>
+#include <functional>
+#include <exception>
+#include <utility>
+#include <spinscale/asynchronousContinuationChainLink.h>
+#include <spinscale/callback.h>
+#include <spinscale/callableTracer.h>
+
+
+namespace sscl {
+
+/**
+ * AsynchronousContinuation - Template base class for async sequence management
+ *
+ * This template provides a common pattern for managing asynchronous operations
+ * that need to maintain object lifetime through a sequence of callbacks.
+ *
+ * The template parameter OriginalCbFnT represents the signature of the original
+ * callback that will be invoked when the async sequence completes.
+ */
+template <typename OriginalCbFnT>
+class AsynchronousContinuation
+: public AsynchronousContinuationChainLink
+{
+public:
+    explicit AsynchronousContinuation(Callback<OriginalCbFnT> originalCb)
+    : originalCallback(std::move(originalCb))
+    {}
+
+    /** EXPLANATION:
+     * Each numbered segmented sequence persists the lifetime of the
+     * continuation object by taking a copy of its shared_ptr.
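+     *
+     * Illustrative shape of such a sequence (names hypothetical, not part
+     * of this library):
+     *
+     *   static void opSeg1(
+     *       std::shared_ptr<AsynchronousContinuation<CbFn>> c);
+     *   static void opSeg2(
+     *       std::shared_ptr<AsynchronousContinuation<CbFn>> c);
+     *
+     * opSeg1 posts opSeg2 with a copy of `c`, so the continuation (and the
+     * original callback it owns) outlives every intermediate hop until the
+     * final segment calls callOriginalCb().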
+     */
+    typedef void (SegmentFn)(
+        std::shared_ptr<AsynchronousContinuation<OriginalCbFnT>>
+            lifetimePreservingConveyance);
+
+    /** EXPLANATION:
+     * When an exception is thrown in an async callee, which pertains to an
+     * error in the data given by the caller, we ought not to throw the
+     * exception within the callee. Instead, we should store the exception
+     * in the continuation object and return it to the caller.
+     *
+     * The caller should then call checkException() to rethrow it on its
+     * own stack.
+     *
+     * These macros should be used by the callee to bubble the exception up
+     * to the caller.
+     */
+    #define CALLEE_SETEXC(continuation, type, exc_obj) \
+        (continuation)->exception = std::make_exception_ptr(exc_obj)
+
+    #define CALLEE_SETEXC_CALLCB(continuation, type, exc_obj) \
+        do { \
+            CALLEE_SETEXC(continuation, type, exc_obj); \
+            (continuation)->callOriginalCb(); \
+        } while(0)
+
+    #define CALLEE_SETEXC_CALLCB_RET(continuation, type, exc_obj) \
+        do { \
+            CALLEE_SETEXC_CALLCB(continuation, type, exc_obj); \
+            return; \
+        } while(0)
+
+    // Call this in the caller to rethrow the exception.
+    void checkException()
+    {
+        if (exception)
+        { std::rethrow_exception(exception); }
+    }
+
+    // Implement the virtual method from AsynchronousContinuationChainLink
+    virtual std::shared_ptr<AsynchronousContinuationChainLink>
+    getCallersContinuationShPtr() const override
+    { return originalCallback.callerContinuation; }
+
+public:
+    Callback<OriginalCbFnT> originalCallback;
+    std::exception_ptr exception;
+};
+
+/**
+ * NonPostedAsynchronousContinuation - For continuations that don't post
+ * callbacks
+ *
+ * Note: We intentionally do not create a
+ * LockedNonPostedAsynchronousContinuation because the only way to implement
+ * non-posted locking would be via busy-spinning or sleeplocks. This would
+ * eliminate the throughput advantage from our Qspinning mechanism, which
+ * relies on re-posting to the io_service queue when locks are unavailable.
+ */
+template <typename OriginalCbFnT>
+class NonPostedAsynchronousContinuation
+: public AsynchronousContinuation<OriginalCbFnT>
+{
+public:
+    explicit NonPostedAsynchronousContinuation(
+        Callback<OriginalCbFnT> originalCb)
+    : AsynchronousContinuation<OriginalCbFnT>(originalCb)
+    {}
+
+    /**
+     * @brief Call the original callback with perfect forwarding
+     *    (immediate execution)
+     *
+     * This implementation calls the original callback immediately without
+     * posting to any thread or queue. Used for non-posted continuations.
+     *
+     * @param args Arguments to forward to the original callback
+     */
+    template <typename... Args>
+    void callOriginalCb(Args&&... args)
+    {
+        if (AsynchronousContinuation<OriginalCbFnT>::originalCallback
+            .callbackFn)
+        {
+            AsynchronousContinuation<OriginalCbFnT>::originalCallback
+                .callbackFn(std::forward<Args>(args)...);
+        }
+    }
+};
+
+template <typename OriginalCbFnT>
+class PostedAsynchronousContinuation
+: public AsynchronousContinuation<OriginalCbFnT>
+{
+public:
+    PostedAsynchronousContinuation(
+        const std::shared_ptr<ComponentThread> &caller,
+        Callback<OriginalCbFnT> originalCbFn)
+    : AsynchronousContinuation<OriginalCbFnT>(originalCbFn),
+      caller(caller)
+    {}
+
+    template <typename... Args>
+    void callOriginalCb(Args&&... args)
+    {
+        if (AsynchronousContinuation<OriginalCbFnT>::originalCallback
+            .callbackFn)
+        {
+            caller->getIoService().post(
+                STC(std::bind(
+                    AsynchronousContinuation<OriginalCbFnT>::originalCallback
+                        .callbackFn,
+                    std::forward<Args>(args)...)));
+        }
+    }
+
+public:
+    std::shared_ptr<ComponentThread> caller;
+};
+
+} // namespace sscl
+
+#endif // ASYNCHRONOUS_CONTINUATION_H
diff --git a/include/spinscale/asynchronousContinuationChainLink.h b/include/spinscale/asynchronousContinuationChainLink.h
new file mode 100644
index 0000000..e9b05da
--- /dev/null
+++ b/include/spinscale/asynchronousContinuationChainLink.h
@@ -0,0 +1,32 @@
+#ifndef ASYNCHRONOUS_CONTINUATION_CHAIN_LINK_H
+#define ASYNCHRONOUS_CONTINUATION_CHAIN_LINK_H
+
+#include <memory>
+
+namespace sscl {
+
+/**
+ * @brief Base class for all asynchronous continuation chain links
+ *
+ * This non-template base class provides type erasure for the continuation
+ * chain, allowing RTTI and dynamic casting when walking the chain.
+ *
+ * The chain walking logic can use dynamic_cast to determine the most
+ * derived type and perform appropriate operations.
+ *
+ * Inherits from enable_shared_from_this to allow objects to obtain a
+ * shared_ptr to themselves, which is useful for gridlock detection tracking.
+ */
+class AsynchronousContinuationChainLink
+: public std::enable_shared_from_this<AsynchronousContinuationChainLink>
+{
+public:
+    virtual ~AsynchronousContinuationChainLink() = default;
+
+    virtual std::shared_ptr<AsynchronousContinuationChainLink>
+    getCallersContinuationShPtr() const = 0;
+};
+
+} // namespace sscl
+
+#endif // ASYNCHRONOUS_CONTINUATION_CHAIN_LINK_H
diff --git a/include/spinscale/asynchronousLoop.h b/include/spinscale/asynchronousLoop.h
new file mode 100644
index 0000000..bccb12d
--- /dev/null
+++ b/include/spinscale/asynchronousLoop.h
@@ -0,0 +1,69 @@
+#ifndef ASYNCHRONOUS_LOOP_H
+#define ASYNCHRONOUS_LOOP_H
+
+#include <atomic>
+
+namespace sscl {
+
+class AsynchronousLoop
+{
+public:
+    AsynchronousLoop(
+        const unsigned int nTotal,
+        unsigned int nSucceeded=0, unsigned int nFailed=0)
+    : nTotal(nTotal), nSucceeded(nSucceeded), nFailed(nFailed)
+    {}
+
+    AsynchronousLoop(const AsynchronousLoop& other)
+    : nTotal(other.nTotal),
+      nSucceeded(other.nSucceeded.load()), nFailed(other.nFailed.load())
+    {}
+
+    AsynchronousLoop& operator=(const AsynchronousLoop& other)
+    {
+        if (this != &other)
+        {
+            nTotal = other.nTotal;
+            nSucceeded.store(other.nSucceeded.load());
+            nFailed.store(other.nFailed.load());
+        }
+        return *this;
+    }
+
+    bool isComplete(void) const
+    {
+        return nSucceeded + nFailed == nTotal;
+    }
+
+    void incrementSuccessOrFailureDueTo(bool success)
+    {
+        if (success)
+        { ++nSucceeded; }
+        else
+        { ++nFailed; }
+    }
+
+    bool incrementSuccessOrFailureAndTestForCompletionDueTo(bool success)
+    {
+        incrementSuccessOrFailureDueTo(success);
+        return isComplete();
+    }
+
+    bool nTotalIsZero(void) const
+    {
+        return nTotal == 0;
+    }
+
+    void setRemainingIterationsToFailure()
+    {
+        nFailed.store(nTotal - nSucceeded.load());
+    }
+
+public:
+    unsigned int nTotal;
+    std::atomic<unsigned int> nSucceeded, nFailed;
+};
+
+} // namespace sscl
+
+#endif // ASYNCHRONOUS_LOOP_H
diff --git a/include/spinscale/callableTracer.h b/include/spinscale/callableTracer.h
new file mode 100644
index 0000000..3774a3c
--- /dev/null
+++ b/include/spinscale/callableTracer.h
@@ -0,0 +1,149 @@
+#ifndef SPINSCALE_CALLABLE_TRACER_H
+#define SPINSCALE_CALLABLE_TRACER_H
+
+#include <config.h>
+#include <string>
+#include <functional>
+#include <utility>
+#include <iostream>
+#include <spinscale/componentThread.h>
+
+// Forward declaration - OptionParser is defined in smocore/include/opts.h
+// If you need tracing, include opts.h before including this header.
+// The code will check for the OPTS_H define to see if opts.h has been included.
+class OptionParser;
+
+namespace sscl {
+
+/**
+ * @brief CallableTracer - Wraps callables with metadata for debugging
+ *
+ * This class wraps any callable object with metadata (caller function name,
+ * line number, and return addresses) to help debug cases where callables
+ * posted to boost::asio::io_service have gone out of scope. The metadata
+ * can be accessed from the callable's address when debugging.
+ */
+class CallableTracer
+{
+public:
+    /**
+     * @brief Constructor that wraps a callable with metadata
+     * @param callerFuncName The name of the function that created this callable
+     * @param callerLine The line number where this callable was created
+     * @param returnAddr0 The return address of the direct caller
+     * @param returnAddr1 The return address of the caller before that
+     * @param callable The callable object to wrap
+     */
+    template <typename CallableT>
+    explicit CallableTracer(
+        const char* callerFuncName,
+        int callerLine,
+        void* returnAddr0,
+        void* returnAddr1,
+        CallableT&& callable)
+    : callerFuncName(callerFuncName),
+      callerLine(callerLine),
+      returnAddr0(returnAddr0),
+      returnAddr1(returnAddr1),
+      callable(std::forward<CallableT>(callable))
+    {}
+
+    void operator()()
+    {
+        // OptionParser::getOptions() requires opts.h to be included.
+        // Only check traceCallables if opts.h has been included (OPTS_H is defined).
+        #ifdef CONFIG_DEBUG_TRACE_CALLABLES
+        #ifdef OPTS_H
+        if (OptionParser::getOptions().traceCallables)
+        {
+            std::cout << __func__ << ": On thread "
+                << (ComponentThread::tlsInitialized()
+                    ? ComponentThread::getSelf()->name : "<no TLS>")
+                << ": Calling callable posted by:\n"
+                << "\t" << callerFuncName << "\n\tat line " << callerLine
+                << " return addr 0: " << returnAddr0
+                << ", return addr 1: " << returnAddr1
+                << std::endl;
+        }
+        #endif
+        #endif
+        callable();
+    }
+
+public:
+    /// Name of the function that created this callable
+    std::string callerFuncName;
+    /// Line number where this callable was created
+    int callerLine;
+    /// Return address of the direct caller
+    void* returnAddr0;
+    /// Return address of the caller before that
+    void* returnAddr1;
+
+private:
+    /// The wrapped callable (type-erased using std::function)
+    std::function<void()> callable;
+};
+
+} // namespace sscl
+
+/**
+ * @brief STC - SMO Traceable Callable macro
+ *
+ * When CONFIG_DEBUG_TRACE_CALLABLES is defined, wraps the callable with
+ * CallableTracer to store metadata (caller function name, line number,
+ * and return addresses). When not defined, returns the callable directly
+ * with no overhead.
+ *
+ * Uses compiler-specific macros to get fully qualified function names:
+ *   - GCC/Clang: __PRETTY_FUNCTION__ (includes full signature with namespace/class)
+ *   - MSVC: __FUNCSIG__ (includes full signature)
+ *   - Fallback: __func__ (unqualified function name only)
+ *
+ * Uses compiler-specific builtins to get return addresses:
+ *   - GCC/Clang: __builtin_return_address(0) and __builtin_return_address(1)
+ *   - MSVC: _ReturnAddress() (only one level available)
+ *   - Fallback: nullptr for return addresses
+ *
+ * Usage:
+ *   thread->getIoService().post(
+ *       STC(std::bind(&SomeClass::method, this, arg1, arg2)));
+ */
+#ifdef CONFIG_DEBUG_TRACE_CALLABLES
+    #if defined(__GNUC__) || defined(__clang__)
+        // GCC/Clang: __PRETTY_FUNCTION__ gives full signature
+        // e.g., "void smo::SomeClass::method(int, int)"
+        // __builtin_return_address(0) = direct caller
+        // __builtin_return_address(1) = caller before that
+        #define STC(arg) sscl::CallableTracer( \
+            __PRETTY_FUNCTION__, \
+            __LINE__, \
+            __builtin_return_address(0), \
+            __builtin_return_address(1), \
+            arg)
+    #elif defined(_MSC_VER)
+        // MSVC: __FUNCSIG__ gives full signature
+        // e.g., "void __cdecl smo::SomeClass::method(int, int)"
+        // _ReturnAddress() = direct caller (only one level available)
+        #include <intrin.h>
+        #define STC(arg) sscl::CallableTracer( \
+            __FUNCSIG__, \
+            __LINE__, \
+            _ReturnAddress(), \
+            nullptr, \
+            arg)
+    #else
+        // Fallback to standard __func__ (unqualified name only)
+        // No return address support
+        #define STC(arg) sscl::CallableTracer( \
+            __func__, \
+            __LINE__, \
+            nullptr, \
+            nullptr, \
+            arg)
+    #endif
+#else
+#define STC(arg) arg
+#endif
+
+#endif // SPINSCALE_CALLABLE_TRACER_H
diff --git a/include/spinscale/callback.h b/include/spinscale/callback.h
new file mode 100644
index 0000000..1dfeb50
--- /dev/null
+++ b/include/spinscale/callback.h
@@ -0,0 +1,31 @@
+#ifndef SPINSCALE_CALLBACK_H
+#define SPINSCALE_CALLBACK_H
+
+#include <memory>
+
+namespace sscl {
+
+// Forward declaration
+class AsynchronousContinuationChainLink;
+
+/**
+ * @brief Callback class that wraps a function and its caller continuation
+ *
+ * This class provides a way to pass both a callback function and the
+ * caller's continuation in a single object, enabling deadlock detection
+ * by walking the chain of continuations.
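+ *
+ * For example (all names hypothetical), a caller that already holds a
+ * continuation `myContinuation` might build:
+ *
+ *   Callback<void(bool)>{
+ *       myContinuation,
+ *       std::bind(&Foo::onDone, this, std::placeholders::_1)};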
+ *
+ * Usage: Callback{context, std::bind(...)}
+ */
+template <typename CbFnT>
+class Callback
+{
+public:
+    // Aggregate initialization allows: Callback{context, std::bind(...)}
+    std::shared_ptr<AsynchronousContinuationChainLink> callerContinuation;
+    CbFnT callbackFn;
+};
+
+} // namespace sscl
+
+#endif // SPINSCALE_CALLBACK_H
diff --git a/include/spinscale/component.h b/include/spinscale/component.h
new file mode 100644
index 0000000..0fb3b04
--- /dev/null
+++ b/include/spinscale/component.h
@@ -0,0 +1,41 @@
+#ifndef COMPONENT_H
+#define COMPONENT_H
+
+#include <memory>
+#include <string>
+#include <functional>
+#include <spinscale/callback.h>
+#include <spinscale/puppetApplication.h>
+
+namespace sscl {
+
+class ComponentThread;
+
+class Component
+{
+public:
+    Component(const std::shared_ptr<ComponentThread> &thread);
+    ~Component() = default;
+
+public:
+    std::shared_ptr<ComponentThread> thread;
+};
+
+class PuppetComponent
+: public Component
+{
+public:
+    PuppetComponent(
+        PuppetApplication &parent,
+        const std::shared_ptr<ComponentThread> &thread);
+    ~PuppetComponent() = default;
+
+public:
+    PuppetApplication &parent;
+};
+
+} // namespace sscl
+
+#endif // COMPONENT_H
diff --git a/include/spinscale/componentThread.h b/include/spinscale/componentThread.h
new file mode 100644
index 0000000..39d453c
--- /dev/null
+++ b/include/spinscale/componentThread.h
@@ -0,0 +1,167 @@
+#ifndef COMPONENT_THREAD_H
+#define COMPONENT_THREAD_H
+
+#include <cstdint>
+#include <string>
+#include <memory>
+#include <thread>
+#include <atomic>
+#include <functional>
+#include <utility>
+#include <vector>
+#include <stdexcept>
+#include <iostream>
+#include <boost/asio/io_service.hpp>
+#include <config.h>
+#include <spinscale/callback.h>
+
+namespace sscl {
+
+class MarionetteThread;
+class PuppetThread;
+
+// ThreadId is a generic type - application-specific enums should be defined elsewhere
+typedef uint8_t ThreadId;
+
+class ComponentThread
+{
+protected:
+    ComponentThread(ThreadId _id)
+    : id(_id), name(getThreadName(_id)),
+      work(io_service)
+    {}
+
+public:
+    virtual ~ComponentThread() = default;
+
+    // getThreadName implementation is provided by application code
+    static std::string getThreadName(ThreadId id);
+
+    void cleanup(void);
+
+    boost::asio::io_service& getIoService(void) { return io_service; }
+
+    static const std::shared_ptr<ComponentThread> getSelf(void);
+    static bool tlsInitialized(void);
+    static std::shared_ptr<MarionetteThread> getMrntt();
+
+    typedef void (mainFn)(ComponentThread &self);
+
+    // CPU management methods
+    static int getAvailableCpuCount();
+
+    typedef std::function<void(bool)> mindShutdownIndOpCbFn;
+    // Intentionally doesn't take a callback.
+    void exceptionInd(const std::shared_ptr<ComponentThread> &faultyThread);
+    // Intentionally doesn't take a callback.
+    void userShutdownInd();
+
+public:
+    ThreadId id;
+    std::string name;
+    boost::asio::io_service io_service;
+    boost::asio::io_service::work work;
+    std::atomic<bool> keepLooping;
+};
+
+class MarionetteThread
+: public std::enable_shared_from_this<MarionetteThread>,
+  public ComponentThread
+{
+public:
+    MarionetteThread(ThreadId id = 0)
+    : ComponentThread(id),
+      thread(main, std::ref(*this))
+    {
+    }
+
+    static void main(MarionetteThread& self);
+    void initializeTls(void);
+
+public:
+    std::thread thread;
+};
+
+class PuppetThread
+: public std::enable_shared_from_this<PuppetThread>,
+  public ComponentThread
+{
+public:
+    enum class ThreadOp
+    {
+        START,
+        PAUSE,
+        RESUME,
+        EXIT,
+        JOLT,
+        N_ITEMS
+    };
+
+    PuppetThread(ThreadId _id)
+    : ComponentThread(_id),
+      pinnedCpuId(-1),
+      pause_work(pause_io_service),
+      thread(main, std::ref(*this))
+    {
+    }
+
+    virtual ~PuppetThread() = default;
+
+    static void main(PuppetThread& self);
+    void initializeTls(void);
+
+    // Thread management methods
+    typedef std::function<void(bool)> threadLifetimeMgmtOpCbFn;
+    void startThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
+    void exitThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
+    void pauseThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
+    void resumeThreadReq(Callback<threadLifetimeMgmtOpCbFn> callback);
+
+    /**
+     * JOLTs this thread to begin processing after global initialization.
+     *
+     * JOLTing is the mechanism that allows threads to enter their main
+     * event loops and set up TLS vars after all global constructors have
+     * completed. This prevents race conditions during system startup.
+     *
+     * @param selfPtr Shared pointer to this thread (required because TLS
+     *    isn't set up yet, so shared_from_this() can't be used)
+     * @param callback Callback to invoke when JOLT completes
+     */
+    void joltThreadReq(
+        const std::shared_ptr<PuppetThread>& selfPtr,
+        Callback<threadLifetimeMgmtOpCbFn> callback);
+
+    // CPU management methods
+    void pinToCpu(int cpuId);
+
+protected:
+    /**
+     * Handle exception - called from main() when an exception occurs.
+     * Derived classes can override to provide application-specific handling.
+     */
+    virtual void handleException() {}
+
+public:
+    int pinnedCpuId;
+    boost::asio::io_service pause_io_service;
+    boost::asio::io_service::work pause_work;
+    std::thread thread;
+
+public:
+    class ThreadLifetimeMgmtOp;
+};
+
+namespace mrntt {
+extern std::shared_ptr<MarionetteThread> thread;
+
+// Forward declaration for marionette thread ID management
+// Must live inside the sscl namespace so ThreadId is in scope
+extern ThreadId marionetteThreadId;
+void setMarionetteThreadId(ThreadId id);
+} // namespace mrntt
+} // namespace sscl
+
+#endif // COMPONENT_THREAD_H
diff --git a/include/spinscale/dependencyGraph.h b/include/spinscale/dependencyGraph.h
new file mode 100644
index 0000000..2cefe45
--- /dev/null
+++ b/include/spinscale/dependencyGraph.h
@@ -0,0 +1,85 @@
+#ifndef DEPENDENCY_GRAPH_H
+#define DEPENDENCY_GRAPH_H
+
+#include <memory>
+#include <vector>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace sscl {
+
+// Forward declarations
+class AsynchronousContinuationChainLink;
+
+/**
+ * @brief DependencyGraph - Represents a directed graph for lock dependency analysis
+ *
+ * This graph represents dependencies between continuations (lockvokers) where
+ * an edge from A to B means that continuation A wants a lock that is held by
+ * continuation B. This is used to detect circular dependencies (gridlocks).
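+ *
+ * For instance, if continuation A wants a qutex held by B (edge A -> B) and
+ * B wants a qutex held by A (edge B -> A), findCycles() would report the
+ * cycle {A, B}. Sketch (a and b are hypothetical Node handles):
+ *
+ *   DependencyGraph g;
+ *   g.addNode(a); g.addNode(b);
+ *   g.addEdge(a, b);   // a waits on a lock b holds
+ *   g.addEdge(b, a);   // b waits on a lock a holds
+ *   assert(g.hasCycles());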
+ */
+class DependencyGraph
+{
+public:
+    typedef std::shared_ptr<AsynchronousContinuationChainLink> Node;
+    // Each node maps to a set of nodes it depends on
+    typedef std::unordered_map<Node, std::unordered_set<Node>> AdjacencyList;
+
+public:
+    void addNode(const Node& node);
+
+    /**
+     * @brief Add a directed edge from source to target
+     * @param source The continuation that wants a lock
+     * @param target The continuation that holds the wanted lock
+     */
+    void addEdge(const Node& source, const Node& target);
+
+    /**
+     * @brief Find all cycles in the graph using DFS
+     * @return Vector of cycles, where each cycle is a vector of nodes
+     */
+    std::vector<std::vector<Node>> findCycles() const;
+
+    /**
+     * @brief Check if there are any cycles in the graph
+     * @return true if cycles exist, false otherwise
+     */
+    bool hasCycles() const;
+
+    /**
+     * @brief Get the number of nodes in the graph
+     * @return Number of nodes
+     */
+    size_t getNodeCount() const;
+
+    /**
+     * @brief Get the adjacency list for debugging
+     * @return Reference to the adjacency list
+     */
+    const AdjacencyList& getAdjacencyList() const { return adjacencyList; }
+
+private:
+    /**
+     * @brief DFS helper for cycle detection
+     * @param node Current node being visited
+     * @param visited Set of nodes that have been fully processed
+     * @param recursionStack Set of nodes currently in the recursion stack
+     * @param path Current path being explored
+     * @param cycles Vector to store found cycles
+     */
+    void dfsCycleDetection(
+        const Node& node,
+        std::unordered_set<Node>& visited,
+        std::unordered_set<Node>& recursionStack,
+        std::vector<Node>& path,
+        std::vector<std::vector<Node>>& cycles)
+        const;
+
+private:
+    AdjacencyList adjacencyList;
+};
+
+} // namespace sscl
+
+#endif // DEPENDENCY_GRAPH_H
diff --git a/include/spinscale/lockSet.h b/include/spinscale/lockSet.h
new file mode 100644
index 0000000..e00687c
--- /dev/null
+++ b/include/spinscale/lockSet.h
@@ -0,0 +1,260 @@
+#ifndef LOCK_SET_H
+#define LOCK_SET_H
+
+#include <vector>
+#include <memory>
+#include <functional>
+#include <optional>
+#include <string>
+#include <stdexcept>
+#include <spinscale/lockerAndInvokerBase.h>
+
+namespace sscl {
+
+// Forward declarations
+template <typename OriginalCbFnT>
+class SerializedAsynchronousContinuation;
+class Qutex;
+
+/**
+ * @brief LockSet - Manages a collection of locks for acquisition/release
+ */
+template <typename OriginalCbFnT>
+class LockSet
+{
+public:
+    /** EXPLANATION:
+     * Tracks both the Qutex that must be acquired, as well as the parent
+     * LockerAndInvoker that this LockSet has registered into that Qutex's
+     * queue.
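+     *
+     * The iterator points into the Qutex's std::list queue; std::list
+     * iterators remain valid across other insertions and erasures, which is
+     * what makes caching them here safe and makes a later erase() of this
+     * entry O(1).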
+     */
+    struct LockUsageDesc
+    {
+        std::reference_wrapper<Qutex> qutex;
+        typename LockerAndInvokerBase::List::iterator iterator;
+        bool hasBeenReleased = false;
+
+        LockUsageDesc(std::reference_wrapper<Qutex> qutexRef,
+            typename LockerAndInvokerBase::List::iterator iter)
+        : qutex(qutexRef), iterator(iter), hasBeenReleased(false) {}
+    };
+
+    typedef std::vector<std::reference_wrapper<Qutex>> Set;
+
+public:
+    /**
+     * @brief Constructor
+     * @param parentContinuation Reference to the parent
+     *    SerializedAsynchronousContinuation
+     * @param qutexes Vector of Qutex references that must be acquired
+     */
+    LockSet(
+        SerializedAsynchronousContinuation<OriginalCbFnT> &parentContinuation,
+        std::vector<std::reference_wrapper<Qutex>> qutexes = {})
+    : parentContinuation(parentContinuation), allLocksAcquired(false),
+      registeredInQutexQueues(false)
+    {
+        /* Convert Qutex references to LockUsageDesc (iterators will be filled
+         * in during registration)
+         */
+        locks.reserve(qutexes.size());
+        for (auto& qutexRef : qutexes)
+        {
+            locks.emplace_back(
+                qutexRef,
+                typename LockerAndInvokerBase::List::iterator{});
+        }
+    }
+
+    /**
+     * @brief Register the LockSet with all its Qutex locks
+     * @param lockvoker The LockerAndInvoker to register with each Qutex
+     *
+     * EXPLANATION:
+     * I'm not sure an unregisterFromQutexQueues() method is needed.
+     * Why? Because if an async sequence can't acquire all locks, it will
+     * simply never leave the qutexQ until it eventually does. The only other
+     * time it will leave the qutexQ is when the program terminates.
+     *
+     * I'm not sure we'll actually cancel all in-flight async sequences --
+     * and especially not all those that aren't even in any io_service queues.
+     * To whatever extent these objects get cleaned up, they'll probably be
+     * cleaned up in the qutexQ's std::list destructor -- and that won't
+     * execute any fancy cleanup logic. It'll just clear() out the list.
+     */
+    void registerInQutexQueues(
+        const std::shared_ptr<LockerAndInvokerBase> &lockvoker
+    )
+    {
+        /** EXPLANATION:
+         * Register the lockvoker with each Qutex and store the returned
+         * iterator to its place within each Qutex's queue. We store the
+         * iterator so that we can quickly move the lockvoker around within
+         * the queue, and eventually, erase() it when we acquire all the
+         * locks.
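+         *
+         * Registration alone does not attempt acquisition: the lockvoker
+         * only calls tryAcquireOrBackOff() once it has been posted to, and
+         * run on, its target io_service.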
+         */
+        for (auto& lockUsageDesc : locks)
+        {
+            lockUsageDesc.iterator = lockUsageDesc.qutex.get().registerInQueue(
+                lockvoker);
+        }
+
+        registeredInQutexQueues = true;
+    }
+
+    void unregisterFromQutexQueues()
+    {
+        if (!registeredInQutexQueues)
+        {
+            throw std::runtime_error(
+                std::string(__func__) +
+                ": LockSet::unregisterFromQutexQueues() called but not "
+                "registered in Qutex queues");
+        }
+
+        // Unregister from all qutex queues
+        for (auto& lockUsageDesc : locks)
+        {
+            auto it = lockUsageDesc.iterator;
+            lockUsageDesc.qutex.get().unregisterFromQueue(it);
+        }
+    }
+
+
+    /**
+     * @brief Try to acquire all locks in order; back off if acquisition fails
+     * @param lockvoker The LockerAndInvoker attempting to acquire the locks
+     * @param firstFailedQutex Output parameter to receive the first Qutex that
+     *    failed acquisition
+     * @return true if all locks were acquired, false otherwise
+     */
+    bool tryAcquireOrBackOff(
+        LockerAndInvokerBase &lockvoker,
+        std::optional<std::reference_wrapper<Qutex>> &firstFailedQutex
+    )
+    {
+        if (!registeredInQutexQueues)
+        {
+            throw std::runtime_error(
+                std::string(__func__) +
+                ": LockSet::tryAcquireOrBackOff() called but not registered in "
+                "Qutex queues");
+        }
+        if (allLocksAcquired)
+        {
+            throw std::runtime_error(
+                std::string(__func__) +
+                ": LockSet::tryAcquireOrBackOff() called but allLocksAcquired "
+                "is already true");
+        }
+
+        // Try to acquire all required locks
+        int nAcquired = 0;
+        const int nRequiredLocks = static_cast<int>(locks.size());
+        for (auto& lockUsageDesc : locks)
+        {
+            if (!lockUsageDesc.qutex.get().tryAcquire(
+                lockvoker, nRequiredLocks))
+            {
+                // Set the first failed qutex for debugging
+                firstFailedQutex = std::ref(lockUsageDesc.qutex.get());
+                break;
+            }
+
+            nAcquired++;
+        }
+
+        if (nAcquired < nRequiredLocks)
+        {
+            // Release any locks we managed to acquire
+            for (int i = 0; i < nAcquired; i++) {
+                locks[i].qutex.get().backoff(lockvoker, nRequiredLocks);
+            }
+
+            return false;
+        }
+
+        allLocksAcquired = true;
+        return true;
+    }
+
+    // @brief Release all locks
+    void release()
+    {
+        if (!registeredInQutexQueues)
+        {
+            throw std::runtime_error(
+                std::string(__func__) +
+                ": LockSet::release() called but not registered in Qutex "
+                "queues");
+        }
+
+        if (!allLocksAcquired)
+        {
+            throw std::runtime_error(
+                std::string(__func__) +
+                ": LockSet::release() called but allLocksAcquired is false");
+        }
+
+        for (auto& lockUsageDesc : locks)
+        {
+            if (lockUsageDesc.hasBeenReleased) { continue; }
+
+            lockUsageDesc.qutex.get().release();
+        }
+
+        allLocksAcquired = false;
+    }
+
+    const LockUsageDesc &getLockUsageDesc(const Qutex &criterionLock) const
+    {
+        for (auto& lockUsageDesc : locks)
+        {
+            if (&lockUsageDesc.qutex.get() == &criterionLock) {
+                return lockUsageDesc;
+            }
+        }
+
+        // Should never happen if the LockSet is properly constructed
+        throw std::runtime_error(
+            std::string(__func__) +
+            ": Qutex not found in this LockSet");
+    }
+
+    /**
+     * @brief Release a specific qutex early and mark it as released
+     * @param qutex The qutex to release early
+     */
+    void releaseQutexEarly(Qutex &qutex)
+    {
+        if (!allLocksAcquired)
+        {
+            throw std::runtime_error(
+                std::string(__func__) +
+                ": LockSet::releaseQutexEarly() called but allLocksAcquired is false");
+        }
+
+        auto& lockUsageDesc = const_cast<LockUsageDesc &>(
+            getLockUsageDesc(qutex));
+
+        if (!lockUsageDesc.hasBeenReleased)
+        {
+            lockUsageDesc.qutex.get().release();
+            lockUsageDesc.hasBeenReleased = true;
+        }
+
+        return;
+    }
+
+public:
+    std::vector<LockUsageDesc> locks;
+
+private:
+    SerializedAsynchronousContinuation<OriginalCbFnT> &parentContinuation;
+    bool allLocksAcquired, registeredInQutexQueues;
+};
+
+} // namespace sscl
+
+#endif // LOCK_SET_H
diff --git a/include/spinscale/lockerAndInvokerBase.h b/include/spinscale/lockerAndInvokerBase.h
new file mode 100644
index 0000000..528ebef
--- /dev/null
+++ b/include/spinscale/lockerAndInvokerBase.h
@@ -0,0 +1,87 @@
+#ifndef LOCKER_AND_INVOKER_BASE_H
+#define LOCKER_AND_INVOKER_BASE_H
+
+#include <list>
+#include <memory>
+
+namespace sscl {
+
+// Forward declaration
+class Qutex;
+
+/**
+ * @brief LockerAndInvokerBase - Base class for lockvoking mechanism
+ *
+ * This base class contains the common functionality needed by Qutex,
+ * including the serialized continuation reference and comparison operators.
+ */
+class LockerAndInvokerBase
+{
+public:
+    /**
+     * @brief Constructor
+     * @param serializedContinuationVaddr Raw pointer to the serialized continuation
+     */
+    explicit LockerAndInvokerBase(const void* serializedContinuationVaddr)
+    : serializedContinuationVaddr(serializedContinuationVaddr)
+    {}
+
+    /**
+     * @brief Typedef for list of LockerAndInvokerBase shared pointers
+     */
+    typedef std::list<std::shared_ptr<LockerAndInvokerBase>> List;
+
+    /**
+     * @brief Get the iterator for this lockvoker in the specified Qutex's queue
+     * @param qutex The Qutex to get the iterator for
+     * @return Iterator pointing to this lockvoker in the Qutex's queue
+     */
+    virtual List::iterator getLockvokerIteratorForQutex(Qutex& qutex) const = 0;
+
+    /**
+     * @brief Awaken this lockvoker by posting it to its io_service
+     * @param forceAwaken If true, post even if already awake
+     */
+    virtual void awaken(bool forceAwaken = false) = 0;
+
+    /* These two are used to iterate through the lockset of a Lockvoker in a
+     * type-erased manner. We use them in the gridlock detection algorithm.
+     */
+    virtual size_t getLockSetSize() const = 0;
+    virtual Qutex& getLockAt(size_t index) const = 0;
+
+    /**
+     * @brief Equality operator
+     *
+     * Compare by the address of the continuation objects. Why?
+     * Because there's no guarantee that the lockvoker object that was
+     * passed in by the io_service invocation is the same object as that
+     * which is in the qutexQs. Especially because we make_shared() a
+     * copy when registerInQutexQueues()ing.
+     *
+     * Generally when we "wake" a lockvoker by enqueuing it, boost's
+     * io_service::post will copy the lockvoker object.
+     */
+    bool operator==(const LockerAndInvokerBase &other) const
+    {
+        return serializedContinuationVaddr == other.serializedContinuationVaddr;
+    }
+
+    /**
+     * @brief Inequality operator
+     */
+    bool operator!=(const LockerAndInvokerBase &other) const
+    {
+        return serializedContinuationVaddr != other.serializedContinuationVaddr;
+    }
+
+protected:
+    /* Never let this monstrosity be seen beyond this class's scope.
+     * Remember what I've taught you, quasi-modo?
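+     *
+     * It is only ever compared for identity, never dereferenced: every
+     * io_service post() copies the lockvoker, so the stable address of the
+     * underlying continuation is the one thing all copies share.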
+ */ + const void* serializedContinuationVaddr; +}; + +} // namespace sscl + +#endif // LOCKER_AND_INVOKER_BASE_H diff --git a/include/spinscale/marionette.h b/include/spinscale/marionette.h new file mode 100644 index 0000000..b982cee --- /dev/null +++ b/include/spinscale/marionette.h @@ -0,0 +1,58 @@ +#ifndef _MARIONETTE_H +#define _MARIONETTE_H + +#include +#include +#include +#include + +namespace sscl { + +class MarionetteThread; + +namespace mrntt { + +class MarionetteComponent +: public sscl::Component +{ +public: + MarionetteComponent(const std::shared_ptr &thread); + ~MarionetteComponent() = default; + +public: + typedef std::function mrnttLifetimeMgmtOpCbFn; + void initializeReq(sscl::Callback callback); + void finalizeReq(sscl::Callback callback); + // Intentionally doesn't take a callback. + void exceptionInd(); + +private: + class MrnttLifetimeMgmtOp; + class TerminationEvent; +}; + +extern std::shared_ptr thread; + +extern std::atomic exitCode; +void exitMarionetteLoop(); +void marionetteFinalizeReqCb(bool success); +extern MarionetteComponent mrntt; + +} // namespace mrntt + +struct CrtCommandLineArgs +{ + CrtCommandLineArgs(int argc, char *argv[], char *envp[]) + : argc(argc), argv(argv), envp(envp) + {} + + int argc; + char **argv; + char **envp; + + static void set(int argc, char *argv[], char *envp[]); +}; + +} // namespace sscl + +#endif // _MARIONETTE_H diff --git a/include/spinscale/puppetApplication.h b/include/spinscale/puppetApplication.h new file mode 100644 index 0000000..24b5e06 --- /dev/null +++ b/include/spinscale/puppetApplication.h @@ -0,0 +1,68 @@ +#ifndef PUPPET_APPLICATION_H +#define PUPPET_APPLICATION_H + +#include +#include +#include +#include +#include +#include + +namespace sscl { + +class PuppetApplication +: public std::enable_shared_from_this +{ +public: + PuppetApplication( + const std::vector> &threads); + ~PuppetApplication() = default; + + // Thread management methods + typedef std::function puppetThreadLifetimeMgmtOpCbFn; + void joltAllPuppetThreadsReq( + Callback callback); + void startAllPuppetThreadsReq( + Callback callback); + void pauseAllPuppetThreadsReq( + Callback callback); + void resumeAllPuppetThreadsReq( + Callback callback); + void exitAllPuppetThreadsReq( + Callback callback); + + // CPU distribution method + void distributeAndPinThreadsAcrossCpus(); + +protected: + // Collection of PuppetThread instances + std::vector> componentThreads; + + /** + * Indicates whether all puppet threads have been JOLTed at least once. + * + * JOLTing serves two critical purposes: + * + * 1. **Global Constructor Sequencing**: Since pthreads begin executing while + * global constructors are still being executed, globally defined pthreads + * cannot depend on global objects having been constructed. JOLTing is done + * by the CRT's main thread within main(), which provides a sequencing + * guarantee that global constructors have been called. + * + * 2. **shared_from_this Safety**: shared_from_this() requires a prior + * shared_ptr handle to be established. The global list of + * shared_ptr guarantees that at least one shared_ptr to + * each ComponentThread has been initialized before JOLTing occurs. + * + * This flag ensures that JOLTing happens exactly once and provides + * a synchronization point for the entire system initialization. 
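+     *
+     * A hypothetical startup sequence (application-side names are
+     * assumptions, not part of this diff): the CRT main() runs after all
+     * global constructors have completed, constructs or obtains the
+     * PuppetApplication, and calls joltAllPuppetThreadsReq() exactly once
+     * before issuing any start/pause/resume/exit requests.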
+ */ + bool threadsHaveBeenJolted = false; + +private: + class PuppetThreadLifetimeMgmtOp; +}; + +} // namespace sscl + +#endif // PUPPET_APPLICATION_H diff --git a/include/spinscale/qutex.h b/include/spinscale/qutex.h new file mode 100644 index 0000000..dfa06f8 --- /dev/null +++ b/include/spinscale/qutex.h @@ -0,0 +1,107 @@ +#ifndef QUTEX_H +#define QUTEX_H + +#include +#include +#include +#include +#include +#include + +namespace sscl { + +/** + * @brief Qutex - Queue-based mutex for asynchronous lock management + * + * A Qutex combines a spinlock, an ownership flag, and a queue of waiting + * lockvokers to provide efficient asynchronous lock management with + * priority-based acquisition for LockSets. + */ +class Qutex +{ +public: + /** + * @brief Constructor + */ + Qutex([[maybe_unused]] const std::string &_name) + : +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + name(_name), currOwner(nullptr), +#endif + isOwned(false) + {} + + /** + * @brief Register a lockvoker in the queue + * @param lockvoker The lockvoker to register + * @return Iterator pointing to the registered lockvoker in the queue + */ + LockerAndInvokerBase::List::iterator registerInQueue( + const std::shared_ptr &lockvoker + ) + { + lock.acquire(); + auto it = queue.insert(queue.end(), lockvoker); + lock.release(); + return it; + } + + /** + * @brief Unregister a lockvoker from the queue + * @param it Iterator pointing to the lockvoker to unregister + * @param shouldLock Whether to acquire the spinlock before erasing (default: true) + */ + void unregisterFromQueue( + LockerAndInvokerBase::List::iterator it, bool shouldLock = true + ) + { + if (shouldLock) + { + lock.acquire(); + queue.erase(it); + lock.release(); + } + else { + queue.erase(it); + } + } + + /** + * @brief Try to acquire the lock for a lockvoker + * @param tryingLockvoker The lockvoker attempting to acquire the lock + * @param nRequiredLocks Number of locks required by the lockvoker's LockSet + * @return true if the lock was successfully acquired, false otherwise + */ + bool tryAcquire( + const LockerAndInvokerBase &tryingLockvoker, int nRequiredLocks); + + /** + * @brief Handle backoff when a lockvoker fails to acquire all required locks + * @param failedAcquirer The lockvoker that failed to acquire all locks + * @param nRequiredLocks Number of locks required by the lockvoker's LockSet + */ + void backoff(const LockerAndInvokerBase &failedAcquirer, int nRequiredLocks); + + /** + * @brief Release the lock and wake up the next waiting lockvoker + */ + void release(); + +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + std::shared_ptr getCurrOwner() const + { return currOwner; } +#endif + +public: +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + std::string name; + std::shared_ptr currOwner; +#endif + SpinLock lock; + LockerAndInvokerBase::List queue; + bool isOwned; +}; + +} // namespace sscl + +#endif // QUTEX_H diff --git a/include/spinscale/qutexAcquisitionHistoryTracker.h b/include/spinscale/qutexAcquisitionHistoryTracker.h new file mode 100644 index 0000000..73a6aa7 --- /dev/null +++ b/include/spinscale/qutexAcquisitionHistoryTracker.h @@ -0,0 +1,164 @@ +#ifndef QUTEX_ACQUISITION_HISTORY_TRACKER_H +#define QUTEX_ACQUISITION_HISTORY_TRACKER_H + +#include +#include +#include +#include +#include "spinLock.h" + + +namespace sscl { + +// Forward declarations +class Qutex; +class AsynchronousContinuationChainLink; +class DependencyGraph; + +/** + * @brief QutexAcquisitionHistoryTracker - Tracks acquisition history for + * gridlock detection + * + * This class maintains a central acquisition 
history to track all lockvokers
+ * suspected of being gridlocked. It stores information about what locks each
+ * timed-out lockvoker wants and what locks they hold in their continuation
+ * history.
+ */
+class QutexAcquisitionHistoryTracker
+{
+public:
+    /**
+     * @brief Type definition for the acquisition history entry
+     *
+     * pair.first: The firstFailedQutex that this lockvoker WANTS but can't
+     *    acquire
+     * pair.second: A unique_ptr to a list of all acquired Qutexes in this
+     *    lockvoker's continuation history
+     */
+    typedef std::pair<
+        std::reference_wrapper<Qutex>,
+        std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
+        > AcquisitionHistoryEntry;
+
+    /**
+     * @brief Type definition for the acquisition history map
+     *
+     * Key: std::shared_ptr<AsynchronousContinuationChainLink>
+     *    (the continuation that contains the timed-out lockvoker)
+     * Value: AcquisitionHistoryEntry
+     *    (its wanted lock (aka: firstFailedQutex/pair.first) + held locks)
+     */
+    typedef std::unordered_map<
+        std::shared_ptr<AsynchronousContinuationChainLink>,
+        AcquisitionHistoryEntry
+        > AcquisitionHistoryMap;
+
+public:
+    static QutexAcquisitionHistoryTracker& getInstance()
+    {
+        static QutexAcquisitionHistoryTracker instance;
+        return instance;
+    }
+
+    /**
+     * @brief Add a continuation to the acquisition history if it doesn't
+     *    already exist
+     * @param continuation Shared pointer to the
+     *    AsynchronousContinuationChainLink
+     * @param wantedLock The lock that this continuation wants but can't
+     *    acquire
+     * @param heldLocks Unique pointer to list of locks held in this
+     *    continuation's history (will be moved)
+     */
+    void addIfNotExists(
+        std::shared_ptr<AsynchronousContinuationChainLink> &continuation,
+        Qutex& wantedLock,
+        std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>
+            heldLocks
+    )
+    {
+        acquisitionHistoryLock.acquire();
+
+        auto it = acquisitionHistory.find(continuation);
+        // If a continuation already exists, don't add it again
+        if (it != acquisitionHistory.end())
+        {
+            acquisitionHistoryLock.release();
+            return;
+        }
+
+        acquisitionHistory.emplace(continuation, std::make_pair(
+            std::ref(wantedLock), std::move(heldLocks)));
+
+        acquisitionHistoryLock.release();
+    }
+
+    /**
+     * @brief Remove a continuation from the acquisition history
+     *
+     * @param continuation Shared pointer to the
+     *    AsynchronousContinuationChainLink to remove
+     * @return true if the continuation was found and removed, false if not found
+     */
+    bool remove(
+        std::shared_ptr<AsynchronousContinuationChainLink> &continuation
+    )
+    {
+        acquisitionHistoryLock.acquire();
+
+        auto it = acquisitionHistory.find(continuation);
+        if (it != acquisitionHistory.end())
+        {
+            acquisitionHistory.erase(it);
+
+            acquisitionHistoryLock.release();
+            return true;
+        }
+
+        acquisitionHistoryLock.release();
+        return false;
+    }
+
+    bool heuristicallyTraceContinuationHistoryForGridlockOn(
+        Qutex &firstFailedQutex,
+        std::shared_ptr<AsynchronousContinuationChainLink>&
+            currentContinuation);
+    bool completelyTraceContinuationHistoryForGridlockOn(
+        Qutex &firstFailedQutex);
+
+    /**
+     * @brief Generates a dependency graph among known continuations, based on
+     * the currently known acquisition history. There may well be a cyclical
+     * dependency which hasn't been reported to the history tracker yet.
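+     *
+     * Presumably each history entry contributes an edge from its waiting
+     * continuation to every continuation recorded as holding that entry's
+     * wanted qutex; a cycle in the resulting graph then indicates a
+     * gridlock. (This is our reading; the definition lives in the .cpp,
+     * which is not part of this diff.)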
+ * @param dontAcquireLock If true, skips acquiring the internal spinlock + * (assumes caller already holds it) + */ + [[nodiscard]] std::unique_ptr generateGraph( + bool dontAcquireLock = false); + + // Disable copy constructor and assignment operator + QutexAcquisitionHistoryTracker( + const QutexAcquisitionHistoryTracker&) = delete; + QutexAcquisitionHistoryTracker& operator=( + const QutexAcquisitionHistoryTracker&) = delete; + +private: + QutexAcquisitionHistoryTracker() = default; + ~QutexAcquisitionHistoryTracker() = default; + +private: + /** EXPLANATION: + * We use a SpinLock here instead of a Qutex because this acquisition + * history tracker is invoked within the LockerAndInvoker. + * Since LockerAndInvoker is too tightly coupled with Qutex workings, using + * a Qutex here would create a circular dependency or deadlock situation. + * Therefore, it's best to use a SpinLock on the history class to avoid + * these coupling issues. + */ + SpinLock acquisitionHistoryLock; + AcquisitionHistoryMap acquisitionHistory; +}; + +} // namespace sscl + +#endif // QUTEX_ACQUISITION_HISTORY_TRACKER_H diff --git a/include/spinscale/serializedAsynchronousContinuation.h b/include/spinscale/serializedAsynchronousContinuation.h new file mode 100644 index 0000000..2c4b4de --- /dev/null +++ b/include/spinscale/serializedAsynchronousContinuation.h @@ -0,0 +1,588 @@ +#ifndef SERIALIZED_ASYNCHRONOUS_CONTINUATION_H +#define SERIALIZED_ASYNCHRONOUS_CONTINUATION_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sscl { + +template +class SerializedAsynchronousContinuation +: public PostedAsynchronousContinuation +{ +public: + SerializedAsynchronousContinuation( + const std::shared_ptr &caller, + Callback originalCbFn, + std::vector> requiredLocks) + : PostedAsynchronousContinuation(caller, originalCbFn), + requiredLocks(*this, std::move(requiredLocks)) + {} + + template + void callOriginalCb(Args&&... args) + { + requiredLocks.release(); + PostedAsynchronousContinuation::callOriginalCb( + std::forward(args)...); + } + + // Return list of all qutexes in predecessors' LockSets; excludes self. + [[nodiscard]] + std::unique_ptr>> + getAcquiredQutexHistory() const; + + /** + * @brief Release a specific qutex early + * @param qutex The qutex to release early + */ + void releaseQutexEarly(Qutex &qutex) + { requiredLocks.releaseQutexEarly(qutex); } + +public: + LockSet requiredLocks; + std::atomic isAwakeOrBeingAwakened{false}; + + /** + * @brief LockerAndInvoker - Template class for lockvoking mechanism + * + * This class wraps a std::bind result and provides locking functionality. + * When locks cannot be acquired, the object re-posts itself to the io_service + * queue, implementing the "spinqueueing" pattern. 
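+     *
+     * The resulting lifecycle, pieced together from the code below: the
+     * constructor registers in the qutex queues and posts itself; operator()
+     * then calls tryAcquireOrBackOff(), and on failure it simply returns,
+     * dropping this io_service copy. A later Qutex::release() awaken()s the
+     * queued copy, re-posting it to try again.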
+ */ + template + class LockerAndInvoker + : public LockerAndInvokerBase + { + public: + /** + * @brief Constructor that immediately posts to io_service + * @param serializedContinuation Reference to the serialized continuation + * containing LockSet and target io_service + * @param target The ComponentThread whose io_service to post to + * @param invocationTarget The std::bind result to invoke when locks are acquired + */ + LockerAndInvoker( + SerializedAsynchronousContinuation + &serializedContinuation, + const std::shared_ptr& target, + InvocationTargetT invocationTarget) + : LockerAndInvokerBase(&serializedContinuation), +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + creationTimestamp(std::chrono::steady_clock::now()), +#endif + serializedContinuation(serializedContinuation), + target(target), + invocationTarget(std::move(invocationTarget)) + { +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + std::optional> firstDuplicatedQutex = + traceContinuationHistoryForDeadlock(); + + if (firstDuplicatedQutex.has_value()) + { + handleDeadlock(firstDuplicatedQutex.value().get()); + throw std::runtime_error( + "LockerAndInvoker::LockerAndInvoker(): Deadlock detected"); + } +#endif // CONFIG_ENABLE_DEBUG_LOCKS + + firstWake(); + } + + /** + * @brief Function call operator - tries to acquire locks and either + * invokes the target or returns (already registered in qutex queues) + */ + void operator()(); + + /** + * @brief Get the iterator for this lockvoker in the specified Qutex's queue + * @param qutex The Qutex to get the iterator for + * @return Iterator pointing to this lockvoker in the Qutex's queue + */ + LockerAndInvokerBase::List::iterator + getLockvokerIteratorForQutex(Qutex& qutex) const override + { + return serializedContinuation.requiredLocks.getLockUsageDesc( + qutex).iterator; + } + + /** + * @brief Awaken this lockvoker by posting it to its io_service + * @param forceAwaken If true, post even if already awake + */ + void awaken(bool forceAwaken = false) override + { + bool prevVal = serializedContinuation.isAwakeOrBeingAwakened + .exchange(true); + + if (prevVal == true && !forceAwaken) + { return; } + + target->getIoService().post(*this); + } + + size_t getLockSetSize() const override + { return serializedContinuation.requiredLocks.locks.size(); } + + Qutex& getLockAt(size_t index) const override + { + return serializedContinuation.requiredLocks.locks[index] + .qutex.get(); + } + + private: + // Allow awakening by resetting the awake flag + void allowAwakening() + { serializedContinuation.isAwakeOrBeingAwakened.store(false); } + + /** EXPLANATION: + * We create a copy of the Lockvoker and then give sh_ptrs to that + * *COPY*, to each Qutex's internal queue. This enables us to keep + * the AsyncContinuation sh_ptr (which the Lockvoker contains within + * itself) alive without wasting too much memory. + * + * This way the io_service objects can remove the lockvoker from + * their queues and there'll be a copy of the lockvoker in each + * Qutex's queue. + * + * For non-serialized, posted continuations, they won't be removed + * from the io_service queue until they're executed, so there's no + * need to create copies of them. Lockvokers are removed from their + * io_service, potentially without being executed if they fail to + * acquire all locks. 
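+         *
+         * Because every copy references the same serializedContinuation,
+         * LockerAndInvokerBase::operator== (which compares the continuation's
+         * address) still treats the io_service copy and the queued copies as
+         * one logical lockvoker.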
+ */ + void registerInLockSet() + { + auto sharedLockvoker = std::make_shared< + LockerAndInvoker>(*this); + + serializedContinuation.requiredLocks.registerInQutexQueues( + sharedLockvoker); + } + + /** + * @brief First wake - register in queues and awaken + * + * Sets isAwake=true before calling awaken with forceAwaken to ensure + * that none of the locks we just registered with awaken()s a duplicate + * copy of this lockvoker on the io_service. + */ + void firstWake() + { + serializedContinuation.isAwakeOrBeingAwakened.store(true); + registerInLockSet(); + // Force awaken since we just set the flag above + awaken(true); + } + + // Has CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS elapsed since creation? + bool isDeadlockLikely() const + { +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast( + now - creationTimestamp); + return elapsed.count() >= CONFIG_DEBUG_QUTEX_DEADLOCK_TIMEOUT_MS; +#else + return false; +#endif + } + + // Wrapper around isDeadlockLikely for gridlock detection + bool isGridlockLikely() const + { return isDeadlockLikely(); } + +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + struct obsolete { + bool traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex); + }; + + bool traceContinuationHistoryForDeadlockOn(Qutex &firstFailedQutex); + std::optional> + traceContinuationHistoryForDeadlock(void) + { + for (auto& lockUsageDesc + : serializedContinuation.requiredLocks.locks) + { + if (traceContinuationHistoryForDeadlockOn( + lockUsageDesc.qutex.get())) + { + return std::ref(lockUsageDesc.qutex.get()); + } + } + return std::nullopt; + } + + /** + * @brief Handle a likely deadlock situation by logging debug information + * @param firstFailedQutex The first qutex that failed acquisition + */ + void handleDeadlock(const Qutex &firstFailedQutex) + { + std::cerr << __func__ << ": Deadlock: " + << "Lockvoker has been waiting for " + << std::chrono::duration_cast( + std::chrono::steady_clock::now() - this->creationTimestamp) + .count() + << "ms, failed on qutex @" << &firstFailedQutex + << " (" << firstFailedQutex.name << ")" << std::endl; + } + + void handleGridlock(const Qutex &firstFailedQutex) + { + std::cerr << __func__ << ": Gridlock: " + << "Lockvoker has been waiting for " + << std::chrono::duration_cast( + std::chrono::steady_clock::now() - this->creationTimestamp) + .count() + << "ms, failed on qutex @" << &firstFailedQutex + << " (" << firstFailedQutex.name << ")" << std::endl; + } +#endif + + private: +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + std::chrono::steady_clock::time_point creationTimestamp; +#endif + SerializedAsynchronousContinuation + &serializedContinuation; + std::shared_ptr target; + InvocationTargetT invocationTarget; + }; +}; + +/******************************************************************************/ + +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + +template +std::unique_ptr>> +SerializedAsynchronousContinuation::getAcquiredQutexHistory() +const +{ + auto heldLocks = std::make_unique< + std::forward_list>>(); + + /** EXPLANATION: + * Walk through the continuation chain to collect all acquired locks + * + * We don't add the current continuation's locks because it's the one + * failing to acquire locks and backing off. So we start from the previous + * continuation. 
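+     *
+     * Only SerializedAsynchronousContinuation links own a LockSet, so the
+     * walk below dynamic_pointer_cast()s each chain link and skips plain
+     * posted/non-posted continuations.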
+	 */
+	for (std::shared_ptr currContin =
+		this->getCallersContinuationShPtr();
+		currContin != nullptr;
+		currContin = currContin->getCallersContinuationShPtr())
+	{
+		auto serializedCont = std::dynamic_pointer_cast<
+			SerializedAsynchronousContinuation>(currContin);
+
+		if (serializedCont == nullptr) { continue; }
+
+		// Add this continuation's locks to the held locks list
+		for (size_t i = 0; i < serializedCont->requiredLocks.locks.size(); ++i)
+		{
+			heldLocks->push_front(serializedCont->requiredLocks.locks[i].qutex);
+		}
+	}
+
+	return heldLocks;
+}
+
+template
+template
+bool
+SerializedAsynchronousContinuation
+::LockerAndInvoker
+::traceContinuationHistoryForDeadlockOn(Qutex& firstFailedQutex)
+{
+	/** EXPLANATION:
+	 * In this function we trace through the chain of continuations that
+	 * led up to this Lockvoker's continuation. For each continuation which is
+	 * a SerializedAsynchronousContinuation, we check through its LockSet to see
+	 * if it contains the lock that failed acquisition. If it does, we have a
+	 * deadlock.
+	 */
+
+	/* We can't start with the continuation directly referenced by this starting
+	 * Lockvoker, as it would contain all the locks we're currently trying to
+	 * acquire...and rightly so, because it's the continuation for this current
+	 * lockvoker.
+	 */
+	for (std::shared_ptr currContin =
+		this->serializedContinuation.getCallersContinuationShPtr();
+		currContin != nullptr;
+		currContin = currContin->getCallersContinuationShPtr())
+	{
+		auto serializedCont = std::dynamic_pointer_cast<
+			SerializedAsynchronousContinuation>(currContin);
+
+		if (serializedCont == nullptr) { continue; }
+
+		// Check if the firstFailedQutex is in this continuation's LockSet
+		try {
+			serializedCont->requiredLocks.getLockUsageDesc(firstFailedQutex);
+		} catch (const std::runtime_error& e) {
+			std::cerr << __func__ << ": " << e.what() << std::endl;
+			continue;
+		}
+
+		std::cout << __func__ << ": Deadlock detected: Found "
+			<< "firstFailedQutex @" << &firstFailedQutex
+			<< " (" << firstFailedQutex.name << ") in LockSet of "
+			<< "SerializedAsynchronousContinuation @"
+			<< serializedCont.get() << std::endl;
+
+		return true;
+	}
+
+	return false;
+}
+
+template
+template
+bool
+SerializedAsynchronousContinuation
+::LockerAndInvoker
+::obsolete::traceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex)
+{
+	/** EXPLANATION:
+	 * In this function we check for gridlocks, which are slightly different
+	 * from deadlocks. In a gridlock, two requests are each waiting for a lock
+	 * that is held by the other. I.e.:
+	 *
+	 *	R1 holds LockA and is waiting for LockB.
+	 *	R2 holds LockB and is waiting for LockA.
+	 *
+	 * This differs from a deadlock because it's not a single request
+	 * attempting to re-acquire a lock that it already holds.
+	 *
+	 * To detect this condition, we wait until the acquisition timeout has
+	 * expired. Then we extract the current owner of the first lock we're
+	 * failing to acquire.
+	 *
+	 * From there, we go through each of the locks in the foreign owner's
+	 * current (i.e.: immediate, most recent continuation's) required LockSet.
+	 * For each of the locks in the foreign owner's most immediate required
+	 * LockSet, we trace backward in our *OWN* history to see if any of *OUR*
+	 * continuations (excluding our most immediate continuation) contains that
+	 * lock.
+	 *
+	 * If we find a match, that means that we're holding a lock that the foreign
+	 * owner is waiting for.
And we already know that the foreign owner is + * holding a lock that we're waiting for (when we extracted the current + * owner of the first failed lock in our most immediate Lockset). + * + * Hence, we have a gridlock. + */ + + std::shared_ptr foreignOwnerShPtr = + firstFailedQutex.getCurrOwner(); + // If no current owner, can't be a gridlock + if (foreignOwnerShPtr == nullptr) + { return false; } + + // Use reference for the rest of the function for safety. + LockerAndInvokerBase &foreignOwner = *foreignOwnerShPtr; + + /* For each lock in the foreign owner's LockSet, check if we hold it + * in any of our previous continuations (excluding our most immediate one) + */ + for (size_t i = 0; i < foreignOwner.getLockSetSize(); ++i) + { + Qutex& foreignLock = foreignOwner.getLockAt(i); + + /* Skip the firstFailedQutex since we already know the foreign owner + * holds it -- hence it's impossible for any of our previous + * continuations to hold it. + */ + if (&foreignLock == &firstFailedQutex) + { continue; } + + /** EXPLANATION: + * Trace backward through our continuation history (excluding our most + * immediate continuation). + * + * The reason we exclude our most immediate continuation is because the + * LockSet acquisition algorithm backs off if it fails to acquire ALL + * locks in the set. So if the lock that the foreign owner is waiting + * for is in our most immediate continuation, and NOT in one of our + * previous continuations, then we will back off and the foreign owner + * should eventually be able to acquire that lock. + */ + for (std::shared_ptr currContin = + this->serializedContinuation.getCallersContinuationShPtr(); + currContin != nullptr; + currContin = currContin->getCallersContinuationShPtr()) + { + auto serializedCont = std::dynamic_pointer_cast< + SerializedAsynchronousContinuation>(currContin); + + if (serializedCont == nullptr) { continue; } + + // Check if this continuation holds the foreign lock + try { + const auto& lockUsageDesc = serializedCont->requiredLocks + .getLockUsageDesc(foreignLock); + + // Matched! We hold a lock that the foreign owner is waiting for + std::cout << __func__ << ": Gridlock detected: We hold lock @" + << &foreignLock << " (" << foreignLock.name << ") in " + "continuation @" << serializedCont.get() + << ", while foreign owner @" << &foreignOwner + << " holds lock @" << &firstFailedQutex << " (" + << firstFailedQutex.name << ") that we're waiting for" + << std::endl; + + return true; + } catch (const std::runtime_error& e) { + // This continuation doesn't hold the foreign lock. Continue. + continue; + } + } + } + + return false; +} + +#endif // CONFIG_ENABLE_DEBUG_LOCKS + +template +template +void SerializedAsynchronousContinuation +::LockerAndInvoker::operator()() +{ + if (ComponentThread::getSelf() != target) + { + throw std::runtime_error( + "LockerAndInvoker::operator(): Thread safety violation - " + "executing on wrong ComponentThread"); + } + + std::optional> firstFailedQutexRet; + bool deadlockLikely = isDeadlockLikely(); + bool gridlockLikely = isGridlockLikely(); + + if (!serializedContinuation.requiredLocks.tryAcquireOrBackOff( + *this, firstFailedQutexRet)) + { + // Just allow this lockvoker to be dropped from its io_service. 
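+		/* Resetting isAwakeOrBeingAwakened means a later release() or
+		 * backoff() on one of our registered qutexes can awaken() us
+		 * (i.e. post us to the io_service) again; until then we simply
+		 * fall off the io_service without invoking the target.
+		 */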
+ allowAwakening(); + if (!deadlockLikely && !gridlockLikely) + { return; } + +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + Qutex &firstFailedQutex = firstFailedQutexRet.value().get(); + bool isDeadlock = traceContinuationHistoryForDeadlockOn( + firstFailedQutex); + + bool gridlockIsHeuristicallyLikely = false; + bool gridlockIsAlgorithmicallyLikely = false; + + if (gridlockLikely) + { + auto& tracker = QutexAcquisitionHistoryTracker + ::getInstance(); + + auto heldLocks = serializedContinuation + .getAcquiredQutexHistory(); + + // Add this continuation to the tracker + auto currentContinuationShPtr = serializedContinuation + .shared_from_this(); + + tracker.addIfNotExists( + currentContinuationShPtr, + firstFailedQutex, std::move(heldLocks)); + + gridlockIsHeuristicallyLikely = tracker + .heuristicallyTraceContinuationHistoryForGridlockOn( + firstFailedQutex, currentContinuationShPtr); + + if (gridlockIsHeuristicallyLikely) + { + gridlockIsAlgorithmicallyLikely = tracker + .completelyTraceContinuationHistoryForGridlockOn( + firstFailedQutex); + } + } + + bool isGridlock = (gridlockIsHeuristicallyLikely + || gridlockIsAlgorithmicallyLikely); + + if (!isDeadlock && !isGridlock) + { return; } + + if (isDeadlock) { handleDeadlock(firstFailedQutex); } + if (isGridlock) { handleGridlock(firstFailedQutex); } +#endif + return; + } + + /** EXPLANATION: + * Successfully acquired all locks, so unregister from qutex queues. + * We do this here so that we can free up queue slots in the qutex + * queues for other lockvokers that may be waiting to acquire the + * locks. The size of the qutex queues does matter for other + * contending lockvokers; and so also does their position in the + * queues. + * + * The alternative is to leave ourself in the queues until we + * eventually release all locks; and given that we may hold locks + * even across true async hardware bottlenecks, this could take a + * long time. + * + * Granted, the fact that we own the locks means that even though + * we've removed ourselves from the queues, other lockvokers still + * can't acquire the locks anyway. + */ + serializedContinuation.requiredLocks.unregisterFromQutexQueues(); + +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + /** EXPLANATION: + * If we were being tracked for gridlock detection but successfully + * acquired all locks, it was a false positive due to timed delay, + * long-running operation, or I/O delay + */ + if (gridlockLikely) + { + std::shared_ptr + currentContinuationShPtr = + serializedContinuation.shared_from_this(); + + bool removed = QutexAcquisitionHistoryTracker::getInstance() + .remove(currentContinuationShPtr); + + if (removed) + { + std::cerr + << "LockerAndInvoker::operator(): False positive " + "gridlock detection - continuation @" + << &serializedContinuation + << " was being tracked but successfully acquired all " + "locks. This was likely due to timed delay, " + "long-running operation, or I/O delay." 
+ << std::endl; + } + } +#endif + + invocationTarget(); +} + +} // namespace sscl + +#endif // SERIALIZED_ASYNCHRONOUS_CONTINUATION_H diff --git a/include/spinscale/spinLock.h b/include/spinscale/spinLock.h new file mode 100644 index 0000000..0b3300e --- /dev/null +++ b/include/spinscale/spinLock.h @@ -0,0 +1,121 @@ +#ifndef SPIN_LOCK_H +#define SPIN_LOCK_H + +#include +#ifdef __x86_64__ +#include +#elif defined(__i386__) +#include +#elif defined(__arm__) +#include +#elif defined(__aarch64__) +#include +#elif defined(__aarch32__) +#include +#endif + +namespace sscl { + +/** + * @brief Simple spinlock using std::atomic + */ +class SpinLock +{ +public: + SpinLock() + : locked(false) + {} + + bool tryAcquire() + { + bool expected = false; + return locked.compare_exchange_strong(expected, true); + } + + inline void spinPause() + { +#ifdef __x86_64__ + _mm_pause(); +#elif defined(__i386__) + _mm_pause(); +#elif defined(__arm__) + __asm__ volatile("yield"); +#elif defined(__aarch64__) + __asm__ volatile("yield"); +#elif defined(__aarch32__) + __asm__ volatile("yield"); +#else +# error "Unsupported architecture" +#endif + } + + void acquire() + { + while (!tryAcquire()) + { + /** EXPLANATION: + * Busy-wait: keep trying to acquire the lock + * The CPU will spin here until the lock becomes available + * + * The spinPause() function is architecture-specific and is + * essential because I once fried an older Intel M-class laptop CPU + * when I forgot to include a PAUSE instruction in a for (;;){} + * loop. I'm not interested in frying my RPi or my other testbed + * robot boards. + */ + spinPause(); + } + } + + void release() + { + locked.store(false); + } + + /** + * @brief RAII guard for SpinLock + * Locks the spinlock on construction and unlocks on destruction + */ + class Guard + { + public: + explicit Guard(SpinLock& lock) + : lock_(lock), unlocked_(false) + { + lock_.acquire(); + } + + ~Guard() + { + if (!unlocked_) { + lock_.release(); + } + } + + void unlockPrematurely() + { + if (!unlocked_) + { + lock_.release(); + unlocked_ = true; + } + } + + // Non-copyable, non-movable + Guard(const Guard&) = delete; + Guard& operator=(const Guard&) = delete; + Guard(Guard&&) = delete; + Guard& operator=(Guard&&) = delete; + + private: + SpinLock& lock_; + bool unlocked_; + }; + +private: + std::atomic locked; +}; + +} // namespace sscl + +#endif // SPIN_LOCK_H diff --git a/src/component.cpp b/src/component.cpp new file mode 100644 index 0000000..988c095 --- /dev/null +++ b/src/component.cpp @@ -0,0 +1,28 @@ +#include +#include +#include + +namespace sscl { + +Component::Component(const std::shared_ptr &thread) +: thread(thread) +{ +} + +PuppetComponent::PuppetComponent( + PuppetApplication &parent, const std::shared_ptr &thread) +: Component(thread), +parent(parent) +{ +} + +namespace mrntt { + +MarionetteComponent::MarionetteComponent( + const std::shared_ptr &thread) +: sscl::Component(thread) +{ +} + +} // namespace mrntt +} // namespace sscl diff --git a/src/componentThread.cpp b/src/componentThread.cpp new file mode 100644 index 0000000..e53163b --- /dev/null +++ b/src/componentThread.cpp @@ -0,0 +1,325 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sscl { + +namespace mrntt { +// Global variable to store the marionette thread ID +// Default value is 0, but should be set by application code via setMarionetteThreadId() +ThreadId marionetteThreadId = 0; + +void setMarionetteThreadId(ThreadId id) +{ + 
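+	/* Presumably called once by application code during startup; the
+	 * lifecycle requests (jolt/pause/resume) compare a PuppetThread's id
+	 * against this value to reject operations aimed at the mrntt thread.
+	 */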
marionetteThreadId = id; +} +} // namespace mrntt + +} // namespace sscl + +namespace sscl { + +thread_local std::shared_ptr thisComponentThread; + +namespace mrntt { +// Global marionette thread instance - defined here but initialized by application +std::shared_ptr thread; +} // namespace mrntt + +// Implementation of static method +std::shared_ptr ComponentThread::getMrntt() +{ + return sscl::mrntt::thread; +} + +void MarionetteThread::initializeTls(void) +{ + thisComponentThread = shared_from_this(); +} + +void PuppetThread::initializeTls(void) +{ + thisComponentThread = shared_from_this(); +} + +bool ComponentThread::tlsInitialized(void) +{ + return thisComponentThread != nullptr; +} + +const std::shared_ptr ComponentThread::getSelf(void) +{ + if (!thisComponentThread) + { + throw std::runtime_error(std::string(__func__) + + ": TLS not initialized"); + } + + return thisComponentThread; +} + +class PuppetThread::ThreadLifetimeMgmtOp +: public PostedAsynchronousContinuation +{ +public: + ThreadLifetimeMgmtOp( + const std::shared_ptr &caller, + const std::shared_ptr &target, + Callback callback) + : PostedAsynchronousContinuation( + caller, callback), + target(target) + {} + +public: + const std::shared_ptr target; + +public: + void joltThreadReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << __func__ << ": Thread '" << target->name << "': handling " + "JOLT request." + << "\n"; + + target->io_service.stop(); + callOriginalCb(); + } + + void startThreadReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << __func__ << ": Thread '" << target->name << "': handling " + "startThread." + << "\n"; + + // Execute private setup sequence here + // This is where each thread would implement its specific initialization + + callOriginalCb(); + } + + void exitThreadReq1_mainQueue_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << __func__ << ": Thread '" << target->name << "': handling " + "exitThread (main queue)." << "\n"; + + target->cleanup(); + target->io_service.stop(); + callOriginalCb(); + } + + void exitThreadReq1_pauseQueue_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << __func__ << ": Thread '" << target->name << "': handling " + "exitThread (pause queue)."<< "\n"; + + target->cleanup(); + target->pause_io_service.stop(); + target->io_service.stop(); + callOriginalCb(); + } + + void pauseThreadReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << __func__ << ": Thread '" << target->name << "': handling " + "pauseThread." << "\n"; + + /* We have to invoke the callback here before moving on because + * our next operation is going to block the thread, so it won't + * have a chance to invoke the callback until it's unblocked. + */ + callOriginalCb(); + target->pause_io_service.reset(); + target->pause_io_service.run(); + } + + void resumeThreadReq1_posted( + [[maybe_unused]] std::shared_ptr context + ) + { + std::cout << __func__ << ": Thread '" << target->name << "': handling " + "resumeThread." << "\n"; + + target->pause_io_service.stop(); + callOriginalCb(); + } +}; + +void ComponentThread::cleanup(void) +{ + this->keepLooping = false; +} + +void PuppetThread::joltThreadReq( + const std::shared_ptr& selfPtr, + Callback callback) +{ + /** EXPLANATION: + * We can't use shared_from_this() here because JOLTing occurs prior to + * TLS being set up. + * + * We also can't use getSelf() as yet for the same reason: getSelf() + * requires TLS to be set up. 
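+	 *
+	 * Illustrative call site (hypothetical Callback contents):
+	 *
+	 *	auto pt = std::make_shared<PuppetThread>(...);
+	 *	pt->joltThreadReq(pt, {nullptr, []{ std::cout << "jolted\n"; }});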
+ * + * To obtain a sh_ptr to the caller, we just supply the mrntt thread since + * JOLT is always invoked by the mrntt thread. The JOLT sequence that the + * CRT main() function invokes on the mrntt thread is special since it + * supplies cmdline args and envp. + * + * To obtain a sh_ptr to the target thread, we use the selfPtr parameter + * passed in by the caller. + */ + if (id == sscl::mrntt::marionetteThreadId) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on mrntt thread"); + } + + std::shared_ptr mrntt = sscl::mrntt::thread; + + auto request = std::make_shared( + mrntt, selfPtr, callback); + + this->getIoService().post( + STC(std::bind( + &ThreadLifetimeMgmtOp::joltThreadReq1_posted, + request.get(), request))); +} + +// Thread management method implementations +void PuppetThread::startThreadReq(Callback callback) +{ + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, std::static_pointer_cast(shared_from_this()), + callback); + + this->getIoService().post( + STC(std::bind( + &ThreadLifetimeMgmtOp::startThreadReq1_posted, + request.get(), request))); +} + +void PuppetThread::exitThreadReq(Callback callback) +{ + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, std::static_pointer_cast(shared_from_this()), + callback); + + this->getIoService().post( + STC(std::bind( + &ThreadLifetimeMgmtOp::exitThreadReq1_mainQueue_posted, + request.get(), request))); + + pause_io_service.post( + STC(std::bind( + &ThreadLifetimeMgmtOp::exitThreadReq1_pauseQueue_posted, + request.get(), request))); +} + +void PuppetThread::pauseThreadReq(Callback callback) +{ + if (id == sscl::mrntt::marionetteThreadId) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on mrntt thread"); + } + + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, std::static_pointer_cast(shared_from_this()), + callback); + + this->getIoService().post( + STC(std::bind( + &ThreadLifetimeMgmtOp::pauseThreadReq1_posted, + request.get(), request))); +} + +void PuppetThread::resumeThreadReq(Callback callback) +{ + if (id == sscl::mrntt::marionetteThreadId) + { + throw std::runtime_error(std::string(__func__) + + ": invoked on mrntt thread"); + } + + // Post to the pause_io_service to unblock the paused thread + std::shared_ptr caller = getSelf(); + auto request = std::make_shared( + caller, std::static_pointer_cast(shared_from_this()), + callback); + + pause_io_service.post( + STC(std::bind( + &ThreadLifetimeMgmtOp::resumeThreadReq1_posted, + request.get(), request))); +} + +// CPU management method implementations +int ComponentThread::getAvailableCpuCount() +{ + int cpuCount = sysconf(_SC_NPROCESSORS_ONLN); + if (cpuCount <= 0) + { + throw std::runtime_error(std::string(__func__) + + ": Failed to determine CPU count"); + } + + // Check if std::thread::hardware_concurrency() matches sysconf result + unsigned int hwConcurrency = std::thread::hardware_concurrency(); + if (hwConcurrency != static_cast(cpuCount)) + { + std::cerr << "Warning: CPU count mismatch - " + "std::thread::hardware_concurrency() = " + << hwConcurrency << ", sysconf(_SC_NPROCESSORS_ONLN) = " + << cpuCount << "\n"; + } + + return cpuCount; +} + +void PuppetThread::pinToCpu(int cpuId) +{ + if (cpuId < 0) + { + throw std::runtime_error(std::string(__func__) + + ": Invalid CPU ID: " + std::to_string(cpuId)); + } + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(cpuId, &cpuset); + + int result = pthread_setaffinity_np( + thread.native_handle(), 
sizeof(cpu_set_t), &cpuset); + if (result != 0) + { + throw std::runtime_error(std::string(__func__) + + ": Failed to pin thread to CPU " + std::to_string(cpuId) + + ": " + std::strerror(result)); + } + + pinnedCpuId = cpuId; +} + +} // namespace sscl diff --git a/src/lockerAndInvokerBase.cpp b/src/lockerAndInvokerBase.cpp new file mode 100644 index 0000000..47df4a1 --- /dev/null +++ b/src/lockerAndInvokerBase.cpp @@ -0,0 +1,5 @@ +#include + +namespace sscl { + +} // namespace sscl diff --git a/src/puppetApplication.cpp b/src/puppetApplication.cpp new file mode 100644 index 0000000..f48dfec --- /dev/null +++ b/src/puppetApplication.cpp @@ -0,0 +1,222 @@ +#include +#include +#include +#include +#include +#include + +namespace sscl { + +PuppetApplication::PuppetApplication( + const std::vector> &threads) +: componentThreads(threads) +{ +} + +class PuppetApplication::PuppetThreadLifetimeMgmtOp +: public NonPostedAsynchronousContinuation +{ +public: + PuppetThreadLifetimeMgmtOp( + PuppetApplication &parent, unsigned int nThreads, + Callback callback) + : NonPostedAsynchronousContinuation(callback), + loop(nThreads), + parent(parent) + {} + +public: + AsynchronousLoop loop; + PuppetApplication &parent; + +public: + void joltAllPuppetThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + parent.threadsHaveBeenJolted = true; + callOriginalCb(); + } + + void executeGenericOpOnAllPuppetThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + callOriginalCb(); + } + + void exitAllPuppetThreadsReq1( + [[maybe_unused]] std::shared_ptr context + ) + { + loop.incrementSuccessOrFailureDueTo(true); + if (!loop.isComplete()) { + return; + } + + for (auto& thread : parent.componentThreads) { + thread->thread.join(); + } + + callOriginalCb(); + } +}; + +void PuppetApplication::joltAllPuppetThreadsReq( + Callback callback + ) +{ + if (threadsHaveBeenJolted) + { + std::cout << "Mrntt: All puppet threads already JOLTed. " + << "Skipping JOLT request." 
<< "\n"; + callback.callbackFn(); + return; + } + + // If no threads, set flag and call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + threadsHaveBeenJolted = true; + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have been jolted + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->joltThreadReq( + thread, + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::joltAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::startAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have started + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->startThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::pauseAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have paused + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->pauseThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::resumeAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have resumed + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->resumeThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::exitAllPuppetThreadsReq( + Callback callback + ) +{ + // If no threads, call callback immediately + if (componentThreads.size() == 0 && callback.callbackFn) + { + callback.callbackFn(); + return; + } + + // Create a counter to track when all threads have exited + auto request = std::make_shared( + *this, componentThreads.size(), callback); + + for (auto& thread : componentThreads) + { + thread->exitThreadReq( + {request, std::bind( + &PuppetThreadLifetimeMgmtOp::executeGenericOpOnAllPuppetThreadsReq1, + request.get(), request)}); + } +} + +void PuppetApplication::distributeAndPinThreadsAcrossCpus() +{ + int cpuCount = ComponentThread::getAvailableCpuCount(); + + // Distribute and pin threads across CPUs + int threadIndex = 0; + for (auto& thread : componentThreads) + { + int targetCpu = threadIndex % cpuCount; + thread->pinToCpu(targetCpu); + ++threadIndex; + } + + std::cout << __func__ << ": Distributed " << threadIndex << " threads " + << "across " << cpuCount << " CPUs\n"; +} + +} // namespace sscl diff --git a/src/qutex.cpp b/src/qutex.cpp new file mode 100644 index 0000000..ac1b7c2 --- /dev/null +++ b/src/qutex.cpp @@ -0,0 +1,380 @@ +#include +#include + +namespace sscl { + +bool 
Qutex::tryAcquire( + const LockerAndInvokerBase &tryingLockvoker, int nRequiredLocks + ) +{ + lock.acquire(); + + const int qNItems = static_cast(queue.size()); + + // If queue is empty, this should never happen since we register before trying to acquire + if (qNItems < 1) + { + lock.release(); + + throw std::runtime_error( + std::string(__func__) + + ": tryAcquire called on empty queue - this should never happen"); + } + + // If lock is already owned, fail + if (isOwned) + { + lock.release(); + return false; + } + + /** EXPLANATION: + * Calculate how many items from the rear we need to scan + * + * For nRequiredLocks=1: must be at front (nRearItemsToScan = qNItems, scan all) + * For nRequiredLocks=2: must be in top 50% (nRearItemsToScan = qNItems/2) + * For nRequiredLocks=3: must be in top 66% (nRearItemsToScan = qNItems/3) + * etc. + */ + const int nRearItemsToScan = qNItems / nRequiredLocks; + + // If we're the only item in queue, or if the fraction calculation + // results in 0 rear items to scan, we automatically succeed + if (qNItems == 1 || nRearItemsToScan < 1) + { + isOwned = true; +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + // Use the stored iterator from the LockSet + auto it = tryingLockvoker.getLockvokerIteratorForQutex(*this); + currOwner = *it; +#endif + lock.release(); + return true; + } + + // For single-lock requests, they must be at the front of the queue + if (nRequiredLocks == 1) + { + bool ret = false; + + if ((*queue.front()) == tryingLockvoker) + { + isOwned = true; +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + currOwner = queue.front(); +#endif + ret = true; + } + else { + ret = false; + } + + lock.release(); + return ret; + } + + // For multi-lock requests, check if the lockvoker is in the rear portion + // If it's NOT in the rear portion, then it's in the top X% and should succeed + auto rIt = queue.rbegin(); + auto rEndIt = queue.rend(); + bool foundInRear = false; + + for (int i = 0; i < nRearItemsToScan && rIt != rEndIt; ++rIt, ++i) + { + if ((**rIt) == tryingLockvoker) + { + foundInRear = true; + break; + } + } + + if (foundInRear) + { + // Found in rear portion - not in top X%, so fail + lock.release(); + return false; + } + + // Not found in rear portion - must be in top X%, so succeed + isOwned = true; +#ifdef CONFIG_ENABLE_DEBUG_LOCKS + // Use the stored iterator from the LockSet + auto it = tryingLockvoker.getLockvokerIteratorForQutex(*this); + currOwner = *it; +#endif + lock.release(); + return true; +} + +void Qutex::backoff( + const LockerAndInvokerBase &failedAcquirer, int nRequiredLocks + ) +{ + lock.acquire(); + + const int nQItems = static_cast(queue.size()); + + if (nQItems < 1) + { + lock.release(); + + throw std::runtime_error( + std::string(__func__) + + ": backoff called on empty queue - this should never happen"); + } + + /* Check if failedAcquirer is at the front of the queue with + * nRequiredLocks == 1. This should never happen because an + * acquirer at the front of the queue with nRequiredLocks == 1 + * should always succeed. 
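+	 * (Recall tryAcquire(): when nRequiredLocks == 1, nRearItemsToScan
+	 * covers the whole queue, so the only winning position is the front --
+	 * and the front item is granted the lock unconditionally.)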
+	 */
+	const LockerAndInvokerBase& oldFront = *queue.front();
+	if (oldFront == failedAcquirer && nRequiredLocks == 1)
+	{
+		lock.release();
+
+		throw std::runtime_error(
+			std::string(__func__)
+			+ ": Failed acquirer is at front of queue with nRequiredLocks==1 - "
+			"acquirer at front of queue with nRequiredLocks==1 should always "
+			"succeed.");
+	}
+
+	// Rotate queue members if failedAcquirer is at front of queue
+	if (oldFront == failedAcquirer && nQItems > 1)
+	{
+		/** EXPLANATION:
+		 * Rotate the top LockSet.size() items in the queue by moving
+		 * the failedAcquirer to the last position within the top
+		 * LockSet.size() items of the queue.
+		 *
+		 * I.e.: if queue.size()==20 and lockSet.size()==5, then move
+		 * failedAcquirer from the front to the 5th position in the queue,
+		 * which pushes the other 4 items forward.
+		 * If queue.size()==3 and LockSet.size()==5, then just
+		 * push_back(failedAcquirer).
+		 *
+		 * It is impossible for a Qutex queue to have only one
+		 * item in it, yet for that Lockvoker item to have failed to
+		 * acquire the Qutex. Being the only item in the queue
+		 * means that you must succeed at acquiring the Qutex.
+		 */
+		int indexOfItemToInsertCurrFrontBefore;
+		if (nQItems > nRequiredLocks) {
+			indexOfItemToInsertCurrFrontBefore = nRequiredLocks;
+		} else
+		{
+			// -1 means insert at back -- i.e., use list::end() as insertPos.
+			indexOfItemToInsertCurrFrontBefore = -1;
+		}
+
+		/* EXPLANATION:
+		 * Rotate them here.
+		 *
+		 * The reason why we do this rotation is to avoid a particular kind
+		 * of deadlock wherein a grid of async requests is perfectly
+		 * configured so as to guarantee that none of them can make any
+		 * forward progress unless they get reordered.
+		 *
+		 * Consider 2 different locks with 2 different items in their
+		 * queues, both of which come from 2 particular requests:
+		 *	Qutex1: Lv1, Lv2
+		 *	Qutex2: Lv2, Lv1
+		 *
+		 * Moreover, both of these lockvokers have requiredLocks.size()==2,
+		 * and the particular 2 locks that each one requires are indeed
+		 * Qutex1 and Qutex2.
+		 *
+		 * This setup means that in Qutex1's queue, Lv1 will wake up since
+		 * it's at the front of Qutex1. It'll successfully acquire Qutex1
+		 * (since it's at the front), and then it'll try to acquire Qutex2.
+		 * But since Lv1 isn't in the top 50% of items in Qutex2's queue,
+		 * Lv1 will fail to acquire Qutex2.
+		 *
+		 * Then similarly, in Qutex2's queue, Lv2 will wake up since it's
+		 * at the front. Again, it'll successfully acquire Qutex2 since
+		 * it's at the front of Qutex2's queue. But then it'll try to
+		 * acquire Qutex1. Since it's not in the top 50% of Qutex1's
+		 * enqueued items, it'll fail to acquire Qutex1.
+		 *
+		 * N.B.: This type of perfectly ordered deadlock can occur in any
+		 * kind of NxN situation where queue.size()==requiredLocks.size().
+		 * That could be 4x4, 5x5, 6x6, etc. It doesn't happen in 1x1
+		 * because a Lockvoker that only requires one lock will always just
+		 * succeed if it's at the front of its queue.
+		 *
+		 * This state of affairs is stable and will persist unless these
+		 * queues are reordered in some way. Hence: that's why we rotate
+		 * the items in a Qutex's queue after backing off of it. Backing
+		 * off does not necessarily mean that the calling Lockvoker failed
+		 * to acquire THIS PARTICULAR Qutex, but rather that it failed to
+		 * acquire ALL of its required locks.
+		 *
+		 * Hence, if we are backing off, we should also rotate the items
+		 * in the queue if the current front item is the failed acquirer.
+		 * So that's why we do this rotation here.
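+		 *
+		 * Worked example (hypothetical): with queue == [Lv1, Lv2, Lv3,
+		 * Lv4, Lv5] and Lv1 backing off with nRequiredLocks == 3, the
+		 * front item is spliced to just before index 3, giving
+		 * [Lv2, Lv3, Lv1, Lv4, Lv5]: Lv2 and Lv3 each move one slot
+		 * closer to the front.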
+		 */
+
+		// Find the iterator for the failed acquirer (which is at the front)
+		auto frontIt = queue.begin();
+
+		// Find the position to insert before, using indexOfItemToInsertCurrFrontBefore
+		auto insertPos = queue.begin();
+		if (indexOfItemToInsertCurrFrontBefore == -1)
+		{
+			// -1 means insert at the back (before end())
+			insertPos = queue.end();
+		}
+		else
+		{
+			// Move to the specified position (0-based index)
+			for (
+				int i = 0;
+				i < indexOfItemToInsertCurrFrontBefore
+					&& insertPos != queue.end(); ++i)
+			{
+				++insertPos;
+			}
+		}
+
+		/** NOTE:
+		 * According to https://en.cppreference.com/w/cpp/container/list/splice:
+		 * "No iterators or references become invalidated. If *this and other
+		 * refer to different objects, the iterators to the transferred elements
+		 * now refer into *this, not into other."
+		 *
+		 * So our stored iterator inside of LockSet will still be valid after
+		 * the splice, and we can use it to unregister the lockvoker later on.
+		 */
+		queue.splice(insertPos, queue, frontIt);
+	}
+
+	isOwned = false;
+#ifdef CONFIG_ENABLE_DEBUG_LOCKS
+	currOwner = nullptr;
+#endif
+	LockerAndInvokerBase &newFront = *queue.front();
+
+	lock.release();
+
+	/** EXPLANATION:
+	 * Why should this never happen? If we were at the front of the queue
+	 * and we failed to acquire the lock, we have just been rotated away
+	 * from the front. And if we were not at the front of the queue, the
+	 * rotation above never moves a non-front item to the front, so we
+	 * still aren't at the front.
+	 * The exception is if the queue has only one item in it.
+	 *
+	 * Hence there ought to be no way for the failedAcquirer to be at the front
+	 * of the queue at this point UNLESS the queue has only one item in it.
+	 */
+	if (newFront == failedAcquirer && nQItems > 1)
+	{
+		throw std::runtime_error(
+			std::string(__func__)
+			+ ": Failed acquirer is at the front of the queue at the end of "
+			"backoff, yet nQItems > 1 - this should never happen");
+	}
+
+	/** EXPLANATION:
+	 * We should always awaken whoever is at the front of the queue, even if
+	 * we didn't rotate. Why? Consider this scenario:
+	 *
+	 * Lv1 has LockSet.size==1. Lv2 has LockSet.size==3.
+	 * Lv1's required lock overlaps with Lv2's set of 3 required locks.
+	 * Lv1 registers itself in its 1 qutex's queue.
+	 * Lv2 registers itself in all 3 of its qutexes' queues.
+	 * Lv2 acquires the lock that it needs in common with Lv1.
+	 *	(Assume that Lv2 was not at the front of the common qutex's
+	 *	internal queue -- it only needed to be in the top 66%.)
+	 * Lv1 tries to acquire the common lock and fails. It gets taken off of
+	 *	its io_service. It's now asleep until it gets
+	 *	re-added into an io_service.
+	 * Lv2 fails to acquire the other 2 locks it needs and backoff()s from
+	 *	the common lock it shares with Lv1.
+	 *
+	 * If Lv2 does NOT awaken the item at the front of the common lock's
+	 * queue (aka: Lv1), then Lv1 is doomed to never wake up again.
+	 *
+	 * Hence: backoff() callers should always wake up the lockvoker at the
+	 * front of their queue before leaving.
+	 *
+	 * The exception is if the item at the front is the backoff() caller
+	 * itself. This can happen if, for example, a multi-locking lockvoker
+	 * is backing off of a qutex within which it's the only waiter.
+	 */
+	if (nQItems > 1) {
+		newFront.awaken();
+	}
+}
+
+void Qutex::release()
+{
+	lock.acquire();
+
+	/** EXPLANATION:
+	 * A qutex must not have its release() called when it's not owned.
+	 * The plumbing required to permit that is a bit excessive, and we have
+	 * instrumentation to track early qutex release()ing in
+	 * SerializedAsynchronousContinuation.
+	 */
+	if (!isOwned
+#ifdef CONFIG_ENABLE_DEBUG_LOCKS
+		|| currOwner == nullptr
+#endif
+	)
+	{
+		throw std::runtime_error(
+			std::string(__func__)
+			+ ": release() called on unowned qutex - this should never happen");
+	}
+
+	isOwned = false;
+#ifdef CONFIG_ENABLE_DEBUG_LOCKS
+	currOwner = nullptr;
+#endif
+
+	// It's possible for there to be 0 items left in queue after unregistering.
+	if (queue.empty())
+	{
+		lock.release();
+		return;
+	}
+
+	/** EXPLANATION:
+	 * It would be nice to be able to optimize by only awakening if the
+	 * release()ing lockvoker was at the front of the qutex's queue, but if
+	 * we don't unconditionally wake up the front item, we could get lost
+	 * wakeups. Consider:
+	 *
+	 * Lv1 only has 1 requiredLock.
+	 * Lv2 has 3 requiredLocks. One of its requiredLocks overlaps with
+	 *	Lv1's single requiredLock. So they both share a common lock.
+	 * Lv3 currently owns Lv1 & Lv2's common requiredLock.
+	 * Lv3 release()s that common lock.
+	 * Lv1 happens to be next in queue after Lv3 unregisters itself.
+	 * Lv3 wakes up Lv1.
+	 * Just before Lv1 can acquire the common lock, Lv2 acquires it,
+	 *	because it only needs to be in the top 66% to succeed.
+	 * Lv1 checks the currOwner and sees that it's owned. Lv1 is now
+	 *	dequeued from its io_service. It won't be awakened until someone
+	 *	awakens it.
+	 * Lv2 finishes its critical section and release()s the common lock.
+	 * Lv2 was not at the front of the qutex's queue, so it does NOT awaken
+	 *	the current item at the front.
+	 *
+	 * Thus, Lv1 never gets awakened again. The end.
+	 * This also means that no LockSet.size()==1 lockvoker will ever be able
+	 * to run again, since it can only run when it is at the front of the
+	 * qutex's queue.
+	 *
+	 * Therefore we must always awaken the front item when release()ing.
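+	 *
+	 * In short: correctness over throughput here -- a spurious wakeup
+	 * costs one failed tryAcquire(), while a lost wakeup strands a
+	 * lockvoker forever.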
+ */ + LockerAndInvokerBase &front = *queue.front(); + + lock.release(); + + front.awaken(); +} + +} // namespace sscl diff --git a/src/qutexAcquisitionHistoryTracker.cpp b/src/qutexAcquisitionHistoryTracker.cpp new file mode 100644 index 0000000..4731ca0 --- /dev/null +++ b/src/qutexAcquisitionHistoryTracker.cpp @@ -0,0 +1,393 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sscl { + +void DependencyGraph::addNode(const Node& node) +{ + adjacencyList[node]; // Creates empty set if node doesn't exist +} + +void DependencyGraph::addEdge(const Node& source, const Node& target) +{ + addNode(source); + addNode(target); + adjacencyList[source].insert(target); +} + +std::vector> +DependencyGraph::findCycles() const +{ + std::unordered_set visited; + std::unordered_set recursionStack; + std::vector> cycles; + std::vector path; + + for (const auto& entry : adjacencyList) + { + const Node& node = entry.first; + if (visited.find(node) == visited.end()) { + dfsCycleDetection(node, visited, recursionStack, path, cycles); + } + } + + return cycles; +} + +bool DependencyGraph::hasCycles() const +{ + std::unordered_set visited; + std::unordered_set recursionStack; + std::vector> cycles; + std::vector path; + + for (const auto& entry : adjacencyList) + { + const Node& node = entry.first; + if (visited.find(node) == visited.end()) + { + dfsCycleDetection(node, visited, recursionStack, path, cycles); + if (!cycles.empty()) + { return true; } + } + } + + return false; +} + +size_t DependencyGraph::getNodeCount() const +{ + return adjacencyList.size(); +} + +void DependencyGraph::dfsCycleDetection( + const Node& node, + std::unordered_set& visited, + std::unordered_set& recursionStack, + std::vector& path, + std::vector>& cycles + ) + const +{ + // Mark current node as visited and add to recursion stack + visited.insert(node); + recursionStack.insert(node); + path.push_back(node); + + // Check all adjacent nodes + auto it = adjacencyList.find(node); + if (it != adjacencyList.end()) + { + for (const auto& adjacent : it->second) + { + // If adjacent node is in recursion stack, we found a cycle + if (recursionStack.find(adjacent) != recursionStack.end()) + { + // Find the start of the cycle in the current path + auto cycleStart = std::find(path.begin(), path.end(), adjacent); + if (cycleStart != path.end()) + { + std::vector cycle(cycleStart, path.end()); + cycle.push_back(adjacent); // Complete the cycle + cycles.push_back(cycle); + } + } + // If adjacent node hasn't been visited, recurse + else if (visited.find(adjacent) == visited.end()) + { + dfsCycleDetection( + adjacent, visited, recursionStack, path, cycles); + } + } + } + + // Remove from recursion stack and path when backtracking + recursionStack.erase(node); + path.pop_back(); +} + +// QutexAcquisitionHistoryTracker implementation +std::unique_ptr +QutexAcquisitionHistoryTracker::generateGraph(bool dontAcquireLock) +{ + auto graph = std::make_unique(); + + if (!dontAcquireLock) { + acquisitionHistoryLock.acquire(); + } + + // First pass: Add all continuations as nodes + for (const auto& entry : acquisitionHistory) + { + const auto& continuation = entry.first; + graph->addNode(continuation); + } + + // Second pass: Add edges based on lock dependencies + for (const auto& entry : acquisitionHistory) + { + const auto& continuation = entry.first; + const auto& historyEntry = entry.second; + const auto& wantedLock = historyEntry.first; + const auto& heldLocks = historyEntry.second; + + if (!heldLocks) 
{ continue; } + + // Check if any other continuation holds the lock this continuation wants + for (const auto& otherEntry : acquisitionHistory) + { + const auto& otherContinuation = otherEntry.first; + const auto& otherHistoryEntry = otherEntry.second; + const auto& otherHeldLocks = otherHistoryEntry.second; + + // Skip self-comparison + if (continuation == otherContinuation) { continue; } + if (!otherHeldLocks) { continue; } + + // Check if other continuation holds the wanted lock + for (const auto& otherHeldLock : *otherHeldLocks) + { + if (&otherHeldLock.get() == &wantedLock.get()) + { + // Add edge: continuation -> otherContinuation + // (continuation wants a lock held by otherContinuation) + graph->addEdge(continuation, otherContinuation); + break; + } + } + } + } + + if (!dontAcquireLock) { + acquisitionHistoryLock.release(); + } + + return graph; +} + +/** EXPLANATION - GRIDLOCK DETECTION ALGORITHM: + * This file implements gridlock detection algorithms that use a central + * acquisition history to track all lockvokers suspected of being gridlocked. + * + * ALGORITHM OVERVIEW: + * 1. When a lockvoker finds that DEADLOCK_TIMEOUT_MS has elapsed and it + * still can't acquire a particular lock (firstFailedQutex), it creates + * a new entry in a global acquisition history. + * + * 2. The acquisition history is an unordered_map with: + * - Key: std::shared_ptr + * (the timed-out lockvoker's continuation) + * - Value: std::pair< + * std::reference_wrapper, + * std::unique_ptr>>> + * * pair.first: The firstFailedQutex that this lockvoker WANTS but + * can't acquire. This metadata is essential for later-arriving + * entrants to analyze what their predecessor timed-out sequences + * want. + * * pair.second: A unique_ptr to a list of all acquired Qutexes in this + * lockvoker's continuation history. + * + * 3. Each timed-out lockvoker: + * a) Adds itself to the acquisition history map with its wanted lock and + * acquired locks + * b) Iterates through all OTHER entries in the map (excluding itself) + * c) For each other entry, checks if that entry's acquired locks + * (pair.second) contains the lock that this lockvoker wants + * (aka: firstFailedQutex/pair.first) + * d) If found, we have detected a gridlock: two sequences where at least + * one wants a lock held by the other, and the other wants a lock that + * it can't acquire. + * + * GRIDLOCK CONDITION: + * A gridlock exists when we find a circular chain of dependencies: + * - Lockvoker A wants LockX but can't acquire it (held by Lockvoker B) + * - Lockvoker B wants LockY but can't acquire it (held by Lockvoker C, D, etc.) + * - The chain must be circular (eventually leading back to Lockvoker A or another + * lockvoker in the chain) to ensure it's a true gridlock, not just a delay + * + * TIMED DELAY, I/O DELAY, or LONG-RUNNING OPERATION FALSE-POSITIVE: + * Without circularity detection, we could incorrectly flag a simple delay, I/O + * delay, or long-running operation as a gridlock. For example: Lockvoker A + * wants LockX (held by Lockvoker B), and Lockvoker B is currently in a 10-second + * sleep/delay. When B wakes up, it will release LockX, allowing A to proceed. + * This is not a gridlock - it's just A waiting longer than DEADLOCK_TIMEOUT_MS + * for B to finish its work. True gridlocks require circular dependencies where + * no sequence can make progress because they're all waiting for each other in + * a cycle. 
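+ *
+ * Example of a true 3-way gridlock (hypothetical): lockvoker A holds
+ * LockX and wants LockY; B holds LockY and wants LockZ; C holds LockZ
+ * and wants LockX. The dependency edges A->B->C->A form a cycle, so
+ * none of the three can make progress without external reordering.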
+ *
+ * The central history metadata enables us to detect complex gridlocks involving
+ * multiple lockvokers (2, 3, 4, 5+ sequences) by building up the acquisition
+ * history over time as different lockvokers time out and add their information.
+ */
+
+bool QutexAcquisitionHistoryTracker
+::heuristicallyTraceContinuationHistoryForGridlockOn(
+	Qutex &firstFailedQutex,
+	std::shared_ptr& currentContinuation)
+{
+	/** HEURISTIC APPROACH:
+	 * Due to the computational complexity of full circularity detection,
+	 * we implement a heuristically adequate check: when we find 2 sequences
+	 * where one depends on the other, and the other has reached timeout,
+	 * we assume this is a likely gridlock. This approach is not
+	 * algorithmically complete (it may miss some complex circular
+	 * dependencies or flag false positives), but it is heuristically useful
+	 * for debugging and identifying potential concurrency issues in
+	 * practice.
+	 *
+	 * See the file-local comment above for the complete algorithm
+	 * explanation.
+	 */
+
+	/** NOTICE:
+	 * Generally we should have all global data structures owned by a single
+	 * ComponentThread; and qutexes really should only be used to serialize
+	 * async sequences being enqueued on the same ComponentThread. But this
+	 * doesn't prevent multiple CPUs from trying to add/remove entries to/from
+	 * the acquisition history at the same time. Why? The acquisition history
+	 * isn't per-CPU; it's global.
+	 *
+	 * The worry with using a SpinLock here is that, if the STL used mutexes
+	 * internally to lock containers, spinning waiters could end up
+	 * busy-spinning while the owner sleeps.
+	 *
+	 * But that should not happen, because of the order of operations: the
+	 * spinlock ensures that only one CPU at a time can be adding/removing
+	 * entries, and thus every time a method is called on the unordered_map,
+	 * the caller will always succeed at acquiring any underlying STL mutex.
+	 *
+	 * So it should be safe to use a SpinLock here.
+	 */
+	acquisitionHistoryLock.acquire();
+
+	// Iterate through all entries in the acquisition history
+	for (const auto& entry : acquisitionHistory)
+	{
+		const auto& continuation = entry.first;
+		const auto& historyEntry = entry.second;
+
+		// Skip the current continuation (don't compare with itself)
+		if (continuation == currentContinuation) {
+			continue;
+		}
+
+		// Check if firstFailedQutex is in this continuation's held locks
+		const std::unique_ptr<std::forward_list<std::reference_wrapper<Qutex>>>&
+			heldLocks = historyEntry.second;
+
+		if (!heldLocks)
+		{ continue; }
+
+		for (const auto& heldLock : *heldLocks)
+		{
+			/* Skip unless this held lock is the firstFailedQutex;
+			 * finding it in another continuation's held locks indicates
+			 * a potential gridlock.
+			 */
+			if (&heldLock.get() != &firstFailedQutex)
+			{ continue; }
+
+			acquisitionHistoryLock.release();
+
+			std::cerr << __func__ << ": GRIDLOCK DETECTED: Current "
+				"continuation @" << currentContinuation.get()
+				<< " wants lock '" << firstFailedQutex.name
+				<< "' which is held by continuation @"
+				<< continuation.get() << std::endl;
+
+			return true;
+		}
+	}
+
+	acquisitionHistoryLock.release();
+	return false;
+}
+
+bool QutexAcquisitionHistoryTracker
+::completelyTraceContinuationHistoryForGridlockOn(Qutex &firstFailedQutex)
+{
+	(void)firstFailedQutex;
+
+	/** ALGORITHMICALLY COMPLETE VERSION:
+	 * This function implements the algorithmically complete version of gridlock
+	 * detection that performs full circularity detection.
It builds a dependency + * graph from the acquisition history and uses DFS with cycle detection to + * identify true circular dependencies. + * + * See the file-local comment above for the complete algorithm explanation. + */ + + acquisitionHistoryLock.acquire(); + + // Helper function to print continuation dependency info + auto printContinuationDependency = [&]( + const auto& fromContinuation, const auto& toContinuation + ) + { + auto it = acquisitionHistory.find(fromContinuation); + if (it != acquisitionHistory.end()) + { + const auto& wantedLock = it->second.first; + std::cerr << " Continuation @" << fromContinuation.get() + << " wants lock[\"" << wantedLock.get().name << "\"], " + << "held by continuation @" << toContinuation.get() + << std::endl; + } + else + { + std::cerr << " Continuation @" << fromContinuation.get() + << " -> continuation @" << toContinuation.get() + << std::endl; + } + }; + + // Pass true to dontAcquireLock since we already hold it + auto graph = generateGraph(true); + + // Early return if no graph or no cycles + if (!graph || !graph->hasCycles()) + { + acquisitionHistoryLock.release(); + return false; + } + + auto cycles = graph->findCycles(); + + std::cerr << __func__ << ": CIRCULAR DEPENDENCIES DETECTED: Found " + << cycles.size() << " cycle(s) in lock dependency graph:" << std::endl; + + for (size_t i = 0; i < cycles.size(); ++i) + { + const auto& cycle = cycles[i]; + std::cerr << " Cycle " << (i + 1) << ":\n"; + + for (size_t j = 0; j < cycle.size() - 1; ++j) + { + const auto& currentContinuation = cycle[j]; + const auto& nextContinuation = cycle[j + 1]; + printContinuationDependency(currentContinuation, nextContinuation); + } + + if (cycle.empty()) + { continue; } + + /* Handle the last edge (back to start of cycle) + */ + const auto& lastContinuation = cycle[cycle.size() - 1]; + const auto& firstContinuation = cycle[0]; + printContinuationDependency(lastContinuation, firstContinuation); + } + + acquisitionHistoryLock.release(); + + return true; +} + +} // namespace sscl
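+
+/** USAGE SKETCH (illustrative only; not part of the library):
+ * A minimal walk-through of the DependencyGraph cycle-detection API used
+ * above, assuming three already-tracked continuation shared_ptrs contA,
+ * contB and contC (hypothetical names):
+ *
+ *	sscl::DependencyGraph g;
+ *	g.addEdge(contA, contB);	// contA wants a lock held by contB
+ *	g.addEdge(contB, contC);	// contB wants a lock held by contC
+ *	g.addEdge(contC, contA);	// contC wants a lock held by contA
+ *
+ *	assert(g.hasCycles());
+ *	auto cycles = g.findCycles();
+ *	// cycles[0] == {contA, contB, contC, contA} (cycle closed at the end)
+ *
+ * generateGraph() builds exactly this structure from acquisitionHistory:
+ * one node per tracked continuation, one edge per "wants a lock held by"
+ * relationship.
+ */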