@@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService):
33
33
34
34
"socket-buffer-size", "The size of the tcp RECV size."
35
35
36
- "consumer. config", "Consumer config properties file."
36
+ "command- config", "Config properties file."
37
37
"""
38
38
39
39
# Root directory for persistent output
@@ -73,16 +73,20 @@ def __init__(self, context, num_nodes, kafka, topic, messages, group="perf-share
73
73
for node in self .nodes :
74
74
node .version = version
75
75
76
- def args (self ):
76
+ def args (self , version ):
77
77
"""Dictionary of arguments used to start the Share Consumer Performance script."""
78
78
args = {
79
79
'topic' : self .topic ,
80
- 'messages' : self .messages ,
81
80
'bootstrap-server' : self .kafka .bootstrap_servers (self .security_config .security_protocol ),
82
81
'group' : self .group ,
83
82
'timeout' : self .timeout
84
83
}
85
84
85
+ if version .supports_command_config ():
86
+ args ['num-records' ] = self .messages
87
+ else :
88
+ args ['messages' ] = self .messages
89
+
86
90
if self .fetch_size is not None :
87
91
args ['fetch-size' ] = self .fetch_size
88
92
@@ -97,10 +101,13 @@ def start_cmd(self, node):
97
101
cmd += " export KAFKA_OPTS=%s;" % self .security_config .kafka_opts
98
102
cmd += " export KAFKA_LOG4J_OPTS=\" %s%s\" ;" % (get_log4j_config_param (node ), get_log4j_config_for_tools (node ))
99
103
cmd += " %s" % self .path .script ("kafka-share-consumer-perf-test.sh" , node )
100
- for key , value in self .args ().items ():
104
+ for key , value in self .args (node . version ).items ():
101
105
cmd += " --%s %s" % (key , value )
102
106
103
- cmd += " --consumer.config %s" % ShareConsumerPerformanceService .CONFIG_FILE
107
+ if node .version .supports_command_config ():
108
+ cmd += " --command-config %s" % ShareConsumerPerformanceService .CONFIG_FILE
109
+ else :
110
+ cmd += " --consumer.config %s" % ShareConsumerPerformanceService .CONFIG_FILE
104
111
105
112
for key , value in self .settings .items ():
106
113
cmd += " %s=%s" % (str (key ), str (value ))
0 commit comments