     clickhouse_path_dir="clickhouse_path",
 )
 
+
 # Fixtures
 @pytest.fixture(scope="module")
 def kafka_cluster():
@@ -33,6 +34,7 @@ def kafka_cluster():
     finally:
         cluster.shutdown()
 
+
 @pytest.fixture(autouse=True)
 def kafka_setup_teardown():
     instance.query("DROP DATABASE IF EXISTS test SYNC; CREATE DATABASE test;")
@@ -60,8 +62,10 @@ def get_topics_to_delete():
         time.sleep(0.5)
     yield  # run test
 
+
 # Tests
 
+
 def test_kafka_handling_commit_failure(kafka_cluster):
     messages = [json.dumps({"key": j + 1, "value": "x" * 300}) for j in range(22)]
     k.kafka_produce(kafka_cluster, "handling_commit_failure", messages)
@@ -100,9 +104,7 @@ def test_kafka_handling_commit_failure(kafka_cluster):
     # while materialized view is working to inject zookeeper failure
 
     with kafka_cluster.pause_container("kafka1"):
-        instance.wait_for_log_line(
-            "timeout", timeout=60, look_behind_lines=100
-        )
+        instance.wait_for_log_line("timeout", timeout=60, look_behind_lines=100)
 
     # kafka_cluster.open_bash_shell('instance')
     instance.wait_for_log_line("Committed offset 22")
@@ -363,5 +365,3 @@ def produce():
     kafka_thread.join()
 
     assert result == 1, "Messages from kafka get duplicated!"
-
-