diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java
index ba75b714b..70648cc43 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java
@@ -30,8 +30,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import lombok.EqualsAndHashCode;
 import lombok.NonNull;
+import lombok.ToString;
 import lombok.Value;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.HostInfo;
 
@@ -39,36 +45,69 @@
  * Class for simplified access to configs provided by {@link StreamsConfig}
  */
 @Value
-public class StreamsConfigX {
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class StreamsConfigX extends AbstractConfig {
+    public static final String LINEAGE_ENABLED_CONFIG = "streams.bootstrap.lineage.enabled";
+    private static final String LINEAGE_ENABLED_DOC = "Whether to add lineage headers (topic, partition, offset) to streams and tables";
+    private static final ConfigDef CONFIG_DEF = StreamsConfig.configDef()
+            .define(LINEAGE_ENABLED_CONFIG, Type.BOOLEAN, false, Importance.LOW, LINEAGE_ENABLED_DOC);
     @NonNull
     StreamsConfig streamsConfig;
 
+    /**
+     * Create a new {@code StreamsConfigX} from a {@link StreamsConfig}
+     *
+     * @param streamsConfig streams config
+     */
+    public StreamsConfigX(final StreamsConfig streamsConfig) {
+        super(CONFIG_DEF, streamsConfig.originals());
+        this.streamsConfig = streamsConfig;
+    }
+
+    private static HostInfo createHostInfo(final String applicationServerConfig) {
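+        // APPLICATION_SERVER_CONFIG is defined by Kafka Streams as a host:port pair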
+        final String[] hostAndPort = applicationServerConfig.split(":");
+        return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+    }
+
     /**
      * Get the application id of the underlying {@link StreamsConfig}
+     *
      * @return application id
      * @see StreamsConfig#APPLICATION_ID_CONFIG
      */
     public String getAppId() {
-        return this.streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+        return this.getString(StreamsConfig.APPLICATION_ID_CONFIG);
     }
 
     /**
      * Get the bootstrap servers of the underlying {@link StreamsConfig}
+     *
      * @return list of bootstrap servers
      * @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG
      */
     public List<String> getBoostrapServers() {
-        return this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        return this.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+    }
+
+    /**
+     * Check if adding lineage headers is enabled. This is controlled by {@link #LINEAGE_ENABLED_CONFIG}
+     *
+     * @return true if lineage headers are added to streams and tables
+     */
+    public boolean isLineageEnabled() {
+        return this.getBoolean(LINEAGE_ENABLED_CONFIG);
     }
 
     /**
      * Get all configs of the underlying {@link StreamsConfig}
+     *
      * @return Kafka configs
      * @see StreamsConfig#originals()
      */
     public Map<String, Object> getKafkaProperties() {
-        return Collections.unmodifiableMap(this.streamsConfig.originals());
+        return Collections.unmodifiableMap(this.originals());
     }
 
     /**
@@ -78,13 +117,8 @@ public Map<String, Object> getKafkaProperties() {
      * {@link StreamsConfig#APPLICATION_SERVER_CONFIG} is set; otherwise, an empty {@link Optional}.
      */
     public Optional<HostInfo> getApplicationServer() {
-        final String applicationServerConfig = this.streamsConfig.getString(APPLICATION_SERVER_CONFIG);
+        final String applicationServerConfig = this.getString(APPLICATION_SERVER_CONFIG);
         return applicationServerConfig.isEmpty() ? Optional.empty()
                 : Optional.of(createHostInfo(applicationServerConfig));
     }
-
-    private static HostInfo createHostInfo(final String applicationServerConfig) {
-        final String[] hostAndPort = applicationServerConfig.split(":");
-        return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
-    }
 }
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java
new file mode 100644
index 000000000..79270f9a7
--- /dev/null
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java
@@ -0,0 +1,82 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2025 bakdata
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.bakdata.kafka.streams.kstream;
+
+import java.nio.charset.StandardCharsets;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+import org.apache.kafka.common.header.Headers;
+
+/**
+ * Configure headers for data lineage of Kafka messages
+ */
+@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
+public class LineageHeaders {
+    private static final String LINEAGE_PREFIX = "streams.bootstrap.lineage.";
+    /**
+     * Header indicating the topic the record was read from.
+     */
+    public static final String TOPIC_HEADER = LINEAGE_PREFIX + "topic";
+    /**
+     * Header indicating the partition the record was read from.
+     */
+    public static final String PARTITION_HEADER = LINEAGE_PREFIX + "partition";
+    /**
+     * Header indicating the offset the record was read from.
+     */
+    public static final String OFFSET_HEADER = LINEAGE_PREFIX + "offset";
+
+    @Getter(AccessLevel.PACKAGE)
+    @Accessors(fluent = true)
+    private final @NonNull Headers headers;
+
+    LineageHeaders addTopicHeader(final String topic) {
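+        // The topic can be unknown, e.g., when no record metadata is available; skip the header then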
+        if (topic == null) {
+            return this;
+        }
+        return new LineageHeaders(this.headers.add(TOPIC_HEADER, topic.getBytes(StandardCharsets.UTF_8)));
+    }
+
+    LineageHeaders addPartitionHeader(final int partition) {
+        if (partition < 0) {
+            return this;
+        }
+        //TODO serialize more compactly as an int? But then UI tools usually can't handle it
+        return new LineageHeaders(
+                this.headers.add(PARTITION_HEADER, Integer.toString(partition).getBytes(StandardCharsets.UTF_8)));
+    }
+
+    LineageHeaders addOffsetHeader(final long offset) {
+        if (offset < 0) {
+            return this;
+        }
+        //TODO serialize more compactly as a long? But then UI tools usually can't handle it
+        return new LineageHeaders(
+                this.headers.add(OFFSET_HEADER, Long.toString(offset).getBytes(StandardCharsets.UTF_8)));
+    }
+}
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java
new file mode 100644
index 000000000..6e5f1950b
--- /dev/null
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2025 bakdata
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.bakdata.kafka.streams.kstream;
+
+import java.util.Optional;
+import lombok.NoArgsConstructor;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+
+@NoArgsConstructor
+class LineageProcessor<K, V> implements FixedKeyProcessor<K, V, V> {
+    private FixedKeyProcessorContext<K, V> context;
+
+    private static Headers addHeaders(final Headers headers, final RecordMetadata metadata) {
+        return new LineageHeaders(new RecordHeaders(headers))
+                .addTopicHeader(metadata.topic())
+                .addPartitionHeader(metadata.partition())
+                .addOffsetHeader(metadata.offset())
+                .headers();
+    }
+
+    @Override
+    public void init(final FixedKeyProcessorContext<K, V> context) {
+        this.context = context;
+    }
+
+    @Override
+    public void process(final FixedKeyRecord<K, V> rekord) {
+        final Optional<RecordMetadata> metadata = this.context.recordMetadata();
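+        // Metadata can be absent, e.g., for records that do not originate from an input topic; forward those unchanged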
+        final Headers headers = rekord.headers();
+        final Headers newHeaders = metadata.map(m -> addHeaders(headers, m))
+                .orElse(headers);
+        this.context.forward(rekord.withHeaders(newHeaders));
+    }
+}
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java
new file mode 100644
index 000000000..184218b53
--- /dev/null
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2025 bakdata
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.bakdata.kafka.streams.kstream;
+
+import lombok.NoArgsConstructor;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+@NoArgsConstructor
+class LineageTransformer<K, V> implements ValueTransformerWithKey<K, V, V> {
+
+    private ProcessorContext context;
+
+    @Override
+    public void init(final ProcessorContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public V transform(final K readOnlyKey, final V value) {
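+        // The context's headers are mutable: the lineage headers are added to the current record in place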
+        new LineageHeaders(this.context.headers())
+                .addTopicHeader(this.context.topic())
+                .addPartitionHeader(this.context.partition())
+                .addOffsetHeader(this.context.offset());
+        return value;
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+}
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java
index aa36942fc..1c4a9269a 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java
@@ -26,6 +26,7 @@
 
 import com.bakdata.kafka.AppConfiguration;
 import com.bakdata.kafka.Configurator;
+import com.bakdata.kafka.streams.StreamsConfigX;
 import com.bakdata.kafka.streams.StreamsTopicConfig;
 import java.util.Collection;
 import java.util.Map;
@@ -35,6 +36,7 @@
 import lombok.Value;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -74,14 +76,16 @@ public StreamsBuilderX(final StreamsTopicConfig topics, final Map<String, Object> kafkaProperties) {
      * @see StreamsBuilder#stream(String)
      */
     public <K, V> KStreamX<K, V> stream(final String topic) {
-        return this.getContext().wrap(this.streamsBuilder.stream(topic));
+        final KStreamX<K, V> stream = this.getContext().wrap(this.streamsBuilder.stream(topic));
+        return initialize(stream);
     }
 
     /**
      * @see StreamsBuilder#stream(String, Consumed)
      */
     public <K, V> KStreamX<K, V> stream(final String topic, final Consumed<K, V> consumed) {
-        return this.getContext().wrap(this.streamsBuilder.stream(topic, consumed));
+        final KStreamX<K, V> stream = this.getContext().wrap(this.streamsBuilder.stream(topic, consumed));
+        return initialize(stream);
     }
 
     /**
@@ -95,14 +99,16 @@ public <K, V> KStreamX<K, V> stream(final String topic, final ConsumedX<K, V> co
      * @see StreamsBuilder#stream(Collection)
      */
     public <K, V> KStreamX<K, V> stream(final Collection<String> topics) {
-        return this.getContext().wrap(this.streamsBuilder.stream(topics));
+        final KStreamX<K, V> stream = this.getContext().wrap(this.streamsBuilder.stream(topics));
+        return initialize(stream);
     }
 
     /**
      * @see StreamsBuilder#stream(Collection, Consumed)
      */
     public <K, V> KStreamX<K, V> stream(final Collection<String> topics, final Consumed<K, V> consumed) {
-        return this.getContext().wrap(this.streamsBuilder.stream(topics, consumed));
+        final KStreamX<K, V> stream = this.getContext().wrap(this.streamsBuilder.stream(topics, consumed));
+        return initialize(stream);
     }
 
     /**
@@ -117,14 +123,16 @@ public <K, V> KStreamX<K, V> stream(final Collection<String> topics,
      * @see StreamsBuilder#stream(Pattern)
      */
     public <K, V> KStreamX<K, V> stream(final Pattern topicPattern) {
-        return this.getContext().wrap(this.streamsBuilder.stream(topicPattern));
+        final KStreamX<K, V> stream = this.getContext().wrap(this.streamsBuilder.stream(topicPattern));
+        return initialize(stream);
     }
 
     /**
      * @see StreamsBuilder#stream(Pattern, Consumed)
     */
     public <K, V> KStreamX<K, V> stream(final Pattern topicPattern, final Consumed<K, V> consumed) {
-        return this.getContext().wrap(this.streamsBuilder.stream(topicPattern, consumed));
+        final KStreamX<K, V> stream = this.getContext().wrap(this.streamsBuilder.stream(topicPattern, consumed));
+        return initialize(stream);
     }
 
     /**
@@ -136,10 +144,11 @@ public <K, V> KStreamX<K, V> stream(final Pattern topicPattern, final ConsumedX<
 
     /**
      * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics()}
+     *
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()}
      * @see StreamsBuilder#stream(Collection, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInput(final Consumed<K, V> consumed) {
@@ -148,10 +157,11 @@ public <K, V> KStreamX<K, V> streamInput(final Consumed<K, V> consumed) {
 
     /**
      * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics()}
+     *
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()}
      * @see StreamsBuilder#stream(Collection, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInput(final ConsumedX<K, V> consumed) {
@@ -160,9 +170,10 @@ public <K, V> KStreamX<K, V> streamInput(final ConsumedX<K, V> consumed) {
 
     /**
      * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics()}
-     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()}
+     *
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()}
      * @see StreamsBuilder#stream(Collection)
      */
     public <K, V> KStreamX<K, V> streamInput() {
@@ -171,11 +182,12 @@ public <K, V> KStreamX<K, V> streamInput() {
 
     /**
      * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics(String)}
+     *
      * @param label label of input topics
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)}
      * @see StreamsBuilder#stream(Collection, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInput(final String label, final Consumed<K, V> consumed) {
@@ -184,11 +196,12 @@ public <K, V> KStreamX<K, V> streamInput(final String label, final Consumed<K,
 
     /**
      * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics(String)}
+     *
      * @param label label of input topics
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)}
      * @see StreamsBuilder#stream(Collection, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInput(final String label, final ConsumedX<K, V> consumed) {
@@ -197,10 +210,11 @@ public <K, V> KStreamX<K, V> streamInput(final String label, final ConsumedX<K,
 
     /**
      * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics(String)}
+     *
      * @param label label of input topics
-     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)}
      * @see StreamsBuilder#stream(Collection)
      */
     public <K, V> KStreamX<K, V> streamInput(final String label) {
@@ -209,10 +223,11 @@ public <K, V> KStreamX<K, V> streamInput(final String label) {
 
     /**
      * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern()}
+     *
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()}
      * @see StreamsBuilder#stream(Pattern, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInputPattern(final Consumed<K, V> consumed) {
@@ -221,10 +236,11 @@ public <K, V> KStreamX<K, V> streamInputPattern(final Consumed<K, V> consumed) {
 
     /**
      * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern()}
+     *
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()}
      * @see StreamsBuilder#stream(Pattern, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInputPattern(final ConsumedX<K, V> consumed) {
@@ -233,9 +249,10 @@ public <K, V> KStreamX<K, V> streamInputPattern(final ConsumedX<K, V> consumed)
 
     /**
      * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern()}
-     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()}
+     *
      * @param <K> type of keys
     * @param <V> type of values
+     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()}
      * @see StreamsBuilder#stream(Pattern)
      */
     public <K, V> KStreamX<K, V> streamInputPattern() {
@@ -244,11 +261,12 @@ public <K, V> KStreamX<K, V> streamInputPattern() {
 
     /**
      * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
+     *
      * @param label label of input pattern
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
      * @see StreamsBuilder#stream(Pattern, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInputPattern(final String label, final Consumed<K, V> consumed) {
@@ -257,11 +275,12 @@ public <K, V> KStreamX<K, V> streamInputPattern(final String label, final Consum
 
     /**
      * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
+     *
      * @param label label of input pattern
      * @param consumed define optional parameters for streaming topics
-     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
      * @see StreamsBuilder#stream(Pattern, Consumed)
      */
     public <K, V> KStreamX<K, V> streamInputPattern(final String label,
@@ -271,10 +290,11 @@ public <K, V> KStreamX<K, V> streamInputPattern(final String label,
 
     /**
      * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
+     *
      * @param label label of input pattern
-     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
      * @param <K> type of keys
      * @param <V> type of values
+     * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)}
      * @see StreamsBuilder#stream(Pattern)
      */
     public <K, V> KStreamX<K, V> streamInputPattern(final String label) {
@@ -285,14 +305,16 @@ public <K, V> KStreamX<K, V> streamInputPattern(final String label) {
      * @see StreamsBuilder#table(String)
      */
     public <K, V> KTableX<K, V> table(final String topic) {
-        return this.getContext().wrap(this.streamsBuilder.table(topic));
+        final KTableX<K, V> table = this.getContext().wrap(this.streamsBuilder.table(topic));
+        return initialize(table);
     }
 
     /**
      * @see StreamsBuilder#table(String, Consumed)
     */
     public <K, V> KTableX<K, V> table(final String topic, final Consumed<K, V> consumed) {
-        return this.getContext().wrap(this.streamsBuilder.table(topic, consumed));
+        final KTableX<K, V> table = this.getContext().wrap(this.streamsBuilder.table(topic, consumed));
+        return initialize(table);
     }
 
     /**
@@ -307,7 +329,8 @@ public <K, V> KTableX<K, V> table(final String topic, final ConsumedX<K, V> cons
      */
     public <K, V> KTableX<K, V> table(final String topic,
             final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
-        return this.getContext().wrap(this.streamsBuilder.table(topic, materialized));
+        final KTableX<K, V> table = this.getContext().wrap(this.streamsBuilder.table(topic, materialized));
+        return initialize(table);
     }
 
     /**
@@ -323,7 +346,8 @@ public <K, V> KTableX<K, V> table(final String topic,
      */
     public <K, V> KTableX<K, V> table(final String topic, final Consumed<K, V> consumed,
             final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
-        return this.getContext().wrap(this.streamsBuilder.table(topic, consumed, materialized));
+        final KTableX<K, V> table = this.getContext().wrap(this.streamsBuilder.table(topic, consumed, materialized));
+        return initialize(table);
     }
 
     /**
@@ -418,6 +442,7 @@ public StreamsBuilderX addGlobalStore(final StoreBuilder<?> storeBuil
     /**
      * Create {@link Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
      * {@link org.apache.kafka.common.serialization.Serializer} using {@link #kafkaProperties}.
+     *
      * @return {@link Configurator}
      */
     public Configurator createConfigurator() {
@@ -426,6 +451,7 @@ public Configurator createConfigurator() {
 
     /**
      * Create {@link AppConfiguration} used by this app
+     *
      * @return {@link AppConfiguration}
      */
     public AppConfiguration createConfiguration() {
@@ -434,6 +460,7 @@ public AppConfiguration createConfiguration() {
 
     /**
      * Create a {@link StreamsContext} to wrap Kafka Streams interfaces
+     *
      * @return {@link StreamsContext}
      */
     public StreamsContext getContext() {
@@ -442,6 +469,7 @@ public StreamsContext getContext() {
 
     /**
      * Create stores using application context to lazily configures Serdes
+     *
      * @return {@link StoresX}
      */
     public StoresX stores() {
@@ -457,4 +485,20 @@ public StoresX stores() {
     public Topology build() {
         return this.streamsBuilder.build();
     }
+
+    private <K, V> KStreamX<K, V> initialize(final KStreamX<K, V> stream) {
+        final StreamsConfigX streamsConfigX = new StreamsConfigX(new StreamsConfig(this.kafkaProperties));
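+        // Lineage headers are opt-in; wrap the stream only if the feature is enabled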
+        if (streamsConfigX.isLineageEnabled()) {
+            return stream.processValues(LineageProcessor::new);
+        }
+        return stream;
+    }
+
+    private <K, V> KTableX<K, V> initialize(final KTableX<K, V> table) {
+        final StreamsConfigX streamsConfigX = new StreamsConfigX(new StreamsConfig(this.kafkaProperties));
+        if (streamsConfigX.isLineageEnabled()) {
+            return table.transformValues(LineageTransformer::new);
+        }
+        return table;
+    }
 }
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java
index 1cc3aa29c..ed757cc5b 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java
@@ -24,7 +24,6 @@
 
 package com.bakdata.kafka.streams.kstream;
 
-import static java.util.Collections.emptyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -34,6 +33,7 @@
 import com.bakdata.kafka.streams.apps.DoubleApp;
 import com.bakdata.kafka.streams.apps.StringApp;
 import java.time.Duration;
+import java.util.Map;
 import java.util.function.Function;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
@@ -1835,7 +1835,10 @@
 
     @Test
     void shouldHaveQueryableStoreName() {
-        final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), emptyMap());
+        final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), Map.of(
+                "application.id", "app-id",
+                "bootstrap.servers", "localhost:9092"
+        ));
         final KTableX<String, String> table = builder.stream("input").toTable(Materialized.as("store"));
         this.softly.assertThat(table.queryableStoreName()).isEqualTo("store");
     }
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java
index ec9b1d124..590682666 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java
@@ -26,7 +26,6 @@
 
 import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
 import static com.bakdata.kafka.KafkaTest.SESSION_TIMEOUT;
-import static java.util.Collections.emptyMap;
 import static java.util.concurrent.CompletableFuture.runAsync;
 
 import com.bakdata.fluent_kafka_streams_tests.TestTopology;
@@ -550,7 +549,10 @@
 
     @Test
     void shouldThrowIfRetentionIsTooShort() {
-        final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), emptyMap());
+        final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), Map.of(
+                "application.id", "app-id",
+                "bootstrap.servers", "localhost:9092"
+        ));
         final KStreamX<String, String> input = builder.stream("input");
         final KGroupedStreamX<String, String> grouped = input.groupByKey();
         final TimeWindowedKStreamX<String, String> windowed =
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java
index b4baed195..1034217d8 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java
@@ -26,12 +26,15 @@
 
 import com.bakdata.fluent_kafka_streams_tests.TestTopology;
 import com.bakdata.kafka.Preconfigured;
+import com.bakdata.kafka.streams.StreamsConfigX;
 import com.bakdata.kafka.streams.StreamsTopicConfig;
 import com.bakdata.kafka.streams.apps.DoubleApp;
 import com.bakdata.kafka.streams.apps.StringApp;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -41,10 +44,18 @@
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(SoftAssertionsExtension.class)
 class StreamsBuilderXTest {
+
+    @InjectSoftAssertions
+    private SoftAssertions softly;
+
     @Test
     void shouldReadFromInput() {
         final StringApp app = new StringApp() {
             @Override
@@ -631,4 +642,144 @@ public void buildTopology(final StreamsBuilderX builder) {
         }
     }
 
+    @Test
+    void shouldAddLineageToStream() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KStreamX<String, String> input = builder.stream("input");
+                input.to("output");
+            }
+
+            @Override
+            public Map<String, Object> createKafkaProperties() {
+                return Map.of(
+                        StreamsConfigX.LINEAGE_ENABLED_CONFIG, true
+                );
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input().add("foo", "bar");
+            final List<ProducerRecord<String, String>> records = topology.streamOutput().toList();
+            this.softly.assertThat(records)
+                    .hasSize(1)
+                    .anySatisfy(rekord -> {
+                        this.softly.assertThat(rekord.key()).isEqualTo("foo");
+                        this.softly.assertThat(rekord.value()).isEqualTo("bar");
+                        this.softly.assertThat(rekord.headers().toArray())
+                                .hasSize(3)
+                                .anySatisfy(header -> {
+                                    this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER);
+                                    this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8))
+                                            .isEqualTo("input");
+                                })
+                                .anySatisfy(header -> {
+                                    this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER);
+                                    this.softly.assertThat(
+                                                    Integer.parseInt(new String(header.value(),
+                                                            StandardCharsets.UTF_8)))
+                                            .isEqualTo(0);
+                                })
+                                .anySatisfy(header -> {
+                                    this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER);
+                                    this.softly.assertThat(
+                                                    Long.parseLong(new String(header.value(), StandardCharsets.UTF_8)))
+                                            .isEqualTo(0L);
+                                });
+                    });
+        }
+    }
+
+    @Test
+    void shouldNotAddLineageToStream() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KStreamX<String, String> input = builder.stream("input");
+                input.to("output");
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input().add("foo", "bar");
+            final List<ProducerRecord<String, String>> records = topology.streamOutput().toList();
+            this.softly.assertThat(records)
+                    .hasSize(1)
+                    .anySatisfy(rekord -> {
+                        this.softly.assertThat(rekord.key()).isEqualTo("foo");
+                        this.softly.assertThat(rekord.value()).isEqualTo("bar");
+                        this.softly.assertThat(rekord.headers().toArray()).isEmpty();
+                    });
+        }
+    }
+
+    @Test
+    void shouldAddLineageToTable() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KTableX<String, String> input = builder.table("input");
+                input.toStream().to("output");
+            }
+
+            @Override
+            public Map<String, Object> createKafkaProperties() {
+                return Map.of(
+                        StreamsConfigX.LINEAGE_ENABLED_CONFIG, true
+                );
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input().add("foo", "bar");
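+            // A single record on the single-partition test topic: expected at partition 0, offset 0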
+            final List<ProducerRecord<String, String>> records = topology.streamOutput().toList();
+            this.softly.assertThat(records)
+                    .hasSize(1)
+                    .anySatisfy(rekord -> {
+                        this.softly.assertThat(rekord.key()).isEqualTo("foo");
+                        this.softly.assertThat(rekord.value()).isEqualTo("bar");
+                        this.softly.assertThat(rekord.headers().toArray())
+                                .hasSize(3)
+                                .anySatisfy(header -> {
+                                    this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER);
+                                    this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8))
+                                            .isEqualTo("input");
+                                })
+                                .anySatisfy(header -> {
+                                    this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER);
+                                    this.softly.assertThat(
+                                                    Integer.parseInt(new String(header.value(),
+                                                            StandardCharsets.UTF_8)))
+                                            .isEqualTo(0);
+                                })
+                                .anySatisfy(header -> {
+                                    this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER);
+                                    this.softly.assertThat(
+                                                    Long.parseLong(new String(header.value(), StandardCharsets.UTF_8)))
+                                            .isEqualTo(0L);
+                                });
+                    });
+        }
+    }
+
+    @Test
+    void shouldNotAddLineageToTable() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KTableX<String, String> input = builder.table("input");
+                input.toStream().to("output");
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input().add("foo", "bar");
+            final List<ProducerRecord<String, String>> records = topology.streamOutput().toList();
+            this.softly.assertThat(records)
+                    .hasSize(1)
+                    .anySatisfy(rekord -> {
+                        this.softly.assertThat(rekord.key()).isEqualTo("foo");
+                        this.softly.assertThat(rekord.value()).isEqualTo("bar");
+                        this.softly.assertThat(rekord.headers().toArray()).isEmpty();
+                    });
+        }
+    }
 }