1515from __future__ import annotations
1616import asyncio
1717import google_crc32c
18+ import grpc
1819from google .api_core import exceptions
1920from google .api_core .retry_async import AsyncRetry
21+ from google .cloud ._storage_v2 .types .storage import BidiReadObjectRedirectedError
22+ from google .rpc import status_pb2
23+ from google .protobuf .any_pb2 import Any as AnyProto
2024
2125from typing import List , Optional , Tuple , Any , Dict
2226
4347
4448
4549_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
46-
50+ _BIDI_READ_REDIRECTED_TYPE_URL = "type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
51+
52+
53+ def _is_read_retryable (exc ):
54+ """Predicate to determine if a read operation should be retried."""
55+ print (f"--- Checking if retryable: { type (exc )} : { exc } " )
56+ if isinstance (exc , (exceptions .ServiceUnavailable , exceptions .DeadlineExceeded , exceptions .TooManyRequests )):
57+ return True
58+
59+ grpc_error = None
60+ if isinstance (exc , exceptions .GoogleAPICallError ) and exc .errors :
61+ if isinstance (exc .errors [0 ], grpc .aio .AioRpcError ):
62+ grpc_error = exc .errors [0 ]
63+
64+ if grpc_error :
65+ print (f"--- Wrapped grpc.aio.AioRpcError code: { grpc_error .code ()} " )
66+ if grpc_error .code () in (
67+ grpc .StatusCode .UNAVAILABLE ,
68+ grpc .StatusCode .INTERNAL ,
69+ grpc .StatusCode .DEADLINE_EXCEEDED ,
70+ grpc .StatusCode .RESOURCE_EXHAUSTED ,
71+ ):
72+ return True
73+ if grpc_error .code () == grpc .StatusCode .ABORTED :
74+ trailers = grpc_error .trailing_metadata ()
75+ if not trailers :
76+ print ("--- No trailers" )
77+ return False
78+
79+ status_details_bin = None
80+ # *** CORRECTED TRAILER ACCESS ***
81+ for key , value in trailers :
82+ if key == 'grpc-status-details-bin' :
83+ status_details_bin = value
84+ break
85+
86+ if status_details_bin :
87+ status_proto = status_pb2 .Status ()
88+ try :
89+ status_proto .ParseFromString (status_details_bin )
90+ for detail in status_proto .details :
91+ if detail .type_url == _BIDI_READ_REDIRECTED_TYPE_URL :
92+ print ("--- Found BidiReadObjectRedirectedError, is retryable" )
93+ return True
94+ print ("--- BidiReadObjectRedirectedError type URL not found in details" )
95+ except Exception as e :
96+ print (f"--- Error parsing status_details_bin: { e } " )
97+ return False
98+ else :
99+ print ("--- No grpc-status-details-bin in trailers" )
100+ return False
47101
48102class AsyncMultiRangeDownloader :
49103 """Provides an interface for downloading multiple ranges of a GCS ``Object``
@@ -154,39 +208,91 @@ def __init__(
154208 self .read_handle = read_handle
155209 self .read_obj_str : Optional [_AsyncReadObjectStream ] = None
156210 self ._is_stream_open : bool = False
211+ self ._routing_token : Optional [str ] = None
212+
213+ async def _on_open_error (self , exc ):
214+ """Extracts routing token and read handle on redirect error during open."""
215+ print (f"--- _on_open_error called with { type (exc )} : { exc } " )
216+ grpc_error = None
217+ if isinstance (exc , exceptions .GoogleAPICallError ) and exc .errors :
218+ if isinstance (exc .errors [0 ], grpc .aio .AioRpcError ):
219+ grpc_error = exc .errors [0 ]
220+
221+ if grpc_error and grpc_error .code () == grpc .StatusCode .ABORTED :
222+ trailers = grpc_error .trailing_metadata ()
223+ if not trailers : return
224+
225+ status_details_bin = None
226+ # *** CORRECTED TRAILER ACCESS ***
227+ for key , value in trailers :
228+ if key == 'grpc-status-details-bin' :
229+ status_details_bin = value
230+ break
231+
232+ if status_details_bin :
233+ status_proto = status_pb2 .Status ()
234+ try :
235+ status_proto .ParseFromString (status_details_bin )
236+ for detail in status_proto .details :
237+ if detail .type_url == _BIDI_READ_REDIRECTED_TYPE_URL :
238+ redirect_proto = BidiReadObjectRedirectedError ()
239+ detail .Unpack (redirect_proto )
240+ if redirect_proto .routing_token :
241+ self ._routing_token = redirect_proto .routing_token
242+ if redirect_proto .read_handle and redirect_proto .read_handle .handle :
243+ self .read_handle = redirect_proto .read_handle .handle
244+ print (f"--- BidiReadObjectRedirectedError caught in open, new token: { self ._routing_token } , handle: { self .read_handle } " )
245+ break
246+ except Exception as e :
247+ print (f"--- Error unpacking redirect in _on_open_error: { e } " )
248+
249+ if self .read_obj_str and self .read_obj_str ._is_open :
250+ try :
251+ await self .read_obj_str .close ()
252+ except Exception :
253+ pass
254+ self ._is_stream_open = False
157255
158256 self ._read_id_to_writable_buffer_dict = {}
159257 self ._read_id_to_download_ranges_id = {}
160258 self ._download_ranges_id_to_pending_read_ids = {}
161259 self .persisted_size : Optional [int ] = None # updated after opening the stream
162260
163261 async def open (self , retry_policy : Optional [AsyncRetry ] = None ) -> None :
164- """Opens the bidi-gRPC connection to read from the object.
165-
166- This method initializes and opens an `_AsyncReadObjectStream` (bidi-gRPC stream) to
167- for downloading ranges of data from GCS ``Object``.
168-
169- "Opening" constitutes fetching object metadata such as generation number
170- and read handle and sets them as attributes if not already set.
171- """
262+ """Opens the bidi-gRPC connection to read from the object."""
172263 if self ._is_stream_open :
173264 raise ValueError ("Underlying bidi-gRPC stream is already open" )
174265
175266 if retry_policy is None :
176- # Default policy: retry generic transient errors
177- retry_policy = AsyncRetry (
178- predicate = lambda e : isinstance (e , (exceptions .ServiceUnavailable , exceptions .DeadlineExceeded ))
179- )
267+ retry_policy = AsyncRetry (predicate = _is_read_retryable , on_error = self ._on_open_error )
268+ else :
269+ original_on_error = retry_policy ._on_error
270+ async def combined_on_error (exc ):
271+ await self ._on_open_error (exc )
272+ if original_on_error :
273+ await original_on_error (exc )
274+ retry_policy = retry_policy .with_predicate (_is_read_retryable ).with_on_error (combined_on_error )
180275
181276 async def _do_open ():
277+ print ("--- Attempting _do_open" )
278+ if self ._is_stream_open :
279+ self ._is_stream_open = False
280+
182281 self .read_obj_str = _AsyncReadObjectStream (
183282 client = self .client ,
184283 bucket_name = self .bucket_name ,
185284 object_name = self .object_name ,
186285 generation_number = self .generation_number ,
187286 read_handle = self .read_handle ,
188287 )
189- await self .read_obj_str .open ()
288+
289+ metadata = []
290+ if self ._routing_token :
291+ metadata .append (("x-goog-request-params" , f"routing_token={ self ._routing_token } " ))
292+ print (f"--- Using routing_token for open: { self ._routing_token } " )
293+ self ._routing_token = None
294+
295+ await self .read_obj_str .open (metadata = metadata if metadata else None )
190296
191297 if self .read_obj_str .generation_number :
192298 self .generation_number = self .read_obj_str .generation_number
@@ -196,8 +302,8 @@ async def _do_open():
196302 self .persisted_size = self .read_obj_str .persisted_size
197303
198304 self ._is_stream_open = True
305+ print ("--- Stream opened successfully" )
199306
200- # Execute open with retry policy
201307 await retry_policy (_do_open )()
202308
203309 async def download_ranges (
@@ -263,9 +369,7 @@ async def download_ranges(
263369 lock = asyncio .Lock ()
264370
265371 if retry_policy is None :
266- retry_policy = AsyncRetry (
267- predicate = lambda e : isinstance (e , (exceptions .ServiceUnavailable , exceptions .DeadlineExceeded ))
268- )
372+ retry_policy = AsyncRetry (predicate = _is_read_retryable )
269373
270374 # Initialize Global State for Retry Strategy
271375 download_states = {}
0 commit comments