Skip to content

Commit c10a7e6

Browse files
committed
try reverting
1 parent 444ba08 commit c10a7e6

File tree

1 file changed

+13
-67
lines changed

1 file changed

+13
-67
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 13 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@
6060

6161

6262
_LOGGER = logging.getLogger(__name__)
63-
_SLOW_ACK_LOGGER = logging.getLogger("slow-ack")
64-
_STREAMS_LOGGER = logging.getLogger("subscriber-streams")
65-
_FLOW_CONTROL_LOGGER = logging.getLogger("subscriber-flow-control")
66-
_CALLBACK_DELIVERY_LOGGER = logging.getLogger("callback-delivery")
67-
_CALLBACK_EXCEPTION_LOGGER = logging.getLogger("callback-exceptions")
68-
_EXPIRY_LOGGER = logging.getLogger("expiry")
6963
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
7064
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
7165
_RETRYABLE_STREAM_ERRORS = (
@@ -151,14 +145,6 @@ def _wrap_callback_errors(
151145
callback: The user callback.
152146
message: The Pub/Sub message.
153147
"""
154-
_CALLBACK_DELIVERY_LOGGER.debug(
155-
"Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s) received by subscriber callback",
156-
message.message_id,
157-
message.ack_id,
158-
message.ordering_key,
159-
message.exactly_once_enabled,
160-
)
161-
162148
try:
163149
if message.opentelemetry_data:
164150
message.opentelemetry_data.end_subscribe_concurrency_control_span()
@@ -170,15 +156,9 @@ def _wrap_callback_errors(
170156
# Note: the likelihood of this failing is extremely low. This just adds
171157
# a message to a queue, so if this doesn't work the world is in an
172158
# unrecoverable state and this thread should just bail.
173-
174-
_CALLBACK_EXCEPTION_LOGGER.exception(
175-
"Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)'s callback threw exception, nacking message.",
176-
message.message_id,
177-
message.ack_id,
178-
message.ordering_key,
179-
message.exactly_once_enabled,
159+
_LOGGER.exception(
160+
"Top-level exception occurred in callback while processing a message"
180161
)
181-
182162
message.nack()
183163
on_callback_error(exc)
184164

@@ -219,7 +199,6 @@ def _process_requests(
219199
error_status: Optional["status_pb2.Status"],
220200
ack_reqs_dict: Dict[str, requests.AckRequest],
221201
errors_dict: Optional[Dict[str, str]],
222-
ack_histogram: Optional[histogram.Histogram] = None,
223202
):
224203
"""Process requests when exactly-once delivery is enabled by referring to
225204
error_status and errors_dict.
@@ -231,16 +210,6 @@ def _process_requests(
231210
requests_completed = []
232211
requests_to_retry = []
233212
for ack_id in ack_reqs_dict:
234-
# Debug logging: slow acks
235-
if ack_histogram and ack_reqs_dict[
236-
ack_id
237-
].time_to_ack > ack_histogram.percentile(percent=99):
238-
_SLOW_ACK_LOGGER.debug(
239-
"Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration",
240-
ack_reqs_dict[ack_id].message_id,
241-
ack_reqs_dict[ack_id].ack_id,
242-
)
243-
244213
# Handle special errors returned for ack/modack RPCs via the ErrorInfo
245214
# sidecar metadata when exactly-once delivery is enabled.
246215
if errors_dict and ack_id in errors_dict:
@@ -591,10 +560,8 @@ def maybe_pause_consumer(self) -> None:
591560
with self._pause_resume_lock:
592561
if self.load >= _MAX_LOAD:
593562
if self._consumer is not None and not self._consumer.is_paused:
594-
_FLOW_CONTROL_LOGGER.debug(
595-
"Message backlog over load at %.2f (threshold %.2f), initiating client-side flow control",
596-
self.load,
597-
_RESUME_THRESHOLD,
563+
_LOGGER.debug(
564+
"Message backlog over load at %.2f, pausing.", self.load
598565
)
599566
self._consumer.pause()
600567

@@ -621,18 +588,10 @@ def maybe_resume_consumer(self) -> None:
621588
self._maybe_release_messages()
622589

623590
if self.load < _RESUME_THRESHOLD:
624-
_FLOW_CONTROL_LOGGER.debug(
625-
"Current load is %.2f (threshold %.2f), suspending client-side flow control.",
626-
self.load,
627-
_RESUME_THRESHOLD,
628-
)
591+
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
629592
self._consumer.resume()
630593
else:
631-
_FLOW_CONTROL_LOGGER.debug(
632-
"Current load is %.2f (threshold %.2f), retaining client-side flow control.",
633-
self.load,
634-
_RESUME_THRESHOLD,
635-
)
594+
_LOGGER.debug("Did not resume, current load is %.2f.", self.load)
636595

637596
def _maybe_release_messages(self) -> None:
638597
"""Release (some of) the held messages if the current load allows for it.
@@ -743,7 +702,7 @@ def send_unary_ack(
743702

744703
if self._exactly_once_delivery_enabled():
745704
requests_completed, requests_to_retry = _process_requests(
746-
error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram
705+
error_status, ack_reqs_dict, ack_errors_dict
747706
)
748707
else:
749708
requests_completed = []
@@ -837,7 +796,7 @@ def send_unary_modack(
837796

838797
if self._exactly_once_delivery_enabled():
839798
requests_completed, requests_to_retry = _process_requests(
840-
error_status, ack_reqs_dict, modack_errors_dict, self.ack_histogram
799+
error_status, ack_reqs_dict, modack_errors_dict
841800
)
842801
else:
843802
requests_completed = []
@@ -1280,11 +1239,6 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
12801239
receipt_modack=True,
12811240
)
12821241

1283-
if len(expired_ack_ids):
1284-
_EXPIRY_LOGGER.debug(
1285-
"ack ids %s were dropped as they have already expired."
1286-
)
1287-
12881242
with self._pause_resume_lock:
12891243
if self._scheduler is None or self._leaser is None:
12901244
_LOGGER.debug(
@@ -1350,13 +1304,9 @@ def _should_recover(self, exception: BaseException) -> bool:
13501304
# If this is in the list of idempotent exceptions, then we want to
13511305
# recover.
13521306
if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
1353-
_STREAMS_LOGGER.debug(
1354-
"Observed recoverable stream error %s, reopening stream", exception
1355-
)
1307+
_LOGGER.debug("Observed recoverable stream error %s", exception)
13561308
return True
1357-
_STREAMS_LOGGER.debug(
1358-
"Observed non-recoverable stream error %s, shutting down stream", exception
1359-
)
1309+
_LOGGER.debug("Observed non-recoverable stream error %s", exception)
13601310
return False
13611311

13621312
def _should_terminate(self, exception: BaseException) -> bool:
@@ -1376,13 +1326,9 @@ def _should_terminate(self, exception: BaseException) -> bool:
13761326
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
13771327
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
13781328
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
1379-
_STREAMS_LOGGER.debug(
1380-
"Observed terminating stream error %s, shutting down stream", exception
1381-
)
1329+
_LOGGER.debug("Observed terminating stream error %s", exception)
13821330
return True
1383-
_STREAMS_LOGGER.debug(
1384-
"Observed non-terminating stream error %s, attempting to reopen", exception
1385-
)
1331+
_LOGGER.debug("Observed non-terminating stream error %s", exception)
13861332
return False
13871333

13881334
def _on_rpc_done(self, future: Any) -> None:
@@ -1402,4 +1348,4 @@ def _on_rpc_done(self, future: Any) -> None:
14021348
name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
14031349
)
14041350
thread.daemon = True
1405-
thread.start()
1351+
thread.start()

0 commit comments

Comments
 (0)