Skip to content

Commit e081beb

Browse files
authored
Fix: Surface Fatal Stream Errors to Future; Adjust Retryable Error Codes (#1422)
1 parent 272b09f commit e081beb

File tree

2 files changed

+92
-15
lines changed

2 files changed

+92
-15
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import collections
1818
import functools
19+
import inspect
1920
import itertools
2021
import logging
2122
import threading
@@ -62,14 +63,22 @@
6263
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
6364
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
6465
_RETRYABLE_STREAM_ERRORS = (
66+
exceptions.Aborted,
6567
exceptions.DeadlineExceeded,
66-
exceptions.ServiceUnavailable,
68+
exceptions.GatewayTimeout,
6769
exceptions.InternalServerError,
70+
exceptions.ResourceExhausted,
71+
exceptions.ServiceUnavailable,
6872
exceptions.Unknown,
69-
exceptions.GatewayTimeout,
70-
exceptions.Aborted,
7173
)
72-
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
74+
_TERMINATING_STREAM_ERRORS = (
75+
exceptions.Cancelled,
76+
exceptions.InvalidArgument,
77+
exceptions.NotFound,
78+
exceptions.PermissionDenied,
79+
exceptions.Unauthenticated,
80+
exceptions.Unauthorized,
81+
)
7382
_MAX_LOAD = 1.0
7483
"""The load threshold above which to pause the incoming message stream."""
7584

@@ -98,6 +107,13 @@
98107
code_pb2.UNAVAILABLE,
99108
}
100109

110+
# `on_fatal_exception` was added in `google-api-core` v2.25.1, which allows us to inform
111+
# callers of unrecoverable errors. We can only pass this arg if it's available in the
112+
# `BackgroundConsumer` spec.
113+
# `inspect.getfullargspec` returns a `FullArgSpec` named tuple, so a bare `in` test
# against it compares the name with the tuple's fields (the `args` list, `varargs`,
# `varkw`, ...) and never with the individual parameter names. Look the name up in
# the constructor's signature parameters instead.
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = (
    "on_fatal_exception" in inspect.signature(bidi.BackgroundConsumer).parameters
)
116+
101117

102118
def _wrap_as_exception(maybe_exception: Any) -> BaseException:
103119
"""Wrap an object as a Python exception, if needed.
@@ -876,7 +892,18 @@ def open(
876892
assert self._scheduler is not None
877893
scheduler_queue = self._scheduler.queue
878894
self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
879-
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
895+
896+
# `on_fatal_exception` is only available in more recent library versions.
897+
# For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
898+
if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
899+
self._consumer = bidi.BackgroundConsumer(
900+
self._rpc,
901+
self._on_response,
902+
on_fatal_exception=self._on_fatal_exception,
903+
)
904+
else:
905+
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
906+
880907
self._leaser = leaser.Leaser(self)
881908
self._heartbeater = heartbeater.Heartbeater(self)
882909

@@ -1247,6 +1274,17 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
12471274

12481275
self.maybe_pause_consumer()
12491276

1277+
def _on_fatal_exception(self, exception: BaseException) -> None:
1278+
"""
1279+
Called whenever `self._consumer` receives a non-retryable exception.
1280+
We close the manager on such non-retryable cases.
1281+
"""
1282+
_LOGGER.exception(
1283+
"Streaming pull terminating after receiving non-recoverable error: %s",
1284+
exception,
1285+
)
1286+
self.close(exception)
1287+
12501288
def _should_recover(self, exception: BaseException) -> bool:
12511289
"""Determine if an error on the RPC stream should be recovered.
12521290
@@ -1283,8 +1321,10 @@ def _should_terminate(self, exception: BaseException) -> bool:
12831321
in a list of terminating exceptions.
12841322
"""
12851323
exception = _wrap_as_exception(exception)
1286-
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
1287-
_LOGGER.debug("Observed terminating stream error %s", exception)
1324+
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
1325+
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
1326+
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
1327+
_LOGGER.error("Observed terminating stream error %s", exception)
12881328
return True
12891329
_LOGGER.debug("Observed non-terminating stream error %s", exception)
12901330
return False

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,13 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
13331333
leaser.return_value.start.assert_called_once()
13341334
assert manager.leaser == leaser.return_value
13351335

1336-
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
1336+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1337+
background_consumer.assert_called_once_with(
1338+
manager._rpc, manager._on_response, manager._on_fatal_exception
1339+
)
1340+
else:
1341+
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
1342+
13371343
background_consumer.return_value.start.assert_called_once()
13381344
assert manager._consumer == background_consumer.return_value
13391345

@@ -1432,6 +1438,31 @@ def test_close():
14321438
assert manager.is_active is False
14331439

14341440

1441+
def test_closes_on_fatal_consumer_error():
1442+
(
1443+
manager,
1444+
consumer,
1445+
dispatcher,
1446+
leaser,
1447+
heartbeater,
1448+
scheduler,
1449+
) = make_running_manager()
1450+
1451+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1452+
error = ValueError("some fatal exception")
1453+
manager._on_fatal_exception(error)
1454+
1455+
await_manager_shutdown(manager, timeout=3)
1456+
1457+
consumer.stop.assert_called_once()
1458+
leaser.stop.assert_called_once()
1459+
dispatcher.stop.assert_called_once()
1460+
heartbeater.stop.assert_called_once()
1461+
scheduler.shutdown.assert_called_once()
1462+
1463+
assert manager.is_active is False
1464+
1465+
14351466
def test_close_inactive_consumer():
14361467
(
14371468
manager,
@@ -2270,18 +2301,24 @@ def test__should_recover_false():
22702301
def test__should_terminate_true():
22712302
manager = make_manager()
22722303

2273-
details = "Cancelled. Go away, before I taunt you a second time."
2274-
exc = exceptions.Cancelled(details)
2275-
2276-
assert manager._should_terminate(exc) is True
2304+
for exc in [
2305+
exceptions.Cancelled(""),
2306+
exceptions.PermissionDenied(""),
2307+
TypeError(),
2308+
ValueError(),
2309+
]:
2310+
assert manager._should_terminate(exc)
22772311

22782312

22792313
def test__should_terminate_false():
22802314
manager = make_manager()
22812315

2282-
exc = TypeError("wahhhhhh")
2283-
2284-
assert manager._should_terminate(exc) is False
2316+
for exc in [
2317+
exceptions.ResourceExhausted(""),
2318+
exceptions.ServiceUnavailable(""),
2319+
exceptions.DeadlineExceeded(""),
2320+
]:
2321+
assert not manager._should_terminate(exc)
22852322

22862323

22872324
@mock.patch("threading.Thread", autospec=True)

0 commit comments

Comments
 (0)