Skip to content

Commit 7cb0b8c

Browse files
authored
Add shutdown_strategy_before_consumer flag (#467)
* Add shutdown_strategy_before_consumer flag

  Full stacktrace we are encountering:

    File "sentry/utils/kafka.py", line 52, in run_processor_with_signals
      processor.run()
    File "arroyo/processing/processor.py", line 335, in run
      self._run_once()
    File "arroyo/processing/processor.py", line 418, in _run_once
      self.__processing_strategy.poll()
    File "arroyo/processing/strategies/healthcheck.py", line 32, in poll
      self.__next_step.poll()
    File "arroyo/processing/strategies/guard.py", line 101, in poll
      self.__inner_strategy.poll()
    File "arroyo/processing/strategies/run_task.py", line 55, in poll
      self.__next_step.poll()
    File "arroyo/processing/strategies/guard.py", line 37, in poll
      self.__next_step.poll()
    File "arroyo/processing/strategies/batching.py", line 82, in poll
      self.__reduce_step.poll()
    File "arroyo/processing/strategies/reduce.py", line 110, in poll
      self.__buffer_step.poll()
    File "arroyo/processing/strategies/buffer.py", line 165, in poll
      self.__flush(force=False)
    File "arroyo/processing/strategies/buffer.py", line 125, in __flush
      self.__next_step.submit(buffer_msg)
    File "arroyo/processing/strategies/guard.py", line 82, in submit
      self.__inner_strategy.submit(message)
    File "arroyo/processing/strategies/run_task.py", line 52, in submit
      self.__next_step.submit(Message(value))
    File "arroyo/processing/strategies/guard.py", line 34, in submit
      self.__next_step.submit(message)
    File "sentry/spans/consumers/process/flusher.py", line 316, in submit
      self.next_step.submit(message)
    File "arroyo/processing/strategies/commit.py", line 34, in submit
      self.__commit(message.committable)
    File "arroyo/processing/processor.py", line 321, in __commit
      self.__consumer.commit_offsets()
    File "arroyo/backends/kafka/consumer.py", line 624, in commit_offsets
      return self.__commit_retry_policy.call(self.__commit)
    File "arroyo/utils/retries.py", line 88, in call
      return callable()
    File "arroyo/backends/kafka/consumer.py", line 582, in __commit
      result = self.__consumer.commit(
    KafkaException: KafkaError{code=UNKNOWN_MEMBER_ID,val=25,str="Commit failed: Broker: Unknown member"}

* dedupe code
1 parent c5614d7 commit 7cb0b8c

File tree

1 file changed

+51
-43
lines changed

1 file changed

+51
-43
lines changed

arroyo/processing/processor.py

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ def __init__(
139139
commit_policy: CommitPolicy = ONCE_PER_SECOND,
140140
dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None,
141141
join_timeout: Optional[float] = None,
142+
shutdown_strategy_before_consumer: bool = False,
142143
) -> None:
143144
self.__consumer = consumer
144145
self.__processor_factory = processor_factory
@@ -158,6 +159,7 @@ def __init__(
158159
self.__commit_policy_state = commit_policy.get_state_machine()
159160
self.__join_timeout = join_timeout
160161
self.__shutdown_requested = False
162+
self.__shutdown_strategy_before_consumer = shutdown_strategy_before_consumer
161163

162164
# Buffers messages for DLQ. Messages are added when they are submitted for processing and
163165
# removed once the commit callback is fired as they are guaranteed to be valid at that point.
@@ -170,49 +172,7 @@ def __init__(
170172
)
171173

172174
def _close_strategy() -> None:
173-
start_close = time.time()
174-
175-
if self.__processing_strategy is None:
176-
# Partitions are revoked when the consumer is shutting down, at
177-
# which point we already have closed the consumer.
178-
return
179-
180-
logger.info("Closing %r...", self.__processing_strategy)
181-
self.__processing_strategy.close()
182-
183-
logger.info("Waiting for %r to exit...", self.__processing_strategy)
184-
185-
while True:
186-
start_join = time.time()
187-
188-
try:
189-
self.__processing_strategy.join(self.__join_timeout)
190-
self.__metrics_buffer.incr_timing(
191-
"arroyo.consumer.join.time", time.time() - start_join
192-
)
193-
break
194-
except InvalidMessage as e:
195-
self.__metrics_buffer.incr_timing(
196-
"arroyo.consumer.join.time", time.time() - start_join
197-
)
198-
self._handle_invalid_message(e)
199-
200-
logger.info(
201-
"%r exited successfully, releasing assignment.",
202-
self.__processing_strategy,
203-
)
204-
self.__processing_strategy = None
205-
self.__message = None # avoid leaking buffered messages across assignments
206-
self.__is_paused = False
207-
self._clear_backpressure()
208-
209-
value = time.time() - start_close
210-
211-
self.__metrics_buffer.metrics.timing(
212-
"arroyo.consumer.run.close_strategy", value
213-
)
214-
215-
self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value)
175+
self._close_processing_strategy()
216176

217177
def _create_strategy(partitions: Mapping[Partition, int]) -> None:
218178
start_create = time.time()
@@ -302,6 +262,47 @@ def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
302262
[topic], on_assign=on_partitions_assigned, on_revoke=on_partitions_revoked
303263
)
304264

265+
def _close_processing_strategy(self) -> None:
266+
"""Close the processing strategy and wait for it to exit."""
267+
start_close = time.time()
268+
269+
if self.__processing_strategy is None:
270+
# Partitions are revoked when the consumer is shutting down, at
271+
# which point we already have closed the consumer.
272+
return
273+
274+
logger.info("Closing %r...", self.__processing_strategy)
275+
self.__processing_strategy.close()
276+
277+
logger.info("Waiting for %r to exit...", self.__processing_strategy)
278+
279+
while True:
280+
start_join = time.time()
281+
282+
try:
283+
self.__processing_strategy.join(self.__join_timeout)
284+
self.__metrics_buffer.incr_timing(
285+
"arroyo.consumer.join.time", time.time() - start_join
286+
)
287+
break
288+
except InvalidMessage as e:
289+
self.__metrics_buffer.incr_timing(
290+
"arroyo.consumer.join.time", time.time() - start_join
291+
)
292+
self._handle_invalid_message(e)
293+
294+
logger.info("%r exited successfully", self.__processing_strategy)
295+
self.__processing_strategy = None
296+
self.__message = None
297+
self.__is_paused = False
298+
self._clear_backpressure()
299+
300+
value = time.time() - start_close
301+
self.__metrics_buffer.metrics.timing(
302+
"arroyo.consumer.run.close_strategy", value
303+
)
304+
self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value)
305+
305306
def __commit(self, offsets: Mapping[Partition, int], force: bool = False) -> None:
306307
"""
307308
If force is passed, commit immediately and do not throttle. This should
@@ -518,6 +519,13 @@ def signal_shutdown(self) -> None:
518519
self.__shutdown_requested = True
519520

520521
def _shutdown(self) -> None:
522+
# If shutdown_strategy_before_consumer is set, work around an issue
523+
# where rdkafka would revoke our partition, but then also immediately
524+
# revoke our member ID as well, causing join() of the CommitStrategy
525+
# (that is running in the partition revocation callback) to crash.
526+
if self.__shutdown_strategy_before_consumer:
527+
self._close_processing_strategy()
528+
521529
# close the consumer
522530
logger.info("Stopping consumer")
523531
self.__metrics_buffer.flush()

0 commit comments

Comments
 (0)