|
33 | 33 | import org.apache.kafka.common.serialization.StringSerializer;
|
34 | 34 |
|
35 | 35 | import org.springframework.boot.context.properties.ConfigurationProperties;
|
| 36 | +import org.springframework.boot.context.properties.PropertyMapper; |
36 | 37 | import org.springframework.boot.convert.DurationUnit;
|
37 | 38 | import org.springframework.core.io.Resource;
|
38 | 39 | import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
@@ -192,16 +193,6 @@ public Map<String, Object> buildAdminProperties() {
|
192 | 193 | return properties;
|
193 | 194 | }
|
194 | 195 |
|
195 |
| - private static String resourceToPath(Resource resource) { |
196 |
| - try { |
197 |
| - return resource.getFile().getAbsolutePath(); |
198 |
| - } |
199 |
| - catch (IOException ex) { |
200 |
| - throw new IllegalStateException( |
201 |
| - "Resource '" + resource + "' must be on a file system", ex); |
202 |
| - } |
203 |
| - } |
204 |
| - |
205 | 196 | public static class Consumer {
|
206 | 197 |
|
207 | 198 | private final Ssl ssl = new Ssl();
|
@@ -382,55 +373,32 @@ public Map<String, String> getProperties() {
|
382 | 373 | }
|
383 | 374 |
|
384 | 375 | public Map<String, Object> buildProperties() {
|
385 |
| - Map<String, Object> properties = new HashMap<>(); |
386 |
| - if (this.autoCommitInterval != null) { |
387 |
| - properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, |
388 |
| - (int) this.autoCommitInterval.toMillis()); |
389 |
| - } |
390 |
| - if (this.autoOffsetReset != null) { |
391 |
| - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, |
392 |
| - this.autoOffsetReset); |
393 |
| - } |
394 |
| - if (this.bootstrapServers != null) { |
395 |
| - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, |
396 |
| - this.bootstrapServers); |
397 |
| - } |
398 |
| - if (this.clientId != null) { |
399 |
| - properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId); |
400 |
| - } |
401 |
| - if (this.enableAutoCommit != null) { |
402 |
| - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, |
403 |
| - this.enableAutoCommit); |
404 |
| - } |
405 |
| - if (this.fetchMaxWait != null) { |
406 |
| - properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, |
407 |
| - (int) this.fetchMaxWait.toMillis()); |
408 |
| - } |
409 |
| - if (this.fetchMinSize != null) { |
410 |
| - properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize); |
411 |
| - } |
412 |
| - if (this.groupId != null) { |
413 |
| - properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); |
414 |
| - } |
415 |
| - if (this.heartbeatInterval != null) { |
416 |
| - properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, |
417 |
| - (int) this.heartbeatInterval.toMillis()); |
418 |
| - } |
419 |
| - if (this.keyDeserializer != null) { |
420 |
| - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, |
421 |
| - this.keyDeserializer); |
422 |
| - } |
423 |
| - if (this.valueDeserializer != null) { |
424 |
| - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, |
425 |
| - this.valueDeserializer); |
426 |
| - } |
427 |
| - if (this.maxPollRecords != null) { |
428 |
| - properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, |
429 |
| - this.maxPollRecords); |
430 |
| - } |
431 |
| - properties.putAll(this.ssl.buildProperties()); |
432 |
| - properties.putAll(this.properties); |
433 |
| - return properties; |
| 376 | + Properties properties = new Properties(); |
| 377 | + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); |
| 378 | + map.from(this::getAutoCommitInterval).asInt(Duration::toMillis) |
| 379 | + .to(properties.in(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); |
| 380 | + map.from(this::getAutoOffsetReset) |
| 381 | + .to(properties.in(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); |
| 382 | + map.from(this::getBootstrapServers) |
| 383 | + .to(properties.in(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); |
| 384 | + map.from(this::getClientId) |
| 385 | + .to(properties.in(ConsumerConfig.CLIENT_ID_CONFIG)); |
| 386 | + map.from(this::getEnableAutoCommit) |
| 387 | + .to(properties.in(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); |
| 388 | + map.from(this::getFetchMaxWait).asInt(Duration::toMillis) |
| 389 | + .to(properties.in(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)); |
| 390 | + map.from(this::getFetchMinSize) |
| 391 | + .to(properties.in(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)); |
| 392 | + map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG)); |
| 393 | + map.from(this::getHeartbeatInterval).asInt(Duration::toMillis) |
| 394 | + .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)); |
| 395 | + map.from(this::getKeyDeserializer) |
| 396 | + .to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); |
| 397 | + map.from(this::getValueDeserializer) |
| 398 | + .to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); |
| 399 | + map.from(this::getMaxPollRecords) |
| 400 | + .to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); |
| 401 | + return properties.with(this.ssl, this.properties); |
434 | 402 | }
|
435 | 403 |
|
436 | 404 | }
|
@@ -586,41 +554,25 @@ public Map<String, String> getProperties() {
|
586 | 554 | }
|
587 | 555 |
|
588 | 556 | public Map<String, Object> buildProperties() {
|
589 |
| - Map<String, Object> properties = new HashMap<>(); |
590 |
| - if (this.acks != null) { |
591 |
| - properties.put(ProducerConfig.ACKS_CONFIG, this.acks); |
592 |
| - } |
593 |
| - if (this.batchSize != null) { |
594 |
| - properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize); |
595 |
| - } |
596 |
| - if (this.bootstrapServers != null) { |
597 |
| - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, |
598 |
| - this.bootstrapServers); |
599 |
| - } |
600 |
| - if (this.bufferMemory != null) { |
601 |
| - properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory); |
602 |
| - } |
603 |
| - if (this.clientId != null) { |
604 |
| - properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); |
605 |
| - } |
606 |
| - if (this.compressionType != null) { |
607 |
| - properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, |
608 |
| - this.compressionType); |
609 |
| - } |
610 |
| - if (this.keySerializer != null) { |
611 |
| - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, |
612 |
| - this.keySerializer); |
613 |
| - } |
614 |
| - if (this.retries != null) { |
615 |
| - properties.put(ProducerConfig.RETRIES_CONFIG, this.retries); |
616 |
| - } |
617 |
| - if (this.valueSerializer != null) { |
618 |
| - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, |
619 |
| - this.valueSerializer); |
620 |
| - } |
621 |
| - properties.putAll(this.ssl.buildProperties()); |
622 |
| - properties.putAll(this.properties); |
623 |
| - return properties; |
| 557 | + Properties properties = new Properties(); |
| 558 | + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); |
| 559 | + map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG)); |
| 560 | + map.from(this::getBatchSize) |
| 561 | + .to(properties.in(ProducerConfig.BATCH_SIZE_CONFIG)); |
| 562 | + map.from(this::getBootstrapServers) |
| 563 | + .to(properties.in(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); |
| 564 | + map.from(this::getBufferMemory) |
| 565 | + .to(properties.in(ProducerConfig.BUFFER_MEMORY_CONFIG)); |
| 566 | + map.from(this::getClientId) |
| 567 | + .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); |
| 568 | + map.from(this::getCompressionType) |
| 569 | + .to(properties.in(ProducerConfig.COMPRESSION_TYPE_CONFIG)); |
| 570 | + map.from(this::getKeySerializer) |
| 571 | + .to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); |
| 572 | + map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG)); |
| 573 | + map.from(this::getValueSerializer) |
| 574 | + .to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); |
| 575 | + return properties.with(this.ssl, this.properties); |
624 | 576 | }
|
625 | 577 |
|
626 | 578 | }
|
@@ -669,13 +621,11 @@ public Map<String, String> getProperties() {
|
669 | 621 | }
|
670 | 622 |
|
671 | 623 | public Map<String, Object> buildProperties() {
|
672 |
| - Map<String, Object> properties = new HashMap<>(); |
673 |
| - if (this.clientId != null) { |
674 |
| - properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); |
675 |
| - } |
676 |
| - properties.putAll(this.ssl.buildProperties()); |
677 |
| - properties.putAll(this.properties); |
678 |
| - return properties; |
| 624 | + Properties properties = new Properties(); |
| 625 | + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); |
| 626 | + map.from(this::getClientId) |
| 627 | + .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); |
| 628 | + return properties.with(this.ssl, this.properties); |
679 | 629 | }
|
680 | 630 |
|
681 | 631 | }
|
@@ -969,40 +919,35 @@ public void setProtocol(String protocol) {
|
969 | 919 | }
|
970 | 920 |
|
971 | 921 | public Map<String, Object> buildProperties() {
|
972 |
| - Map<String, Object> properties = new HashMap<>(); |
973 |
| - if (this.getKeyPassword() != null) { |
974 |
| - properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.getKeyPassword()); |
975 |
| - } |
976 |
| - if (this.getKeystoreLocation() != null) { |
977 |
| - properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, |
978 |
| - resourceToPath(this.getKeystoreLocation())); |
979 |
| - } |
980 |
| - if (this.getKeystorePassword() != null) { |
981 |
| - properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, |
982 |
| - this.getKeystorePassword()); |
983 |
| - } |
984 |
| - if (this.getKeyStoreType() != null) { |
985 |
| - properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, |
986 |
| - this.getKeyStoreType()); |
987 |
| - } |
988 |
| - if (this.getTruststoreLocation() != null) { |
989 |
| - properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, |
990 |
| - resourceToPath(this.getTruststoreLocation())); |
991 |
| - } |
992 |
| - if (this.getTruststorePassword() != null) { |
993 |
| - properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, |
994 |
| - this.getTruststorePassword()); |
995 |
| - } |
996 |
| - if (this.getTrustStoreType() != null) { |
997 |
| - properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, |
998 |
| - this.getTrustStoreType()); |
999 |
| - } |
1000 |
| - if (this.getProtocol() != null) { |
1001 |
| - properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, this.getProtocol()); |
1002 |
| - } |
| 922 | + Properties properties = new Properties(); |
| 923 | + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); |
| 924 | + map.from(this::getKeyPassword) |
| 925 | + .to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); |
| 926 | + map.from(this::getKeystoreLocation).as(this::resourceToPath) |
| 927 | + .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); |
| 928 | + map.from(this::getKeystorePassword) |
| 929 | + .to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); |
| 930 | + map.from(this::getKeyStoreType) |
| 931 | + .to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); |
| 932 | + map.from(this::getTruststoreLocation).as(this::resourceToPath) |
| 933 | + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); |
| 934 | + map.from(this::getTruststorePassword) |
| 935 | + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); |
| 936 | + map.from(this::getTrustStoreType) |
| 937 | + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); |
| 938 | + map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG)); |
1003 | 939 | return properties;
|
1004 | 940 | }
|
1005 | 941 |
|
| 942 | + private String resourceToPath(Resource resource) { |
| 943 | + try { |
| 944 | + return resource.getFile().getAbsolutePath(); |
| 945 | + } |
| 946 | + catch (IOException ex) { |
| 947 | + throw new IllegalStateException( |
| 948 | + "Resource '" + resource + "' must be on a file system", ex); |
| 949 | + } |
| 950 | + } |
1006 | 951 | }
|
1007 | 952 |
|
1008 | 953 | public static class Jaas {
|
@@ -1064,4 +1009,18 @@ public void setOptions(Map<String, String> options) {
|
1064 | 1009 |
|
1065 | 1010 | }
|
1066 | 1011 |
|
| 1012 | + private static class Properties extends HashMap<String, Object> { |
| 1013 | + |
| 1014 | + public <V> java.util.function.Consumer<V> in(String key) { |
| 1015 | + return (value) -> put(key, value); |
| 1016 | + } |
| 1017 | + |
| 1018 | + public Properties with(Ssl ssl, Map<String, String> properties) { |
| 1019 | + putAll(ssl.buildProperties()); |
| 1020 | + putAll(properties); |
| 1021 | + return this; |
| 1022 | + } |
| 1023 | + |
| 1024 | + } |
| 1025 | + |
1067 | 1026 | }
|
0 commit comments