diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt new file mode 100644 index 0000000..956f743 --- /dev/null +++ b/cpp/CMakeLists.txt @@ -0,0 +1,22 @@ +cmake_minimum_required(VERSION 3.13) + +set(LINKS_PLATFORM_TESTS OFF CACHE BOOL "Whether to compile tests") +set(LINKS_PLATFORM_EXTRA_FLAGS "" CACHE STRING "Extra compiler flags") + +project(Platform.Threading CXX) +include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake) +conan_basic_setup(TARGETS) + +list(APPEND LINKS_PLATFORM_EXTRA_FLAGS ${CONAN_USER_PLATFORM.HASHING_suggested_flags}) + +add_library(${PROJECT_NAME}.Library INTERFACE) +target_include_directories(${PROJECT_NAME}.Library INTERFACE ${PROJECT_NAME}) +target_link_libraries(${PROJECT_NAME}.Library INTERFACE CONAN_PKG::platform.collections) +target_compile_options(${PROJECT_NAME}.Library INTERFACE ${LINKS_PLATFORM_EXTRA_FLAGS}) + +if(${LINKS_PLATFORM_TESTS}) + add_executable(${PROJECT_NAME}.Tests ${PROJECT_NAME}.Tests/AllTests.cpp) + set_target_properties(${PROJECT_NAME}.Tests PROPERTIES CXX_STANDARD 20) + target_link_libraries(${PROJECT_NAME}.Tests PRIVATE CONAN_PKG::gtest) + target_link_libraries(${PROJECT_NAME}.Tests PRIVATE ${PROJECT_NAME}.Library) +endif() \ No newline at end of file diff --git a/cpp/Platform.Threading.Tests/AllTests.cpp b/cpp/Platform.Threading.Tests/AllTests.cpp new file mode 100644 index 0000000..f57ede8 --- /dev/null +++ b/cpp/Platform.Threading.Tests/AllTests.cpp @@ -0,0 +1,5 @@ +#include +#include "Platform.Threading.h" + +#include "Await.cpp" +#include "Sync.cpp" diff --git a/cpp/Platform.Threading.Tests/Await.cpp b/cpp/Platform.Threading.Tests/Await.cpp new file mode 100644 index 0000000..6e8d546 --- /dev/null +++ b/cpp/Platform.Threading.Tests/Await.cpp @@ -0,0 +1,19 @@ +namespace Platform::Threading::Tests +{ + TEST(Await, One) + { + using namespace Synchronization; + using namespace std::chrono_literals; + + auto queue = std::queue>(); + queue.push(std::async([]() { std::this_thread::sleep_for(50ms); std::cout << "I like C++\n"; })); + queue.push(std::async([]() { std::cout << "C# it's Go\n"; })); + queue.push(std::async([]() { std::this_thread::sleep_for(100ms); std::cout << "lang\n"; })); + queue.push(std::async([]() { std::cout << "async programming is super\n"; })); + + auto sync_queue = Sync(std::move(queue)); + AwaitOne(sync_queue).wait(); + auto queue_two = Drop(std::move(sync_queue)); + std::cout << "not waited count: " << queue_two.size() << std::endl; + } +} \ No newline at end of file diff --git a/cpp/Platform.Threading.Tests/Sync.cpp b/cpp/Platform.Threading.Tests/Sync.cpp new file mode 100644 index 0000000..24c98a9 --- /dev/null +++ b/cpp/Platform.Threading.Tests/Sync.cpp @@ -0,0 +1,67 @@ +namespace Platform::Threading::Tests +{ + TEST(Synchronization, Sync_Stupid) + { + using namespace Synchronization; + + auto threads_count = 1000; + auto map_operations_count = 1000; + Sync> dict{}; + + auto work = [&] + { + for (int i = 0; i < map_operations_count; i++) + { + dict->operator[](std::to_string(i))++; + } + }; + + std::vector threads; + + for (int i = 0; i < threads_count; i++) + { + threads.push_back(std::thread(work)); + } + + for (auto& thread : threads) { + thread.join(); + } + + for (auto&& [key, value] : Drop(std::move(dict))) { + ASSERT_EQ(value, threads_count); + } + } + + TEST(Synchronization, Explicit) + { + using namespace Synchronization; + + auto threads_count = 1000; + auto map_operations_count = 1000; + + std::map dict{}; + + auto work = [&] + { + for (int i = 0; i < map_operations_count; i++) + { + dict.operator[](std::to_string(i))++; + } + }; + + std::vector threads; + + for (int i = 0; i < threads_count; i++) + { + threads.push_back(std::thread([&] { ExecuteWriteOperation(work); })); + } + + for (auto& thread : threads) { + thread.join(); + } + + for (auto&& [key, value] : dict) { + ASSERT_EQ(value, threads_count); + } + } +} \ No newline at end of file diff --git a/cpp/Platform.Threading.Tests/ThreadHelpersTests.cpp b/cpp/Platform.Threading.Tests/ThreadHelpersTests.cpp deleted file mode 100644 index 3f9115c..0000000 --- a/cpp/Platform.Threading.Tests/ThreadHelpersTests.cpp +++ /dev/null @@ -1,18 +0,0 @@ -namespace Platform::Threading::Tests -{ - TEST_CLASS(ThreadHelpersTests) - { - public: TEST_METHOD(InvokeTest) - { - auto number = 0; - ThreadHelpers.InvokeWithExtendedMaxStackSize([&]()-> auto { return number = 1; }); - Assert::AreEqual(1, number); - ThreadHelpers.InvokeWithExtendedMaxStackSize(2, param { return number = (std::int32_t)param); } - Assert::AreEqual(2, number); - ThreadHelpers.InvokeWithModifiedMaxStackSize([&]()-> auto { return number = 1; }, maxStackSize: 512); - Assert::AreEqual(1, number); - ThreadHelpers.InvokeWithModifiedMaxStackSize(2, param { return number = (std::int32_t)param, maxStackSize: 512); } - Assert::AreEqual(2, number); - } - }; -} diff --git a/cpp/Platform.Threading/ConcurrentQueueExtensions.h b/cpp/Platform.Threading/ConcurrentQueueExtensions.h index a7337a4..db10f16 100644 --- a/cpp/Platform.Threading/ConcurrentQueueExtensions.h +++ b/cpp/Platform.Threading/ConcurrentQueueExtensions.h @@ -1,23 +1,38 @@ namespace Platform::Threading { - class ConcurrentQueueExtensions + template + auto AwaitAll(Synchronization::Sync>>& unsafe_queue) { - public: static async Task AwaitAll(ConcurrentQueue queue) + auto lambda = [&unsafe_queue] { - foreach (auto item in queue.DequeueAll()) + auto locked_queue = *unsafe_queue; + auto& queue = *locked_queue; + while (!queue.empty()) { - await item.ConfigureAwait(continueOnCapturedContext: false); + auto& item = queue.front(); + item.wait(); + queue.pop(); } - } + }; + return std::async(lambda); + } - public: static async Task AwaitOne(ConcurrentQueue queue) + template + auto AwaitOne(Synchronization::Sync>>& unsafe_queue) + { + auto lambda = [&unsafe_queue] { - if (queue.TryDequeue(out Task item)) + auto locked_queue = *unsafe_queue; + auto& queue = *locked_queue; + if (!queue.empty()) { - await item.ConfigureAwait(continueOnCapturedContext: false); + auto& item = queue.front(); + item.wait(); + queue.pop(); } - } + }; + return std::async(lambda); + } - public: static void EnqueueAsRunnedTask(ConcurrentQueue queue, std::function action) { queue.Enqueue(Task.Run(action)); } - }; + // public: static void EnqueueAsRunnedTask(ConcurrentQueue queue, std::function action) { queue.Enqueue(Task.Run(action)); } } diff --git a/cpp/Platform.Threading/Platform.Threading.h b/cpp/Platform.Threading/Platform.Threading.h new file mode 100644 index 0000000..87af370 --- /dev/null +++ b/cpp/Platform.Threading/Platform.Threading.h @@ -0,0 +1,23 @@ +#ifndef PLATFORM_THREADING +#define PLATFORM_THREADING + +#include // TODO: in Collections + +#include + + +#include +#include +#include +#include + +namespace Platform::Threading::Synchronization +{ + #include "Synchronization/Sync/Sync.h" +} + +#include "Synchronization/ReaderWriterLockSynchronization.h" +#include "ConcurrentQueueExtensions.h" + + +#endif diff --git a/cpp/Platform.Threading/Synchronization/ISynchronization.h b/cpp/Platform.Threading/Synchronization/ISynchronization.h deleted file mode 100644 index 7a66bb2..0000000 --- a/cpp/Platform.Threading/Synchronization/ISynchronization.h +++ /dev/null @@ -1,14 +0,0 @@ -namespace Platform::Threading::Synchronization -{ - class ISynchronization - { - public: - virtual void ExecuteReadOperation(std::function action) = 0; - - TResult ExecuteReadOperation(std::function function); - - virtual void ExecuteWriteOperation(std::function action) = 0; - - TResult ExecuteWriteOperation(std::function function); - }; -} \ No newline at end of file diff --git a/cpp/Platform.Threading/Synchronization/ISynchronizationExtensions.h b/cpp/Platform.Threading/Synchronization/ISynchronizationExtensions.h deleted file mode 100644 index a1c8b17..0000000 --- a/cpp/Platform.Threading/Synchronization/ISynchronizationExtensions.h +++ /dev/null @@ -1,37 +0,0 @@ -namespace Platform::Threading::Synchronization -{ - class ISynchronizationExtensions - { - public: static TResult ExecuteReadOperation(ISynchronization &synchronization, TParam parameter, Func function) { return synchronization.ExecuteReadOperation([&]()-> auto { return function(parameter); } }); - - public: template static void ExecuteReadOperation(ISynchronization &synchronization, TParam parameter, std::function action) { synchronization.ExecuteReadOperation([&]()-> auto { return action(parameter); }); } - - public: static TResult ExecuteWriteOperation(ISynchronization &synchronization, TParam parameter, Func function) { return synchronization.ExecuteWriteOperation([&]()-> auto { return function(parameter); } }); - - public: template static void ExecuteWriteOperation(ISynchronization &synchronization, TParam parameter, std::function action) { synchronization.ExecuteWriteOperation([&]()-> auto { return action(parameter); }); } - - public: static TResult ExecuteReadOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, Func function) { return synchronization.ExecuteReadOperation([&]()-> auto { return function(parameter1, parameter2); } }); - - public: static void ExecuteReadOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, std::function action) { return synchronization.ExecuteReadOperation([&]()-> auto { return action(parameter1, parameter2); } }); - - public: static TResult ExecuteWriteOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, Func function) { return synchronization.ExecuteWriteOperation([&]()-> auto { return function(parameter1, parameter2); } }); - - public: static void ExecuteWriteOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, std::function action) { return synchronization.ExecuteWriteOperation([&]()-> auto { return action(parameter1, parameter2); } }); - - public: static TResult ExecuteReadOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, Func function) { return synchronization.ExecuteReadOperation([&]()-> auto { return function(parameter1, parameter2, parameter3); } }); - - public: static void ExecuteReadOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, std::function action) { return synchronization.ExecuteReadOperation([&]()-> auto { return action(parameter1, parameter2, parameter3); } }); - - public: static TResult ExecuteWriteOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, Func function) { return synchronization.ExecuteWriteOperation([&]()-> auto { return function(parameter1, parameter2, parameter3); } }); - - public: static void ExecuteWriteOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, std::function action) { return synchronization.ExecuteWriteOperation([&]()-> auto { return action(parameter1, parameter2, parameter3); } }); - - public: static TResult ExecuteReadOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, TParam4 parameter4, Func function) { return synchronization.ExecuteReadOperation([&]()-> auto { return function(parameter1, parameter2, parameter3, parameter4); } }); - - public: static void ExecuteReadOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, TParam4 parameter4, std::function action) { return synchronization.ExecuteReadOperation([&]()-> auto { return action(parameter1, parameter2, parameter3, parameter4); } }); - - public: static TResult ExecuteWriteOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, TParam4 parameter4, Func function) { return synchronization.ExecuteWriteOperation([&]()-> auto { return function(parameter1, parameter2, parameter3, parameter4); } }); - - public: static void ExecuteWriteOperation(ISynchronization &synchronization, TParam1 parameter1, TParam2 parameter2, TParam3 parameter3, TParam4 parameter4, std::function action) { return synchronization.ExecuteWriteOperation([&]()-> auto { return action(parameter1, parameter2, parameter3, parameter4); } }); - }; -} diff --git a/cpp/Platform.Threading/Synchronization/ISynchronized.h b/cpp/Platform.Threading/Synchronization/ISynchronized.h deleted file mode 100644 index 922527a..0000000 --- a/cpp/Platform.Threading/Synchronization/ISynchronized.h +++ /dev/null @@ -1,13 +0,0 @@ -namespace Platform::Threading::Synchronization -{ - template class ISynchronized; - template class ISynchronized - { - public: - const ISynchronization *SyncRoot; - - const TInterface Unsync; - - const TInterface Sync; - }; -} diff --git a/cpp/Platform.Threading/Synchronization/ReaderWriterLockSynchronization.h b/cpp/Platform.Threading/Synchronization/ReaderWriterLockSynchronization.h index e564c53..88eb62d 100644 --- a/cpp/Platform.Threading/Synchronization/ReaderWriterLockSynchronization.h +++ b/cpp/Platform.Threading/Synchronization/ReaderWriterLockSynchronization.h @@ -1,59 +1,30 @@ namespace Platform::Threading::Synchronization { - class ReaderWriterLockSynchronization : public ISynchronization + template< + typename mutex_t = std::recursive_mutex, + template typename lock_t = std::unique_lock> + void ExecuteReadOperation(auto&& action, auto&&... args) { - private: readonly ReaderWriterLockSlim _rwLock = ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); + static mutex_t mutex; - public: void ExecuteReadOperation(std::function action) + lock_t lock(mutex); + try { - _rwLock.EnterReadLock(); - try - { - action(); - } - finally - { - _rwLock.ExitReadLock(); - } - } + action(std::forward(args)...); + } catch(...) {} + } - public: TResult ExecuteReadOperation(std::function function) - { - _rwLock.EnterReadLock(); - try - { - return function(); - } - finally - { - _rwLock.ExitReadLock(); - } - } - - public: void ExecuteWriteOperation(std::function action) - { - _rwLock.EnterWriteLock(); - try - { - action(); - } - finally - { - _rwLock.ExitWriteLock(); - } - } + template< + typename mutex_t = std::recursive_mutex, + template typename lock_t = std::unique_lock> + void ExecuteWriteOperation(auto&& action, auto&&... args) + { + static mutex_t mutex; - public: TResult ExecuteWriteOperation(std::function function) + lock_t lock(mutex); + try { - _rwLock.EnterWriteLock(); - try - { - return function(); - } - finally - { - _rwLock.ExitWriteLock(); - } - } - }; + action(std::forward(args)...); + } catch(...) {} + } } \ No newline at end of file diff --git a/cpp/Platform.Threading/Synchronization/Sync/Sync.h b/cpp/Platform.Threading/Synchronization/Sync/Sync.h new file mode 100644 index 0000000..e3152b3 --- /dev/null +++ b/cpp/Platform.Threading/Synchronization/Sync/Sync.h @@ -0,0 +1,65 @@ +template, + typename lock_t = std::unique_lock> +class Sync +{ + mutable T data; + mutable mutex_t mutex; + + template + struct locked_caller + { + borrow_lock lock; + T* const ptr; + + public: + locked_caller(T* const ptr, mutex_t& mutex) : ptr(ptr), lock(mutex) {} + + T* operator->() && { return ptr; } + const T* operator->() const && { return ptr; } + }; + + template> + struct locked_ref : private base + { + locked_ref(T* const ptr, mutex_t& mutex) : base(ptr, mutex) {} + + T& operator*() & { return *base::ptr; } + const T& operator*() const & { return *base::ptr; } + + T* operator->() & { return base::ptr; } + const T* operator->() const & { return base::ptr; } + }; + +public: + Sync() = default; + + Sync(auto&&... args) requires requires { decltype(data)(std::forward(args)...); } + : data(std::forward(args)...), + mutex() {} + + Sync(const Sync& other) + : data(other.data), + mutex() {} + + Sync(Sync&&) noexcept = default; + + auto operator*() { return locked_ref(&data, mutex); } + auto operator*() const { return locked_ref(&data, mutex); } + + auto operator->() { return locked_caller(&data, mutex); } + auto operator->() const { return locked_caller(&data, mutex); } + + /// friends + template + friend auto&& Drop(Sync&& self); +}; + +template +Sync(T) -> Sync; + +template +auto&& Drop(Sync&& self) { + return std::move(self.data); +} diff --git a/cpp/Platform.Threading/Synchronization/Unsynchronization.h b/cpp/Platform.Threading/Synchronization/Unsynchronization.h deleted file mode 100644 index e67301f..0000000 --- a/cpp/Platform.Threading/Synchronization/Unsynchronization.h +++ /dev/null @@ -1,13 +0,0 @@ -namespace Platform::Threading::Synchronization -{ - class Unsynchronization : public ISynchronization - { - public: void ExecuteReadOperation(std::function action) { action(); } - - public: TResult ExecuteReadOperation(std::function function) { return function(); } - - public: void ExecuteWriteOperation(std::function action) { action(); } - - public: TResult ExecuteWriteOperation(std::function function) { return function(); } - }; -} \ No newline at end of file diff --git a/cpp/Platform.Threading/TaskExtensions.h b/cpp/Platform.Threading/TaskExtensions.h deleted file mode 100644 index b78a7da..0000000 --- a/cpp/Platform.Threading/TaskExtensions.h +++ /dev/null @@ -1,7 +0,0 @@ -namespace Platform::Threading -{ - class TaskExtensions - { - public: template static TReturn AwaitResult(Task task) { return task.GetAwaiter().GetResult(); } - }; -} diff --git a/cpp/Platform.Threading/ThreadHelpers.h b/cpp/Platform.Threading/ThreadHelpers.h deleted file mode 100644 index 1b4f38c..0000000 --- a/cpp/Platform.Threading/ThreadHelpers.h +++ /dev/null @@ -1,39 +0,0 @@ -namespace Platform::Threading -{ - class ThreadHelpers - { - public: static std::int32_t DefaultMaxStackSize; - - public: inline static const std::int32_t DefaultExtendedMaxStackSize = 256 * 1024 * 1024; - - public: inline static const std::int32_t DefaultSleepInterval = 1; - - public: template static void InvokeWithModifiedMaxStackSize(T param, std::function action, std::int32_t maxStackSize) { StartNew(param, action, maxStackSize).Join(); } - - public: template static void InvokeWithExtendedMaxStackSize(T param, std::function action) { InvokeWithModifiedMaxStackSize(param, action, DefaultExtendedMaxStackSize); } - - public: static void InvokeWithModifiedMaxStackSize(std::function action, std::int32_t maxStackSize) { StartNew(action, maxStackSize).Join(); } - - public: static void InvokeWithExtendedMaxStackSize(std::function action) { InvokeWithModifiedMaxStackSize(action, DefaultExtendedMaxStackSize); } - - public: template static Thread StartNew(T param, std::function action, std::int32_t maxStackSize) - { - auto thread = Thread(ParameterizedThreadStart(action), maxStackSize); - thread.Start(param); - return thread; - } - - public: template static Thread StartNew(T param, std::function action) { return StartNew(param, action, DefaultMaxStackSize); } - - public: static Thread StartNew(std::function action, std::int32_t maxStackSize) - { - auto thread = Thread(ThreadStart(action), maxStackSize); - thread.Start(); - return thread; - } - - public: static Thread StartNew(std::function action) { return StartNew(action, DefaultMaxStackSize); } - - public: static void Sleep() { Thread.Sleep(DefaultSleepInterval); } - }; -} diff --git a/cpp/conanfile.txt b/cpp/conanfile.txt new file mode 100644 index 0000000..56ce160 --- /dev/null +++ b/cpp/conanfile.txt @@ -0,0 +1,6 @@ +[requires] +gtest/cci.20210126 +platform.collections/0.1.0 + +[generators] +cmake