@@ -53,8 +53,7 @@ public class KafkaProperties {
53
53
* Comma-delimited list of host:port pairs to use for establishing the initial
54
54
* connection to the Kafka cluster.
55
55
*/
56
- private List <String > bootstrapServers = new ArrayList <String >(
57
- Collections .singletonList ("localhost:9092" ));
56
+ private List <String > bootstrapServers = new ArrayList <String >(Collections .singletonList ("localhost:9092" ));
58
57
59
58
/**
60
59
* Id to pass to the server when making requests; used for server-side logging.
@@ -123,8 +122,7 @@ public Template getTemplate() {
123
122
private Map <String , Object > buildCommonProperties () {
124
123
Map <String , Object > properties = new HashMap <String , Object >();
125
124
if (this .bootstrapServers != null ) {
126
- properties .put (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG ,
127
- this .bootstrapServers );
125
+ properties .put (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , this .bootstrapServers );
128
126
}
129
127
if (this .clientId != null ) {
130
128
properties .put (CommonClientConfigs .CLIENT_ID_CONFIG , this .clientId );
@@ -133,20 +131,16 @@ private Map<String, Object> buildCommonProperties() {
133
131
properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG , this .ssl .getKeyPassword ());
134
132
}
135
133
if (this .ssl .getKeystoreLocation () != null ) {
136
- properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG ,
137
- resourceToPath (this .ssl .getKeystoreLocation ()));
134
+ properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getKeystoreLocation ()));
138
135
}
139
136
if (this .ssl .getKeystorePassword () != null ) {
140
- properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG ,
141
- this .ssl .getKeystorePassword ());
137
+ properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG , this .ssl .getKeystorePassword ());
142
138
}
143
139
if (this .ssl .getTruststoreLocation () != null ) {
144
- properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG ,
145
- resourceToPath (this .ssl .getTruststoreLocation ()));
140
+ properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getTruststoreLocation ()));
146
141
}
147
142
if (this .ssl .getTruststorePassword () != null ) {
148
- properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG ,
149
- this .ssl .getTruststorePassword ());
143
+ properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG , this .ssl .getTruststorePassword ());
150
144
}
151
145
if (!CollectionUtils .isEmpty (this .properties )) {
152
146
properties .putAll (this .properties );
@@ -159,8 +153,9 @@ private Map<String, Object> buildCommonProperties() {
159
153
* <p>
160
154
* This allows you to add additional properties, if necessary, and override the
161
155
* default kafkaConsumerFactory bean.
162
- * @return the consumer properties initialized with the customizations defined on this
163
- * instance
156
+ *
157
+ * @return the consumer properties initialized with the customizations defined
158
+ * on this instance
164
159
*/
165
160
public Map <String , Object > buildConsumerProperties () {
166
161
Map <String , Object > properties = buildCommonProperties ();
@@ -173,8 +168,9 @@ public Map<String, Object> buildConsumerProperties() {
173
168
* <p>
174
169
* This allows you to add additional properties, if necessary, and override the
175
170
* default kafkaProducerFactory bean.
176
- * @return the producer properties initialized with the customizations defined on this
177
- * instance
171
+ *
172
+ * @return the producer properties initialized with the customizations defined
173
+ * on this instance
178
174
*/
179
175
public Map <String , Object > buildProducerProperties () {
180
176
Map <String , Object > properties = buildCommonProperties ();
@@ -185,10 +181,8 @@ public Map<String, Object> buildProducerProperties() {
185
181
private static String resourceToPath (Resource resource ) {
186
182
try {
187
183
return resource .getFile ().getAbsolutePath ();
188
- }
189
- catch (IOException ex ) {
190
- throw new IllegalStateException (
191
- "Resource '" + resource + "' must be on a file system" , ex );
184
+ } catch (IOException ex ) {
185
+ throw new IllegalStateException ("Resource '" + resource + "' must be on a file system" , ex );
192
186
}
193
187
}
194
188
@@ -197,8 +191,8 @@ public static class Consumer {
197
191
private final Ssl ssl = new Ssl ();
198
192
199
193
/**
200
- * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka
201
- * if 'enable.auto.commit' true.
194
+ * Frequency in milliseconds that the consumer offsets are auto-committed to
195
+ * Kafka if 'enable.auto.commit' true.
202
196
*/
203
197
private Integer autoCommitInterval ;
204
198
@@ -220,7 +214,8 @@ public static class Consumer {
220
214
private String clientId ;
221
215
222
216
/**
223
- * If true the consumer's offset will be periodically committed in the background.
217
+ * If true the consumer's offset will be periodically committed in the
218
+ * background.
224
219
*/
225
220
private Boolean enableAutoCommit ;
226
221
@@ -364,27 +359,22 @@ public void setMaxPollRecords(Integer maxPollRecords) {
364
359
public Map <String , Object > buildProperties () {
365
360
Map <String , Object > properties = new HashMap <String , Object >();
366
361
if (this .autoCommitInterval != null ) {
367
- properties .put (ConsumerConfig .AUTO_COMMIT_INTERVAL_MS_CONFIG ,
368
- this .autoCommitInterval );
362
+ properties .put (ConsumerConfig .AUTO_COMMIT_INTERVAL_MS_CONFIG , this .autoCommitInterval );
369
363
}
370
364
if (this .autoOffsetReset != null ) {
371
- properties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ,
372
- this .autoOffsetReset );
365
+ properties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , this .autoOffsetReset );
373
366
}
374
367
if (this .bootstrapServers != null ) {
375
- properties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ,
376
- this .bootstrapServers );
368
+ properties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , this .bootstrapServers );
377
369
}
378
370
if (this .clientId != null ) {
379
371
properties .put (ConsumerConfig .CLIENT_ID_CONFIG , this .clientId );
380
372
}
381
373
if (this .enableAutoCommit != null ) {
382
- properties .put (ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG ,
383
- this .enableAutoCommit );
374
+ properties .put (ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG , this .enableAutoCommit );
384
375
}
385
376
if (this .fetchMaxWait != null ) {
386
- properties .put (ConsumerConfig .FETCH_MAX_WAIT_MS_CONFIG ,
387
- this .fetchMaxWait );
377
+ properties .put (ConsumerConfig .FETCH_MAX_WAIT_MS_CONFIG , this .fetchMaxWait );
388
378
}
389
379
if (this .fetchMinSize != null ) {
390
380
properties .put (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , this .fetchMinSize );
@@ -393,40 +383,32 @@ public Map<String, Object> buildProperties() {
393
383
properties .put (ConsumerConfig .GROUP_ID_CONFIG , this .groupId );
394
384
}
395
385
if (this .heartbeatInterval != null ) {
396
- properties .put (ConsumerConfig .HEARTBEAT_INTERVAL_MS_CONFIG ,
397
- this .heartbeatInterval );
386
+ properties .put (ConsumerConfig .HEARTBEAT_INTERVAL_MS_CONFIG , this .heartbeatInterval );
398
387
}
399
388
if (this .keyDeserializer != null ) {
400
- properties .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG ,
401
- this .keyDeserializer );
389
+ properties .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , this .keyDeserializer );
402
390
}
403
391
if (this .ssl .getKeyPassword () != null ) {
404
- properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG ,
405
- this .ssl .getKeyPassword ());
392
+ properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG , this .ssl .getKeyPassword ());
406
393
}
407
394
if (this .ssl .getKeystoreLocation () != null ) {
408
- properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG ,
409
- resourceToPath (this .ssl .getKeystoreLocation ()));
395
+ properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getKeystoreLocation ()));
410
396
}
411
397
if (this .ssl .getKeystorePassword () != null ) {
412
- properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG ,
413
- this .ssl .getKeystorePassword ());
398
+ properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG , this .ssl .getKeystorePassword ());
414
399
}
415
400
if (this .ssl .getTruststoreLocation () != null ) {
416
401
properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG ,
417
402
resourceToPath (this .ssl .getTruststoreLocation ()));
418
403
}
419
404
if (this .ssl .getTruststorePassword () != null ) {
420
- properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG ,
421
- this .ssl .getTruststorePassword ());
405
+ properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG , this .ssl .getTruststorePassword ());
422
406
}
423
407
if (this .valueDeserializer != null ) {
424
- properties .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ,
425
- this .valueDeserializer );
408
+ properties .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , this .valueDeserializer );
426
409
}
427
410
if (this .maxPollRecords != null ) {
428
- properties .put (ConsumerConfig .MAX_POLL_RECORDS_CONFIG ,
429
- this .maxPollRecords );
411
+ properties .put (ConsumerConfig .MAX_POLL_RECORDS_CONFIG , this .maxPollRecords );
430
412
}
431
413
return properties ;
432
414
}
@@ -444,8 +426,9 @@ public static class Producer {
444
426
private String acks ;
445
427
446
428
/**
447
- * Default batch size in bytes. A small batch size will make batching less common
448
- * and may reduce throughput (a batch size of zero disables batching entirely).
429
+ * Default batch size in bytes. A small batch size will make batching less
430
+ * common and may reduce throughput (a batch size of zero disables batching
431
+ * entirely).
449
432
*/
450
433
private Integer batchSize ;
451
434
@@ -456,8 +439,8 @@ public static class Producer {
456
439
private List <String > bootstrapServers ;
457
440
458
441
/**
459
- * Total bytes of memory the producer can use to buffer records waiting to be sent
460
- * to the server.
442
+ * Total bytes of memory the producer can use to buffer records waiting to be
443
+ * sent to the server.
461
444
*/
462
445
private Long bufferMemory ;
463
446
@@ -571,8 +554,7 @@ public Map<String, Object> buildProperties() {
571
554
properties .put (ProducerConfig .BATCH_SIZE_CONFIG , this .batchSize );
572
555
}
573
556
if (this .bootstrapServers != null ) {
574
- properties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ,
575
- this .bootstrapServers );
557
+ properties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , this .bootstrapServers );
576
558
}
577
559
if (this .bufferMemory != null ) {
578
560
properties .put (ProducerConfig .BUFFER_MEMORY_CONFIG , this .bufferMemory );
@@ -581,39 +563,32 @@ public Map<String, Object> buildProperties() {
581
563
properties .put (ProducerConfig .CLIENT_ID_CONFIG , this .clientId );
582
564
}
583
565
if (this .compressionType != null ) {
584
- properties .put (ProducerConfig .COMPRESSION_TYPE_CONFIG ,
585
- this .compressionType );
566
+ properties .put (ProducerConfig .COMPRESSION_TYPE_CONFIG , this .compressionType );
586
567
}
587
568
if (this .keySerializer != null ) {
588
- properties .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG ,
589
- this .keySerializer );
569
+ properties .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , this .keySerializer );
590
570
}
591
571
if (this .retries != null ) {
592
572
properties .put (ProducerConfig .RETRIES_CONFIG , this .retries );
593
573
}
594
574
if (this .ssl .getKeyPassword () != null ) {
595
- properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG ,
596
- this .ssl .getKeyPassword ());
575
+ properties .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG , this .ssl .getKeyPassword ());
597
576
}
598
577
if (this .ssl .getKeystoreLocation () != null ) {
599
- properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG ,
600
- resourceToPath (this .ssl .getKeystoreLocation ()));
578
+ properties .put (SslConfigs .SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath (this .ssl .getKeystoreLocation ()));
601
579
}
602
580
if (this .ssl .getKeystorePassword () != null ) {
603
- properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG ,
604
- this .ssl .getKeystorePassword ());
581
+ properties .put (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG , this .ssl .getKeystorePassword ());
605
582
}
606
583
if (this .ssl .getTruststoreLocation () != null ) {
607
584
properties .put (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG ,
608
585
resourceToPath (this .ssl .getTruststoreLocation ()));
609
586
}
610
587
if (this .ssl .getTruststorePassword () != null ) {
611
- properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG ,
612
- this .ssl .getTruststorePassword ());
588
+ properties .put (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG , this .ssl .getTruststorePassword ());
613
589
}
614
590
if (this .valueSerializer != null ) {
615
- properties .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG ,
616
- this .valueSerializer );
591
+ properties .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , this .valueSerializer );
617
592
}
618
593
return properties ;
619
594
}
0 commit comments