Skip to content

Commit 44d34f3

Browse files
committed
add prefetcher implementation for regional buckets
1 parent 3121be9 commit 44d34f3

File tree

6 files changed

+1518
-3
lines changed

6 files changed

+1518
-3
lines changed

cloudbuild/run_tests.sh

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,19 @@ case "$TEST_SUITE" in
117117
"--deselect=gcsfs/tests/test_core.py::test_mv_file_cache"
118118
)
119119

120+
# The prefetcher engine is not in this PR. It will be implemented in a separate PR,
121+
# after which this will be removed.
122+
ZONAL_DESELECTS+=(
123+
"--deselect=gcsfs/tests/test_core.py::test_cat_file_routing_and_thresholds"
124+
"--deselect=gcsfs/tests/test_core.py::test_cat_file_concurrent_data_integrity"
125+
"--deselect=gcsfs/tests/test_core.py::test_cat_file_concurrent_exception_cancellation"
126+
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_prefetch_disabled_fallback"
127+
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_prefetch_sequential_integrity"
128+
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_prefetch_random_seek_integrity"
129+
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_multithreaded_read_integrity"
130+
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_not_satisfiable_range"
131+
)
132+
120133
pytest "${ARGS[@]}" "${ZONAL_DESELECTS[@]}" gcsfs/tests/test_core.py
121134
;;
122135
esac

gcsfs/core.py

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from .credentials import GoogleCredentials
3131
from .inventory_report import InventoryReport
3232
from .retry import errs, retry_request, validate_response
33+
from .zb_hns_utils import DEFAULT_CONCURRENCY
3334

3435
logger = logging.getLogger("gcsfs")
3536

@@ -299,6 +300,7 @@ class GCSFileSystem(asyn.AsyncFileSystem):
299300
default_block_size = DEFAULT_BLOCK_SIZE
300301
protocol = "gs", "gcs"
301302
async_impl = True
303+
MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024
302304

303305
def __init__(
304306
self,
@@ -1166,22 +1168,75 @@ def url(self, path):
11661168
f"&generation={generation}" if generation else "",
11671169
)
11681170

1169-
async def _cat_file(self, path, start=None, end=None, **kwargs):
1171+
async def _cat_file_sequential(self, path, start=None, end=None, **kwargs):
11701172
"""Simple one-shot get of file data"""
11711173
# if start and end are both provided and valid, but start >= end, return empty bytes
11721174
# Otherwise, _process_limits would generate an invalid HTTP range (e.g. "bytes=5-4"
11731175
# for start=5, end=5), causing the server to return the whole file instead of nothing.
11741176
if start is not None and end is not None and start >= end >= 0:
11751177
return b""
1178+
11761179
u2 = self.url(path)
1177-
# 'if start or end' fails when start=0 or end=0 because 0 is Falsey.
11781180
if start is not None or end is not None:
11791181
head = {"Range": await self._process_limits(path, start, end)}
11801182
else:
11811183
head = {}
1184+
11821185
headers, out = await self._call("GET", u2, headers=head)
11831186
return out
11841187

1188+
async def _cat_file_concurrent(
    self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
):
    """Fetch a byte range by issuing several range GETs concurrently.

    Parameters
    ----------
    path: str
        Object path.
    start, end: int | None
        Byte range, fsspec-style: None means "from the beginning" /
        "to the end of the file", and negative values are interpreted
        relative to the end of the file.
    concurrency: int
        Maximum number of parallel range requests. Reads smaller than
        MIN_CHUNK_SIZE_FOR_CONCURRENCY (or concurrency <= 1) fall back
        to a single sequential request.
    """
    if start is None:
        start = 0
    # Resolve the object size once if needed: either `end` is open-ended,
    # or a bound is negative (relative to EOF). Without this, a negative
    # bound would leak into the chunk-offset arithmetic below and produce
    # invalid byte ranges.
    if end is None or end < 0 or start < 0:
        size = (await self._info(path))["size"]
        if end is None:
            end = size
        elif end < 0:
            end = max(size + end, 0)
        if start < 0:
            start = max(size + start, 0)
    if start >= end:
        return b""

    if concurrency <= 1 or end - start < self.MIN_CHUNK_SIZE_FOR_CONCURRENCY:
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)

    total_size = end - start
    part_size = total_size // concurrency
    tasks = []
    for i in range(concurrency):
        offset = start + i * part_size
        # The last part absorbs the remainder of the integer division.
        actual_size = (
            part_size if i < concurrency - 1 else total_size - (i * part_size)
        )
        tasks.append(
            asyncio.create_task(
                self._cat_file_sequential(
                    path, start=offset, end=offset + actual_size, **kwargs
                )
            )
        )

    try:
        results = await asyncio.gather(*tasks)
        return b"".join(results)
    except BaseException:
        # Cancel the surviving siblings and wait for them, so no task
        # outlives this call, then re-raise the original failure.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
1228+
1229+
async def _cat_file(
1230+
self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
1231+
):
1232+
"""Simple one-shot, or concurrent get of file data"""
1233+
if concurrency > 1:
1234+
return await self._cat_file_concurrent(
1235+
path, start=start, end=end, concurrency=concurrency, **kwargs
1236+
)
1237+
1238+
return await self._cat_file_sequential(path, start=start, end=end, **kwargs)
1239+
11851240
async def _getxattr(self, path, attr):
11861241
"""Get user-defined metadata attribute"""
11871242
meta = (await self._info(path)).get("metadata", {})
@@ -2020,6 +2075,30 @@ def __init__(
20202075
self.acl = acl
20212076
self.consistency = consistency
20222077
self.checker = get_consistency_checker(consistency)
2078+
2079+
# Ideally, all of these fields should be part of `cache_options`. Because current
2080+
# `fsspec` caches do not accept arbitrary `*args` and `**kwargs`, passing them
2081+
# there currently causes instantiation errors. We are holding off on introducing
2082+
# them as explicit keyword arguments to ensure existing user workloads are not
2083+
# disrupted. This will be refactored once the upstream `fsspec` changes are merged.
2084+
use_prefetch_reader = kwargs.get(
2085+
"use_experimental_adaptive_prefetching", False
2086+
) or os.environ.get("use_experimental_adaptive_prefetching", False)
2087+
self.concurrency = kwargs.get("concurrency", DEFAULT_CONCURRENCY)
2088+
2089+
if use_prefetch_reader:
2090+
max_prefetch_size = kwargs.get("max_prefetch_size", None)
2091+
from .prefetcher import BackgroundPrefetcher
2092+
2093+
self._prefetch_engine = BackgroundPrefetcher(
2094+
self._async_fetch_range,
2095+
self.size,
2096+
max_prefetch_size=max_prefetch_size,
2097+
concurrency=self.concurrency,
2098+
)
2099+
else:
2100+
self._prefetch_engine = None
2101+
20232102
# _supports_append is an internal argument not meant to be used directly.
20242103
# If True, allows opening file in append mode. This is generally not supported
20252104
# by GCS, but may be supported by subclasses (e.g. ZonalFile). This flag should
@@ -2202,12 +2281,30 @@ def _fetch_range(self, start=None, end=None):
22022281
if not both None, fetch only given range
22032282
"""
22042283
try:
2205-
return self.gcsfs.cat_file(self.path, start=start, end=end)
2284+
if self._prefetch_engine:
2285+
return self._prefetch_engine._fetch(start=start, end=end)
2286+
return self.gcsfs.cat_file(
2287+
self.path, start=start, end=end, concurrency=self.concurrency
2288+
)
22062289
except RuntimeError as e:
22072290
if "not satisfiable" in str(e):
22082291
return b""
22092292
raise
22102293

2294+
async def _async_fetch_range(self, start_offset, total_size, split_factor=1):
2295+
"""Async fetcher mapped to the Prefetcher engine for regional buckets."""
2296+
return await self.gcsfs._cat_file_concurrent(
2297+
self.path,
2298+
start=start_offset,
2299+
end=start_offset + total_size,
2300+
concurrency=split_factor,
2301+
)
2302+
2303+
def close(self):
    """Shut down the background prefetcher, if any, then close normally."""
    # getattr guards against close() running before __init__ has set the
    # attribute (e.g. if __init__ itself raised).
    engine = getattr(self, "_prefetch_engine", None)
    if engine:
        engine.close()
    super().close()
2307+
22112308

22122309
def _convert_fixed_key_metadata(metadata, *, from_google=False):
22132310
"""

0 commit comments

Comments
 (0)