88 import asyncio
99 import logging
1010 import time
11- import warnings
1211 from collections .abc import Awaitable
1312 from collections .abc import Callable
1413 from typing import Optional
1817 from . import metrics
1918 from .subscriber_client import SubscriberClient
2019 from .subscriber_message import SubscriberMessage
21- from .metrics_agent import MetricsAgent
2220
2321 log = logging .getLogger (__name__ )
2422
@@ -97,7 +95,6 @@ async def acker(
9795 ack_queue : 'asyncio.Queue[str]' ,
9896 subscriber_client : 'SubscriberClient' ,
9997 ack_window : float ,
100- metrics_client : MetricsAgent ,
10198 ) -> None :
10299 ack_ids : list [str ] = []
103100 while True :
@@ -160,7 +157,6 @@ async def maybe_ack(ack_id: str) -> None:
160157 exc_info = e ,
161158 extra = {'exc_message' : str (e )},
162159 )
163- metrics_client .increment ('pubsub.acker.batch.failed' )
164160 metrics .BATCH_STATUS .labels (
165161 component = 'acker' ,
166162 outcome = 'failed' ,
@@ -175,15 +171,13 @@ async def maybe_ack(ack_id: str) -> None:
175171 exc_info = e ,
176172 extra = {'exc_message' : str (e )},
177173 )
178- metrics_client .increment ('pubsub.acker.batch.failed' )
179174 metrics .BATCH_STATUS .labels (
180175 component = 'acker' ,
181176 outcome = 'failed' ,
182177 ).inc ()
183178
184179 continue
185180
186- metrics_client .histogram ('pubsub.acker.batch' , len (ack_ids ))
187181 metrics .BATCH_STATUS .labels (
188182 component = 'acker' ,
189183 outcome = 'succeeded' ,
@@ -199,7 +193,6 @@ async def nacker(
199193 nack_queue : 'asyncio.Queue[str]' ,
200194 subscriber_client : 'SubscriberClient' ,
201195 nack_window : float ,
202- metrics_client : MetricsAgent ,
203196 ) -> None :
204197 ack_ids : list [str ] = []
205198 while True :
@@ -264,7 +257,6 @@ async def maybe_nack(ack_id: str) -> None:
264257 exc_info = e ,
265258 extra = {'exc_message' : str (e )},
266259 )
267- metrics_client .increment ('pubsub.nacker.batch.failed' )
268260 metrics .BATCH_STATUS .labels (
269261 component = 'nacker' , outcome = 'failed' ,
270262 ).inc ()
@@ -278,14 +270,12 @@ async def maybe_nack(ack_id: str) -> None:
278270 exc_info = e ,
279271 extra = {'exc_message' : str (e )},
280272 )
281- metrics_client .increment ('pubsub.nacker.batch.failed' )
282273 metrics .BATCH_STATUS .labels (
283274 component = 'nacker' , outcome = 'failed' ,
284275 ).inc ()
285276
286277 continue
287278
288- metrics_client .histogram ('pubsub.nacker.batch' , len (ack_ids ))
289279 metrics .BATCH_STATUS .labels (
290280 component = 'nacker' ,
291281 outcome = 'succeeded' ,
@@ -302,7 +292,6 @@ async def _execute_callback(
302292 ack_queue : 'asyncio.Queue[str]' ,
303293 nack_queue : 'Optional[asyncio.Queue[str]]' ,
304294 insertion_time : float ,
305- metrics_client : MetricsAgent ,
306295 ) -> None :
307296 try :
308297 start = time .perf_counter ()
@@ -312,19 +301,13 @@ async def _execute_callback(
312301 with metrics .CONSUME_LATENCY .labels (phase = 'runtime' ).time ():
313302 await callback (message )
314303 await ack_queue .put (message .ack_id )
315- metrics_client .histogram (
316- 'pubsub.consumer.latency.runtime' ,
317- time .perf_counter () - start ,
318- )
319- metrics_client .increment ('pubsub.consumer.succeeded' )
320304 metrics .CONSUME .labels (outcome = 'succeeded' ).inc ()
321305
322306 except asyncio .CancelledError :
323307 if nack_queue :
324308 await nack_queue .put (message .ack_id )
325309
326310 log .warning ('application callback was cancelled' )
327- metrics_client .increment ('pubsub.consumer.cancelled' )
328311 metrics .CONSUME .labels (outcome = 'cancelled' ).inc ()
329312 except Exception as e :
330313 if nack_queue :
@@ -335,7 +318,6 @@ async def _execute_callback(
335318 exc_info = e ,
336319 extra = {'exc_message' : str (e )},
337320 )
338- metrics_client .increment ('pubsub.consumer.failed' )
339321 metrics .CONSUME .labels (outcome = 'failed' ).inc ()
340322
341323 async def consumer ( # pylint: disable=too-many-locals
@@ -345,7 +327,6 @@ async def consumer( # pylint: disable=too-many-locals
345327 ack_deadline_cache : AckDeadlineCache ,
346328 max_tasks : int ,
347329 nack_queue : 'Optional[asyncio.Queue[str]]' ,
348- metrics_client : MetricsAgent ,
349330 ) -> None :
350331 try :
351332 semaphore = asyncio .Semaphore (max_tasks )
@@ -358,7 +339,6 @@ async def _consume_one(
358339
359340 ack_deadline = await ack_deadline_cache .get ()
360341 if (time .perf_counter () - pulled_at ) >= ack_deadline :
361- metrics_client .increment ('pubsub.consumer.failfast' )
362342 metrics .CONSUME .labels (outcome = 'failfast' ).inc ()
363343 message_queue .task_done ()
364344 semaphore .release ()
@@ -367,9 +347,6 @@ async def _consume_one(
367347 # publish_time is in UTC Zulu
368348 # https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
369349 recv_latency = time .time () - message .publish_time .timestamp ()
370- metrics_client .histogram (
371- 'pubsub.consumer.latency.receive' , recv_latency ,
372- )
373350 metrics .CONSUME_LATENCY .labels (phase = 'receive' ).observe (
374351 recv_latency ,
375352 )
@@ -381,7 +358,6 @@ async def _consume_one(
381358 ack_queue ,
382359 nack_queue ,
383360 time .perf_counter (),
384- metrics_client ,
385361 ),
386362 )
387363 task .add_done_callback (lambda _f : semaphore .release ())
@@ -407,7 +383,6 @@ async def producer(
407383 message_queue : MessageQueue ,
408384 subscriber_client : 'SubscriberClient' ,
409385 max_messages : int ,
410- metrics_client : MetricsAgent ,
411386 ) -> None :
412387 try :
413388 while True :
@@ -429,9 +404,6 @@ async def producer(
429404 except (asyncio .TimeoutError , KeyError ):
430405 continue
431406
432- metrics_client .histogram (
433- 'pubsub.producer.batch' , len (new_messages ),
434- )
435407 metrics .MESSAGES_RECEIVED .inc (len (new_messages ))
436408 metrics .BATCH_SIZE .observe (len (new_messages ))
437409
@@ -473,7 +445,6 @@ async def subscribe(
473445 num_tasks_per_consumer : int = 1 ,
474446 enable_nack : bool = True ,
475447 nack_window : float = 0.3 ,
476- metrics_client : MetricsAgent | None = None ,
477448 ) -> None :
478449 # pylint: disable=too-many-locals
479450 ack_queue : 'asyncio.Queue[str]' = asyncio .Queue (
@@ -487,13 +458,6 @@ async def subscribe(
487458 ack_deadline ,
488459 )
489460
490- if metrics_client is not None :
491- warnings .warn (
492- 'Using MetricsAgent in subscribe() is deprecated. '
493- 'Refer to Prometheus metrics instead.' ,
494- DeprecationWarning ,
495- )
496- metrics_client = metrics_client or MetricsAgent ()
497461 acker_tasks = []
498462 consumer_tasks = []
499463 producer_tasks = []
@@ -502,7 +466,7 @@ async def subscribe(
502466 asyncio .ensure_future (
503467 acker (
504468 subscription , ack_queue , subscriber_client ,
505- ack_window = ack_window , metrics_client = metrics_client ,
469+ ack_window = ack_window ,
506470 ),
507471 ),
508472 )
@@ -515,7 +479,6 @@ async def subscribe(
515479 nacker (
516480 subscription , nack_queue , subscriber_client ,
517481 nack_window = nack_window ,
518- metrics_client = metrics_client ,
519482 ),
520483 ),
521484 )
@@ -532,7 +495,6 @@ async def subscribe(
532495 ack_deadline_cache ,
533496 num_tasks_per_consumer ,
534497 nack_queue ,
535- metrics_client = metrics_client ,
536498 ),
537499 ),
538500 )
@@ -543,7 +505,6 @@ async def subscribe(
543505 q ,
544506 subscriber_client ,
545507 max_messages = max_messages_per_producer ,
546- metrics_client = metrics_client ,
547508 ),
548509 ),
549510 )
0 commit comments