Skip to content

Commit 5d39296

Browse files
committed
adding more unit tests
1 parent c131588 commit 5d39296

File tree

7 files changed

+428
-832
lines changed

7 files changed

+428
-832
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 23 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import asyncio
2828
import io
2929
import logging
30-
from typing import List, Optional, Tuple
30+
from typing import List, Optional, Tuple, Union
3131

3232
from google.api_core import exceptions
3333
from google.api_core.retry_async import AsyncRetry
@@ -51,6 +51,9 @@
5151
_WriteResumptionStrategy,
5252
_WriteState,
5353
)
54+
from google.cloud.storage._experimental.asyncio.retry._helpers import (
55+
_extract_bidi_writes_redirect_proto,
56+
)
5457

5558

5659
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
@@ -71,14 +74,18 @@ def _is_write_retryable(exc):
7174
exceptions.ServiceUnavailable,
7275
exceptions.DeadlineExceeded,
7376
exceptions.TooManyRequests,
77+
BidiWriteObjectRedirectedError
7478
),
7579
):
7680
logger.info(f"Retryable write exception encountered: {exc}")
7781
return True
7882

7983
grpc_error = None
80-
if isinstance(exc, exceptions.Aborted):
84+
if isinstance(exc, exceptions.Aborted) and exc.errors:
8185
grpc_error = exc.errors[0]
86+
if isinstance(grpc_error, BidiWriteObjectRedirectedError):
87+
return True
88+
8289
trailers = grpc_error.trailing_metadata()
8390
if not trailers:
8491
return False
@@ -226,53 +233,14 @@ async def state_lookup(self) -> int:
226233

227234
def _on_open_error(self, exc):
228235
"""Extracts routing token and write handle on redirect error during open."""
229-
grpc_error = None
230-
if isinstance(exc, exceptions.Aborted) and exc.errors:
231-
grpc_error = exc.errors[0]
232-
233-
if grpc_error:
234-
if isinstance(grpc_error, BidiWriteObjectRedirectedError):
235-
self._routing_token = grpc_error.routing_token
236-
if grpc_error.write_handle:
237-
self.write_handle = grpc_error.write_handle
238-
if grpc_error.generation:
239-
self.generation = grpc_error.generation
240-
return
241-
242-
if hasattr(grpc_error, "trailing_metadata"):
243-
trailers = grpc_error.trailing_metadata()
244-
if not trailers:
245-
return
246-
247-
status_details_bin = None
248-
for key, value in trailers:
249-
if key == "grpc-status-details-bin":
250-
status_details_bin = value
251-
break
252-
253-
if status_details_bin:
254-
status_proto = status_pb2.Status()
255-
try:
256-
status_proto.ParseFromString(status_details_bin)
257-
for detail in status_proto.details:
258-
if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
259-
redirect_proto = (
260-
BidiWriteObjectRedirectedError.deserialize(
261-
detail.value
262-
)
263-
)
264-
if redirect_proto.routing_token:
265-
self._routing_token = redirect_proto.routing_token
266-
if redirect_proto.write_handle:
267-
self.write_handle = redirect_proto.write_handle
268-
if redirect_proto.generation:
269-
self.generation = redirect_proto.generation
270-
break
271-
except Exception:
272-
logger.error(
273-
"Error unpacking redirect details from gRPC error."
274-
)
275-
pass
236+
redirect_proto = _extract_bidi_writes_redirect_proto(exc)
237+
if redirect_proto:
238+
if redirect_proto.routing_token:
239+
self._routing_token = redirect_proto.routing_token
240+
if redirect_proto.write_handle:
241+
self.write_handle = redirect_proto.write_handle
242+
if redirect_proto.generation:
243+
self.generation = redirect_proto.generation
276244

277245
async def open(
278246
self,
@@ -448,9 +416,11 @@ async def generator():
448416
if resp.persisted_size is not None:
449417
self.persisted_size = resp.persisted_size
450418
state["write_state"].persisted_size = resp.persisted_size
419+
self.offset = self.persisted_size
451420
if resp.write_handle:
452421
self.write_handle = resp.write_handle
453422
state["write_state"].write_handle = resp.write_handle
423+
self.bytes_appended_since_last_flush = 0
454424

455425
yield resp
456426

@@ -473,6 +443,8 @@ async def generator():
473443
self.write_obj_stream.persisted_size = write_state.persisted_size
474444
self.write_obj_stream.write_handle = write_state.write_handle
475445
self.bytes_appended_since_last_flush = write_state.bytes_since_last_flush
446+
self.persisted_size = write_state.persisted_size
447+
self.offset = write_state.persisted_size
476448

477449
async def simple_flush(self) -> None:
478450
"""Flushes the data to the server.
@@ -492,6 +464,7 @@ async def simple_flush(self) -> None:
492464
flush=True,
493465
)
494466
)
467+
self.bytes_appended_since_last_flush = 0
495468

496469
async def flush(self) -> int:
497470
"""Flushes the data to the server.
@@ -515,6 +488,7 @@ async def flush(self) -> int:
515488
response = await self.write_obj_stream.recv()
516489
self.persisted_size = response.persisted_size
517490
self.offset = self.persisted_size
491+
self.bytes_appended_since_last_flush = 0
518492
return self.persisted_size
519493

520494
async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]:

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -147,25 +147,9 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
147147
response = await self.socket_like_rpc.recv()
148148
self._is_stream_open = True
149149

150-
if response.persisted_size >= 0:
150+
if response.persisted_size:
151151
self.persisted_size = response.persisted_size
152152

153-
if response.write_handle:
154-
self.write_handle = response.write_handle
155-
# return
156-
157-
# if not response.resource:
158-
# raise ValueError(
159-
# "Failed to obtain object resource after opening the stream"
160-
# )
161-
# if not response.resource.generation:
162-
# raise ValueError(
163-
# "Failed to obtain object generation after opening the stream"
164-
# )
165-
166-
# if not response.write_handle:
167-
# raise ValueError("Failed to obtain write_handle after opening the stream")
168-
169153
if response.resource:
170154
if not response.resource.size:
171155
# Appending to a 0 byte appendable object.
@@ -174,6 +158,8 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
174158
self.persisted_size = response.resource.size
175159

176160
self.generation_number = response.resource.generation
161+
162+
if response.write_handle:
177163
self.write_handle = response.write_handle
178164

179165
async def close(self) -> None:

google/cloud/storage/_experimental/asyncio/retry/_helpers.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
from typing import Tuple, Optional
1919

2020
from google.api_core import exceptions
21-
from google.cloud._storage_v2.types import BidiReadObjectRedirectedError
21+
from google.cloud._storage_v2.types import BidiReadObjectRedirectedError, BidiWriteObjectRedirectedError
2222
from google.rpc import status_pb2
2323

2424
_BIDI_READ_REDIRECTED_TYPE_URL = (
2525
"type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
2626
)
27+
_BIDI_WRITE_REDIRECTED_TYPE_URL = (
28+
"type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
29+
)
30+
logger = logging.getLogger(__name__)
2731

2832

2933
def _handle_redirect(
@@ -78,6 +82,44 @@ def _handle_redirect(
7882
read_handle = redirect_proto.read_handle
7983
break
8084
except Exception as e:
81-
logging.ERROR(f"Error unpacking redirect: {e}")
85+
logger.error(f"Error unpacking redirect: {e}")
8286

8387
return routing_token, read_handle
88+
89+
def _extract_bidi_writes_redirect_proto(exc: Exception):
90+
grpc_error = None
91+
if isinstance(exc, exceptions.Aborted) and exc.errors:
92+
grpc_error = exc.errors[0]
93+
94+
if grpc_error:
95+
if isinstance(grpc_error, BidiWriteObjectRedirectedError):
96+
return grpc_error
97+
98+
if hasattr(grpc_error, "trailing_metadata"):
99+
trailers = grpc_error.trailing_metadata()
100+
if not trailers:
101+
return
102+
103+
status_details_bin = None
104+
for key, value in trailers:
105+
if key == "grpc-status-details-bin":
106+
status_details_bin = value
107+
break
108+
109+
if status_details_bin:
110+
status_proto = status_pb2.Status()
111+
try:
112+
status_proto.ParseFromString(status_details_bin)
113+
for detail in status_proto.details:
114+
if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
115+
redirect_proto = (
116+
BidiWriteObjectRedirectedError.deserialize(
117+
detail.value
118+
)
119+
)
120+
return redirect_proto
121+
except Exception:
122+
logger.error(
123+
"Error unpacking redirect details from gRPC error."
124+
)
125+
pass

google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
2323
_BaseResumptionStrategy,
2424
)
25+
from google.cloud.storage._experimental.asyncio.retry._helpers import (
26+
_extract_bidi_writes_redirect_proto,
27+
)
28+
2529

2630
_BIDI_WRITE_REDIRECTED_TYPE_URL = (
2731
"type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
@@ -139,39 +143,17 @@ async def recover_state_on_failure(
139143
if grpc_error:
140144
# Extract routing token and potentially a new write handle for redirection.
141145
if isinstance(grpc_error, BidiWriteObjectRedirectedError):
142-
self._routing_token = grpc_error.routing_token
146+
write_state.routing_token = grpc_error.routing_token
143147
if grpc_error.write_handle:
144-
self.write_handle = grpc_error.write_handle
148+
write_state.write_handle = grpc_error.write_handle
145149
return
146-
if hasattr(grpc_error, "trailing_metadata"):
147-
trailers = grpc_error.trailing_metadata()
148-
if not trailers:
149-
return
150-
151-
status_details_bin = None
152-
for key, value in trailers:
153-
if key == "grpc-status-details-bin":
154-
status_details_bin = value
155-
break
156-
157-
if status_details_bin:
158-
status_proto = status_pb2.Status()
159-
try:
160-
status_proto.ParseFromString(status_details_bin)
161-
for detail in status_proto.details:
162-
if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
163-
redirect_proto = (
164-
BidiWriteObjectRedirectedError.deserialize(
165-
detail.value
166-
)
167-
)
168-
if redirect_proto.routing_token:
169-
write_state._routing_token = redirect_proto.routing_token
170-
if redirect_proto.write_handle:
171-
write_state.write_handle = redirect_proto.write_handle
172-
break
173-
except Exception:
174-
pass
150+
151+
redirect_proto = _extract_bidi_writes_redirect_proto(error)
152+
if redirect_proto:
153+
if redirect_proto.routing_token:
154+
write_state.routing_token = redirect_proto.routing_token
155+
if redirect_proto.write_handle:
156+
write_state.write_handle = redirect_proto.write_handle
175157

176158
# We must assume any data sent beyond 'persisted_size' was lost.
177159
# Reset the user buffer to the last known good byte confirmed by the server.

tests/conformance/test_bidi_writes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ def on_retry_error(exc):
8585
await writer.append(
8686
CONTENT, metadata=fault_injection_metadata, retry_policy=policy_to_pass
8787
)
88-
await writer.finalize()
89-
await writer.close()
88+
# await writer.finalize()
89+
await writer.close(finalize_on_close=True)
9090

9191
# If an exception was expected, this line should not be reached.
9292
if scenario["expected_error"] is not None:

0 commit comments

Comments
 (0)