Skip to content

Commit aa3b308

Browse files
gihong-parkolegz
authored andcommitted
Add caching and logging control options for KTable materialization
- Add cachingDisabled and loggingDisabled properties to KafkaStreamsConsumerProperties - Update getMaterialized method to apply caching and logging settings - Fixes gh-3094 Signed-off-by: gihong-park <[email protected]> Resolves #3136
1 parent 2a8bf53 commit aa3b308

File tree

4 files changed

+142
-4
lines changed

4 files changed

+142
-4
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
/**
9393
* @author Soby Chacko
9494
* @author Ralf Wiedmann
95+
* @author Gihong Park
9596
* @since 3.0.0
9697
*/
9798
public abstract class AbstractKafkaStreamsBinderProcessor implements ApplicationContextAware {
@@ -542,13 +543,31 @@ private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String
542543

543544
final Consumed<K, V> consumed = getConsumed(kafkaStreamsConsumerProperties, k, v, autoOffsetReset);
544545
return streamsBuilder.table(this.bindingServiceProperties.getBindingDestination(destination),
545-
consumed, getMaterialized(storeName, k, v));
546+
consumed, getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled()));
546547
}
547548

548549
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(
549-
String storeName, Serde<K> k, Serde<V> v) {
550-
return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
550+
String storeName, Serde<K> k, Serde<V> v, Boolean isCachingDisabled, Boolean isLoggingDisabled) {
551+
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized =
552+
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
551553
.withKeySerde(k).withValueSerde(v);
554+
555+
if (isCachingDisabled != null) {
556+
if (isCachingDisabled) {
557+
materialized = materialized.withCachingDisabled();
558+
}
559+
else {
560+
materialized = materialized.withCachingEnabled();
561+
}
562+
}
563+
564+
if (isLoggingDisabled != null) {
565+
if (isLoggingDisabled) {
566+
materialized = materialized.withLoggingDisabled();
567+
}
568+
}
569+
570+
return materialized;
552571
}
553572

554573
private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(
@@ -558,7 +577,7 @@ private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(
558577
return streamsBuilder.globalTable(
559578
this.bindingServiceProperties.getBindingDestination(destination),
560579
consumed,
561-
getMaterialized(storeName, k, v));
580+
getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled()));
562581
}
563582

564583
private GlobalKTable<?, ?> getGlobalKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsConsumerProperties.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
*
2525
* @author Marius Bogoevici
2626
* @author Soby Chacko
27+
* @author Gihong Park
2728
*/
2829
public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
2930

@@ -44,6 +45,16 @@ public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
4445
*/
4546
private String materializedAs;
4647

48+
/**
49+
* Disable caching for materialized KTable.
50+
*/
51+
private boolean cachingDisabled;
52+
53+
/**
54+
* Disable logging for materialized KTable.
55+
*/
56+
private boolean loggingDisabled;
57+
4758
/**
4859
* Per input binding deserialization handler.
4960
*/
@@ -109,6 +120,22 @@ public void setMaterializedAs(String materializedAs) {
109120
this.materializedAs = materializedAs;
110121
}
111122

123+
public boolean isCachingDisabled() {
124+
return this.cachingDisabled;
125+
}
126+
127+
public void setCachingDisabled(boolean cachingDisabled) {
128+
this.cachingDisabled = cachingDisabled;
129+
}
130+
131+
public boolean isLoggingDisabled() {
132+
return this.loggingDisabled;
133+
}
134+
135+
public void setLoggingDisabled(boolean loggingDisabled) {
136+
this.loggingDisabled = loggingDisabled;
137+
}
138+
112139
public String getTimestampExtractorBeanName() {
113140
return timestampExtractorBeanName;
114141
}

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/ExtendedBindingHandlerMappingsProviderAutoConfigurationTests.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,85 @@ void defaultsRespectedWhenCustomBindingProperties() {
7777
});
7878
}
7979

80+
@Test
81+
void cachingAndLoggingDisabledPropertiesWork() {
82+
this.contextRunner
83+
.withPropertyValues(
84+
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.caching-disabled: true",
85+
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true")
86+
.run((context) -> {
87+
assertThat(context)
88+
.hasNotFailed()
89+
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
90+
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
91+
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
92+
.hasFieldOrPropertyWithValue("cachingDisabled", true)
93+
.hasFieldOrPropertyWithValue("loggingDisabled", true);
94+
});
95+
}
96+
97+
@Test
98+
void cachingAndLoggingDefaultValues() {
99+
this.contextRunner.run((context) -> {
100+
assertThat(context)
101+
.hasNotFailed()
102+
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
103+
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
104+
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
105+
.hasFieldOrPropertyWithValue("cachingDisabled", false)
106+
.hasFieldOrPropertyWithValue("loggingDisabled", false);
107+
});
108+
}
109+
110+
@Test
111+
void onlyCachingDisabledProperty() {
112+
this.contextRunner
113+
.withPropertyValues(
114+
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.caching-disabled: true")
115+
.run((context) -> {
116+
assertThat(context)
117+
.hasNotFailed()
118+
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
119+
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
120+
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
121+
.hasFieldOrPropertyWithValue("cachingDisabled", true)
122+
.hasFieldOrPropertyWithValue("loggingDisabled", false);
123+
});
124+
}
125+
126+
@Test
127+
void onlyLoggingDisabledProperty() {
128+
this.contextRunner
129+
.withPropertyValues(
130+
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true")
131+
.run((context) -> {
132+
assertThat(context)
133+
.hasNotFailed()
134+
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
135+
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
136+
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
137+
.hasFieldOrPropertyWithValue("cachingDisabled", false)
138+
.hasFieldOrPropertyWithValue("loggingDisabled", true);
139+
});
140+
}
141+
142+
@Test
143+
void defaultAndBindingSpecificCachingLoggingProperties() {
144+
this.contextRunner
145+
.withPropertyValues(
146+
"spring.cloud.stream.kafka.streams.default.consumer.caching-disabled: true",
147+
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true")
148+
.run((context) -> {
149+
assertThat(context)
150+
.hasNotFailed()
151+
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
152+
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
153+
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
154+
.hasFieldOrPropertyWithValue("cachingDisabled", true)
155+
.hasFieldOrPropertyWithValue("loggingDisabled", true);
156+
});
157+
}
158+
80159
@EnableAutoConfiguration
81160
static class KafkaStreamsTestApp {
82161
}

docs/modules/ROOT/pages/kafka/kafka-streams-binder/configuration-options.adoc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,19 @@ state store to materialize when using incoming KTable types
146146
+
147147
Default: `none`.
148148

149+
cachingDisabled::
150+
Disable caching for materialized KTable.
151+
When set to `true`, calls `withCachingDisabled()` on the Materialized object.
152+
When set to `false`, calls `withCachingEnabled()` on the Materialized object.
153+
+
154+
Default: `false`.
155+
156+
loggingDisabled::
157+
Disable logging for materialized KTable.
158+
When set to `true`, calls `withLoggingDisabled()` on the Materialized object.
159+
+
160+
Default: `false`.
161+
149162
useNativeDecoding::
150163
flag to enable/disable native decoding
151164
+

0 commit comments

Comments
 (0)