Skip to content

Commit 0b2d3fb

Browse files
committed
small change to mp behaviour
1 parent b700d9a commit 0b2d3fb

File tree

1 file changed

+53
-19
lines changed

1 file changed

+53
-19
lines changed

dem_handler/download/aio_aws.py

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -142,19 +142,19 @@ async def upload(to, lp, cf, bn, sess):
142142
asyncio.run(upload(tile_objects, local_paths, config, bucket_name, session))
143143

144144

145-
def download_dem_tiles_async(
145+
def bulk_download_dem_tiles(
146146
tile_objects: list[Path],
147147
save_folder: Path,
148148
bucket_name: str = "copernicus-dem-30m",
149149
config: Config = Config(
150150
region_name="eu-central-1",
151151
retries={"max_attempts": 3, "mode": "standard"},
152152
),
153-
num_cpu: int = 4,
154-
num_tasks: int = 4,
153+
num_cpu: int = 1,
154+
num_tasks: int = 8,
155155
session: aioboto3.Session | None = None,
156-
):
157-
"""Asynchronous download of objects from S3
156+
) -> list[Path]:
157+
"""Asynchronous download of DEM objects from S3
158158
159159
Parameters
160160
----------
@@ -167,24 +167,37 @@ def download_dem_tiles_async(
167167
config : Config, optional
168168
botocore Config, by default Config( signature_version="", region_name="eu-central-1", retries={"max_attempts": 3, "mode": "standard"}, )
169169
num_cpu : int, optional
170-
Number of cpus to be used for multi-processing, by default 4
170+
Number of cpus to be used for multi-processing, by default 1.
171+
Setting to -1 will use all available cpus
171172
num_tasks : int, optional
172-
Number of tasks to be run in async mode, by default 4
173-
If num_cps > 1, each task will be assigned to a cpu and will run in async mode on that cpu (multiple threads)
173+
Number of tasks to be run in async mode, by default 8
174+
If num_cpu > 1, each task will be assigned to a cpu and will run in async mode on that cpu (multiple threads).
175+
Setting to -1 will transfer all tiles in one task.
174176
session : aioboto3.Session | None, optional
175177
aioboto3.Session, by default None
178+
179+
Returns
180+
-------
181+
list[Path]
182+
List of local paths to the saved files.
176183
"""
177184

178185
if not session:
179186
session = aioboto3.Session()
180187
config.signature_version = ""
181188

182189
os.makedirs(save_folder, exist_ok=True)
183-
download_list_chunk = [tile_objects[i::num_tasks] for i in range(num_tasks)]
190+
download_list_chunk = (
191+
[tile_objects[i::num_tasks] for i in range(num_tasks)]
192+
if num_tasks != -1
193+
else [tile_objects]
194+
)
184195
if num_cpu == 1:
185196
for ch in download_list_chunk:
186197
single_download_process(ch, save_folder, config, bucket_name, session)
187198
else:
199+
if num_cpu == -1:
200+
num_cpu = mp.cpu_count()
188201
with mp.Pool(num_cpu) as p:
189202
p.starmap(
190203
single_download_process,
@@ -194,20 +207,22 @@ def download_dem_tiles_async(
194207
],
195208
)
196209

210+
return [save_folder / t.name for t in tile_objects]
211+
197212

198-
def upload_dem_tiles_async(
213+
def bulk_upload_dem_tiles(
199214
s3_dir: Path,
200215
local_dir: Path,
201216
bucket_name: str = "deant-data-public-dev",
202217
config: Config = Config(
203218
region_name="ap-southeast-2",
204219
retries={"max_attempts": 3, "mode": "standard"},
205220
),
206-
num_cpu: int = 4,
207-
num_tasks: int = 4,
221+
num_cpu: int = 1,
222+
num_tasks: int = 8,
208223
session: aioboto3.Session | None = None,
209-
):
210-
"""Asynchronous upload of objects to S3
224+
) -> list[Path]:
225+
"""Asynchronous upload of DEM objects to S3
211226
212227
Parameters
213228
----------
@@ -220,12 +235,19 @@ def upload_dem_tiles_async(
220235
config : Config, optional
221236
botocore Config, by default Config( region_name="ap-southeast-2", retries={"max_attempts": 3, "mode": "standard"}, )
222237
num_cpu : int, optional
223-
Number of cpus to be used for multi-processing, by default 4
238+
Number of cpus to be used for multi-processing, by default 1.
239+
Setting to -1 will use all available cpus
224240
num_tasks : int, optional
225-
Number of tasks to be run in async mode, by default 4
226-
If num_cps > 1, each task will be assigned to a cpu and will run in async mode on that cpu (multiple threads)
241+
Number of tasks to be run in async mode, by default 8
242+
If num_cpu > 1, each task will be assigned to a cpu and will run in async mode on that cpu (multiple threads).
243+
Setting to -1 will transfer all tiles in one task.
227244
session : aioboto3.Session | None, optional
228245
aioboto3.Session, by default None
246+
247+
Returns
248+
-------
249+
list[Path]
250+
List of remote paths on S3.
229251
"""
230252

231253
if not session:
@@ -244,12 +266,22 @@ def upload_dem_tiles_async(
244266
tiles_dirs = [Path(*tp.parts[1:]) for tp in tile_paths]
245267
tile_objects = [s3_dir / td for td in tiles_dirs]
246268

247-
upload_list_chunk = [tile_objects[i::num_tasks] for i in range(num_tasks)]
248-
local_list_chunk = [tile_paths[i::num_tasks] for i in range(num_tasks)]
269+
upload_list_chunk = (
270+
[tile_objects[i::num_tasks] for i in range(num_tasks)]
271+
if num_tasks != -1
272+
else [tile_objects]
273+
)
274+
local_list_chunk = (
275+
[tile_paths[i::num_tasks] for i in range(num_tasks)]
276+
if num_tasks != -1
277+
else [tile_paths]
278+
)
249279
if num_cpu == 1:
250280
for ch, ll in zip(upload_list_chunk, local_list_chunk):
251281
single_upload_process(ch, ll, config, bucket_name, session)
252282
else:
283+
if num_cpu == -1:
284+
num_cpu = mp.cpu_count()
253285
with mp.Pool(num_cpu) as p:
254286
p.starmap(
255287
single_upload_process,
@@ -258,3 +290,5 @@ def upload_dem_tiles_async(
258290
for el in list(zip(upload_list_chunk, local_list_chunk))
259291
],
260292
)
293+
294+
return tile_objects

0 commit comments

Comments
 (0)