Skip to content

Commit 21ef781

Browse files
committed
adding license
1 parent 27b9f1d commit 21ef781

File tree

4 files changed

+113
-82
lines changed

4 files changed

+113
-82
lines changed

google/cloud/storage/_experimental/asyncio/retry/base_strategy.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import abc
216
from typing import Any, Iterable
317

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,23 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import asyncio
16+
import time
217
from typing import Any, AsyncIterator, Callable
318

419
from google.api_core import exceptions
20+
from google.api_core.retry.retry_base import exponential_sleep_generator
521
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
622
_BaseResumptionStrategy,
723
)
@@ -14,54 +30,54 @@ def __init__(
1430
self,
1531
strategy: _BaseResumptionStrategy,
1632
stream_opener: Callable[..., AsyncIterator[Any]],
17-
retry_policy,
1833
):
19-
"""Initializes the retry manager."""
34+
"""Initializes the retry manager.
35+
36+
Args:
37+
strategy: The strategy for managing the state of a specific
38+
bidi operation (e.g., reads or writes).
39+
stream_opener: An async callable that opens a new gRPC stream.
40+
"""
2041
self._strategy = strategy
2142
self._stream_opener = stream_opener
22-
self._retry_policy = retry_policy
2343

24-
async def execute(self, initial_state: Any):
44+
async def execute(self, initial_state: Any, retry_policy):
2545
"""
2646
Executes the bidi operation with the configured retry policy.
2747
2848
This method implements a manual retry loop that provides the necessary
29-
control points to manage state between attempts, which is not possible
30-
with a simple retry decorator.
49+
control points to manage state between attempts.
50+
51+
Args:
52+
initial_state: An object containing all state for the operation.
53+
retry_policy: The `google.api_core.retry.AsyncRetry` object to
54+
govern the retry behavior for this specific operation.
3155
"""
3256
state = initial_state
33-
retry_policy = self._retry_policy
57+
58+
deadline = time.monotonic() + retry_policy._deadline if retry_policy._deadline else 0
59+
60+
sleep_generator = exponential_sleep_generator(
61+
retry_policy._initial, retry_policy._maximum, retry_policy._multiplier
62+
)
3463

3564
while True:
3665
try:
37-
# 1. Generate requests based on the current state.
3866
requests = self._strategy.generate_requests(state)
39-
40-
# 2. Open and consume the stream.
4167
stream = self._stream_opener(requests, state)
4268
async for response in stream:
4369
self._strategy.update_state_from_response(response, state)
44-
45-
# 3. If the stream completes without error, exit the loop.
4670
return
47-
4871
except Exception as e:
49-
# 4. If an error occurs, check if it's retriable.
50-
if not retry_policy.predicate(e):
51-
# If not retriable, fail fast.
72+
if not retry_policy._predicate(e):
5273
raise
5374

54-
# 5. If retriable, allow the strategy to recover state.
55-
# This is where routing tokens are extracted or QueryWriteStatus is called.
56-
try:
57-
await self._strategy.recover_state_on_failure(e, state)
58-
except Exception as recovery_exc:
59-
# If state recovery itself fails, we must abort.
75+
await self._strategy.recover_state_on_failure(e, state)
76+
77+
sleep = next(sleep_generator)
78+
if deadline is not None and time.monotonic() + sleep > deadline:
6079
raise exceptions.RetryError(
61-
"Failed to recover state after a transient error.",
62-
cause=recovery_exc,
63-
) from recovery_exc
80+
f"Deadline of {retry_policy._deadline}s exceeded", cause=e
81+
) from e
6482

65-
# 6. Use the policy to sleep and check for deadline expiration.
66-
# This will raise a RetryError if the deadline is exceeded.
67-
await asyncio.sleep(await retry_policy.sleep(e))
83+
await asyncio.sleep(sleep)

google/cloud/storage/_experimental/asyncio/retry/reads_resumption_strategy.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
from typing import Any, List, IO
216

317
from google.cloud import _storage_v2 as storage_v2
Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,52 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
116
import unittest
217
from unittest import mock
318

419
import pytest
520
from google.api_core import exceptions
6-
from google.api_core.retry.retry_streaming_async import AsyncStreamingRetry
21+
from google.api_core.retry_async import AsyncRetry
722

8-
from google.cloud.storage._experimental.asyncio.retry import manager
9-
from google.cloud.storage._experimental.asyncio.retry import strategy
23+
from google.cloud.storage._experimental.asyncio.retry import bidi_stream_retry_manager as manager
24+
from google.cloud.storage._experimental.asyncio.retry import base_strategy
1025

1126

1227
def _is_retriable(exc):
1328
return isinstance(exc, exceptions.ServiceUnavailable)
1429

15-
16-
DEFAULT_TEST_RETRY = AsyncStreamingRetry(predicate=_is_retriable, deadline=1)
30+
DEFAULT_TEST_RETRY = AsyncRetry(predicate=_is_retriable, deadline=1)
1731

1832

1933
class TestBidiStreamRetryManager(unittest.IsolatedAsyncioTestCase):
2034
async def test_execute_success_on_first_try(self):
21-
"""Verify the manager correctly handles a stream that succeeds immediately."""
22-
mock_strategy = mock.AsyncMock(spec=strategy._BaseResumptionStrategy)
23-
35+
mock_strategy = mock.AsyncMock(spec=base_strategy._BaseResumptionStrategy)
2436
async def mock_stream_opener(*args, **kwargs):
2537
yield "response_1"
2638

2739
retry_manager = manager._BidiStreamRetryManager(
28-
strategy=mock_strategy,
29-
stream_opener=mock_stream_opener,
30-
retry_policy=DEFAULT_TEST_RETRY,
40+
strategy=mock_strategy, stream_opener=mock_stream_opener
3141
)
32-
33-
await retry_manager.execute(initial_state={})
34-
42+
await retry_manager.execute(initial_state={}, retry_policy=DEFAULT_TEST_RETRY)
3543
mock_strategy.generate_requests.assert_called_once()
36-
mock_strategy.update_state_from_response.assert_called_once_with(
37-
"response_1", {}
38-
)
44+
mock_strategy.update_state_from_response.assert_called_once_with("response_1", {})
3945
mock_strategy.recover_state_on_failure.assert_not_called()
4046

4147
async def test_execute_retries_and_succeeds(self):
42-
"""Verify the manager retries on a transient error and then succeeds."""
43-
mock_strategy = mock.AsyncMock(spec=strategy._BaseResumptionStrategy)
44-
48+
mock_strategy = mock.AsyncMock(spec=base_strategy._BaseResumptionStrategy)
4549
attempt_count = 0
46-
4750
async def mock_stream_opener(*args, **kwargs):
4851
nonlocal attempt_count
4952
attempt_count += 1
@@ -53,59 +56,43 @@ async def mock_stream_opener(*args, **kwargs):
5356
yield "response_2"
5457

5558
retry_manager = manager._BidiStreamRetryManager(
56-
strategy=mock_strategy,
57-
stream_opener=mock_stream_opener,
58-
retry_policy=AsyncStreamingRetry(predicate=_is_retriable, initial=0.01),
59+
strategy=mock_strategy, stream_opener=mock_stream_opener
5960
)
60-
61-
await retry_manager.execute(initial_state={})
61+
retry_policy = AsyncRetry(predicate=_is_retriable, initial=0.01)
62+
await retry_manager.execute(initial_state={}, retry_policy=retry_policy)
6263

6364
self.assertEqual(attempt_count, 2)
6465
self.assertEqual(mock_strategy.generate_requests.call_count, 2)
6566
mock_strategy.recover_state_on_failure.assert_called_once()
66-
mock_strategy.update_state_from_response.assert_called_once_with(
67-
"response_2", {}
68-
)
67+
mock_strategy.update_state_from_response.assert_called_once_with("response_2", {})
6968

7069
async def test_execute_fails_after_deadline_exceeded(self):
71-
"""Verify the manager raises RetryError if the deadline is exceeded."""
72-
mock_strategy = mock.AsyncMock(spec=strategy._BaseResumptionStrategy)
73-
70+
mock_strategy = mock.AsyncMock(spec=base_strategy._BaseResumptionStrategy)
7471
async def mock_stream_opener(*args, **kwargs):
72+
if False:
73+
yield
7574
raise exceptions.ServiceUnavailable("Service is always down")
7675

77-
# Use a very short deadline to make the test fast.
78-
fast_retry = AsyncStreamingRetry(
79-
predicate=_is_retriable, deadline=0.1, initial=0.05
80-
)
76+
fast_retry = AsyncRetry(predicate=_is_retriable, deadline=0.01, initial=0.02)
8177
retry_manager = manager._BidiStreamRetryManager(
82-
strategy=mock_strategy,
83-
stream_opener=mock_stream_opener,
84-
retry_policy=fast_retry,
78+
strategy=mock_strategy, stream_opener=mock_stream_opener
8579
)
80+
with pytest.raises(exceptions.RetryError, match="Deadline of 0.01s exceeded"):
81+
await retry_manager.execute(initial_state={}, retry_policy=fast_retry)
8682

87-
with pytest.raises(exceptions.RetryError, match="Deadline of 0.1s exceeded"):
88-
await retry_manager.execute(initial_state={})
89-
90-
# Verify it attempted to recover state after each failure.
91-
self.assertGreater(mock_strategy.recover_state_on_failure.call_count, 1)
83+
self.assertGreater(mock_strategy.recover_state_on_failure.call_count, 0)
9284

9385
async def test_execute_fails_immediately_on_non_retriable_error(self):
94-
"""Verify the manager aborts immediately on a non-retriable error."""
95-
mock_strategy = mock.AsyncMock(spec=strategy._BaseResumptionStrategy)
96-
86+
mock_strategy = mock.AsyncMock(spec=base_strategy._BaseResumptionStrategy)
9787
async def mock_stream_opener(*args, **kwargs):
88+
if False:
89+
yield
9890
raise exceptions.PermissionDenied("Auth error")
9991

10092
retry_manager = manager._BidiStreamRetryManager(
101-
strategy=mock_strategy,
102-
stream_opener=mock_stream_opener,
103-
retry_policy=DEFAULT_TEST_RETRY,
93+
strategy=mock_strategy, stream_opener=mock_stream_opener
10494
)
105-
10695
with pytest.raises(exceptions.PermissionDenied):
107-
await retry_manager.execute(initial_state={})
96+
await retry_manager.execute(initial_state={}, retry_policy=DEFAULT_TEST_RETRY)
10897

109-
# Verify that it did not try to recover or update state.
11098
mock_strategy.recover_state_on_failure.assert_not_called()
111-
mock_strategy.update_state_from_response.assert_not_called()

0 commit comments

Comments
 (0)