Skip to content

Commit cf9597c

Browse files
committed
feat: make the flush size configurable
1 parent 2b1d6ce commit cf9597c

File tree

3 files changed

+106
-10
lines changed

3 files changed

+106
-10
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939

4040
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
41-
_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024 # 16 MiB
41+
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB
4242

4343

4444
class AsyncAppendableObjectWriter:
@@ -51,6 +51,7 @@ def __init__(
5151
object_name: str,
5252
generation=None,
5353
write_handle=None,
54+
writer_options: Optional[dict] = None,
5455
):
5556
"""
5657
Class for appending data to a GCS Appendable Object.
@@ -127,6 +128,21 @@ def __init__(
127128
# Please note: `offset` and `persisted_size` are same when the stream is
128129
# opened.
129130
self.persisted_size: Optional[int] = None
131+
if writer_options is None:
132+
writer_options = {}
133+
self.flush_interval = writer_options.get(
134+
"FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
135+
)
136+
# TODO: add test case for this.
137+
if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
138+
raise exceptions.OutOfRange(
139+
f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES} , but provided {self.flush_interval}"
140+
)
141+
if not self.flush_interval % _MAX_CHUNK_SIZE_BYTES == 0:
142+
raise exceptions.OutOfRange(
143+
f"flush interval - {self.flush_interval} should be multiple of {_MAX_CHUNK_SIZE_BYTES}"
144+
)
145+
self.bytes_appended_since_last_flush = 0
130146

131147
async def state_lookup(self) -> int:
132148
"""Returns the persisted_size
@@ -195,7 +211,7 @@ async def append(self, data: bytes) -> None:
195211
self.offset = self.persisted_size
196212

197213
start_idx = 0
198-
bytes_to_flush = 0
214+
# bytes_to_flush = 0
199215
while start_idx < total_bytes:
200216
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
201217
data_chunk = data[start_idx:end_idx]
@@ -210,10 +226,10 @@ async def append(self, data: bytes) -> None:
210226
)
211227
chunk_size = end_idx - start_idx
212228
self.offset += chunk_size
213-
bytes_to_flush += chunk_size
214-
if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
229+
self.bytes_appended_since_last_flush += chunk_size
230+
if self.bytes_appended_since_last_flush >= self.flush_interval:
215231
await self.simple_flush()
216-
bytes_to_flush = 0
232+
self.bytes_appended_since_last_flush = 0
217233
start_idx = end_idx
218234

219235
async def simple_flush(self) -> None:

tests/system/test_zonal.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
1515
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
1616
AsyncAppendableObjectWriter,
17-
_MAX_BUFFER_SIZE_BYTES,
17+
_DEFAULT_FLUSH_INTERVAL_BYTES,
1818
)
1919
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
2020
AsyncMultiRangeDownloader,
@@ -168,6 +168,59 @@ async def test_basic_wrd_in_slices(
168168
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
169169

170170

171+
@pytest.mark.asyncio
172+
@pytest.mark.parametrize(
173+
"flush_interval",
174+
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
175+
)
176+
async def test_wrd_with_non_default_flush_interval(
177+
storage_client,
178+
blobs_to_delete,
179+
flush_interval,
180+
):
181+
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
182+
object_size = 9 * 1024 * 1024
183+
184+
# Client instantiation; it cannot be part of a fixture because
185+
# grpc_client's event loop and event loop of coroutine running it
186+
# (i.e. this test) must be the same.
187+
# Note:
188+
# 1. @pytest.mark.asyncio ensures new event loop for each test.
189+
# 2. we can keep the same event loop for entire module but that may
190+
# create issues if tests are run in parallel and one test hogs the event
191+
# loop slowing down other tests.
192+
object_data = os.urandom(object_size)
193+
object_checksum = google_crc32c.value(object_data)
194+
grpc_client = AsyncGrpcClient().grpc_client
195+
196+
writer = AsyncAppendableObjectWriter(
197+
grpc_client,
198+
_ZONAL_BUCKET,
199+
object_name,
200+
writer_options={"FLUSH_INTERVAL_BYTES": flush_interval},
201+
)
202+
await writer.open()
203+
mark1, mark2 = _get_equal_dist(0, object_size)
204+
await writer.append(object_data[0:mark1])
205+
await writer.append(object_data[mark1:mark2])
206+
await writer.append(object_data[mark2:])
207+
object_metadata = await writer.close(finalize_on_close=True)
208+
assert object_metadata.size == object_size
209+
assert int(object_metadata.checksums.crc32c) == object_checksum
210+
211+
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
212+
buffer = BytesIO()
213+
await mrd.open()
214+
# (0, 0) means read the whole object
215+
await mrd.download_ranges([(0, 0, buffer)])
216+
await mrd.close()
217+
assert buffer.getvalue() == object_data
218+
assert mrd.persisted_size == object_size
219+
220+
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
221+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
222+
223+
171224
@pytest.mark.asyncio
172225
async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete):
173226
object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}"

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
GENERATION = 123
2828
WRITE_HANDLE = b"test-write-handle"
2929
PERSISTED_SIZE = 456
30+
_ONE_MIB = 1024 * 1024
3031

3132

3233
@pytest.fixture
@@ -50,6 +51,7 @@ def test_init(mock_write_object_stream, mock_client):
5051
assert not writer._is_stream_open
5152
assert writer.offset is None
5253
assert writer.persisted_size is None
54+
assert writer.bytes_appended_since_last_flush == 0
5355

5456
mock_write_object_stream.assert_called_once_with(
5557
client=mock_client,
@@ -76,6 +78,7 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client):
7678

7779
assert writer.generation == GENERATION
7880
assert writer.write_handle == WRITE_HANDLE
81+
assert writer.bytes_appended_since_last_flush == 0
7982

8083
mock_write_object_stream.assert_called_once_with(
8184
client=mock_client,
@@ -86,6 +89,30 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client):
8689
)
8790

8891

92+
@mock.patch(
93+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
94+
)
95+
def test_init_with_writer_options(mock_write_object_stream, mock_client):
96+
"""Test the constructor with optional arguments."""
97+
writer = AsyncAppendableObjectWriter(
98+
mock_client,
99+
BUCKET,
100+
OBJECT,
101+
writer_options={"FLUSH_INTERVAL_BYTES": 8 * _ONE_MIB},
102+
)
103+
104+
assert writer.flush_interval == 8 * _ONE_MIB
105+
assert writer.bytes_appended_since_last_flush == 0
106+
107+
mock_write_object_stream.assert_called_once_with(
108+
client=mock_client,
109+
bucket_name=BUCKET,
110+
object_name=OBJECT,
111+
generation_number=None,
112+
write_handle=None,
113+
)
114+
115+
89116
@mock.patch("google.cloud.storage._experimental.asyncio._utils.google_crc32c")
90117
@mock.patch(
91118
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
@@ -470,7 +497,7 @@ async def test_append_flushes_when_buffer_is_full(
470497
):
471498
"""Test that append flushes the stream when the buffer size is reached."""
472499
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
473-
_MAX_BUFFER_SIZE_BYTES,
500+
_DEFAULT_FLUSH_INTERVAL_BYTES,
474501
)
475502

476503
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -480,7 +507,7 @@ async def test_append_flushes_when_buffer_is_full(
480507
mock_stream.send = mock.AsyncMock()
481508
writer.simple_flush = mock.AsyncMock()
482509

483-
data = b"a" * _MAX_BUFFER_SIZE_BYTES
510+
data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES
484511
await writer.append(data)
485512

486513
writer.simple_flush.assert_awaited_once()
@@ -493,7 +520,7 @@ async def test_append_flushes_when_buffer_is_full(
493520
async def test_append_handles_large_data(mock_write_object_stream, mock_client):
494521
"""Test that append handles data larger than the buffer size."""
495522
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
496-
_MAX_BUFFER_SIZE_BYTES,
523+
_DEFAULT_FLUSH_INTERVAL_BYTES,
497524
)
498525

499526
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -503,7 +530,7 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
503530
mock_stream.send = mock.AsyncMock()
504531
writer.simple_flush = mock.AsyncMock()
505532

506-
data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
533+
data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1)
507534
await writer.append(data)
508535

509536
assert writer.simple_flush.await_count == 2

0 commit comments

Comments
 (0)