Merged

Commits (32)
e2a595a
feat: compute chunk wise checksum for bidi_writes
chandra-siri Dec 18, 2025
4f3d18e
move common code to utils
chandra-siri Dec 18, 2025
62a2c89
add and fix failing unit tests
chandra-siri Dec 18, 2025
dddf6c2
Merge branch 'main' into bidi_writes_checksum
chandra-siri Dec 18, 2025
e89e205
add license info in _utils file
chandra-siri Dec 18, 2025
f1abcaa
Merge branch 'bidi_writes_checksum' of github.com:googleapis/python-s…
chandra-siri Dec 18, 2025
9353e58
use FailedPreCondition instead of NotFound
chandra-siri Dec 18, 2025
2b1d6ce
chore: add test cases for large objects
chandra-siri Dec 18, 2025
cf9597c
feat: provide flush size to be configurable
chandra-siri Dec 18, 2025
b285a40
remove unused imports
chandra-siri Dec 18, 2025
df42160
add unit tests and idomatic
chandra-siri Dec 18, 2025
2dc9f61
feat: implement "append_from_file"
chandra-siri Dec 18, 2025
262480d
add imports
chandra-siri Dec 18, 2025
fad8a88
Merge branch 'main' of github.com:googleapis/python-storage into sys_…
chandra-siri Dec 19, 2025
def00f3
address nit comments
chandra-siri Dec 19, 2025
2fbe83d
add assert statements
chandra-siri Dec 19, 2025
5bc78db
remove unused imports
chandra-siri Dec 19, 2025
8f3c2be
Merge branch 'sys_test_large_obj' of github.com:googleapis/python-sto…
chandra-siri Dec 19, 2025
842e5e2
Merge branch 'configurable_flush_interval' of github.com:googleapis/p…
chandra-siri Dec 19, 2025
bdb5b93
Merge branch 'main' of github.com:googleapis/python-storage into helpers
chandra-siri Dec 19, 2025
687072a
Merge branch 'main' into helpers
chandra-siri Dec 20, 2025
3f0d75a
Merge branch 'main' of github.com:googleapis/python-storage into helpers
chandra-siri Dec 20, 2025
2f1c0dc
increase the open file descriptors limit
chandra-siri Dec 22, 2025
ce6c2f6
Merge branch 'helpers' of github.com:googleapis/python-storage into h…
chandra-siri Dec 22, 2025
3de9d7a
apply limit on cloudbuild.yaml
chandra-siri Dec 22, 2025
bff780b
use _ in front of variable name for cloud build
chandra-siri Dec 22, 2025
af94e3a
print current ulimit
chandra-siri Dec 22, 2025
cbfb53e
increase ulimit to 1048576
chandra-siri Dec 22, 2025
e4aef2b
use $$ instead of $ for $CURRENT_PATH
chandra-siri Dec 22, 2025
5b53fd1
Merge branch 'main' into helpers
chandra-siri Dec 22, 2025
bab7677
reset .yaml file to main
chandra-siri Dec 22, 2025
f367344
Merge branch 'helpers' of github.com:googleapis/python-storage into h…
chandra-siri Dec 22, 2025
34 changes: 34 additions & 0 deletions google/cloud/storage/_experimental/asyncio/_utils.py
@@ -0,0 +1,34 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import google_crc32c

from google.api_core import exceptions

def raise_if_no_fast_crc32c():
"""Check if the C-accelerated version of google-crc32c is available.

If not, raise an error to prevent silent performance degradation.

raises google.api_core.exceptions.FailedPrecondition: If the C extension is not available.
returns: True if the C extension is available.
rtype: bool

"""
if google_crc32c.implementation != "c":
raise exceptions.FailedPrecondition(
"The google-crc32c package is not installed with C support. "
"C extension is required for faster data integrity checks."
"For more information, see https://github.com/googleapis/python-crc32c."
)
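As a caller-side illustration (a hypothetical snippet, not part of this diff), the guard surfaces like this when google-crc32c falls back to its pure-Python implementation:

from google.api_core import exceptions

from google.cloud.storage._experimental.asyncio._utils import raise_if_no_fast_crc32c

try:
    raise_if_no_fast_crc32c()
except exceptions.FailedPrecondition as exc:
    # Raised when google_crc32c.implementation != "c".
    print(f"Fast crc32c unavailable: {exc}")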
google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
@@ -22,6 +22,11 @@

"""
from typing import Optional, Union

from io import BufferedReader

from google.api_core import exceptions
from google_crc32c import Checksum

from ._utils import raise_if_no_fast_crc32c
from google.cloud import _storage_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
AsyncGrpcClient,
@@ -32,7 +37,7 @@


_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024 # 16 MiB
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB


class AsyncAppendableObjectWriter:
@@ -45,6 +50,7 @@ def __init__(
object_name: str,
generation=None,
write_handle=None,
writer_options: Optional[dict] = None,
):
"""
Class for appending data to a GCS Appendable Object.
@@ -100,6 +106,7 @@ def __init__(
:param write_handle: (Optional) An existing handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.
"""
raise_if_no_fast_crc32c()
self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
@@ -120,6 +127,21 @@ def __init__(
# Please note: `offset` and `persisted_size` are the same when the stream
# is opened.
self.persisted_size: Optional[int] = None
if writer_options is None:
writer_options = {}
self.flush_interval = writer_options.get(
"FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
)
# TODO: add test case for this.
if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
raise exceptions.OutOfRange(
f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES} , but provided {self.flush_interval}"
)
if self.flush_interval % _MAX_CHUNK_SIZE_BYTES != 0:
raise exceptions.OutOfRange(
f"flush_interval must be a multiple of {_MAX_CHUNK_SIZE_BYTES}, but provided {self.flush_interval}"
)
self.bytes_appended_since_last_flush = 0

async def state_lookup(self) -> int:
"""Returns the persisted_size
@@ -188,23 +210,24 @@ async def append(self, data: bytes) -> None:
self.offset = self.persisted_size

start_idx = 0
bytes_to_flush = 0
while start_idx < total_bytes:
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
data_chunk = data[start_idx:end_idx]
await self.write_obj_stream.send(
_storage_v2.BidiWriteObjectRequest(
write_offset=self.offset,
checksummed_data=_storage_v2.ChecksummedData(
content=data[start_idx:end_idx]
content=data_chunk,
crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"),
),
)
)
chunk_size = end_idx - start_idx
self.offset += chunk_size
bytes_to_flush += chunk_size
if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
self.bytes_appended_since_last_flush += chunk_size
if self.bytes_appended_since_last_flush >= self.flush_interval:
await self.simple_flush()
bytes_to_flush = 0
self.bytes_appended_since_last_flush = 0
start_idx = end_idx

async def simple_flush(self) -> None:
Expand Down Expand Up @@ -316,6 +339,16 @@ async def append_from_stream(self, stream_obj):
"""
raise NotImplementedError("append_from_stream is not implemented yet.")

async def append_from_file(self, file_path: str):
"""Create a file object from `file_path` and call append_from_stream(file_obj)"""
raise NotImplementedError("append_from_file is not implemented yet.")
async def append_from_file(
self, file_obj: BufferedReader, block_size: int = _DEFAULT_FLUSH_INTERVAL_BYTES
):
"""
Appends data to an Appendable Object using file_handle which is opened
for reading in binary mode.

:type file_obj: file
:param file_obj: A file handle opened in binary mode for reading.

"""
while block := file_obj.read(block_size):
await self.append(block)
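Putting the new pieces together, a minimal usage sketch (bucket name, object name, and file path are placeholders; assumes a zonal bucket that supports appendable objects):

import asyncio

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)


async def upload(path: str) -> None:
    grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(
        grpc_client,
        "my-zonal-bucket",  # placeholder
        "my-object",  # placeholder
        # Flush every 4 MiB; must be a multiple of the 2 MiB chunk size.
        writer_options={"FLUSH_INTERVAL_BYTES": 4 * 1024 * 1024},
    )
    await writer.open()
    with open(path, "rb") as file_obj:  # binary mode, as append_from_file expects
        await writer.append_from_file(file_obj)
    await writer.close(finalize_on_close=True)


asyncio.run(upload("/tmp/data.bin"))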
google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py
Expand Up @@ -14,12 +14,11 @@

from __future__ import annotations
import asyncio
import google_crc32c
from google.api_core import exceptions
from google_crc32c import Checksum

from typing import List, Optional, Tuple

from google_crc32c import Checksum

from ._utils import raise_if_no_fast_crc32c
from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
_AsyncReadObjectStream,
)
@@ -160,14 +159,7 @@ def __init__(
:param read_handle: (Optional) An existing read handle.
"""

# Verify that the fast, C-accelerated version of crc32c is available.
# If not, raise an error to prevent silent performance degradation.
if google_crc32c.implementation != "c":
raise exceptions.NotFound(
"The google-crc32c package is not installed with C support. "
"Bidi reads require the C extension for data integrity checks."
"For more information, see https://github.com/googleapis/python-crc32c."
)
raise_if_no_fast_crc32c()

self.client = client
self.bucket_name = bucket_name
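For reference, the per-chunk crc32c that append() now attaches to each ChecksummedData message can be reproduced in isolation (a sketch of the same computation, outside the gRPC plumbing):

from google_crc32c import Checksum

chunk = b"example-chunk-bytes"
# Mirrors append(): the 4-byte big-endian digest of the chunk, folded into an int.
chunk_crc32c = int.from_bytes(Checksum(chunk).digest(), "big")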
135 changes: 130 additions & 5 deletions tests/system/test_zonal.py
@@ -5,12 +5,15 @@
from io import BytesIO

# python additional imports
import google_crc32c

import pytest

# current library imports
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
_DEFAULT_FLUSH_INTERVAL_BYTES,
)
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
@@ -28,6 +31,11 @@
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"


def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
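"""Return the two cut points that split [a, b) into three roughly equal parts."""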
step = (b - a) // 3
return a + step, a + 2 * step


async def write_one_appendable_object(
bucket_name: str,
object_name: str,
@@ -59,11 +67,21 @@ def appendable_object(storage_client, blobs_to_delete):


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
256,  # smaller than _MAX_CHUNK_SIZE_BYTES (2 MiB)
10 * 1024 * 1024,  # smaller than _DEFAULT_FLUSH_INTERVAL_BYTES
20 * 1024 * 1024,  # larger than _DEFAULT_FLUSH_INTERVAL_BYTES
],
)
@pytest.mark.parametrize(
"attempt_direct_path",
[True, False],
)
async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
async def test_basic_wrd(
storage_client, blobs_to_delete, attempt_direct_path, object_size
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of a fixture because
@@ -74,22 +92,129 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.append(object_data)
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == len(_BYTES_TO_UPLOAD)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
20 * 1024 * 1024,  # larger than _DEFAULT_FLUSH_INTERVAL_BYTES
],
)
@pytest.mark.parametrize(
"attempt_direct_path",
[True],
)
async def test_basic_wrd_in_slices(
storage_client, blobs_to_delete, attempt_direct_path, object_size
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of a fixture because
# grpc_client's event loop and the event loop of the coroutine running it
# (i.e. this test) must be the same.
# Note:
# 1. @pytest.mark.asyncio ensures new event loop for each test.
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
mark1, mark2 = _get_equal_dist(0, object_size)
await writer.append(object_data[0:mark1])
await writer.append(object_data[mark1:mark2])
await writer.append(object_data[mark2:])
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"flush_interval",
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
)
async def test_wrd_with_non_default_flush_interval(
storage_client,
blobs_to_delete,
flush_interval,
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
object_size = 9 * 1024 * 1024

# Client instantiation; it cannot be part of a fixture because
# grpc_client's event loop and the event loop of the coroutine running it
# (i.e. this test) must be the same.
# Note:
# 1. @pytest.mark.asyncio ensures new event loop for each test.
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient().grpc_client

writer = AsyncAppendableObjectWriter(
grpc_client,
_ZONAL_BUCKET,
object_name,
writer_options={"FLUSH_INTERVAL_BYTES": flush_interval},
)
await writer.open()
mark1, mark2 = _get_equal_dist(0, object_size)
await writer.append(object_data[0:mark1])
await writer.append(object_data[mark1:mark2])
await writer.append(object_data[mark2:])
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
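The assertions above compare the server-reported checksum against google_crc32c.value over the whole payload; the same value can be accumulated chunk by chunk, mirroring how append() slices data (a standalone sketch):

import os

import google_crc32c

data = os.urandom(9 * 1024 * 1024)
chunk_size = 2 * 1024 * 1024  # matches _MAX_CHUNK_SIZE_BYTES

incremental = google_crc32c.Checksum()
for start in range(0, len(data), chunk_size):
    incremental.update(data[start : start + chunk_size])

assert int.from_bytes(incremental.digest(), "big") == google_crc32c.value(data)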