Compare commits

...

9 Commits

Author SHA1 Message Date
hayodea 3d81ee92aa New test support harness primitives for testing stimbuffapis 2026-06-13 18:47:44 -04:00
hayodea 2f31e9a034 Adversarial review on test porting plan 2026-06-13 17:59:06 -04:00
hayodea a29c779f6e Tests: Add all tests from the coro creation repo
We went back and brought along all the tests we implemented while
we were building the new coro framework.
2026-06-13 17:17:57 -04:00
hayodea 1763685c0e Tests: Move qutex and nursery tests into Libspinscale repo 2026-06-13 16:17:40 -04:00
hayodea 016b2d26de SharedRsrcGrp: Allow construction of rsrc by copy 2026-06-13 11:46:16 -04:00
hayodea ffe86369e2 Add EnvKvStore for envvar parsing and interleaving 2026-06-11 19:16:46 -04:00
hayodea 00be517f30 Printing: print fewer blank newlines 2026-06-11 11:16:37 -04:00
hayodea ebf0fa2921 Nursery: Document intended usage form 2026-06-09 21:25:25 -04:00
hayodea d33e70f14a Nursery: document syncAwaitAll's caller io_context requirement for LLMs 2026-06-09 16:48:58 -04:00
28 changed files with 5558 additions and 22 deletions
+3
View File
@@ -0,0 +1,3 @@
[submodule "googletest"]
path = googletest
url = https://github.com/google/googletest.git
+28
View File
@@ -1,6 +1,8 @@
cmake_minimum_required(VERSION 3.16)
project(libspinscale VERSION 0.1.0 LANGUAGES CXX)
include(GNUInstallDirs)
# Set C++ standard
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
@@ -80,6 +82,7 @@ add_library(spinscale SHARED
src/qutex.cpp
src/componentThread.cpp
src/component.cpp
src/envKvStore.cpp
src/puppeteerComponent.cpp
src/puppetApplication.cpp
src/runtime.cpp
@@ -147,6 +150,31 @@ install(DIRECTORY include/spinscale
FILES_MATCHING PATTERN "*.h"
)
if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR)
set(_libspinscaleTestsDefault ON)
else()
set(_libspinscaleTestsDefault OFF)
if(DEFINED ENABLE_TESTS AND ENABLE_TESTS)
set(_libspinscaleTestsDefault ON)
endif()
endif()
option(LIBSPINSCALE_BUILD_TESTS "Build libspinscale unit tests"
${_libspinscaleTestsDefault})
if(LIBSPINSCALE_BUILD_TESTS)
if(NOT TARGET gtest AND NOT TARGET gtest_main)
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
add_subdirectory(
${CMAKE_CURRENT_SOURCE_DIR}/googletest
${CMAKE_CURRENT_BINARY_DIR}/googletest
EXCLUDE_FROM_ALL)
endif()
enable_testing()
add_subdirectory(tests)
endif()
install(FILES include/boostAsioLinkageFix.h
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
)
+10 -1
View File
@@ -204,7 +204,8 @@ nursery.launch(
nursery.requestCancelOnAll();
nursery.closeAdmission();
nursery.syncAwaitAllSettlements(ioContext);
nursery.syncAwaitAllSettlements(
sscl::ComponentThread::getSelf()->getIoContext());
```
Each slot owns a `SyncCancelerForAsyncWork`. `requestCancelOnAll()` only signals
@@ -213,6 +214,14 @@ completion callbacks run. Call `closeAdmission()` explicitly before
`asyncAwaitAllSettlements()` or `syncAwaitAllSettlements()`; those APIs wait until
all slots have retired naturally and throw if admission is still open.
`syncAwaitAllSettlements()` runs a nested `io_context` loop on the **calling
thread** (it blocks in `run_one()` until every slot has retired). Pass the
caller thread's `io_context` — usually `ComponentThread::getSelf()->getIoContext()`
— not some other thread's context. While the caller is blocked pumping another
thread's queue, handlers posted to the caller's own `io_context` are abandoned
and the drain can deadlock even when in-flight work has already completed on a
different thread.
`launch(factory, onSettledHook)` registers a non-null hook before `fillSlot()`.
Omit the hook (default `nullptr`) when no settlement callback is needed.
`Slot::Lease` is commit-required: an uncommitted lease removes its
Submodule
+1
Submodule googletest added at 7140cd416c
+27 -18
View File
@@ -74,6 +74,29 @@ auto asAwaiter(T &t) noexcept(noexcept(get_operator_co_await(t)))
return get_operator_co_await(t);
}
inline bool endsWithLineBreak(const std::string &message)
{
return !message.empty()
&& (message.back() == '\n' || message.back() == '\r');
}
inline void appendGroupAdapterExceptionLine(
std::ostringstream &ostream, std::exception_ptr exceptionPtr)
{
ostream << "Exc thrown in Group Adapter: ";
try {
std::rethrow_exception(exceptionPtr);
} catch (const std::exception &e) {
const std::string message = e.what();
ostream << message;
if (!endsWithLineBreak(message)) {
ostream << "\n";
}
} catch (...) {
ostream << "<unknown exception type>\n";
}
}
} // namespace detail
template <typename T>
@@ -225,15 +248,8 @@ struct Group
}
doThrow = true;
ostream << "Exc thrown in Group Adapter: ";
try {
std::rethrow_exception(item.adapterException);
} catch (const std::exception &e) {
ostream << e.what();
} catch (...) {
ostream << "<unknown exception type>";
}
ostream << "\n";
detail::appendGroupAdapterExceptionLine(
ostream, item.adapterException);
}
if (doThrow) {
@@ -578,15 +594,8 @@ struct Group
assert(item.calleeException);
hasFailures = true;
ostream << "Exc thrown in Group Adapter: ";
try {
std::rethrow_exception(item.calleeException);
} catch (const std::exception &e) {
ostream << e.what();
} catch (...) {
ostream << "<unknown exception type>";
}
ostream << "\n";
detail::appendGroupAdapterExceptionLine(
ostream, item.calleeException);
}
if (!hasFailures) {
+23 -3
View File
@@ -1,6 +1,8 @@
#ifndef NON_VIRAL_TASK_NURSERY_H
#define NON_VIRAL_TASK_NURSERY_H
#include <boostAsioLinkageFix.h>
#include <cstddef>
#include <exception>
#include <functional>
@@ -44,10 +46,25 @@ struct MemberInvoker : MemberInvokerBase
* callbacks, tracks unsettled slots, fans out cooperative cancel via per-slot
* SyncCancelerForAsyncWork, and provides drain APIs.
*
* Each nursery member must be one complete, self-contained non-viral async
* flow. For an external HTTP request, that means one coroutine should shepherd
* the whole request from framework callback through all sscl component awaits
* to final response/commit/error handling. Do not use the nursery as a place
* to reserve a slot, perform partial setup elsewhere, and later return to
* fill the slot. Do not add each individual awaited operation as a separate
* nursery member. The external submitter should add the complete flow to the
* nursery and then return; the nursery owns that flow until the flow settles.
*
* Call closeAdmission() explicitly before asyncAwaitAllSettlements() or
* syncAwaitAllSettlements(). syncAwaitAllSettlements() caller must pass the
* io_context where non-viral posting completions will land, and must ensure
* that io_context is prepared to run (e.g. not left stopped without restart).
* syncAwaitAllSettlements().
*
* syncAwaitAllSettlements() runs a nested io_context loop on the calling
* thread (AsynchronousBridge). Pass the calling thread's io_context —
* typically
* ComponentThread::getSelf()->getIoContext() — not another thread's
* io_context. If the caller pumps a different thread's queue while blocked,
* completions posted back to the caller's own io_context are never executed
* and the drain can deadlock even after cooperative cancel.
*/
class NonViralTaskNursery
{
@@ -307,6 +324,9 @@ public:
}
}
/** Nested drain: blocks the calling thread in run_one() on @p ioContext until
* all slots retire. @p ioContext must be the caller thread's io_context.
*/
void syncAwaitAllSettlements(boost::asio::io_context &ioContext)
{
if (admissionIsOpen())
@@ -25,6 +25,10 @@ public:
boost::asio::post(io_context, []{});
}
/** Blocks the calling thread in run_one() on the bridge's io_context.
* Used by syncAwaitAllSettlements(); that io_context must be the caller
* thread's own queue so posted completions on the caller are not starved.
*/
void waitForAsyncOperationCompleteOrIoContextStopped(void)
{
for (;;)
+44
View File
@@ -0,0 +1,44 @@
#ifndef SPINSCALE_ENV_KV_STORE_H
#define SPINSCALE_ENV_KV_STORE_H
#include <filesystem>
#include <optional>
#include <ostream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
namespace sscl {
class EnvKvStore
{
public:
explicit EnvKvStore(
const std::vector<std::filesystem::path> &envFilePaths,
std::ostream &warningStream);
explicit EnvKvStore(
const std::vector<std::filesystem::path> &envFilePaths);
std::optional<std::string> get(std::string_view name) const;
private:
void loadFiles(
const std::vector<std::filesystem::path> &envFilePaths,
std::ostream &warningStream);
void loadFile(
const std::filesystem::path &envFilePath,
std::ostream &warningStream);
void storeValue(
const std::filesystem::path &envFilePath,
const std::string &name,
const std::string &value,
std::ostream &warningStream);
private:
std::unordered_map<std::string, std::string> values;
};
} // namespace sscl
#endif // SPINSCALE_ENV_KV_STORE_H
+5
View File
@@ -15,6 +15,11 @@ public:
: lock(lockName)
{}
SharedResourceGroup(
const std::string& lockName, const ResourceType& initialRsrc)
: lock(lockName), rsrc(initialRsrc)
{}
~SharedResourceGroup() = default;
LockType lock;
+278
View File
@@ -0,0 +1,278 @@
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <ranges>
#include <sstream>
#include <stdexcept>
#include <spinscale/envKvStore.h>
namespace sscl {
namespace {
std::string trim(std::string_view value)
{
auto begin = std::ranges::find_if_not(
value, [](unsigned char c) { return std::isspace(c); });
auto rbegin = std::ranges::find_if_not(
value | std::views::reverse,
[](unsigned char c) { return std::isspace(c); });
auto end = rbegin.base();
if (begin >= end)
{
return {};
}
return std::string(begin, end);
}
bool characterIsValidNameStart(char c)
{
return std::isalpha(static_cast<unsigned char>(c)) || c == '_';
}
bool characterIsValidNameBody(char c)
{
return std::isalnum(static_cast<unsigned char>(c)) || c == '_';
}
bool nameIsValid(std::string_view name)
{
if (name.empty() || !characterIsValidNameStart(name.front()))
{
return false;
}
return std::ranges::all_of(name.substr(1), characterIsValidNameBody);
}
std::runtime_error makeParseError(
const std::filesystem::path &envFilePath,
std::size_t lineNumber,
const std::string &message)
{
std::ostringstream stream;
stream << envFilePath << ":" << lineNumber << ": " << message;
return std::runtime_error(stream.str());
}
bool lineIsBlankOrComment(const std::string &line)
{
std::string trimmed = trim(line);
return trimmed.empty() || trimmed.front() == '#';
}
std::size_t findClosingQuote(
const std::filesystem::path &envFilePath,
std::size_t lineNumber,
std::string_view value)
{
char quote = value.front();
bool escapeNext = false;
for (std::size_t i = 1; i < value.size(); ++i)
{
if (escapeNext)
{
escapeNext = false;
continue;
}
if (quote == '"' && value[i] == '\\')
{
escapeNext = true;
continue;
}
if (value[i] == quote)
{
return i;
}
}
throw makeParseError(envFilePath, lineNumber, "Unterminated quoted value.");
}
std::string decodeDoubleQuotedValue(std::string_view value)
{
std::string decoded;
decoded.reserve(value.size());
bool escapeNext = false;
for (char c : value)
{
if (!escapeNext && c == '\\')
{
escapeNext = true;
continue;
}
if (escapeNext)
{
switch (c)
{
case 'n':
decoded.push_back('\n');
break;
case 'r':
decoded.push_back('\r');
break;
case 't':
decoded.push_back('\t');
break;
default:
decoded.push_back(c);
break;
}
escapeNext = false;
continue;
}
decoded.push_back(c);
}
if (escapeNext)
{
decoded.push_back('\\');
}
return decoded;
}
std::string parseQuotedValue(
const std::filesystem::path &envFilePath,
std::size_t lineNumber,
std::string_view value)
{
char quote = value.front();
std::size_t closingQuote = findClosingQuote(envFilePath, lineNumber, value);
std::string trailing = trim(value.substr(closingQuote + 1));
if (!trailing.empty() && trailing.front() != '#')
{
throw makeParseError(
envFilePath, lineNumber, "Unexpected text after quoted value.");
}
std::string_view quotedBody = value.substr(1, closingQuote - 1);
if (quote == '"')
{
return decodeDoubleQuotedValue(quotedBody);
}
return std::string(quotedBody);
}
std::string parseValue(
const std::filesystem::path &envFilePath,
std::size_t lineNumber,
std::string_view rawValue)
{
std::string value = trim(rawValue);
if (value.empty())
{
return {};
}
if (value.front() == '\'' || value.front() == '"')
{
return parseQuotedValue(envFilePath, lineNumber, value);
}
return trim(value.substr(0, value.find('#')));
}
std::pair<std::string, std::string> parseAssignment(
const std::filesystem::path &envFilePath,
std::size_t lineNumber,
const std::string &line)
{
std::size_t separator = line.find('=');
if (separator == std::string::npos)
{
throw makeParseError(envFilePath, lineNumber, "Expected KEY=value.");
}
std::string name = trim(std::string_view(line).substr(0, separator));
if (!nameIsValid(name))
{
throw makeParseError(envFilePath, lineNumber, "Invalid variable name.");
}
return {
std::move(name),
parseValue(
envFilePath,
lineNumber,
std::string_view(line).substr(separator + 1))};
}
} // namespace
EnvKvStore::EnvKvStore(
const std::vector<std::filesystem::path> &envFilePaths,
std::ostream &warningStream)
{
loadFiles(envFilePaths, warningStream);
}
EnvKvStore::EnvKvStore(
const std::vector<std::filesystem::path> &envFilePaths)
: EnvKvStore(envFilePaths, std::cerr)
{
}
void EnvKvStore::loadFiles(
const std::vector<std::filesystem::path> &envFilePaths,
std::ostream &warningStream)
{
for (const std::filesystem::path &envFilePath : envFilePaths)
{
loadFile(envFilePath, warningStream);
}
}
std::optional<std::string> EnvKvStore::get(std::string_view name) const
{
std::string ownedName(name);
if (const char *value = std::getenv(ownedName.c_str()))
{
return std::string(value);
}
auto value = values.find(std::string(name));
if (value == values.end())
{
return std::nullopt;
}
return value->second;
}
void EnvKvStore::loadFile(
const std::filesystem::path &envFilePath,
std::ostream &warningStream)
{
std::ifstream file(envFilePath);
if (!file)
{
throw std::runtime_error(
"Failed to open env file: " + envFilePath.string());
}
std::string line;
std::size_t lineNumber = 0;
while (std::getline(file, line))
{
++lineNumber;
if (lineIsBlankOrComment(line))
{
continue;
}
auto [name, value] = parseAssignment(envFilePath, lineNumber, line);
storeValue(envFilePath, name, value, warningStream);
}
}
void EnvKvStore::storeValue(
const std::filesystem::path &envFilePath,
const std::string &name,
const std::string &value,
std::ostream &warningStream)
{
if (auto oldValue = values.find(name); oldValue != values.end())
{
warningStream << "Warning: env file " << envFilePath
<< " overwrites " << name << " from `" << oldValue->second
<< "` to `" << value << "`.\n";
oldValue->second = value;
return;
}
values.emplace(name, value);
}
} // namespace sscl
+56
View File
@@ -0,0 +1,56 @@
add_library(spinscale_test_support STATIC
support/threadHarness.cpp
support/probeComponentThread.cpp
)
target_include_directories(spinscale_test_support PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_SOURCE_DIR}/tests/fixtures
)
target_link_libraries(spinscale_test_support PUBLIC
spinscale
gtest
)
function(add_spinscale_gtest target)
add_executable(${target} ${ARGN})
target_link_libraries(${target} PRIVATE
spinscale_test_support
gtest_main
)
add_dependencies(${target} gtest_main)
add_test(NAME ${target} COMMAND ${target})
endfunction()
add_spinscale_gtest(spinscale_env_kv_store_tests
env_kv_store_test.cpp
)
add_spinscale_gtest(qutex_tests
cps/qutex_tests.cpp
)
add_spinscale_gtest(nonViralTaskNursery_tests
co/nonViralTaskNursery_tests.cpp
)
add_spinscale_gtest(co_viral_non_posting_tests
co/viral_non_posting_tests.cpp
)
add_spinscale_gtest(co_posting_cross_thread_tests
co/posting_cross_thread_tests.cpp
)
add_spinscale_gtest(co_group_edge_tests
co/group_edge_tests.cpp
)
add_spinscale_gtest(co_group_timer_tests
co/group_timer_tests.cpp
)
add_spinscale_gtest(co_component_continuation_tests
co/component_continuation_tests.cpp
)
+250
View File
@@ -0,0 +1,250 @@
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <gtest/gtest.h>
#include <spinscale/co/coQutex.h>
#include <support/threadHarness.h>
namespace {
constexpr int leftValue = 1;
constexpr int rightValue = 2;
constexpr int expectedIntSum = 3;
constexpr int bodyArgument = 4;
constexpr const char *bodyStringArgument = "KEKW";
constexpr const char *leftString = "Hello";
constexpr const char *rightString = "World";
constexpr const char *expectedString = "Hello World";
using BodyNonViralInvoker =
sscl::tests::RoleNonViralPostingInvoker<
sscl::tests::PostingThreadRole::BODY>;
template <typename T>
using BodyViralInvoker =
sscl::tests::RoleViralPostingInvoker<
sscl::tests::PostingThreadRole::BODY,
T>;
template <typename T>
using WorldViralInvoker =
sscl::tests::RoleViralPostingInvoker<
sscl::tests::PostingThreadRole::WORLD,
T>;
template <typename T>
using LegViralInvoker =
sscl::tests::RoleViralPostingInvoker<
sscl::tests::PostingThreadRole::LEG,
T>;
class ComponentContinuationTrace
{
public:
void recordBodyThread()
{
std::lock_guard<std::mutex> guard(mutex);
bodyThreadId = std::this_thread::get_id();
}
void recordWorldThread()
{
std::lock_guard<std::mutex> guard(mutex);
worldThreadId = std::this_thread::get_id();
}
void recordLegThread()
{
std::lock_guard<std::mutex> guard(mutex);
legThreadId = std::this_thread::get_id();
}
void recordCompletionThread()
{
std::lock_guard<std::mutex> guard(mutex);
completionThreadId = std::this_thread::get_id();
}
void recordLegSum(int value)
{
std::lock_guard<std::mutex> guard(mutex);
legSum = value;
}
void recordWorldString(std::string value)
{
std::lock_guard<std::mutex> guard(mutex);
worldString = std::move(value);
}
void recordBodyString(std::string value)
{
std::lock_guard<std::mutex> guard(mutex);
bodyString = std::move(value);
}
std::thread::id bodyThread() const
{
std::lock_guard<std::mutex> guard(mutex);
return bodyThreadId;
}
std::thread::id worldThread() const
{
std::lock_guard<std::mutex> guard(mutex);
return worldThreadId;
}
std::thread::id legThread() const
{
std::lock_guard<std::mutex> guard(mutex);
return legThreadId;
}
std::thread::id completionThread() const
{
std::lock_guard<std::mutex> guard(mutex);
return completionThreadId;
}
int recordedLegSum() const
{
std::lock_guard<std::mutex> guard(mutex);
return legSum;
}
std::string recordedWorldString() const
{
std::lock_guard<std::mutex> guard(mutex);
return worldString;
}
std::string recordedBodyString() const
{
std::lock_guard<std::mutex> guard(mutex);
return bodyString;
}
private:
mutable std::mutex mutex;
std::thread::id bodyThreadId;
std::thread::id worldThreadId;
std::thread::id legThreadId;
std::thread::id completionThreadId;
int legSum = 0;
std::string worldString;
std::string bodyString;
};
LegViralInvoker<int> print2Ints(
int arg1,
int arg2,
ComponentContinuationTrace &trace)
{
sscl::co::CoQutex print2IntsLock;
trace.recordLegThread();
auto releaseHandle =
co_await print2IntsLock.getAcquireInvocationAndSuspensionPolicy();
const int sum = arg1 + arg2;
trace.recordLegSum(sum);
releaseHandle.release();
co_return sum;
}
WorldViralInvoker<std::string> print2Strings(
std::string arg1,
std::string arg2,
ComponentContinuationTrace &trace)
{
sscl::co::CoQutex print2StringsLock;
trace.recordWorldThread();
auto releaseHandle =
co_await print2StringsLock.getAcquireInvocationAndSuspensionPolicy();
const int returnedInt =
co_await print2Ints(leftValue, rightValue, trace);
releaseHandle.release();
if (returnedInt != expectedIntSum) {
throw std::runtime_error("LEG int return mismatch");
}
std::string returnedString = arg1 + " " + arg2;
trace.recordWorldString(returnedString);
co_return returnedString;
}
BodyNonViralInvoker initializeDemoCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
int arg3,
std::string arg4,
ComponentContinuationTrace &trace)
{
(void)exceptionPtr;
(void)completion;
(void)arg3;
(void)arg4;
sscl::co::CoQutex initializeLock;
trace.recordBodyThread();
auto releaseHandle =
co_await initializeLock.getAcquireInvocationAndSuspensionPolicy();
std::string returnedString =
co_await print2Strings(leftString, rightString, trace);
releaseHandle.release();
trace.recordBodyString(returnedString);
co_return;
}
class ComponentContinuationTest
: public ::testing::Test
{
protected:
sscl::tests::PostingThreadSet threads;
};
} // namespace
TEST_F(ComponentContinuationTest, SyncMainStyleContinuationCrossesComponentThreads)
{
ComponentContinuationTrace trace;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[&trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
return initializeDemoCReq(
exceptionPtr,
[&trace, completion = std::move(completion)]() mutable
{
trace.recordCompletionThread();
completion();
},
bodyArgument,
bodyStringArgument,
trace);
}));
EXPECT_EQ(trace.bodyThread(), threads.body().osThreadId());
EXPECT_EQ(trace.worldThread(), threads.world().osThreadId());
EXPECT_EQ(trace.legThread(), threads.leg().osThreadId());
EXPECT_EQ(trace.completionThread(), threads.caller().osThreadId());
EXPECT_NE(trace.bodyThread(), trace.worldThread());
EXPECT_NE(trace.worldThread(), trace.legThread());
EXPECT_NE(trace.legThread(), trace.completionThread());
EXPECT_EQ(trace.recordedLegSum(), expectedIntSum);
EXPECT_EQ(trace.recordedWorldString(), expectedString);
EXPECT_EQ(trace.recordedBodyString(), expectedString);
}
+864
View File
@@ -0,0 +1,864 @@
#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <thread>
#include <gtest/gtest.h>
#include <boost/asio/post.hpp>
#include <boost/system/error_code.hpp>
#include <spinscale/co/group.h>
#include <spinscale/componentThread.h>
#include <support/groupAssertions.h>
#include <support/threadHarness.h>
#include <support/timerAwaiters.h>
namespace {
constexpr int delayShortMs = 50;
constexpr int delayMediumMs = 200;
constexpr int delayLongMs = 500;
constexpr int delayAddWhileSuspendedProbeMs = 80;
constexpr int expectedNonStdThrowValue = 42;
constexpr int wave2ImmediateSettlementLabel = 1000;
constexpr const char *expectedThrowMessage =
"group_edge_test intentional failure";
using CallerDriver =
sscl::tests::RoleNonViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLER>;
using CalleeIntInvoker =
sscl::tests::RoleViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLEE,
int>;
using CalleeVoidInvoker =
sscl::tests::RoleViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLEE,
void>;
CalleeIntInvoker waitAndReturnLabel(int timerLabelMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
timerLabelMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
co_return timerLabelMilliseconds;
}
CalleeIntInvoker waitThenThrowAfterDelay(int delayMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
delayMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
throw std::runtime_error(expectedThrowMessage);
}
CalleeIntInvoker waitThenThrowIntAfterDelay(int delayMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
delayMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
throw expectedNonStdThrowValue;
}
CalleeIntInvoker returnLabelImmediately(int label)
{
co_return label;
}
CalleeVoidInvoker voidMemberAfterDelay(int delayMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
delayMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
co_return;
}
CalleeIntInvoker waitRecordThreadAndReturnLabel(
int timerLabelMilliseconds,
sscl::tests::CrossThreadTrace &trace)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
timerLabelMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
trace.recordCalleeExecutionThread();
co_return timerLabelMilliseconds;
}
sscl::co::ViralNonPostingInvoker<void> waitOnCallerThread(int delayMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
delayMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
co_return;
}
CallerDriver mixedSuccessAndFailureAwaitFirstThenAll(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker successInvoker = waitAndReturnLabel(1);
CalleeIntInvoker failureInvoker = waitThenThrowAfterDelay(delayShortMs);
group.add(successInvoker);
group.add(failureInvoker);
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptor, allAfterFirst] = co_await awaitFirst;
if (firstDescriptor.type
== sscl::co::Group::SettlementDescriptor::TypeE::COMPLETED) {
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptor,
1);
}
else if (firstDescriptor.type
== sscl::co::Group::SettlementDescriptor::TypeE::EXCEPTION_THROWN) {
sscl::tests::requireRuntimeErrorSettlement(
firstDescriptor,
expectedThrowMessage);
}
else {
throw std::runtime_error("first settlement has unexpected type");
}
auto awaitAll = group.getAwaitAllSettlementsInvoker();
auto &allDescriptors = co_await awaitAll;
if (allDescriptors.size() != 2 || allAfterFirst.size() != 2) {
throw std::runtime_error("mixed settlement count mismatch");
}
std::size_t completedCount = 0;
std::size_t exceptionCount = 0;
for (auto &descriptor : allDescriptors) {
if (descriptor.type
== sscl::co::Group::SettlementDescriptor::TypeE::COMPLETED) {
++completedCount;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
descriptor,
1);
}
else if (descriptor.type
== sscl::co::Group::SettlementDescriptor::TypeE::EXCEPTION_THROWN) {
++exceptionCount;
sscl::tests::requireRuntimeErrorSettlement(
descriptor,
expectedThrowMessage);
}
}
if (completedCount != 1 || exceptionCount != 1) {
throw std::runtime_error("mixed settlement type counts mismatch");
}
co_return;
}
CallerDriver singleMemberAwaitFirstThenAll(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker onlyInvoker = waitAndReturnLabel(delayShortMs);
group.add(onlyInvoker);
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptor, allAfterFirst] = co_await awaitFirst;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptor,
delayShortMs);
if (!group.allInvokersSettled() || allAfterFirst.size() != 1) {
throw std::runtime_error("single member state mismatch");
}
auto awaitAll = group.getAwaitAllSettlementsInvoker();
auto &allDescriptors = co_await awaitAll;
if (allDescriptors.size() != 1) {
throw std::runtime_error("single member await-all count mismatch");
}
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allDescriptors[0],
delayShortMs);
co_return;
}
CallerDriver allCompleteBeforeCoAwait(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker invokerTen = returnLabelImmediately(10);
CalleeIntInvoker invokerTwenty = returnLabelImmediately(20);
CalleeIntInvoker invokerThirty = returnLabelImmediately(30);
group.add(invokerTen);
group.add(invokerTwenty);
group.add(invokerThirty);
co_await waitOnCallerThread(delayShortMs);
if (!group.allInvokersSettled() || !group.firstInvokerSettled()) {
throw std::runtime_error("immediate group did not settle before await");
}
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptor, allAfterFirst] = co_await awaitFirst;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptor,
10);
auto awaitAll = group.getAwaitAllSettlementsInvoker();
auto &allDescriptors = co_await awaitAll;
if (allDescriptors.size() != 3 || allAfterFirst.size() != 3) {
throw std::runtime_error("immediate settlement count mismatch");
}
co_return;
}
std::jthread startAddWhileGroupAwaiterSuspendedProbe(
sscl::co::Group &group,
CalleeIntInvoker &lateInvoker,
std::atomic<bool> &groupIsAwaitingAll,
std::atomic<bool> &addWasRejected)
{
return std::jthread(
[&]()
{
while (!groupIsAwaitingAll.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::this_thread::sleep_for(
std::chrono::milliseconds(delayAddWhileSuspendedProbeMs));
boost::asio::post(
sscl::tests::ThreadRegistry::ioContext(
sscl::tests::PostingThreadRole::CALLER),
[&]()
{
try {
group.add(lateInvoker);
}
catch (const std::runtime_error &) {
addWasRejected.store(true, std::memory_order_release);
}
});
});
}
CallerDriver addWhileAwaitAllSuspended(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
std::atomic<bool> groupIsAwaitingAll{false};
std::atomic<bool> addWasRejected{false};
CalleeIntInvoker slowInvokerA = waitAndReturnLabel(delayLongMs);
CalleeIntInvoker slowInvokerB = waitAndReturnLabel(delayLongMs);
CalleeIntInvoker lateInvoker = waitAndReturnLabel(99);
group.add(slowInvokerA);
group.add(slowInvokerB);
std::jthread addProbeThread = startAddWhileGroupAwaiterSuspendedProbe(
group,
lateInvoker,
groupIsAwaitingAll,
addWasRejected);
auto awaitAll = group.getAwaitAllSettlementsInvoker();
groupIsAwaitingAll.store(true, std::memory_order_release);
co_await awaitAll;
addProbeThread.join();
if (!addWasRejected.load(std::memory_order_acquire)) {
throw std::runtime_error("expected add while suspended to throw");
}
co_return;
}
CallerDriver awaitAllOnlyMixedOutcomes(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker successInvoker = returnLabelImmediately(7);
CalleeIntInvoker failureInvoker = waitThenThrowAfterDelay(delayShortMs);
group.add(successInvoker);
group.add(failureInvoker);
auto awaitAll = group.getAwaitAllSettlementsInvoker();
auto &allDescriptors = co_await awaitAll;
if (allDescriptors.size() != 2) {
throw std::runtime_error("await-all-only count mismatch");
}
std::size_t completedCount = 0;
std::size_t exceptionCount = 0;
for (auto &descriptor : allDescriptors) {
if (descriptor.type
== sscl::co::Group::SettlementDescriptor::TypeE::COMPLETED) {
++completedCount;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
descriptor,
7);
}
else if (descriptor.type
== sscl::co::Group::SettlementDescriptor::TypeE::EXCEPTION_THROWN) {
++exceptionCount;
sscl::tests::requireRuntimeErrorSettlement(
descriptor,
expectedThrowMessage);
}
}
if (completedCount != 1 || exceptionCount != 1) {
throw std::runtime_error("await-all-only mixed counts mismatch");
}
co_return;
}
CallerDriver checkForAndReThrowGroupExceptions(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker failureInvoker = waitThenThrowAfterDelay(delayShortMs);
group.add(failureInvoker);
(void)co_await group.getAwaitAllSettlementsInvoker();
try {
group.checkForAndReThrowGroupExceptions();
}
catch (const std::runtime_error &aggregateError) {
if (std::string(aggregateError.what()).find(expectedThrowMessage)
== std::string::npos) {
throw std::runtime_error("aggregate message missing callee text");
}
co_return;
}
throw std::runtime_error("expected aggregate group exception");
}
CallerDriver emptyGroupAwaitAllThrows(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
try {
(void)co_await group.getAwaitAllSettlementsInvoker();
}
catch (const std::runtime_error &runtimeError) {
sscl::tests::requireEmptyGroupError(runtimeError);
co_return;
}
throw std::runtime_error("expected empty group await-all to throw");
}
CallerDriver emptyGroupAwaitFirstThrows(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
try {
(void)co_await group.getAwaitFirstSettlementInvoker();
}
catch (const std::runtime_error &runtimeError) {
sscl::tests::requireEmptyGroupError(runtimeError);
co_return;
}
throw std::runtime_error("expected empty group await-first to throw");
}
CallerDriver wrongAwaitInvokerOrder(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker shortInvoker = waitAndReturnLabel(delayShortMs);
CalleeIntInvoker mediumInvoker = waitAndReturnLabel(delayMediumMs);
group.add(shortInvoker);
group.add(mediumInvoker);
auto awaitFirstHandle = group.getAwaitFirstSettlementInvoker();
auto awaitAllHandle = group.getAwaitAllSettlementsInvoker();
auto &allDescriptors = co_await awaitAllHandle;
if (allDescriptors.size() != 2) {
throw std::runtime_error("wrong-order await-all count mismatch");
}
auto [firstDescriptor, allAfterFirst] = co_await awaitFirstHandle;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptor,
sscl::tests::completedIntValue(
firstDescriptor.invokerAs<CalleeIntInvoker>()));
if (!group.firstInvokerSettled() || allAfterFirst.size() != 2) {
throw std::runtime_error("wrong-order await-first state mismatch");
}
co_return;
}
CallerDriver doubleCoAwaitSameAwaitFirst(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker memberInvoker = returnLabelImmediately(delayShortMs);
group.add(memberInvoker);
co_await waitOnCallerThread(delayShortMs);
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptorA, allAfterFirstA] = co_await awaitFirst;
auto [firstDescriptorB, allAfterFirstB] = co_await awaitFirst;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptorA,
delayShortMs);
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptorB,
delayShortMs);
if (&firstDescriptorA.invokerAs<CalleeIntInvoker>()
!= &firstDescriptorB.invokerAs<CalleeIntInvoker>()) {
throw std::runtime_error("double await-first descriptor mismatch");
}
if (allAfterFirstA.size() != allAfterFirstB.size()) {
throw std::runtime_error("double await-first snapshot mismatch");
}
co_return;
}
CallerDriver doubleCoAwaitSameAwaitAll(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker memberInvoker = waitAndReturnLabel(delayShortMs);
group.add(memberInvoker);
auto awaitAll = group.getAwaitAllSettlementsInvoker();
auto &allDescriptorsA = co_await awaitAll;
auto &allDescriptorsB = co_await awaitAll;
if (allDescriptorsA.size() != 1 || allDescriptorsB.size() != 1) {
throw std::runtime_error("double await-all count mismatch");
}
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allDescriptorsA[0],
delayShortMs);
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allDescriptorsB[0],
delayShortMs);
co_return;
}
CallerDriver twoAwaitFirstHandlesSequentially(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker shortInvoker = waitAndReturnLabel(delayShortMs);
CalleeIntInvoker mediumInvoker = waitAndReturnLabel(delayMediumMs);
group.add(shortInvoker);
group.add(mediumInvoker);
auto awaitFirstA = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptorA, allAfterFirstA] = co_await awaitFirstA;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptorA,
delayShortMs);
auto awaitFirstB = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptorB, allAfterFirstB] = co_await awaitFirstB;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptorB,
delayShortMs);
if (&firstDescriptorA.invokerAs<CalleeIntInvoker>()
!= &firstDescriptorB.invokerAs<CalleeIntInvoker>()) {
throw std::runtime_error("sticky first settlement mismatch");
}
(void)co_await group.getAwaitAllSettlementsInvoker();
(void)allAfterFirstA;
(void)allAfterFirstB;
co_return;
}
CallerDriver addSecondWaveAfterAwaitAll(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker wave1MemberA = waitAndReturnLabel(delayLongMs);
CalleeIntInvoker wave1MemberB = waitAndReturnLabel(delayLongMs);
group.add(wave1MemberA);
group.add(wave1MemberB);
(void)co_await group.getAwaitAllSettlementsInvoker();
CalleeIntInvoker wave2Immediate =
returnLabelImmediately(wave2ImmediateSettlementLabel);
CalleeIntInvoker wave2Slow = waitAndReturnLabel(delayMediumMs);
group.add(wave2Immediate);
group.add(wave2Slow);
co_await waitOnCallerThread(delayShortMs);
if (sscl::tests::completedIntValue(wave2Immediate)
!= wave2ImmediateSettlementLabel) {
throw std::runtime_error("wave-2 immediate member did not complete");
}
if (group.allInvokersSettled()) {
throw std::runtime_error("wave-2 slow member should still be in flight");
}
auto &allDescriptors =
co_await group.getAwaitAllSettlementsInvoker();
if (allDescriptors.size() != 4) {
throw std::runtime_error("expected four settlements after second wave");
}
co_return;
}
CallerDriver shortTimerAddedAfterLongStillWinsRace(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker longInvoker = waitAndReturnLabel(delayLongMs);
CalleeIntInvoker shortInvoker = waitAndReturnLabel(delayShortMs);
group.add(longInvoker);
group.add(shortInvoker);
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptor, allAfterFirst] = co_await awaitFirst;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptor,
delayShortMs);
if (&firstDescriptor.invokerAs<CalleeIntInvoker>() != &shortInvoker) {
throw std::runtime_error("short timer should win first settlement");
}
(void)co_await group.getAwaitAllSettlementsInvoker();
(void)allAfterFirst;
co_return;
}
CallerDriver nonStdExceptionSettlement(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker failureInvoker = waitThenThrowIntAfterDelay(delayShortMs);
group.add(failureInvoker);
auto &allDescriptors = co_await group.getAwaitAllSettlementsInvoker();
if (allDescriptors.size() != 1) {
throw std::runtime_error("non-std exception count mismatch");
}
sscl::tests::requireIntExceptionSettlement(
allDescriptors[0],
expectedNonStdThrowValue);
try {
group.checkForAndReThrowGroupExceptions();
}
catch (const std::runtime_error &) {
co_return;
}
throw std::runtime_error("expected aggregate for non-std exception");
}
CallerDriver voidViralMemberInGroup(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeVoidInvoker voidInvoker = voidMemberAfterDelay(delayShortMs);
group.add(voidInvoker);
auto &allDescriptors = co_await group.getAwaitAllSettlementsInvoker();
if (allDescriptors.size() != 1) {
throw std::runtime_error("void group count mismatch");
}
if (allDescriptors[0].type
!= sscl::co::Group::SettlementDescriptor::TypeE::COMPLETED) {
throw std::runtime_error("void member did not complete");
}
co_return;
}
CallerDriver returnValuesRemainReadableAfterAwaitFirst(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker slowInvoker = waitAndReturnLabel(delayLongMs);
CalleeIntInvoker fastInvoker = waitAndReturnLabel(delayShortMs);
group.add(slowInvoker);
group.add(fastInvoker);
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptor, allAfterFirst] = co_await awaitFirst;
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptor,
delayShortMs);
const int fastLabelFromDescriptor = sscl::tests::completedIntValue(
firstDescriptor.invokerAs<CalleeIntInvoker>());
const int fastLabelFromLocal =
sscl::tests::completedIntValue(fastInvoker);
if (fastLabelFromDescriptor != fastLabelFromLocal) {
throw std::runtime_error("descriptor/local return value mismatch");
}
if (allAfterFirst.size() != 2) {
throw std::runtime_error("expected two settlement slots");
}
(void)co_await group.getAwaitAllSettlementsInvoker();
co_return;
}
CallerDriver groupMemberRunsOnCalleeAndAwaitResumesOnCaller(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
sscl::tests::CrossThreadTrace &trace)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker memberInvoker = waitRecordThreadAndReturnLabel(
delayShortMs,
trace);
group.add(memberInvoker);
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstDescriptor, allAfterFirst] = co_await awaitFirst;
trace.recordAwaitResumeThread();
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstDescriptor,
delayShortMs);
if (allAfterFirst.size() != 1) {
throw std::runtime_error("cross-thread group trace count mismatch");
}
co_return;
}
class GroupEdgeTest
: public ::testing::Test
{
protected:
template <typename Factory>
void runScenario(Factory &&factory)
{
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
std::forward<Factory>(factory)));
}
sscl::tests::PostingThreadSet threads;
};
} // namespace
#define RUN_GROUP_EDGE_SCENARIO(testName, functionName) \
TEST_F(GroupEdgeTest, testName) \
{ \
runScenario( \
[]( \
std::exception_ptr &exceptionPtr, \
std::function<void()> completion) \
{ \
return functionName(exceptionPtr, std::move(completion)); \
}); \
}
RUN_GROUP_EDGE_SCENARIO(
MixedSuccessAndFailureAwaitFirstThenAll,
mixedSuccessAndFailureAwaitFirstThenAll)
RUN_GROUP_EDGE_SCENARIO(
SingleMemberAwaitFirstThenAll,
singleMemberAwaitFirstThenAll)
RUN_GROUP_EDGE_SCENARIO(AllCompleteBeforeCoAwait, allCompleteBeforeCoAwait)
RUN_GROUP_EDGE_SCENARIO(AddWhileAwaitAllSuspended, addWhileAwaitAllSuspended)
RUN_GROUP_EDGE_SCENARIO(AwaitAllOnlyMixedOutcomes, awaitAllOnlyMixedOutcomes)
RUN_GROUP_EDGE_SCENARIO(
CheckForAndReThrowGroupExceptions,
checkForAndReThrowGroupExceptions)
RUN_GROUP_EDGE_SCENARIO(EmptyGroupAwaitAllThrows, emptyGroupAwaitAllThrows)
RUN_GROUP_EDGE_SCENARIO(EmptyGroupAwaitFirstThrows, emptyGroupAwaitFirstThrows)
RUN_GROUP_EDGE_SCENARIO(WrongAwaitInvokerOrder, wrongAwaitInvokerOrder)
RUN_GROUP_EDGE_SCENARIO(DoubleCoAwaitSameAwaitFirst, doubleCoAwaitSameAwaitFirst)
RUN_GROUP_EDGE_SCENARIO(DoubleCoAwaitSameAwaitAll, doubleCoAwaitSameAwaitAll)
RUN_GROUP_EDGE_SCENARIO(
TwoAwaitFirstHandlesSequentially,
twoAwaitFirstHandlesSequentially)
RUN_GROUP_EDGE_SCENARIO(AddSecondWaveAfterAwaitAll, addSecondWaveAfterAwaitAll)
RUN_GROUP_EDGE_SCENARIO(
ShortTimerAddedAfterLongStillWinsRace,
shortTimerAddedAfterLongStillWinsRace)
RUN_GROUP_EDGE_SCENARIO(NonStdExceptionSettlement, nonStdExceptionSettlement)
RUN_GROUP_EDGE_SCENARIO(VoidViralMemberInGroup, voidViralMemberInGroup)
RUN_GROUP_EDGE_SCENARIO(
ReturnValuesRemainReadableAfterAwaitFirst,
returnValuesRemainReadableAfterAwaitFirst)
TEST_F(GroupEdgeTest, SuspendingMemberRunsOnCalleeAndAwaitResumesOnCaller)
{
sscl::tests::CrossThreadTrace trace;
runScenario(
[&trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
return groupMemberRunsOnCalleeAndAwaitResumesOnCaller(
exceptionPtr,
std::move(completion),
trace);
});
EXPECT_EQ(trace.calleeExecutionThread(), threads.callee().osThreadId());
EXPECT_EQ(trace.awaitResumeThread(), threads.caller().osThreadId());
EXPECT_NE(trace.calleeExecutionThread(), trace.awaitResumeThread());
}
TEST_F(GroupEdgeTest, NonViralVoidGroupTemplateInstantiates)
{
GTEST_SKIP()
<< "NonViralPostingInvoker does not satisfy Group's awaitable concept.";
}
TEST_F(GroupEdgeTest, EarlyInvokerDestructionIsUnsupported)
{
GTEST_SKIP()
<< "Destroying a member invoker before group settlement completes is undefined.";
}
TEST_F(GroupEdgeTest, OverlappingGroupWaitsAssertInDebug)
{
GTEST_SKIP()
<< "Overlapping group co_await is debug-assert behavior.";
}
+368
View File
@@ -0,0 +1,368 @@
#include <chrono>
#include <exception>
#include <functional>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <gtest/gtest.h>
#include <boost/asio/error.hpp>
#include <boost/system/error_code.hpp>
#include <spinscale/co/group.h>
#include <spinscale/componentThread.h>
#include <support/groupAssertions.h>
#include <support/threadHarness.h>
#include <support/timerAwaiters.h>
namespace {
constexpr int timerDelayShortMs = 50;
constexpr int timerDelayMediumMs = 200;
constexpr int timerDelayLongMs = 500;
constexpr int awaitAllTimingSlackMs = 25;
constexpr int awaitAllLongCancelTimingMarginMs = 50;
using CallerDriver =
sscl::tests::RoleNonViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLER>;
using CalleeIntInvoker =
sscl::tests::RoleViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLEE,
int>;
using Clock = std::chrono::steady_clock;
using Ms = std::chrono::milliseconds;
class GroupTimerThreadTrace
{
public:
void recordTimerCompletionThread(int timerLabelMilliseconds)
{
std::lock_guard<std::mutex> guard(mutex);
timerCompletionThreads[timerLabelMilliseconds] =
std::this_thread::get_id();
}
void recordAwaitFirstResumeThread()
{
std::lock_guard<std::mutex> guard(mutex);
awaitFirstResumeThread = std::this_thread::get_id();
}
void recordAwaitAllResumeThread()
{
std::lock_guard<std::mutex> guard(mutex);
awaitAllResumeThread = std::this_thread::get_id();
}
std::thread::id timerCompletionThread(int timerLabelMilliseconds) const
{
std::lock_guard<std::mutex> guard(mutex);
const auto iterator =
timerCompletionThreads.find(timerLabelMilliseconds);
if (iterator == timerCompletionThreads.end()) {
throw std::runtime_error("Missing timer completion thread trace");
}
return iterator->second;
}
std::thread::id awaitFirstThread() const
{
std::lock_guard<std::mutex> guard(mutex);
return awaitFirstResumeThread;
}
std::thread::id awaitAllThread() const
{
std::lock_guard<std::mutex> guard(mutex);
return awaitAllResumeThread;
}
private:
mutable std::mutex mutex;
std::map<int, std::thread::id> timerCompletionThreads;
std::thread::id awaitFirstResumeThread;
std::thread::id awaitAllResumeThread;
};
CalleeIntInvoker waitDeadlineTimer(
int timerLabelMilliseconds,
GroupTimerThreadTrace &trace)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
timerLabelMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
trace.recordTimerCompletionThread(timerLabelMilliseconds);
co_return timerLabelMilliseconds;
}
CalleeIntInvoker waitCancelableDeadlineTimer(
int timerLabelMilliseconds,
sscl::tests::CancelableDeadlineTimerRegistry &registry,
GroupTimerThreadTrace &trace)
{
const boost::system::error_code waitError =
co_await sscl::tests::RegisteredDeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
timerLabelMilliseconds,
timerLabelMilliseconds,
registry};
if (sscl::tests::timerWasCanceled(waitError)) {
trace.recordTimerCompletionThread(timerLabelMilliseconds);
co_return timerLabelMilliseconds;
}
sscl::tests::throwIfTimerWaitFailed(waitError);
trace.recordTimerCompletionThread(timerLabelMilliseconds);
co_return timerLabelMilliseconds;
}
void throwIfElapsedTooLong(
const Ms &elapsed,
const Ms &limit,
const char *message)
{
if (elapsed > limit) {
throw std::runtime_error(
std::string(message) + ": " + std::to_string(elapsed.count()));
}
}
void throwIfElapsedTooShort(
const Ms &elapsed,
const Ms &limit,
const char *message)
{
if (elapsed < limit) {
throw std::runtime_error(
std::string(message) + ": " + std::to_string(elapsed.count()));
}
}
CallerDriver runGroupTimerRace(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
GroupTimerThreadTrace &trace)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker invokerShort =
waitDeadlineTimer(timerDelayShortMs, trace);
CalleeIntInvoker invokerMedium =
waitDeadlineTimer(timerDelayMediumMs, trace);
CalleeIntInvoker invokerLong =
waitDeadlineTimer(timerDelayLongMs, trace);
group.add(invokerShort);
group.add(invokerMedium);
group.add(invokerLong);
const auto testStart = Clock::now();
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstSettlement, allSettlementsAfterFirst] = co_await awaitFirst;
trace.recordAwaitFirstResumeThread();
const auto firstElapsedMs =
std::chrono::duration_cast<Ms>(Clock::now() - testStart);
throwIfElapsedTooLong(
firstElapsedMs,
Ms(timerDelayMediumMs - awaitAllTimingSlackMs),
"await-first took too long");
if (&firstSettlement.invokerAs<CalleeIntInvoker>() != &invokerShort) {
throw std::runtime_error("first settlement was not shortest timer");
}
if (group.allInvokersSettled()) {
throw std::runtime_error("await-first returned after all settled");
}
auto awaitAll = group.getAwaitAllSettlementsInvoker();
auto &allSettlements = co_await awaitAll;
trace.recordAwaitAllResumeThread();
const auto allElapsedMs =
std::chrono::duration_cast<Ms>(Clock::now() - testStart);
throwIfElapsedTooShort(
allElapsedMs,
Ms(timerDelayLongMs - awaitAllLongCancelTimingMarginMs),
"await-all finished too soon");
if (allSettlements.size() != 3) {
throw std::runtime_error("expected three settlements");
}
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
firstSettlement,
timerDelayShortMs);
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allSettlementsAfterFirst[0],
timerDelayShortMs);
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allSettlementsAfterFirst[1],
timerDelayMediumMs);
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allSettlementsAfterFirst[2],
timerDelayLongMs);
co_return;
}
CallerDriver runGroupTimerCancelLongAfterAwaitFirst(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
sscl::tests::CancelableDeadlineTimerRegistry &registry,
GroupTimerThreadTrace &trace)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
CalleeIntInvoker invokerShort =
waitCancelableDeadlineTimer(timerDelayShortMs, registry, trace);
CalleeIntInvoker invokerMedium =
waitCancelableDeadlineTimer(timerDelayMediumMs, registry, trace);
CalleeIntInvoker invokerLong =
waitCancelableDeadlineTimer(timerDelayLongMs, registry, trace);
group.add(invokerShort);
group.add(invokerMedium);
group.add(invokerLong);
const auto testStart = Clock::now();
auto awaitFirst = group.getAwaitFirstSettlementInvoker();
auto [firstSettlement, allSettlementsAfterFirst] = co_await awaitFirst;
trace.recordAwaitFirstResumeThread();
if (&firstSettlement.invokerAs<CalleeIntInvoker>() != &invokerShort) {
throw std::runtime_error("cancel test first settlement mismatch");
}
if (group.allInvokersSettled()) {
throw std::runtime_error("cancel test all settled after await-first");
}
registry.cancel(timerDelayLongMs);
auto awaitAll = group.getAwaitAllSettlementsInvoker();
auto &allSettlements = co_await awaitAll;
trace.recordAwaitAllResumeThread();
const auto allElapsedMs =
std::chrono::duration_cast<Ms>(Clock::now() - testStart);
if (allElapsedMs >= Ms(timerDelayLongMs - awaitAllLongCancelTimingMarginMs)) {
throw std::runtime_error("await-all waited for canceled long timer");
}
throwIfElapsedTooShort(
allElapsedMs,
Ms(timerDelayMediumMs - awaitAllTimingSlackMs),
"await-all finished before medium timer");
if (allSettlements.size() != 3) {
throw std::runtime_error("cancel test expected three settlements");
}
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allSettlements[0],
timerDelayShortMs);
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allSettlements[1],
timerDelayMediumMs);
sscl::tests::requireCompletedIntSettlement<CalleeIntInvoker>(
allSettlements[2],
timerDelayLongMs);
if (&allSettlements[2].invokerAs<CalleeIntInvoker>() != &invokerLong) {
throw std::runtime_error("cancel test long invoker mismatch");
}
(void)allSettlementsAfterFirst;
co_return;
}
class GroupTimerTest
: public ::testing::Test
{
protected:
void assertTimerTraceCrossedThreads(
const GroupTimerThreadTrace &trace)
{
EXPECT_EQ(
trace.timerCompletionThread(timerDelayShortMs),
threads.callee().osThreadId());
EXPECT_EQ(
trace.timerCompletionThread(timerDelayMediumMs),
threads.callee().osThreadId());
EXPECT_EQ(
trace.timerCompletionThread(timerDelayLongMs),
threads.callee().osThreadId());
EXPECT_EQ(trace.awaitFirstThread(), threads.caller().osThreadId());
EXPECT_EQ(trace.awaitAllThread(), threads.caller().osThreadId());
EXPECT_NE(
trace.timerCompletionThread(timerDelayShortMs),
trace.awaitFirstThread());
}
sscl::tests::PostingThreadSet threads;
};
} // namespace
TEST_F(GroupTimerTest, AwaitFirstReturnsShortestTimerAndAwaitAllWaitsForLongest)
{
GroupTimerThreadTrace trace;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[&trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
return runGroupTimerRace(
exceptionPtr,
std::move(completion),
trace);
}));
assertTimerTraceCrossedThreads(trace);
}
TEST_F(GroupTimerTest, CancelLongTimerAfterAwaitFirst)
{
sscl::tests::CancelableDeadlineTimerRegistry registry;
GroupTimerThreadTrace trace;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[&registry, &trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
return runGroupTimerCancelLongAfterAwaitFirst(
exceptionPtr,
std::move(completion),
registry,
trace);
}));
assertTimerTraceCrossedThreads(trace);
}
+657
View File
@@ -0,0 +1,657 @@
#include <atomic>
#include <chrono>
#include <coroutine>
#include <exception>
#include <functional>
#include <gtest/gtest.h>
#include <thread>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/co/invokers.h>
#include <spinscale/co/nonViralTaskNursery.h>
#include <spinscale/syncCancelerForAsyncWork.h>
namespace {
struct ResumeGate
{
std::coroutine_handle<> waitingHandle;
bool await_ready() const noexcept
{ return false; }
bool await_suspend(std::coroutine_handle<> callerHandle) noexcept
{
waitingHandle = callerHandle;
return true;
}
void await_resume() const noexcept
{}
};
sscl::co::NonViralNonPostingInvoker immediateCompleteCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
co_return;
}
sscl::co::NonViralNonPostingInvoker throwingCompleteCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
throw std::runtime_error("nursery test failure");
co_return;
}
sscl::co::NonViralNonPostingInvoker suspendUntilResumeCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
ResumeGate &gate)
{
(void)exceptionPtr;
(void)completion;
co_await gate;
co_return;
}
sscl::co::NonViralNonPostingInvoker cancelAwareSuspendCReq(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
sscl::SyncCancelerForAsyncWork &canceler,
ResumeGate &gate)
{
(void)exceptionPtr;
(void)completion;
while (!canceler.isCancellationRequested())
{
co_await gate;
}
co_return;
}
} // namespace
class NonViralTaskNurseryTest : public ::testing::Test
{
protected:
void SetUp() override
{
nursery.openAdmission();
}
sscl::co::NonViralTaskNursery nursery;
ResumeGate gate;
ResumeGate gate2;
};
TEST_F(NonViralTaskNurseryTest, GetNewSlotLeaseFillCommitRetires)
{
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease]()
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
lease.commit();
EXPECT_TRUE(nursery.allSettled());
EXPECT_EQ(nursery.unsettledCount(), 0U);
}
TEST_F(NonViralTaskNurseryTest, UncommittedLeaseReleasesReservation)
{
EXPECT_EQ(nursery.unsettledCount(), 0U);
{
auto lease = nursery.getNewSlotLease();
(void)lease;
}
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, CloseAdmissionRejectsNewLeases)
{
nursery.closeAdmission();
EXPECT_THROW(nursery.getNewSlotLease(), std::runtime_error);
}
TEST_F(NonViralTaskNurseryTest, SetOnSettledHookRejectsAfterFillSlot)
{
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease]()
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
EXPECT_THROW(
lease.setOnSettledHook([](std::exception_ptr &) {}),
std::runtime_error);
lease.commit();
}
TEST_F(NonViralTaskNurseryTest, AsyncAwaitFiresOnDrain)
{
std::atomic<bool> drained{false};
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease]()
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
lease.commit();
nursery.closeAdmission();
nursery.asyncAwaitAllSettlements(
[&drained]()
{
drained.store(true, std::memory_order_release);
});
EXPECT_TRUE(drained.load(std::memory_order_acquire));
}
TEST_F(NonViralTaskNurseryTest, AsyncAwaitRejectsWhenAdmissionOpen)
{
EXPECT_THROW(nursery.asyncAwaitAllSettlements([]() {}), std::runtime_error);
}
TEST_F(NonViralTaskNurseryTest, SecondDrainWaiterThrows)
{
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease, this]()
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate);
});
lease.commit();
nursery.closeAdmission();
bool firstWaiterRegistered = false;
nursery.asyncAwaitAllSettlements(
[&firstWaiterRegistered]()
{
firstWaiterRegistered = true;
});
EXPECT_FALSE(firstWaiterRegistered);
EXPECT_THROW(
nursery.asyncAwaitAllSettlements([]() {}),
std::runtime_error);
if (gate.waitingHandle)
{
gate.waitingHandle.resume();
}
}
TEST_F(NonViralTaskNurseryTest, SyncAwaitNestedRun)
{
boost::asio::io_context ioContext;
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease, this]()
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate);
});
lease.commit();
std::thread awaitThread(
[this, &ioContext]()
{
nursery.closeAdmission();
nursery.syncAwaitAllSettlements(ioContext);
});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
ASSERT_TRUE(static_cast<bool>(gate.waitingHandle));
gate.waitingHandle.resume();
awaitThread.join();
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, RequestCancelOnAllDoesNotDestroyInvokers)
{
auto lease = nursery.getNewSlotLease();
lease.getSyncCanceler().startAcceptingWork();
lease.fillSlot(
[&lease, this]()
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate);
});
lease.commit();
EXPECT_EQ(nursery.unsettledCount(), 1U);
nursery.requestCancelOnAll();
EXPECT_EQ(nursery.unsettledCount(), 1U);
ASSERT_TRUE(static_cast<bool>(gate.waitingHandle));
gate.waitingHandle.resume();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, RequestCancelOnAllStopsCanceler)
{
auto lease = nursery.getNewSlotLease();
lease.getSyncCanceler().startAcceptingWork();
lease.fillSlot(
[&lease, this]()
{
return cancelAwareSuspendCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
lease.getSyncCanceler(),
gate);
});
lease.commit();
nursery.requestCancelOnAll();
EXPECT_TRUE(lease.getSyncCanceler().isCancellationRequested());
ASSERT_TRUE(static_cast<bool>(gate.waitingHandle));
gate.waitingHandle.resume();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, ExceptionPtrRecorded)
{
std::exception_ptr captured;
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&captured, &lease]()
{
std::exception_ptr &exceptionStorage =
lease.getExceptionStorage();
auto invoker = throwingCompleteCReq(
exceptionStorage,
lease.getCallerLambda());
captured = exceptionStorage;
return invoker;
});
lease.commit();
EXPECT_TRUE(captured != nullptr);
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, LaunchSugar)
{
auto handle = nursery.launch(
[](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
EXPECT_TRUE(handle == handle);
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, LaunchWithOnSettledHook)
{
std::atomic<bool> hookRan{false};
nursery.launch(
[](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
},
[&hookRan](std::exception_ptr &)
{
hookRan.store(true, std::memory_order_release);
});
EXPECT_TRUE(hookRan.load(std::memory_order_acquire));
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, HandleStability)
{
auto handle = nursery.launch(
[](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
sscl::co::NonViralTaskNursery::Slot::Handle copy = handle;
EXPECT_TRUE(handle == copy);
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, CommitWithoutFillSlotThrows)
{
auto lease = nursery.getNewSlotLease();
EXPECT_THROW(lease.commit(), std::runtime_error);
}
TEST_F(NonViralTaskNurseryTest, DoubleCommitThrows)
{
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease]()
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
lease.commit();
EXPECT_THROW(lease.commit(), std::runtime_error);
}
TEST_F(NonViralTaskNurseryTest, FillSlotTwiceThrows)
{
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease, this]()
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate);
});
EXPECT_THROW(
lease.fillSlot(
[&lease]()
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
}),
std::runtime_error);
if (gate.waitingHandle) {
gate.waitingHandle.resume();
}
lease.commit();
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, SyncAwaitRejectsWhenAdmissionOpen)
{
boost::asio::io_context ioContext;
EXPECT_THROW(
nursery.syncAwaitAllSettlements(ioContext),
std::runtime_error);
}
TEST_F(NonViralTaskNurseryTest, SyncAwaitRejectsStoppedIoContext)
{
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease, this]()
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate);
});
lease.commit();
nursery.closeAdmission();
boost::asio::io_context ioContext;
ioContext.stop();
EXPECT_THROW(
nursery.syncAwaitAllSettlements(ioContext),
std::runtime_error);
if (gate.waitingHandle) {
gate.waitingHandle.resume();
}
}
TEST_F(NonViralTaskNurseryTest, SyncAwaitReturnsImmediatelyWhenDrained)
{
boost::asio::io_context ioContext;
nursery.closeAdmission();
EXPECT_TRUE(nursery.allSettled());
nursery.syncAwaitAllSettlements(ioContext);
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, UnsettledCountTracksInFlightTasks)
{
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&lease, this]()
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate);
});
lease.commit();
EXPECT_EQ(nursery.unsettledCount(), 1U);
EXPECT_FALSE(nursery.allSettled());
if (gate.waitingHandle) {
gate.waitingHandle.resume();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_EQ(nursery.unsettledCount(), 0U);
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, MultipleTasksDrainTogether)
{
std::atomic<bool> drained{false};
auto lease1 = nursery.getNewSlotLease();
lease1.fillSlot(
[&lease1, this]()
{
return suspendUntilResumeCReq(
lease1.getExceptionStorage(),
lease1.getCallerLambda(),
gate);
});
lease1.commit();
auto lease2 = nursery.getNewSlotLease();
lease2.fillSlot(
[&lease2, this]()
{
return suspendUntilResumeCReq(
lease2.getExceptionStorage(),
lease2.getCallerLambda(),
gate2);
});
lease2.commit();
EXPECT_EQ(nursery.unsettledCount(), 2U);
nursery.closeAdmission();
nursery.asyncAwaitAllSettlements(
[&drained]()
{
drained.store(true, std::memory_order_release);
});
EXPECT_FALSE(drained.load(std::memory_order_acquire));
if (gate.waitingHandle) {
gate.waitingHandle.resume();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_FALSE(drained.load(std::memory_order_acquire));
if (gate2.waitingHandle) {
gate2.waitingHandle.resume();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_TRUE(drained.load(std::memory_order_acquire));
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, OnSettledHookRunsAtRetirement)
{
std::atomic<bool> hookRan{false};
auto lease = nursery.getNewSlotLease();
lease.setOnSettledHook(
[&hookRan](std::exception_ptr &)
{
hookRan.store(true, std::memory_order_release);
});
lease.fillSlot(
[&lease]()
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
lease.commit();
EXPECT_TRUE(hookRan.load(std::memory_order_acquire));
}
TEST_F(NonViralTaskNurseryTest, OnSettledHookSeesRetiredSlot)
{
auto lease = nursery.getNewSlotLease();
lease.setOnSettledHook(
[this](std::exception_ptr &)
{
EXPECT_TRUE(nursery.allSettled());
EXPECT_EQ(nursery.unsettledCount(), 0U);
});
lease.fillSlot(
[&lease]()
{
return immediateCompleteCReq(
lease.getExceptionStorage(),
lease.getCallerLambda());
});
lease.commit();
}
TEST_F(NonViralTaskNurseryTest, DuplicateRetireThrows)
{
std::function<void()> completion;
auto lease = nursery.getNewSlotLease();
lease.fillSlot(
[&completion, &lease]()
{
completion = lease.getCallerLambda();
return immediateCompleteCReq(
lease.getExceptionStorage(),
completion);
});
lease.commit();
ASSERT_TRUE(static_cast<bool>(completion));
EXPECT_THROW(completion(), std::runtime_error);
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, MovedLeaseTransfersReleaseObligation)
{
EXPECT_EQ(nursery.unsettledCount(), 0U);
{
auto lease = nursery.getNewSlotLease();
auto movedLease = std::move(lease);
(void)movedLease;
}
EXPECT_TRUE(nursery.allSettled());
EXPECT_EQ(nursery.unsettledCount(), 0U);
}
TEST_F(NonViralTaskNurseryTest, LaunchAssignsDistinctHandles)
{
auto handle1 = nursery.launch(
[this](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate);
});
auto handle2 = nursery.launch(
[this](sscl::co::NonViralTaskNursery::Slot::Lease &lease)
{
return suspendUntilResumeCReq(
lease.getExceptionStorage(),
lease.getCallerLambda(),
gate2);
});
EXPECT_NE(handle1, handle2);
EXPECT_EQ(nursery.unsettledCount(), 2U);
if (gate.waitingHandle) {
gate.waitingHandle.resume();
}
if (gate2.waitingHandle) {
gate2.waitingHandle.resume();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_TRUE(nursery.allSettled());
}
TEST_F(NonViralTaskNurseryTest, AdmissionIsOpenReflectsCloseAndOpen)
{
EXPECT_TRUE(nursery.admissionIsOpen());
nursery.closeAdmission();
EXPECT_FALSE(nursery.admissionIsOpen());
nursery.openAdmission();
EXPECT_TRUE(nursery.admissionIsOpen());
}
+252
View File
@@ -0,0 +1,252 @@
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <gtest/gtest.h>
#include <spinscale/co/postTarget.h>
#include <spinscale/componentThread.h>
#include <support/threadHarness.h>
#include <support/timerAwaiters.h>
namespace {
constexpr int expectedReturnValue = 42;
constexpr int explicitTargetReturnValue = 77;
constexpr const char *expectedThrowMessage =
"posting cross-thread intentional failure";
using CallerNonViralInvoker =
sscl::tests::RoleNonViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLER>;
using CalleeNonViralInvoker =
sscl::tests::RoleNonViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLEE>;
template <typename T>
using CalleeViralInvoker =
sscl::tests::RoleViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLEE,
T>;
CalleeViralInvoker<int> returnFromCalleeThread(
sscl::tests::CrossThreadTrace &trace)
{
trace.recordCalleeExecutionThread();
trace.recordFinalSuspendThread();
co_return expectedReturnValue;
}
CalleeViralInvoker<int> returnFromExplicitTargetThread(
sscl::co::ExplicitPostTarget postTarget,
sscl::tests::CrossThreadTrace &trace)
{
(void)postTarget;
trace.recordCalleeExecutionThread();
trace.recordFinalSuspendThread();
co_return explicitTargetReturnValue;
}
CalleeViralInvoker<int> throwFromCalleeThread(
sscl::tests::CrossThreadTrace &trace)
{
constexpr int throwDelayMs = 1;
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
sscl::ComponentThread::getSelf()->getIoContext(),
throwDelayMs};
sscl::tests::throwIfTimerWaitFailed(waitError);
trace.recordCalleeExecutionThread();
trace.recordFinalSuspendThread();
throw std::runtime_error(expectedThrowMessage);
}
CallerNonViralInvoker awaitCalleeDriver(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
sscl::tests::CrossThreadTrace &trace)
{
(void)exceptionPtr;
(void)completion;
const int value = co_await returnFromCalleeThread(trace);
trace.recordAwaitResumeThread();
if (value != expectedReturnValue) {
throw std::runtime_error("Unexpected callee return value");
}
co_return;
}
CallerNonViralInvoker awaitExplicitTargetDriver(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
sscl::tests::CrossThreadTrace &trace)
{
(void)exceptionPtr;
(void)completion;
sscl::co::ExplicitPostTarget postTarget{
sscl::tests::ThreadRegistry::ioContext(
sscl::tests::PostingThreadRole::ALTERNATE)};
const int value = co_await returnFromExplicitTargetThread(
postTarget,
trace);
trace.recordAwaitResumeThread();
if (value != explicitTargetReturnValue) {
throw std::runtime_error("Unexpected explicit-target return value");
}
co_return;
}
CallerNonViralInvoker awaitThrowingCalleeDriver(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
sscl::tests::CrossThreadTrace &trace)
{
(void)exceptionPtr;
(void)completion;
try {
(void)co_await throwFromCalleeThread(trace);
throw std::runtime_error("Expected callee exception");
}
catch (const std::runtime_error &runtimeError) {
trace.recordAwaitResumeThread();
if (std::string(runtimeError.what()) != expectedThrowMessage) {
throw std::runtime_error("Unexpected callee exception message");
}
}
co_return;
}
CalleeNonViralInvoker nonViralCalleeCompletesToCaller(
std::exception_ptr &exceptionPtr,
std::function<void()> completion,
sscl::tests::CrossThreadTrace &trace)
{
(void)exceptionPtr;
(void)completion;
trace.recordCalleeExecutionThread();
trace.recordFinalSuspendThread();
co_return;
}
class PostingCrossThreadTest
: public ::testing::Test
{
protected:
sscl::tests::PostingThreadSet threads;
};
} // namespace
TEST_F(PostingCrossThreadTest, ViralAwaitPostsCalleeAndResumesCaller)
{
sscl::tests::CrossThreadTrace trace;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[&trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
trace.recordConstructionThread();
return awaitCalleeDriver(
exceptionPtr,
std::move(completion),
trace);
}));
EXPECT_EQ(trace.constructionThread(), threads.caller().osThreadId());
EXPECT_EQ(trace.calleeExecutionThread(), threads.callee().osThreadId());
EXPECT_EQ(trace.finalSuspendThread(), threads.callee().osThreadId());
EXPECT_EQ(trace.awaitResumeThread(), threads.caller().osThreadId());
EXPECT_NE(trace.calleeExecutionThread(), trace.awaitResumeThread());
}
TEST_F(PostingCrossThreadTest, NonViralCompletionPostsBackToCaller)
{
sscl::tests::CrossThreadTrace trace;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[&trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
trace.recordConstructionThread();
return nonViralCalleeCompletesToCaller(
exceptionPtr,
[&trace, completion = std::move(completion)]() mutable
{
trace.recordCompletionCallbackThread();
completion();
},
trace);
}));
EXPECT_EQ(trace.constructionThread(), threads.caller().osThreadId());
EXPECT_EQ(trace.calleeExecutionThread(), threads.callee().osThreadId());
EXPECT_EQ(trace.finalSuspendThread(), threads.callee().osThreadId());
EXPECT_EQ(trace.completionCallbackThread(), threads.caller().osThreadId());
EXPECT_NE(trace.calleeExecutionThread(), trace.completionCallbackThread());
}
TEST_F(PostingCrossThreadTest, ExplicitPostTargetRoutesCalleeExecution)
{
sscl::tests::CrossThreadTrace trace;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[&trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
trace.recordConstructionThread();
return awaitExplicitTargetDriver(
exceptionPtr,
std::move(completion),
trace);
}));
EXPECT_EQ(trace.constructionThread(), threads.caller().osThreadId());
EXPECT_EQ(trace.calleeExecutionThread(), threads.alternate().osThreadId());
EXPECT_EQ(trace.awaitResumeThread(), threads.caller().osThreadId());
EXPECT_NE(trace.calleeExecutionThread(), threads.callee().osThreadId());
EXPECT_NE(trace.calleeExecutionThread(), trace.awaitResumeThread());
}
TEST_F(PostingCrossThreadTest, CalleeExceptionIsObservedOnCallerThread)
{
sscl::tests::CrossThreadTrace trace;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[&trace](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
trace.recordConstructionThread();
return awaitThrowingCalleeDriver(
exceptionPtr,
std::move(completion),
trace);
}));
EXPECT_EQ(trace.constructionThread(), threads.caller().osThreadId());
EXPECT_EQ(trace.calleeExecutionThread(), threads.callee().osThreadId());
EXPECT_EQ(trace.awaitResumeThread(), threads.caller().osThreadId());
EXPECT_NE(trace.calleeExecutionThread(), trace.awaitResumeThread());
}
+633
View File
@@ -0,0 +1,633 @@
#include <chrono>
#include <exception>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <gtest/gtest.h>
#include <boost/asio/io_context.hpp>
#include <boost/system/error_code.hpp>
#include <spinscale/co/invokers.h>
#include <spinscale/co/group.h>
#include <spinscale/componentThread.h>
#include <support/coroutineDriver.h>
#include <support/groupAssertions.h>
#include <support/threadHarness.h>
#include <support/timerAwaiters.h>
namespace {
constexpr int delayShortMs = 50;
constexpr int expectedNonStdThrowValue = 42;
constexpr const char *expectedThrowMessage =
"viral_non_posting_test intentional failure";
template <typename T>
using TestInvoker = sscl::co::ViralNonPostingInvoker<T>;
using TestDriver = TestInvoker<int>;
using TestVoidDriver = TestInvoker<void>;
using CallerPostingDriver =
sscl::tests::RoleNonViralPostingInvoker<
sscl::tests::PostingThreadRole::CALLER>;
struct ThreadIdPair
{
std::thread::id callerIdAtCoAwait;
std::thread::id calleeId;
};
struct MoveCountedInt
{
std::shared_ptr<std::size_t> moveCount;
int value = 0;
MoveCountedInt() = default;
MoveCountedInt(
std::shared_ptr<std::size_t> moveCountIn,
int valueIn)
: moveCount(std::move(moveCountIn)),
value(valueIn)
{}
MoveCountedInt(const MoveCountedInt &) = delete;
MoveCountedInt &operator=(const MoveCountedInt &) = delete;
MoveCountedInt(MoveCountedInt &&other) noexcept
: moveCount(std::exchange(other.moveCount, {})),
value(other.value)
{
if (moveCount) {
++(*moveCount);
}
}
MoveCountedInt &operator=(MoveCountedInt &&other) noexcept
{
moveCount = std::exchange(other.moveCount, {});
value = other.value;
return *this;
}
};
template <typename T>
struct CountingAwaiter
{
TestInvoker<T> &invoker;
std::size_t &awaitResumeCallCount;
bool await_ready() const noexcept
{ return invoker.await_ready(); }
template <typename CallerPromise>
bool await_suspend(
std::coroutine_handle<CallerPromise> callerSchedHandle) noexcept
{ return invoker.await_suspend(callerSchedHandle); }
auto await_resume()
{
++awaitResumeCallCount;
return invoker.await_resume();
}
};
class ViralNonPostingTest
: public ::testing::Test
{
protected:
void TearDown() override
{
ioContext.restart();
}
int runDriver(TestDriver &driver)
{
return sscl::tests::CoroutineDriver::pumpUntilIdleAndReturnValue(
ioContext,
driver);
}
int finishDriver(TestDriver &driver)
{
return sscl::tests::CoroutineDriver::completedReturnValue(driver);
}
boost::asio::io_context ioContext;
};
TestInvoker<int> returnLabelImmediately(int label)
{
co_return label;
}
TestInvoker<int> waitAndReturnLabel(
boost::asio::io_context &ioContext,
int delayMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
ioContext,
delayMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
co_return delayMilliseconds;
}
TestVoidDriver voidReturnImmediately()
{
co_return;
}
TestVoidDriver voidMemberAfterDelay(
boost::asio::io_context &ioContext,
int delayMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
ioContext,
delayMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
co_return;
}
TestInvoker<int> throwRuntimeErrorImmediately()
{
throw std::runtime_error(expectedThrowMessage);
}
TestInvoker<int> throwIntImmediately()
{
throw expectedNonStdThrowValue;
}
TestInvoker<ThreadIdPair> recordThreadIdsAtReturn()
{
ThreadIdPair pair;
pair.calleeId = std::this_thread::get_id();
co_return pair;
}
TestInvoker<ThreadIdPair> recordThreadIdsAfterDelay(
boost::asio::io_context &ioContext,
int delayMilliseconds)
{
const boost::system::error_code waitError =
co_await sscl::tests::DeadlineTimerAwaiter{
ioContext,
delayMilliseconds};
sscl::tests::throwIfTimerWaitFailed(waitError);
ThreadIdPair pair;
pair.calleeId = std::this_thread::get_id();
co_return pair;
}
TestInvoker<MoveCountedInt> returnMoveCountedInt(
std::shared_ptr<std::size_t> moveCount,
int value)
{
co_return MoveCountedInt{std::move(moveCount), value};
}
TestInvoker<int> innerDelayedCoAwait(
boost::asio::io_context &ioContext,
int delayMilliseconds)
{
const int label = co_await waitAndReturnLabel(
ioContext,
delayMilliseconds);
co_return label;
}
TestInvoker<int> nestedNonPostingSum(int left, int right)
{
const int leftSum = co_await returnLabelImmediately(left);
const int rightSum = co_await returnLabelImmediately(right);
co_return leftSum + rightSum;
}
TestInvoker<int> outerCoAwaitingDelayedInner(
boost::asio::io_context &ioContext,
int delayMilliseconds)
{
const int innerLabel = co_await innerDelayedCoAwait(
ioContext,
delayMilliseconds);
co_return innerLabel + 1;
}
TestDriver testImmediateReturnFastPath()
{
const int value = co_await returnLabelImmediately(42);
if (value != 42) {
throw std::runtime_error("immediateReturnFastPath value mismatch");
}
co_return 0;
}
TestDriver testAllCompleteBeforeCoAwait()
{
TestInvoker<int> invokerTen = returnLabelImmediately(10);
TestInvoker<int> invokerTwenty = returnLabelImmediately(20);
TestInvoker<int> invokerThirty = returnLabelImmediately(30);
const int valueTen = co_await invokerTen;
const int valueTwenty = co_await invokerTwenty;
const int valueThirty = co_await invokerThirty;
if (valueTen != 10 || valueTwenty != 20 || valueThirty != 30) {
throw std::runtime_error("allCompleteBeforeCoAwait label mismatch");
}
co_return 0;
}
TestDriver testCallerSuspendsThenResumes(boost::asio::io_context &ioContext)
{
const int value = co_await waitAndReturnLabel(ioContext, delayShortMs);
if (value != delayShortMs) {
throw std::runtime_error("callerSuspendsThenResumes label mismatch");
}
co_return 0;
}
TestDriver testMixedImmediateAndDelayedInSequence(
boost::asio::io_context &ioContext)
{
const int immediate = co_await returnLabelImmediately(7);
const int delayed = co_await waitAndReturnLabel(ioContext, delayShortMs);
if (immediate != 7 || delayed != delayShortMs) {
throw std::runtime_error("mixedImmediateAndDelayed label mismatch");
}
co_return 0;
}
TestDriver testAwaitResumeCalledOnceFastPath()
{
std::size_t awaitResumeCallCount = 0;
TestInvoker<int> invoker = returnLabelImmediately(42);
const int value = co_await CountingAwaiter<int>{
invoker,
awaitResumeCallCount};
if (value != 42 || awaitResumeCallCount != 1) {
throw std::runtime_error("fast path await_resume count mismatch");
}
co_return 0;
}
TestDriver testAwaitResumeCalledOnceSlowPath(
boost::asio::io_context &ioContext)
{
std::size_t awaitResumeCallCount = 0;
TestInvoker<int> invoker = waitAndReturnLabel(ioContext, delayShortMs);
const int value = co_await CountingAwaiter<int>{
invoker,
awaitResumeCallCount};
if (value != delayShortMs || awaitResumeCallCount != 1) {
throw std::runtime_error("slow path await_resume count mismatch");
}
co_return 0;
}
TestDriver testAwaitResumeCalledOnceNested(
boost::asio::io_context &ioContext)
{
std::size_t awaitResumeCallCount = 0;
TestInvoker<int> inner = innerDelayedCoAwait(ioContext, delayShortMs);
const int value = co_await CountingAwaiter<int>{
inner,
awaitResumeCallCount};
if (value != delayShortMs || awaitResumeCallCount != 1) {
throw std::runtime_error("nested await_resume count mismatch");
}
co_return 0;
}
TestDriver testMoveCountedReturnNotDoubleMoved()
{
auto moveCount = std::make_shared<std::size_t>(0);
TestInvoker<MoveCountedInt> invoker =
returnMoveCountedInt(moveCount, 99);
MoveCountedInt result = co_await invoker;
if (result.value != 99) {
throw std::runtime_error("move counted value mismatch");
}
if (*moveCount > 2 || *moveCount < 1) {
throw std::runtime_error("move counted return move-count mismatch");
}
co_return 0;
}
TestDriver testVoidReturnCompletes()
{
co_await voidReturnImmediately();
co_return 0;
}
TestDriver testReturnValuesReadableBeforeDestroy()
{
TestInvoker<int> invoker = returnLabelImmediately(55);
(void)co_await invoker;
if (invoker.completedReturnValues().myReturnValue != 55) {
throw std::runtime_error("completed return value not readable");
}
co_return 0;
}
TestDriver testExceptionRethrowsOnCoAwait()
{
try {
(void)co_await throwRuntimeErrorImmediately();
throw std::runtime_error("expected runtime_error");
}
catch (const std::runtime_error &runtimeError) {
if (std::string(runtimeError.what()) != expectedThrowMessage) {
throw std::runtime_error("unexpected runtime_error message");
}
}
co_return 0;
}
TestDriver testNonStdExceptionRethrows()
{
try {
(void)co_await throwIntImmediately();
throw std::runtime_error("expected int exception");
}
catch (int caughtValue) {
if (caughtValue != expectedNonStdThrowValue) {
throw std::runtime_error("unexpected int exception value");
}
}
co_return 0;
}
TestDriver testCalleeRunsOnCallerThread()
{
const std::thread::id callerThreadId = std::this_thread::get_id();
const ThreadIdPair pair = co_await recordThreadIdsAtReturn();
if (pair.calleeId != callerThreadId) {
throw std::runtime_error("callee thread mismatch");
}
co_return 0;
}
TestDriver testDelayedCalleeStillOnCallerThread(
boost::asio::io_context &ioContext)
{
const std::thread::id callerThreadId = std::this_thread::get_id();
const ThreadIdPair pair =
co_await recordThreadIdsAfterDelay(ioContext, delayShortMs);
if (pair.calleeId != callerThreadId) {
throw std::runtime_error("delayed callee thread mismatch");
}
co_return 0;
}
TestDriver testNestedNonPostingCoAwait()
{
const int sum = co_await nestedNonPostingSum(10, 32);
if (sum != 42) {
throw std::runtime_error("nested sum mismatch");
}
co_return 0;
}
TestDriver testNestedInnerSuspension(boost::asio::io_context &ioContext)
{
const int value = co_await outerCoAwaitingDelayedInner(
ioContext,
delayShortMs);
if (value != delayShortMs + 1) {
throw std::runtime_error("nested inner suspension value mismatch");
}
co_return 0;
}
CallerPostingDriver nonPostingVoidMemberInGroupDriver(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
TestVoidDriver voidInvoker = voidMemberAfterDelay(
sscl::ComponentThread::getSelf()->getIoContext(),
delayShortMs);
group.add(voidInvoker);
auto &allDescriptors = co_await group.getAwaitAllSettlementsInvoker();
if (allDescriptors.size() != 1) {
throw std::runtime_error("voidMemberInGroup count mismatch");
}
sscl::tests::requireCompletedSettlement(allDescriptors[0]);
co_return;
}
CallerPostingDriver nonPostingGroupMixedImmediateAndDelayedDriver(
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
(void)exceptionPtr;
(void)completion;
sscl::co::Group group;
TestInvoker<int> immediateInvoker = returnLabelImmediately(11);
TestInvoker<int> delayedInvoker = waitAndReturnLabel(
sscl::ComponentThread::getSelf()->getIoContext(),
delayShortMs);
group.add(immediateInvoker);
group.add(delayedInvoker);
auto &allDescriptors = co_await group.getAwaitAllSettlementsInvoker();
if (allDescriptors.size() != 2) {
throw std::runtime_error("groupMixedImmediateAndDelayed count mismatch");
}
bool sawImmediate = false;
bool sawDelayed = false;
for (auto &descriptor : allDescriptors) {
sscl::tests::requireCompletedSettlement(descriptor);
const int label = sscl::tests::completedIntValue(
descriptor.invokerAs<TestInvoker<int>>());
if (label == 11) {
sawImmediate = true;
}
else if (label == delayShortMs) {
sawDelayed = true;
}
else {
throw std::runtime_error(
"groupMixedImmediateAndDelayed unexpected label");
}
}
if (!sawImmediate || !sawDelayed) {
throw std::runtime_error(
"groupMixedImmediateAndDelayed missing expected label");
}
co_return;
}
} // namespace
TEST_F(ViralNonPostingTest, ImmediateReturnFastPath)
{
TestDriver driver = testImmediateReturnFastPath();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, AllCompleteBeforeCoAwait)
{
TestDriver driver = testAllCompleteBeforeCoAwait();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, CallerSuspendsThenResumes)
{
TestDriver driver = testCallerSuspendsThenResumes(ioContext);
EXPECT_NO_THROW({ EXPECT_EQ(runDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, MixedImmediateAndDelayedInSequence)
{
TestDriver driver = testMixedImmediateAndDelayedInSequence(ioContext);
EXPECT_NO_THROW({ EXPECT_EQ(runDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, AwaitResumeCalledOnceFastPath)
{
TestDriver driver = testAwaitResumeCalledOnceFastPath();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, AwaitResumeCalledOnceSlowPath)
{
TestDriver driver = testAwaitResumeCalledOnceSlowPath(ioContext);
EXPECT_NO_THROW({ EXPECT_EQ(runDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, AwaitResumeCalledOnceNested)
{
TestDriver driver = testAwaitResumeCalledOnceNested(ioContext);
EXPECT_NO_THROW({ EXPECT_EQ(runDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, MoveCountedReturnNotDoubleMoved)
{
TestDriver driver = testMoveCountedReturnNotDoubleMoved();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, VoidReturnCompletes)
{
TestDriver driver = testVoidReturnCompletes();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, ReturnValuesReadableBeforeDestroy)
{
TestDriver driver = testReturnValuesReadableBeforeDestroy();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, ExceptionRethrowsOnCoAwait)
{
TestDriver driver = testExceptionRethrowsOnCoAwait();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, NonStdExceptionRethrows)
{
TestDriver driver = testNonStdExceptionRethrows();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, CalleeRunsOnCallerThread)
{
TestDriver driver = testCalleeRunsOnCallerThread();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, DelayedCalleeStillOnCallerThread)
{
TestDriver driver = testDelayedCalleeStillOnCallerThread(ioContext);
EXPECT_NO_THROW({ EXPECT_EQ(runDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, NestedNonPostingCoAwait)
{
TestDriver driver = testNestedNonPostingCoAwait();
EXPECT_NO_THROW({ EXPECT_EQ(finishDriver(driver), 0); });
}
TEST_F(ViralNonPostingTest, NestedInnerSuspension)
{
TestDriver driver = testNestedInnerSuspension(ioContext);
EXPECT_NO_THROW({ EXPECT_EQ(runDriver(driver), 0); });
}
TEST(ViralNonPostingGroupIntegrationTest, VoidMemberInGroup)
{
sscl::tests::PostingThreadSet threads;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
return nonPostingVoidMemberInGroupDriver(
exceptionPtr,
std::move(completion));
}));
}
TEST(ViralNonPostingGroupIntegrationTest, MixedImmediateAndDelayedInGroup)
{
sscl::tests::PostingThreadSet threads;
ASSERT_NO_THROW(
sscl::tests::runNonViralPostingTask(
threads.caller(),
[](
std::exception_ptr &exceptionPtr,
std::function<void()> completion)
{
return nonPostingGroupMixedImmediateAndDelayedDriver(
exceptionPtr,
std::move(completion));
}));
}
+371
View File
@@ -0,0 +1,371 @@
#include <gtest/gtest.h>
#include <spinscale/cps/qutex.h>
#include <spinscale/cps/lockerAndInvokerBase.h>
#include <memory>
#include <stdexcept>
#include <thread>
#include <chrono>
#include <vector>
namespace smo {
// Mock implementation of LockerAndInvokerBase for testing
class MockLockerAndInvoker : public sscl::cps::LockerAndInvokerBase {
public:
explicit MockLockerAndInvoker(const void* addr)
: sscl::cps::LockerAndInvokerBase(addr), awakened(false) {}
bool awakened;
mutable sscl::cps::Qutex* registeredQutex = nullptr;
mutable sscl::cps::LockerAndInvokerBase::List::iterator queueIterator;
sscl::cps::LockerAndInvokerBase::List::iterator
getLockvokerIteratorForQutex(sscl::cps::Qutex& qutex) const override
{
registeredQutex = &qutex;
for (auto it = qutex.queue.begin(); it != qutex.queue.end(); ++it)
{
if ((**it) == *this)
{
queueIterator = it;
return it;
}
}
throw std::runtime_error(
"MockLockerAndInvoker: not registered in qutex queue");
}
void awaken(bool forceAwaken = false) override
{
(void)forceAwaken;
awakened = true;
}
size_t getLockSetSize() const override
{
return 1;
}
sscl::cps::Qutex& getLockAt(size_t index) const override
{
if (index != 0 || registeredQutex == nullptr)
{
throw std::runtime_error(
"MockLockerAndInvoker: invalid lock index or no registered qutex");
}
return *registeredQutex;
}
};
class QutexTest : public ::testing::Test {
protected:
void SetUp() override {
// Create mock lockvokers with unique addresses
mock1 = std::make_shared<MockLockerAndInvoker>(&addr1);
mock2 = std::make_shared<MockLockerAndInvoker>(&addr2);
mock3 = std::make_shared<MockLockerAndInvoker>(&addr3);
mock4 = std::make_shared<MockLockerAndInvoker>(&addr4);
mock5 = std::make_shared<MockLockerAndInvoker>(&addr5);
}
void TearDown() override {
// Clean up
}
sscl::cps::Qutex qutex{"test-qutex"};
std::shared_ptr<MockLockerAndInvoker> mock1, mock2, mock3, mock4, mock5;
// Unique addresses for testing
int addr1 = 1;
int addr2 = 2;
int addr3 = 3;
int addr4 = 4;
int addr5 = 5;
};
// Test basic queue registration and unregistration
TEST_F(QutexTest, QueueRegistrationAndUnregistration) {
// Register mock1 in queue
auto it1 = qutex.registerInQueue(mock1);
EXPECT_EQ(qutex.queue.size(), 1);
EXPECT_FALSE(qutex.isOwned);
// Register mock2 in queue
auto it2 = qutex.registerInQueue(mock2);
EXPECT_EQ(qutex.queue.size(), 2);
// Unregister mock1
qutex.unregisterFromQueue(it1);
EXPECT_EQ(qutex.queue.size(), 1);
// Unregister mock2
qutex.unregisterFromQueue(it2);
EXPECT_EQ(qutex.queue.size(), 0);
}
// Test single lock acquisition when queue is empty
TEST_F(QutexTest, SingleLockAcquisitionEmptyQueue) {
// Register mock1
(void)qutex.registerInQueue(mock1);
// Try to acquire with nRequiredLocks = 1
bool acquired = qutex.tryAcquire(*mock1, 1);
EXPECT_TRUE(acquired);
EXPECT_TRUE(qutex.isOwned);
}
// Test single lock acquisition when at front of queue
TEST_F(QutexTest, SingleLockAcquisitionAtFront) {
// Register multiple lockvokers
(void)qutex.registerInQueue(mock1);
(void)qutex.registerInQueue(mock2);
(void)qutex.registerInQueue(mock3);
// mock1 should be at front, mock3 at back
EXPECT_EQ(qutex.queue.front().get(), mock1.get());
EXPECT_EQ(qutex.queue.back().get(), mock3.get());
// mock1 (at front) should succeed
bool acquired = qutex.tryAcquire(*mock1, 1);
EXPECT_TRUE(acquired);
EXPECT_TRUE(qutex.isOwned);
// mock2 (not at front) should fail
qutex.isOwned = false; // Reset for testing
bool acquired2 = qutex.tryAcquire(*mock2, 1);
EXPECT_FALSE(acquired2);
}
// Test single lock acquisition failure when not at front
TEST_F(QutexTest, SingleLockAcquisitionNotAtFront) {
// Register multiple lockvokers
(void)qutex.registerInQueue(mock1);
(void)qutex.registerInQueue(mock2);
// mock2 (not at front) should fail
bool acquired = qutex.tryAcquire(*mock2, 1);
EXPECT_FALSE(acquired);
EXPECT_FALSE(qutex.isOwned);
}
// Test multi-lock acquisition (nRequiredLocks > 1)
TEST_F(QutexTest, MultiLockAcquisition) {
// Register 4 lockvokers
(void)qutex.registerInQueue(mock1);
(void)qutex.registerInQueue(mock2);
(void)qutex.registerInQueue(mock3);
(void)qutex.registerInQueue(mock4);
// For nRequiredLocks = 2, need to be in top 50% (top 2 out of 4)
// mock1 (position 1) should succeed
bool acquired1 = qutex.tryAcquire(*mock1, 2);
EXPECT_TRUE(acquired1);
// Reset for next test
qutex.isOwned = false;
// mock2 (position 2) should succeed
bool acquired2 = qutex.tryAcquire(*mock2, 2);
EXPECT_TRUE(acquired2);
// Reset for next test
qutex.isOwned = false;
// mock3 (position 3) should fail (in bottom 50%)
bool acquired3 = qutex.tryAcquire(*mock3, 2);
EXPECT_FALSE(acquired3);
// Reset for next test
qutex.isOwned = false;
// mock4 (position 4) should fail (in bottom 50%)
bool acquired4 = qutex.tryAcquire(*mock4, 2);
EXPECT_FALSE(acquired4);
}
// Test multi-lock acquisition with 3 required locks
TEST_F(QutexTest, MultiLockAcquisitionThreeLocks) {
// Register 6 lockvokers
(void)qutex.registerInQueue(mock1);
(void)qutex.registerInQueue(mock2);
(void)qutex.registerInQueue(mock3);
(void)qutex.registerInQueue(mock4);
(void)qutex.registerInQueue(mock5);
// Create one more mock
int addr6 = 6;
auto mock6 = std::make_shared<MockLockerAndInvoker>(&addr6);
(void)qutex.registerInQueue(mock6);
// For nRequiredLocks = 3, need to be in top 66% (top 4 out of 6)
// Positions 1, 2, 3, 4 should succeed
// Positions 5, 6 should fail
bool acquired1 = qutex.tryAcquire(*mock1, 3);
EXPECT_TRUE(acquired1);
qutex.isOwned = false;
bool acquired2 = qutex.tryAcquire(*mock2, 3);
EXPECT_TRUE(acquired2);
qutex.isOwned = false;
bool acquired3 = qutex.tryAcquire(*mock3, 3);
EXPECT_TRUE(acquired3);
qutex.isOwned = false;
bool acquired4 = qutex.tryAcquire(*mock4, 3);
EXPECT_TRUE(acquired4);
qutex.isOwned = false;
bool acquired5 = qutex.tryAcquire(*mock5, 3);
EXPECT_FALSE(acquired5);
qutex.isOwned = false;
bool acquired6 = qutex.tryAcquire(*mock6, 3);
EXPECT_FALSE(acquired6);
}
// Test acquisition failure when already owned
TEST_F(QutexTest, AcquisitionFailureWhenOwned) {
// Register mock1
(void)qutex.registerInQueue(mock1);
// Manually set as owned
qutex.isOwned = true;
// Try to acquire should fail
bool acquired = qutex.tryAcquire(*mock1, 1);
EXPECT_FALSE(acquired);
EXPECT_TRUE(qutex.isOwned);
}
// Test backoff with single item (should not rotate)
TEST_F(QutexTest, BackoffSingleItem) {
// Register only one lockvoker
(void)qutex.registerInQueue(mock1);
// Set as owned first
qutex.isOwned = true;
// nRequiredLocks > 1 avoids the "front item with nRequiredLocks==1" guard
mock1->awakened = false;
qutex.backoff(*mock1, 2);
EXPECT_FALSE(qutex.isOwned);
EXPECT_EQ(qutex.queue.size(), 1u);
// Should not awaken since there's only one item
EXPECT_FALSE(mock1->awakened);
}
// Test backoff with multiple items and rotation
TEST_F(QutexTest, BackoffWithRotation) {
// Register multiple lockvokers
(void)qutex.registerInQueue(mock1);
(void)qutex.registerInQueue(mock2);
(void)qutex.registerInQueue(mock3);
// Set as owned first
qutex.isOwned = true;
// mock1 should be at front initially
EXPECT_EQ(qutex.queue.front().get(), mock1.get());
// Backoff from mock1 (at front) with nRequiredLocks = 2
mock2->awakened = false;
qutex.backoff(*mock1, 2);
// mock1 should have been rotated to position 2
// mock2 should now be at front
EXPECT_EQ(qutex.queue.front().get(), mock2.get());
EXPECT_FALSE(qutex.isOwned);
// mock2 should have been awakened
EXPECT_TRUE(mock2->awakened);
}
// Test backoff with rotation to back when queue smaller than nRequiredLocks
TEST_F(QutexTest, BackoffRotationToBack) {
// Register only 2 lockvokers
(void)qutex.registerInQueue(mock1);
(void)qutex.registerInQueue(mock2);
// Set as owned first
qutex.isOwned = true;
// mock1 should be at front initially
EXPECT_EQ(qutex.queue.front().get(), mock1.get());
EXPECT_EQ(qutex.queue.back().get(), mock2.get());
// Backoff from mock1 with nRequiredLocks = 5 (larger than queue size)
mock2->awakened = false;
qutex.backoff(*mock1, 5);
// mock1 should have been moved to the back
EXPECT_EQ(qutex.queue.front().get(), mock2.get());
EXPECT_EQ(qutex.queue.back().get(), mock1.get());
EXPECT_FALSE(qutex.isOwned);
// mock2 should have been awakened
EXPECT_TRUE(mock2->awakened);
}
// Test release functionality
TEST_F(QutexTest, Release) {
// Register multiple lockvokers
(void)qutex.registerInQueue(mock1);
(void)qutex.registerInQueue(mock2);
ASSERT_TRUE(qutex.tryAcquire(*mock1, 1));
// Release should set isOwned to false and awaken front item
mock1->awakened = false;
qutex.release();
EXPECT_FALSE(qutex.isOwned);
EXPECT_TRUE(mock1->awakened);
}
// Test release without a prior acquire is rejected
TEST_F(QutexTest, ReleaseWithoutAcquireThrows) {
EXPECT_THROW(qutex.release(), std::runtime_error);
EXPECT_TRUE(qutex.queue.empty());
}
// Test exception when trying to acquire from empty queue
TEST_F(QutexTest, ExceptionOnEmptyQueueAcquisition) {
// Don't register any lockvokers
EXPECT_THROW(qutex.tryAcquire(*mock1, 1), std::runtime_error);
}
// Test exception when backoff called on empty queue
TEST_F(QutexTest, ExceptionOnEmptyQueueBackoff) {
// Don't register any lockvokers
EXPECT_THROW(qutex.backoff(*mock1, 1), std::runtime_error);
}
// Test edge case: single lockvoker with multiple required locks
TEST_F(QutexTest, SingleLockvokerMultipleRequiredLocks) {
// Register only one lockvoker
(void)qutex.registerInQueue(mock1);
// Should succeed regardless of nRequiredLocks when only one item
bool acquired = qutex.tryAcquire(*mock1, 5);
EXPECT_TRUE(acquired);
EXPECT_TRUE(qutex.isOwned);
}
// Test unregistration without locking
TEST_F(QutexTest, UnregistrationWithoutLocking) {
// Register lockvoker
auto it1 = qutex.registerInQueue(mock1);
EXPECT_EQ(qutex.queue.size(), 1);
// Unregister without locking
qutex.unregisterFromQueue(it1, false);
EXPECT_EQ(qutex.queue.size(), 0);
}
} // namespace smo
+157
View File
@@ -0,0 +1,157 @@
#include <cstdlib>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <gtest/gtest.h>
#include <spinscale/envKvStore.h>
namespace {
class EnvKvStoreTest
: public testing::Test
{
protected:
void SetUp() override
{
root = std::filesystem::temp_directory_path()
/ ("spinscale-env-test-"
+ std::to_string(std::chrono::steady_clock::now()
.time_since_epoch().count())
+ "-" + std::to_string(testCounter++));
std::filesystem::create_directories(root);
unsetenv("SSCL_ENV_TEST_VALUE");
}
void TearDown() override
{
unsetenv("SSCL_ENV_TEST_VALUE");
std::filesystem::remove_all(root);
}
std::filesystem::path writeFile(
const std::string &filename,
const std::string &contents)
{
std::filesystem::path path = root / filename;
std::ofstream file(path);
file << contents;
return path;
}
std::filesystem::path root;
static inline int testCounter = 0;
};
} // namespace
TEST_F(EnvKvStoreTest, ParsesSupportedDotenvForms)
{
std::filesystem::path envFile = writeFile(
"one.env",
"\n"
"# comment\n"
"PLAIN=value\n"
" TRIMMED = value with spaces \n"
"SINGLE=' preserved value '\n"
"DOUBLE=\"another preserved value\"\n"
"ESCAPED=\"quote: \\\" slash: \\\\ tab: \\t\"\n"
"COMMENTED=value # comment\n");
std::ostringstream warnings;
sscl::EnvKvStore store({envFile}, warnings);
EXPECT_EQ(store.get("PLAIN"), "value");
EXPECT_EQ(store.get("TRIMMED"), "value with spaces");
EXPECT_EQ(store.get("SINGLE"), " preserved value ");
EXPECT_EQ(store.get("DOUBLE"), "another preserved value");
EXPECT_EQ(store.get("ESCAPED"), "quote: \" slash: \\ tab: \t");
EXPECT_EQ(store.get("COMMENTED"), "value");
EXPECT_TRUE(warnings.str().empty());
}
TEST_F(EnvKvStoreTest, LaterFilesOverwriteEarlierFilesAndWarn)
{
std::filesystem::path first = writeFile("first.env", "VALUE=first\n");
std::filesystem::path second = writeFile("second.env", "VALUE=second\n");
std::ostringstream warnings;
sscl::EnvKvStore store({first, second}, warnings);
EXPECT_EQ(store.get("VALUE"), "second");
EXPECT_NE(warnings.str().find("VALUE"), std::string::npos);
EXPECT_NE(warnings.str().find("first"), std::string::npos);
EXPECT_NE(warnings.str().find("second"), std::string::npos);
EXPECT_NE(warnings.str().find(second.string()), std::string::npos);
}
TEST_F(EnvKvStoreTest, DuplicateKeysInsideSameFileOverwriteAndWarn)
{
std::filesystem::path envFile =
writeFile("one.env", "VALUE=first\nVALUE=second\n");
std::ostringstream warnings;
sscl::EnvKvStore store({envFile}, warnings);
EXPECT_EQ(store.get("VALUE"), "second");
EXPECT_NE(warnings.str().find("VALUE"), std::string::npos);
EXPECT_NE(warnings.str().find("first"), std::string::npos);
EXPECT_NE(warnings.str().find("second"), std::string::npos);
EXPECT_NE(warnings.str().find(envFile.string()), std::string::npos);
}
TEST_F(EnvKvStoreTest, ProcessEnvironmentOverridesStoreSilently)
{
std::filesystem::path envFile =
writeFile("one.env", "SSCL_ENV_TEST_VALUE=file\n");
setenv("SSCL_ENV_TEST_VALUE", "process", 1);
std::ostringstream warnings;
sscl::EnvKvStore store({envFile}, warnings);
EXPECT_EQ(store.get("SSCL_ENV_TEST_VALUE"), "process");
EXPECT_TRUE(warnings.str().empty());
}
TEST_F(EnvKvStoreTest, EmptyProcessEnvironmentValueOverridesStoreSilently)
{
std::filesystem::path envFile =
writeFile("one.env", "SSCL_ENV_TEST_VALUE=file\n");
setenv("SSCL_ENV_TEST_VALUE", "", 1);
std::ostringstream warnings;
sscl::EnvKvStore store({envFile}, warnings);
EXPECT_EQ(store.get("SSCL_ENV_TEST_VALUE"), "");
EXPECT_TRUE(warnings.str().empty());
}
TEST_F(EnvKvStoreTest, MissingFileThrows)
{
std::ostringstream warnings;
EXPECT_THROW(
sscl::EnvKvStore({root / "missing.env"}, warnings),
std::runtime_error);
}
TEST_F(EnvKvStoreTest, MalformedLineThrows)
{
std::filesystem::path envFile = writeFile("bad.env", "NOT AN ASSIGNMENT\n");
std::ostringstream warnings;
try
{
sscl::EnvKvStore store({envFile}, warnings);
FAIL() << "Expected malformed env file to throw.";
}
catch (const std::runtime_error &e)
{
std::string message = e.what();
EXPECT_NE(message.find(envFile.string()), std::string::npos);
EXPECT_NE(message.find(":1:"), std::string::npos);
}
}
+71
View File
@@ -0,0 +1,71 @@
#ifndef SPINSCALE_TEST_SUPPORT_BAKED_DEVICE_CATALOG_H
#define SPINSCALE_TEST_SUPPORT_BAKED_DEVICE_CATALOG_H
#include <cstddef>
#include <optional>
#include <string>
#include <vector>
#include <bakedCameraProfiles.h>
namespace sscl::tests {
inline std::vector<const test_fixtures::BakedCameraProfile *>
profilesForMachine(const char *machineTag)
{
std::vector<const test_fixtures::BakedCameraProfile *> matches;
for (std::size_t i = 0; i < test_fixtures::bakedCameraProfileCount; ++i)
{
const test_fixtures::BakedCameraProfile& profile =
test_fixtures::bakedCameraProfiles[i];
if (std::string(profile.machineTag) == machineTag) {
matches.push_back(&profile);
}
}
return matches;
}
inline std::optional<const test_fixtures::BakedCameraProfile *>
findProfileByTag(const char *machineTag, const char *profileTag)
{
for (std::size_t i = 0; i < test_fixtures::bakedCameraProfileCount; ++i)
{
const test_fixtures::BakedCameraProfile& profile =
test_fixtures::bakedCameraProfiles[i];
if (std::string(profile.machineTag) == machineTag
&& std::string(profile.profileTag) == profileTag)
{
return &profile;
}
}
return std::nullopt;
}
inline std::vector<const test_fixtures::BakedCameraProfile *>
requiredProfilesForMachine(const char *machineTag)
{
std::vector<const test_fixtures::BakedCameraProfile *> matches;
for (std::size_t i = 0; i < test_fixtures::bakedCameraProfileCount; ++i)
{
const test_fixtures::BakedCameraProfile& profile =
test_fixtures::bakedCameraProfiles[i];
if (std::string(profile.machineTag) == machineTag
&& profile.requiredOnMachine)
{
matches.push_back(&profile);
}
}
return matches;
}
} // namespace sscl::tests
#endif // SPINSCALE_TEST_SUPPORT_BAKED_DEVICE_CATALOG_H
+38
View File
@@ -0,0 +1,38 @@
#ifndef SPINSCALE_TEST_SUPPORT_COROUTINE_DRIVER_H
#define SPINSCALE_TEST_SUPPORT_COROUTINE_DRIVER_H
#include <exception>
#include <boost/asio/io_context.hpp>
#include <support/threadHarness.h>
namespace sscl::tests {
class CoroutineDriver
{
public:
template <typename Invoker>
static auto completedReturnValue(Invoker &invoker)
{
if (invoker.completedReturnValues().myExceptionPtr) {
std::rethrow_exception(
invoker.completedReturnValues().myExceptionPtr);
}
return invoker.completedReturnValues().myReturnValue;
}
template <typename Invoker>
static auto pumpUntilIdleAndReturnValue(
boost::asio::io_context &ioContext,
Invoker &invoker)
{
IoContextPump::pumpUntilIdle(ioContext);
return completedReturnValue(invoker);
}
};
} // namespace sscl::tests
#endif // SPINSCALE_TEST_SUPPORT_COROUTINE_DRIVER_H
+63
View File
@@ -0,0 +1,63 @@
#ifndef SPINSCALE_TEST_SUPPORT_EXCEPTION_ASSERTIONS_H
#define SPINSCALE_TEST_SUPPORT_EXCEPTION_ASSERTIONS_H
#include <exception>
#include <stdexcept>
#include <string>
#include <gtest/gtest.h>
namespace sscl::tests {
inline void requireExceptionMessageContains(
const std::exception &exception,
const std::string &expectedSubstring)
{
const std::string message = exception.what();
if (message.find(expectedSubstring) == std::string::npos) {
throw std::runtime_error(
"Expected exception message to contain \""
+ expectedSubstring
+ "\", got \""
+ message
+ "\"");
}
}
inline void expectExceptionMessageContains(
const std::exception &exception,
const std::string &expectedSubstring)
{
EXPECT_NO_THROW(
requireExceptionMessageContains(exception, expectedSubstring));
}
inline void requireExceptionPtrMessageContains(
const std::exception_ptr &exceptionPtr,
const std::string &expectedSubstring)
{
try {
std::rethrow_exception(exceptionPtr);
}
catch (const std::exception &exception) {
requireExceptionMessageContains(exception, expectedSubstring);
return;
}
catch (...) {
throw std::runtime_error("Expected std::exception in exception_ptr");
}
}
inline void expectExceptionPtrMessageContains(
const std::exception_ptr &exceptionPtr,
const std::string &expectedSubstring)
{
EXPECT_NO_THROW(
requireExceptionPtrMessageContains(
exceptionPtr,
expectedSubstring));
}
} // namespace sscl::tests
#endif // SPINSCALE_TEST_SUPPORT_EXCEPTION_ASSERTIONS_H
+177
View File
@@ -0,0 +1,177 @@
#ifndef SPINSCALE_TEST_SUPPORT_GROUP_ASSERTIONS_H
#define SPINSCALE_TEST_SUPPORT_GROUP_ASSERTIONS_H
#include <exception>
#include <stdexcept>
#include <string>
#include <gtest/gtest.h>
#include <spinscale/co/group.h>
namespace sscl::tests {
template <typename Invoker>
int completedIntValue(Invoker &invoker)
{
if (invoker.completedReturnValues().myExceptionPtr) {
std::rethrow_exception(
invoker.completedReturnValues().myExceptionPtr);
}
return invoker.completedReturnValues().myReturnValue;
}
inline void requireCompletedSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor)
{
if (descriptor.type !=
sscl::co::Group::SettlementDescriptor::TypeE::COMPLETED)
{
throw std::runtime_error("Expected completed settlement");
}
}
template <typename Invoker>
void requireCompletedIntSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor,
int expectedValue)
{
requireCompletedSettlement(descriptor);
const int actualValue = completedIntValue(descriptor.invokerAs<Invoker>());
if (actualValue != expectedValue) {
throw std::runtime_error(
"Expected completed settlement value "
+ std::to_string(expectedValue)
+ ", got "
+ std::to_string(actualValue));
}
}
template <typename Invoker>
void expectCompletedIntSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor,
int expectedValue)
{
EXPECT_NO_THROW(
requireCompletedIntSettlement<Invoker>(
descriptor,
expectedValue));
}
inline void expectCompletedSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor)
{
EXPECT_NO_THROW(requireCompletedSettlement(descriptor));
}
inline void requireExceptionSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor)
{
if (descriptor.type !=
sscl::co::Group::SettlementDescriptor::TypeE::EXCEPTION_THROWN)
{
throw std::runtime_error("Expected exception settlement");
}
if (!descriptor.calleeException) {
throw std::runtime_error("Expected exception pointer in settlement");
}
}
inline void expectExceptionSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor)
{
EXPECT_NO_THROW(requireExceptionSettlement(descriptor));
}
inline void requireRuntimeErrorSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor,
const std::string &expectedMessage)
{
requireExceptionSettlement(descriptor);
try {
std::rethrow_exception(descriptor.calleeException);
}
catch (const std::runtime_error &runtimeError) {
const std::string actualMessage = runtimeError.what();
if (actualMessage != expectedMessage) {
throw std::runtime_error(
"Expected runtime_error settlement message \""
+ expectedMessage
+ "\", got \""
+ actualMessage
+ "\"");
}
return;
}
catch (...) {
throw std::runtime_error("Expected std::runtime_error settlement");
}
}
inline void requireIntExceptionSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor,
int expectedValue)
{
requireExceptionSettlement(descriptor);
try {
std::rethrow_exception(descriptor.calleeException);
}
catch (int caughtValue) {
if (caughtValue != expectedValue) {
throw std::runtime_error(
"Expected int exception settlement value "
+ std::to_string(expectedValue)
+ ", got "
+ std::to_string(caughtValue));
}
return;
}
catch (...) {
throw std::runtime_error("Expected int exception settlement");
}
}
inline void expectIntExceptionSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor,
int expectedValue)
{
EXPECT_NO_THROW(
requireIntExceptionSettlement(
descriptor,
expectedValue));
}
inline void expectRuntimeErrorSettlement(
const sscl::co::Group::SettlementDescriptor &descriptor,
const std::string &expectedMessage)
{
EXPECT_NO_THROW(
requireRuntimeErrorSettlement(
descriptor,
expectedMessage));
}
inline void requireEmptyGroupError(
const std::runtime_error &runtimeError)
{
constexpr const char *expectedMessage =
"co_await: Group has no member invokers; call add() before awaiting";
if (std::string(runtimeError.what()) != expectedMessage) {
throw std::runtime_error("Unexpected empty group error message");
}
}
inline void expectEmptyGroupError(
const std::runtime_error &runtimeError)
{
EXPECT_NO_THROW(requireEmptyGroupError(runtimeError));
}
} // namespace sscl::tests
#endif // SPINSCALE_TEST_SUPPORT_GROUP_ASSERTIONS_H
+118
View File
@@ -0,0 +1,118 @@
#include <support/probeComponentThread.h>
#include <iostream>
#include <spinscale/component.h>
namespace sscl::tests {
namespace {
constexpr sscl::ThreadId PROBE_PUPPETEER_THREAD_ID = 2;
class ProbeDummyPuppeteerComponent
: public sscl::pptr::PuppeteerComponent
{
public:
explicit ProbeDummyPuppeteerComponent(
const std::shared_ptr<sscl::PuppeteerThread>& componentThreadIn)
: sscl::pptr::PuppeteerComponent(componentThreadIn)
{}
void handleLoopExceptionHook() override
{
std::cerr << "ProbeComponentThreadHarness: puppeteer loop exception\n";
}
};
void probePuppeteerMain(
const sscl::PuppeteerThread::EntryFnArguments& args,
const std::function<void(
const std::shared_ptr<sscl::ComponentThread>&)>& work,
std::promise<std::exception_ptr>& donePromise)
{
sscl::PuppeteerThread& thr = args.usableBeforeJolt;
thr.initializeTls();
sscl::ComponentThread::setPuppeteerThreadId(PROBE_PUPPETEER_THREAD_ID);
std::shared_ptr<sscl::PuppeteerThread> thrPtr =
std::static_pointer_cast<sscl::PuppeteerThread>(thr.shared_from_this());
sscl::ComponentThread::setPuppeteerThread(thrPtr);
try {
work(thrPtr);
donePromise.set_value(nullptr);
}
catch (...) {
donePromise.set_value(std::current_exception());
}
thr.getIoContext().stop();
}
} // namespace
ProbeComponentThreadHarness::ProbeComponentThreadHarness(
const char *threadName)
: threadName(threadName),
dummyComponent(std::make_shared<ProbeDummyPuppeteerComponent>(
std::shared_ptr<sscl::PuppeteerThread>()))
{}
ProbeComponentThreadHarness::~ProbeComponentThreadHarness() = default;
std::shared_ptr<sscl::ComponentThread>
ProbeComponentThreadHarness::componentThread() const
{
return lastComponentThread;
}
void ProbeComponentThreadHarness::runSync(
const std::function<void(
const std::shared_ptr<sscl::ComponentThread>&)>& work)
{
std::promise<std::exception_ptr> donePromise;
std::future<std::exception_ptr> doneFuture = donePromise.get_future();
std::shared_ptr<sscl::PuppeteerThread> runThread =
std::make_shared<sscl::PuppeteerThread>(
PROBE_PUPPETEER_THREAD_ID,
threadName,
[&work, &donePromise](
const sscl::PuppeteerThread::EntryFnArguments& args)
{
probePuppeteerMain(args, work, donePromise);
},
*dummyComponent,
nullptr);
dummyComponent->thread = runThread;
lastComponentThread = runThread;
runThread->thread.join();
std::exception_ptr probeException = doneFuture.get();
if (probeException) {
std::rethrow_exception(probeException);
}
}
void runNonViralNurseryOnComponentThread(
const std::shared_ptr<sscl::ComponentThread>& componentThread,
std::function<sscl::co::NonViralNonPostingInvoker(
sscl::co::NonViralTaskNursery::Slot::Lease&)> invokerFactory,
std::chrono::milliseconds timeout)
{
(void)timeout;
sscl::co::NonViralTaskNursery nursery;
nursery.openAdmission();
nursery.launch(
[&invokerFactory](sscl::co::NonViralTaskNursery::Slot::Lease& lease)
{
return invokerFactory(lease);
});
nursery.closeAdmission();
nursery.syncAwaitAllSettlements(componentThread->getIoContext());
}
} // namespace sscl::tests
+66
View File
@@ -0,0 +1,66 @@
#ifndef SPINSCALE_TEST_SUPPORT_PROBE_COMPONENT_THREAD_H
#define SPINSCALE_TEST_SUPPORT_PROBE_COMPONENT_THREAD_H
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <spinscale/componentThread.h>
#include <spinscale/co/invokers.h>
#include <spinscale/co/nonViralTaskNursery.h>
namespace sscl::tests {
constexpr std::chrono::milliseconds defaultProbeTaskTimeout{10000};
void runNonViralNurseryOnComponentThread(
const std::shared_ptr<sscl::ComponentThread>& componentThread,
std::function<sscl::co::NonViralNonPostingInvoker(
sscl::co::NonViralTaskNursery::Slot::Lease&)> invokerFactory,
std::chrono::milliseconds timeout = defaultProbeTaskTimeout);
class ProbeComponentThreadHarness
{
public:
explicit ProbeComponentThreadHarness(
const char *threadName = "spinscale-probe");
~ProbeComponentThreadHarness();
ProbeComponentThreadHarness(const ProbeComponentThreadHarness &) = delete;
ProbeComponentThreadHarness &operator=(
const ProbeComponentThreadHarness &) = delete;
std::shared_ptr<sscl::ComponentThread> componentThread() const;
void runSync(
const std::function<void(
const std::shared_ptr<sscl::ComponentThread>&)>& work);
template <typename InvokerFactory>
void runNonViralNurseryTask(
InvokerFactory &&invokerFactory,
std::chrono::milliseconds timeout = defaultProbeTaskTimeout)
{
runSync(
[this, &invokerFactory, timeout](
const std::shared_ptr<sscl::ComponentThread>& componentThread)
{
sscl::tests::runNonViralNurseryOnComponentThread(
componentThread,
std::forward<InvokerFactory>(invokerFactory),
timeout);
});
}
private:
std::string threadName;
std::shared_ptr<sscl::pptr::PuppeteerComponent> dummyComponent;
std::shared_ptr<sscl::ComponentThread> lastComponentThread;
};
} // namespace sscl::tests
#endif // SPINSCALE_TEST_SUPPORT_PROBE_COMPONENT_THREAD_H
+455
View File
@@ -0,0 +1,455 @@
#include <support/threadHarness.h>
#include <cstdlib>
#include <iostream>
namespace sscl::tests {
struct DedicatedIoThread::StartupState
{
std::mutex mutex;
std::condition_variable condition;
std::thread::id osThreadId;
std::exception_ptr startupException;
bool allowInitialization = false;
bool initialized = false;
};
namespace {
constexpr const char *callerThreadName = "test:caller";
constexpr const char *calleeThreadName = "test:callee";
constexpr const char *alternateThreadName = "test:alternate";
constexpr const char *bodyThreadName = "test:body";
constexpr const char *worldThreadName = "test:world";
constexpr const char *legThreadName = "test:leg";
void runDedicatedThread(
const std::shared_ptr<DedicatedIoThread::StartupState> &state,
const sscl::PuppeteerThread::EntryFnArguments &args)
{
{
std::unique_lock<std::mutex> lock(state->mutex);
state->condition.wait(
lock,
[&state]() { return state->allowInitialization; });
}
try
{
args.usableBeforeJolt.initializeTls();
{
std::lock_guard<std::mutex> guard(state->mutex);
state->osThreadId = std::this_thread::get_id();
state->initialized = true;
}
state->condition.notify_all();
args.usableBeforeJolt.getIoContext().restart();
args.usableBeforeJolt.getIoContext().run();
}
catch (...)
{
{
std::lock_guard<std::mutex> guard(state->mutex);
state->startupException = std::current_exception();
state->initialized = true;
}
state->condition.notify_all();
}
}
} // namespace
std::string threadRoleName(PostingThreadRole role)
{
switch (role)
{
case PostingThreadRole::CALLER:
return callerThreadName;
case PostingThreadRole::CALLEE:
return calleeThreadName;
case PostingThreadRole::ALTERNATE:
return alternateThreadName;
case PostingThreadRole::BODY:
return bodyThreadName;
case PostingThreadRole::WORLD:
return worldThreadName;
case PostingThreadRole::LEG:
return legThreadName;
}
throw std::runtime_error("Unknown PostingThreadRole");
}
void IoContextPump::pumpUntilIdle(
boost::asio::io_context &ioContext,
std::chrono::milliseconds idleTimeout,
std::chrono::milliseconds totalTimeout)
{
const auto totalDeadline =
std::chrono::steady_clock::now() + totalTimeout;
auto lastProgress = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() < totalDeadline)
{
if (ioContext.poll_one() > 0)
{
lastProgress = std::chrono::steady_clock::now();
continue;
}
if (std::chrono::steady_clock::now() - lastProgress >= idleTimeout) {
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
ThreadBoundComponent::ThreadBoundComponent()
: sscl::pptr::PuppeteerComponent(nullptr)
{
}
void ThreadBoundComponent::handleLoopExceptionHook()
{
loopException = std::current_exception();
}
DedicatedIoThread::DedicatedIoThread(PostingThreadRole roleIn)
: role(roleIn),
startupState(std::make_shared<StartupState>()),
component(),
thread(std::make_shared<sscl::PuppeteerThread>(
static_cast<sscl::ThreadId>(roleIn),
threadRoleName(roleIn),
[state = startupState](
const sscl::PuppeteerThread::EntryFnArguments &args)
{
runDedicatedThread(state, args);
},
component,
nullptr))
{
component.thread = thread;
releaseStartupBarrier();
waitUntilInitialized();
}
DedicatedIoThread::~DedicatedIoThread()
{
stopAndJoin();
}
boost::asio::io_context &DedicatedIoThread::ioContext()
{
return thread->getIoContext();
}
sscl::ThreadId DedicatedIoThread::threadId() const noexcept
{
return static_cast<sscl::ThreadId>(role);
}
std::thread::id DedicatedIoThread::osThreadId() const
{
std::lock_guard<std::mutex> guard(startupState->mutex);
return startupState->osThreadId;
}
std::shared_ptr<sscl::PuppeteerThread> DedicatedIoThread::componentThread() const
{
return thread;
}
void DedicatedIoThread::stopAndJoin()
{
if (!thread) {
return;
}
releaseStartupBarrier();
thread->getIoContext().stop();
if (thread->thread.joinable()) {
thread->thread.join();
}
thread.reset();
}
void DedicatedIoThread::releaseStartupBarrier()
{
{
std::lock_guard<std::mutex> guard(startupState->mutex);
startupState->allowInitialization = true;
}
startupState->condition.notify_all();
}
void DedicatedIoThread::waitUntilInitialized()
{
std::unique_lock<std::mutex> lock(startupState->mutex);
const bool initialized = startupState->condition.wait_for(
lock,
defaultPostingTaskTimeout,
[this]() { return startupState->initialized; });
if (!initialized) {
throw std::runtime_error("Timed out waiting for test thread startup");
}
std::exception_ptr startupException = startupState->startupException;
lock.unlock();
if (startupException) {
std::rethrow_exception(startupException);
}
}
void ThreadRegistry::registerThread(
PostingThreadRole role,
DedicatedIoThread &thread)
{
std::lock_guard<std::mutex> guard(registryMutex());
auto [iterator, inserted] = threadsByRole().emplace(role, &thread);
if (!inserted) {
throw std::runtime_error(
"Test thread role already registered for " + threadRoleName(role));
}
}
void ThreadRegistry::unregisterThread(
PostingThreadRole role,
DedicatedIoThread &expectedThread)
{
std::lock_guard<std::mutex> guard(registryMutex());
auto iterator = threadsByRole().find(role);
if (iterator == threadsByRole().end()) {
return;
}
if (iterator->second != &expectedThread) {
throw std::runtime_error(
"Test thread role registered to a different thread for "
+ threadRoleName(role));
}
threadsByRole().erase(iterator);
}
boost::asio::io_context &ThreadRegistry::ioContext(PostingThreadRole role)
{
std::lock_guard<std::mutex> guard(registryMutex());
auto iterator = threadsByRole().find(role);
if (iterator == threadsByRole().end()) {
throw std::runtime_error(
"No test thread registered for " + threadRoleName(role));
}
return iterator->second->ioContext();
}
std::thread::id ThreadRegistry::osThreadId(PostingThreadRole role)
{
std::lock_guard<std::mutex> guard(registryMutex());
auto iterator = threadsByRole().find(role);
if (iterator == threadsByRole().end()) {
throw std::runtime_error(
"No test thread registered for " + threadRoleName(role));
}
return iterator->second->osThreadId();
}
std::mutex &ThreadRegistry::registryMutex()
{
static std::mutex mutex;
return mutex;
}
std::map<PostingThreadRole, DedicatedIoThread *> &
ThreadRegistry::threadsByRole()
{
static std::map<PostingThreadRole, DedicatedIoThread *> threads;
return threads;
}
PostingThreadSet::PostingThreadSet()
: callerThread(PostingThreadRole::CALLER),
calleeThread(PostingThreadRole::CALLEE),
alternateThread(PostingThreadRole::ALTERNATE),
bodyThread(PostingThreadRole::BODY),
worldThread(PostingThreadRole::WORLD),
legThread(PostingThreadRole::LEG)
{
previousPuppeteerThread = sscl::ComponentThread::getPptr();
previousPuppeteerThreadId = sscl::pptr::puppeteerThreadId;
registerAllThreads();
installCallerAsPuppeteer();
}
PostingThreadSet::~PostingThreadSet()
{
restorePreviousPuppeteer();
unregisterAllThreads();
}
void PostingThreadSet::registerAllThreads()
{
ThreadRegistry::registerThread(PostingThreadRole::CALLER, callerThread);
ThreadRegistry::registerThread(PostingThreadRole::CALLEE, calleeThread);
ThreadRegistry::registerThread(PostingThreadRole::ALTERNATE, alternateThread);
ThreadRegistry::registerThread(PostingThreadRole::BODY, bodyThread);
ThreadRegistry::registerThread(PostingThreadRole::WORLD, worldThread);
ThreadRegistry::registerThread(PostingThreadRole::LEG, legThread);
}
void PostingThreadSet::unregisterAllThreads()
{
ThreadRegistry::unregisterThread(PostingThreadRole::CALLER, callerThread);
ThreadRegistry::unregisterThread(PostingThreadRole::CALLEE, calleeThread);
ThreadRegistry::unregisterThread(
PostingThreadRole::ALTERNATE,
alternateThread);
ThreadRegistry::unregisterThread(PostingThreadRole::BODY, bodyThread);
ThreadRegistry::unregisterThread(PostingThreadRole::WORLD, worldThread);
ThreadRegistry::unregisterThread(PostingThreadRole::LEG, legThread);
}
void PostingThreadSet::installCallerAsPuppeteer()
{
sscl::ComponentThread::setPuppeteerThreadId(
static_cast<sscl::ThreadId>(PostingThreadRole::CALLER));
sscl::ComponentThread::setPuppeteerThread(callerThread.componentThread());
}
void PostingThreadSet::restorePreviousPuppeteer()
{
sscl::ComponentThread::setPuppeteerThreadId(previousPuppeteerThreadId);
sscl::ComponentThread::setPuppeteerThread(previousPuppeteerThread);
}
DedicatedIoThread &PostingThreadSet::thread(PostingThreadRole role)
{
switch (role)
{
case PostingThreadRole::CALLER:
return callerThread;
case PostingThreadRole::CALLEE:
return calleeThread;
case PostingThreadRole::ALTERNATE:
return alternateThread;
case PostingThreadRole::BODY:
return bodyThread;
case PostingThreadRole::WORLD:
return worldThread;
case PostingThreadRole::LEG:
return legThread;
}
throw std::runtime_error("Unknown PostingThreadRole");
}
DedicatedIoThread &PostingThreadSet::caller()
{
return callerThread;
}
DedicatedIoThread &PostingThreadSet::callee()
{
return calleeThread;
}
DedicatedIoThread &PostingThreadSet::alternate()
{
return alternateThread;
}
DedicatedIoThread &PostingThreadSet::body()
{
return bodyThread;
}
DedicatedIoThread &PostingThreadSet::world()
{
return worldThread;
}
DedicatedIoThread &PostingThreadSet::leg()
{
return legThread;
}
void CrossThreadTrace::recordConstructionThread()
{
record(constructionThreadId);
}
void CrossThreadTrace::recordCalleeExecutionThread()
{
record(calleeExecutionThreadId);
}
void CrossThreadTrace::recordFinalSuspendThread()
{
record(finalSuspendThreadId);
}
void CrossThreadTrace::recordAwaitResumeThread()
{
record(awaitResumeThreadId);
}
void CrossThreadTrace::recordCompletionCallbackThread()
{
record(completionCallbackThreadId);
}
std::thread::id CrossThreadTrace::constructionThread() const
{
return read(constructionThreadId);
}
std::thread::id CrossThreadTrace::calleeExecutionThread() const
{
return read(calleeExecutionThreadId);
}
std::thread::id CrossThreadTrace::finalSuspendThread() const
{
return read(finalSuspendThreadId);
}
std::thread::id CrossThreadTrace::awaitResumeThread() const
{
return read(awaitResumeThreadId);
}
std::thread::id CrossThreadTrace::completionCallbackThread() const
{
return read(completionCallbackThreadId);
}
void CrossThreadTrace::record(std::thread::id &slot)
{
std::lock_guard<std::mutex> guard(mutex);
slot = std::this_thread::get_id();
}
std::thread::id CrossThreadTrace::read(const std::thread::id &slot) const
{
std::lock_guard<std::mutex> guard(mutex);
return slot;
}
} // namespace sscl::tests
+378
View File
@@ -0,0 +1,378 @@
#ifndef SPINSCALE_TEST_SUPPORT_THREAD_HARNESS_H
#define SPINSCALE_TEST_SUPPORT_THREAD_HARNESS_H
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <spinscale/co/invokers.h>
#include <spinscale/co/postingPromise.h>
#include <spinscale/component.h>
#include <spinscale/componentThread.h>
namespace sscl::tests {
constexpr std::chrono::milliseconds defaultIdleTimeout{800};
constexpr std::chrono::milliseconds defaultTotalTimeout{10000};
constexpr std::chrono::milliseconds defaultPostingTaskTimeout{10000};
enum class PostingThreadRole : sscl::ThreadId
{
CALLER = 70,
CALLEE = 71,
ALTERNATE = 72,
BODY = 73,
WORLD = 74,
LEG = 75,
};
std::string threadRoleName(PostingThreadRole role);
class IoContextPump
{
public:
static void pumpUntilIdle(
boost::asio::io_context &ioContext,
std::chrono::milliseconds idleTimeout = defaultIdleTimeout,
std::chrono::milliseconds totalTimeout = defaultTotalTimeout);
template <typename Predicate>
static bool pumpUntil(
boost::asio::io_context &ioContext,
Predicate &&predicate,
std::chrono::milliseconds idleTimeout = defaultIdleTimeout,
std::chrono::milliseconds totalTimeout = defaultTotalTimeout)
{
const auto totalDeadline =
std::chrono::steady_clock::now() + totalTimeout;
auto lastProgress = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() < totalDeadline)
{
if (std::invoke(predicate)) {
return true;
}
if (ioContext.poll_one() > 0)
{
lastProgress = std::chrono::steady_clock::now();
continue;
}
if (std::chrono::steady_clock::now() - lastProgress >= idleTimeout) {
return std::invoke(predicate);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
return std::invoke(predicate);
}
};
class ThreadBoundComponent final
: public sscl::pptr::PuppeteerComponent
{
public:
ThreadBoundComponent();
void handleLoopExceptionHook() override;
std::exception_ptr loopException;
};
class DedicatedIoThread
{
public:
explicit DedicatedIoThread(PostingThreadRole role);
~DedicatedIoThread();
DedicatedIoThread(const DedicatedIoThread &) = delete;
DedicatedIoThread &operator=(const DedicatedIoThread &) = delete;
DedicatedIoThread(DedicatedIoThread &&) = delete;
DedicatedIoThread &operator=(DedicatedIoThread &&) = delete;
boost::asio::io_context &ioContext();
sscl::ThreadId threadId() const noexcept;
std::thread::id osThreadId() const;
std::shared_ptr<sscl::PuppeteerThread> componentThread() const;
void stopAndJoin();
struct StartupState;
template <typename Function>
void post(Function &&function)
{
boost::asio::post(
ioContext(),
std::forward<Function>(function));
}
template <typename Function>
auto runSync(Function &&function)
-> std::invoke_result_t<Function &>
{
using Result = std::invoke_result_t<Function &>;
if (std::this_thread::get_id() == osThreadId()) {
if constexpr (std::is_void_v<Result>) {
std::invoke(function);
return;
} else {
return std::invoke(function);
}
}
auto promise = std::make_shared<std::promise<Result>>();
auto future = promise->get_future();
post(
[promise, function = std::forward<Function>(function)]() mutable
{
try
{
if constexpr (std::is_void_v<Result>)
{
std::invoke(function);
promise->set_value();
}
else
{
promise->set_value(std::invoke(function));
}
}
catch (...)
{
promise->set_exception(std::current_exception());
}
});
return future.get();
}
private:
void releaseStartupBarrier();
void waitUntilInitialized();
PostingThreadRole role;
std::shared_ptr<StartupState> startupState;
ThreadBoundComponent component;
std::shared_ptr<sscl::PuppeteerThread> thread;
};
class ThreadRegistry
{
public:
static void registerThread(
PostingThreadRole role,
DedicatedIoThread &thread);
static void unregisterThread(
PostingThreadRole role,
DedicatedIoThread &expectedThread);
static boost::asio::io_context &ioContext(PostingThreadRole role);
static std::thread::id osThreadId(PostingThreadRole role);
private:
static std::mutex &registryMutex();
static std::map<PostingThreadRole, DedicatedIoThread *> &threadsByRole();
};
template <PostingThreadRole role>
struct PostingThreadTag
{
static boost::asio::io_context &io_context()
{
return ThreadRegistry::ioContext(role);
}
};
template <PostingThreadRole role, typename T>
using RolePostingPromise =
sscl::co::TaggedPostingPromise<T, PostingThreadTag<role>>;
template <PostingThreadRole role>
struct RolePostingPromiseTemplate
{
template <typename T>
using Type = RolePostingPromise<role, T>;
};
template <PostingThreadRole role, typename T>
using RoleViralPostingInvoker =
sscl::co::ViralPostingInvoker<
RolePostingPromiseTemplate<role>::template Type,
T>;
template <PostingThreadRole role>
using RoleNonViralPostingInvoker =
sscl::co::NonViralPostingInvoker<
RolePostingPromiseTemplate<role>::template Type>;
class PostingThreadSet
{
public:
PostingThreadSet();
~PostingThreadSet();
PostingThreadSet(const PostingThreadSet &) = delete;
PostingThreadSet &operator=(const PostingThreadSet &) = delete;
PostingThreadSet(PostingThreadSet &&) = delete;
PostingThreadSet &operator=(PostingThreadSet &&) = delete;
DedicatedIoThread &thread(PostingThreadRole role);
DedicatedIoThread &caller();
DedicatedIoThread &callee();
DedicatedIoThread &alternate();
DedicatedIoThread &body();
DedicatedIoThread &world();
DedicatedIoThread &leg();
private:
void registerAllThreads();
void unregisterAllThreads();
void installCallerAsPuppeteer();
void restorePreviousPuppeteer();
DedicatedIoThread callerThread;
DedicatedIoThread calleeThread;
DedicatedIoThread alternateThread;
DedicatedIoThread bodyThread;
DedicatedIoThread worldThread;
DedicatedIoThread legThread;
std::shared_ptr<sscl::PuppeteerThread> previousPuppeteerThread;
sscl::ThreadId previousPuppeteerThreadId = 0;
};
template <typename Function>
auto RunOnThread(DedicatedIoThread &thread, Function &&function)
-> std::invoke_result_t<Function &>
{
return thread.runSync(std::forward<Function>(function));
}
class CrossThreadTrace
{
public:
void recordConstructionThread();
void recordCalleeExecutionThread();
void recordFinalSuspendThread();
void recordAwaitResumeThread();
void recordCompletionCallbackThread();
std::thread::id constructionThread() const;
std::thread::id calleeExecutionThread() const;
std::thread::id finalSuspendThread() const;
std::thread::id awaitResumeThread() const;
std::thread::id completionCallbackThread() const;
private:
void record(std::thread::id &slot);
std::thread::id read(const std::thread::id &slot) const;
mutable std::mutex mutex;
std::thread::id constructionThreadId;
std::thread::id calleeExecutionThreadId;
std::thread::id finalSuspendThreadId;
std::thread::id awaitResumeThreadId;
std::thread::id completionCallbackThreadId;
};
template <typename InvokerFactory>
void runNonViralPostingTask(
DedicatedIoThread &callerThread,
InvokerFactory &&invokerFactory,
std::chrono::milliseconds timeout = defaultPostingTaskTimeout)
{
using Factory = std::decay_t<InvokerFactory>;
using Invoker = std::invoke_result_t<
Factory &, std::exception_ptr &, std::function<void()>>;
struct TaskState
{
explicit TaskState(Factory factoryIn)
: factory(std::move(factoryIn))
{}
Factory factory;
std::exception_ptr coroutineException;
std::exception_ptr taskException;
std::optional<Invoker> invoker;
std::mutex mutex;
std::condition_variable condition;
bool completed = false;
};
auto taskState = std::make_shared<TaskState>(
std::forward<InvokerFactory>(invokerFactory));
callerThread.post(
[taskState]()
{
auto completeTask = [taskState]()
{
taskState->taskException = taskState->coroutineException;
taskState->invoker.reset();
{
std::lock_guard<std::mutex> guard(taskState->mutex);
taskState->completed = true;
}
taskState->condition.notify_one();
};
try
{
taskState->invoker.emplace(
std::invoke(
taskState->factory,
taskState->coroutineException,
std::move(completeTask)));
}
catch (...)
{
{
std::lock_guard<std::mutex> guard(taskState->mutex);
taskState->taskException = std::current_exception();
taskState->completed = true;
}
taskState->condition.notify_one();
}
});
std::unique_lock<std::mutex> lock(taskState->mutex);
const bool completed = taskState->condition.wait_for(
lock,
timeout,
[&taskState]() { return taskState->completed; });
if (!completed) {
throw std::runtime_error("Timed out waiting for posting coroutine task");
}
std::exception_ptr taskException = taskState->taskException;
lock.unlock();
if (taskException) {
std::rethrow_exception(taskException);
}
}
} // namespace sscl::tests
#endif // SPINSCALE_TEST_SUPPORT_THREAD_HARNESS_H
+161
View File
@@ -0,0 +1,161 @@
#ifndef SPINSCALE_TEST_SUPPORT_TIMER_AWAITERS_H
#define SPINSCALE_TEST_SUPPORT_TIMER_AWAITERS_H
#include <coroutine>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/system/error_code.hpp>
namespace sscl::tests {
using SharedDeadlineTimer = std::shared_ptr<boost::asio::deadline_timer>;
class CancelableDeadlineTimerRegistry
{
public:
void clear()
{
std::lock_guard<std::mutex> guard(mutex);
timersByLabel.clear();
}
void registerTimer(
int labelMilliseconds,
const SharedDeadlineTimer &timer)
{
std::lock_guard<std::mutex> guard(mutex);
timersByLabel[labelMilliseconds] = timer;
}
void cancel(int labelMilliseconds)
{
std::lock_guard<std::mutex> guard(mutex);
const auto iterator = timersByLabel.find(labelMilliseconds);
if (iterator == timersByLabel.end()) {
throw std::runtime_error(
"No cancelable deadline_timer registered for label "
+ std::to_string(labelMilliseconds));
}
const SharedDeadlineTimer timer = iterator->second.lock();
if (!timer) {
throw std::runtime_error(
"Cancelable deadline_timer expired before cancel for label "
+ std::to_string(labelMilliseconds));
}
timer->cancel();
}
private:
std::mutex mutex;
std::unordered_map<int, std::weak_ptr<boost::asio::deadline_timer>>
timersByLabel;
};
struct DeadlineTimerAwaiter
{
DeadlineTimerAwaiter(
boost::asio::io_context &ioContext,
int delayMilliseconds)
: timer(std::make_shared<boost::asio::deadline_timer>(ioContext))
{
start(delayMilliseconds);
}
DeadlineTimerAwaiter(
SharedDeadlineTimer sharedTimer,
int delayMilliseconds)
: timer(std::move(sharedTimer))
{
start(delayMilliseconds);
}
bool await_ready() const noexcept
{ return waitCompleted; }
bool await_suspend(std::coroutine_handle<> handle) noexcept
{
resumeHandle = handle;
return !waitCompleted;
}
boost::system::error_code await_resume() const noexcept
{ return completionErrorCode; }
private:
void start(int delayMilliseconds)
{
timer->expires_from_now(
boost::posix_time::milliseconds(delayMilliseconds));
timer->async_wait(
[this](const boost::system::error_code &errorCode)
{
completionErrorCode = errorCode;
waitCompleted = true;
if (resumeHandle) {
resumeHandle.resume();
}
});
}
SharedDeadlineTimer timer;
boost::system::error_code completionErrorCode;
bool waitCompleted = false;
std::coroutine_handle<> resumeHandle;
};
struct RegisteredDeadlineTimerAwaiter
{
RegisteredDeadlineTimerAwaiter(
boost::asio::io_context &ioContext,
int delayMilliseconds,
int registrationLabelMilliseconds,
CancelableDeadlineTimerRegistry &registry)
: timer(std::make_shared<boost::asio::deadline_timer>(ioContext))
{
registry.registerTimer(registrationLabelMilliseconds, timer);
waiter.emplace(timer, delayMilliseconds);
}
bool await_ready() const noexcept
{ return waiter->await_ready(); }
bool await_suspend(std::coroutine_handle<> handle) noexcept
{ return waiter->await_suspend(handle); }
boost::system::error_code await_resume() const noexcept
{ return waiter->await_resume(); }
SharedDeadlineTimer timer;
std::optional<DeadlineTimerAwaiter> waiter;
};
inline void throwIfTimerWaitFailed(
const boost::system::error_code &waitError)
{
if (waitError) {
throw std::runtime_error(
"deadline_timer wait failed: " + waitError.message());
}
}
inline bool timerWasCanceled(const boost::system::error_code &waitError)
{
return waitError == boost::asio::error::operation_aborted;
}
} // namespace sscl::tests
#endif // SPINSCALE_TEST_SUPPORT_TIMER_AWAITERS_H