diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 6079780044c68..ffe1773edcec7 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -75,6 +75,7 @@ llvm/include/llvm/SYCLLowerIR/SYCLCreateNVVMAnnotations.h @intel/llvm-reviewers-cuda
 llvm/lib/SYCLLowerIR/SYCLCreateNVVMAnnotations.cpp @intel/llvm-reviewers-cuda
 llvm/lib/Target/NVPTX @intel/llvm-reviewers-cuda
 llvm/lib/Target/AMDGPU @intel/llvm-reviewers-cuda
+unified-runtime/source/common/cuda-hip @intel/llvm-reviewers-cuda
 
 # XPTI instrumentation utilities
 xpti/ @intel/llvm-reviewers-runtime
diff --git a/unified-runtime/source/adapters/cuda/async_alloc.cpp b/unified-runtime/source/adapters/cuda/async_alloc.cpp
index 1826ebe531c5a..8f545b5f2d980 100644
--- a/unified-runtime/source/adapters/cuda/async_alloc.cpp
+++ b/unified-runtime/source/adapters/cuda/async_alloc.cpp
@@ -25,7 +25,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueUSMDeviceAllocExp(
 
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
@@ -83,7 +83,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueUSMFreeExp(
 
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
diff --git a/unified-runtime/source/adapters/cuda/command_buffer.cpp b/unified-runtime/source/adapters/cuda/command_buffer.cpp
index f494c7b557538..01dc632fa6fdd 100644
--- a/unified-runtime/source/adapters/cuda/command_buffer.cpp
+++ b/unified-runtime/source/adapters/cuda/command_buffer.cpp
@@ -1133,7 +1133,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueCommandBufferExp(
   std::unique_ptr<ur_event_handle_t_> RetImplEvent{nullptr};
   ScopedContext Active(hQueue->getDevice());
   uint32_t StreamToken;
-  ur_stream_guard_ Guard;
+  ur_stream_guard Guard;
   CUstream CuStream = hQueue->getNextComputeStream(
       numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
diff --git a/unified-runtime/source/adapters/cuda/enqueue.cpp b/unified-runtime/source/adapters/cuda/enqueue.cpp
index fb3e1952ed1c9..e48a1e5ea15f7 100644
--- a/unified-runtime/source/adapters/cuda/enqueue.cpp
+++ b/unified-runtime/source/adapters/cuda/enqueue.cpp
@@ -315,7 +315,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueEventsWaitWithBarrier(
   try {
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
     {
@@ -440,7 +440,7 @@ enqueueKernelLaunch(ur_queue_handle_t hQueue, ur_kernel_handle_t hKernel,
 
   ScopedContext Active(hQueue->getDevice());
   uint32_t StreamToken;
-  ur_stream_guard_ Guard;
+  ur_stream_guard Guard;
   CUstream CuStream = hQueue->getNextComputeStream(
       numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
@@ -628,7 +628,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunchCustomExp(
 
   ScopedContext Active(hQueue->getDevice());
   uint32_t StreamToken;
-  ur_stream_guard_ Guard;
+  ur_stream_guard Guard;
   CUstream CuStream = hQueue->getNextComputeStream(
       numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
@@ -1517,7 +1517,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueUSMFill(
   try {
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
     UR_CHECK_ERROR(enqueueEventsWait(hQueue, CuStream, numEventsInWaitList,
diff --git a/unified-runtime/source/adapters/cuda/event.cpp b/unified-runtime/source/adapters/cuda/event.cpp
index d440567ffaac6..b8ec9917d3964 100644
--- a/unified-runtime/source/adapters/cuda/event.cpp
+++ b/unified-runtime/source/adapters/cuda/event.cpp
@@ -87,17 +87,17 @@ bool ur_event_handle_t_::isCompleted() const noexcept try {
 
 uint64_t ur_event_handle_t_::getQueuedTime() const {
   assert(isStarted());
-  return Queue->get_device()->getElapsedTime(EvQueued);
+  return Queue->getDevice()->getElapsedTime(EvQueued);
 }
 
 uint64_t ur_event_handle_t_::getStartTime() const {
   assert(isStarted());
-  return Queue->get_device()->getElapsedTime(EvStart);
+  return Queue->getDevice()->getElapsedTime(EvStart);
 }
 
 uint64_t ur_event_handle_t_::getEndTime() const {
   assert(isStarted() && isRecorded());
-  return Queue->get_device()->getElapsedTime(EvEnd);
+  return Queue->getDevice()->getElapsedTime(EvEnd);
 }
 
 ur_result_t ur_event_handle_t_::record() {
@@ -111,7 +111,7 @@ ur_result_t ur_event_handle_t_::record() {
   UR_ASSERT(Queue, UR_RESULT_ERROR_INVALID_QUEUE);
 
   try {
-    EventID = Queue->getNextEventID();
+    EventID = Queue->getNextEventId();
     if (EventID == 0) {
       die("Unrecoverable program state reached in event identifier overflow");
     }
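
The `ur_stream_guard_` type that the hunks above rename to `ur_stream_guard` is an alias for `std::unique_lock<std::mutex>` (see `stream_queue.hpp` below). A minimal sketch of the calling pattern these call sites share, assuming a valid `hQueue` and wait list (hypothetical caller code, not part of the patch):

    uint32_t StreamToken;
    ur_stream_guard Guard; // default-constructed: owns no lock yet
    CUstream CuStream = hQueue->getNextComputeStream(
        numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
    // If a dependency's stream was reused, Guard now holds
    // ComputeStreamSyncMutex; keep it alive until the work is enqueued.
    // Guard releases the lock automatically at end of scope.
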
diff --git a/unified-runtime/source/adapters/cuda/queue.cpp b/unified-runtime/source/adapters/cuda/queue.cpp
index 115766c7d1c29..47e5d81f1203d 100644
--- a/unified-runtime/source/adapters/cuda/queue.cpp
+++ b/unified-runtime/source/adapters/cuda/queue.cpp
@@ -32,93 +32,17 @@ void ur_queue_handle_t_::transferStreamWaitForBarrierIfNeeded(
   }
 }
 
-CUstream ur_queue_handle_t_::getNextComputeStream(uint32_t *StreamToken) {
-  if (getThreadLocalStream() != CUstream{0})
-    return getThreadLocalStream();
-  uint32_t StreamI;
-  uint32_t Token;
-  while (true) {
-    if (NumComputeStreams < ComputeStreams.size()) {
-      // the check above is for performance - so as not to lock mutex every time
-      std::lock_guard<std::mutex> guard(ComputeStreamMutex);
-      // The second check is done after mutex is locked so other threads can not
-      // change NumComputeStreams after that
-      if (NumComputeStreams < ComputeStreams.size()) {
-        UR_CHECK_ERROR(cuStreamCreateWithPriority(
-            &ComputeStreams[NumComputeStreams], Flags, Priority));
-        ++NumComputeStreams;
-      }
-    }
-    Token = ComputeStreamIndex++;
-    StreamI = Token % ComputeStreams.size();
-    // if a stream has been reused before it was next selected round-robin
-    // fashion, we want to delay its next use and instead select another one
-    // that is more likely to have completed all the enqueued work.
-    if (DelayCompute[StreamI]) {
-      DelayCompute[StreamI] = false;
-    } else {
-      break;
-    }
-  }
-  if (StreamToken) {
-    *StreamToken = Token;
-  }
-  CUstream res = ComputeStreams[StreamI];
-  computeStreamWaitForBarrierIfNeeded(res, StreamI);
-  return res;
+ur_queue_handle_t ur_queue_handle_t_::getEventQueue(const ur_event_handle_t e) {
+  return e->getQueue();
 }
 
-CUstream ur_queue_handle_t_::getNextComputeStream(
-    uint32_t NumEventsInWaitList, const ur_event_handle_t *EventWaitList,
-    ur_stream_guard_ &Guard, uint32_t *StreamToken) {
-  if (getThreadLocalStream() != CUstream{0})
-    return getThreadLocalStream();
-  for (uint32_t i = 0; i < NumEventsInWaitList; i++) {
-    uint32_t Token = EventWaitList[i]->getComputeStreamToken();
-    if (reinterpret_cast<ur_queue_handle_t_ *>(EventWaitList[i]->getQueue()) ==
-            this &&
-        canReuseStream(Token)) {
-      std::unique_lock<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      // redo the check after lock to avoid data races on
-      // LastSyncComputeStreams
-      if (canReuseStream(Token)) {
-        uint32_t StreamI = Token % DelayCompute.size();
-        DelayCompute[StreamI] = true;
-        if (StreamToken) {
-          *StreamToken = Token;
-        }
-        Guard = ur_stream_guard_{std::move(ComputeSyncGuard)};
-        CUstream Result = EventWaitList[i]->getStream();
-        computeStreamWaitForBarrierIfNeeded(Result, StreamI);
-        return Result;
-      }
-    }
-  }
-  Guard = {};
-  return getNextComputeStream(StreamToken);
+uint32_t
+ur_queue_handle_t_::getEventComputeStreamToken(const ur_event_handle_t e) {
+  return e->getComputeStreamToken();
 }
 
-CUstream ur_queue_handle_t_::getNextTransferStream() {
-  if (getThreadLocalStream() != CUstream{0})
-    return getThreadLocalStream();
-  if (TransferStreams.empty()) { // for example in in-order queue
-    return getNextComputeStream();
-  }
-  if (NumTransferStreams < TransferStreams.size()) {
-    // the check above is for performance - so as not to lock mutex every time
-    std::lock_guard<std::mutex> Guard(TransferStreamMutex);
-    // The second check is done after mutex is locked so other threads can not
-    // change NumTransferStreams after that
-    if (NumTransferStreams < TransferStreams.size()) {
-      UR_CHECK_ERROR(cuStreamCreateWithPriority(
-          &TransferStreams[NumTransferStreams], Flags, Priority));
-      ++NumTransferStreams;
-    }
-  }
-  uint32_t StreamI = TransferStreamIndex++ % TransferStreams.size();
-  CUstream Result = TransferStreams[StreamI];
-  transferStreamWaitForBarrierIfNeeded(Result, StreamI);
-  return Result;
+CUstream ur_queue_handle_t_::getEventStream(const ur_event_handle_t e) {
+  return e->getStream();
 }
 
 /// Creates a `ur_queue_handle_t` object on the CUDA backend.
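
With the stream-pool logic moved into the shared `stream_queue_t` base, the adapter keeps only the small hooks the base class cannot express generically: how to create a native stream and how to query an event's queue, stream, and compute-stream token. A condensed sketch of how the shared base consumes these hooks (simplified from `stream_queue.hpp` below, not verbatim):

    // Inside stream_queue_t::getNextComputeStream(...):
    uint32_t Token = getEventComputeStreamToken(EventWaitList[i]);
    if (getEventQueue(EventWaitList[i]) == this && canReuseStream(Token)) {
      // reuse the dependency's stream, obtained via
      // getEventStream(EventWaitList[i])
    }
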
diff --git a/unified-runtime/source/adapters/cuda/queue.hpp b/unified-runtime/source/adapters/cuda/queue.hpp
index 89132c99ca639..f6369996b86af 100644
--- a/unified-runtime/source/adapters/cuda/queue.hpp
+++ b/unified-runtime/source/adapters/cuda/queue.hpp
@@ -17,100 +17,24 @@
 #include <cuda.h>
 #include <vector>
 
-using ur_stream_guard_ = std::unique_lock<std::mutex>;
+#include <common/cuda-hip/stream_queue.hpp>
 
 /// UR queue mapping on to CUstream objects.
 ///
-struct ur_queue_handle_t_ {
+struct ur_queue_handle_t_ : stream_queue_t<CUstream, 128, 64> {
+  using stream_queue_t<CUstream, 128, 64>::stream_queue_t;
 
-  using native_type = CUstream;
-  static constexpr int DefaultNumComputeStreams = 128;
-  static constexpr int DefaultNumTransferStreams = 64;
-
-  std::vector<native_type> ComputeStreams;
-  std::vector<native_type> TransferStreams;
-  // Stream used for recording EvQueue, which holds information about when the
-  // command in question is enqueued on host, as opposed to started. It is
-  // created only if profiling is enabled - either for queue or per event.
-  native_type HostSubmitTimeStream{0};
-  // delay_compute_ keeps track of which streams have been recently reused and
-  // their next use should be delayed. If a stream has been recently reused it
-  // will be skipped the next time it would be selected round-robin style. When
-  // skipped, its delay flag is cleared.
-  std::vector<bool> DelayCompute;
-  // keep track of which streams have applied barrier
-  std::vector<bool> ComputeAppliedBarrier;
-  std::vector<bool> TransferAppliedBarrier;
-  ur_context_handle_t_ *Context;
-  ur_device_handle_t_ *Device;
   CUevent BarrierEvent = nullptr;
   CUevent BarrierTmpEvent = nullptr;
-  std::atomic_uint32_t RefCount;
-  std::atomic_uint32_t EventCount;
-  std::atomic_uint32_t ComputeStreamIndex;
-  std::atomic_uint32_t TransferStreamIndex;
-  unsigned int NumComputeStreams;
-  unsigned int NumTransferStreams;
-  unsigned int LastSyncComputeStreams;
-  unsigned int LastSyncTransferStreams;
-  unsigned int Flags;
-  ur_queue_flags_t URFlags;
-  int Priority;
-  // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
-  // locked at the same time, ComputeStreamSyncMutex should be locked first
-  // to avoid deadlocks
-  std::mutex ComputeStreamSyncMutex;
-  std::mutex ComputeStreamMutex;
-  std::mutex TransferStreamMutex;
-  std::mutex BarrierMutex;
-  bool HasOwnership;
-
-  ur_queue_handle_t_(std::vector<CUstream> &&ComputeStreams,
-                     std::vector<CUstream> &&TransferStreams,
-                     ur_context_handle_t_ *Context, ur_device_handle_t_ *Device,
-                     unsigned int Flags, ur_queue_flags_t URFlags, int Priority,
-                     bool BackendOwns = true)
-      : ComputeStreams{std::move(ComputeStreams)},
-        TransferStreams{std::move(TransferStreams)},
-        DelayCompute(this->ComputeStreams.size(), false),
-        ComputeAppliedBarrier(this->ComputeStreams.size()),
-        TransferAppliedBarrier(this->TransferStreams.size()), Context{Context},
-        Device{Device}, RefCount{1}, EventCount{0}, ComputeStreamIndex{0},
-        TransferStreamIndex{0}, NumComputeStreams{0}, NumTransferStreams{0},
-        LastSyncComputeStreams{0}, LastSyncTransferStreams{0}, Flags(Flags),
-        URFlags(URFlags), Priority(Priority), HasOwnership{BackendOwns} {
-    urContextRetain(Context);
-    urDeviceRetain(Device);
-  }
-
-  ~ur_queue_handle_t_() {
-    urContextRelease(Context);
-    urDeviceRelease(Device);
-  }
-
-  void computeStreamWaitForBarrierIfNeeded(CUstream Stream, uint32_t StreamI);
-  void transferStreamWaitForBarrierIfNeeded(CUstream Stream, uint32_t StreamI);
-
-  // get_next_compute/transfer_stream() functions return streams from
-  // appropriate pools in round-robin fashion
-  native_type getNextComputeStream(uint32_t *StreamToken = nullptr);
-  // this overload tries to select a stream that was used by one of
-  // dependencies. If that is not possible returns a new stream. If a stream is
-  // reused it returns a lock that needs to remain locked as long as the stream
-  // is in use
-  native_type getNextComputeStream(uint32_t NumEventsInWaitList,
-                                   const ur_event_handle_t *EventWaitList,
-                                   ur_stream_guard_ &Guard,
-                                   uint32_t *StreamToken = nullptr);
-
-  // Thread local stream will be used if ScopedStream is active
-  static CUstream &getThreadLocalStream() {
-    static thread_local CUstream stream{0};
-    return stream;
-  }
-
-  native_type getNextTransferStream();
-  native_type get() { return getNextComputeStream(); };
-  ur_device_handle_t getDevice() const noexcept { return Device; };
+  void computeStreamWaitForBarrierIfNeeded(CUstream Stream,
+                                           uint32_t StreamI) override;
+  void transferStreamWaitForBarrierIfNeeded(CUstream Stream,
+                                            uint32_t StreamI) override;
+  ur_queue_handle_t getEventQueue(const ur_event_handle_t) override;
+  uint32_t getEventComputeStreamToken(const ur_event_handle_t) override;
+  CUstream getEventStream(const ur_event_handle_t) override;
 
   // Function which creates the profiling stream. Called only from makeNative
   // event when profiling is required.
@@ -122,155 +46,10 @@ struct ur_queue_handle_t_ {
     });
   }
 
-  native_type getHostSubmitTimeStream() { return HostSubmitTimeStream; }
-
-  bool hasBeenSynchronized(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    return LastSyncComputeStreams > StreamToken;
-  }
-
-  bool canReuseStream(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    // If the command represented by the stream token was not the last command
-    // enqueued to the stream we can not reuse the stream - we need to allow for
-    // commands enqueued after it and the one we are about to enqueue to run
-    // concurrently
-    bool IsLastCommand =
-        (ComputeStreamIndex - StreamToken) <= ComputeStreams.size();
-    // If there was a barrier enqueued to the queue after the command
-    // represented by the stream token we should not reuse the stream, as we can
-    // not take that stream into account for the bookkeeping for the next
-    // barrier - such a stream would not be synchronized with. Performance-wise
-    // it does not matter that we do not reuse the stream, as the work
-    // represented by the stream token is guaranteed to be complete by the
-    // barrier before any work we are about to enqueue to the stream will start,
-    // so the event does not need to be synchronized with.
-    return IsLastCommand && !hasBeenSynchronized(StreamToken);
+  void createStreamWithPriority(CUstream *Stream, unsigned int Flags,
+                                int Priority) override {
+    UR_CHECK_ERROR(cuStreamCreateWithPriority(Stream, Flags, Priority));
   }
-
-  template <typename T> bool allOf(T &&F) {
-    {
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      if (!std::all_of(ComputeStreams.begin(), ComputeStreams.begin() + End, F))
-        return false;
-    }
-    {
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      if (!std::all_of(TransferStreams.begin(), TransferStreams.begin() + End,
-                       F))
-        return false;
-    }
-    return true;
-  }
-
-  template <typename T> void forEachStream(T &&F) {
-    {
-      std::lock_guard<std::mutex> compute_guard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(ComputeStreams[i]);
-      }
-    }
-    {
-      std::lock_guard<std::mutex> transfer_guard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(TransferStreams[i]);
-      }
-    }
-  }
-
-  template <bool ResetUsed = false, typename T> void syncStreams(T &&F) {
-    auto SyncCompute = [&F, &Streams = ComputeStreams, &Delay = DelayCompute](
-                           unsigned int Start, unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(Streams[i]);
-        Delay[i] = false;
-      }
-    };
-    auto SyncTransfer = [&F, &streams = TransferStreams](unsigned int Start,
-                                                         unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(streams[i]);
-      }
-    };
-    {
-      unsigned int Size = static_cast<unsigned int>(ComputeStreams.size());
-      std::lock_guard<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int Start = LastSyncComputeStreams;
-      unsigned int End = NumComputeStreams < Size ? NumComputeStreams
-                                                  : ComputeStreamIndex.load();
-      if (ResetUsed) {
-        LastSyncComputeStreams = End;
-      }
-      if (End - Start >= Size) {
-        SyncCompute(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start <= End) {
-          SyncCompute(Start, End);
-        } else {
-          SyncCompute(Start, Size);
-          SyncCompute(0, End);
-        }
-      }
-    }
-    {
-      unsigned int Size = static_cast<unsigned int>(TransferStreams.size());
-      if (!Size) {
-        return;
-      }
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int Start = LastSyncTransferStreams;
-      unsigned int End = NumTransferStreams < Size ? NumTransferStreams
-                                                   : TransferStreamIndex.load();
-      if (ResetUsed) {
-        LastSyncTransferStreams = End;
-      }
-      if (End - Start >= Size) {
-        SyncTransfer(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start <= End) {
-          SyncTransfer(Start, End);
-        } else {
-          SyncTransfer(Start, Size);
-          SyncTransfer(0, End);
-        }
-      }
-    }
-  }
-
-  ur_context_handle_t_ *getContext() const { return Context; };
-
-  ur_device_handle_t_ *get_device() const { return Device; };
-
-  uint32_t incrementReferenceCount() noexcept { return ++RefCount; }
-
-  uint32_t decrementReferenceCount() noexcept { return --RefCount; }
-
-  uint32_t getReferenceCount() const noexcept { return RefCount; }
-
-  uint32_t getNextEventID() noexcept { return ++EventCount; }
-
-  bool backendHasOwnership() const noexcept { return HasOwnership; }
 };
 
 // RAII object to make hQueue stream getter methods all return the same stream
@@ -286,7 +65,7 @@ class ScopedStream {
   ScopedStream(ur_queue_handle_t hQueue, uint32_t NumEventsInWaitList,
                const ur_event_handle_t *EventWaitList)
       : hQueue{hQueue} {
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
    hQueue->getThreadLocalStream() =
         hQueue->getNextComputeStream(NumEventsInWaitList, EventWaitList, Guard);
   }
diff --git a/unified-runtime/source/adapters/hip/queue.cpp b/unified-runtime/source/adapters/hip/queue.cpp
index 12c96baa0b9ed..d8bf30e9396ef 100644
--- a/unified-runtime/source/adapters/hip/queue.cpp
+++ b/unified-runtime/source/adapters/hip/queue.cpp
@@ -28,89 +28,17 @@ void ur_queue_handle_t_::transferStreamWaitForBarrierIfNeeded(
   }
 }
 
-hipStream_t ur_queue_handle_t_::getNextComputeStream(uint32_t *StreamToken) {
-  if (getThreadLocalStream() != hipStream_t{0})
-    return getThreadLocalStream();
-  uint32_t Stream_i;
-  uint32_t Token;
-  while (true) {
-    if (NumComputeStreams < ComputeStreams.size()) {
-      // the check above is for performance - so as not to lock mutex every time
-      std::lock_guard<std::mutex> guard(ComputeStreamMutex);
-      // The second check is done after mutex is locked so other threads can not
-      // change NumComputeStreams after that
-      if (NumComputeStreams < ComputeStreams.size()) {
-        UR_CHECK_ERROR(hipStreamCreateWithPriority(
-            &ComputeStreams[NumComputeStreams++], Flags, Priority));
-      }
-    }
-    Token = ComputeStreamIdx++;
-    Stream_i = Token % ComputeStreams.size();
-    // if a stream has been reused before it was next selected round-robin
-    // fashion, we want to delay its next use and instead select another one
-    // that is more likely to have completed all the enqueued work.
-    if (DelayCompute[Stream_i]) {
-      DelayCompute[Stream_i] = false;
-    } else {
-      break;
-    }
-  }
-  if (StreamToken) {
-    *StreamToken = Token;
-  }
-  hipStream_t Res = ComputeStreams[Stream_i];
-  computeStreamWaitForBarrierIfNeeded(Res, Stream_i);
-  return Res;
+ur_queue_handle_t ur_queue_handle_t_::getEventQueue(const ur_event_handle_t e) {
+  return e->getQueue();
 }
 
-hipStream_t ur_queue_handle_t_::getNextComputeStream(
-    uint32_t NumEventsInWaitList, const ur_event_handle_t *EventWaitList,
-    ur_stream_guard &Guard, uint32_t *StreamToken) {
-  if (getThreadLocalStream() != hipStream_t{0})
-    return getThreadLocalStream();
-  for (uint32_t i = 0; i < NumEventsInWaitList; i++) {
-    uint32_t Token = EventWaitList[i]->getComputeStreamToken();
-    if (EventWaitList[i]->getQueue() == this && canReuseStream(Token)) {
-      std::unique_lock<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      // redo the check after lock to avoid data races on
-      // LastSyncComputeStreams
-      if (canReuseStream(Token)) {
-        uint32_t Stream_i = Token % DelayCompute.size();
-        DelayCompute[Stream_i] = true;
-        if (StreamToken) {
-          *StreamToken = Token;
-        }
-        Guard = ur_stream_guard{std::move(ComputeSyncGuard)};
-        hipStream_t Res = EventWaitList[i]->getStream();
-        computeStreamWaitForBarrierIfNeeded(Res, Stream_i);
-        return Res;
-      }
-    }
-  }
-  Guard = {};
-  return getNextComputeStream(StreamToken);
+uint32_t
+ur_queue_handle_t_::getEventComputeStreamToken(const ur_event_handle_t e) {
+  return e->getComputeStreamToken();
 }
 
-hipStream_t ur_queue_handle_t_::getNextTransferStream() {
-  if (getThreadLocalStream() != hipStream_t{0})
-    return getThreadLocalStream();
-  if (TransferStreams.empty()) { // for example in in-order queue
-    return getNextComputeStream();
-  }
-  if (NumTransferStreams < TransferStreams.size()) {
-    // the check above is for performance - so as not to lock mutex every time
-    std::lock_guard<std::mutex> Guard(TransferStreamMutex);
-    // The second check is done after mutex is locked so other threads can not
-    // change NumTransferStreams after that
-    if (NumTransferStreams < TransferStreams.size()) {
-      UR_CHECK_ERROR(hipStreamCreateWithPriority(
-          &TransferStreams[NumTransferStreams++], Flags, Priority));
-    }
-  }
-  uint32_t Stream_i = TransferStreamIdx++ % TransferStreams.size();
-  hipStream_t Res = TransferStreams[Stream_i];
-  transferStreamWaitForBarrierIfNeeded(Res, Stream_i);
-  return Res;
+hipStream_t ur_queue_handle_t_::getEventStream(const ur_event_handle_t e) {
+  return e->getStream();
 }
 
 UR_APIEXPORT ur_result_t UR_APICALL
@@ -246,17 +174,13 @@ UR_APIEXPORT ur_result_t UR_APICALL urQueueRelease(ur_queue_handle_t hQueue) {
 }
 
 UR_APIEXPORT ur_result_t UR_APICALL urQueueFinish(ur_queue_handle_t hQueue) {
-  // set default result to a negative result (avoid false-positive tests)
-  ur_result_t Result = UR_RESULT_ERROR_OUT_OF_RESOURCES;
+  ur_result_t Result = UR_RESULT_SUCCESS;
 
   try {
-    ScopedDevice Active(hQueue->getDevice());
-    hQueue->syncStreams([&Result](hipStream_t S) {
-      UR_CHECK_ERROR(hipStreamSynchronize(S));
-      Result = UR_RESULT_SUCCESS;
-    });
+    hQueue->syncStreams(
+        [](hipStream_t S) { UR_CHECK_ERROR(hipStreamSynchronize(S)); });
   } catch (ur_result_t Err) {
     Result = Err;
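
In `urQueueFinish`, errors now propagate out of the `syncStreams` callback as exceptions rather than through a sentinel `Result` value, so the `UR_RESULT_ERROR_OUT_OF_RESOURCES` default (and, presumably, the no-longer-needed `ScopedDevice` guard) can be dropped. The resulting control flow, as a condensed sketch (the trailing `return` is assumed, not shown in the hunk):

    ur_result_t Result = UR_RESULT_SUCCESS;
    try {
      hQueue->syncStreams(
          [](hipStream_t S) { UR_CHECK_ERROR(hipStreamSynchronize(S)); });
    } catch (ur_result_t Err) {
      Result = Err; // UR_CHECK_ERROR throws the failing ur_result_t
    }
    return Result;
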
diff --git a/unified-runtime/source/adapters/hip/queue.hpp b/unified-runtime/source/adapters/hip/queue.hpp
index ccd94c141992a..b06f681953962 100644
--- a/unified-runtime/source/adapters/hip/queue.hpp
+++ b/unified-runtime/source/adapters/hip/queue.hpp
@@ -14,93 +14,24 @@
 #include <hip/hip_runtime.h>
 #include <vector>
 
-using ur_stream_guard = std::unique_lock<std::mutex>;
+#include <common/cuda-hip/stream_queue.hpp>
 
 /// UR queue mapping on to hipStream_t objects.
 ///
-struct ur_queue_handle_t_ {
-  using native_type = hipStream_t;
-  static constexpr int DefaultNumComputeStreams = 64;
-  static constexpr int DefaultNumTransferStreams = 16;
+struct ur_queue_handle_t_ : stream_queue_t<hipStream_t, 64, 16> {
+  using stream_queue_t<hipStream_t, 64, 16>::stream_queue_t;
 
-  std::vector<native_type> ComputeStreams;
-  std::vector<native_type> TransferStreams;
-  // Stream used for recording EvQueue, which holds information about when the
-  // command in question is enqueued on host, as opposed to started. It is
-  // created only if profiling is enabled - either for queue or per event.
-  native_type HostSubmitTimeStream{0};
-  // DelayCompute keeps track of which streams have been recently reused and
-  // their next use should be delayed. If a stream has been recently reused it
-  // will be skipped the next time it would be selected round-robin style. When
-  // skipped, its delay flag is cleared.
-  std::vector<bool> DelayCompute;
-  // keep track of which streams have applied barrier
-  std::vector<bool> ComputeAppliedBarrier;
-  std::vector<bool> TransferAppliedBarrier;
-  ur_context_handle_t Context;
-  ur_device_handle_t Device;
   hipEvent_t BarrierEvent = nullptr;
   hipEvent_t BarrierTmpEvent = nullptr;
-  std::atomic_uint32_t RefCount;
-  std::atomic_uint32_t EventCount;
-  std::atomic_uint32_t ComputeStreamIdx;
-  std::atomic_uint32_t TransferStreamIdx;
-  unsigned int NumComputeStreams;
-  unsigned int NumTransferStreams;
-  unsigned int LastSyncComputeStreams;
-  unsigned int LastSyncTransferStreams;
-  unsigned int Flags;
-  ur_queue_flags_t URFlags;
-  int Priority;
-  // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
-  // locked at the same time, ComputeStreamSyncMutex should be locked first
-  // to avoid deadlocks
-  std::mutex ComputeStreamSyncMutex;
-  std::mutex ComputeStreamMutex;
-  std::mutex TransferStreamMutex;
-  std::mutex BarrierMutex;
-  bool HasOwnership;
 
-  ur_queue_handle_t_(std::vector<hipStream_t> &&ComputeStreams,
-                     std::vector<hipStream_t> &&TransferStreams,
-                     ur_context_handle_t Context, ur_device_handle_t Device,
-                     unsigned int Flags, ur_queue_flags_t URFlags, int Priority,
-                     bool BackendOwns = true)
-      : ComputeStreams{std::move(ComputeStreams)},
-        TransferStreams{std::move(TransferStreams)},
-        DelayCompute(this->ComputeStreams.size(), false),
-        ComputeAppliedBarrier(this->ComputeStreams.size()),
-        TransferAppliedBarrier(this->TransferStreams.size()), Context{Context},
-        Device{Device}, RefCount{1}, EventCount{0}, ComputeStreamIdx{0},
-        TransferStreamIdx{0}, NumComputeStreams{0}, NumTransferStreams{0},
-        LastSyncComputeStreams{0}, LastSyncTransferStreams{0}, Flags(Flags),
-        URFlags(URFlags), Priority(Priority), HasOwnership{BackendOwns} {
-    urContextRetain(Context);
-    urDeviceRetain(Device);
-  }
-
-  ~ur_queue_handle_t_() {
-    urContextRelease(Context);
-    urDeviceRelease(Device);
-  }
-
-  void computeStreamWaitForBarrierIfNeeded(hipStream_t Stream,
-                                           uint32_t Stream_i);
+  void computeStreamWaitForBarrierIfNeeded(hipStream_t Stream,
+                                           uint32_t StreamI) override;
   void transferStreamWaitForBarrierIfNeeded(hipStream_t Stream,
-                                            uint32_t Stream_i);
-
-  // getNextCompute/TransferStream() functions return streams from
-  // appropriate pools in round-robin fashion
-  native_type getNextComputeStream(uint32_t *StreamToken = nullptr);
-  // this overload tries to select a stream that was used by one of
-  // dependencies. If that is not possible returns a new stream. If a stream is
-  // reused it returns a lock that needs to remain locked as long as the stream
-  // is in use
-  native_type getNextComputeStream(uint32_t NumEventsInWaitList,
-                                   const ur_event_handle_t *EventWaitList,
-                                   ur_stream_guard &Guard,
-                                   uint32_t *StreamToken = nullptr);
-  native_type getNextTransferStream();
-  native_type get() { return getNextComputeStream(); };
+                                            uint32_t StreamI) override;
+  ur_queue_handle_t getEventQueue(const ur_event_handle_t) override;
+  uint32_t getEventComputeStreamToken(const ur_event_handle_t) override;
+  hipStream_t getEventStream(const ur_event_handle_t) override;
 
   // Function which creates the profiling stream. Called only from makeNative
   // event when profiling is required.
@@ -111,161 +42,11 @@ struct ur_queue_handle_t_ {
                                          hipStreamNonBlocking));
     });
   }
 
-  native_type getHostSubmitTimeStream() { return HostSubmitTimeStream; }
-
-  bool hasBeenSynchronized(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    return LastSyncComputeStreams > StreamToken;
-  }
-
-  bool canReuseStream(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    // If the command represented by the stream token was not the last command
-    // enqueued to the stream we can not reuse the stream - we need to allow for
-    // commands enqueued after it and the one we are about to enqueue to run
-    // concurrently
-    bool IsLastCommand =
-        (ComputeStreamIdx - StreamToken) <= ComputeStreams.size();
-    // If there was a barrier enqueued to the queue after the command
-    // represented by the stream token we should not reuse the stream, as we can
-    // not take that stream into account for the bookkeeping for the next
-    // barrier - such a stream would not be synchronized with. Performance-wise
-    // it does not matter that we do not reuse the stream, as the work
-    // represented by the stream token is guaranteed to be complete by the
-    // barrier before any work we are about to enqueue to the stream will start,
-    // so the event does not need to be synchronized with.
-    return IsLastCommand && !hasBeenSynchronized(StreamToken);
-  }
-
-  template <typename T> bool allOf(T &&F) {
-    {
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      if (!std::all_of(ComputeStreams.begin(), ComputeStreams.begin() + End, F))
-        return false;
-    }
-    {
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      if (!std::all_of(TransferStreams.begin(), TransferStreams.begin() + End,
-                       F))
-        return false;
-    }
-    return true;
-  }
 
-  template <typename T> void forEachStream(T &&F) {
-    {
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(ComputeStreams[i]);
-      }
-    }
-    {
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(TransferStreams[i]);
-      }
-    }
+  void createStreamWithPriority(hipStream_t *Stream, unsigned int Flags,
+                                int Priority) override {
+    UR_CHECK_ERROR(hipStreamCreateWithPriority(Stream, Flags, Priority));
   }
-
-  template <bool ResetUsed = false, typename T> void syncStreams(T &&F) {
-    auto SyncCompute = [&F, &Streams = ComputeStreams, &Delay = DelayCompute](
-                           unsigned int Start, unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(Streams[i]);
-        Delay[i] = false;
-      }
-    };
-    auto SyncTransfer = [&F, &Streams = TransferStreams](unsigned int Start,
-                                                         unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(Streams[i]);
-      }
-    };
-    {
-      unsigned int Size = static_cast<unsigned int>(ComputeStreams.size());
-      std::lock_guard<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int Start = LastSyncComputeStreams;
-      unsigned int End = NumComputeStreams < Size ? NumComputeStreams
-                                                  : ComputeStreamIdx.load();
-      if (End - Start >= Size) {
-        SyncCompute(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start < End) {
-          SyncCompute(Start, End);
-        } else {
-          SyncCompute(Start, Size);
-          SyncCompute(0, End);
-        }
-      }
-      if (ResetUsed) {
-        LastSyncComputeStreams = End;
-      }
-    }
-    {
-      unsigned int Size = static_cast<unsigned int>(TransferStreams.size());
-      if (!Size) {
-        return;
-      }
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int Start = LastSyncTransferStreams;
-      unsigned int End = NumTransferStreams < Size ? NumTransferStreams
-                                                   : TransferStreamIdx.load();
-      if (End - Start >= Size) {
-        SyncTransfer(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start < End) {
-          SyncTransfer(Start, End);
-        } else {
-          SyncTransfer(Start, Size);
-          SyncTransfer(0, End);
-        }
-      }
-      if (ResetUsed) {
-        LastSyncTransferStreams = End;
-      }
-    }
-  }
-
-  // Thread local stream will be used if ScopedStream is active
-  static hipStream_t &getThreadLocalStream() {
-    static thread_local hipStream_t stream{0};
-    return stream;
-  }
-
-  ur_context_handle_t getContext() const { return Context; };
-
-  ur_device_handle_t getDevice() const { return Device; };
-
-  uint32_t incrementReferenceCount() noexcept { return ++RefCount; }
-
-  uint32_t decrementReferenceCount() noexcept { return --RefCount; }
-
-  uint32_t getReferenceCount() const noexcept { return RefCount; }
-
-  uint32_t getNextEventId() noexcept { return ++EventCount; }
-
-  bool backendHasOwnership() const noexcept { return HasOwnership; }
 };
 
 // RAII object to make hQueue stream getter methods all return the same stream
diff --git a/unified-runtime/source/common/cuda-hip/stream_queue.hpp b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
new file mode 100644
index 0000000000000..8952fd04948fb
--- /dev/null
+++ b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
@@ -0,0 +1,344 @@
+/*
+ *
+ * Copyright (C) 2025 Intel Corporation
+ *
+ * Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
+ * Exceptions. See LICENSE.TXT
+ *
+ * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+ *
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <mutex>
+#include <vector>
+
+using ur_stream_guard = std::unique_lock<std::mutex>;
+
+/// Generic implementation of an out-of-order UR queue based on in-order
+/// backend 'stream' objects.
+///
+/// This class is specifically designed for the CUDA and HIP adapters.
+template <typename ST, int CS, int TS> struct stream_queue_t {
+  using native_type = ST;
+  static constexpr int DefaultNumComputeStreams = CS;
+  static constexpr int DefaultNumTransferStreams = TS;
+
+  std::vector<native_type> ComputeStreams;
+  std::vector<native_type> TransferStreams;
+  // Stream used for recording EvQueue, which holds information about when the
+  // command in question is enqueued on host, as opposed to started. It is
+  // created only if profiling is enabled - either for queue or per event.
+  native_type HostSubmitTimeStream{0};
+  // DelayCompute keeps track of which streams have been recently reused and
+  // their next use should be delayed. If a stream has been recently reused it
+  // will be skipped the next time it would be selected round-robin style. When
+  // skipped, its delay flag is cleared.
+  std::vector<bool> DelayCompute;
+  // keep track of which streams have applied barrier
+  std::vector<bool> ComputeAppliedBarrier;
+  std::vector<bool> TransferAppliedBarrier;
+  ur_context_handle_t_ *Context;
+  ur_device_handle_t_ *Device;
+  std::atomic_uint32_t RefCount;
+  std::atomic_uint32_t EventCount;
+  std::atomic_uint32_t ComputeStreamIndex;
+  std::atomic_uint32_t TransferStreamIndex;
+  unsigned int NumComputeStreams;
+  unsigned int NumTransferStreams;
+  unsigned int LastSyncComputeStreams;
+  unsigned int LastSyncTransferStreams;
+  unsigned int Flags;
+  ur_queue_flags_t URFlags;
+  int Priority;
+  // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
+  // locked at the same time, ComputeStreamSyncMutex should be locked first
+  // to avoid deadlocks
+  std::mutex ComputeStreamSyncMutex;
+  std::mutex ComputeStreamMutex;
+  std::mutex TransferStreamMutex;
+  std::mutex BarrierMutex;
+  bool HasOwnership;
+
+  stream_queue_t(std::vector<native_type> &&ComputeStreams,
+                 std::vector<native_type> &&TransferStreams,
+                 ur_context_handle_t_ *Context, ur_device_handle_t_ *Device,
+                 unsigned int Flags, ur_queue_flags_t URFlags, int Priority,
+                 bool BackendOwns = true)
+      : ComputeStreams{std::move(ComputeStreams)},
+        TransferStreams{std::move(TransferStreams)},
+        DelayCompute(this->ComputeStreams.size(), false),
+        ComputeAppliedBarrier(this->ComputeStreams.size()),
+        TransferAppliedBarrier(this->TransferStreams.size()), Context{Context},
+        Device{Device}, RefCount{1}, EventCount{0}, ComputeStreamIndex{0},
+        TransferStreamIndex{0}, NumComputeStreams{0}, NumTransferStreams{0},
+        LastSyncComputeStreams{0}, LastSyncTransferStreams{0}, Flags(Flags),
+        URFlags(URFlags), Priority(Priority), HasOwnership{BackendOwns} {
+    urContextRetain(Context);
+  }
+
+  virtual ~stream_queue_t() { urContextRelease(Context); }
+
+  virtual void computeStreamWaitForBarrierIfNeeded(native_type Stream,
+                                                   uint32_t StreamI) = 0;
+  virtual void transferStreamWaitForBarrierIfNeeded(native_type Stream,
+                                                    uint32_t StreamI) = 0;
+  virtual void createStreamWithPriority(native_type *Stream,
+                                        unsigned int Flags, int Priority) = 0;
+  virtual ur_queue_handle_t getEventQueue(const ur_event_handle_t) = 0;
+  virtual uint32_t getEventComputeStreamToken(const ur_event_handle_t) = 0;
+  virtual native_type getEventStream(const ur_event_handle_t) = 0;
+
+  // get_next_compute/transfer_stream() functions return streams from
+  // appropriate pools in round-robin fashion
+  native_type getNextComputeStream(uint32_t *StreamToken = nullptr) {
+    if (getThreadLocalStream() != native_type{0})
+      return getThreadLocalStream();
+    uint32_t StreamI;
+    uint32_t Token;
+    while (true) {
+      if (NumComputeStreams < ComputeStreams.size()) {
+        // the check above is for performance - so as not to lock mutex every
+        // time
+        std::lock_guard<std::mutex> guard(ComputeStreamMutex);
+        // The second check is done after mutex is locked so other threads can
+        // not change NumComputeStreams after that
+        if (NumComputeStreams < ComputeStreams.size()) {
+          createStreamWithPriority(&ComputeStreams[NumComputeStreams], Flags,
+                                   Priority);
+          ++NumComputeStreams;
+        }
+      }
+      Token = ComputeStreamIndex++;
+      StreamI = Token % ComputeStreams.size();
+      // if a stream has been reused before it was next selected round-robin
+      // fashion, we want to delay its next use and instead select another one
+      // that is more likely to have completed all the enqueued work.
+      if (DelayCompute[StreamI]) {
+        DelayCompute[StreamI] = false;
+      } else {
+        break;
+      }
+    }
+    if (StreamToken) {
+      *StreamToken = Token;
+    }
+    native_type res = ComputeStreams[StreamI];
+    computeStreamWaitForBarrierIfNeeded(res, StreamI);
+    return res;
+  }
+
+  // this overload tries to select a stream that was used by one of
+  // dependencies. If that is not possible returns a new stream. If a stream is
+  // reused it returns a lock that needs to remain locked as long as the stream
+  // is in use
+  native_type getNextComputeStream(uint32_t NumEventsInWaitList,
+                                   const ur_event_handle_t *EventWaitList,
+                                   ur_stream_guard &Guard,
+                                   uint32_t *StreamToken = nullptr) {
+    if (getThreadLocalStream() != native_type{0})
+      return getThreadLocalStream();
+    for (uint32_t i = 0; i < NumEventsInWaitList; i++) {
+      uint32_t Token = getEventComputeStreamToken(EventWaitList[i]);
+      if (getEventQueue(EventWaitList[i]) == this && canReuseStream(Token)) {
+        std::unique_lock<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
+        // redo the check after lock to avoid data races on
+        // LastSyncComputeStreams
+        if (canReuseStream(Token)) {
+          uint32_t StreamI = Token % DelayCompute.size();
+          DelayCompute[StreamI] = true;
+          if (StreamToken) {
+            *StreamToken = Token;
+          }
+          Guard = ur_stream_guard{std::move(ComputeSyncGuard)};
+          native_type Result = getEventStream(EventWaitList[i]);
+          computeStreamWaitForBarrierIfNeeded(Result, StreamI);
+          return Result;
+        }
+      }
+    }
+    Guard = {};
+    return getNextComputeStream(StreamToken);
+  }
+
+  // Thread local stream will be used if ScopedStream is active
+  static native_type &getThreadLocalStream() {
+    static thread_local native_type stream{0};
+    return stream;
+  }
+
+  native_type getNextTransferStream() {
+    if (getThreadLocalStream() != native_type{0})
+      return getThreadLocalStream();
+    if (TransferStreams.empty()) { // for example in in-order queue
+      return getNextComputeStream();
+    }
+    if (NumTransferStreams < TransferStreams.size()) {
+      // the check above is for performance - so as not to lock mutex every time
+      std::lock_guard<std::mutex> Guard(TransferStreamMutex);
+      // The second check is done after mutex is locked so other threads can not
+      // change NumTransferStreams after that
+      if (NumTransferStreams < TransferStreams.size()) {
+        createStreamWithPriority(&TransferStreams[NumTransferStreams], Flags,
+                                 Priority);
+        ++NumTransferStreams;
+      }
+    }
+    uint32_t StreamI = TransferStreamIndex++ % TransferStreams.size();
+    native_type Result = TransferStreams[StreamI];
+    transferStreamWaitForBarrierIfNeeded(Result, StreamI);
+    return Result;
+  }
+
+  native_type get() { return getNextComputeStream(); };
+  ur_device_handle_t getDevice() const noexcept { return Device; };
+
+  native_type getHostSubmitTimeStream() { return HostSubmitTimeStream; }
+
+  bool hasBeenSynchronized(uint32_t StreamToken) {
+    // stream token not associated with one of the compute streams
+    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
+      return false;
+    }
+    return LastSyncComputeStreams > StreamToken;
+  }
+
+  bool canReuseStream(uint32_t StreamToken) {
+    // stream token not associated with one of the compute streams
+    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
+      return false;
+    }
+    // If the command represented by the stream token was not the last command
+    // enqueued to the stream we can not reuse the stream - we need to allow
+    // for commands enqueued after it and the one we are about to enqueue to
+    // run concurrently
+    bool IsLastCommand =
+        (ComputeStreamIndex - StreamToken) <= ComputeStreams.size();
+    // If there was a barrier enqueued to the queue after the command
+    // represented by the stream token we should not reuse the stream, as we
+    // can not take that stream into account for the bookkeeping for the next
+    // barrier - such a stream would not be synchronized with. Performance-wise
+    // it does not matter that we do not reuse the stream, as the work
+    // represented by the stream token is guaranteed to be complete by the
+    // barrier before any work we are about to enqueue to the stream will
+    // start, so the event does not need to be synchronized with.
+    return IsLastCommand && !hasBeenSynchronized(StreamToken);
+  }
+
+  template <typename T> bool allOf(T &&F) {
+    {
+      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
+      unsigned int End = std::min(
+          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
+      if (!std::all_of(ComputeStreams.begin(), ComputeStreams.begin() + End, F))
+        return false;
+    }
+    {
+      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
+      unsigned int End =
+          std::min(static_cast<unsigned int>(TransferStreams.size()),
+                   NumTransferStreams);
+      if (!std::all_of(TransferStreams.begin(), TransferStreams.begin() + End,
+                       F))
+        return false;
+    }
+    return true;
+  }
+
+  template <typename T> void forEachStream(T &&F) {
+    {
+      std::lock_guard<std::mutex> compute_guard(ComputeStreamMutex);
+      unsigned int End = std::min(
+          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
+      for (unsigned int i = 0; i < End; i++) {
+        F(ComputeStreams[i]);
+      }
+    }
+    {
+      std::lock_guard<std::mutex> transfer_guard(TransferStreamMutex);
+      unsigned int End =
+          std::min(static_cast<unsigned int>(TransferStreams.size()),
+                   NumTransferStreams);
+      for (unsigned int i = 0; i < End; i++) {
+        F(TransferStreams[i]);
+      }
+    }
+  }
+
+  template <bool ResetUsed = false, typename T> void syncStreams(T &&F) {
+    auto SyncCompute = [&F, &Streams = ComputeStreams, &Delay = DelayCompute](
+                           unsigned int Start, unsigned int Stop) {
+      for (unsigned int i = Start; i < Stop; i++) {
+        F(Streams[i]);
+        Delay[i] = false;
+      }
+    };
+    auto SyncTransfer = [&F, &streams = TransferStreams](unsigned int Start,
+                                                         unsigned int Stop) {
+      for (unsigned int i = Start; i < Stop; i++) {
+        F(streams[i]);
+      }
+    };
+    {
+      unsigned int Size = static_cast<unsigned int>(ComputeStreams.size());
+      std::lock_guard<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
+      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
+      unsigned int Start = LastSyncComputeStreams;
+      unsigned int End = NumComputeStreams < Size ? NumComputeStreams
+                                                  : ComputeStreamIndex.load();
+      if (ResetUsed) {
+        LastSyncComputeStreams = End;
+      }
+      if (End - Start >= Size) {
+        SyncCompute(0, Size);
+      } else {
+        Start %= Size;
+        End %= Size;
+        if (Start <= End) {
+          SyncCompute(Start, End);
+        } else {
+          SyncCompute(Start, Size);
+          SyncCompute(0, End);
+        }
+      }
+    }
+    {
+      unsigned int Size = static_cast<unsigned int>(TransferStreams.size());
+      if (!Size) {
+        return;
+      }
+      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
+      unsigned int Start = LastSyncTransferStreams;
+      unsigned int End = NumTransferStreams < Size ? NumTransferStreams
+                                                   : TransferStreamIndex.load();
+      if (ResetUsed) {
+        LastSyncTransferStreams = End;
+      }
+      if (End - Start >= Size) {
+        SyncTransfer(0, Size);
+      } else {
+        Start %= Size;
+        End %= Size;
+        if (Start <= End) {
+          SyncTransfer(Start, End);
+        } else {
+          SyncTransfer(Start, Size);
+          SyncTransfer(0, End);
+        }
+      }
+    }
+  }
+
+  ur_context_handle_t_ *getContext() const { return Context; };
+
+  uint32_t incrementReferenceCount() noexcept { return ++RefCount; }
+
+  uint32_t decrementReferenceCount() noexcept { return --RefCount; }
+
+  uint32_t getReferenceCount() const noexcept { return RefCount; }
+
+  uint32_t getNextEventId() noexcept { return ++EventCount; }
+
+  bool backendHasOwnership() const noexcept { return HasOwnership; }
+};