@@ -53,7 +53,8 @@ public class KafkaProperties {
53
53
* Comma-delimited list of host:port pairs to use for establishing the initial
54
54
* connection to the Kafka cluster.
55
55
*/
56
- private List <String > bootstrapServers = new ArrayList <String >(Collections .singletonList ("localhost:9092" ));
56
+ private List <String > bootstrapServers = new ArrayList <String >(
57
+ Collections .singletonList ("localhost:9092" ));
57
58
58
59
/**
59
60
* Id to pass to the server when making requests; used for server-side logging.
@@ -122,7 +123,8 @@ public Template getTemplate() {
122
123
private Map <String , Object > buildCommonProperties () {
123
124
Map <String , Object > properties = new HashMap <String , Object >();
124
125
if (this .bootstrapServers != null ) {
125
- properties .put (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , this .bootstrapServers );
126
+ properties .put (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG ,
127
+ this .bootstrapServers );
126
128
}
127
129
if (this .clientId != null ) {
128
130
properties .put (CommonClientConfigs .CLIENT_ID_CONFIG , this .clientId );
@@ -131,16 +133,20 @@ private Map<String, Object> buildCommonProperties() {
131
133
properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG , this .ssl .getKeyPassword ());
132
134
}
133
135
if (this .ssl .getKeystoreLocation () != null ) {
134
- properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getKeystoreLocation ()));
136
+ properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG ,
137
+ resourceToPath (this .ssl .getKeystoreLocation ()));
135
138
}
136
139
if (this .ssl .getKeystorePassword () != null ) {
137
- properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG , this .ssl .getKeystorePassword ());
140
+ properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG ,
141
+ this .ssl .getKeystorePassword ());
138
142
}
139
143
if (this .ssl .getTruststoreLocation () != null ) {
140
- properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getTruststoreLocation ()));
144
+ properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG ,
145
+ resourceToPath (this .ssl .getTruststoreLocation ()));
141
146
}
142
147
if (this .ssl .getTruststorePassword () != null ) {
143
- properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG , this .ssl .getTruststorePassword ());
148
+ properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG ,
149
+ this .ssl .getTruststorePassword ());
144
150
}
145
151
if (!CollectionUtils .isEmpty (this .properties )) {
146
152
properties .putAll (this .properties );
@@ -153,8 +159,8 @@ private Map<String, Object> buildCommonProperties() {
153
159
* <p>
154
160
* This allows you to add additional properties, if necessary, and override the
155
161
* default kafkaConsumerFactory bean.
156
- * @return the consumer properties initialized with the customizations defined
157
- * on this instance
162
+ * @return the consumer properties initialized with the customizations defined on this
163
+ * instance
158
164
*/
159
165
public Map <String , Object > buildConsumerProperties () {
160
166
Map <String , Object > properties = buildCommonProperties ();
@@ -167,8 +173,8 @@ public Map<String, Object> buildConsumerProperties() {
167
173
* <p>
168
174
* This allows you to add additional properties, if necessary, and override the
169
175
* default kafkaProducerFactory bean.
170
- * @return the producer properties initialized with the customizations defined
171
- * on this instance
176
+ * @return the producer properties initialized with the customizations defined on this
177
+ * instance
172
178
*/
173
179
public Map <String , Object > buildProducerProperties () {
174
180
Map <String , Object > properties = buildCommonProperties ();
@@ -181,7 +187,8 @@ private static String resourceToPath(Resource resource) {
181
187
return resource .getFile ().getAbsolutePath ();
182
188
}
183
189
catch (IOException ex ) {
184
- throw new IllegalStateException ("Resource '" + resource + "' must be on a file system" , ex );
190
+ throw new IllegalStateException (
191
+ "Resource '" + resource + "' must be on a file system" , ex );
185
192
}
186
193
}
187
194
@@ -190,8 +197,8 @@ public static class Consumer {
190
197
private final Ssl ssl = new Ssl ();
191
198
192
199
/**
193
- * Frequency in milliseconds that the consumer offsets are auto-committed to
194
- * Kafka if 'enable.auto.commit' true.
200
+ * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka
201
+ * if 'enable.auto.commit' true.
195
202
*/
196
203
private Integer autoCommitInterval ;
197
204
@@ -213,8 +220,7 @@ public static class Consumer {
213
220
private String clientId ;
214
221
215
222
/**
216
- * If true the consumer's offset will be periodically committed in the
217
- * background.
223
+ * If true the consumer's offset will be periodically committed in the background.
218
224
*/
219
225
private Boolean enableAutoCommit ;
220
226
@@ -358,22 +364,27 @@ public void setMaxPollRecords(Integer maxPollRecords) {
358
364
public Map <String , Object > buildProperties () {
359
365
Map <String , Object > properties = new HashMap <String , Object >();
360
366
if (this .autoCommitInterval != null ) {
361
- properties .put (ConsumerConfig .AUTO_COMMIT_INTERVAL_MS_CONFIG , this .autoCommitInterval );
367
+ properties .put (ConsumerConfig .AUTO_COMMIT_INTERVAL_MS_CONFIG ,
368
+ this .autoCommitInterval );
362
369
}
363
370
if (this .autoOffsetReset != null ) {
364
- properties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , this .autoOffsetReset );
371
+ properties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ,
372
+ this .autoOffsetReset );
365
373
}
366
374
if (this .bootstrapServers != null ) {
367
- properties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , this .bootstrapServers );
375
+ properties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ,
376
+ this .bootstrapServers );
368
377
}
369
378
if (this .clientId != null ) {
370
379
properties .put (ConsumerConfig .CLIENT_ID_CONFIG , this .clientId );
371
380
}
372
381
if (this .enableAutoCommit != null ) {
373
- properties .put (ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG , this .enableAutoCommit );
382
+ properties .put (ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG ,
383
+ this .enableAutoCommit );
374
384
}
375
385
if (this .fetchMaxWait != null ) {
376
- properties .put (ConsumerConfig .FETCH_MAX_WAIT_MS_CONFIG , this .fetchMaxWait );
386
+ properties .put (ConsumerConfig .FETCH_MAX_WAIT_MS_CONFIG ,
387
+ this .fetchMaxWait );
377
388
}
378
389
if (this .fetchMinSize != null ) {
379
390
properties .put (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , this .fetchMinSize );
@@ -382,32 +393,40 @@ public Map<String, Object> buildProperties() {
382
393
properties .put (ConsumerConfig .GROUP_ID_CONFIG , this .groupId );
383
394
}
384
395
if (this .heartbeatInterval != null ) {
385
- properties .put (ConsumerConfig .HEARTBEAT_INTERVAL_MS_CONFIG , this .heartbeatInterval );
396
+ properties .put (ConsumerConfig .HEARTBEAT_INTERVAL_MS_CONFIG ,
397
+ this .heartbeatInterval );
386
398
}
387
399
if (this .keyDeserializer != null ) {
388
- properties .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , this .keyDeserializer );
400
+ properties .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG ,
401
+ this .keyDeserializer );
389
402
}
390
403
if (this .ssl .getKeyPassword () != null ) {
391
- properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG , this .ssl .getKeyPassword ());
404
+ properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG ,
405
+ this .ssl .getKeyPassword ());
392
406
}
393
407
if (this .ssl .getKeystoreLocation () != null ) {
394
- properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getKeystoreLocation ()));
408
+ properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG ,
409
+ resourceToPath (this .ssl .getKeystoreLocation ()));
395
410
}
396
411
if (this .ssl .getKeystorePassword () != null ) {
397
- properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG , this .ssl .getKeystorePassword ());
412
+ properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG ,
413
+ this .ssl .getKeystorePassword ());
398
414
}
399
415
if (this .ssl .getTruststoreLocation () != null ) {
400
416
properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG ,
401
417
resourceToPath (this .ssl .getTruststoreLocation ()));
402
418
}
403
419
if (this .ssl .getTruststorePassword () != null ) {
404
- properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG , this .ssl .getTruststorePassword ());
420
+ properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG ,
421
+ this .ssl .getTruststorePassword ());
405
422
}
406
423
if (this .valueDeserializer != null ) {
407
- properties .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , this .valueDeserializer );
424
+ properties .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ,
425
+ this .valueDeserializer );
408
426
}
409
427
if (this .maxPollRecords != null ) {
410
- properties .put (ConsumerConfig .MAX_POLL_RECORDS_CONFIG , this .maxPollRecords );
428
+ properties .put (ConsumerConfig .MAX_POLL_RECORDS_CONFIG ,
429
+ this .maxPollRecords );
411
430
}
412
431
return properties ;
413
432
}
@@ -425,9 +444,8 @@ public static class Producer {
425
444
private String acks ;
426
445
427
446
/**
428
- * Default batch size in bytes. A small batch size will make batching less
429
- * common and may reduce throughput (a batch size of zero disables batching
430
- * entirely).
447
+ * Default batch size in bytes. A small batch size will make batching less common
448
+ * and may reduce throughput (a batch size of zero disables batching entirely).
431
449
*/
432
450
private Integer batchSize ;
433
451
@@ -438,8 +456,8 @@ public static class Producer {
438
456
private List <String > bootstrapServers ;
439
457
440
458
/**
441
- * Total bytes of memory the producer can use to buffer records waiting to be
442
- * sent to the server.
459
+ * Total bytes of memory the producer can use to buffer records waiting to be sent
460
+ * to the server.
443
461
*/
444
462
private Long bufferMemory ;
445
463
@@ -553,7 +571,8 @@ public Map<String, Object> buildProperties() {
553
571
properties .put (ProducerConfig .BATCH_SIZE_CONFIG , this .batchSize );
554
572
}
555
573
if (this .bootstrapServers != null ) {
556
- properties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , this .bootstrapServers );
574
+ properties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ,
575
+ this .bootstrapServers );
557
576
}
558
577
if (this .bufferMemory != null ) {
559
578
properties .put (ProducerConfig .BUFFER_MEMORY_CONFIG , this .bufferMemory );
@@ -562,32 +581,39 @@ public Map<String, Object> buildProperties() {
562
581
properties .put (ProducerConfig .CLIENT_ID_CONFIG , this .clientId );
563
582
}
564
583
if (this .compressionType != null ) {
565
- properties .put (ProducerConfig .COMPRESSION_TYPE_CONFIG , this .compressionType );
584
+ properties .put (ProducerConfig .COMPRESSION_TYPE_CONFIG ,
585
+ this .compressionType );
566
586
}
567
587
if (this .keySerializer != null ) {
568
- properties .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , this .keySerializer );
588
+ properties .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG ,
589
+ this .keySerializer );
569
590
}
570
591
if (this .retries != null ) {
571
592
properties .put (ProducerConfig .RETRIES_CONFIG , this .retries );
572
593
}
573
594
if (this .ssl .getKeyPassword () != null ) {
574
- properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG , this .ssl .getKeyPassword ());
595
+ properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG ,
596
+ this .ssl .getKeyPassword ());
575
597
}
576
598
if (this .ssl .getKeystoreLocation () != null ) {
577
- properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getKeystoreLocation ()));
599
+ properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG ,
600
+ resourceToPath (this .ssl .getKeystoreLocation ()));
578
601
}
579
602
if (this .ssl .getKeystorePassword () != null ) {
580
- properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG , this .ssl .getKeystorePassword ());
603
+ properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG ,
604
+ this .ssl .getKeystorePassword ());
581
605
}
582
606
if (this .ssl .getTruststoreLocation () != null ) {
583
607
properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG ,
584
608
resourceToPath (this .ssl .getTruststoreLocation ()));
585
609
}
586
610
if (this .ssl .getTruststorePassword () != null ) {
587
- properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG , this .ssl .getTruststorePassword ());
611
+ properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG ,
612
+ this .ssl .getTruststorePassword ());
588
613
}
589
614
if (this .valueSerializer != null ) {
590
- properties .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , this .valueSerializer );
615
+ properties .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG ,
616
+ this .valueSerializer );
591
617
}
592
618
return properties ;
593
619
}
0 commit comments