34 changes: 34 additions & 0 deletions google/cloud/storage/_experimental/asyncio/_utils.py
@@ -0,0 +1,34 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import google_crc32c

from google.api_core import exceptions

def raise_if_no_fast_crc32c():
"""Check if the C-accelerated version of google-crc32c is available.

If not, raise an error to prevent silent performance degradation.

raises google.api_core.exceptions.FailedPrecondition: If the C extension is not available.
returns: True if the C extension is available.
rtype: bool

"""
if google_crc32c.implementation != "c":
raise exceptions.FailedPrecondition(
"The google-crc32c package is not installed with C support. "
"The C extension is required for faster data integrity checks. "
"For more information, see https://github.com/googleapis/python-crc32c."
)
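For reviewers, a minimal usage sketch (not part of this PR; the import path is the new module added above) showing how a caller can guard on the C extension:

```python
# Hypothetical caller-side check; assumes google-crc32c and google-api-core are installed.
import google_crc32c
from google.api_core import exceptions

from google.cloud.storage._experimental.asyncio._utils import raise_if_no_fast_crc32c

try:
    raise_if_no_fast_crc32c()
    print(f"CRC32C implementation: {google_crc32c.implementation}")  # "c"
except exceptions.FailedPrecondition as exc:
    # Raised when only the pure-Python fallback is available
    # (google_crc32c.implementation == "python").
    print(f"Fast CRC32C unavailable: {exc}")
```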
google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
@@ -21,7 +21,13 @@
if you want to use these Rapid Storage APIs.

"""
import time
from typing import Optional, Union

from google.api_core import exceptions
from google_crc32c import Checksum

from ._utils import raise_if_no_fast_crc32c
from google.cloud import _storage_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
AsyncGrpcClient,
@@ -32,7 +38,7 @@


_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024 # 16 MiB
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB


class AsyncAppendableObjectWriter:
@@ -45,6 +51,7 @@ def __init__(
object_name: str,
generation=None,
write_handle=None,
writer_options: Optional[dict] = None,
):
"""
Class for appending data to a GCS Appendable Object.
@@ -100,6 +107,7 @@ def __init__(
:param write_handle: (Optional) An existing handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.
"""
raise_if_no_fast_crc32c()
self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
@@ -120,6 +128,21 @@ def __init__(
# Please note: `offset` and `persisted_size` are same when the stream is
# opened.
self.persisted_size: Optional[int] = None
if writer_options is None:
writer_options = {}
self.flush_interval = writer_options.get(
"FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
)
# TODO: add test case for this.
if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
raise exceptions.OutOfRange(
f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES}, but got {self.flush_interval}."
)
if self.flush_interval % _MAX_CHUNK_SIZE_BYTES != 0:
raise exceptions.OutOfRange(
f"flush_interval {self.flush_interval} must be a multiple of {_MAX_CHUNK_SIZE_BYTES}."
)
self.bytes_appended_since_last_flush = 0
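A short standalone sketch of the flush-interval rules above (the option key and constants mirror this module; ValueError stands in for exceptions.OutOfRange so the snippet has no dependencies):

```python
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024           # 2 MiB
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024  # 16 MiB


def resolve_flush_interval(writer_options=None):
    # Mirrors the validation in __init__: default to 16 MiB, require at least
    # one 2 MiB chunk, and require a multiple of the chunk size.
    options = writer_options or {}
    interval = options.get("FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES)
    if interval < _MAX_CHUNK_SIZE_BYTES:
        raise ValueError("flush interval is smaller than one 2 MiB chunk")
    if interval % _MAX_CHUNK_SIZE_BYTES != 0:
        raise ValueError("flush interval is not a multiple of the 2 MiB chunk size")
    return interval


print(resolve_flush_interval())                                           # 16777216
print(resolve_flush_interval({"FLUSH_INTERVAL_BYTES": 4 * 1024 * 1024}))  # 4194304
try:
    resolve_flush_interval({"FLUSH_INTERVAL_BYTES": 3 * 1024 * 1024})
except ValueError as exc:
    print(exc)  # rejected: not a multiple of 2 MiB
```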

async def state_lookup(self) -> int:
"""Returns the persisted_size
@@ -188,23 +211,25 @@ async def append(self, data: bytes) -> None:
self.offset = self.persisted_size

start_idx = 0
bytes_to_flush = 0
while start_idx < total_bytes:
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
data_chunk = data[start_idx:end_idx]
await self.write_obj_stream.send(
_storage_v2.BidiWriteObjectRequest(
write_offset=self.offset,
checksummed_data=_storage_v2.ChecksummedData(
content=data[start_idx:end_idx]
content=data_chunk,
crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"),
),
)
)
chunk_size = end_idx - start_idx
self.offset += chunk_size
bytes_to_flush += chunk_size
if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
self.bytes_appended_since_last_flush += chunk_size
if self.bytes_appended_since_last_flush >= self.flush_interval:
await self.simple_flush()
bytes_to_flush = 0
self.bytes_appended_since_last_flush = 0
start_idx = end_idx
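The crc32c passed to ChecksummedData is the big-endian integer form of the chunk digest. A quick standalone check (assuming google-crc32c is installed) that the expression used above agrees with the library's integer API:

```python
import google_crc32c
from google_crc32c import Checksum

chunk = b"example appendable-object chunk"
# Same expression as in the send() call above.
from_digest = int.from_bytes(Checksum(chunk).digest(), "big")
assert from_digest == google_crc32c.value(chunk)
print(from_digest)
```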

async def simple_flush(self) -> None:
google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py
@@ -14,12 +14,12 @@

from __future__ import annotations
import asyncio
import google_crc32c
from typing import List, Optional, Tuple

from google.api_core import exceptions
from google_crc32c import Checksum

from typing import List, Optional, Tuple

from ._utils import raise_if_no_fast_crc32c
from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
_AsyncReadObjectStream,
)
@@ -160,14 +160,7 @@ def __init__(
:param read_handle: (Optional) An existing read handle.
"""

# Verify that the fast, C-accelerated version of crc32c is available.
# If not, raise an error to prevent silent performance degradation.
if google_crc32c.implementation != "c":
raise exceptions.NotFound(
"The google-crc32c package is not installed with C support. "
"Bidi reads require the C extension for data integrity checks."
"For more information, see https://github.com/googleapis/python-crc32c."
)
raise_if_no_fast_crc32c()

self.client = client
self.bucket_name = bucket_name
136 changes: 131 additions & 5 deletions tests/system/test_zonal.py
@@ -3,14 +3,18 @@
import os
import uuid
from io import BytesIO
import random

# python additional imports
import google_crc32c

import pytest

# current library imports
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
_DEFAULT_FLUSH_INTERVAL_BYTES,
)
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
@@ -28,6 +32,11 @@
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"


def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
step = (b - a) // 3
return a + step, a + 2 * step
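As an illustration only (values not from this PR), the helper above splits a range into three roughly equal slices; the tests below append data[0:mark1], data[mark1:mark2], and data[mark2:]:

```python
# With a 9 MiB object, step = 3 MiB, so the marks land at 3 MiB and 6 MiB.
mark1, mark2 = _get_equal_dist(0, 9 * 1024 * 1024)
assert (mark1, mark2) == (3 * 1024 * 1024, 6 * 1024 * 1024)
```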


async def write_one_appendable_object(
bucket_name: str,
object_name: str,
@@ -59,11 +68,21 @@ def appendable_object(storage_client, blobs_to_delete):


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
256, # less than _MAX_CHUNK_SIZE_BYTES
10 * 1024 * 1024, # less than _DEFAULT_FLUSH_INTERVAL_BYTES
20 * 1024 * 1024, # greater than _DEFAULT_FLUSH_INTERVAL_BYTES
],
)
@pytest.mark.parametrize(
"attempt_direct_path",
[True, False],
)
async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
async def test_basic_wrd(
storage_client, blobs_to_delete, attempt_direct_path, object_size
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of fixture because.
@@ -74,22 +93,129 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.append(object_data)
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == len(_BYTES_TO_UPLOAD)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
20 * 1024 * 1024, # greater than _DEFAULT_FLUSH_INTERVAL_BYTES
],
)
@pytest.mark.parametrize(
"attempt_direct_path",
[True],
)
async def test_basic_wrd_in_slices(
storage_client, blobs_to_delete, attempt_direct_path, object_size
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of a fixture because
# the grpc_client's event loop and the event loop of the coroutine running it
# (i.e. this test) must be the same.
# Note:
# 1. @pytest.mark.asyncio ensures a new event loop for each test.
# 2. We could keep the same event loop for the entire module, but that may
# create issues if tests run in parallel and one test hogs the event
# loop, slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
mark1, mark2 = _get_equal_dist(0, object_size)
await writer.append(object_data[0:mark1])
await writer.append(object_data[mark1:mark2])
await writer.append(object_data[mark2:])
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"flush_interval",
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
)
async def test_wrd_with_non_default_flush_interval(
storage_client,
blobs_to_delete,
flush_interval,
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
object_size = 9 * 1024 * 1024

# Client instantiation; it cannot be part of a fixture because
# the grpc_client's event loop and the event loop of the coroutine running it
# (i.e. this test) must be the same.
# Note:
# 1. @pytest.mark.asyncio ensures a new event loop for each test.
# 2. We could keep the same event loop for the entire module, but that may
# create issues if tests run in parallel and one test hogs the event
# loop, slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient().grpc_client

writer = AsyncAppendableObjectWriter(
grpc_client,
_ZONAL_BUCKET,
object_name,
writer_options={"FLUSH_INTERVAL_BYTES": flush_interval},
)
await writer.open()
mark1, mark2 = _get_equal_dist(0, object_size)
await writer.append(object_data[0:mark1])
await writer.append(object_data[mark1:mark2])
await writer.append(object_data[mark2:])
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))