2323if you want to use these Rapid Storage APIs.
2424
2525"""
26- from io import BufferedReader , BytesIO
26+ from io import BufferedReader
2727import asyncio
2828import io
29- from typing import List , Optional , Tuple , Union
29+ import logging
30+ from typing import List , Optional , Tuple
3031
31- from google_crc32c import Checksum
3232from google .api_core import exceptions
3333from google .api_core .retry_async import AsyncRetry
3434from google .rpc import status_pb2
5858_BIDI_WRITE_REDIRECTED_TYPE_URL = (
5959 "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
6060)
61+ logger = logging .getLogger (__name__ )
6162
6263
6364def _is_write_retryable (exc ):
6465 """Predicate to determine if a write operation should be retried."""
6566
66- print ("In _is_write_retryable method, exception:" , exc )
67-
6867 if isinstance (
6968 exc ,
7069 (
@@ -74,6 +73,7 @@ def _is_write_retryable(exc):
7473 exceptions .TooManyRequests ,
7574 ),
7675 ):
76+ logger .info (f"Retryable write exception encountered: { exc } " )
7777 return True
7878
7979 grpc_error = None
@@ -97,6 +97,7 @@ def _is_write_retryable(exc):
9797 if detail .type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL :
9898 return True
9999 except Exception :
100+ logger .error ("Error unpacking redirect details from gRPC error." )
100101 return False
101102 return False
102103
@@ -201,16 +202,6 @@ def __init__(
201202 self ._routing_token : Optional [str ] = None
202203 self .object_resource : Optional [_storage_v2 .Object ] = None
203204
204- def _stream_opener (self , write_handle = None ):
205- """Helper to create a new _AsyncWriteObjectStream."""
206- return _AsyncWriteObjectStream (
207- client = self .client ,
208- bucket_name = self .bucket_name ,
209- object_name = self .object_name ,
210- generation_number = self .generation ,
211- write_handle = write_handle if write_handle else self .write_handle ,
212- )
213-
214205 async def state_lookup (self ) -> int :
215206 """Returns the persisted_size
216207
@@ -244,6 +235,8 @@ def _on_open_error(self, exc):
244235 self ._routing_token = grpc_error .routing_token
245236 if grpc_error .write_handle :
246237 self .write_handle = grpc_error .write_handle
238+ if grpc_error .generation :
239+ self .generation = grpc_error .generation
247240 return
248241
249242 if hasattr (grpc_error , "trailing_metadata" ):
@@ -272,9 +265,13 @@ def _on_open_error(self, exc):
272265 self ._routing_token = redirect_proto .routing_token
273266 if redirect_proto .write_handle :
274267 self .write_handle = redirect_proto .write_handle
268+ if redirect_proto .generation :
269+ self .generation = redirect_proto .generation
275270 break
276271 except Exception :
277- # Could not parse the error, ignore
272+ logger .error (
273+ "Error unpacking redirect details from gRPC error."
274+ )
278275 pass
279276
280277 async def open (
@@ -302,12 +299,16 @@ def combined_on_error(exc):
302299 if original_on_error :
303300 original_on_error (exc )
304301
305- retry_policy = retry_policy .with_predicate (
306- _is_write_retryable
307- ).with_on_error (combined_on_error )
302+ retry_policy = AsyncRetry (
303+ predicate = _is_write_retryable ,
304+ initial = retry_policy ._initial ,
305+ maximum = retry_policy ._maximum ,
306+ multiplier = retry_policy ._multiplier ,
307+ deadline = retry_policy ._deadline ,
308+ on_error = combined_on_error ,
309+ )
308310
309311 async def _do_open ():
310- print ("In _do_open method" )
311312 current_metadata = list (metadata ) if metadata else []
312313
313314 # Cleanup stream from previous failed attempt, if any.
@@ -326,15 +327,14 @@ async def _do_open():
326327 object_name = self .object_name ,
327328 generation_number = self .generation ,
328329 write_handle = self .write_handle ,
330+ routing_token = self ._routing_token ,
329331 )
330332
331333 if self ._routing_token :
332334 current_metadata .append (
333335 ("x-goog-request-params" , f"routing_token={ self ._routing_token } " )
334336 )
335- self ._routing_token = None
336337
337- print ("Current metadata in _do_open:" , current_metadata )
338338 await self .write_obj_stream .open (
339339 metadata = current_metadata if metadata else None
340340 )
@@ -347,11 +347,10 @@ async def _do_open():
347347 self .persisted_size = self .write_obj_stream .persisted_size
348348
349349 self ._is_stream_open = True
350+ self ._routing_token = None
350351
351- print ("In open method, before retry_policy call" )
352352 await retry_policy (_do_open )()
353353
354-
355354 async def append (
356355 self ,
357356 data : bytes ,
@@ -387,47 +386,62 @@ async def append(
387386 if retry_policy is None :
388387 retry_policy = AsyncRetry (predicate = _is_write_retryable )
389388
389+ strategy = _WriteResumptionStrategy ()
390390 buffer = io .BytesIO (data )
391- target_persisted_size = self .persisted_size + len (data )
392391 attempt_count = 0
393392
394- print ("In append method" )
395-
396- def send_and_recv_generator (requests : List [BidiWriteObjectRequest ], state : dict [str , _WriteState ], metadata : Optional [List [Tuple [str , str ]]] = None ):
393+ def send_and_recv_generator (
394+ requests : List [BidiWriteObjectRequest ],
395+ state : dict [str , _WriteState ],
396+ metadata : Optional [List [Tuple [str , str ]]] = None ,
397+ ):
397398 async def generator ():
398- print ("In send_and_recv_generator" )
399399 nonlocal attempt_count
400+ nonlocal requests
400401 attempt_count += 1
401402 resp = None
402403 async with self ._lock :
403404 write_state = state ["write_state" ]
404405 # If this is a retry or redirect, we must re-open the stream
405406 if attempt_count > 1 or write_state .routing_token :
406- print ("Re-opening the stream inside send_and_recv_generator with attempt_count:" , attempt_count )
407- if self .write_obj_stream and self .write_obj_stream .is_stream_open :
407+ logger .info (
408+ f"Re-opening the stream with attempt_count: { attempt_count } "
409+ )
410+ if (
411+ self .write_obj_stream
412+ and self .write_obj_stream .is_stream_open
413+ ):
408414 await self .write_obj_stream .close ()
409415
410- self .write_obj_stream = self ._stream_opener (write_handle = write_state .write_handle )
411416 current_metadata = list (metadata ) if metadata else []
412417 if write_state .routing_token :
413- current_metadata .append (("x-goog-request-params" , f"routing_token={ write_state .routing_token } " ))
414- await self .write_obj_stream .open (metadata = current_metadata if current_metadata else None )
418+ current_metadata .append (
419+ (
420+ "x-goog-request-params" ,
421+ f"routing_token={ write_state .routing_token } " ,
422+ )
423+ )
424+ self ._routing_token = write_state .routing_token
425+
426+ self ._is_stream_open = False
427+ await self .open (metadata = current_metadata )
415428
416- self ._is_stream_open = True
417429 write_state .persisted_size = self .persisted_size
418430 write_state .write_handle = self .write_handle
431+ write_state .routing_token = None
419432
420- print ("Sending requests in send_and_recv_generator" )
421- # req_iter = iter(requests)
433+ write_state .user_buffer .seek (write_state .persisted_size )
434+ write_state .bytes_sent = write_state .persisted_size
435+ write_state .bytes_since_last_flush = 0
422436
423- print ("Starting to send requests" )
437+ requests = strategy .generate_requests (state )
438+
439+ num_requests = len (requests )
424440 for i , chunk_req in enumerate (requests ):
425- if i == len ( requests ) - 1 :
441+ if i == num_requests - 1 :
426442 chunk_req .state_lookup = True
427- print ( "Sending chunk request" )
443+ chunk_req . flush = True
428444 await self .write_obj_stream .send (chunk_req )
429- print ("Waiting to receive response" )
430- print ("Current persisted_size:" , state ["write_state" ].persisted_size , "Target persisted_size:" , target_persisted_size )
431445
432446 resp = await self .write_obj_stream .recv ()
433447 if resp :
@@ -437,38 +451,28 @@ async def generator():
437451 if resp .write_handle :
438452 self .write_handle = resp .write_handle
439453 state ["write_state" ].write_handle = resp .write_handle
440- print ("Received response in send_and_recv_generator" , resp )
441454
442455 yield resp
443456
444- # while state["write_state"].persisted_size < target_persisted_size:
445- # print("Waiting to receive response")
446- # print("Current persisted_size:", state["write_state"].persisted_size, "Target persisted_size:", target_persisted_size)
447- # resp = await self.write_obj_stream.recv()
448- # print("Received response in send_and_recv_generator", resp)
449- # if resp is None:
450- # break
451- # yield resp
452457 return generator ()
453458
454459 # State initialization
455- spec = _storage_v2 .AppendObjectSpec (
456- bucket = f"projects/_/buckets/{ self .bucket_name } " , object = self .object_name , generation = self .generation
457- )
458- write_state = _WriteState (spec , _MAX_CHUNK_SIZE_BYTES , buffer )
460+ write_state = _WriteState (_MAX_CHUNK_SIZE_BYTES , buffer , self .flush_interval )
459461 write_state .write_handle = self .write_handle
460462 write_state .persisted_size = self .persisted_size
461463 write_state .bytes_sent = self .persisted_size
464+ write_state .bytes_since_last_flush = self .bytes_appended_since_last_flush
462465
463- print ("Before creating retry manager" )
464- retry_manager = _BidiStreamRetryManager (_WriteResumptionStrategy (),
465- lambda r , s : send_and_recv_generator (r , s , metadata ))
466+ retry_manager = _BidiStreamRetryManager (
467+ _WriteResumptionStrategy (),
468+ lambda r , s : send_and_recv_generator (r , s , metadata ),
469+ )
466470 await retry_manager .execute ({"write_state" : write_state }, retry_policy )
467471
468472 # Sync local markers
469473 self .write_obj_stream .persisted_size = write_state .persisted_size
470474 self .write_obj_stream .write_handle = write_state .write_handle
471-
475+ self . bytes_appended_since_last_flush = write_state . bytes_since_last_flush
472476
473477 async def simple_flush (self ) -> None :
474478 """Flushes the data to the server.
@@ -561,16 +565,10 @@ async def finalize(self) -> _storage_v2.Object:
561565 if not self ._is_stream_open :
562566 raise ValueError ("Stream is not open. Call open() before finalize()." )
563567
564- print ("In finalize method" )
565-
566- # async with self._lock:
567- print ("Sending finish_write request" )
568568 await self .write_obj_stream .send (
569569 _storage_v2 .BidiWriteObjectRequest (finish_write = True )
570570 )
571- print ("Waiting to receive response for finalize" )
572571 response = await self .write_obj_stream .recv ()
573- print ("Received response for finalize:" )
574572 self .object_resource = response .resource
575573 self .persisted_size = self .object_resource .size
576574 await self .write_obj_stream .close ()
0 commit comments