
Commit 99c479e

Improve file download performance (#319)
## Changes

The change to use a chunked iterator for streaming responses caused a performance regression when streaming raw responses, as the default chunk size in the requests library is 1 byte. To identify an appropriate chunk size, I added a benchmark integration test that uploads a 50 MB file and downloads it with a range of chunk sizes. Based on this benchmarking, it seems there is no real speed-up beyond chunks of roughly 100 KiB.

Additionally, I added a check to ensure that `read()` raises a `ValueError` if the response was ever closed, since reading more than once from a streaming response is not allowed.

## Tests

- [x] Added an integration test to verify `read()` raises `ValueError` on a second attempt to read.
- [x] Added a benchmark to stress-test file download performance.

```
[chunk size 1kb] Average time to download: 129.83376684188843
[chunk size 2kb] Average time to download: 69.17960963249206
[chunk size 5kb] Average time to download: 34.08896443843842
[chunk size 10kb] Average time to download: 19.392758226394655
[chunk size 20kb] Average time to download: 10.74389090538025
[chunk size 50kb] Average time to download: 5.658655261993408
[chunk size 100kb] Average time to download: 3.982270860671997
[chunk size 200kb] Average time to download: 4.485624170303344
[chunk size 500kb] Average time to download: 4.236340761184692
[chunk size 1000kb] Average time to download: 4.695496129989624
[chunk size 2000kb] Average time to download: 4.6709349155426025
[chunk size 5000kb] Average time to download: 4.5816244840621945
[chunk size 10000kb] Average time to download: 4.32698233127594
[chunk size 20000kb] Average time to download: 4.625458240509033
[chunk size 50000kb] Average time to download: 4.405146503448487

Fastest chunk size: 100 kb, 3.982270860671997 seconds
```
1 parent ca4aeb5 commit 99c479e
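
For context, here is a minimal sketch of the behavior this change produces, assuming a configured `WorkspaceClient` and an existing volume file at `path` (both values are hypothetical, not from this commit):

```python
# A hedged sketch of the new download behavior; `path` is a hypothetical file.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
path = "/Volumes/main/my_schema/my_volume/data.txt"  # hypothetical

resp = w.files.download(path).contents
resp.set_chunk_size(100 * 1024)  # ~100 KiB was fastest in the benchmark above
with resp:
    data = resp.read()  # streams the body chunk by chunk

# The response was closed when the `with` block exited; a second read
# now raises ValueError instead of silently misbehaving.
try:
    with resp:
        resp.read()
except ValueError:
    print("cannot re-read a closed streaming response")
```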

File tree

5 files changed: +92 −10


Makefile

Lines changed: 5 additions & 2 deletions

```diff
@@ -21,10 +21,13 @@ lint:
 	autoflake --check-diff --quiet --recursive databricks
 
 test:
-	pytest -m 'not integration' --cov=databricks --cov-report html tests
+	pytest -m 'not integration and not benchmark' --cov=databricks --cov-report html tests
 
 integration:
-	pytest -n auto -m 'integration' --cov=databricks --cov-report html tests
+	pytest -n auto -m 'integration and not benchmark' --cov=databricks --cov-report html tests
+
+benchmark:
+	pytest -m 'benchmark' tests
 
 coverage: test
 	open htmlcov/index.html
```

databricks/sdk/core.py

Lines changed: 21 additions & 6 deletions

```diff
@@ -1131,26 +1131,41 @@ def _redacted_dump(self, prefix: str, body: str) -> str:
 class StreamingResponse(BinaryIO):
     _response: requests.Response
     _buffer: bytes
-    _content: Iterator[bytes]
+    _content: Union[Iterator[bytes], None]
+    _chunk_size: Union[int, None]
+    _closed: bool = False
 
-    def __init__(self, response: requests.Response):
+    def fileno(self) -> int:
+        pass
+
+    def flush(self) -> int:
+        pass
+
+    def __init__(self, response: requests.Response, chunk_size: Union[int, None] = None):
         self._response = response
         self._buffer = b''
         self._content = None
+        self._chunk_size = chunk_size
 
     def __enter__(self) -> BinaryIO:
-        self._content = self._response.iter_content()
+        self._content = self._response.iter_content(chunk_size=self._chunk_size)
         return self
 
+    def set_chunk_size(self, chunk_size: Union[int, None]) -> None:
+        self._chunk_size = chunk_size
+
     def close(self) -> None:
         self._response.close()
+        self._closed = True
 
     def isatty(self) -> bool:
         return False
 
     def read(self, n: int = -1) -> bytes:
-        if self._content is None:
-            self._content = self._response.iter_content()
+        if self._closed:
+            raise ValueError("I/O operation on closed file")
+        if not self._content:
+            self._content = self._response.iter_content(chunk_size=self._chunk_size)
         read_everything = n < 0
         remaining_bytes = n
         res = b''
@@ -1191,7 +1206,7 @@ def truncate(self, __size: Union[int, None] = ...) -> int:
     def writable(self) -> bool:
         return False
 
-    def write(self, s: bytes) -> int:
+    def write(self, s: Union[bytes, bytearray]) -> int:
         raise NotImplementedError()
 
     def writelines(self, lines: Iterable[bytes]) -> None:
```
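
The heart of the change is how `read()` buffers data pulled from `iter_content`. Below is a self-contained sketch of that buffering pattern, including the new closed-file guard; `ChunkedReader` is my own simplified stand-in, not the SDK's exact code:

```python
# Simplified sketch: serve read(n) from an iterator of byte chunks,
# carrying leftover bytes in a buffer between calls.
from typing import Iterator


class ChunkedReader:

    def __init__(self, chunks: Iterator[bytes]):
        self._chunks = chunks
        self._buffer = b''
        self._closed = False

    def read(self, n: int = -1) -> bytes:
        if self._closed:
            raise ValueError("I/O operation on closed file")
        res = b''
        read_everything = n < 0
        while read_everything or len(res) < n:
            if not self._buffer:
                chunk = next(self._chunks, None)
                if chunk is None:
                    break  # stream exhausted
                self._buffer = chunk
            # Take as much of the buffer as the caller still wants.
            take = len(self._buffer) if read_everything else min(n - len(res), len(self._buffer))
            res += self._buffer[:take]
            self._buffer = self._buffer[take:]
        return res

    def close(self) -> None:
        self._closed = True


# usage: ChunkedReader(iter([b'abc', b'def'])).read(4) == b'abcd'
```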

tests/integration/conftest.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -23,6 +23,8 @@ def pytest_configure(config):
 
     config.addinivalue_line('markers',
                             'integration: marks tests as those requiring a real Databricks backend')
+    config.addinivalue_line('markers',
+                            'benchmark: marks tests as benchmarks which should not be run by default')
 
 
 def pytest_collection_modifyitems(items):
```
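
A quick illustration of how the marker is consumed (the test name here is hypothetical): tests tagged `@pytest.mark.benchmark` are filtered out by the `-m '... not benchmark'` expressions in the Makefile and are only collected by the new `make benchmark` target.

```python
import pytest


@pytest.mark.benchmark
def test_expensive_download():  # hypothetical example test
    ...  # skipped by `make test` and `make integration`; run via `make benchmark`
```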

tests/integration/test_files.py

Lines changed: 63 additions & 1 deletion

```diff
@@ -1,6 +1,8 @@
 import io
+import logging
 import pathlib
-from typing import Callable, List
+import time
+from typing import Callable, List, Tuple, Union
 
 import pytest
 
@@ -227,3 +229,63 @@ def test_files_api_upload_download(ucws, random):
                 assert f.read() == b"some text data"
 
             w.files.delete(target_file)
+
+
+def test_files_api_read_twice_from_one_download(ucws, random):
+    w = ucws
+    schema = 'filesit-' + random()
+    volume = 'filesit-' + random()
+    with ResourceWithCleanup.create_schema(w, 'main', schema):
+        with ResourceWithCleanup.create_volume(w, 'main', schema, volume):
+            f = io.BytesIO(b"some text data")
+            target_file = f'/Volumes/main/{schema}/{volume}/filesit-{random()}.txt'
+            w.files.upload(target_file, f)
+
+            res = w.files.download(target_file).contents
+
+            with res:
+                assert res.read() == b"some text data"
+
+            with pytest.raises(ValueError):
+                with res:
+                    res.read()
+
+
+@pytest.mark.benchmark
+def test_files_api_download_benchmark(ucws, random):
+    w = ucws
+    schema = 'filesit-' + random()
+    volume = 'filesit-' + random()
+    with ResourceWithCleanup.create_schema(w, 'main', schema):
+        with ResourceWithCleanup.create_volume(w, 'main', schema, volume):
+            # Create a 50 MB file
+            f = io.BytesIO(bytes(range(256)) * 200000)
+            target_file = f'/Volumes/main/{schema}/{volume}/filesit-benchmark-{random()}.txt'
+            w.files.upload(target_file, f)
+
+            totals = {}
+            for chunk_size_kb in [20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, None]:
+                chunk_size = chunk_size_kb * 1024 if chunk_size_kb else None
+                total = 0
+                count = 10
+                for i in range(count):
+                    start = time.time()
+                    f = w.files.download(target_file).contents
+                    f.set_chunk_size(chunk_size)
+                    with f as vf:
+                        vf.read()
+                    end = time.time()
+                    total += end - start
+                avg_time = total / count
+                logging.info("[chunk_size=%s] Average time to download: %f seconds",
+                             str(chunk_size_kb) + 'kb' if chunk_size_kb else 'None', avg_time)
+                totals[chunk_size_kb] = avg_time
+            logging.info("Benchmark results:")
+            best: Tuple[Union[int, None], Union[float, None]] = (None, None)
+            for k, v in totals.items():
+                if best[1] is None or v < best[1]:
+                    best = (k, v)
+                logging.info("[chunk_size=%s] Average time to download: %f seconds",
+                             str(k) + 'kb' if k else 'None', v)
+            min_str = str(best[0]) + "kb" if best[0] else "None"
+            logging.info("Fastest chunk size: %s in %f seconds", min_str, best[1])
```

tests/test_core.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -264,7 +264,7 @@ class DummyResponse:
     def __init__(self, content: List[bytes]) -> None:
         self._content = iter(content)
 
-    def iter_content(self) -> Iterator[bytes]:
+    def iter_content(self, chunk_size: int = 1) -> Iterator[bytes]:
         return self._content
 
     def close(self):
```
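
With the widened `iter_content` signature, this dummy can stand in for `requests.Response` when unit-testing `StreamingResponse` directly. A hedged sketch of such a test; the test name and chunk contents are mine:

```python
from typing import Iterator, List

from databricks.sdk.core import StreamingResponse


class DummyResponse:  # mirrors the test double in the diff above
    def __init__(self, content: List[bytes]) -> None:
        self._content = iter(content)

    def iter_content(self, chunk_size: int = 1) -> Iterator[bytes]:
        return self._content

    def close(self):
        pass


def test_read_spans_chunk_boundaries():
    with StreamingResponse(DummyResponse([b'0123', b'4567'])) as stream:
        assert stream.read(6) == b'012345'  # buffered across the two chunks
        assert stream.read() == b'67'       # leftover bytes served from the buffer
```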
