@@ -93,9 +93,7 @@ def build_executor_consumer(
 ) -> StreamProcessor[KafkaPayload]:
     # Validate that a valid dataset/entity pair was passed in
     dataset = get_dataset(dataset_name)
-    dataset_entity_names = [
-        get_entity_name(e).value for e in dataset.get_all_entities()
-    ]
+    dataset_entity_names = [get_entity_name(e).value for e in dataset.get_all_entities()]
 
     # Only entities in the same dataset with the same scheduled and result topics
     # may be run together
@@ -197,9 +195,7 @@ def create_with_partitions(
             self.__total_partition_count,
             self.__total_concurrent_queries,
         )
-        self.__metrics.gauge(
-            "calculated_max_concurrent_queries", calculated_max_concurrent_queries
-        )
+        self.__metrics.gauge("calculated_max_concurrent_queries", calculated_max_concurrent_queries)
 
         strategy: ProcessingStrategy[KafkaPayload] = ExecuteQuery(
             self.__dataset,
@@ -242,15 +238,11 @@ def __init__(
         self.__encoder = SubscriptionScheduledTaskEncoder()
         self.__result_encoder = SubscriptionTaskResultEncoder()
 
-        self.__queue: Deque[
-            Tuple[Message[KafkaPayload], SubscriptionTaskResultFuture]
-        ] = deque()
+        self.__queue: Deque[Tuple[Message[KafkaPayload], SubscriptionTaskResultFuture]] = deque()
 
         self.__closed = False
 
-        self.__concurrent_gauge: Gauge = ThreadSafeGauge(
-            self.__metrics, "executor.concurrent"
-        )
+        self.__concurrent_gauge: Gauge = ThreadSafeGauge(self.__metrics, "executor.concurrent")
 
         self.__concurrent_clickhouse_gauge: Gauge = ThreadSafeGauge(
             self.__metrics, "executor.concurrent.clickhouse"
@@ -261,9 +253,7 @@ def __execute_query(
     ) -> Tuple[SubscriptionRequest, Result]:
         # Measure the amount of time that took between the task's scheduled
         # time and it beginning to execute.
-        self.__metrics.timing(
-            "executor.latency", (time.time() - task.timestamp.timestamp()) * 1000
-        )
+        self.__metrics.timing("executor.latency", (time.time() - task.timestamp.timestamp()) * 1000)
 
         timer = Timer("query")
@@ -297,16 +287,18 @@ def poll(self) -> None:
             try:
                 transformed_message = message.replace(
                     self.__result_encoder.encode(
-                        SubscriptionTaskResult(
-                            result_future.task, result_future.future.result()
-                        )
+                        SubscriptionTaskResult(result_future.task, result_future.future.result())
                     )
                 )
             except QueryException as exc:
                 cause = exc.__cause__
                 if isinstance(cause, ClickhouseError):
                     if cause.code in NON_RETRYABLE_CLICKHOUSE_ERROR_CODES:
                         logger.exception("Error running subscription query %r", exc)
+                        self.__metrics.increment(
+                            "subscription_executor_nonretryable_error",
+                            tags={"error_type": str(cause.code)},
+                        )
                     else:
                         raise SubscriptionQueryException(exc.message)
 
@@ -321,9 +313,7 @@ def submit(self, message: Message[KafkaPayload]) -> None:
         # we will start raising MessageRejected to slow down the consumer as
         # it means our executor cannot keep up
         queue_size_factor = state.get_config("executor_queue_size_factor", 10)
-        assert (
-            queue_size_factor is not None
-        ), "Invalid executor_queue_size_factor config"
+        assert queue_size_factor is not None, "Invalid executor_queue_size_factor config"
         max_queue_size = self.__max_concurrent_queries * queue_size_factor
 
         # Tell the consumer to pause until we have removed some futures from
@@ -343,8 +333,7 @@ def submit(self, message: Message[KafkaPayload]) -> None:
         # Don't execute stale subscriptions
         if (
             self.__stale_threshold_seconds is not None
-            and time.time() - datetime.timestamp(task.timestamp)
-            >= self.__stale_threshold_seconds
+            and time.time() - datetime.timestamp(task.timestamp) >= self.__stale_threshold_seconds
         ):
             should_execute = False
 
@@ -354,9 +343,7 @@ def submit(self, message: Message[KafkaPayload]) -> None:
                     message,
                     SubscriptionTaskResultFuture(
                         task,
-                        self.__executor.submit(
-                            self.__execute_query, task, tick_upper_offset
-                        ),
+                        self.__executor.submit(self.__execute_query, task, tick_upper_offset),
                     ),
                 )
             )
@@ -385,9 +372,7 @@ def join(self, timeout: Optional[float] = None) -> None:
             message, result_future = self.__queue.popleft()
 
             transformed_message = self.__result_encoder.encode(
-                SubscriptionTaskResult(
-                    result_future.task, result_future.future.result(remaining)
-                )
+                SubscriptionTaskResult(result_future.task, result_future.future.result(remaining))
             )
 
             self.__next_step.submit(message.replace(transformed_message))