Skip to content

Commit c4752ca

Browse files
committed
let's see if it fails now
1 parent 3bdbd2e commit c4752ca

File tree

2 files changed

+8
-12
lines changed

2 files changed

+8
-12
lines changed

python/integration_tests/test_consumer_rebalancing.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
5656
consumer_path = str(TASKBROKER_BIN)
5757
num_consumers = 8
5858
num_messages = 100_000
59-
num_restarts = 16
59+
num_restarts = 1
6060
num_partitions = 32
6161
min_restart_duration = 1
6262
max_restart_duration = 30
@@ -206,10 +206,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
206206
consumers_have_error = False
207207
for i in range(num_consumers):
208208
with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f:
209-
consumers_have_error = (
210-
consumers_have_error
211-
or b"[31mERROR\x1b".decode("utf-8", "strict") in f.read()
212-
)
209+
consumers_have_error = consumers_have_error or "[31mERROR" in f.read()
213210

214211
if not all([row[3] == 0 for row in row_count]):
215212
print("Test failed! Got duplicate/missing kafka messages in sqlite")
@@ -239,9 +236,8 @@ def test_tasks_written_once_during_rebalancing() -> None:
239236
print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}")
240237
shutil.rmtree(TESTS_OUTPUT_PATH)
241238

242-
if (
243-
not all([row[3] == 0 for row in row_count])
244-
or not consumers_have_data
245-
or consumers_have_error
246-
):
247-
assert False
239+
assert (
240+
all([row[3] == 0 for row in row_count])
241+
and consumers_have_data
242+
and not consumers_have_error
243+
)

src/consumer/kafka.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ pub struct ActorHandles {
172172
impl ActorHandles {
173173
#[instrument(skip(self))]
174174
async fn shutdown(mut self, deadline: Duration) {
175-
debug!("Signaling shutdown to actors...");
175+
error!("Signaling shutdown to actors...");
176176
self.shutdown.cancel();
177177
info!("Actor shutdown signaled, waiting for rendezvous...");
178178

0 commit comments

Comments
 (0)