Skip to content

Commit cb5f882

Browse files
committed
kafkatest: add new command line arguments
1 parent 18e1e73 commit cb5f882

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

confluent_kafka/kafkatest/verifiable_consumer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,9 @@ def to_dict(self):
239239
parser = argparse.ArgumentParser(description='Verifiable Python Consumer')
240240
parser.add_argument('--topic', action='append', type=str, required=True)
241241
parser.add_argument('--group-id', dest='conf_group.id', required=True)
242+
parser.add_argument('--group-instance-id', dest='conf_group.instance.id')
242243
parser.add_argument('--broker-list', dest='conf_bootstrap.servers', required=True)
244+
parser.add_argument('--bootstrap-server', dest='conf_bootstrap.servers')
243245
parser.add_argument('--session-timeout', type=int, dest='conf_session.timeout.ms', default=6000)
244246
parser.add_argument('--enable-autocommit', action='store_true', dest='conf_enable.auto.commit', default=False)
245247
parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1)

confluent_kafka/kafkatest/verifiable_producer.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,12 @@ def dr_cb(self, err, msg):
6464
parser.add_argument('--topic', type=str, required=True)
6565
parser.add_argument('--throughput', type=int, default=0)
6666
parser.add_argument('--broker-list', dest='conf_bootstrap.servers', required=True)
67+
parser.add_argument('--bootstrap-server', dest='conf_bootstrap.servers')
6768
parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000) # avoid infinite
6869
parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None)
6970
parser.add_argument('--acks', type=int, dest='topicconf_request.required.acks', default=-1)
7071
parser.add_argument('--message-create-time', type=int, dest='create_time', default=0)
72+
parser.add_argument('--repeating-keys', type=int, dest='repeating_keys', default=0)
7173
parser.add_argument('--producer.config', dest='producer_config')
7274
parser.add_argument('-X', nargs=1, dest='extra_conf', action='append', help='Configuration property', default=[])
7375
args = vars(parser.parse_args())
@@ -92,6 +94,9 @@ def dr_cb(self, err, msg):
9294
else:
9395
value_fmt = '%d'
9496

97+
repeating_keys = args['repeating_keys']
98+
key_counter = 0
99+
95100
if throughput > 0:
96101
delay = 1.0/throughput
97102
else:
@@ -106,8 +111,14 @@ def dr_cb(self, err, msg):
106111

107112
t_end = time.time() + delay
108113
while vp.run:
114+
if repeating_keys != 0:
115+
key = '%d' % key_counter
116+
key_counter = (key_counter + 1) % repeating_keys
117+
else:
118+
key = None
119+
109120
try:
110-
vp.producer.produce(topic, value=(value_fmt % i),
121+
vp.producer.produce(topic, value=(value_fmt % i), key=key,
111122
timestamp=args.get('create_time', 0))
112123
vp.num_sent += 1
113124
except KafkaException as e:

0 commit comments

Comments
 (0)