@@ -30,6 +30,7 @@ public class AggregateTopics
3030 private Logger logger = Logger .getLogger (this .getClass ().getPackageName ());
3131 private String kafka_servers = "localhost:9092" ;
3232 private String config = "Accelerator" ;
33+ private String kafka_properties = "" ;
3334 private final String longTerm ;
3435 private boolean createTopic = false ;
3536 private final List <String > topics ;
@@ -43,7 +44,7 @@ private AggregateTopics(String[] args)
4344 if (createTopic )
4445 {
4546 logger .info ("Discovering and creating topics in " + topics .toString ());
46- CreateTopics .discoverAndCreateTopics (kafka_servers , false , List .of (longTerm ));
47+ CreateTopics .discoverAndCreateTopics (kafka_servers , false , List .of (longTerm ), kafka_properties );
4748 }
4849
4950 logger .info ("server:\" " + kafka_servers + "\" , config: \" " + config + "\" " );
@@ -84,6 +85,7 @@ private void help()
8485 System .out .println ("\t -server server_name: Allows specification of server address.\n \t \t Default is \" localhost:9092\" ." );
8586 System .out .println ("\t -confg config_name: Allows specification of config name.\n \t \t Default is \" Accelerator\" ." );
8687 System .out .println ("\t -create : Discovers if the config + \" LongTerm\" topic already exists. If it does not, it creates it." );
88+ System .out .println ("\t -kafka_properties kafka_properties_file : File to load kafka-client properties from." );
8789 System .exit (0 );
8890 }
8991
@@ -129,6 +131,18 @@ else if (arg.equals("-config"))
129131 throw new Exception ("'-config' must be followed by a config name." );
130132 }
131133 }
134+ else if (arg .equals ("-kafka_properties" ))
135+ {
136+ String next ;
137+ if (token .hasNext () && ! (next = token .next ()).startsWith ("-" ))
138+ {
139+ kafka_properties = next ;
140+ }
141+ else
142+ {
143+ throw new Exception ("'-kafka_properties' must be followed by a file name." );
144+ }
145+ }
132146 else
133147 {
134148 throw new Exception ("Unknown argument '" + arg + "'." );
@@ -154,7 +168,8 @@ private KafkaStreams createStream()
154168 {
155169 KafkaStreams stream = KafkaHelper .aggregateTopics (kafka_servers ,
156170 topics ,
157- longTerm );
171+ longTerm ,
172+ kafka_properties );
158173
159174 // Log any uncaught exceptions.
160175 stream .setUncaughtExceptionHandler ((Thread thread , Throwable throwable ) -> {
0 commit comments