55from io import BytesIO
66
77# python additional imports
8+ import google_crc32c
9+
810import pytest
911
1012# current library imports
2830_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
2931
3032
33+ def _get_equal_dist (a : int , b : int ) -> tuple [int , int ]:
34+ step = (b - a ) // 3
35+ return a + step , a + 2 * step
36+
37+
3138async def write_one_appendable_object (
3239 bucket_name : str ,
3340 object_name : str ,
@@ -59,11 +66,21 @@ def appendable_object(storage_client, blobs_to_delete):
5966
6067
6168@pytest .mark .asyncio
69+ @pytest .mark .parametrize (
70+ "object_size" ,
71+ [
72+ 256 , # less than _chunk size
73+ 10 * 1024 * 1024 , # less than _MAX_BUFFER_SIZE_BYTES
74+ 20 * 1024 * 1024 , # greater than _MAX_BUFFER_SIZE
75+ ],
76+ )
6277@pytest .mark .parametrize (
6378 "attempt_direct_path" ,
6479 [True , False ],
6580)
66- async def test_basic_wrd (storage_client , blobs_to_delete , attempt_direct_path ):
81+ async def test_basic_wrd (
82+ storage_client , blobs_to_delete , attempt_direct_path , object_size
83+ ):
6784 object_name = f"test_basic_wrd-{ str (uuid .uuid4 ())} "
6885
6986 # Client instantiation; it cannot be part of fixture because.
@@ -74,22 +91,72 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
7491 # 2. we can keep the same event loop for entire module but that may
7592 # create issues if tests are run in parallel and one test hogs the event
7693 # loop slowing down other tests.
94+ object_data = os .urandom (object_size )
95+ object_checksum = google_crc32c .value (object_data )
7796 grpc_client = AsyncGrpcClient (attempt_direct_path = attempt_direct_path ).grpc_client
7897
7998 writer = AsyncAppendableObjectWriter (grpc_client , _ZONAL_BUCKET , object_name )
8099 await writer .open ()
81- await writer .append (_BYTES_TO_UPLOAD )
100+ await writer .append (object_data )
82101 object_metadata = await writer .close (finalize_on_close = True )
83- assert object_metadata .size == len (_BYTES_TO_UPLOAD )
102+ assert object_metadata .size == object_size
103+ assert int (object_metadata .checksums .crc32c ) == object_checksum
84104
85105 mrd = AsyncMultiRangeDownloader (grpc_client , _ZONAL_BUCKET , object_name )
86106 buffer = BytesIO ()
87107 await mrd .open ()
88108 # (0, 0) means read the whole object
89109 await mrd .download_ranges ([(0 , 0 , buffer )])
90110 await mrd .close ()
91- assert buffer .getvalue () == _BYTES_TO_UPLOAD
92- assert mrd .persisted_size == len (_BYTES_TO_UPLOAD )
111+ assert buffer .getvalue () == object_data
112+ assert mrd .persisted_size == object_size
113+
114+ # Clean up; use json client (i.e. `storage_client` fixture) to delete.
115+ blobs_to_delete .append (storage_client .bucket (_ZONAL_BUCKET ).blob (object_name ))
116+
117+
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "object_size",
    [
        10,  # less than _chunk size
        10 * 1024 * 1024,  # less than _MAX_BUFFER_SIZE_BYTES
        20 * 1024 * 1024,  # greater than _MAX_BUFFER_SIZE_BYTES
    ],
)
async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size):
    """Write an appendable object in three slices, then read it back and verify.

    ``object_size`` random bytes are appended to a new object in three
    consecutive slices. After the writer is closed with finalization, the
    reported size and CRC32C are compared with the locally computed values.
    The whole object is then downloaded through the multi-range downloader
    and compared byte-for-byte with what was written.

    Args:
        storage_client: JSON client fixture, used only to build the blob
            handle that is registered for deletion.
        blobs_to_delete: Fixture list collecting blobs to remove afterwards.
        object_size: Number of random bytes to write (parametrized).
    """
    # Use this test's own name as the prefix so that any object left behind
    # can be traced to the test that created it.
    object_name = f"test_basic_wrd_in_slices-{uuid.uuid4()}"

    # Client instantiation; it cannot be part of a fixture because
    # grpc_client's event loop and the event loop of the coroutine running it
    # (i.e. this test) must be the same.
    # Note:
    # 1. @pytest.mark.asyncio ensures a new event loop for each test.
    # 2. We could keep the same event loop for the entire module, but that
    #    may create issues if tests are run in parallel and one test hogs the
    #    event loop, slowing down the other tests.
    object_data = os.urandom(object_size)
    object_checksum = google_crc32c.value(object_data)
    grpc_client = AsyncGrpcClient().grpc_client

    writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
    await writer.open()
    try:
        # Append the payload as three back-to-back slices; the last slice
        # absorbs whatever remains after the two marks.
        mark1, mark2 = _get_equal_dist(0, object_size)
        slices = (
            object_data[0:mark1],
            object_data[mark1:mark2],
            object_data[mark2:],
        )
        for piece in slices:
            await writer.append(piece)
        object_metadata = await writer.close(finalize_on_close=True)
        assert object_metadata.size == object_size
        assert int(object_metadata.checksums.crc32c) == object_checksum

        mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
        buffer = BytesIO()
        await mrd.open()
        # (0, 0) means read the whole object
        await mrd.download_ranges([(0, 0, buffer)])
        await mrd.close()
        assert buffer.getvalue() == object_data
        assert mrd.persisted_size == object_size
    finally:
        # Clean up; use json client (i.e. `storage_client` fixture) to delete.
        # Registered in `finally` so a failed assertion above does not leak
        # the object.
        # NOTE(review): assumes the object exists once `writer.open()` has
        # returned -- confirm against the writer implementation.
        blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
0 commit comments