
Commit 3aaa5f2

split_kafka_tests: works

1 parent cebeaea

16 files changed: +2048 -1259 lines

tests/integration/test_storage_kafka/common.py renamed to tests/integration/helpers/kafka/common.py

Lines changed: 7 additions & 28 deletions

```diff
@@ -1,31 +1,5 @@
-import io
-import logging
-import os.path as p
-import random
-import socket
-import string
-import time
-from contextlib import contextmanager
-
-import avro.datafile
-import avro.io
-from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
-from google.protobuf.internal.encoder import _VarintBytes
-from kafka import BrokerConnection, KafkaAdminClient, KafkaConsumer, KafkaProducer
-from kafka.admin import NewTopic
-from kafka.protocol.admin import DescribeGroupsRequest_v1
-from kafka.protocol.group import MemberAssignment
-
-from helpers.client import QueryRuntimeException
-from helpers.test_tools import TSV
-
-from . import kafka_pb2, social_pb2
-
-KAFKA_TOPIC_OLD = "old_t"
-KAFKA_CONSUMER_GROUP_OLD = "old_cg"
-KAFKA_TOPIC_NEW = "new_t"
-KAFKA_CONSUMER_GROUP_NEW = "new_cg"
-
+from helpers.kafka.common_direct import *
+from helpers.kafka.common_direct import _VarintBytes

 def get_kafka_producer(port, serializer, retries):
     errors = []
@@ -490,3 +464,8 @@ def describe_consumer_group(kafka_cluster, name):
         member_info["assignment"] = member_topics_assignment
         res.append(member_info)
     return res
+
+
+KAFKA_TOPIC_OLD = "old_t"
+KAFKA_CONSUMER_GROUP_OLD = "old_cg"
+KAFKA_TOPIC_NEW = "new_t"
+KAFKA_CONSUMER_GROUP_NEW = "new_cg"
```
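The rename turns common.py into a thin re-export shim. Note the second, explicit import in the diff: `from module import *` binds only names that do not start with an underscore (when the source module defines no `__all__`), so `_VarintBytes` would be silently dropped by the star import alone. A minimal, self-contained sketch of that rule, using a stand-in module built with `types` (names here are hypothetical):

```python
import types

mod = types.ModuleType("mod")            # stand-in for common_direct
mod.PUBLIC_CONST = 1                     # a normal, star-importable name
mod._VarintBytes = lambda n: bytes([n])  # underscore name: skipped by `*`

# What `from mod import *` would bind when mod defines no __all__:
star_bound = {k: v for k, v in vars(mod).items() if not k.startswith("_")}

assert "PUBLIC_CONST" in star_bound
assert "_VarintBytes" not in star_bound  # hence the explicit second import
print(sorted(star_bound))                # ['PUBLIC_CONST']
```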
tests/integration/helpers/kafka/common_direct.py

Lines changed: 41 additions & 0 deletions

```diff
@@ -0,0 +1,41 @@
+import json
+import io
+import logging
+import math
+import os.path as p
+import random
+import socket
+import string
+import threading
+
+import time
+from contextlib import contextmanager
+
+import pytest
+
+import avro.datafile
+import avro.io
+import avro.schema
+from confluent_kafka.avro.cached_schema_registry_client import (
+    CachedSchemaRegistryClient,
+)
+
+import kafka.errors
+from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
+from kafka import BrokerConnection, KafkaAdminClient, KafkaConsumer, KafkaProducer
+from kafka.admin import NewTopic
+from kafka.protocol.admin import DescribeGroupsRequest_v1
+from kafka.protocol.group import MemberAssignment
+
+from helpers.client import QueryRuntimeException
+from helpers.cluster import ClickHouseCluster, is_arm
+from helpers.network import PartitionManager
+from helpers.test_tools import TSV, assert_eq_with_retry
+
+from helpers.kafka import kafka_pb2, social_pb2, message_with_repeated_pb2
+
+from google.protobuf.internal.encoder import _VarintBytes
+
+
+if is_arm():
+    pytestmark = pytest.mark.skip
```
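The new helper module ends with a module-level skip: pytest treats a global literally named `pytestmark` as marks to apply to every test in the module, and because `pytestmark` has no leading underscore, the `from helpers.kafka.common_direct import *` line in each split test file copies the binding over, so ARM hosts skip the whole suite. A sketch of the mechanism in an ordinary test file (the local `is_arm` is a hypothetical stand-in for the `helpers.cluster` one):

```python
import platform

import pytest


def is_arm() -> bool:
    # Hypothetical stand-in: the real helpers.cluster.is_arm() inspects the
    # test-runner environment, not just the local machine string.
    return platform.machine().lower() in ("arm64", "aarch64")


# pytest collects a module global named `pytestmark` and applies it to
# every test defined (or star-imported) in this module.
if is_arm():
    pytestmark = pytest.mark.skip(reason="Kafka tests are not run on ARM")


def test_example():
    assert 1 + 1 == 2  # collected normally on x86, skipped wholesale on ARM
```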

tests/integration/test_storage_kafka/test_bacth_slow_3.py

Lines changed: 511 additions & 0 deletions
Large diffs are not rendered by default.

tests/integration/test_storage_kafka/test_batch_fast.py

Lines changed: 7 additions & 35 deletions

```diff
@@ -1,44 +1,16 @@
 """Quick tests, faster than 30 seconds"""

-import json
-import logging
-import math
-import random
-import threading
-import time
-
-import avro.datafile
-import avro.io
-import avro.schema
-import kafka.errors
-import pytest
-from confluent_kafka.avro.cached_schema_registry_client import (
-    CachedSchemaRegistryClient,
-)
-from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
-from google.protobuf.internal.encoder import _VarintBytes
-from kafka import BrokerConnection, KafkaAdminClient, KafkaConsumer, KafkaProducer
-from kafka.admin import NewTopic
-from kafka.protocol.admin import DescribeGroupsRequest_v1
-from kafka.protocol.group import MemberAssignment
-
-from helpers.client import QueryRuntimeException
-from helpers.cluster import ClickHouseCluster, is_arm
-from helpers.network import PartitionManager
-from helpers.test_tools import TSV, assert_eq_with_retry
+from helpers.kafka.common_direct import *
+from helpers.kafka.common_direct import _VarintBytes
+import helpers.kafka.common as k

-from . import common as k
-from . import message_with_repeated_pb2

 # protoc --version
 # libprotoc 3.0.0
 # # to create kafka_pb2.py
 # protoc --python_out=. kafka.proto


-if is_arm():
-    pytestmark = pytest.mark.skip
-
 # TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
 # TODO: add test for SELECT LIMIT is working.
```
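One consequence of the shim shows up in the hunks below: the test no longer imports `message_with_repeated_pb2` directly, but reaches it as `k.message_with_repeated_pb2`. That works because a star import copies module objects like any other binding, so the pb2 module imported inside common_direct becomes an attribute of common. A tiny simulation (hypothetical module names):

```python
import types

# Build stand-ins for the two helper modules.
common_direct = types.ModuleType("common_direct")
common_direct.message_with_repeated_pb2 = types.ModuleType(
    "message_with_repeated_pb2"
)

common = types.ModuleType("common")
# Simulate the `from helpers.kafka.common_direct import *` in common.py:
for name, value in vars(common_direct).items():
    if not name.startswith("_"):
        setattr(common, name, value)

k = common  # simulate `import helpers.kafka.common as k`
assert isinstance(k.message_with_repeated_pb2, types.ModuleType)
```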

```diff
@@ -2825,17 +2797,17 @@ def test_issue26643(kafka_cluster, create_query_generator):
     thread_per_consumer = k.must_use_thread_per_consumer(create_query_generator)

     with k.kafka_topic(k.get_admin_client(kafka_cluster), topic_name):
-        msg = message_with_repeated_pb2.Message(
+        msg = k.message_with_repeated_pb2.Message(
             tnow=1629000000,
             server="server1",
             clien="host1",
             sPort=443,
             cPort=50000,
             r=[
-                message_with_repeated_pb2.dd(
+                k.message_with_repeated_pb2.dd(
                     name="1", type=444, ttl=123123, data=b"adsfasd"
                 ),
-                message_with_repeated_pb2.dd(name="2"),
+                k.message_with_repeated_pb2.dd(name="2"),
             ],
             method="GET",
         )
@@ -2844,7 +2816,7 @@ def test_issue26643(kafka_cluster, create_query_generator):
         serialized_msg = msg.SerializeToString()
         data = data + _VarintBytes(len(serialized_msg)) + serialized_msg

-        msg = message_with_repeated_pb2.Message(tnow=1629000002)
+        msg = k.message_with_repeated_pb2.Message(tnow=1629000002)

         serialized_msg = msg.SerializeToString()
         data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
```
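The framing in the last hunk is protobuf's length-delimited stream: each serialized message is prefixed with its own byte length encoded as a varint, which is what `_VarintBytes` produces (an internal protobuf helper, which is also why it needs the explicit underscore import). A round-trip sketch using the matching internal decoder:

```python
from google.protobuf.internal.decoder import _DecodeVarint32
from google.protobuf.internal.encoder import _VarintBytes


def frame(messages: list[bytes]) -> bytes:
    """Concatenate messages, each prefixed with a varint byte length."""
    data = b""
    for m in messages:
        data += _VarintBytes(len(m)) + m
    return data


def unframe(data: bytes) -> list[bytes]:
    """Split a varint-length-delimited stream back into messages."""
    out, pos = [], 0
    while pos < len(data):
        size, pos = _DecodeVarint32(data, pos)  # returns (value, new_pos)
        out.append(data[pos : pos + size])
        pos += size
    return out


payloads = [b"first", b"second message"]
assert unframe(frame(payloads)) == payloads
```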
