 import argparse
 import multiprocessing as mp
 import os
+import sys
 import tempfile
-from functools import partial, update_wrapper
-from typing import Callable, List, Tuple
+import traceback
+from functools import partial, update_wrapper, wraps
+from typing import Callable, List, ParamSpec, Tuple, TypeVar

 import numpy as np
 from loky import get_reusable_executor
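
A note on the new typing imports, not part of the commit: `ParamSpec` and `TypeVar` (Python 3.10+) let a decorator preserve the wrapped function's exact signature. A minimal sketch of the pattern that the `try_run` decorator below builds on (`passthrough` is invented here for illustration):

    from typing import Callable, ParamSpec, TypeVar

    P = ParamSpec("P")
    R = TypeVar("R")

    def passthrough(fn: Callable[P, R]) -> Callable[P, R]:
        # Type checkers see passthrough(fn) as having the same parameters
        # and return type as fn itself.
        return fn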
@@ -98,34 +100,55 @@ def reset(self) -> None:


 def setup_rmm() -> None:
-    """Setup RMM for GPU-based external memory training."""
+    """Setup RMM for GPU-based external memory training.
+
+    It's important to use RMM with `CudaAsyncMemoryResource` or `ArenaMemoryResource`
+    for GPU-based external memory to improve performance. If XGBoost is not built with
+    RMM support, a warning is raised when constructing the `DMatrix`.
+
+    """
     import rmm
+    from cuda import cudart
     from rmm.allocators.cupy import rmm_cupy_allocator
+    from rmm.mr import ArenaMemoryResource

     if not xgboost.build_info()["USE_RMM"]:
         return

-    try:
-        # Use the arena pool if available
-        from cuda.bindings import runtime as cudart
-        from rmm.mr import ArenaMemoryResource
-
-        status, free, total = cudart.cudaMemGetInfo()
-        if status != cudart.cudaError_t.cudaSuccess:
-            raise RuntimeError(cudart.cudaGetErrorString(status))
-
-        mr = rmm.mr.CudaMemoryResource()
-        mr = ArenaMemoryResource(mr, arena_size=int(total * 0.9))
-    except ImportError:
-        # The combination of pool and async is by design. As XGBoost needs to allocate
-        # large pages repeatedly, it's not easy to handle fragmentation. We can use more
-        # experiments here.
-        mr = rmm.mr.PoolMemoryResource(rmm.mr.CudaAsyncMemoryResource())
-    rmm.mr.set_current_device_resource(mr)
+    status, free, total = cudart.cudaMemGetInfo()
+    if status != cudart.cudaError_t.cudaSuccess:
+        raise RuntimeError(cudart.cudaGetErrorString(status))
+
+    mr = rmm.mr.CudaMemoryResource()
+    mr = ArenaMemoryResource(mr, arena_size=int(total * 0.9))
+
+    rmm.mr.set_current_device_resource(mr)
     # Set the allocator for cupy as well.
     cp.cuda.set_allocator(rmm_cupy_allocator)


+R = TypeVar("R")
+P = ParamSpec("P")
+
+
+def try_run(fn: Callable[P, R]) -> Callable[P, R]:
+    """Loky aborts the process without printing out any error message if there's an
+    exception.
+
+    """
+
+    @wraps(fn)
+    def inner(*args: P.args, **kwargs: P.kwargs) -> R:
+        try:
+            return fn(*args, **kwargs)
+        except Exception as e:
+            print(traceback.format_exc(), file=sys.stderr)
+            raise RuntimeError("Running into exception in worker.") from e
+
+    return inner
+
+
+@try_run
 def hist_train(worker_idx: int, tmpdir: str, device: str, rabit_args: dict) -> None:
     """The hist tree method can use a special data structure `ExtMemQuantileDMatrix` for
     faster initialization and lower memory usage.
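
A note on the `try_run` decorator added above, not part of the commit: it is the usual wrap-and-reraise pattern for surfacing tracebacks from pool workers. A minimal sketch of its behavior, with a `boom` function invented here for illustration:

    @try_run
    def boom() -> None:
        raise ValueError("worker failed")

    # boom() first prints the full ValueError traceback to stderr, then raises
    # RuntimeError("Running into exception in worker.") from the original error.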
@@ -153,7 +176,11 @@ def hist_train(worker_idx: int, tmpdir: str, device: str, rabit_args: dict) -> None:
     )
     # Check the device is correctly set.
     if device == "cuda":
-        assert int(os.environ["CUDA_VISIBLE_DEVICES"]) < coll.get_world_size()
+        # Check the first device
+        assert (
+            int(os.environ["CUDA_VISIBLE_DEVICES"].split(",")[0])
+            < coll.get_world_size()
+        )
     booster = xgboost.train(
         {
             "tree_method": "hist",
@@ -180,8 +207,12 @@ def initializer(device: str) -> None:
         if device == "cuda":
             # name: LokyProcess-1
             lop, sidx = mp.current_process().name.split("-")
-            idx = int(sidx)  # 1-based indexing from loky
-            os.environ["CUDA_VISIBLE_DEVICES"] = str(idx - 1)
+            idx = int(sidx) - 1  # 1-based indexing from loky
+            # Assuming two workers for demo.
+            devices = ",".join([str(idx), str((idx + 1) % n_workers)])
+            # P0: CUDA_VISIBLE_DEVICES=0,1
+            # P1: CUDA_VISIBLE_DEVICES=1,0
+            os.environ["CUDA_VISIBLE_DEVICES"] = devices
             setup_rmm()

     with get_reusable_executor(
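
A note on the device mapping, not part of the commit: each worker lists its own GPU first, so `cuda:0` resolves to a different physical device in each process. A sketch with a hypothetical `visible_devices` helper mirroring the logic above:

    def visible_devices(idx: int, n_workers: int = 2) -> str:
        # Rotate the device list so the worker's own GPU comes first.
        return ",".join(str((idx + i) % n_workers) for i in range(n_workers))

    assert visible_devices(0) == "0,1"  # LokyProcess-1
    assert visible_devices(1) == "1,0"  # LokyProcess-2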
@@ -204,10 +235,6 @@ def initializer(device: str) -> None:
     if args.device == "cuda":
         import cupy as cp

-        # It's important to use RMM with `CudaAsyncMemoryResource`. for GPU-based
-        # external memory to improve performance. If XGBoost is not built with RMM
-        # support, a warning is raised when constructing the `DMatrix`.
-        setup_rmm()
         with tempfile.TemporaryDirectory() as tmpdir:
             main(tmpdir, args)
     else:
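
A note on running the demo, not part of the commit: with this change, `setup_rmm()` runs inside each worker's initializer rather than in the parent process, and the `__main__` block implies an argparse `--device` flag. Assuming that flag (the full CLI is not shown in these hunks):

    python <this_script>.py --device=cuda   # GPU path: per-worker RMM + cupy setup
    python <this_script>.py --device=cpu    # CPU path: the else branch above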