Skip to content

Commit eccc320

Browse files
authored
remote: use configured jobs number (#5747)
1 parent a984f95 commit eccc320

File tree

6 files changed

+11
-11
lines changed

6 files changed

+11
-11
lines changed

dvc/fs/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class BaseFileSystem:
3838
scheme = "base"
3939
REQUIRES: ClassVar[Dict[str, str]] = {}
4040
PATH_CLS = URLInfo # type: Any
41-
JOBS = 4 * cpu_count()
41+
_JOBS = 4 * cpu_count()
4242

4343
CHECKSUM_DIR_SUFFIX = ".dir"
4444
HASH_JOBS = max(1, min(4, cpu_count() // 2))
@@ -67,7 +67,7 @@ def jobs(self):
6767
return (
6868
self.config.get("jobs")
6969
or (self.repo and self.repo.config["core"].get("jobs"))
70-
or self.JOBS
70+
or self._JOBS
7171
)
7272

7373
@cached_property

dvc/fs/ssh/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def ask_password(host, user, port):
3333
class SSHFileSystem(BaseFileSystem):
3434
scheme = Schemes.SSH
3535
REQUIRES = {"paramiko": "paramiko"}
36-
JOBS = 4
36+
_JOBS = 4
3737

3838
PARAM_CHECKSUM = "md5"
3939
# At any given time some of the connections will go over network and

dvc/objects/db/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ def list_hashes_traverse(
229229
keeping all of it in memory.
230230
"""
231231
num_pages = remote_size / self.fs.LIST_OBJECT_PAGE_SIZE
232-
if num_pages < 256 / self.fs.JOBS:
232+
if num_pages < 256 / self.fs.jobs:
233233
# Fetching prefixes in parallel requires at least 255 more
234234
# requests, for small enough remotes it will be faster to fetch
235235
# entire cache without splitting it into prefixes.
@@ -263,7 +263,7 @@ def list_with_update(prefix):
263263
)
264264

265265
with ThreadPoolExecutor(
266-
max_workers=jobs or self.fs.JOBS
266+
max_workers=jobs or self.fs.jobs
267267
) as executor:
268268
in_remote = executor.map(list_with_update, traverse_prefixes,)
269269
yield from itertools.chain.from_iterable(in_remote)
@@ -331,7 +331,7 @@ def exists_with_progress(path_info):
331331
return ret
332332

333333
with ThreadPoolExecutor(
334-
max_workers=jobs or self.fs.JOBS
334+
max_workers=jobs or self.fs.jobs
335335
) as executor:
336336
path_infos = map(self.hash_to_path_info, hashes)
337337
in_remote = executor.map(exists_with_progress, path_infos)

dvc/objects/db/ssh.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ def exists_with_progress(chunks):
6464
return self.batch_exists(chunks, callback=pbar.update_msg)
6565

6666
with ThreadPoolExecutor(
67-
max_workers=jobs or self.fs.JOBS
67+
max_workers=jobs or self.fs.jobs
6868
) as executor:
6969
path_infos = [self.hash_to_path_info(x) for x in hashes]
70-
chunks = to_chunks(path_infos, num_chunks=self.fs.JOBS)
70+
chunks = to_chunks(path_infos, num_chunks=self.fs.jobs)
7171
results = executor.map(exists_with_progress, chunks)
7272
in_remote = itertools.chain.from_iterable(results)
7373
ret = list(itertools.compress(hashes, in_remote))

dvc/remote/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def _process(
318318
desc = "Uploading"
319319

320320
if jobs is None:
321-
jobs = self.fs.JOBS
321+
jobs = self.fs.jobs
322322

323323
dir_status, file_status, dir_contents = self._status(
324324
cache,

tests/unit/remote/test_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def test_hashes_exist(object_exists, traverse, dvc):
7070
# large remote, large local
7171
object_exists.reset_mock()
7272
traverse.reset_mock()
73-
odb.fs.JOBS = 16
73+
odb.fs._JOBS = 16
7474
with mock.patch.object(odb, "list_hashes", return_value=list(range(256))):
7575
hashes = list(range(1000000))
7676
odb.hashes_exist(hashes)
@@ -94,7 +94,7 @@ def test_list_hashes_traverse(_path_to_hash, list_hashes, dvc):
9494
odb.fs.path_info = PathInfo("foo")
9595

9696
# parallel traverse
97-
size = 256 / odb.fs.JOBS * odb.fs.LIST_OBJECT_PAGE_SIZE
97+
size = 256 / odb.fs._JOBS * odb.fs.LIST_OBJECT_PAGE_SIZE
9898
list(odb.list_hashes_traverse(size, {0}))
9999
for i in range(1, 16):
100100
list_hashes.assert_any_call(

0 commit comments

Comments (0)