Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions Src/Base/AMReX_GpuDevice.H
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>

#define AMREX_GPU_MAX_STREAMS 8

Expand Down Expand Up @@ -46,8 +47,24 @@ using gpuDeviceProp_t = cudaDeviceProp;
}
#endif

namespace amrex {
class Arena;
}

namespace amrex::Gpu {

#ifdef AMREX_USE_GPU
// Owns one GPU stream together with a list of allocations whose release
// must be deferred until all work already submitted to that stream has
// completed. m_mutex guards m_free_wait_list for concurrent callers.
class StreamManager {
    gpuStream_t m_stream;
    std::mutex m_mutex;
    // Allocations queued by stream_free(); returned to their Arena on
    // the next sync().
    Vector<std::pair<Arena*, void*>> m_free_wait_list;
public:
    //! Access the managed stream (mutable, non-owning reference).
    [[nodiscard]] gpuStream_t& get ();
    //! Synchronize the stream, then free all previously queued allocations.
    void sync ();
    //! Free mem back to arena once currently enqueued stream work is done.
    //! Device-accessible memory is deferred; other memory is freed at once.
    void stream_free (Arena* arena, void* mem);
};
#endif

class Device
{

Expand All @@ -57,14 +74,16 @@ public:
static void Finalize ();

#if defined(AMREX_USE_GPU)
//! Returns the GPU stream currently selected for the calling (OpenMP) thread.
static gpuStream_t gpuStream () noexcept {
    return gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].get();
}
#ifdef AMREX_USE_CUDA
/** for backward compatibility */
static cudaStream_t cudaStream () noexcept { return gpuStream(); }
#endif
#ifdef AMREX_USE_SYCL
//! Returns the SYCL queue of the calling thread's current stream.
static sycl::queue& streamQueue () noexcept { return *(gpuStream().queue); }
//! Returns the SYCL queue of stream-pool entry i.
static sycl::queue& streamQueue (int i) noexcept { return *(gpu_stream_pool[i].get().queue); }
#endif
#endif

Expand Down Expand Up @@ -104,6 +123,8 @@ public:
*/
static void streamSynchronizeAll () noexcept;

static void streamFree (Arena* arena, void* mem) noexcept;

#if defined(__CUDACC__)
/** Generic graph selection. These should be called by users. */
static void startGraphRecording(bool first_iter, void* h_ptr, void* d_ptr, size_t sz);
Expand Down Expand Up @@ -196,10 +217,10 @@ private:
static AMREX_EXPORT dim3 numThreadsMin;
static AMREX_EXPORT dim3 numBlocksOverride, numThreadsOverride;

static AMREX_EXPORT Vector<StreamManager> gpu_stream_pool; // The size of this is max_gpu_streams
// The non-owning gpu_stream_index stores, per thread, the index of the
// pool stream that thread currently uses. It is a vector so that it is
// thread safe for each thread to write to its own element.
static AMREX_EXPORT Vector<int> gpu_stream_index; // The size of this is omp_max_threads
static AMREX_EXPORT gpuDeviceProp_t device_prop;
static AMREX_EXPORT int memory_pools_supported;
static AMREX_EXPORT unsigned int max_blocks_per_launch;
Expand All @@ -208,6 +229,8 @@ private:
static AMREX_EXPORT std::unique_ptr<sycl::context> sycl_context;
static AMREX_EXPORT std::unique_ptr<sycl::device> sycl_device;
#endif

friend StreamManager;
#endif
};

Expand Down Expand Up @@ -245,6 +268,12 @@ streamSynchronizeAll () noexcept
Device::streamSynchronizeAll();
}

//! Free mem back to arena after all work currently enqueued on the
//! calling thread's GPU stream has completed (forwards to
//! Device::streamFree).
inline void
streamFree (Arena* arena, void* mem) noexcept
{
    Device::streamFree(arena, mem);
}

#ifdef AMREX_USE_GPU

inline void
Expand Down
145 changes: 105 additions & 40 deletions Src/Base/AMReX_GpuDevice.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

#include <AMReX_Arena.H>
#include <AMReX_GpuDevice.H>
#include <AMReX_GpuLaunch.H>
#include <AMReX_Machine.H>
Expand Down Expand Up @@ -97,10 +98,10 @@ dim3 Device::numThreadsOverride = dim3(0, 0, 0);
dim3 Device::numBlocksOverride = dim3(0, 0, 0);
unsigned int Device::max_blocks_per_launch = 2560;

// Static data member definitions for Device (declared in AMReX_GpuDevice.H).
Vector<StreamManager> Device::gpu_stream_pool;
Vector<int>           Device::gpu_stream_index;
gpuDeviceProp_t       Device::device_prop;
int                   Device::memory_pools_supported = 0;

constexpr int Device::warp_size;

Expand Down Expand Up @@ -141,6 +142,64 @@ namespace {
}
}

//! Returns a mutable reference to the managed stream.
[[nodiscard]] gpuStream_t&
StreamManager::get () {
    return m_stream;
}

//! Synchronize the stream, then free every allocation that was queued
//! in m_free_wait_list before the synchronization started.
void
StreamManager::sync () {
    // Swap target: takes the queued frees out of the member under the lock.
    decltype(m_free_wait_list) new_empty_wait_list{};

    {
        // lock mutex before accessing and modifying member variables
        std::lock_guard<std::mutex> lock(m_mutex);
        m_free_wait_list.swap(new_empty_wait_list);
    }
    // unlock mutex before stream sync and memory free
    // to avoid deadlocks from the CArena mutex

    // actual stream sync
#ifdef AMREX_USE_SYCL
    try {
        m_stream.queue->wait_and_throw();
    } catch (sycl::exception const& ex) {
        amrex::Abort(std::string("streamSynchronize: ")+ex.what()+"!!!!!");
    }
#else
    AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL(hipStreamSynchronize(m_stream));,
                       AMREX_CUDA_SAFE_CALL(cudaStreamSynchronize(m_stream)); )
#endif

    // synchronizing the stream may have taken a long time and
    // there may be new kernels launched already, so we free memory
    // according to the state from before the stream was synced

    for (auto [arena, mem] : new_empty_wait_list) {
        arena->free(mem);
    }
}

//! Queue (or immediately perform) the free of mem back to arena.
//!
//! Device-accessible memory may still be in use by kernels already
//! submitted to this stream, so it is queued and only released by the
//! next sync(). Memory that is not device accessible cannot be in use
//! by the stream and is freed right away.
void
StreamManager::stream_free (Arena* arena, void* mem) {
    // Cap on queued frees so the wait list cannot grow without bound
    // in case the stream is never synchronized explicitly.
    constexpr std::size_t max_pending_frees = 100;
    if (arena->isDeviceAccessible()) {
        std::size_t free_wait_list_size = 0;
        {
            // lock mutex before accessing and modifying member variables
            std::lock_guard<std::mutex> lock(m_mutex);
            m_free_wait_list.emplace_back(arena, mem);
            free_wait_list_size = m_free_wait_list.size();
        }
        // sync() is called outside the lock; it re-acquires m_mutex and
        // frees via the Arena, which has its own mutex (deadlock safety).
        if (free_wait_list_size > max_pending_frees) {
            sync();
        }
    } else {
        // Host-only memory is not consumed by stream kernels; free now.
        arena->free(mem);
    }
}

#endif

void
Expand Down Expand Up @@ -384,24 +443,25 @@ void
Device::Finalize ()
{
#ifdef AMREX_USE_GPU
streamSynchronizeAll();
Device::profilerStop();

#ifdef AMREX_USE_SYCL
for (auto& s : gpu_stream_pool) {
delete s.queue;
s.queue = nullptr;
delete s.get().queue;
s.get().queue = nullptr;
}
sycl_context.reset();
sycl_device.reset();
#else
for (int i = 0; i < max_gpu_streams; ++i)
{
AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL( hipStreamDestroy(gpu_stream_pool[i]));,
AMREX_CUDA_SAFE_CALL(cudaStreamDestroy(gpu_stream_pool[i])); );
AMREX_HIP_OR_CUDA( AMREX_HIP_SAFE_CALL( hipStreamDestroy(gpu_stream_pool[i].get()));,
AMREX_CUDA_SAFE_CALL(cudaStreamDestroy(gpu_stream_pool[i].get())); );
}
#endif

gpu_stream.clear();
gpu_stream_index.clear();

#ifdef AMREX_USE_ACC
amrex_finalize_acc();
Expand All @@ -417,7 +477,10 @@ Device::initialize_gpu (bool minimal)

#ifdef AMREX_USE_GPU

gpu_stream_pool.resize(max_gpu_streams);
if (gpu_stream_pool.size() != max_gpu_streams) {
// no copy/move constructor for std::mutex
gpu_stream_pool = Vector<StreamManager>(max_gpu_streams);
}

#ifdef AMREX_USE_HIP

Expand All @@ -430,7 +493,7 @@ Device::initialize_gpu (bool minimal)
// AMD devices do not support shared cache banking.

for (int i = 0; i < max_gpu_streams; ++i) {
AMREX_HIP_SAFE_CALL(hipStreamCreate(&gpu_stream_pool[i]));
AMREX_HIP_SAFE_CALL(hipStreamCreate(&gpu_stream_pool[i].get()));
}

#ifdef AMREX_GPU_STREAM_ALLOC_SUPPORT
Expand Down Expand Up @@ -458,9 +521,9 @@ Device::initialize_gpu (bool minimal)
#endif

for (int i = 0; i < max_gpu_streams; ++i) {
AMREX_CUDA_SAFE_CALL(cudaStreamCreate(&gpu_stream_pool[i]));
AMREX_CUDA_SAFE_CALL(cudaStreamCreate(&gpu_stream_pool[i].get()));
#ifdef AMREX_USE_ACC
acc_set_cuda_stream(i, gpu_stream_pool[i]);
acc_set_cuda_stream(i, gpu_stream_pool[i].get());
#endif
}

Expand All @@ -473,7 +536,7 @@ Device::initialize_gpu (bool minimal)
sycl_device = std::make_unique<sycl::device>(gpu_devices[device_id]);
sycl_context = std::make_unique<sycl::context>(*sycl_device, amrex_sycl_error_handler);
for (int i = 0; i < max_gpu_streams; ++i) {
gpu_stream_pool[i].queue = new sycl::queue(*sycl_context, *sycl_device,
gpu_stream_pool[i].get().queue = new sycl::queue(*sycl_context, *sycl_device,
sycl::property_list{sycl::property::queue::in_order{}});
}
}
Expand Down Expand Up @@ -556,7 +619,7 @@ Device::initialize_gpu (bool minimal)
}
#endif

gpu_stream.resize(OpenMP::get_max_threads(), gpu_stream_pool[0]);
gpu_stream_index.resize(OpenMP::get_max_threads(), 0);

ParmParse pp("device");

Expand Down Expand Up @@ -626,8 +689,13 @@ int Device::numDevicePartners () noexcept
//! Returns the index of stream s in the stream pool, or the pool size
//! if s is not a pool stream.
int
Device::streamIndex (gpuStream_t s) noexcept
{
    const int N = gpu_stream_pool.size();
    for (int i = 0; i < N; ++i) {
        if (gpu_stream_pool[i].get() == s) {
            return i;
        }
    }
    return N;
}
#endif

Expand All @@ -636,7 +704,7 @@ Device::setStreamIndex (int idx) noexcept
{
amrex::ignore_unused(idx);
#ifdef AMREX_USE_GPU
gpu_stream[OpenMP::get_thread_num()] = gpu_stream_pool[idx % max_gpu_streams];
gpu_stream_index[OpenMP::get_thread_num()] = idx % max_gpu_streams;
#ifdef AMREX_USE_ACC
amrex_set_acc_stream(idx % max_gpu_streams);
#endif
Expand All @@ -647,16 +715,16 @@ Device::setStreamIndex (int idx) noexcept
//! Reset the calling thread's stream to pool stream 0 and return the
//! previously selected stream.
gpuStream_t
Device::resetStream () noexcept
{
    gpuStream_t r = gpuStream();
    gpu_stream_index[OpenMP::get_thread_num()] = 0;
    return r;
}

//! Make s the calling thread's current stream and return the previous one.
//! NOTE(review): if s is not in the pool, streamIndex returns the pool
//! size, which would make subsequent gpuStream() calls index out of
//! range — confirm callers only ever pass pool streams.
gpuStream_t
Device::setStream (gpuStream_t s) noexcept
{
    gpuStream_t r = gpuStream();
    gpu_stream_index[OpenMP::get_thread_num()] = streamIndex(s);
    return r;
}
#endif
Expand All @@ -665,9 +733,9 @@ void
Device::synchronize () noexcept
{
#ifdef AMREX_USE_SYCL
for (auto const& s : gpu_stream_pool) {
for (auto& s : gpu_stream_pool) {
try {
s.queue->wait_and_throw();
s.get().queue->wait_and_throw();
} catch (sycl::exception const& ex) {
amrex::Abort(std::string("synchronize: ")+ex.what()+"!!!!!");
}
Expand All @@ -681,31 +749,28 @@ Device::synchronize () noexcept
//! Synchronize the calling thread's current stream and release any
//! memory queued on it for deferred freeing.
void
Device::streamSynchronize () noexcept
{
#ifdef AMREX_USE_GPU
    gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].sync();
#endif
}

//! Synchronize every stream in the pool and release all memory queued
//! on them for deferred freeing.
void
Device::streamSynchronizeAll () noexcept
{
#ifdef AMREX_USE_GPU
    for (auto& s : gpu_stream_pool) {
        s.sync();
    }
#endif
}

//! Free mem back to arena once the work currently enqueued on the
//! calling thread's stream has finished. Without GPU support this is
//! an immediate free.
void
Device::streamFree (Arena* arena, void* mem) noexcept
{
#ifdef AMREX_USE_GPU
    // Defer (or perform) the free via the thread's current stream manager.
    gpu_stream_pool[gpu_stream_index[OpenMP::get_thread_num()]].stream_free(arena, mem);
#else
    arena->free(mem);
#endif
}

Expand Down
12 changes: 12 additions & 0 deletions Src/Base/AMReX_PODVector.H
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,18 @@ namespace amrex
std::swap(static_cast<Allocator&>(a_vector), static_cast<Allocator&>(*this));
}

// Release this vector's buffer in a stream-aware way: arena-backed
// memory is handed to Gpu::streamFree (deferred until work already
// enqueued on the current stream completes); any other allocator
// deallocates immediately. Leaves m_data == nullptr.
// NOTE(review): size/capacity members are not updated here — confirm
// callers reset them (or destroy the vector) afterwards.
void stream_free () noexcept
{
    if (m_data != nullptr) {
        if constexpr (IsArenaAllocator<Allocator>::value) {
            Gpu::streamFree(Allocator::arena(), m_data);
        } else {
            deallocate(m_data, capacity());
        }
        m_data = nullptr;
    }
}

private:

void reserve_doit (size_type a_capacity) {
Expand Down
Loading
Loading