43 commits
5d58213
local files for benchmarking
chandra-siri Dec 3, 2025
c797586
Merge branch 'main' of github.com:googleapis/python-storage into bench
chandra-siri Dec 23, 2025
20d2d2d
add test_reads.py for microbenchmarking reads
chandra-siri Dec 27, 2025
f493bd8
push local files
chandra-siri Dec 27, 2025
68c8ba0
1p 1c working copy
chandra-siri Dec 28, 2025
9e2afa8
Add microbenchmarking tests and utility functions for performance ana…
chandra-siri Dec 28, 2025
3ffc98d
Update microbenchmark configuration and tests for improved performanc…
chandra-siri Dec 28, 2025
bef9dcb
upload local changes
chandra-siri Dec 29, 2025
75007a7
just upload one
chandra-siri Dec 30, 2025
a85fff1
Refactor get_persisted_size_async to improve logging and update get_p…
chandra-siri Dec 31, 2025
4c24f66
working copy
chandra-siri Jan 2, 2026
e216644
add regional tests
chandra-siri Jan 3, 2026
80120a1
Add JSON to CSV conversion script and update benchmark tests for mult…
chandra-siri Jan 3, 2026
99bc3eb
Refactor benchmark configuration and cleanup unused code in test_read…
chandra-siri Jan 3, 2026
f4a622b
Merge branch 'main' of github.com:googleapis/python-storage into bench
chandra-siri Jan 3, 2026
af98e0e
Implement write benchmarks
chandra-siri Jan 3, 2026
1405e92
Merge branch 'main' of github.com:googleapis/python-storage into bench
chandra-siri Jan 4, 2026
3c7e7af
Merge branch 'bench' of github.com:googleapis/python-storage into bench
chandra-siri Jan 4, 2026
970b162
working copy
chandra-siri Jan 8, 2026
0bf17c7
Add benchmarks for downloading and uploading large objects, and impro…
chandra-siri Jan 11, 2026
a7309ac
revert changes in `samples/snippets/storage_list_files_with_prefix.py`
chandra-siri Jan 11, 2026
829f0f4
Remove unused test utility file in asyncio tests
chandra-siri Jan 11, 2026
72e98d6
Remove deprecated benchmark scripts for downloading and uploading obj…
chandra-siri Jan 11, 2026
5586aa6
Remove crc32 benchmark script
chandra-siri Jan 11, 2026
e3797e4
add 'read_rand_multi_coros' in `test_downloads_single_proc_multi_coro`
chandra-siri Jan 11, 2026
6bf72a0
Addressed gemini comments
chandra-siri Jan 11, 2026
9d571a1
fix kokoro lint failures: Refactor print statements in benchmark util…
chandra-siri Jan 11, 2026
b03f712
Add license headers to microbenchmark test files
chandra-siri Jan 11, 2026
6fab6ea
move json_to_csv inside benchmarks folder
chandra-siri Jan 11, 2026
b5d26c5
Merge branch 'main' into bench
chandra-siri Jan 12, 2026
9873084
move reads & writes into their folders
chandra-siri Jan 15, 2026
3e5435e
move writes benchmarking into another PR
chandra-siri Jan 15, 2026
7dd5e00
Merge branch 'main' into bench
chandra-siri Jan 15, 2026
3c0dac2
refactor: reorganize benchmark parameters add each parameter.py in re…
chandra-siri Jan 15, 2026
11ff2f6
move writes into another PR
chandra-siri Jan 15, 2026
4a94ed6
remove write related changes from contest.py
chandra-siri Jan 15, 2026
1298818
fix: update bucket map to use environment variables for default values
chandra-siri Jan 15, 2026
ffc7cd1
Apply suggestion from @gemini-code-assist[bot]
chandra-siri Jan 15, 2026
799bc99
refactor: simplify throughput calculations and improve function docum…
chandra-siri Jan 15, 2026
a20622c
refactor: enhance docstrings for clarity and detail in read benchmarks
chandra-siri Jan 15, 2026
91553d8
add README for performance microbenchmarks with usage instructions
chandra-siri Jan 15, 2026
a283c89
add testing dependencies for benchmarking
chandra-siri Jan 15, 2026
d5a7efd
refactor: improve docstring for _get_params function and simplify par…
chandra-siri Jan 15, 2026
88 changes: 88 additions & 0 deletions benchmarks/async_tasks_downloade_mp.py
@@ -0,0 +1,88 @@
import asyncio
import time
from io import BytesIO
from multiprocessing import Pool


from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)

BUCKET_NAME = "chandrasiri-rs"
OBJECT_SIZE = 100 * 1024 * 1024


async def download_object_async(bucket_name, object_name, client=None):
"""Downloads a single object."""
if client is None:
client = AsyncGrpcClient().grpc_client

mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
await mrd.open()
buffer = BytesIO()
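    # The (offset=0, length=0) read range requests the whole object here;
    # the assert below verifies that all OBJECT_SIZE bytes arrived.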
await mrd.download_ranges(read_ranges=[(0, 0, buffer)])
await mrd.close()

assert buffer.getbuffer().nbytes == OBJECT_SIZE

# Save the downloaded object to a local file
# with open(object_name, "wb") as f:
# f.write(buffer.getvalue())

print(f"Finished downloading {object_name}")


async def download_objects_pool(start_obj_num, end_obj_num):
""" """
print(f"starting for {start_obj_num}, {end_obj_num}")

client = AsyncGrpcClient().grpc_client
tasks = []
pool_start_time = time.monotonic_ns()
for obj_num in range(start_obj_num, end_obj_num):
tasks.append(
asyncio.create_task(
download_object_async(BUCKET_NAME, f"para_64-{obj_num}", client=client)
)
)

await asyncio.gather(*tasks)
pool_end_time = time.monotonic_ns()
print(
f"for {start_obj_num} , {end_obj_num}, {end_obj_num - start_obj_num} tasks done! in {(pool_end_time - pool_start_time) / (10**9)}s"
)


def async_runner(start_obj_num, end_obj_num):
asyncio.run(download_objects_pool(start_obj_num, end_obj_num))


def main():
num_object = 3000
process_count = 60
    objects_per_process = num_object // process_count  # 3000 // 60 = 50
args = []
start_obj_num = 0
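    # Partition [0, num_object) into contiguous, equal-sized slices, one per process.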
for _ in range(process_count):
args.append((start_obj_num, start_obj_num + objects_per_process))
start_obj_num += objects_per_process
# print(f"start {process_count} proc")
start_time_proc = time.monotonic_ns()
print(args, len(args))

with Pool(processes=process_count) as pool:
        pool.starmap(async_runner, args)
end_time_proc = time.monotonic_ns()

print(
f"TOTAL: bytes - {num_object*OBJECT_SIZE}, time: {(end_time_proc - start_time_proc) / (10**9)}s"
)
print(
f"Throuput: {num_object*OBJECT_SIZE /((end_time_proc - start_time_proc) / (10**9))*10**-6} MBps"
)


if __name__ == "__main__":
main()
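As written, the script above takes no flags; it assumes the hard-coded bucket already holds 3,000 objects named para_64-0 through para_64-2999 of 100 MiB each, and fans the downloads out across 60 processes:

    python benchmarks/async_tasks_downloade_mp.py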
70 changes: 70 additions & 0 deletions benchmarks/async_tasks_downloader.py
@@ -0,0 +1,70 @@
import asyncio
import time
from io import BytesIO

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)

BUCKET_NAME = "chandrasiri-rs"
OBJECT_SIZE = 100 * 1024 * 1024


async def download_object_async(bucket_name, object_name, client=None):
"""Downloads a single object."""
if client is None:
client = AsyncGrpcClient().grpc_client

mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
await mrd.open()
buffer = BytesIO()
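    # The (offset=0, length=0) read range requests the whole object here;
    # the assert below verifies that all OBJECT_SIZE bytes arrived.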
await mrd.download_ranges(read_ranges=[(0, 0, buffer)])
await mrd.close()

assert buffer.getbuffer().nbytes == OBJECT_SIZE

# Save the downloaded object to a local file
# with open(object_name, "wb") as f:
# f.write(buffer.getvalue())

# print(f"Finished downloading {object_name}")


async def download_objects_pool(start_obj_num, end_obj_num):
""" """

client = AsyncGrpcClient().grpc_client
tasks = []
pool_start_time = time.monotonic_ns()
for obj_num in range(start_obj_num, end_obj_num):
tasks.append(
asyncio.create_task(
download_object_async(BUCKET_NAME, f"para_64-{obj_num}", client=client)
)
)

await asyncio.gather(*tasks)
pool_end_time = time.monotonic_ns()
print(
f"{end_obj_num - start_obj_num} tasks done! in {(pool_end_time - pool_start_time) / (10**9)}s"
)


async def main():
"""Main function to orchestrate parallel downloads using threads."""
num_objects = 1000
pool_size = 100
start_time = time.monotonic_ns()

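    # Pools run back to back; within each pool, pool_size downloads run concurrently.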
for i in range(0, num_objects, pool_size):
await download_objects_pool(i, i + pool_size)
end_time = time.monotonic_ns()
print(
f"FINSHED: total bytes downloaded - {num_objects*OBJECT_SIZE} in time {(end_time - start_time) / (10**9)}s"
)


if __name__ == "__main__":
asyncio.run(main())
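Likewise, the single-process variant above takes no flags; it expects 1,000 para_64-{n} objects of 100 MiB each in the hard-coded bucket and downloads them in sequential pools of 100 concurrent tasks:

    python benchmarks/async_tasks_downloader.py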
181 changes: 181 additions & 0 deletions benchmarks/download_one_object_using_n_streams.py
@@ -0,0 +1,181 @@
import argparse
import asyncio
from io import BytesIO
import os
import time
import threading

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)


async def download_range_async(
client, bucket_name, object_name, start_byte, end_byte, chunk_size
):
"""
Downloads a specific byte range of an object.
This is a modified version of the original download_one_async, adapted to
download a portion of an object.
"""
download_size = end_byte - start_byte
print(
f"Downloading {object_name} from byte {start_byte} to {end_byte} (size {download_size}) in chunks of {chunk_size} from {bucket_name} from process {os.getpid()} and thread {threading.get_ident()}"
)

mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
await mrd.open()

offset = 0
output_buffer = BytesIO()

start_time = time.perf_counter()
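    # Drain the assigned byte range sequentially, chunk_size bytes per
    # download_ranges call, reusing the one open downloader for every chunk.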
while offset < download_size:
bytes_to_download = min(chunk_size, download_size - offset)
await mrd.download_ranges(
[(start_byte + offset, bytes_to_download, output_buffer)]
)
offset += bytes_to_download
end_time = time.perf_counter()

elapsed_time = end_time - start_time
throughput_mbs = (
(download_size / elapsed_time) / (1000 * 1000) if elapsed_time > 0 else 0
)

print(f"Time taken for download loop: {elapsed_time:.4f} seconds")
print(f"Throughput for this range: {throughput_mbs:.2f} MB/s")

assert (
output_buffer.getbuffer().nbytes == download_size
), f"downloaded size incorrect for portion of {object_name}"

await mrd.close()
return output_buffer


async def download_one_object_with_n_streams_async(
bucket_name, object_name, download_size, chunk_size, num_streams
):
"""
Downloads a single object using 'n' concurrent streams.
It divides the object into 'n' parts and creates an async task to download each part.
"""
print(
f"Downloading {object_name} of size {download_size} from {bucket_name} using {num_streams} streams."
)

# Create one client to be shared by all download tasks.
client = AsyncGrpcClient().grpc_client

tasks = []

# Calculate the byte range for each stream.
portion_size = download_size // num_streams

for i in range(num_streams):
start = i * portion_size
end = start + portion_size
if i == num_streams - 1:
# The last stream downloads any remaining bytes.
end = download_size

task = asyncio.create_task(
download_range_async(
client, bucket_name, object_name, start, end, chunk_size
)
)
tasks.append(task)

# Wait for all download tasks to complete.
downloaded_parts = await asyncio.gather(*tasks)

# Stitch the downloaded parts together in the correct order.
final_buffer = BytesIO()
for part in downloaded_parts:
final_buffer.write(part.getbuffer())

# Verify the final size.
final_size = final_buffer.getbuffer().nbytes
assert (
final_size == download_size
), f"Downloaded size incorrect for {object_name}. Expected {download_size}, got {final_size}"
print(f"Successfully downloaded {object_name} with size {final_size}")


def main():
parser = argparse.ArgumentParser(
description="Download a single GCS object using multiple concurrent streams."
)
parser.add_argument("--bucket_name", type=str, default="chandrasiri-rs")
parser.add_argument(
"--download_size", type=int, default=1024 * 1024 * 1024
) # 1 GiB
parser.add_argument(
"--chunk_size", type=int, default=64 * 1024 * 1024
) # 64 MiB
parser.add_argument(
"--count",
type=int,
default=1,
help="Number of times to run the download (for benchmarking).",
)
parser.add_argument(
"--start_object_num",
type=int,
default=0,
help="The number of the object to download (e.g., py-sdk-mb-mt-{start_object_num}).",
)
parser.add_argument(
"-n",
"--num_workers",
type=int,
default=10,
help="Number of streams to use for downloading.",
)
args = parser.parse_args()

total_start_time = time.perf_counter()

object_name = f"py-sdk-mb-mt-{args.start_object_num}"

for i in range(args.count):
print(f"\n--- Starting download run {i+1}/{args.count} ---")
run_start_time = time.perf_counter()

asyncio.run(
download_one_object_with_n_streams_async(
args.bucket_name,
object_name,
args.download_size,
args.chunk_size,
args.num_workers,
)
)

run_end_time = time.perf_counter()
run_latency = run_end_time - run_start_time
run_throughput = (args.download_size / run_latency) / (1000 * 1000)
print(f"Run {i+1} throughput: {run_throughput:.2f} MB/s")

total_end_time = time.perf_counter()
total_latency = total_end_time - total_start_time
total_downloaded_bytes = args.download_size * args.count
aggregate_throughput = (total_downloaded_bytes / total_latency) / (
1000 * 1000
) # MB/s

print("\n--- Aggregate Results ---")
print(f"Total download runs: {args.count}")
print(f"Object name: {object_name}")
print(
f"Total data downloaded: {total_downloaded_bytes / (1024*1024*1024):.2f} GiB"
)
print(f"Total time taken: {total_latency:.2f} seconds")
print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s")
print(f"Number of streams used per download: {args.num_workers}")


if __name__ == "__main__":
main()
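A typical invocation of the script above, using the flags defined in main() (the bucket default is the author's test bucket, so pass your own; the object must be at least --download_size bytes):

    python benchmarks/download_one_object_using_n_streams.py \
        --bucket_name my-bucket \
        --download_size 1073741824 \
        --chunk_size 67108864 \
        --count 3 \
        -n 16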