Skip to content

Commit e0f06a5

Browse files
committed
add error handling for open method
1 parent 0e64fbc commit e0f06a5

File tree

10 files changed

+443
-500
lines changed

10 files changed

+443
-500
lines changed

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 162 additions & 118 deletions
Large diffs are not rendered by default.

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,13 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
102102
if self._is_stream_open:
103103
raise ValueError("Stream is already open")
104104

105+
read_handle = self.read_handle if self.read_handle else None
106+
105107
read_object_spec = _storage_v2.BidiReadObjectSpec(
106108
bucket=self._full_bucket_name,
107109
object=self.object_name,
108110
generation=self.generation_number if self.generation_number else None,
109-
read_handle=self.read_handle if self.read_handle else None,
111+
read_handle=read_handle,
110112
)
111113
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
112114
read_object_spec=read_object_spec

google/cloud/storage/_experimental/asyncio/retry/reads_resumption_strategy.py

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
from typing import Any, Dict, List, IO
1616

17+
import grpc
18+
from google.api_core import exceptions
19+
from google.rpc import status_pb2
1720
from google_crc32c import Checksum
1821
from google.cloud import _storage_v2 as storage_v2
1922
from google.cloud.storage.exceptions import DataCorruption
@@ -23,6 +26,11 @@
2326
from google.cloud._storage_v2.types.storage import BidiReadObjectRedirectedError
2427

2528

29+
_BIDI_READ_REDIRECTED_TYPE_URL = (
30+
"type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
31+
)
32+
33+
2634
class _DownloadState:
2735
"""A helper class to track the state of a single range download."""
2836

@@ -74,8 +82,8 @@ def update_state_from_response(
7482
"""Processes a server response, performs integrity checks, and updates state."""
7583

7684
# Capture read_handle if provided.
77-
if response.read_handle and response.read_handle.handle:
78-
state["read_handle"] = response.read_handle.handle
85+
if response.read_handle:
86+
state["read_handle"] = response.read_handle
7987

8088
download_states = state["download_states"]
8189

@@ -97,7 +105,7 @@ def update_state_from_response(
97105
raise DataCorruption(
98106
response,
99107
f"Offset mismatch for read_id {read_id}. "
100-
f"Expected {read_state.next_expected_offset}, got {chunk_offset}"
108+
f"Expected {read_state.next_expected_offset}, got {chunk_offset}",
101109
)
102110

103111
# Checksum Verification
@@ -111,7 +119,7 @@ def update_state_from_response(
111119
raise DataCorruption(
112120
response,
113121
f"Checksum mismatch for read_id {read_id}. "
114-
f"Server sent {server_checksum}, client calculated {client_checksum}."
122+
f"Server sent {server_checksum}, client calculated {client_checksum}.",
115123
)
116124

117125
# Update State & Write Data
@@ -130,16 +138,52 @@ def update_state_from_response(
130138
raise DataCorruption(
131139
response,
132140
f"Byte count mismatch for read_id {read_id}. "
133-
f"Expected {read_state.initial_length}, got {read_state.bytes_written}"
141+
f"Expected {read_state.initial_length}, got {read_state.bytes_written}",
134142
)
135143

136144
async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
137145
"""Handles BidiReadObjectRedirectedError for reads."""
138146
# This would parse the gRPC error details, extract the routing_token,
139147
# and store it on the shared state object.
140-
cause = getattr(error, "cause", error)
141-
if isinstance(cause, BidiReadObjectRedirectedError):
142-
state["routing_token"] = cause.routing_token
143-
if cause.read_handle and cause.read_handle.handle:
144-
state["read_handle"] = cause.read_handle.handle
145-
print(f"Recover state: Updated read_handle from redirect: {state['read_handle']}")
148+
grpc_error = None
149+
if isinstance(error, exceptions.Aborted) and error.errors:
150+
grpc_error = error.errors[0]
151+
152+
if grpc_error:
153+
if isinstance(grpc_error, BidiReadObjectRedirectedError):
154+
if grpc_error.routing_token:
155+
state["routing_token"] = grpc_error.routing_token
156+
if grpc_error.read_handle:
157+
state["read_handle"] = grpc_error.read_handle
158+
return
159+
160+
if hasattr(grpc_error, "trailing_metadata"):
161+
trailers = grpc_error.trailing_metadata()
162+
if not trailers:
163+
return
164+
status_details_bin = None
165+
for key, value in trailers:
166+
if key == "grpc-status-details-bin":
167+
status_details_bin = value
168+
break
169+
170+
if status_details_bin:
171+
status_proto = status_pb2.Status()
172+
try:
173+
status_proto.ParseFromString(status_details_bin)
174+
for detail in status_proto.details:
175+
if detail.type_url == _BIDI_READ_REDIRECTED_TYPE_URL:
176+
redirect_proto = (
177+
BidiReadObjectRedirectedError.deserialize(
178+
detail.value
179+
)
180+
)
181+
if redirect_proto.routing_token:
182+
state[
183+
"routing_token"
184+
] = redirect_proto.routing_token
185+
if redirect_proto.read_handle:
186+
state["read_handle"] = redirect_proto.read_handle
187+
break
188+
except Exception as e:
189+
print(f"--- Error unpacking redirect in _on_open_error: {e}")

google/cloud/storage/_media/requests/download.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,5 @@ def flush(self):
774774
def has_unconsumed_tail(self) -> bool:
775775
return self._decoder.has_unconsumed_tail
776776

777-
778777
else: # pragma: NO COVER
779778
_BrotliDecoder = None # type: ignore # pragma: NO COVER

0 commit comments

Comments
 (0)