diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index df7d6e795..e7adc177e 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -7,6 +7,9 @@ arch: x86_64 command: "julia --project -e 'using Pkg; Pkg.develop(;path=\"lib/TimespanLogging\")'" +.gputest: &gputest + if: build.message !~ /\[skip tests\]/ + .bench: &bench if: build.message =~ /\[run benchmarks\]/ agents: @@ -87,6 +90,83 @@ steps: - JuliaCI/julia-coverage#v1: codecov: true + - label: Julia 1.11 (CUDA) + timeout_in_minutes: 20 + <<: *gputest + plugins: + - JuliaCI/julia#v1: + version: "1.11" + - JuliaCI/julia-test#v1: ~ + - JuliaCI/julia-coverage#v1: + codecov: true + agents: + queue: "juliagpu" + cuda: "*" + env: + CI_USE_CUDA: "1" + + - label: Julia 1.11 (ROCm) + timeout_in_minutes: 20 + <<: *gputest + plugins: + - JuliaCI/julia#v1: + version: "1.11" + - JuliaCI/julia-test#v1: ~ + - JuliaCI/julia-coverage#v1: + codecov: true + agents: + queue: "juliagpu" + rocm: "*" + env: + CI_USE_ROCM: "1" + + - label: Julia 1.11 (oneAPI) + timeout_in_minutes: 20 + <<: *gputest + plugins: + - JuliaCI/julia#v1: + version: "1.11" + - JuliaCI/julia-test#v1: ~ + - JuliaCI/julia-coverage#v1: + codecov: true + agents: + queue: "juliagpu" + intel: "*" + env: + CI_USE_ONEAPI: "1" + + - label: Julia 1.11 (Metal) + timeout_in_minutes: 20 + <<: *gputest + plugins: + - JuliaCI/julia#v1: + version: "1.11" + - JuliaCI/julia-test#v1: ~ + - JuliaCI/julia-coverage#v1: + codecov: true + agents: + queue: "juliaecosystem" + os: "macos" + arch: "aarch64" + env: + CI_USE_METAL: "1" + + - label: Julia 1.11 (OpenCL) + timeout_in_minutes: 20 + <<: *gputest + plugins: + - JuliaCI/julia#v1: + version: "1.11" + - JuliaCI/julia-test#v1: + - JuliaCI/julia-coverage#v1: + codecov: true + agents: + queue: "juliaecosystem" + os: linux + arch: x86_64 + env: + CI_USE_OPENCL: "1" + - label: Julia 1 - TimespanLogging timeout_in_minutes: 20 <<: *test diff --git a/Project.toml b/Project.toml index a89547522..c176a40fc 100644 --- a/Project.toml +++ b/Project.toml @@ -7,7 +7,9 @@ Adapt = "79e6a3ab-5dfb-504d-930d-738a2a938a0e" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" DistributedNext = "fab6aee4-877b-4bac-a744-3eca44acbb6f" +FillArrays = "1a297f60-69ca-5386-bcde-b61e274b549b" Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6" +KernelAbstractions = "63c18a36-062a-441e-b654-da1e3ab1ce7c" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09" MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94" @@ -28,35 +30,52 @@ TimespanLogging = "a526e669-04d3-4846-9525-c66122c55f63" UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [weakdeps] +AMDGPU = "21141c5a-9bdb-4563-92ae-f87d6854732e" +CUDA = "052768ef-5323-5732-b1bb-66c8b64840ba" Colors = "5ae59095-9a9b-59fe-a467-6f913c188581" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0" JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +Metal = "dde4c033-4e86-420c-a63e-0dd931031962" +OpenCL = "08131aa3-fb12-5dee-8b74-c09406e224a2" +oneAPI = "8f75cd03-7ff8-4ecb-9b8f-daf728133b1b" Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80" PythonCall = "6099a3de-0909-46bc-b1f4-468b9a2dfc0d" [extensions] +CUDAExt = "CUDA" DistributionsExt = "Distributions" GraphVizExt = "GraphViz" GraphVizSimpleExt = "Colors" +IntelExt = "oneAPI" JSON3Ext = "JSON3" +MetalExt = "Metal" +OpenCLExt = "OpenCL" PlotsExt = ["DataFrames", "Plots"] 
PythonExt = "PythonCall" +ROCExt = "AMDGPU" [compat] -Adapt = "4.0.4" +AMDGPU = "1" +Adapt = "4" +CUDA = "3, 4, 5" Colors = "0.12, 0.13" DataFrames = "1" DataStructures = "0.18" DistributedNext = "1.0.0" Distributions = "0.25" +FillArrays = "1.13.0" GraphViz = "0.2" Graphs = "1" JSON3 = "1" +KernelAbstractions = "0.9" MacroTools = "0.5" MemPool = "0.4.12" +Metal = "1.1" OnlineStats = "1" +OpenCL = "0.10" +oneAPI = "1, 2" Plots = "1" PrecompileTools = "1.2" Preferences = "1.4.3" diff --git a/docs/make.jl b/docs/make.jl index 8f1f97f5c..159f7aa8d 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -27,6 +27,7 @@ makedocs(; "Processors" => "processors.md", "Task Queues" => "task-queues.md", "Datadeps" => "datadeps.md", + "GPUs" => "gpu.md", "Option Propagation" => "propagation.md", "Logging and Visualization" => [ "Logging: Basics" => "logging.md", diff --git a/docs/src/gpu.md b/docs/src/gpu.md new file mode 100644 index 000000000..626de7f56 --- /dev/null +++ b/docs/src/gpu.md @@ -0,0 +1,362 @@ +# GPU Support + +Dagger supports GPU acceleration for CUDA, ROCm (AMD), Intel oneAPI, Metal (Apple), and OpenCL devices. GPU support enables automatic data movement between CPU and GPU memory, distributed GPU computing across multiple devices, and seamless integration with Julia's GPU ecosystem. + +Dagger's GPU support is built on top of the [KernelAbstractions.jl](https://github.com/JuliaGPU/KernelAbstractions.jl) package, as well as the specific GPU-specific packages for each backend (e.g. [CUDA.jl](https://github.com/JuliaGPU/CUDA.jl), [AMDGPU.jl](https://github.com/JuliaGPU/AMDGPU.jl), [oneAPI.jl](https://github.com/JuliaGPU/oneAPI.jl), [Metal.jl](https://github.com/JuliaGPU/Metal.jl), and [OpenCL.jl](https://github.com/JuliaGPU/OpenCL.jl)). Dagger's GPU support is designed to be fully interoperable with the Julia GPU ecosystem, allowing you to use Dagger to distribute your GPU computations across multiple devices. + +There are a few ways to use Dagger's GPU support: + +1. **KernelAbstractions**: Use the `KernelAbstractions.jl` interface to write GPU kernels, and then use `Dagger.Kernel` and `Dagger.@spawn` to execute them. +2. **DArray**: Use the `DArray` interface to create distributed GPU arrays, and then call regular array operations on them, which will be automatically executed on the GPU. +3. **Datadeps**: Use the `Datadeps.jl` interface to create GPU-compatible algorithms, within which you can call kernels or array operations. +4. **Manual**: Use `Dagger.gpu_kernel_backend()` to get the appropriate backend for the current processor, and use that to execute kernels. + +In all cases, you need to ensure that right GPU-specific package is loaded. 
+ +## Package Loading + +Dagger's GPU support requires loading one of the following packages: + +- [CUDA.jl](https://github.com/JuliaGPU/CUDA.jl) for NVIDIA GPUs +- [AMDGPU.jl](https://github.com/JuliaGPU/AMDGPU.jl) for AMD GPUs +- [oneAPI.jl](https://github.com/JuliaGPU/oneAPI.jl) for Intel GPUs +- [Metal.jl](https://github.com/JuliaGPU/Metal.jl) for Apple GPUs +- [OpenCL.jl](https://github.com/JuliaGPU/OpenCL.jl) for OpenCL devices + +### Backend Detection + +You can check if a given kind of GPU is supported by calling: + +- CUDA: `Dagger.gpu_can_compute(:CUDA)` +- AMDGPU: `Dagger.gpu_can_compute(:ROC)` +- oneAPI: `Dagger.gpu_can_compute(:oneAPI)` +- Metal: `Dagger.gpu_can_compute(:Metal)` +- OpenCL: `Dagger.gpu_can_compute(:OpenCL)` + +### Backend-Specific Scopes + +Once you've loaded the appropriate package, you can create a scope for that +backend by calling: + +```julia +# First GPU of different GPU types +cuda_scope = Dagger.scope(cuda_gpu=1) +rocm_scope = Dagger.scope(rocm_gpu=1) +intel_scope = Dagger.scope(intel_gpu=1) +metal_scope = Dagger.scope(metal_gpu=1) +opencl_scope = Dagger.scope(cl_device=1) +``` + +These kinds of scopes can be passed to `Dagger.@spawn` or `Dagger.with_options` +to enable GPU acceleration on the given backend. Note that by default, Dagger +will not use any GPU if a compatible scope isn't provided through one of these +mechanisms. + +## KernelAbstractions + +The most direct way to use GPU acceleration in Dagger is through the KernelAbstractions.jl interface. Dagger provides seamless integration with KernelAbstractions, automatically selecting the appropriate backend for the current processor. + +### Basic Kernel Usage + +Write your kernels using the standard KernelAbstractions syntax: + +```julia +using KernelAbstractions + +@kernel function vector_add!(c, a, b) + i = @index(Global, Linear) + c[i] = a[i] + b[i] +end + +@kernel function fill_kernel!(arr, value) + i = @index(Global, Linear) + arr[i] = value +end +``` + +### Using `Dagger.Kernel` for Automatic Backend Selection + +`Dagger.Kernel` wraps your kernel functions and automatically selects the correct backend based on the current processor: + +```julia +# Use in tasks - backend is selected automatically +cpu_array = Dagger.@mutable zeros(1000) +gpu_array = Dagger.@mutable CUDA.zeros(1000) + +# Runs on CPU +fetch(Dagger.@spawn Dagger.Kernel(fill_kernel!)(cpu_array, 42.0; ndrange=length(cpu_array))) + +# Runs on GPU when scoped appropriately +Dagger.with_options(;scope=Dagger.scope(cuda_gpu=1)) do + fetch(Dagger.@spawn Dagger.Kernel(fill_kernel!)(gpu_array, 42.0; ndrange=length(gpu_array))) + + # Synchronize the GPU + Dagger.gpu_synchronize(:CUDA) +end +``` + +Notice the usage of `Dagger.@mutable` to create mutable arrays on the GPU. This +is required when mutating arrays in-place with Dagger-launched kernels. 
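+
+Because `Dagger.Kernel` resolves the backend from whichever processor the task lands on, the same kernel can be launched unchanged on other backends just by switching the scope. As a sketch, the ROCm equivalent of the example above (assuming AMDGPU.jl is loaded and a ROCm-capable device is available; `:ROC` mirrors the key used by `Dagger.gpu_can_compute(:ROC)`) would look like:
+
+```julia
+using AMDGPU
+
+rocm_array = Dagger.@mutable AMDGPU.zeros(1000)
+
+Dagger.with_options(;scope=Dagger.scope(rocm_gpu=1)) do
+    # Same kernel as before, now compiled for and launched on the ROCm device
+    fetch(Dagger.@spawn Dagger.Kernel(fill_kernel!)(rocm_array, 42.0; ndrange=length(rocm_array)))
+
+    # Synchronize the ROCm backend
+    Dagger.gpu_synchronize(:ROC)
+end
+```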
+
+### Manual Backend Selection with `gpu_kernel_backend`
+
+For more control, use `Dagger.gpu_kernel_backend()` to get the backend for the current processor:
+
+```julia
+function manual_kernel_execution(arr, value)
+    # Get the backend for the current processor
+    backend = Dagger.gpu_kernel_backend()
+
+    # Create kernel with specific backend
+    kernel = fill_kernel!(backend)
+
+    # Execute kernel
+    kernel(arr, value; ndrange=length(arr))
+
+    return arr
+end
+
+# Use within a Dagger task
+arr = Dagger.@mutable CUDA.zeros(1000)
+Dagger.with_options(;scope=Dagger.scope(cuda_gpu=1)) do
+    fetch(Dagger.@spawn manual_kernel_execution(arr, 42.0))
+
+    Dagger.gpu_synchronize(:CUDA)
+end
+```
+
+### Kernel Synchronization
+
+Dagger handles synchronization automatically within Dagger tasks, but if you
+mix Dagger-launched and non-Dagger-launched kernels, you can synchronize the
+GPU manually:
+
+```julia
+# Launch kernel as a task - Dagger.Kernel handles backend selection automatically
+arr = Dagger.@mutable CUDA.zeros(1000)
+Dagger.with_options(;scope=Dagger.scope(cuda_gpu=1)) do
+    result = fetch(Dagger.@spawn Dagger.Kernel(fill_kernel!)(arr, 42.0; ndrange=length(arr)))
+
+    # Synchronize kernels launched by Dagger tasks
+    Dagger.gpu_synchronize()
+
+    # Launch a kernel manually (outside of Dagger), with an explicit backend
+    fill_kernel!(CUDABackend())(arr, 42.0; ndrange=length(arr))
+
+    return result
+end
+```
+
+## DArray: Distributed GPU Arrays
+
+Dagger's `DArray` type seamlessly supports GPU acceleration, allowing you to
+create distributed arrays that are automatically allocated in GPU memory when
+using appropriate scopes.
+
+### GPU Array Allocation
+
+Allocate `DArray`s directly on GPU devices:
+
+```julia
+using CUDA # or AMDGPU, oneAPI, Metal
+
+# Single GPU allocation
+gpu_scope = Dagger.scope(cuda_gpu=1)
+Dagger.with_options(;scope=gpu_scope) do
+    # All standard allocation functions work
+    DA_rand = rand(Blocks(32, 32), Float32, 128, 128)
+    DA_ones = ones(Blocks(32, 32), Float32, 128, 128)
+    DA_zeros = zeros(Blocks(32, 32), Float32, 128, 128)
+    DA_randn = randn(Blocks(32, 32), Float32, 128, 128)
+end
+```
+
+### Multi-GPU Distribution
+
+Distribute arrays across multiple GPUs:
+
+```julia
+# Use all available CUDA GPUs
+all_gpu_scope = Dagger.scope(cuda_gpus=:)
+Dagger.with_options(;scope=all_gpu_scope) do
+    DA = rand(Blocks(64, 64), Float32, 256, 256)
+    # Each chunk may be allocated on a different GPU
+end
+
+# Use specific GPUs
+multi_gpu_scope = Dagger.scope(cuda_gpus=[1, 2, 3])
+Dagger.with_options(;scope=multi_gpu_scope) do
+    DA = ones(Blocks(32, 32), Float32, 192, 192)
+end
+```
+
+### Converting Between CPU and GPU Arrays
+
+Move existing arrays to GPU:
+
+```julia
+# Create CPU DArray
+cpu_array = rand(Blocks(32, 32), 128, 128)
+
+# Convert to GPU
+gpu_scope = Dagger.scope(cuda_gpu=1)
+gpu_array = Dagger.with_options(;scope=gpu_scope) do
+    # The result has the same structure as cpu_array, but is allocated on GPU
+    similar(cpu_array)
+end
+
+# Convert back to CPU
+cpu_result = collect(gpu_array) # Brings all data back to CPU
+```
+
+### (Advanced) Verifying GPU Allocation
+
+If necessary for testing or debugging, you can check that your `DArray` chunks
+are actually living on the GPU:
+
+```julia
+gpu_scope = Dagger.scope(cuda_gpu=1)
+Dagger.with_options(;scope=gpu_scope) do
+    DA = rand(Blocks(4, 4), Float32, 8, 8)
+
+    # Check each chunk
+    for chunk in DA.chunks
+        raw_chunk = fetch(chunk; raw=true)
+        @assert raw_chunk isa Dagger.Chunk{<:CuArray}
+
+        # Verify it's on the correct GPU device
+        @assert remotecall_fetch(raw_chunk.handle.owner, raw_chunk) do chunk
+            arr = Dagger.MemPool.poolget(chunk.handle)
+            return CUDA.device(arr) == CUDA.devices()[1] # GPU 1
+        end
+    end
+end
+```
+
+## Datadeps: GPU-Compatible Algorithms
+
+Datadeps regions work seamlessly with GPU arrays, enabling complex GPU
+algorithms with automatic dependency management. Unlike with standalone
+`Dagger.@spawn` kernel launches, you don't need to use `Dagger.@mutable`, as
+Datadeps ensures that array mutation is performed correctly.
+
+### In-Place GPU Operations
+
+```julia
+using LinearAlgebra
+
+# Create GPU arrays
+gpu_scope = Dagger.scope(cuda_gpu=1)
+Dagger.with_options(;scope=gpu_scope) do
+    DA = rand(Blocks(4, 4), Float32, 8, 8)
+    DB = rand(Blocks(4, 4), Float32, 8, 8)
+    DC = zeros(Blocks(4, 4), Float32, 8, 8)
+
+    # In-place matrix multiplication on GPU
+    Dagger.spawn_datadeps() do
+        Dagger.@spawn mul!(Out(DC), In(DA), In(DB))
+    end
+
+    # Verify result
+    @assert collect(DC) ≈ collect(DA) * collect(DB)
+end
+```
+
+Notice that we didn't need to call `Dagger.gpu_synchronize()` here, because
+the `DArray` is automatically synchronized when it is collected.
+
+### Out-of-Place GPU Operations
+
+Because Dagger options propagate into function calls, you can call algorithms
+that use Datadeps (such as `DArray` matrix multiplication) on GPUs, without
+having to do any extra work:
+
+```julia
+gpu_scope = Dagger.scope(cuda_gpu=1)
+Dagger.with_options(;scope=gpu_scope) do
+    DA = rand(Blocks(4, 4), Float32, 8, 8)
+    DB = rand(Blocks(4, 4), Float32, 8, 8)
+
+    # Out-of-place operations
+    DC = DA * DB # Automatically runs on GPU
+
+    @assert collect(DC) ≈ collect(DA) * collect(DB)
+end
+```
+
+### Complex GPU Algorithms
+
+```julia
+using LinearAlgebra
+
+gpu_scope = Dagger.scope(cuda_gpu=1)
+Dagger.with_options(;scope=gpu_scope) do
+    # Create a positive definite matrix for Cholesky decomposition
+    A = rand(Float32, 8, 8)
+    A = A * A'
+    A[diagind(A)] .+= size(A, 1)
+    DA = DArray(A, Blocks(4, 4))
+
+    # Cholesky decomposition on GPU
+    chol_result = cholesky(DA)
+    @assert collect(chol_result.U) ≈ cholesky(collect(DA)).U
+end
+```
+
+### Cross-Backend Operations
+
+```julia
+# You can even mix different GPU types in a single computation
+# (though data movement between different GPU types goes through the CPU)
+
+cuda_data = Dagger.with_options(;scope=Dagger.scope(cuda_gpu=1)) do
+    rand(Blocks(32, 32), Float32, 64, 64)
+end
+
+rocm_result = Dagger.with_options(;scope=Dagger.scope(rocm_gpu=1)) do
+    # Data automatically moved: CUDA GPU -> CPU -> ROCm GPU
+    fetch(Dagger.@spawn sum(cuda_data))
+end
+```
+
+## Distributed GPU Computing
+
+You can easily combine GPU acceleration with distributed computing across
+multiple workers.
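+
+Before pinning tasks to a particular worker's GPU, it can help to check which workers can actually compute on a given backend. Below is a minimal sketch using `Dagger.gpu_can_compute` over all workers (it assumes Dagger and CUDA.jl are loaded on every worker, as in the setup shown next; `cuda_workers` is just an illustrative variable name):
+
+```julia
+using Distributed
+
+# Worker IDs that report a usable CUDA device
+cuda_workers = filter(procs()) do w
+    remotecall_fetch(() -> Dagger.gpu_can_compute(:CUDA), w)
+end
+```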
+ +### Multi-Worker GPU Setup + +```julia +using Distributed +addprocs(4) # Add 4 workers + +@everywhere using Dagger, CUDA + +# Use GPU 1 on worker 2 +distributed_gpu_scope = Dagger.scope(worker=2, cuda_gpu=1) + +Dagger.with_options(;scope=distributed_gpu_scope) do + # Create a GPU array and sum it on worker 2, GPU 1 + DA = rand(Blocks(32, 32), Float32, 128, 128) + result = fetch(Dagger.@spawn sum(DA)) +end +``` + +### Load Balancing Across GPUs and Workers + +```julia +# Distribute work across multiple workers and their GPUs +workers_with_gpus = [ + Dagger.scope(worker=2, cuda_gpu=1), + Dagger.scope(worker=3, cuda_gpu=1), + Dagger.scope(worker=4, rocm_gpu=1) # Mix of GPU types +] + +# Dagger will automatically balance work across available resources +results = map(workers_with_gpus) do scope + Dagger.with_options(;scope) do + DA = rand(Blocks(16, 16), Float32, 64, 64) + fetch(Dagger.@spawn sum(DA)) + end +end +``` \ No newline at end of file diff --git a/docs/src/index.md b/docs/src/index.md index f98e715c1..1d59189f1 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -447,3 +447,148 @@ continuously and writes each random number to a file. The streaming region is terminated when a random number less than 0.01 is generated, which is done by calling `Dagger.finish_stream()` (this terminates the current task, and will also terminate all streaming tasks launched by `spawn_streaming`). + +## Quickstart: GPUs + +Dagger supports GPU acceleration for CUDA, ROCm (AMD), Intel oneAPI, Metal (Apple), and OpenCL devices. GPU support enables automatic data movement between CPU and GPU memory, distributed GPU computing across multiple devices, and seamless integration with Julia's GPU ecosystem. + +For more details: [GPU Support](@ref) + +### Allocate distributed arrays on GPUs + +You can allocate `DArray`s directly on GPUs by using scopes to target specific GPU devices. The arrays will be automatically allocated in GPU memory: + +```julia +using CUDA # or AMDGPU, oneAPI, Metal + +# Allocate a DArray on the first CUDA GPU +gpu_scope = Dagger.scope(cuda_gpu=1) +Dagger.with_options(;scope=gpu_scope) do + DA = rand(Blocks(32, 32), Float32, 128, 128) + # DA is now distributed across GPU memory +end +``` + +You can also target multiple GPUs or all available GPUs: + +```julia +# Use all available CUDA GPUs +all_gpu_scope = Dagger.scope(cuda_gpus=:) +Dagger.with_options(;scope=all_gpu_scope) do + DA = ones(Blocks(64, 64), Float32, 256, 256) +end + +# Use specific GPUs (e.g., GPUs 1 and 2) +multi_gpu_scope = Dagger.scope(cuda_gpus=[1, 2]) +Dagger.with_options(;scope=multi_gpu_scope) do + DA = zeros(Blocks(32, 32), Float32, 128, 128) +end +``` + +### Convert CPU arrays to GPU arrays + +You can move existing CPU `DArray`s to GPU by using `similar` within a GPU scope: + +```julia +# Create a CPU DArray +cpu_array = rand(Blocks(32, 32), 128, 128) + +# Move to GPU +gpu_scope = Dagger.scope(cuda_gpu=1) +Dagger.with_options(;scope=gpu_scope) do + gpu_array = similar(cpu_array) + # gpu_array is now allocated on GPU with same structure as cpu_array +end +``` + +### Run custom GPU kernels with `Dagger.Kernel` + +Dagger integrates with KernelAbstractions.jl to run custom GPU kernels. 
Use `Dagger.Kernel` to wrap your kernel functions: + +```julia +using KernelAbstractions + +# Define a kernel function +@kernel function vector_add!(c, a, b) + i = @index(Global, Linear) + c[i] = a[i] + b[i] +end + +# Run on GPU +# Note: GPU arrays must be marked @mutable or use Datadeps to ensure mutability +gpu_scope = Dagger.scope(cuda_gpu=1) +a = Dagger.@mutable CUDA.rand(1000) +b = Dagger.@mutable CUDA.rand(1000) +c = Dagger.@mutable CUDA.zeros(1000) +result = Dagger.with_options(;scope=gpu_scope) do + fetch(Dagger.@spawn Dagger.Kernel(vector_add!)(c, a, b; ndrange=length(c))) + # Synchronize the GPU + Dagger.gpu_synchronize(:CUDA) +end +``` + +### Use `gpu_kernel_backend` within tasks + +When writing functions that will run on different devices, use `Dagger.gpu_kernel_backend()` to get the appropriate KernelAbstractions backend for the current processor: + +```julia +@kernel function fill_kernel!(arr, value) + i = @index(Global, Linear) + arr[i] = value +end + +function fill_array_task!(arr, value) + # Get the backend for the current processor (CPU, CUDA, ROCm, etc.) + backend = Dagger.gpu_kernel_backend() + kernel = fill_kernel!(backend) + kernel(arr, value; ndrange=length(arr)) + return arr +end + +# This function works on both CPU and GPU +cpu_array = Dagger.@mutable zeros(1000) +gpu_array = Dagger.@mutable CUDA.zeros(1000) + +# Runs on CPU +fetch(Dagger.@spawn fill_array_task!(cpu_array, 42.0)) + +# Runs on GPU when scoped appropriately +Dagger.with_options(;scope=Dagger.scope(cuda_gpu=1)) do + fetch(Dagger.@spawn fill_array_task!(gpu_array, 42.0)) + + # Synchronize the GPU + Dagger.gpu_synchronize(:CUDA) +end +``` + +### Multi-GPU and multi-backend support + +Dagger supports multiple GPU backends simultaneously. You can specify different GPU types using their respective scope keywords: + +```julia +# CUDA GPUs +cuda_scope = Dagger.scope(cuda_gpu=1) + +# ROCm/AMD GPUs +rocm_scope = Dagger.scope(rocm_gpu=1) + +# Intel GPUs +intel_scope = Dagger.scope(intel_gpu=1) + +# Metal GPUs (Apple) +metal_scope = Dagger.scope(metal_gpu=1) + +# OpenCL devices +opencl_scope = Dagger.scope(cl_device=1) +``` + +You can also combine GPU scopes with worker scopes for distributed GPU computing: + +```julia +# Use CUDA GPU 1 on worker 2 +distributed_gpu_scope = Dagger.scope(worker=2, cuda_gpu=1) +Dagger.with_options(;scope=distributed_gpu_scope) do + DA = rand(Blocks(32, 32), Float32, 128, 128) + result = fetch(Dagger.@spawn sum(DA)) +end +``` diff --git a/ext/CUDAExt.jl b/ext/CUDAExt.jl new file mode 100644 index 000000000..0e12b4ca2 --- /dev/null +++ b/ext/CUDAExt.jl @@ -0,0 +1,359 @@ +module CUDAExt + +export CuArrayDeviceProc + +import Dagger, MemPool +import Dagger: CPURAMMemorySpace, Chunk, unwrap +import MemPool: DRef, poolget +import Distributed: myid, remotecall_fetch +import LinearAlgebra +using KernelAbstractions, Adapt + +const CPUProc = Union{Dagger.OSProc,Dagger.ThreadProc} + +if isdefined(Base, :get_extension) + import CUDA +else + import ..CUDA +end +import CUDA: CuDevice, CuContext, CuStream, CuArray, CUDABackend +import CUDA: devices, attribute, context, context!, stream, stream! +import CUDA: CUBLAS, CUSOLVER + +using UUIDs + +"Represents a single CUDA GPU device." 
+struct CuArrayDeviceProc <: Dagger.Processor + owner::Int + device::Int + device_uuid::UUID +end +Dagger.get_parent(proc::CuArrayDeviceProc) = Dagger.OSProc(proc.owner) +Dagger.root_worker_id(proc::CuArrayDeviceProc) = proc.owner +Base.show(io::IO, proc::CuArrayDeviceProc) = + print(io, "CuArrayDeviceProc(worker $(proc.owner), device $(proc.device), uuid $(proc.device_uuid))") +Dagger.short_name(proc::CuArrayDeviceProc) = "W: $(proc.owner), CUDA: $(proc.device)" +Dagger.@gpuproc(CuArrayDeviceProc, CuArray) + +"Represents the memory space of a single CUDA GPU's VRAM." +struct CUDAVRAMMemorySpace <: Dagger.MemorySpace + owner::Int + device::Int + device_uuid::UUID +end +Dagger.root_worker_id(space::CUDAVRAMMemorySpace) = space.owner +function Dagger.memory_space(x::CuArray) + dev = CUDA.device(x) + device_id = dev.handle + device_uuid = CUDA.uuid(dev) + return CUDAVRAMMemorySpace(myid(), device_id, device_uuid) +end + +Dagger.memory_spaces(proc::CuArrayDeviceProc) = Set([CUDAVRAMMemorySpace(proc.owner, proc.device, proc.device_uuid)]) +Dagger.processors(space::CUDAVRAMMemorySpace) = Set([CuArrayDeviceProc(space.owner, space.device, space.device_uuid)]) + +function to_device(proc::CuArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return DEVICES[proc.device] +end +function to_context(proc::CuArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return CONTEXTS[proc.device] +end +to_context(handle::Integer) = CONTEXTS[handle] +to_context(dev::CuDevice) = to_context(dev.handle) + +function with_context!(handle::Integer) + context!(CONTEXTS[handle]) + stream!(STREAMS[handle]) +end +function with_context!(proc::CuArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + with_context!(proc.device) +end +function with_context!(space::CUDAVRAMMemorySpace) + @assert Dagger.root_worker_id(space) == myid() + with_context!(space.device) +end +function with_context(f, x) + old_ctx = context() + old_stream = stream() + + with_context!(x) + try + f() + finally + context!(old_ctx) + stream!(old_stream) + end +end + +function _sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + with_context(x) do + CUDA.synchronize() + end +end +function sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + if Dagger.root_worker_id(x) == myid() + _sync_with_context(x) + else + # Do nothing, as we have received our value over a serialization + # boundary, which should synchronize for us + end +end + +# Allocations +Dagger.allocate_array_func(::CuArrayDeviceProc, ::typeof(rand)) = CUDA.rand +Dagger.allocate_array_func(::CuArrayDeviceProc, ::typeof(randn)) = CUDA.randn +Dagger.allocate_array_func(::CuArrayDeviceProc, ::typeof(ones)) = CUDA.ones +Dagger.allocate_array_func(::CuArrayDeviceProc, ::typeof(zeros)) = CUDA.zeros +struct AllocateUndef{S} end +(::AllocateUndef{S})(T, dims::Dims{N}) where {S,N} = CuArray{S,N}(undef, dims) +Dagger.allocate_array_func(::CuArrayDeviceProc, ::Dagger.AllocateUndef{S}) where S = AllocateUndef{S}() + +# In-place +# N.B. These methods assume that later operations will implicitly or +# explicitly synchronize with their associated stream +function Dagger.move!(to_space::Dagger.CPURAMMemorySpace, from_space::CUDAVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + if Dagger.root_worker_id(from_space) == myid() + sync_with_context(from_space) + with_context!(from_space) + end + copyto!(to, from) + # N.B. 
DtoH will synchronize + return +end +function Dagger.move!(to_space::CUDAVRAMMemorySpace, from_space::Dagger.CPURAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + with_context!(to_space) + copyto!(to, from) + return +end +function Dagger.move!(to_space::CUDAVRAMMemorySpace, from_space::CUDAVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + sync_with_context(from_space) + with_context!(to_space) + copyto!(to, from) + return +end + +# Out-of-place HtoD +function Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x) + with_context(to_proc) do + arr = adapt(CuArray, x) + CUDA.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + cpu_data = remotecall_fetch(unwrap, from_w, x) + with_context(to_proc) do + arr = adapt(CuArray, cpu_data) + CUDA.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::CuArray) + if CUDA.device(x) == to_device(to_proc) + return x + end + with_context(to_proc) do + _x = similar(x) + copyto!(_x, x) + CUDA.synchronize() + return _x + end +end + +# Out-of-place DtoH +function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x) + with_context(from_proc) do + CUDA.synchronize() + _x = adapt(Array, x) + CUDA.synchronize() + return _x + end +end +function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + remotecall_fetch(from_w, x) do x + arr = unwrap(x) + return Dagger.move(from_proc, to_proc, arr) + end +end +function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x::CuArray{T,N}) where {T,N} + with_context(from_proc) do + CUDA.synchronize() + _x = Array{T,N}(undef, size(x)) + copyto!(_x, x) + CUDA.synchronize() + return _x + end +end + +# Out-of-place DtoD +function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CuArrayDeviceProc, x::Dagger.Chunk{T}) where T<:CuArray + if from_proc == to_proc + # Same process and GPU, no change + arr = unwrap(x) + with_context(CUDA.synchronize, from_proc) + return arr + elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc) + # Same process but different GPUs, use DtoD copy + from_arr = unwrap(x) + with_context(CUDA.synchronize, from_proc) + return with_context(to_proc) do + to_arr = similar(from_arr) + copyto!(to_arr, from_arr) + CUDA.synchronize() + return to_arr + end + elseif Dagger.system_uuid(from_proc.owner) == Dagger.system_uuid(to_proc.owner) && from_proc.device_uuid == to_proc.device_uuid + # Same node, we can use IPC + ipc_handle, eT, shape = remotecall_fetch(from_proc.owner, x) do x + arr = unwrap(x) + ipc_handle_ref = Ref{CUDA.CUipcMemHandle}() + GC.@preserve arr begin + CUDA.cuIpcGetMemHandle(ipc_handle_ref, pointer(arr)) + end + (ipc_handle_ref[], eltype(arr), size(arr)) + end + r_ptr = Ref{CUDA.CUdeviceptr}() + CUDA.device!(from_proc.device) do + CUDA.cuIpcOpenMemHandle(r_ptr, ipc_handle, CUDA.CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS) + end + ptr = Base.unsafe_convert(CUDA.CuPtr{eT}, r_ptr[]) + arr = unsafe_wrap(CuArray, ptr, shape; own=false) + finalizer(arr) do arr + CUDA.cuIpcCloseMemHandle(pointer(arr)) + end + if from_proc.device_uuid != to_proc.device_uuid + return CUDA.device!(to_proc.device) do + to_arr = similar(arr) + copyto!(to_arr, arr) + to_arr + 
end + else + return arr + end + else + # Different node, use DtoH, serialization, HtoD + return CuArray(remotecall_fetch(from_proc.owner, x) do x + Array(unwrap(x)) + end) + end +end + +# Adapt generic functions +Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::Function) = x +Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::Chunk{T}) where {T<:Function} = + Dagger.move(from_proc, to_proc, fetch(x)) + +# Adapt BLAS/LAPACK functions +import LinearAlgebra: BLAS, LAPACK +for lib in [BLAS, LAPACK] + for name in names(lib; all=true) + name == nameof(lib) && continue + startswith(string(name), '#') && continue + endswith(string(name), '!') || continue + + for culib in [CUBLAS, CUSOLVER] + if name in names(culib; all=true) + fn = getproperty(lib, name) + cufn = getproperty(culib, name) + @eval Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, ::$(typeof(fn))) = $cufn + end + end + end +end + +# Task execution +function Dagger.execute!(proc::CuArrayDeviceProc, f, args...; kwargs...) + @nospecialize f args kwargs + tls = Dagger.get_tls() + task = Threads.@spawn begin + Dagger.set_tls!(tls) + with_context!(proc) + result = Base.@invokelatest f(args...; kwargs...) + # N.B. Synchronization must be done when accessing result or args + return result + end + + try + fetch(task) + catch err + stk = current_exceptions(task) + err, frames = stk[1] + rethrow(CapturedException(err, frames)) + end +end + +Dagger.gpu_processor(::Val{:CUDA}) = CuArrayDeviceProc +Dagger.gpu_can_compute(::Val{:CUDA}) = CUDA.has_cuda() +Dagger.gpu_kernel_backend(::CuArrayDeviceProc) = CUDABackend() +Dagger.gpu_with_device(f, proc::CuArrayDeviceProc) = + CUDA.device!(f, proc.device) +function Dagger.gpu_synchronize(proc::CuArrayDeviceProc) + with_context(proc) do + CUDA.synchronize() + end +end +function Dagger.gpu_synchronize(::Val{:CUDA}) + for dev in CUDA.devices() + _sync_with_context(CuArrayDeviceProc(myid(), dev.handle, CUDA.uuid(dev))) + end +end + +Dagger.to_scope(::Val{:cuda_gpu}, sc::NamedTuple) = + Dagger.to_scope(Val{:cuda_gpus}(), merge(sc, (;cuda_gpus=[sc.cuda_gpu]))) +Dagger.scope_key_precedence(::Val{:cuda_gpu}) = 1 +function Dagger.to_scope(::Val{:cuda_gpus}, sc::NamedTuple) + if haskey(sc, :worker) + workers = Int[sc.worker] + elseif haskey(sc, :workers) && sc.workers != Colon() + workers = sc.workers + else + workers = map(gproc->gproc.pid, Dagger.procs(Dagger.Sch.eager_context())) + end + scopes = Dagger.ExactScope[] + dev_ids = sc.cuda_gpus + for worker in workers + procs = Dagger.get_processors(Dagger.OSProc(worker)) + for proc in procs + proc isa CuArrayDeviceProc || continue + if dev_ids == Colon() || proc.device+1 in dev_ids + scope = Dagger.ExactScope(proc) + push!(scopes, scope) + end + end + end + return Dagger.UnionScope(scopes) +end +Dagger.scope_key_precedence(::Val{:cuda_gpus}) = 1 + +const DEVICES = Dict{Int, CuDevice}() +const CONTEXTS = Dict{Int, CuContext}() +const STREAMS = Dict{Int, CuStream}() + +function __init__() + if CUDA.has_cuda() + for dev in CUDA.devices() + @debug "Registering CUDA GPU processor with Dagger: $dev" + Dagger.add_processor_callback!("cuarray_device_$(dev.handle)") do + proc = CuArrayDeviceProc(myid(), dev.handle, CUDA.uuid(dev)) + DEVICES[dev.handle] = dev + ctx = context(dev) + CONTEXTS[dev.handle] = ctx + context!(ctx) do + STREAMS[dev.handle] = stream() + end + return proc + end + end + end +end + +end # module CUDAExt diff --git a/ext/IntelExt.jl b/ext/IntelExt.jl new file mode 100644 index 000000000..95df7169f --- /dev/null +++ 
b/ext/IntelExt.jl @@ -0,0 +1,328 @@ +module IntelExt + +export oneArrayDeviceProc + +import Dagger, MemPool +import Dagger: CPURAMMemorySpace, Chunk, unwrap +import MemPool: DRef, poolget +import Distributed: myid, remotecall_fetch +import LinearAlgebra +using KernelAbstractions, Adapt + +const CPUProc = Union{Dagger.OSProc,Dagger.ThreadProc} + +if isdefined(Base, :get_extension) + import oneAPI +else + import ..oneAPI +end +import oneAPI: ZeDevice, ZeDriver, ZeContext, oneArray, oneAPIBackend +import oneAPI: driver, driver!, device, device!, context, context! +#import oneAPI: CUBLAS, CUSOLVER + +using UUIDs + +"Represents a single Intel GPU device." +struct oneArrayDeviceProc <: Dagger.Processor + owner::Int + device_id::Int +end +Dagger.get_parent(proc::oneArrayDeviceProc) = Dagger.OSProc(proc.owner) +Dagger.root_worker_id(proc::oneArrayDeviceProc) = proc.owner +Base.show(io::IO, proc::oneArrayDeviceProc) = + print(io, "oneArrayDeviceProc(worker $(proc.owner), device $(proc.device_id))") +Dagger.short_name(proc::oneArrayDeviceProc) = "W: $(proc.owner), oneAPI: $(proc.device)" +Dagger.@gpuproc(oneArrayDeviceProc, oneArray) + +"Represents the memory space of a single Intel GPU's VRAM." +struct IntelVRAMMemorySpace <: Dagger.MemorySpace + owner::Int + device_id::Int +end +Dagger.root_worker_id(space::IntelVRAMMemorySpace) = space.owner +function Dagger.memory_space(x::oneArray) + dev = oneAPI.device(x) + device_id = _device_id(dev) + return IntelVRAMMemorySpace(myid(), device_id) +end +_device_id(dev::ZeDevice) = findfirst(other_dev->other_dev === dev, collect(oneAPI.devices())) + +Dagger.memory_spaces(proc::oneArrayDeviceProc) = Set([IntelVRAMMemorySpace(proc.owner, proc.device_id)]) +Dagger.processors(space::IntelVRAMMemorySpace) = Set([oneArrayDeviceProc(space.owner, space.device_id)]) + +function to_device(proc::oneArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return DEVICES[proc.device_id] +end + +function with_context!(device_id::Integer) + driver!(DRIVERS[device_id]) + device!(DEVICES[device_id]) + context!(CONTEXTS[device_id]) +end +function with_context!(proc::oneArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + with_context!(proc.device_id) +end +function with_context!(space::IntelVRAMMemorySpace) + @assert Dagger.root_worker_id(space) == myid() + with_context!(space.device_id) +end +function with_context(f, x) + old_drv = driver() + old_dev = device() + old_ctx = context() + + with_context!(x) + try + f() + finally + driver!(old_drv) + device!(old_dev) + context!(old_ctx) + end +end + +function _sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + with_context(x) do + oneAPI.synchronize() + end +end +function sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + if Dagger.root_worker_id(x) == myid() + _sync_with_context(x) + else + # Do nothing, as we have received our value over a serialization + # boundary, which should synchronize for us + end +end + +# Allocations +Dagger.allocate_array_func(::oneArrayDeviceProc, ::typeof(rand)) = oneAPI.rand +Dagger.allocate_array_func(::oneArrayDeviceProc, ::typeof(randn)) = oneAPI.randn +Dagger.allocate_array_func(::oneArrayDeviceProc, ::typeof(ones)) = oneAPI.ones +Dagger.allocate_array_func(::oneArrayDeviceProc, ::typeof(zeros)) = oneAPI.zeros +struct AllocateUndef{S} end +(::AllocateUndef{S})(T, dims::Dims{N}) where {S,N} = oneArray{S,N}(undef, dims) +Dagger.allocate_array_func(::oneArrayDeviceProc, ::Dagger.AllocateUndef{S}) where S = AllocateUndef{S}() + +# In-place +# 
N.B. These methods assume that later operations will implicitly or +# explicitly synchronize with their associated stream +function Dagger.move!(to_space::Dagger.CPURAMMemorySpace, from_space::IntelVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + if Dagger.root_worker_id(from_space) == myid() + sync_with_context(from_space) + with_context!(from_space) + end + copyto!(to, from) + # N.B. DtoH will synchronize + return +end +function Dagger.move!(to_space::IntelVRAMMemorySpace, from_space::Dagger.CPURAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + with_context!(to_space) + copyto!(to, from) + return +end +function Dagger.move!(to_space::IntelVRAMMemorySpace, from_space::IntelVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + sync_with_context(from_space) + with_context!(to_space) + copyto!(to, from) + return +end + +# Out-of-place HtoD +function Dagger.move(from_proc::CPUProc, to_proc::oneArrayDeviceProc, x) + with_context(to_proc) do + arr = adapt(oneArray, x) + oneAPI.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::oneArrayDeviceProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + cpu_data = remotecall_fetch(unwrap, from_w, x) + with_context(to_proc) do + arr = adapt(oneArray, cpu_data) + oneAPI.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::oneArrayDeviceProc, x::oneArray) + if oneAPI.device(x) == to_device(to_proc) + return x + end + with_context(to_proc) do + _x = similar(x) + copyto!(_x, x) + oneAPI.synchronize() + return _x + end +end + +# Out-of-place DtoH +function Dagger.move(from_proc::oneArrayDeviceProc, to_proc::CPUProc, x) + with_context(from_proc) do + oneAPI.synchronize() + _x = adapt(Array, x) + oneAPI.synchronize() + return _x + end +end +function Dagger.move(from_proc::oneArrayDeviceProc, to_proc::CPUProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + remotecall_fetch(from_w, x) do x + arr = unwrap(x) + return Dagger.move(from_proc, to_proc, arr) + end +end +function Dagger.move(from_proc::oneArrayDeviceProc, to_proc::CPUProc, x::oneArray{T,N}) where {T,N} + with_context(from_proc) do + oneAPI.synchronize() + _x = Array{T,N}(undef, size(x)) + copyto!(_x, x) + oneAPI.synchronize() + return _x + end +end + +# Out-of-place DtoD +function Dagger.move(from_proc::oneArrayDeviceProc, to_proc::oneArrayDeviceProc, x::Dagger.Chunk{T}) where T<:oneArray + if from_proc == to_proc + # Same process and GPU, no change + arr = unwrap(x) + with_context(oneAPI.synchronize, from_proc) + return arr + elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc) + # Same process but different GPUs, use DtoD copy + from_arr = unwrap(x) + with_context(oneAPI.synchronize, from_proc) + return with_context(to_proc) do + to_arr = similar(from_arr) + copyto!(to_arr, from_arr) + oneAPI.synchronize() + return to_arr + end + else + # Different node, use DtoH, serialization, HtoD + return oneArray(remotecall_fetch(from_proc.owner, x) do x + Array(unwrap(x)) + end) + end +end + +# Adapt generic functions +Dagger.move(from_proc::CPUProc, to_proc::oneArrayDeviceProc, x::Function) = x +Dagger.move(from_proc::CPUProc, to_proc::oneArrayDeviceProc, x::Chunk{T}) where {T<:Function} = + Dagger.move(from_proc, to_proc, fetch(x)) + +#= FIXME: Adapt BLAS/LAPACK functions +import 
LinearAlgebra: BLAS, LAPACK +for lib in [BLAS, LAPACK] + for name in names(lib; all=true) + name == nameof(lib) && continue + startswith(string(name), '#') && continue + endswith(string(name), '!') || continue + + for culib in [CUBLAS, CUSOLVER] + if name in names(culib; all=true) + fn = getproperty(lib, name) + cufn = getproperty(culib, name) + @eval Dagger.move(from_proc::CPUProc, to_proc::oneArrayDeviceProc, ::$(typeof(fn))) = $cufn + end + end + end +end +=# + +# Task execution +function Dagger.execute!(proc::oneArrayDeviceProc, f, args...; kwargs...) + @nospecialize f args kwargs + tls = Dagger.get_tls() + task = Threads.@spawn begin + Dagger.set_tls!(tls) + with_context!(proc) + result = Base.@invokelatest f(args...; kwargs...) + # N.B. Synchronization must be done when accessing result or args + return result + end + + try + fetch(task) + catch err + stk = current_exceptions(task) + err, frames = stk[1] + rethrow(CapturedException(err, frames)) + end +end + +Dagger.gpu_processor(::Val{:oneAPI}) = oneArrayDeviceProc +Dagger.gpu_can_compute(::Val{:oneAPI}) = oneAPI.functional() +Dagger.gpu_kernel_backend(::oneArrayDeviceProc) = oneAPIBackend() +Dagger.gpu_with_device(f, proc::oneArrayDeviceProc) = + device!(f, proc.device_id) +function Dagger.gpu_synchronize(proc::oneArrayDeviceProc) + with_context(proc) do + oneAPI.synchronize() + end +end +function Dagger.gpu_synchronize(::Val{:oneAPI}) + for dev in oneAPI.devices() + idx = findfirst(other_dev->other_dev == dev, collect(oneAPI.devices())) + _sync_with_context(oneArrayDeviceProc(myid(), idx)) + end +end + +Dagger.to_scope(::Val{:intel_gpu}, sc::NamedTuple) = + Dagger.to_scope(Val{:intel_gpus}(), merge(sc, (;intel_gpus=[sc.intel_gpu]))) +Dagger.scope_key_precedence(::Val{:intel_gpu}) = 1 +function Dagger.to_scope(::Val{:intel_gpus}, sc::NamedTuple) + if haskey(sc, :worker) + workers = Int[sc.worker] + elseif haskey(sc, :workers) && sc.workers != Colon() + workers = sc.workers + else + workers = map(gproc->gproc.pid, Dagger.procs(Dagger.Sch.eager_context())) + end + scopes = Dagger.ExactScope[] + dev_ids = sc.intel_gpus + for worker in workers + procs = Dagger.get_processors(Dagger.OSProc(worker)) + for proc in procs + proc isa oneArrayDeviceProc || continue + if dev_ids == Colon() || proc.device_id in dev_ids + scope = Dagger.ExactScope(proc) + push!(scopes, scope) + end + end + end + return Dagger.UnionScope(scopes) +end +Dagger.scope_key_precedence(::Val{:intel_gpus}) = 1 + +const DEVICES = Dict{Int, ZeDevice}() +const DRIVERS = Dict{Int, ZeDriver}() +const CONTEXTS = Dict{Int, ZeContext}() + +function __init__() + if oneAPI.functional() + for (device_id, dev) in enumerate(oneAPI.devices()) + @debug "Registering Intel GPU processor with Dagger: $dev" + Dagger.add_processor_callback!("zearray_device_$(device_id)") do + proc = oneArrayDeviceProc(myid(), device_id) + DEVICES[device_id] = dev + driver!(dev.driver) + DRIVERS[device_id] = dev.driver + device!(dev) + ctx = ZeContext(dev.driver) + CONTEXTS[device_id] = ctx + return proc + end + end + end +end + +end # module IntelExt diff --git a/ext/MetalExt.jl b/ext/MetalExt.jl new file mode 100644 index 000000000..b9b28cccf --- /dev/null +++ b/ext/MetalExt.jl @@ -0,0 +1,393 @@ +module MetalExt + +export MtlArrayDeviceProc + +import Dagger, MemPool +import Dagger: CPURAMMemorySpace, Chunk, unwrap +import MemPool: DRef, poolget +import Distributed: myid, remotecall_fetch +import LinearAlgebra +using KernelAbstractions, Adapt + +const CPUProc = Union{Dagger.OSProc,Dagger.ThreadProc} + +if 
isdefined(Base, :get_extension) + import Metal +else + import ..Metal +end +import Metal: MtlArray, MetalBackend +# FIXME: import Metal: MTLBLAS, MTLSOLVER +const MtlDevice = Metal.MTL.MTLDeviceInstance +const MtlStream = Metal.MTL.MTLCommandQueue + +struct MtlArrayDeviceProc <: Dagger.Processor + owner::Int + device_id::UInt64 +end + +Dagger.get_parent(proc::MtlArrayDeviceProc) = Dagger.OSProc(proc.owner) +Dagger.root_worker_id(proc::MtlArrayDeviceProc) = proc.owner +Dagger.short_name(proc::MtlArrayDeviceProc) = "W: $(proc.owner), Metal: $(proc.device_id)" +Dagger.@gpuproc(MtlArrayDeviceProc, MtlArray) + +"Represents the memory space of a single Metal GPU's VRAM." +struct MetalVRAMMemorySpace <: Dagger.MemorySpace + owner::Int + device_id::Int +end +Dagger.root_worker_id(space::MetalVRAMMemorySpace) = space.owner +function Dagger.memory_space(x::MtlArray) + dev = Metal.device(x) + device_id = _device_id(dev) + return MetalVRAMMemorySpace(myid(), device_id) +end +_device_id(dev::MtlDevice) = findfirst(other_dev->other_dev === dev, Metal.devices()) + +Dagger.memory_spaces(proc::MtlArrayDeviceProc) = Set([MetalVRAMMemorySpace(proc.owner, proc.device_id)]) +Dagger.processors(space::MetalVRAMMemorySpace) = Set([MtlArrayDeviceProc(space.owner, space.device_id)]) + +function to_device(proc::MtlArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return DEVICES[proc.device_id] +end +function to_context(proc::MtlArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return Metal.global_queue() #CONTEXTS[proc.device] +end +to_context(device_id::Integer) = Metal.global_queue(DEVICES[device_id]) +to_context(dev::MtlDevice) = to_context(_device_id(dev)) + +function with_context!(device_id::Integer) +end +function with_context!(proc::MtlArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() +end +function with_context!(space::MetalVRAMMemorySpace) + @assert Dagger.root_worker_id(space) == myid() +end +function with_context(f, x) + with_context!(x) + return f() +end + +function _sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + with_context(x) do + Metal.synchronize() + end +end +function sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + if Dagger.root_worker_id(x) == myid() + _sync_with_context(x) + else + # Do nothing, as we have received our value over a serialization + # boundary, which should synchronize for us + end +end + +# Allocations +Dagger.allocate_array_func(::MtlArrayDeviceProc, ::typeof(rand)) = Metal.rand +Dagger.allocate_array_func(::MtlArrayDeviceProc, ::typeof(randn)) = Metal.randn +Dagger.allocate_array_func(::MtlArrayDeviceProc, ::typeof(ones)) = Metal.ones +Dagger.allocate_array_func(::MtlArrayDeviceProc, ::typeof(zeros)) = Metal.zeros +struct AllocateUndef{S} end +(::AllocateUndef{S})(T, dims::Dims{N}) where {S,N} = MtlArray{S,N}(undef, dims) +Dagger.allocate_array_func(::MtlArrayDeviceProc, ::Dagger.AllocateUndef{S}) where S = AllocateUndef{S}() + +# In-place +# N.B. These methods assume that later operations will implicitly or +# explicitly synchronize with their associated stream +function Dagger.move!(to_space::Dagger.CPURAMMemorySpace, from_space::MetalVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + if Dagger.root_worker_id(from_space) == myid() + sync_with_context(from_space) + with_context!(from_space) + end + copyto!(to, from) + # N.B. 
DtoH will synchronize + return +end +function Dagger.move!(to_space::MetalVRAMMemorySpace, from_space::Dagger.CPURAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + with_context!(to_space) + copyto!(to, from) + return +end +function Dagger.move!(to_space::MetalVRAMMemorySpace, from_space::MetalVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + sync_with_context(from_space) + with_context!(to_space) + copyto!(to, from) + return +end + +# Out-of-place HtoD +function Dagger.move(from_proc::CPUProc, to_proc::MtlArrayDeviceProc, x) + with_context(to_proc) do + arr = adapt(MtlArray, x) + Metal.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::MtlArrayDeviceProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + cpu_data = remotecall_fetch(unwrap, from_w, x) + with_context(to_proc) do + arr = adapt(MtlArray, cpu_data) + Metal.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::MtlArrayDeviceProc, x::MtlArray) + if Metal.device(x) == to_device(to_proc) + return x + end + with_context(to_proc) do + _x = similar(x) + copyto!(_x, x) + Metal.synchronize() + return _x + end +end + +# Out-of-place DtoH +function Dagger.move(from_proc::MtlArrayDeviceProc, to_proc::CPUProc, x) + with_context(from_proc) do + Metal.synchronize() + _x = adapt(Array, x) + Metal.synchronize() + return _x + end +end +function Dagger.move(from_proc::MtlArrayDeviceProc, to_proc::CPUProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + remotecall_fetch(from_w, x) do x + arr = unwrap(x) + return Dagger.move(from_proc, to_proc, arr) + end +end +function Dagger.move(from_proc::MtlArrayDeviceProc, to_proc::CPUProc, x::MtlArray{T,N}) where {T,N} + with_context(from_proc) do + Metal.synchronize() + _x = Array{T,N}(undef, size(x)) + copyto!(_x, x) + Metal.synchronize() + return _x + end +end + +# Out-of-place DtoD +function Dagger.move(from_proc::MtlArrayDeviceProc, to_proc::MtlArrayDeviceProc, x::Dagger.Chunk{T}) where T<:MtlArray + if from_proc == to_proc + # Same process and GPU, no change + arr = unwrap(x) + with_context(Metal.synchronize, from_proc) + return arr + # FIXME: elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc) + else + # Different node, use DtoH, serialization, HtoD + return MtlArray(remotecall_fetch(from_proc.owner, x) do x + Array(unwrap(x)) + end) + end +end + +# Adapt generic functions +Dagger.move(from_proc::CPUProc, to_proc::MtlArrayDeviceProc, x::Function) = x +Dagger.move(from_proc::CPUProc, to_proc::MtlArrayDeviceProc, x::Chunk{T}) where {T<:Function} = + Dagger.move(from_proc, to_proc, fetch(x)) + +#= FIXME: Adapt BLAS/LAPACK functions +import LinearAlgebra: BLAS, LAPACK +for lib in [BLAS, LAPACK] + for name in names(lib; all=true) + name == nameof(lib) && continue + startswith(string(name), '#') && continue + endswith(string(name), '!') || continue + + for culib in [MTLBLAS, MTLSOLVER] + if name in names(culib; all=true) + fn = getproperty(lib, name) + cufn = getproperty(culib, name) + @eval Dagger.move(from_proc::CPUProc, to_proc::MtlArrayDeviceProc, ::$(typeof(fn))) = $cufn + end + end + end +end +=# + +function Dagger.move_optimized( + from_proc::CPUProc, + to_proc::MtlArrayDeviceProc, + x::Array +) + # FIXME + return nothing + + # If we have unified memory, we can try casting the `Array` to `MtlArray`. 
+ device = _get_metal_device(to_proc) + + if (device !== nothing) && device.hasUnifiedMemory + marray = _cast_array_to_mtlarray(x, device) + marray !== nothing && return marray + end + + return nothing +end +function Dagger.move_optimized( + from_proc::MtlArrayDeviceProc, + to_proc::CPUProc, + x::Array +) + # FIXME + return nothing + + # If we have unified memory, we can just cast the `MtlArray` to an `Array`. + device = _get_metal_device(from_proc) + + if (device !== nothing) && device.hasUnifiedMemory + return unsafe_wrap(Array{T}, x, size(x)) + end + + return nothing +end + +# Task execution +function Dagger.execute!(proc::MtlArrayDeviceProc, f, args...; kwargs...) + @nospecialize f args kwargs + tls = Dagger.get_tls() + task = Threads.@spawn begin + Dagger.set_tls!(tls) + with_context!(proc) + result = Base.@invokelatest f(args...; kwargs...) + # N.B. Synchronization must be done when accessing result or args + return result + end + + try + fetch(task) + catch err + stk = current_exceptions(task) + err, frames = stk[1] + rethrow(CapturedException(err, frames)) + end +end + +function Base.show(io::IO, proc::MtlArrayDeviceProc) + print(io, "MtlArrayDeviceProc(worker $(proc.owner), device $(something(_get_metal_device(proc)).name))") +end + +Dagger.gpu_processor(::Val{:Metal}) = MtlArrayDeviceProc +Dagger.gpu_can_compute(::Val{:Metal}) = Metal.functional() +Dagger.gpu_kernel_backend(proc::MtlArrayDeviceProc) = MetalBackend() +# TODO: Switch devices +Dagger.gpu_with_device(f, proc::MtlArrayDeviceProc) = f() + +function Dagger.gpu_synchronize(proc::MtlArrayDeviceProc)q + with_context(proc) do + Metal.synchronize() + end +end +function Dagger.gpu_synchronize(::Val{:Metal}) + for dev in Metal.devices() + _sync_with_context(MtlArrayDeviceProc(myid(), _device_id(dev))) + end +end + +Dagger.to_scope(::Val{:metal_gpu}, sc::NamedTuple) = + Dagger.to_scope(Val{:metal_gpus}(), merge(sc, (;metal_gpus=[sc.metal_gpu]))) +Dagger.scope_key_precedence(::Val{:metal_gpu}) = 1 +function Dagger.to_scope(::Val{:metal_gpus}, sc::NamedTuple) + if haskey(sc, :worker) + workers = Int[sc.worker] + elseif haskey(sc, :workers) && sc.workers != Colon() + workers = sc.workers + else + workers = map(gproc->gproc.pid, Dagger.procs(Dagger.Sch.eager_context())) + end + scopes = Dagger.ExactScope[] + dev_ids = sc.metal_gpus + for worker in workers + procs = Dagger.get_processors(Dagger.OSProc(worker)) + for proc in procs + proc isa MtlArrayDeviceProc || continue + if dev_ids == Colon() || proc.device_id in dev_ids + scope = Dagger.ExactScope(proc) + push!(scopes, scope) + end + end + end + return Dagger.UnionScope(scopes) +end +Dagger.scope_key_precedence(::Val{:metal_gpus}) = 1 + +const DEVICES = Dict{Int, MtlDevice}() +#const CONTEXTS = Dict{Int, MtlContext}() +#const STREAMS = Dict{Int, MtlStream}() + +function __init__() + if Metal.functional() + for (dev_id, dev) in enumerate(Metal.devices()) + @debug "Registering Metal GPU processor with Dagger: $dev" + # FIXME: We only get a Ptr now + Dagger.add_processor_callback!("metal_device_$(dev_id)") do + proc = MtlArrayDeviceProc(myid(), dev_id) + DEVICES[dev_id] = dev + #= + ctx = context(dev) + CONTEXTS[dev.device_id] = ctx + context!(ctx) do + STREAMS[dev.device_id] = stream() + end + =# + return proc + end + end + end +end + + +################################################################################ +# Private functions +################################################################################ + +# Try casting the array `x` to an `MtlArray`. 
If the casting is not possible, +# return `nothing`. +function _cast_array_to_mtlarray(x::Array{T,N}, device::MtlDevice) where {T,N} + # Try creating the buffer without copying. + dims = size(x) + nbytes_array = prod(dims) * sizeof(T) + pagesize = ccall(:getpagesize, Cint, ()) + num_pages = div(nbytes_array, pagesize, RoundUp) + nbytes = num_pages * pagesize + + pbuf = Metal.MTL.mtDeviceNewBufferWithBytesNoCopy( + device, + pointer(x), + nbytes, + Metal.Shared | Metal.MTL.DefaultTracking | Metal.MTL.DefaultCPUCache + ) + + if pbuf != C_NULL + buf = MtlBuffer(pbuf) + marray = MtlArray{T,N}(buf, dims) + return marray + end + + # If we reached here, the conversion was not possible. + return nothing +end + +# Return the Metal device handler given the ID recorded in `proc`. +function _get_metal_device(proc::MtlArrayDeviceProc) + devices = Metal.devices() + + if devices === nothing + return nothing + else + return devices[proc.device_id] + end +end + +end # module MetalExt diff --git a/ext/OpenCLExt.jl b/ext/OpenCLExt.jl new file mode 100644 index 000000000..ce085d310 --- /dev/null +++ b/ext/OpenCLExt.jl @@ -0,0 +1,311 @@ +module OpenCLExt + +export CLArrayDeviceProc + +import Dagger, MemPool +import Dagger: CPURAMMemorySpace, Chunk, unwrap +import MemPool: DRef, poolget +import Distributed: myid, remotecall_fetch +import LinearAlgebra +using KernelAbstractions, Adapt + +const CPUProc = Union{Dagger.OSProc,Dagger.ThreadProc} + +if isdefined(Base, :get_extension) + import OpenCL +else + import ..OpenCL +end +import OpenCL: CLArray, OpenCLBackend, cl +import .cl: Device, Context, CmdQueue + +using UUIDs + +"Represents a single OpenCL device." +struct CLArrayDeviceProc <: Dagger.Processor + owner::Int + device::Int +end +Dagger.get_parent(proc::CLArrayDeviceProc) = Dagger.OSProc(proc.owner) +Dagger.root_worker_id(proc::CLArrayDeviceProc) = proc.owner +Base.show(io::IO, proc::CLArrayDeviceProc) = + print(io, "CLArrayDeviceProc(worker $(proc.owner), device $(proc.device))") +Dagger.short_name(proc::CLArrayDeviceProc) = "W: $(proc.owner), CL: $(proc.device)" +Dagger.@gpuproc(CLArrayDeviceProc, CLArray) + +"Represents the memory space of a single OpenCL device's RAM." 
+struct CLMemorySpace <: Dagger.MemorySpace + owner::Int + device::Int +end +Dagger.root_worker_id(space::CLMemorySpace) = space.owner +function Dagger.memory_space(x::CLArray) + queue = x.data[].queue + idx = findfirst(==(queue), QUEUES) + return CLMemorySpace(myid(), idx) +end + +Dagger.memory_spaces(proc::CLArrayDeviceProc) = Set([CLMemorySpace(proc.owner, proc.device)]) +Dagger.processors(space::CLMemorySpace) = Set([CLArrayDeviceProc(space.owner, space.device)]) + +function to_device(proc::CLArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return DEVICES[proc.device] +end +function to_context(proc::CLArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return CONTEXTS[proc.device] +end +to_context(handle::Integer) = CONTEXTS[handle] +to_context(dev::Device) = to_context(dev.handle) + +function with_context!(handle::Integer) + cl.context!(CONTEXTS[handle]) + cl.queue!(QUEUES[handle]) +end +function with_context!(proc::CLArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + with_context!(proc.device) +end +function with_context!(space::CLMemorySpace) + @assert Dagger.root_worker_id(space) == myid() + with_context!(space.device) +end +function with_context(f, x) + old_ctx = cl.context() + old_queue = cl.queue() + + with_context!(x) + try + f() + finally + cl.context!(old_ctx) + cl.queue!(old_queue) + end +end + +function _sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + with_context(x) do + cl.finish(cl.queue()) + end +end +function sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + if Dagger.root_worker_id(x) == myid() + _sync_with_context(x) + else + # Do nothing, as we have received our value over a serialization + # boundary, which should synchronize for us + end +end + +# Allocations +Dagger.allocate_array_func(::CLArrayDeviceProc, ::typeof(rand)) = OpenCL.rand +Dagger.allocate_array_func(::CLArrayDeviceProc, ::typeof(randn)) = OpenCL.randn +Dagger.allocate_array_func(::CLArrayDeviceProc, ::typeof(ones)) = OpenCL.ones +Dagger.allocate_array_func(::CLArrayDeviceProc, ::typeof(zeros)) = OpenCL.zeros +struct AllocateUndef{S} end +(::AllocateUndef{S})(T, dims::Dims{N}) where {S,N} = CLArray{S,N}(undef, dims) +Dagger.allocate_array_func(::CLArrayDeviceProc, ::Dagger.AllocateUndef{S}) where S = AllocateUndef{S}() + +# In-place +# N.B. These methods assume that later operations will implicitly or +# explicitly synchronize with their associated stream +function Dagger.move!(to_space::Dagger.CPURAMMemorySpace, from_space::CLMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + if Dagger.root_worker_id(from_space) == myid() + _sync_with_context(from_space) + with_context!(from_space) + end + copyto!(to, from) + # N.B. 
DtoH will synchronize + return +end +function Dagger.move!(to_space::CLMemorySpace, from_space::Dagger.CPURAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + with_context!(to_space) + copyto!(to, from) + return +end +function Dagger.move!(to_space::CLMemorySpace, from_space::CLMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + sync_with_context(from_space) + with_context!(to_space) + copyto!(to, from) + return +end + +# Out-of-place HtoD +function Dagger.move(from_proc::CPUProc, to_proc::CLArrayDeviceProc, x) + with_context(to_proc) do + arr = adapt(CLArray, x) + cl.finish(cl.queue()) + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::CLArrayDeviceProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + cpu_data = remotecall_fetch(unwrap, from_w, x) + with_context(to_proc) do + arr = adapt(CLArray, cpu_data) + cl.finish(cl.queue()) + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::CLArrayDeviceProc, x::CLArray) + queue = x.data[].queue + if queue == QUEUES[to_proc.device] + return x + end + with_context(to_proc) do + _x = similar(x) + copyto!(_x, x) + cl.finish(cl.queue()) + return _x + end +end + +# Out-of-place DtoH +function Dagger.move(from_proc::CLArrayDeviceProc, to_proc::CPUProc, x) + with_context(from_proc) do + cl.finish(cl.queue()) + _x = adapt(Array, x) + cl.finish(cl.queue()) + return _x + end +end +function Dagger.move(from_proc::CLArrayDeviceProc, to_proc::CPUProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + remotecall_fetch(from_w, x) do x + arr = unwrap(x) + return Dagger.move(from_proc, to_proc, arr) + end +end +function Dagger.move(from_proc::CLArrayDeviceProc, to_proc::CPUProc, x::CLArray{T,N}) where {T,N} + with_context(from_proc) do + cl.finish(cl.queue()) + _x = Array{T,N}(undef, size(x)) + copyto!(_x, x) + cl.finish(cl.queue()) + return _x + end +end + +# Out-of-place DtoD +function Dagger.move(from_proc::CLArrayDeviceProc, to_proc::CLArrayDeviceProc, x::Dagger.Chunk{T}) where T<:CLArray + if from_proc == to_proc + # Same process and GPU, no change + arr = unwrap(x) + _sync_with_context(from_proc) + return arr + elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc) + # Same process but different GPUs, use DtoD copy + from_arr = unwrap(x) + _sync_with_context(from_proc) + return with_context(to_proc) do + to_arr = similar(from_arr) + copyto!(to_arr, from_arr) + cl.finish(cl.queue()) + return to_arr + end + else + # Different node, use DtoH, serialization, HtoD + return CLArray(remotecall_fetch(from_proc.owner, x) do x + Array(unwrap(x)) + end) + end +end + +# Adapt generic functions +Dagger.move(from_proc::CPUProc, to_proc::CLArrayDeviceProc, x::Function) = x +Dagger.move(from_proc::CPUProc, to_proc::CLArrayDeviceProc, x::Chunk{T}) where {T<:Function} = + Dagger.move(from_proc, to_proc, fetch(x)) + +# Task execution +function Dagger.execute!(proc::CLArrayDeviceProc, f, args...; kwargs...) + @nospecialize f args kwargs + tls = Dagger.get_tls() + task = Threads.@spawn begin + Dagger.set_tls!(tls) + with_context!(proc) + result = Base.@invokelatest f(args...; kwargs...) + # N.B. 
Synchronization must be done when accessing result or args + return result + end + + try + fetch(task) + catch err + stk = current_exceptions(task) + err, frames = stk[1] + rethrow(CapturedException(err, frames)) + end +end + +Dagger.gpu_processor(::Val{:OpenCL}) = CLArrayDeviceProc +Dagger.gpu_can_compute(::Val{:OpenCL}) = length(cl.platforms()) > 0 +Dagger.gpu_kernel_backend(::CLArrayDeviceProc) = OpenCLBackend() +Dagger.gpu_with_device(f, proc::CLArrayDeviceProc) = + cl.device!(f, proc.device) +function Dagger.gpu_synchronize(proc::CLArrayDeviceProc) + with_context(proc) do + cl.finish(QUEUES[proc.device]) + end +end +function Dagger.gpu_synchronize(::Val{:OpenCL}) + for idx in keys(DEVICES) + _sync_with_context(CLArrayDeviceProc(myid(), idx)) + end +end + +Dagger.to_scope(::Val{:cl_device}, sc::NamedTuple) = + Dagger.to_scope(Val{:cl_devices}(), merge(sc, (;cl_devices=[sc.cl_device]))) +Dagger.scope_key_precedence(::Val{:cl_device}) = 1 +function Dagger.to_scope(::Val{:cl_devices}, sc::NamedTuple) + if haskey(sc, :worker) + workers = Int[sc.worker] + elseif haskey(sc, :workers) && sc.workers != Colon() + workers = sc.workers + else + workers = map(gproc->gproc.pid, Dagger.procs(Dagger.Sch.eager_context())) + end + scopes = Dagger.ExactScope[] + dev_ids = sc.cl_devices + for worker in workers + procs = Dagger.get_processors(Dagger.OSProc(worker)) + for proc in procs + proc isa CLArrayDeviceProc || continue + if dev_ids == Colon() || proc.device in dev_ids + scope = Dagger.ExactScope(proc) + push!(scopes, scope) + end + end + end + return Dagger.UnionScope(scopes) +end +Dagger.scope_key_precedence(::Val{:cl_devices}) = 1 + +const DEVICES = Dict{Int, Device}() +const CONTEXTS = Dict{Int, Context}() +const QUEUES = Dict{Int, CmdQueue}() + +function __init__() + # FIXME: Support multiple platforms + if length(cl.platforms()) > 0 + platform = cl.default_platform() + for (idx, dev) in enumerate(cl.devices(platform)) + @debug "Registering OpenCL device processor with Dagger: $dev" + Dagger.add_processor_callback!("clarray_device_$(idx)") do + proc = CLArrayDeviceProc(myid(), idx) + cl.device!(dev) do + DEVICES[idx] = dev + CONTEXTS[idx] = cl.context() + QUEUES[idx] = cl.queue() + end + return proc + end + end + end +end + +end # module OpenCLExt diff --git a/ext/ROCExt.jl b/ext/ROCExt.jl new file mode 100644 index 000000000..6aaa5a867 --- /dev/null +++ b/ext/ROCExt.jl @@ -0,0 +1,330 @@ +module ROCExt + +export ROCArrayDeviceProc + +import Dagger, MemPool +import Dagger: CPURAMMemorySpace, Chunk, unwrap +import MemPool: DRef, poolget +import Distributed: myid, remotecall_fetch +import LinearAlgebra +using KernelAbstractions, Adapt + +const CPUProc = Union{Dagger.OSProc,Dagger.ThreadProc} + +if isdefined(Base, :get_extension) + import AMDGPU +else + import ..AMDGPU +end +import AMDGPU: HIPDevice, HIPContext, HIPStream, ROCArray, ROCBackend +import AMDGPU: devices, context, context!, stream, stream! +import AMDGPU: rocBLAS, rocSOLVER + +struct ROCArrayDeviceProc <: Dagger.Processor + owner::Int + device_id::Int +end +Dagger.get_parent(proc::ROCArrayDeviceProc) = Dagger.OSProc(proc.owner) +Dagger.root_worker_id(proc::ROCArrayDeviceProc) = proc.owner +Base.show(io::IO, proc::ROCArrayDeviceProc) = + print(io, "ROCArrayDeviceProc(worker $(proc.owner), device $(proc.device_id))") +Dagger.short_name(proc::ROCArrayDeviceProc) = "W: $(proc.owner), ROCm: $(proc.device_id)" +Dagger.@gpuproc(ROCArrayDeviceProc, ROCArray) + +"Represents the memory space of a single ROCm GPU's VRAM." 
+struct ROCVRAMMemorySpace <: Dagger.MemorySpace + owner::Int + device_id::Int +end +Dagger.root_worker_id(space::ROCVRAMMemorySpace) = space.owner +Dagger.memory_space(x::ROCArray) = + ROCVRAMMemorySpace(myid(), AMDGPU.device(x).device_id) + +Dagger.memory_spaces(proc::ROCArrayDeviceProc) = Set([ROCVRAMMemorySpace(proc.owner, proc.device_id)]) +Dagger.processors(space::ROCVRAMMemorySpace) = Set([ROCArrayDeviceProc(space.owner, space.device_id)]) + +function to_device(proc::ROCArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return DEVICES[proc.device_id] +end +function to_context(proc::ROCArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + return CONTEXTS[proc.device_id] +end +to_context(handle::Integer) = CONTEXTS[handle] +to_context(dev::HIPDevice) = to_context(dev.device_id) + +function with_context!(handle::Integer) + context!(CONTEXTS[handle]) + AMDGPU.device!(DEVICES[handle]) + stream!(STREAMS[handle]) +end +function with_context!(proc::ROCArrayDeviceProc) + @assert Dagger.root_worker_id(proc) == myid() + with_context!(proc.device_id) +end +function with_context!(space::ROCVRAMMemorySpace) + @assert Dagger.root_worker_id(space) == myid() + with_context!(space.device_id) +end +function with_context(f, x) + old_ctx = context() + old_device = AMDGPU.device() + old_stream = stream() + + with_context!(x) + try + f() + finally + context!(old_ctx) + AMDGPU.device!(old_device) + stream!(old_stream) + end +end + +function _sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + with_context(x) do + AMDGPU.synchronize() + end +end +function sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace}) + if Dagger.root_worker_id(x) == myid() + _sync_with_context(x) + else + # Do nothing, as we have received our value over a serialization + # boundary, which should synchronize for us + end +end + +# Allocations +# FIXME: Avoids some segfaults in rocRAND +fake_rand(::Type{T}, dims::NTuple{N}) where {T,N} = ROCArray(rand(T, dims)) +fake_randn(::Type{T}, dims::NTuple{N}) where {T,N} = ROCArray(randn(T, dims)) +Dagger.allocate_array_func(::ROCArrayDeviceProc, ::typeof(rand)) = fake_rand +Dagger.allocate_array_func(::ROCArrayDeviceProc, ::typeof(randn)) = fake_randn +Dagger.allocate_array_func(::ROCArrayDeviceProc, ::typeof(ones)) = AMDGPU.ones +Dagger.allocate_array_func(::ROCArrayDeviceProc, ::typeof(zeros)) = AMDGPU.zeros +struct AllocateUndef{S} end +(::AllocateUndef{S})(T, dims::Dims{N}) where {S,N} = ROCArray{S,N}(undef, dims) +Dagger.allocate_array_func(::ROCArrayDeviceProc, ::Dagger.AllocateUndef{S}) where S = AllocateUndef{S}() + +# In-place +# N.B. These methods assume that later operations will implicitly or +# explicitly synchronize with their associated stream +function Dagger.move!(to_space::Dagger.CPURAMMemorySpace, from_space::ROCVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + if Dagger.root_worker_id(from_space) == myid() + sync_with_context(from_space) + with_context!(from_space) + end + copyto!(to, from) + # N.B. 
DtoH will synchronize + return +end +function Dagger.move!(to_space::ROCVRAMMemorySpace, from_space::Dagger.CPURAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + with_context!(to_space) + copyto!(to, from) + return +end +function Dagger.move!(to_space::ROCVRAMMemorySpace, from_space::ROCVRAMMemorySpace, to::AbstractArray{T,N}, from::AbstractArray{T,N}) where {T,N} + sync_with_context(from_space) + with_context!(to_space) + copyto!(to, from) + return +end + +# Out-of-place HtoD +function Dagger.move(from_proc::CPUProc, to_proc::ROCArrayDeviceProc, x) + with_context(to_proc) do + arr = adapt(ROCArray, x) + AMDGPU.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::ROCArrayDeviceProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + cpu_data = remotecall_fetch(unwrap, from_w, x) + with_context(to_proc) do + arr = adapt(ROCArray, cpu_data) + AMDGPU.synchronize() + return arr + end +end +function Dagger.move(from_proc::CPUProc, to_proc::ROCArrayDeviceProc, x::ROCArray) + if AMDGPU.device(x) == to_device(to_proc) + return x + end + with_context(to_proc) do + _x = similar(x) + copyto!(_x, x) + AMDGPU.synchronize() + return _x + end +end + +# Out-of-place DtoH +function Dagger.move(from_proc::ROCArrayDeviceProc, to_proc::CPUProc, x) + with_context(from_proc) do + AMDGPU.synchronize() + _x = adapt(Array, x) + AMDGPU.synchronize() + return _x + end +end +function Dagger.move(from_proc::ROCArrayDeviceProc, to_proc::CPUProc, x::Chunk) + from_w = Dagger.root_worker_id(from_proc) + to_w = Dagger.root_worker_id(to_proc) + @assert myid() == to_w + remotecall_fetch(from_w, x) do x + arr = unwrap(x) + return Dagger.move(from_proc, to_proc, arr) + end +end +function Dagger.move(from_proc::ROCArrayDeviceProc, to_proc::CPUProc, x::ROCArray{T,N}) where {T,N} + with_context(AMDGPU.device(x).device_id) do + AMDGPU.synchronize() + _x = Array{T,N}(undef, size(x)) + copyto!(_x, x) + AMDGPU.synchronize() + return _x + end +end + +# Out-of-place DtoD +function Dagger.move(from_proc::ROCArrayDeviceProc, to_proc::ROCArrayDeviceProc, x::Dagger.Chunk{T}) where T<:ROCArray + if from_proc == to_proc + # Same process and GPU, no change + arr = unwrap(x) + with_context(AMDGPU.synchronize, from_proc) + return arr + elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc) + # Same process but different GPUs, use DtoD copy + from_arr = unwrap(x) + dev = AMDGPU.device(from_arr) + with_context(AMDGPU.synchronize, dev.device_id) + return with_context(to_proc) do + to_arr = similar(from_arr) + copyto!(to_arr, from_arr) + AMDGPU.synchronize() + to_arr + end + else + # Different node, use DtoH, serialization, HtoD + return ROCArray(remotecall_fetch(from_proc.owner, x) do x + Array(unwrap(x)) + end) + end +end + +# Adapt generic functions +Dagger.move(from_proc::CPUProc, to_proc::ROCArrayDeviceProc, x::Function) = x +Dagger.move(from_proc::CPUProc, to_proc::ROCArrayDeviceProc, x::Chunk{T}) where {T<:Function} = + Dagger.move(from_proc, to_proc, fetch(x)) + +# Adapt BLAS/LAPACK functions +import LinearAlgebra: BLAS, LAPACK +for lib in [BLAS, LAPACK] + for name in names(lib; all=true) + name == nameof(lib) && continue + startswith(string(name), '#') && continue + endswith(string(name), '!') || continue + + for roclib in [rocBLAS, rocSOLVER] + if name in names(roclib; all=true) + fn = getproperty(lib, name) + rocfn = getproperty(roclib, name) + @eval Dagger.move(from_proc::CPUProc, 
to_proc::ROCArrayDeviceProc, ::$(typeof(fn))) = $rocfn + end + end + end +end + +# Task execution +function Dagger.execute!(proc::ROCArrayDeviceProc, f, args...; kwargs...) + @nospecialize f args kwargs + tls = Dagger.get_tls() + task = Threads.@spawn begin + Dagger.set_tls!(tls) + with_context!(proc) + result = Base.@invokelatest f(args...; kwargs...) + # N.B. Synchronization must be done when accessing result or args + return result + end + + try + fetch(task) + catch err + stk = current_exceptions(task) + err, frames = stk[1] + rethrow(CapturedException(err, frames)) + end +end + +Dagger.gpu_processor(::Val{:ROC}) = ROCArrayDeviceProc +Dagger.gpu_can_compute(::Val{:ROC}) = AMDGPU.functional() +Dagger.gpu_kernel_backend(proc::ROCArrayDeviceProc) = ROCBackend() +Dagger.gpu_with_device(f, proc::ROCArrayDeviceProc) = + AMDGPU.device!(f, AMDGPU.devices()[proc.device_id]) +function Dagger.gpu_synchronize(proc::ROCArrayDeviceProc) + with_context(proc) do + AMDGPU.synchronize() + end +end +function Dagger.gpu_synchronize(::Val{:ROC}) + for dev in AMDGPU.devices() + _sync_with_context(ROCArrayDeviceProc(myid(), dev.device_id)) + end +end + +Dagger.to_scope(::Val{:rocm_gpu}, sc::NamedTuple) = + Dagger.to_scope(Val{:rocm_gpus}(), merge(sc, (;rocm_gpus=[sc.rocm_gpu]))) +function Dagger.to_scope(::Val{:rocm_gpus}, sc::NamedTuple) + if haskey(sc, :worker) + workers = Int[sc.worker] + elseif haskey(sc, :workers) && sc.workers != Colon() + workers = sc.workers + else + workers = map(gproc->gproc.pid, Dagger.procs(Dagger.Sch.eager_context())) + end + scopes = Dagger.ExactScope[] + dev_ids = sc.rocm_gpus + for worker in workers + procs = Dagger.get_processors(Dagger.OSProc(worker)) + for proc in procs + proc isa ROCArrayDeviceProc || continue + if dev_ids == Colon() || proc.device_id in dev_ids + scope = Dagger.ExactScope(proc) + push!(scopes, scope) + end + end + end + return Dagger.UnionScope(scopes) +end +Dagger.scope_key_precedence(::Val{:rocm_gpu}) = 2 +Dagger.scope_key_precedence(::Val{:rocm_gpus}) = 1 + +const DEVICES = Dict{Int, HIPDevice}() +const CONTEXTS = Dict{Int, HIPContext}() +const STREAMS = Dict{Int, HIPStream}() + +function __init__() + if AMDGPU.functional() + for device_id in 1:length(AMDGPU.devices()) + dev = AMDGPU.devices()[device_id] + @debug "Registering ROCm GPU processor with Dagger: $dev" + Dagger.add_processor_callback!("rocarray_device_$device_id") do + proc = ROCArrayDeviceProc(myid(), device_id) + DEVICES[dev.device_id] = dev + ctx = HIPContext(dev) + CONTEXTS[dev.device_id] = ctx + context!(ctx) do + STREAMS[dev.device_id] = HIPStream() + end + return proc + end + end + end +end + +end # module ROCExt diff --git a/src/Dagger.jl b/src/Dagger.jl index fd6395a4b..9baa504ae 100644 --- a/src/Dagger.jl +++ b/src/Dagger.jl @@ -103,6 +103,11 @@ include("array/cholesky.jl") include("array/lu.jl") include("array/random.jl") +import KernelAbstractions, Adapt + +# GPU +include("gpu.jl") + # Logging and Visualization include("visualization.jl") include("ui/gantt-common.jl") diff --git a/src/gpu.jl b/src/gpu.jl new file mode 100644 index 000000000..06d749543 --- /dev/null +++ b/src/gpu.jl @@ -0,0 +1,103 @@ +const CPUProc = Union{OSProc, ThreadProc} + +""" + Kernel{F} + +A type that wraps a KernelAbstractions kernel function. Can be passed to +`Dagger.@spawn` to launch a kernel on a GPU. Synchronization is handled +automatically, but you can also call `Dagger.gpu_synchronize()` to synchronize +kernels manually. 
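+
+For example, given a KernelAbstractions `@kernel function fill_kernel(A, x)`
+(as in Dagger's GPU tests), a launch might look like:
+
+    fetch(Dagger.@spawn Kernel(fill_kernel)(A, 1f0; ndrange=length(A)))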
+""" +struct Kernel{F} end + +Kernel(f) = Kernel{f}() + +function (::Kernel{F})(args...; ndrange) where F + @nospecialize args + dev = gpu_kernel_backend() + kern = F(dev) + kern(args...; ndrange) +end + +macro gpuproc(PROC, T) + PROC = esc(PROC) + T = esc(T) + quote + # Assume that we can run anything + Dagger.iscompatible_func(proc::$PROC, opts, f) = true + Dagger.iscompatible_arg(proc::$PROC, opts, x) = true + + # CPUs shouldn't process our array type + Dagger.iscompatible_arg(proc::ThreadProc, opts, x::$T) = false + end +end + +""" + gpu_processor(kind::Symbol) + +Get the processor type for the given kind of GPU. Supported kinds are +`:CPU`, `:CUDA`, `:ROC`, `:oneAPI`, `:Metal`, and `:OpenCL`. +""" +gpu_processor(kind::Symbol) = gpu_processor(Val(kind)) +gpu_processor(::Val{:CPU}) = ThreadProc + +""" + gpu_can_compute(kind::Symbol) + +Check if the given kind of GPU is ready and usable. Supported kinds are +`:CPU`, `:CUDA`, `:ROC`, `:oneAPI`, `:Metal`, and `:OpenCL`. +""" +gpu_can_compute(kind::Symbol) = gpu_can_compute(Val(kind)) +gpu_can_compute(::Val{:CPU}) = true +gpu_can_compute(::Val) = false + +function gpu_with_device end + +move_optimized(from_proc::Processor, + to_proc::Processor, + x) = nothing + +""" + gpu_kernel_backend() + gpu_kernel_backend(proc::Processor) + +Get the KernelAbstractions backend for the current processor. +""" +gpu_kernel_backend() = gpu_kernel_backend(task_processor()) +gpu_kernel_backend(::ThreadProc) = KernelAbstractions.CPU() + +""" + gpu_synchronize(proc::Processor) + +Synchronize all kernels launched by Dagger tasks on the given processor. +""" +gpu_synchronize(proc::Processor) = nothing + +""" + gpu_synchronize() + +Synchronize all kernels launched by Dagger tasks in the current scope. +""" +function gpu_synchronize() + for proc in Dagger.compatible_processors() + gpu_synchronize(proc) + end +end +""" + gpu_synchronize(kind::Symbol) + +Synchronize all kernels launched by Dagger tasks in the current scope for the +given processor kind. Alternatively, if `kind == :all`, synchronize all +kernels on all processors. Supported kinds are `:CPU`, `:CUDA`, `:ROC`, +`:oneAPI`, `:Metal`, and `:OpenCL`. 
+""" +function gpu_synchronize(kind::Symbol) + if kind == :all + for proc in Dagger.all_processors() + gpu_synchronize(proc) + end + else + gpu_synchronize(Val(kind)) + end +end +gpu_synchronize(::Val{:CPU}) = nothing \ No newline at end of file diff --git a/test/Project.toml b/test/Project.toml index 19e648820..10b64f326 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -8,6 +8,7 @@ Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0" Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6" JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +KernelAbstractions = "63c18a36-062a-441e-b654-da1e3ab1ce7c" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94" OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e" diff --git a/test/gpu.jl b/test/gpu.jl new file mode 100644 index 000000000..4da5c54bd --- /dev/null +++ b/test/gpu.jl @@ -0,0 +1,704 @@ +using Random +using LinearAlgebra + +if USE_CUDA + using Pkg + Pkg.add("CUDA") +end +if USE_ROCM + using Pkg + Pkg.add("AMDGPU") +end +if USE_ONEAPI + using Pkg + Pkg.add("oneAPI") +end +if USE_METAL + using Pkg + Pkg.add("Metal") +end +if USE_OPENCL + using Pkg + Pkg.add("OpenCL") + Pkg.add("pocl_jll") +end + +@everywhere begin + if $USE_CUDA + using CUDA + elseif !$IN_CI + try using CUDA + catch end + end + + if $USE_ROCM + using AMDGPU + elseif !$IN_CI + try using AMDGPU + catch end + end + + if $USE_ONEAPI + using oneAPI + elseif !$IN_CI + try using oneAPI + catch end + end + + if $USE_METAL + using Metal + elseif !$IN_CI + try using Metal + catch end + end + + if $USE_OPENCL + using pocl_jll, OpenCL + elseif !$IN_CI + try using pocl_jll, OpenCL + catch end + end + + using Distributed, Dagger + import Dagger: Kernel + using KernelAbstractions +end +@everywhere begin + function isongpu(X) + return !(X isa Array) + end + + @kernel function fill_kernel(A, x) + idx = @index(Global, Linear) + A[idx] = x + end + function fill_thunk(A, x) + backend = Dagger.gpu_kernel_backend() + k = fill_kernel(backend, 8) + k(A, x; ndrange=8) + KernelAbstractions.synchronize(backend) + return A, typeof(A) + end + + @kernel function copy_kernel(B, A) + idx = @index(Global, Linear) + B[idx] = A[idx] + end + + # Create a function to perform an in-place operation. + function addarray!(x) + x .= x .+ 1.0f0 + end +end + +function generate_thunks() + as = [Dagger.spawn(x->x+1, 1) for i in 1:10] + Dagger.spawn((xs...)->[sum(xs)], as...) 
+end + +@test (!USE_CUDA || Dagger.gpu_can_compute(:CUDA)) && + (!USE_ROCM || Dagger.gpu_can_compute(:ROC)) && + (!USE_ONEAPI || Dagger.gpu_can_compute(:oneAPI)) && + (!USE_METAL || Dagger.gpu_can_compute(:Metal)) + +@testset "CPU" begin + @testset "KernelAbstractions" begin + A = rand(Float32, 8) + DA, T = fetch(Dagger.@spawn fill_thunk(A, 2.3f0)) + @test all(DA .== 2.3f0) + @test T <: Array + + A = rand(Float64, 128) + B = zeros(Float64, 128) + Dagger.with_options(scope=Dagger.scope(worker=1,thread=1)) do + fetch(Dagger.@spawn Kernel(copy_kernel)(B, A; ndrange=length(A))) + Dagger.gpu_synchronize(:CPU) + end + @test all(B .== A) + end +end + +@testset "CUDA" begin + if !Dagger.gpu_can_compute(:CUDA) + @warn "No CUDA devices available, skipping tests" + else + cuproc = Base.get_extension(Dagger, :CUDAExt).CuArrayDeviceProc + @test Dagger.gpu_processor(:CUDA) === cuproc + ndevices = length(collect(CUDA.devices())) + gpu_configs = Any[1] + if ndevices > 1 + push!(gpu_configs, 2) + end + single_gpu_configs = copy(gpu_configs) + push!(gpu_configs, :all) + + @testset "Arrays (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(cuda_gpus=(gpu == :all ? Colon() : [gpu])) + + b = generate_thunks() + c = Dagger.with_options(;scope) do + @test fetch(Dagger.@spawn isongpu(b)) + Dagger.@spawn sum(b) + end + @test !fetch(Dagger.@spawn isongpu(b)) + @test fetch(Dagger.@spawn identity(c)) == 20 + end + + @testset "KernelAbstractions (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(cuda_gpus=(gpu == :all ? Colon() : [gpu])) + local_scope = Dagger.scope(worker=1, cuda_gpus=(gpu == :all ? Colon() : [gpu])) + + A = rand(Float32, 8) + DA, T = Dagger.with_options(;scope) do + fetch(Dagger.@spawn fill_thunk(A, 2.3f0)) + end + @test all(DA .== 2.3f0) + @test T <: CuArray + + if gpu != :all + local A, B + CUDA.device!(gpu-1) do + A = CUDA.rand(128) + B = CUDA.zeros(128) + end + Dagger.with_options(;scope=local_scope) do + fetch(Dagger.@spawn Kernel(copy_kernel)(B, A; ndrange=length(A))) + Dagger.gpu_synchronize(:CUDA) + end + CUDA.device!(gpu-1) do + @test all(B .== A) + end + end + end + + @testset "DArray Allocation (GPU $gpu)" for gpu in single_gpu_configs + scope = Dagger.scope(cuda_gpu=gpu) + + DA_cpu = rand(Blocks(4, 4), 8, 8) + Dagger.with_options(;scope) do + DA_gpu = similar(DA_cpu) + for chunk in DA_gpu.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:CuArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(CUDA.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return CUDA.device(arr) == devs[gpu] + end + end + + for fn in (rand, randn, ones, zeros) + DA = rand(Blocks(4, 4), 8, 8) + for chunk in DA.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:CuArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(CUDA.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return CUDA.device(arr) == devs[gpu] + end + end + end + end + end + + @testset "Datadeps (GPU $gpu)" for gpu in gpu_configs + local_scope = Dagger.scope(worker=1, cuda_gpus=(gpu == :all ? 
Colon() : [gpu])) + + DA = rand(Blocks(4, 4), 8, 8) + DB = rand(Blocks(4, 4), 8, 8) + + # In-place Matmul + DC = zeros(Blocks(4, 4), 8, 8) + Dagger.with_options(;scope=local_scope) do + mul!(DC, DA, DB) + end + @test collect(DC) ≈ collect(DA) * collect(DB) + + # Out-of-place Matmul + Dagger.with_options(;scope=local_scope) do + @test collect(DA * DB) ≈ collect(DA) * collect(DB) + end + + # Out-of-place Cholesky + A = rand(8, 8) + A = A * A' + A[diagind(A)] .+= size(A, 1) + DA = DArray(A, Blocks(4, 4)) + Dagger.with_options(;scope=local_scope) do + @test collect(cholesky(DA).U) ≈ cholesky(collect(DA)).U + end + end + end +end + +@testset "ROCm" begin + if !Dagger.gpu_can_compute(:ROC) + @warn "No ROCm devices available, skipping tests" + else + rocproc = Base.get_extension(Dagger, :ROCExt).ROCArrayDeviceProc + @test Dagger.gpu_processor(:ROC) === rocproc + ndevices = length(AMDGPU.devices()) + gpu_configs = Any[1] + if ndevices > 1 + push!(gpu_configs, 2) + end + single_gpu_configs = copy(gpu_configs) + push!(gpu_configs, :all) + + @testset "Arrays (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(rocm_gpus=(gpu == :all ? Colon() : [gpu])) + + b = generate_thunks() + c = Dagger.with_options(;scope) do + @test fetch(Dagger.@spawn isongpu(b)) + Dagger.@spawn sum(b) + end + @test !fetch(Dagger.@spawn isongpu(b)) + @test fetch(Dagger.@spawn identity(c)) == 20 + end + + @testset "KernelAbstractions (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(rocm_gpus=(gpu == :all ? Colon() : [gpu])) + local_scope = Dagger.scope(worker=1, rocm_gpus=(gpu == :all ? Colon() : [gpu])) + + A = rand(Float32, 8) + DA, T = Dagger.with_options(;scope) do + fetch(Dagger.@spawn fill_thunk(A, 2.3f0)) + end + @test all(DA .== 2.3f0) + @test T <: ROCArray + + if gpu != :all + local A, B + AMDGPU.device!(AMDGPU.devices()[gpu]) do + A = AMDGPU.rand(128) + B = AMDGPU.zeros(128) + end + Dagger.with_options(;scope=local_scope) do + fetch(Dagger.@spawn Kernel(copy_kernel)(B, A; ndrange=length(A))) + Dagger.gpu_synchronize(:ROC) + end + AMDGPU.device!(AMDGPU.devices()[gpu]) do + @test all(B .== A) + end + end + end + + @testset "DArray Allocation (GPU $gpu)" for gpu in single_gpu_configs + scope = Dagger.scope(rocm_gpu=gpu) + + DA_cpu = rand(Blocks(4, 4), 8, 8) + Dagger.with_options(;scope) do + DA_gpu = similar(DA_cpu) + for chunk in DA_gpu.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:ROCArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(AMDGPU.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return AMDGPU.device(arr) == devs[gpu] + end + end + + for fn in (rand, randn, ones, zeros) + DA = rand(Blocks(4, 4), 8, 8) + for chunk in DA.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:ROCArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(AMDGPU.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return AMDGPU.device(arr) == devs[gpu] + end + end + end + end + end + + @testset "Datadeps (GPU $gpu)" for gpu in gpu_configs + local_scope = Dagger.scope(worker=1, rocm_gpus=(gpu == :all ? 
Colon() : [gpu])) + + DA = rand(Blocks(4, 4), 8, 8) + DB = rand(Blocks(4, 4), 8, 8) + + # In-place Matmul + DC = zeros(Blocks(4, 4), 8, 8) + Dagger.with_options(;scope=local_scope) do + mul!(DC, DA, DB) + end + @test collect(DC) ≈ collect(DA) * collect(DB) + + # Out-of-place Matmul + Dagger.with_options(;scope=local_scope) do + @test collect(DA * DB) ≈ collect(DA) * collect(DB) + end + + # Out-of-place Cholesky + A = rand(8, 8) + A = A * A' + A[diagind(A)] .+= size(A, 1) + DA = DArray(A, Blocks(4, 4)) + Dagger.with_options(;scope=local_scope) do + @test collect(cholesky(DA).U) ≈ cholesky(collect(DA)).U + end + end + end +end + +@testset "oneAPI" begin + if !Dagger.gpu_can_compute(:oneAPI) + @warn "No oneAPI devices available, skipping tests" + else + oneproc = Base.get_extension(Dagger, :IntelExt).oneArrayDeviceProc + @test Dagger.gpu_processor(:oneAPI) === oneproc + ndevices = length(oneAPI.devices()) + gpu_configs = Any[1] + if ndevices > 1 + push!(gpu_configs, 2) + end + single_gpu_configs = copy(gpu_configs) + push!(gpu_configs, :all) + + @testset "Arrays (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(intel_gpus=(gpu == :all ? Colon() : [gpu])) + + b = generate_thunks() + c = Dagger.with_options(;scope) do + @test fetch(Dagger.@spawn isongpu(b)) + Dagger.@spawn sum(b) + end + @test !fetch(Dagger.@spawn isongpu(b)) + @test fetch(Dagger.@spawn identity(c)) == 20 + end + + @testset "KernelAbstractions (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(intel_gpus=(gpu == :all ? Colon() : [gpu])) + local_scope = Dagger.scope(worker=1, intel_gpus=(gpu == :all ? Colon() : [gpu])) + + A = rand(Float32, 8) + DA, T = Dagger.with_options(;scope) do + fetch(Dagger.@spawn fill_thunk(A, 2.3f0)) + end + @test all(DA .== 2.3f0) + @test T <: oneArray + + if gpu != :all + local A, B + old_dev = oneAPI.device() + oneAPI.device!(oneAPI.devices()[gpu]) + A = oneAPI.rand(Float32, 128) + B = oneAPI.zeros(Float32, 128) + oneAPI.device!(old_dev) + Dagger.with_options(;scope=local_scope) do + fetch(Dagger.@spawn Kernel(copy_kernel)(B, A; ndrange=length(A))) + Dagger.gpu_synchronize(:oneAPI) + end + old_dev = oneAPI.device() + oneAPI.device!(oneAPI.devices()[gpu]) + @test all(B .== A) + oneAPI.device!(old_dev) + end + end + + @testset "DArray Allocation (GPU $gpu)" for gpu in single_gpu_configs + scope = Dagger.scope(intel_gpu=gpu) + + DA_cpu = rand(Blocks(4, 4), Float32, 8, 8) + Dagger.with_options(;scope) do + DA_gpu = similar(DA_cpu) + for chunk in DA_gpu.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:oneArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(oneAPI.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return oneAPI.device(arr) == devs[gpu] + end + end + + for fn in (rand, randn, ones, zeros) + DA = rand(Blocks(4, 4), Float32, 8, 8) + for chunk in DA.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:oneArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(oneAPI.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return oneAPI.device(arr) == devs[gpu] + end + end + end + end + end + + #= FIXME: Requires more generic matmul, missing Cholesky methods + @testset "Datadeps (GPU $gpu)" for gpu in gpu_configs + local_scope = Dagger.scope(worker=1, intel_gpus=(gpu == :all ? 
Colon() : [gpu])) + + DA = rand(Blocks(4, 4), Float32, 8, 8) + DB = rand(Blocks(4, 4), Float32, 8, 8) + + # In-place Matmul + DC = zeros(Blocks(4, 4), Float32, 8, 8) + Dagger.with_options(;scope=local_scope) do + mul!(DC, DA, DB) + end + @test collect(DC) ≈ collect(DA) * collect(DB) + + # Out-of-place Matmul + Dagger.with_options(;scope=local_scope) do + @test collect(DA * DB) ≈ collect(DA) * collect(DB) + end + + # Out-of-place Cholesky + A = rand(Float32, 8, 8) + A = A * A' + A[diagind(A)] .+= size(A, 1) + DA = DArray(A, Blocks(4, 4)) + Dagger.with_options(;scope=local_scope) do + @test collect(cholesky(DA).U) ≈ cholesky(collect(DA)).U + end + end + =# + end +end + +@testset "Metal" begin + if !Dagger.gpu_can_compute(:Metal) + @warn "No Metal devices available, skipping tests" + else + mtlproc = Base.get_extension(Dagger, :MetalExt).MtlArrayDeviceProc + @test Dagger.gpu_processor(:Metal) === mtlproc + b = generate_thunks() + c = Dagger.with_options(;scope=Dagger.scope(metal_gpu=1)) do + @test fetch(Dagger.@spawn isongpu(b)) + Dagger.@spawn sum(b) + end + @test !fetch(Dagger.@spawn isongpu(b)) + @test fetch(Dagger.@spawn identity(c)) == 20 + + @testset "KernelAbstractions" begin + scope = Dagger.scope(metal_gpu=1) + local_scope = Dagger.scope(worker=1, metal_gpu=1) + + A = rand(Float32, 8) + DA, T = Dagger.with_options(;scope) do + fetch(Dagger.@spawn fill_thunk(A, 2.3f0)) + end + @test all(DA .== 2.3f0) + @test T <: MtlArray + + A = Metal.rand(128) + B = Metal.zeros(128) + Dagger.with_options(;scope=local_scope) do + fetch(Dagger.@spawn Kernel(copy_kernel)(B, A; ndrange=length(A))) + Dagger.gpu_synchronize(:Metal) + end + @test all(B .== A) + end + + @testset "DArray Allocation" begin + gpu = 1 + # FIXME: Multi-worker serialization is broken + scope = Dagger.scope(worker=1, metal_gpu=gpu) + + DA_cpu = rand(Blocks(4, 4), Float32, 8, 8) + Dagger.with_options(;scope) do + DA_gpu = similar(DA_cpu) + for chunk in DA_gpu.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:MtlArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(Metal.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return Metal.device(arr) == devs[gpu] + end + end + + for fn in (rand, randn, ones, zeros) + DA = rand(Blocks(4, 4), Float32, 8, 8) + for chunk in DA.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:MtlArray} + @test remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(Metal.devices()) + arr = Dagger.MemPool.poolget(chunk.handle) + return Metal.device(arr) == devs[gpu] + end + end + end + end + end + + #= FIXME: Requires more generic matmul, missing Cholesky methods + @testset "Datadeps" begin + local_scope = Dagger.scope(worker=1, metal_gpu=1) + + DA = rand(Blocks(4, 4), Float32, 8, 8) + DB = rand(Blocks(4, 4), Float32, 8, 8) + + # In-place Matmul + DC = zeros(Blocks(4, 4), Float32, 8, 8) + Dagger.with_options(;scope=local_scope) do + mul!(DC, DA, DB) + end + @test collect(DC) ≈ collect(DA) * collect(DB) + + # Out-of-place Matmul + Dagger.with_options(;scope=local_scope) do + @test collect(DA * DB) ≈ collect(DA) * collect(DB) + end + + # Out-of-place Cholesky + A = rand(Float32, 8, 8) + A = A * A' + A[diagind(A)] .+= size(A, 1) + DA = DArray(A, Blocks(4, 4)) + Dagger.with_options(;scope=local_scope) do + @test collect(cholesky(DA).U) ≈ cholesky(collect(DA)).U + end + end + =# + + @testset "In-place operations" begin + # Create a page-aligned array. 
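+            # Metal can only wrap host memory without copying (see
+            # `mtDeviceNewBufferWithBytesNoCopy` in MetalExt) when the buffer is
+            # page-aligned, hence the `posix_memalign` allocation below.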
+ dims = (2, 2) + T = Float32 + pagesize = ccall(:getpagesize, Cint, ()) + addr = Ref(C_NULL) + + ccall( + :posix_memalign, + Cint, + (Ptr{Ptr{Cvoid}}, Csize_t, Csize_t), addr, + pagesize, + prod(dims) * sizeof(T) + ) + + array = unsafe_wrap( + Array{T, length(dims)}, + reinterpret(Ptr{T}, addr[]), + dims, + own = false + ) + + # Initialize the array. + array[1, 1] = 1 + array[1, 2] = 2 + array[2, 1] = 3 + array[2, 2] = 4 + + # Perform the computation only on a local `MtlArrayDeviceProc` + t = Dagger.@spawn scope=Dagger.scope(worker=1, metal_gpu=1) addarray!(array) + + # Fetch and check the results. + ret = fetch(t) + + @test ret[1, 1] == 2.0f0 + @test ret[1, 2] == 3.0f0 + @test ret[2, 1] == 4.0f0 + @test ret[2, 2] == 5.0f0 + + # Check if the operation happened in-place. + @test_broken array[1, 1] == 2.0f0 + @test_broken array[1, 2] == 3.0f0 + @test_broken array[2, 1] == 4.0f0 + @test_broken array[2, 2] == 5.0f0 + end + end +end + +@testset "OpenCL" begin + if !Dagger.gpu_can_compute(:OpenCL) + @warn "No OpenCL devices available, skipping tests" + else + clproc = Base.get_extension(Dagger, :OpenCLExt).CLArrayDeviceProc + @test Dagger.gpu_processor(:OpenCL) === clproc + ndevices = length(cl.devices(cl.default_platform())) + gpu_configs = Any[1] + if ndevices > 1 + push!(gpu_configs, 2) + end + single_gpu_configs = copy(gpu_configs) + push!(gpu_configs, :all) + + @testset "Arrays (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(cl_devices=(gpu == :all ? Colon() : [gpu])) + + b = generate_thunks() + c = Dagger.with_options(;scope) do + @test fetch(Dagger.@spawn isongpu(b)) + Dagger.@spawn sum(b) + end + @test !fetch(Dagger.@spawn isongpu(b)) + @test fetch(Dagger.@spawn identity(c)) == 20 + end + + @testset "KernelAbstractions (GPU $gpu)" for gpu in gpu_configs + scope = Dagger.scope(cl_devices=(gpu == :all ? Colon() : [gpu])) + local_scope = Dagger.scope(worker=1, cl_devices=(gpu == :all ? 
Colon() : [gpu])) + + A = rand(Float32, 8) + DA, T = Dagger.with_options(;scope) do + fetch(Dagger.@spawn fill_thunk(A, 2.3f0)) + end + @test all(DA .== 2.3f0) + @test T <: CLArray + + if gpu != :all + local A, B + cl.device!(cl.devices(cl.default_platform())[gpu]) do + A = OpenCL.rand(128) + B = OpenCL.zeros(128) + end + Dagger.with_options(;scope=local_scope) do + fetch(Dagger.@spawn Kernel(copy_kernel)(B, A; ndrange=length(A))) + Dagger.gpu_synchronize(:OpenCL) + end + cl.device!(cl.devices(cl.default_platform())[gpu]) do + @test all(B .== A) + end + end + end + + @testset "DArray Allocation (GPU $gpu)" for gpu in single_gpu_configs + scope = Dagger.scope(cl_device=gpu) + + DA_cpu = rand(Blocks(4, 4), 8, 8) + Dagger.with_options(;scope) do + DA_gpu = similar(DA_cpu) + for chunk in DA_gpu.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:CLArray} + # FIXME + @test_skip remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(cl.devices(cl.default_platform())) + arr = Dagger.MemPool.poolget(chunk.handle) + arr_queue = arr.data[].queue + OpenCLExt = Base.get_extension(Dagger, :OpenCLExt) + arr_dev = OpenCLExt.DEVICES[findfirst(==(arr_queue), OpenCLExt.QUEUES)] + return arr_dev == devs[gpu] + end + end + + for fn in (rand, randn, ones, zeros) + DA = rand(Blocks(4, 4), 8, 8) + for chunk in DA.chunks + chunk = fetch(chunk; raw=true) + @test chunk isa Dagger.Chunk{<:CLArray} + # FIXME + @test_skip remotecall_fetch(chunk.handle.owner, chunk) do chunk + devs = collect(cl.devices(cl.default_platform())) + arr = Dagger.MemPool.poolget(chunk.handle) + arr_queue = arr.data[].queue + OpenCLExt = Base.get_extension(Dagger, :OpenCLExt) + arr_dev = OpenCLExt.DEVICES[findfirst(==(arr_queue), OpenCLExt.QUEUES)] + return arr_dev == devs[gpu] + end + end + end + end + end + end +end \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 79ba890d7..3b5d8de28 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,3 +1,12 @@ +IN_CI = parse(Bool, get(ENV, "CI", "0")) + +USE_CUDA = parse(Bool, get(ENV, "CI_USE_CUDA", "0")) +USE_ROCM = parse(Bool, get(ENV, "CI_USE_ROCM", "0")) +USE_ONEAPI = parse(Bool, get(ENV, "CI_USE_ONEAPI", "0")) +USE_METAL = parse(Bool, get(ENV, "CI_USE_METAL", "0")) +USE_OPENCL = parse(Bool, get(ENV, "CI_USE_OPENCL", "0")) +USE_GPU = USE_CUDA || USE_ROCM || USE_ONEAPI || USE_METAL || USE_OPENCL + tests = [ ("Thunk", "thunk.jl"), ("Scheduler", "scheduler.jl"), @@ -21,14 +30,20 @@ tests = [ ("Array - LinearAlgebra - Cholesky", "array/linalg/cholesky.jl"), ("Array - LinearAlgebra - LU", "array/linalg/lu.jl"), ("Array - Random", "array/random.jl"), + ("GPU", "gpu.jl"), ("Caching", "cache.jl"), ("Disk Caching", "diskcaching.jl"), ("File IO", "file-io.jl"), ("External Languages - Python", "extlang/python.jl"), - ("Preferences", "preferences.jl") + ("Preferences", "preferences.jl"), #("Fault Tolerance", "fault-tolerance.jl"), ] +if USE_GPU + # Only run GPU tests + tests = [("GPU", "gpu.jl")] +end all_test_names = map(test -> replace(last(test), ".jl"=>""), tests) + additional_workers::Int = 3 if PROGRAM_FILE != "" && realpath(PROGRAM_FILE) == @__FILE__