diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
index 53369cc6ad60..b310c573af85 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.kafka;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.KafkaEntity;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -34,7 +35,7 @@
*
* NOTE: Any records with null values will be skipped, even if they contain non-null keys, or headers
*/
-public class KafkaRecordEntity extends ByteEntity
+public class KafkaRecordEntity extends ByteEntity implements KafkaEntity
{
private final ConsumerRecord<byte[], byte[]> record;
@@ -48,4 +49,10 @@ public ConsumerRecord<byte[], byte[]> getRecord()
{
return record;
}
+
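+ // Exposes the record's Kafka timestamp (producer create time or broker log-append time, depending on topic config).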
+ @Override
+ public long getRecordTimestampMillis()
+ {
+ return record.timestamp();
+ }
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/HeaderFilterHandler.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/HeaderFilterHandler.java
new file mode 100644
index 000000000000..b67f11d5254d
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/HeaderFilterHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+/**
+ * Interface for handling different filter types in Kafka header evaluation.
+ *
+ * This provides an extensible way to support various Druid filter types for
+ * header-based filtering without modifying the core evaluation logic.
+ */
+public interface HeaderFilterHandler
+{
+ /**
+ * Gets the header name/key to evaluate from the filter.
+ *
+ * @return the header name to look for in Kafka message headers
+ */
+ String getHeaderName();
+
+ /**
+ * Evaluates whether a record should be included based on the header value.
+ *
+ * @param headerValue the decoded header value (guaranteed to be non-null when called)
+ * @return true if the record should be included, false if it should be filtered out
+ */
+ boolean shouldInclude(String headerValue);
+
+ /**
+ * Gets a human-readable description of this filter for logging and debugging.
+ *
+ * @return a descriptive string representation of the filter
+ */
+ String getDescription();
+
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/HeaderFilterHandlerFactory.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/HeaderFilterHandlerFactory.java
new file mode 100644
index 000000000000..224b2afe3cef
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/HeaderFilterHandlerFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+
+/**
+ * Factory for creating HeaderFilterHandler instances.
+ *
+ * This factory uses explicit instanceof checks for clarity and performance,
+ * making it easy to add support for new filter types by simply adding
+ * new conditional branches.
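+ *
+ * For example, support for another filter type could be added with one more branch (illustrative
+ * sketch only; SelectorFilterHandler is hypothetical and not part of this patch):
+ * <pre>
+ *   if (filter instanceof SelectorFilter) {
+ *     return new SelectorFilterHandler((SelectorFilter) filter);
+ *   }
+ * </pre>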
+ */
+public final class HeaderFilterHandlerFactory
+{
+ private HeaderFilterHandlerFactory()
+ {
+ // Utility class - prevent instantiation
+ }
+
+ /**
+ * Creates the appropriate handler for the given filter.
+ *
+ * @param filter the Druid filter to create a handler for
+ * @return a HeaderFilterHandler that can evaluate the given filter type
+ * @throws IllegalArgumentException if the filter type is not supported
+ */
+ public static HeaderFilterHandler forFilter(Filter filter)
+ {
+ if (filter instanceof InDimFilter) {
+ return new InDimFilterHandler((InDimFilter) filter);
+ }
+
+ throw new IllegalArgumentException(
+ "Unsupported filter type for header filtering: " + filter.getClass().getSimpleName() +
+ ". Supported types: InDimFilter"
+ );
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/InDimFilterHandler.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/InDimFilterHandler.java
new file mode 100644
index 000000000000..35efe17480c0
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/InDimFilterHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.filter.InDimFilter;
+
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Handler for InDimFilter in Kafka header evaluation.
+ *
+ * This handler evaluates whether a header value is contained in the filter's value set.
+ * It uses a HashSet for O(1) average-case lookup performance instead of the filter's
+ * internal TreeSet which has O(log n) lookup time.
+ */
+public class InDimFilterHandler implements HeaderFilterHandler
+{
+ private final InDimFilter filter;
+ private final Set<String> filterValues;
+
+ /**
+ * Creates a new handler for the given InDimFilter.
+ *
+ * @param filter the InDimFilter to handle, must not be null
+ * @throws IllegalArgumentException if filter is null
+ */
+ public InDimFilterHandler(InDimFilter filter)
+ {
+ this.filter = Preconditions.checkNotNull(filter, "filter cannot be null");
+
+ // Convert to HashSet for O(1) lookups instead of O(log n) TreeSet lookups
+ // This optimization is particularly beneficial when the filter has many values
+ this.filterValues = new HashSet<>(filter.getValues());
+ }
+
+ @Override
+ public String getHeaderName()
+ {
+ return filter.getDimension();
+ }
+
+ @Override
+ public boolean shouldInclude(@Nullable String headerValue)
+ {
+ return filterValues.contains(headerValue);
+ }
+
+ @Override
+ public String getDescription()
+ {
+ return StringUtils.format(
+ "InDimFilter[header=%s, values=%d]",
+ filter.getDimension(),
+ filterValues.size()
+ );
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterEvaluator.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterEvaluator.java
new file mode 100644
index 000000000000..89be079424e7
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterEvaluator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.filter.Filter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+/**
+ * Evaluates a configured header filter against Kafka records before deserialization. The last
+ * header with the configured name is decoded using the configured charset and checked against the
+ * filter; records whose header is missing, null, or undecodable are included (permissive behavior).
+ */
+public class KafkaHeaderBasedFilterEvaluator
+{
+ private static final Logger log = new Logger(KafkaHeaderBasedFilterEvaluator.class);
+
+ private final HeaderFilterHandler filterHandler;
+ private final String headerName;
+ private final Charset encoding;
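+ // Caches decoded header strings keyed by the raw header bytes, so repeated header values are decoded only once.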
+ private final Cache<ByteBuffer, String> stringDecodingCache;
+
+ /**
+ * Creates a new KafkaHeaderBasedFilterEvaluator with the given configuration.
+ *
+ * @param headerBasedFilterConfig the configuration containing filter, encoding, and cache settings
+ * @throws IllegalArgumentException if the filter type is not supported
+ */
+ public KafkaHeaderBasedFilterEvaluator(KafkaHeaderBasedFilterConfig headerBasedFilterConfig)
+ {
+ this.encoding = Charset.forName(headerBasedFilterConfig.getEncoding());
+ this.stringDecodingCache = Caffeine.newBuilder()
+ .maximumSize(headerBasedFilterConfig.getStringDecodingCacheSize())
+ .build();
+
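+ // Convert the configured DimFilter to its query-time Filter form, then pick a handler that can evaluate it.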
+ Filter filter = headerBasedFilterConfig.getFilter().toFilter();
+ this.filterHandler = HeaderFilterHandlerFactory.forFilter(filter);
+ this.headerName = filterHandler.getHeaderName();
+
+ log.info("Initialized Kafka header filter: %s with encoding [%s] and cache size [%d]",
+ filterHandler.getDescription(),
+ headerBasedFilterConfig.getEncoding(),
+ headerBasedFilterConfig.getStringDecodingCacheSize());
+ }
+
+ /**
+ * Evaluates whether a Kafka record should be included based on its headers.
+ *
+ * @param record the Kafka consumer record
+ * @return true if the record should be included, false if it should be filtered out
+ */
+ public boolean shouldIncludeRecord(ConsumerRecord<byte[], byte[]> record)
+ {
+ try {
+ return evaluateInclusion(record.headers());
+ }
+ catch (Exception e) {
+ log.warn(
+ e,
+ "Error evaluating header filter for record at topic [%s] partition [%d] offset [%d], including record",
+ record.topic(),
+ record.partition(),
+ record.offset()
+ );
+ return true; // Default to including record on error
+ }
+ }
+
+ /**
+ * Evaluates whether a record should be included based on its headers.
+ *
+ * Uses permissive behavior: records with missing, null, or undecodable headers
+ * are included by default. Only records with successfully decoded header values
+ * that don't match the filter criteria are excluded.
+ *
+ * @param headers the Kafka message headers to evaluate
+ * @return true if the record should be included, false if it should be filtered out
+ */
+ private boolean evaluateInclusion(Headers headers)
+ {
+ // Permissive behavior: missing headers result in inclusion
+ if (headers == null) {
+ return true;
+ }
+
+ Header header = headers.lastHeader(headerName);
+
+ // Permissive behavior: header is null or empty
+ if (header == null || header.value() == null) {
+ return true;
+ }
+
+ String headerValue = getDecodedHeaderValue(header.value());
+ // Permissive behavior: failed to decode header value
+ if (headerValue == null) {
+ return true;
+ }
+
+ return filterHandler.shouldInclude(headerValue);
+ }
+
+ /**
+ * Decode header bytes to string with caching.
+ * Returns null if decoding fails.
+ */
+ @Nullable
+ private String getDecodedHeaderValue(byte[] headerBytes)
+ {
+ try {
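+ // ByteBuffer equality and hashCode are content-based, so identical header bytes hit the same cache entry.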
+ ByteBuffer key = ByteBuffer.wrap(headerBytes);
+ return stringDecodingCache.get(key, k -> new String(headerBytes, encoding));
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to decode header bytes, treating as null");
+ return null;
+ }
+ }
+
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index f19ac81a85b1..6033ee09419e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -118,8 +118,13 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
props.put("auto.offset.reset", "none");
final KafkaRecordSupplier recordSupplier =
- new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(),
- kafkaIndexTaskIOConfig.isMultiTopic());
+ new KafkaRecordSupplier(
+ props,
+ configMapper,
+ kafkaIndexTaskIOConfig.getConfigOverrides(),
+ kafkaIndexTaskIOConfig.isMultiTopic(),
+ kafkaIndexTaskIOConfig.getHeaderBasedFilterConfig()
+ );
if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index 07c0f80fbe83..63defc2a760d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@@ -39,6 +40,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<KafkaTopicPartition, Long>
private final Map<String, Object> consumerProperties;
private final long pollTimeout;
private final KafkaConfigOverrides configOverrides;
+ private final KafkaHeaderBasedFilterConfig headerBasedFilterConfig;
private final boolean multiTopic;
@@ -64,7 +66,8 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic,
- @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
+ @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes,
+ @JsonProperty("headerBasedFilterConfig") @Nullable KafkaHeaderBasedFilterConfig headerBasedFilterConfig
)
{
super(
@@ -84,6 +87,7 @@ public KafkaIndexTaskIOConfig(
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.configOverrides = configOverrides;
+ this.headerBasedFilterConfig = headerBasedFilterConfig;
this.multiTopic = multiTopic != null ? multiTopic : KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC;
final SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> myEndSequenceNumbers = getEndSequenceNumbers();
@@ -110,7 +114,8 @@ public KafkaIndexTaskIOConfig(
DateTime maximumMessageTime,
InputFormat inputFormat,
KafkaConfigOverrides configOverrides,
- Long refreshRejectionPeriodsInMinutes
+ Long refreshRejectionPeriodsInMinutes,
+ KafkaHeaderBasedFilterConfig headerBasedFilterConfig
)
{
this(
@@ -128,7 +133,8 @@ public KafkaIndexTaskIOConfig(
inputFormat,
configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
- refreshRejectionPeriodsInMinutes
+ refreshRejectionPeriodsInMinutes,
+ headerBasedFilterConfig
);
}
@@ -184,6 +190,14 @@ public boolean isMultiTopic()
return multiTopic;
}
+
+ @JsonProperty
+ @Nullable
+ public KafkaHeaderBasedFilterConfig getHeaderBasedFilterConfig()
+ {
+ return headerBasedFilterConfig;
+ }
+
@Override
public String toString()
{
@@ -198,6 +212,8 @@ public String toString()
", minimumMessageTime=" + getMinimumMessageTime() +
", maximumMessageTime=" + getMaximumMessageTime() +
", configOverrides=" + getConfigOverrides() +
+ ", headerBasedFilterConfig=" + getheaderBasedFilterConfig() +
+ ", multiTopic=" + multiTopic +
'}';
}
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
index 6a364fc5f031..2d3120317f37 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
@@ -26,6 +26,7 @@
import com.google.inject.Binder;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.initialization.DruidModule;
@@ -51,7 +52,8 @@ public List<? extends Module> getJacksonModules()
new NamedType(KafkaSupervisorTuningConfig.class, SCHEME),
new NamedType(KafkaSupervisorSpec.class, SCHEME),
new NamedType(KafkaSamplerSpec.class, SCHEME),
- new NamedType(KafkaInputFormat.class, SCHEME)
+ new NamedType(KafkaInputFormat.class, SCHEME),
+ new NamedType(KafkaHeaderBasedFilterConfig.class, SCHEME)
)
.addKeySerializer(KafkaTopicPartition.class, new KafkaTopicPartition.KafkaTopicPartitionKeySerializer())
);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 30e99db7f409..4003b288be2e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -27,6 +27,7 @@
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -46,6 +47,8 @@
import org.apache.kafka.common.serialization.Deserializer;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
@@ -80,6 +83,9 @@ public class KafkaRecordSupplier implements RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
+ @Nullable
+ private final KafkaHeaderBasedFilterEvaluator headerFilterEvaluator;
+
+ public KafkaRecordSupplier(
+ Map<String, Object> consumerProperties,
+ ObjectMapper sortingMapper,
+ KafkaConfigOverrides configOverrides,
+ boolean multiTopic,
+ @Nullable KafkaHeaderBasedFilterConfig headerBasedFilterConfig
+ )
+ {
+ this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, headerBasedFilterConfig);
+ }
+
+ @VisibleForTesting
+ public KafkaRecordSupplier(KafkaConsumer<byte[], byte[]> consumer, boolean multiTopic)
+ {
+ this(consumer, multiTopic, (KafkaHeaderBasedFilterConfig) null);
}
@VisibleForTesting
public KafkaRecordSupplier(
KafkaConsumer<byte[], byte[]> consumer,
- boolean multiTopic
+ boolean multiTopic,
+ @Nullable KafkaHeaderBasedFilterConfig headerBasedFilterConfig
)
{
this.consumer = consumer;
this.multiTopic = multiTopic;
this.monitor = new KafkaConsumerMonitor(consumer);
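+ // A null config disables header filtering; poll() then returns records exactly as before.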
+ this.headerFilterEvaluator = headerBasedFilterConfig != null ?
+ new KafkaHeaderBasedFilterEvaluator(headerBasedFilterConfig) : null;
}
@Override
@@ -172,16 +198,34 @@ public Set<StreamPartition<KafkaTopicPartition>> getAssignment()
public List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> poll(long timeout)
{
List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
- for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
- polledRecords.add(new OrderedPartitionableRecord<>(
- record.topic(),
- new KafkaTopicPartition(multiTopic, record.topic(), record.partition()),
- record.offset(),
- record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record)),
- record.timestamp()
- ));
+ for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+ KafkaTopicPartition kafkaPartition = new KafkaTopicPartition(multiTopic, record.topic(), record.partition());
+
+ // Apply header filter if configured
+ if (headerFilterEvaluator != null && !headerFilterEvaluator.shouldIncludeRecord(record)) {
+ // Create filtered record for offset advancement with filtered=true flag
+ polledRecords.add(new OrderedPartitionableRecord<>(
+ record.topic(),
+ kafkaPartition,
+ record.offset(),
+ Collections.emptyList(), // Empty list for filtered records
+ record.timestamp(),
+ true // Mark as filtered
+ ));
+ } else {
+ // Create record for accepted records
+ polledRecords.add(new OrderedPartitionableRecord<>(
+ record.topic(),
+ kafkaPartition,
+ record.offset(),
+ record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record)),
+ record.timestamp(),
+ false
+ ));
+ }
}
+
return polledRecords;
}
public Set<Integer> getPartitionIds(String stream)
}
/**
- * Returns a Monitor that emits Kafka consumer metrics.
+ * Returns the monitor that emits metrics for this supplier's Kafka consumer.
*/
public Monitor monitor()
{
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaHeaderBasedFilterConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaHeaderBasedFilterConfig.java
new file mode 100644
index 000000000000..40b219b85c48
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaHeaderBasedFilterConfig.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.InDimFilter;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Kafka-specific implementation of header-based filtering.
+ * Allows filtering Kafka records based on message headers before deserialization.
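+ *
+ * An illustrative supervisor ioConfig snippet (hypothetical values) that keeps only records whose
+ * "environment" header is "production":
+ * <pre>
+ *   "headerBasedFilterConfig": {
+ *     "filter": { "type": "in", "dimension": "environment", "values": ["production"] },
+ *     "encoding": "UTF-8",
+ *     "stringDecodingCacheSize": 10000
+ *   }
+ * </pre>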
+ */
+public class KafkaHeaderBasedFilterConfig
+{
+ private static final ImmutableSet<Class<? extends DimFilter>> SUPPORTED_FILTER_TYPES = ImmutableSet.of(
+ InDimFilter.class
+ );
+
+ private final DimFilter filter;
+ private final String encoding;
+ private final int stringDecodingCacheSize;
+
+ @JsonCreator
+ public KafkaHeaderBasedFilterConfig(
+ @JsonProperty("filter") DimFilter filter,
+ @JsonProperty("encoding") @Nullable String encoding,
+ @JsonProperty("stringDecodingCacheSize") @Nullable Integer stringDecodingCacheSize
+ )
+ {
+ this.filter = Preconditions.checkNotNull(filter, "filter cannot be null");
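+ // Fall back to UTF-8 decoding and a 10,000-entry string-decoding cache when not configured.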
+ this.encoding = encoding != null ? encoding : StandardCharsets.UTF_8.name();
+ this.stringDecodingCacheSize = stringDecodingCacheSize != null ? stringDecodingCacheSize : 10_000;
+
+ // Validate encoding
+ try {
+ Charset.forName(this.encoding);
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Invalid encoding: " + this.encoding, e);
+ }
+
+ // Validate that only supported filter types are used
+ validateSupportedFilter(this.filter);
+ }
+
+ /**
+ * Validates that the filter is one of the supported types.
+ * Only 'in' filters are supported for direct evaluation.
+ */
+ private void validateSupportedFilter(DimFilter dimFilter)
+ {
+ if (!SUPPORTED_FILTER_TYPES.contains(dimFilter.getClass())) {
+ throw InvalidInput.exception(
+ "Unsupported filter type [%s]. Only 'in' filters are supported for Kafka header filtering.",
+ dimFilter.getClass().getSimpleName()
+ );
+ }
+ }
+
+ @JsonProperty
+ public DimFilter getFilter()
+ {
+ return filter;
+ }
+
+ @JsonProperty
+ public String getEncoding()
+ {
+ return encoding;
+ }
+
+ @JsonProperty
+ public int getStringDecodingCacheSize()
+ {
+ return stringDecodingCacheSize;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaHeaderBasedFilterConfig that = (KafkaHeaderBasedFilterConfig) o;
+ return stringDecodingCacheSize == that.stringDecodingCacheSize &&
+ filter.equals(that.filter) &&
+ encoding.equals(that.encoding);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = filter.hashCode();
+ result = 31 * result + encoding.hashCode();
+ result = 31 * result + stringDecodingCacheSize;
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "KafkaheaderBasedFilterConfig{" +
+ "filter=" + filter +
+ ", encoding='" + encoding + '\'' +
+ ", stringDecodingCacheSize=" + stringDecodingCacheSize +
+ '}';
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index c501454bd355..6ae6b430a223 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -131,7 +134,8 @@ protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupRecordSupplier()
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides(),
- spec.getIoConfig().isMultiTopic()
+ spec.getIoConfig().isMultiTopic(),
+ spec.getIoConfig().getHeaderBasedFilterConfig()
);
}
@@ -220,7 +221,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic(),
- ioConfig.getTaskDuration().getStandardMinutes()
+ ioConfig.getTaskDuration().getStandardMinutes(),
+ kafkaIoConfig.getHeaderBasedFilterConfig()
);
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 4eac3163fe34..ff6e1301e7be 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -54,6 +54,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
private final String topic;
private final String topicPattern;
private final boolean emitTimeLagMetrics;
+ private final KafkaHeaderBasedFilterConfig headerBasedFilterConfig;
@JsonCreator
public KafkaSupervisorIOConfig(
@@ -75,6 +76,7 @@ public KafkaSupervisorIOConfig(
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
+ @JsonProperty("headerBasedFilterConfig") KafkaHeaderBasedFilterConfig headerBasedFilterConfig,
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("stopTaskCount") Integer stopTaskCount,
@Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics
@@ -99,6 +101,7 @@ public KafkaSupervisorIOConfig(
stopTaskCount
);
+ this.headerBasedFilterConfig = headerBasedFilterConfig;
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Preconditions.checkNotNull(
consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
@@ -167,6 +170,14 @@ public boolean isEmitTimeLagMetrics()
return emitTimeLagMetrics;
}
+ @JsonProperty
+ @Nullable
+ public KafkaHeaderBasedFilterConfig getHeaderBasedFilterConfig()
+ {
+ return headerBasedFilterConfig;
+ }
+
@Override
public String toString()
{
@@ -187,6 +198,7 @@ public String toString()
", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
", configOverrides=" + getConfigOverrides() +
+ ", headerBasedFilterConfig=" + headerBasedFilterConfig +
", idleConfig=" + getIdleConfig() +
", stopTaskCount=" + getStopTaskCount() +
'}';
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index edff7464d073..d38b2422a5c9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -28,6 +28,7 @@
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -363,6 +364,7 @@ public byte[] value()
while (iterator.hasNext()) {
final InputRow row = iterator.next();
+ final MapBasedInputRow mrow = (MapBasedInputRow) row;
// Payload verifications
// this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec
// but test reading them anyway since it isn't technically illegal
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/HeaderFilterHandlerTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/HeaderFilterHandlerTest.java
new file mode 100644
index 000000000000..2a62c0642450
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/HeaderFilterHandlerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.query.filter.TrueDimFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HeaderFilterHandlerTest
+{
+ @Test
+ public void testInDimFilterHandler()
+ {
+ // Create an InDimFilter for testing
+ InDimFilter filter = new InDimFilter("environment", ImmutableSet.of("production", "staging"));
+
+ // Create handler using our extensible factory
+ HeaderFilterHandler handler = HeaderFilterHandlerFactory.forFilter(filter);
+
+ // Verify it's the correct type
+ Assert.assertTrue("Handler should be InDimFilterHandler", handler instanceof InDimFilterHandler);
+
+ // Test header name extraction
+ Assert.assertEquals("environment", handler.getHeaderName());
+
+ // Test matching values
+ Assert.assertTrue("Production should be included", handler.shouldInclude("production"));
+ Assert.assertTrue("Staging should be included", handler.shouldInclude("staging"));
+
+ // Test non-matching values
+ Assert.assertFalse("Development should be excluded", handler.shouldInclude("development"));
+ Assert.assertFalse("Test should be excluded", handler.shouldInclude("test"));
+
+ // Test description
+ String description = handler.getDescription();
+ Assert.assertTrue("Description should contain filter type", description.contains("InDimFilter"));
+ Assert.assertTrue("Description should contain header name", description.contains("environment"));
+ Assert.assertTrue("Description should contain value count", description.contains("2"));
+ }
+
+ @Test
+ public void testUnsupportedFilterType()
+ {
+ // Create a mock filter that's not supported
+ Filter unsupportedFilter = TrueDimFilter.instance().toFilter();
+
+ // Should throw IllegalArgumentException
+ try {
+ HeaderFilterHandlerFactory.forFilter(unsupportedFilter);
+ Assert.fail("Should have thrown IllegalArgumentException for unsupported filter type");
+ }
+ catch (IllegalArgumentException e) {
+ Assert.assertTrue("Error message should mention unsupported type",
+ e.getMessage().contains("Unsupported filter type"));
+ Assert.assertTrue("Error message should mention True",
+ e.getMessage().contains("True"));
+ }
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterConfigEvaluatorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterConfigEvaluatorTest.java
new file mode 100644
index 000000000000..603372ca7cad
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterConfigEvaluatorTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class KafkaHeaderBasedFilterConfigEvaluatorTest
+{
+ private KafkaHeaderBasedFilterEvaluator evaluator;
+ private ConsumerRecord<byte[], byte[]> record;
+
+ @BeforeClass
+ public static void setUpStatic()
+ {
+ ExpressionProcessing.initializeForTests();
+ }
+
+ @Before
+ public void setUp()
+ {
+ // Create a test record with headers
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("environment", "production".getBytes(StandardCharsets.UTF_8)));
+ headers.add(new RecordHeader("service", "user-service".getBytes(StandardCharsets.UTF_8)));
+ headers.add(new RecordHeader("version", "1.0".getBytes(StandardCharsets.UTF_8)));
+
+ record = new ConsumerRecord<>(
+ "test-topic",
+ 0,
+ 100L,
+ "test-key".getBytes(StandardCharsets.UTF_8),
+ "test-value".getBytes(StandardCharsets.UTF_8)
+ );
+
+ try {
+ // Use reflection to set headers since ConsumerRecord doesn't have a public setter
+ Field headersField = ConsumerRecord.class.getDeclaredField("headers");
+ headersField.setAccessible(true);
+ headersField.set(record, headers);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to set headers on test record", e);
+ }
+ }
+
+ @Test
+ public void testInFilterSingleValueMatch()
+ {
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ Assert.assertTrue(evaluator.shouldIncludeRecord(record));
+ }
+
+ @Test
+ public void testInFilterSingleValueNoMatch()
+ {
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("staging"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ Assert.assertFalse(evaluator.shouldIncludeRecord(record));
+ }
+
+ @Test
+ public void testInFilterMultipleValuesMatch()
+ {
+ InDimFilter filter = new InDimFilter("environment", Arrays.asList("staging", "production", "development"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ Assert.assertTrue(evaluator.shouldIncludeRecord(record));
+ }
+
+ @Test
+ public void testInFilterMultipleValuesNoMatch()
+ {
+ InDimFilter filter = new InDimFilter("environment", Arrays.asList("staging", "development"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ Assert.assertFalse(evaluator.shouldIncludeRecord(record));
+ }
+
+ @Test
+ public void testInFilterMissingHeader()
+ {
+ InDimFilter filter = new InDimFilter("missing-header", Collections.singletonList("value"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ // With permissive filtering, missing headers should result in inclusion
+ Assert.assertTrue("InDimFilter with missing header should include record (permissive)", evaluator.shouldIncludeRecord(record));
+ }
+
+ @Test
+ public void testInFilterNullValue()
+ {
+ // Create record with null header value
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("null-header", null));
+
+ ConsumerRecord<byte[], byte[]> nullRecord = new ConsumerRecord<>(
+ "test-topic",
+ 0,
+ 100L,
+ "test-key".getBytes(StandardCharsets.UTF_8),
+ "test-value".getBytes(StandardCharsets.UTF_8)
+ );
+
+ try {
+ Field headersField = ConsumerRecord.class.getDeclaredField("headers");
+ headersField.setAccessible(true);
+ headersField.set(nullRecord, headers);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to set headers on test record", e);
+ }
+
+ InDimFilter filter = new InDimFilter("null-header", Collections.singletonList("value"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ Assert.assertTrue(evaluator.shouldIncludeRecord(nullRecord));
+ }
+
+ @Test
+ public void testInFilterWithDifferentServices()
+ {
+ InDimFilter filter = new InDimFilter("service", Arrays.asList("user-service", "payment-service"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ Assert.assertTrue(evaluator.shouldIncludeRecord(record)); // matches "user-service"
+ }
+
+ @Test
+ public void testInFilterWithDifferentServicesNoMatch()
+ {
+ InDimFilter filter = new InDimFilter("service", Arrays.asList("payment-service", "notification-service"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ Assert.assertFalse(evaluator.shouldIncludeRecord(record)); // doesn't match "user-service"
+ }
+
+ @Test
+ public void testRepeatedEvaluations()
+ {
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ // Test multiple evaluations to verify consistent behavior
+ boolean result1 = evaluator.shouldIncludeRecord(record); // should match
+ boolean result2 = evaluator.shouldIncludeRecord(record); // should match
+
+ Assert.assertTrue("First evaluation should match", result1);
+ Assert.assertTrue("Second evaluation should match", result2);
+ Assert.assertEquals("Results should be consistent", result1, result2);
+ }
+
+ @Test
+ public void testDifferentEncodings()
+ {
+ // Test with ISO-8859-1 encoding
+ String testValue = "café"; // Contains non-ASCII characters
+
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("text", testValue.getBytes(StandardCharsets.ISO_8859_1)));
+
+ ConsumerRecord<byte[], byte[]> encodedRecord = new ConsumerRecord<>(
+ "test-topic",
+ 0,
+ 100L,
+ "test-key".getBytes(StandardCharsets.UTF_8),
+ "test-value".getBytes(StandardCharsets.UTF_8)
+ );
+
+ try {
+ Field headersField = ConsumerRecord.class.getDeclaredField("headers");
+ headersField.setAccessible(true);
+ headersField.set(encodedRecord, headers);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to set headers on test record", e);
+ }
+
+ InDimFilter filter = new InDimFilter("text", Collections.singletonList(testValue), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, "ISO-8859-1", null));
+
+ Assert.assertTrue(evaluator.shouldIncludeRecord(encodedRecord));
+ }
+
+ @Test
+ public void testNullHeaderValue()
+ {
+ // Create record without the header we're filtering on
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("other-header", "other-value".getBytes(StandardCharsets.UTF_8)));
+
+ ConsumerRecord<byte[], byte[]> noHeaderRecord = new ConsumerRecord<>(
+ "test-topic",
+ 0,
+ 100L,
+ "test-key".getBytes(StandardCharsets.UTF_8),
+ "test-value".getBytes(StandardCharsets.UTF_8)
+ );
+
+ try {
+ Field headersField = ConsumerRecord.class.getDeclaredField("headers");
+ headersField.setAccessible(true);
+ headersField.set(noHeaderRecord, headers);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to set headers on test record", e);
+ }
+
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, null));
+
+ // Missing header should result in inclusion (permissive behavior)
+ Assert.assertTrue(evaluator.shouldIncludeRecord(noHeaderRecord));
+ }
+
+ @Test
+ public void testMultipleHeadersWithSameKey()
+ {
+ // Create record with multiple headers with the same key (Kafka allows this)
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("environment", "staging".getBytes(StandardCharsets.UTF_8)));
+ headers.add(new RecordHeader("environment", "production".getBytes(StandardCharsets.UTF_8))); // Last one wins
+
+ ConsumerRecord<byte[], byte[]> multiHeaderRecord = new ConsumerRecord<>(
+ "test-topic",
+ 0,
+ 100L,
+ "test-key".getBytes(StandardCharsets.UTF_8),
+ "test-value".getBytes(StandardCharsets.UTF_8)
+ );
+
+ try {
+ Field headersField = ConsumerRecord.class.getDeclaredField("headers");
+ headersField.setAccessible(true);
+ headersField.set(multiHeaderRecord, headers);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to set headers on test record", e);
+ }
+
+ // Filter should match "production" (the last value), not "staging" (the first value)
+ InDimFilter prodFilter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(prodFilter, null, null));
+ Assert.assertTrue("Should match last header value 'production'", evaluator.shouldIncludeRecord(multiHeaderRecord));
+
+ // Filter should NOT match "staging" (the first value)
+ InDimFilter stagingFilter = new InDimFilter("environment", Collections.singletonList("staging"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(stagingFilter, null, null));
+ Assert.assertFalse("Should not match first header value 'staging'", evaluator.shouldIncludeRecord(multiHeaderRecord));
+ }
+
+ @Test
+ public void testStringDecodingCacheSize()
+ {
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(new KafkaHeaderBasedFilterConfig(filter, null, 50_000));
+
+ // Test that the evaluator works with custom cache size
+ Assert.assertTrue(evaluator.shouldIncludeRecord(record));
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterConfigIntegrationTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterConfigIntegrationTest.java
new file mode 100644
index 000000000000..f2d90304387b
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaHeaderBasedFilterConfigIntegrationTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class KafkaHeaderBasedFilterConfigIntegrationTest
+{
+ private KafkaHeaderBasedFilterEvaluator evaluator;
+ private final ObjectMapper objectMapper = new DefaultObjectMapper();
+
+ @BeforeClass
+ public static void setUpStatic()
+ {
+ ExpressionProcessing.initializeForTests();
+ }
+
+ @Before
+ public void setUp()
+ {
+ // Will be initialized in each test
+ evaluator = null;
+ }
+
+ private ConsumerRecord<byte[], byte[]> createRecord(String topic, int partition, long offset, RecordHeaders headers)
+ {
+ ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+ topic,
+ partition,
+ offset,
+ "test-key".getBytes(StandardCharsets.UTF_8),
+ "test-value".getBytes(StandardCharsets.UTF_8)
+ );
+
+ try {
+ Field headersField = ConsumerRecord.class.getDeclaredField("headers");
+ headersField.setAccessible(true);
+ headersField.set(record, headers);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to set headers on test record", e);
+ }
+
+ return record;
+ }
+
+ private RecordHeaders headers(String... keyValuePairs)
+ {
+ RecordHeaders headers = new RecordHeaders();
+ for (int i = 0; i < keyValuePairs.length; i += 2) {
+ headers.add(new RecordHeader(keyValuePairs[i], keyValuePairs[i + 1].getBytes(StandardCharsets.UTF_8)));
+ }
+ return headers;
+ }
+
+ @Test
+ public void testProductionEnvironmentFiltering()
+ {
+ // Test Case: Only include records from production environment
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(headerFilter);
+
+ // Production record - should be included
+ ConsumerRecord<byte[], byte[]> prodRecord = createRecord(
+ "events",
+ 0,
+ 100L,
+ headers("environment", "production", "service", "user-service")
+ );
+ Assert.assertTrue("Production record should be included", evaluator.shouldIncludeRecord(prodRecord));
+
+ // Staging record - should be excluded
+ ConsumerRecord<byte[], byte[]> stagingRecord = createRecord(
+ "events",
+ 0,
+ 101L,
+ headers("environment", "staging", "service", "user-service")
+ );
+ Assert.assertFalse("Staging record should be excluded", evaluator.shouldIncludeRecord(stagingRecord));
+
+ // Record without environment header - should be included (permissive)
+ ConsumerRecord<byte[], byte[]> noEnvRecord = createRecord(
+ "events",
+ 0,
+ 102L,
+ headers("service", "user-service")
+ );
+ Assert.assertTrue("Record without environment header should be included", evaluator.shouldIncludeRecord(noEnvRecord));
+ }
+
+ @Test
+ public void testMultiServiceFiltering()
+ {
+ // Test Case: Include records from multiple services
+ InDimFilter filter = new InDimFilter("service", Arrays.asList("user-service", "payment-service"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(headerFilter);
+
+ // User service record - should be included
+ ConsumerRecord<byte[], byte[]> userRecord = createRecord(
+ "events",
+ 0,
+ 100L,
+ headers("environment", "production", "service", "user-service")
+ );
+ Assert.assertTrue("User service record should be included", evaluator.shouldIncludeRecord(userRecord));
+
+ // Payment service record - should be included
+ ConsumerRecord<byte[], byte[]> paymentRecord = createRecord(
+ "events",
+ 0,
+ 101L,
+ headers("environment", "production", "service", "payment-service")
+ );
+ Assert.assertTrue("Payment service record should be included", evaluator.shouldIncludeRecord(paymentRecord));
+
+ // Notification service record - should be excluded
+ ConsumerRecord<byte[], byte[]> notificationRecord = createRecord(
+ "events",
+ 0,
+ 102L,
+ headers("environment", "production", "service", "notification-service")
+ );
+ Assert.assertFalse("Notification service record should be excluded", evaluator.shouldIncludeRecord(notificationRecord));
+ }
+
+ @Test
+ public void testHighThroughputFiltering()
+ {
+ // Test Case: Filter for high-throughput scenarios with many values
+ InDimFilter filter = new InDimFilter(
+ "metric.name",
+ Arrays.asList(
+ "io.kafka.server/delayed_share_fetch/expires/total/delta",
+ "io.kafka.server/request_bytes/total/delta",
+ "io.kafka.server/response_bytes/total/delta"
+ ),
+ null
+ );
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(headerFilter);
+
+ // Matching metric - should be included
+ ConsumerRecord<byte[], byte[]> matchingRecord = createRecord(
+ "telemetry.metrics.cloud.stag",
+ 0,
+ 100L,
+ headers("metric.name", "io.kafka.server/delayed_share_fetch/expires/total/delta")
+ );
+ Assert.assertTrue("Matching metric should be included", evaluator.shouldIncludeRecord(matchingRecord));
+
+ // Non-matching metric - should be excluded
+ ConsumerRecord<byte[], byte[]> nonMatchingRecord = createRecord(
+ "telemetry.metrics.cloud.stag",
+ 0,
+ 101L,
+ headers("metric.name", "some.other.metric")
+ );
+ Assert.assertFalse("Non-matching metric should be excluded", evaluator.shouldIncludeRecord(nonMatchingRecord));
+ }
+
+ @Test
+ public void testFilteringBehavior()
+ {
+ // Test Case: Verify basic filtering behavior
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(headerFilter);
+
+ // Process multiple records to verify filtering logic
+ ConsumerRecord<byte[], byte[]> record1 = createRecord("events", 0, 100L, headers("environment", "production"));
+ ConsumerRecord<byte[], byte[]> record2 = createRecord("events", 0, 101L, headers("environment", "staging"));
+ ConsumerRecord<byte[], byte[]> record3 = createRecord("events", 0, 102L, headers("environment", "production"));
+
+ // Verify filtering results
+ Assert.assertTrue("Production record should be included", evaluator.shouldIncludeRecord(record1));
+ Assert.assertFalse("Staging record should be excluded", evaluator.shouldIncludeRecord(record2));
+ Assert.assertTrue("Production record should be included", evaluator.shouldIncludeRecord(record3));
+ }
+
+ @Test
+ public void testConfigurationSerialization() throws Exception
+ {
+ // Test that header filter configurations can be serialized/deserialized correctly
+ InDimFilter filter = new InDimFilter("environment", Arrays.asList("production", "staging"), null);
+ KafkaHeaderBasedFilterConfig originalFilter = new KafkaHeaderBasedFilterConfig(filter, "UTF-16", null);
+
+ // Serialize to JSON
+ String json = objectMapper.writeValueAsString(originalFilter);
+
+ // Deserialize back
+ KafkaHeaderBasedFilterConfig deserializedFilter = objectMapper.readValue(json, KafkaHeaderBasedFilterConfig.class);
+
+ // Verify they're equivalent
+ Assert.assertEquals(originalFilter.getFilter(), deserializedFilter.getFilter());
+ Assert.assertEquals(originalFilter.getEncoding(), deserializedFilter.getEncoding());
+ Assert.assertEquals(originalFilter.getStringDecodingCacheSize(), deserializedFilter.getStringDecodingCacheSize());
+
+ // Verify the deserialized filter is functional: it is configured for UTF-16 while the test helper encodes header values as UTF-8, so the record below should be excluded
+ evaluator = new KafkaHeaderBasedFilterEvaluator(deserializedFilter);
+ ConsumerRecord record = createRecord("events", 0, 100L, headers("environment", "production"));
+ Assert.assertFalse("Deserialized filter should work", evaluator.shouldIncludeRecord(record));
+ }
+
+ @Test
+ public void testEncodingHandling()
+ {
+ // Test different character encodings
+ String testValue = "café"; // Contains non-ASCII characters
+
+ InDimFilter filter = new InDimFilter("text", Collections.singletonList(testValue), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, "ISO-8859-1", null);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(headerFilter);
+
+ // Create record with ISO-8859-1 encoded header
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("text", testValue.getBytes(StandardCharsets.ISO_8859_1)));
+
+ ConsumerRecord record = createRecord("events", 0, 100L, headers);
+
+ Assert.assertTrue("Should handle different encodings correctly", evaluator.shouldIncludeRecord(record));
+ }
+
+ @Test
+ public void testCustomCacheSize()
+ {
+ // Test with custom cache size
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, 100_000);
+ evaluator = new KafkaHeaderBasedFilterEvaluator(headerFilter);
+
+ ConsumerRecord record = createRecord("events", 0, 100L, headers("environment", "production"));
+ Assert.assertTrue("Should work with custom cache size", evaluator.shouldIncludeRecord(record));
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index f736d01bacbd..7db7fb2ddc46 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -359,7 +359,8 @@ public void testRunAfterDataInserted() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
Assert.assertTrue(task.supportsQueries());
@@ -416,7 +417,8 @@ public void testIngestNullColumnAfterDataInserted() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final ListenableFuture future = runTask(task);
@@ -465,7 +467,8 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
@@ -501,7 +504,8 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception
null,
null,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -543,7 +547,8 @@ public void testRunBeforeDataInserted() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -595,7 +600,8 @@ public void testRunAfterDataInsertedLiveReport() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final ListenableFuture future = runTask(task);
@@ -673,7 +679,8 @@ public void testIncrementalHandOff() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final ListenableFuture future = runTask(task);
@@ -777,7 +784,8 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final ListenableFuture future = runTask(task);
@@ -904,7 +912,8 @@ public void testTimeBasedIncrementalHandOff() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final ListenableFuture future = runTask(task);
@@ -983,7 +992,8 @@ public void testCheckpointResetWithSameEndOffsets() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final ListenableFuture future = runTask(task);
@@ -1047,7 +1057,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final KafkaIndexTask staleReplica = createTask(
@@ -1064,7 +1075,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1131,7 +1143,8 @@ public void testRunWithMinimumMessageTime() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1180,7 +1193,8 @@ public void testRunWithMaximumMessageTime() throws Exception
DateTimes.of("2010"),
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1238,7 +1252,8 @@ public void testRunWithTransformSpec() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1310,7 +1325,8 @@ public void testKafkaRecordEntityInputFormat() throws Exception
null,
new TestKafkaInputFormat(INPUT_FORMAT),
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
Assert.assertTrue(task.supportsQueries());
@@ -1383,7 +1399,8 @@ public void testKafkaInputFormat() throws Exception
null,
KAFKA_INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
Assert.assertTrue(task.supportsQueries());
@@ -1435,7 +1452,8 @@ public void testRunOnNothing() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1471,7 +1489,8 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1518,7 +1537,8 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1570,7 +1590,8 @@ public void testReportParseExceptions() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1609,7 +1630,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1702,7 +1724,8 @@ public void testMultipleParseExceptionsFailure() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1769,7 +1792,8 @@ public void testRunReplicas() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final KafkaIndexTask task2 = createTask(
@@ -1786,7 +1810,8 @@ public void testRunReplicas() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1835,7 +1860,8 @@ public void testRunConflicting() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final KafkaIndexTask task2 = createTask(
@@ -1852,7 +1878,8 @@ public void testRunConflicting() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1903,7 +1930,8 @@ public void testRunConflictingWithoutTransactions() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final KafkaIndexTask task2 = createTask(
@@ -1920,7 +1948,8 @@ public void testRunConflictingWithoutTransactions() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -1969,7 +1998,8 @@ public void testRunOneTaskTwoPartitions() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2016,7 +2046,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final KafkaIndexTask task2 = createTask(
@@ -2033,7 +2064,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2084,7 +2116,8 @@ public void testRestore() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2119,7 +2152,8 @@ public void testRestore() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2170,7 +2204,8 @@ public void testRestoreAfterPersistingSequences() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2214,7 +2249,8 @@ public void testRestoreAfterPersistingSequences() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2266,7 +2302,8 @@ public void testRunWithPauseAndResume() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2342,7 +2379,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2380,7 +2418,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2428,7 +2467,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
),
context
);
@@ -2473,7 +2513,8 @@ public void testRunWithDuplicateRequest() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2513,7 +2554,8 @@ public void testRunTransactionModeRollback() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2595,7 +2637,8 @@ public void testRunUnTransactionMode() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2656,7 +2699,8 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
final ListenableFuture future = runTask(task);
@@ -2680,7 +2724,8 @@ public void testRunWithoutDataInserted() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2727,7 +2772,8 @@ public void testSerde() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -2760,7 +2806,8 @@ public void testCorrectInputSources() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -3022,7 +3069,8 @@ public void testMultipleLinesJSONText() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -3084,7 +3132,8 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception
null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -3157,7 +3206,8 @@ public void testNoParseExceptionsTaskSucceeds() throws Exception
null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -3232,7 +3282,8 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception
null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -3285,7 +3336,8 @@ public void testCompletionReportPartitionStats() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
@@ -3340,7 +3392,8 @@ public void testCompletionReportMultiplePartitionStats() throws Exception
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
)
);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierHeaderFilterTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierHeaderFilterTest.java
new file mode 100644
index 000000000000..174590ee5d84
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierHeaderFilterTest.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.indexing.kafka.supervisor.KafkaHeaderBasedFilterConfig;
+import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test KafkaRecordSupplier with header-based filtering integrated into the main poll() method.
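+ * Records that do not pass the filter are returned as empty-data markers so that partition offsets still advance.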
+ */
+public class KafkaRecordSupplierHeaderFilterTest
+{
+ private KafkaConsumer mockConsumer;
+
+ private KafkaRecordSupplier recordSupplier;
+
+ @BeforeClass
+ public static void setUpClass()
+ {
+ ExpressionProcessing.initializeForTests();
+ }
+
+ @Before
+ public void setUp()
+ {
+ mockConsumer = EasyMock.createMock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testNoHeaderFilter()
+ {
+ // Test that records are not filtered when no header filter is configured
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, null);
+
+ ConsumerRecord record1 = createRecord("topic", 0, 100L,
+ headers("environment", "production"));
+ ConsumerRecord record2 = createRecord("topic", 0, 101L,
+ headers("environment", "staging"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(record1, record2)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Should include all records when no filter", 2, results.size());
+ Assert.assertEquals(100L, (long) results.get(0).getSequenceNumber());
+ Assert.assertEquals(101L, (long) results.get(1).getSequenceNumber());
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testInHeaderFilterSingleValue()
+ {
+ // Test filtering with in filter (single value)
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ ConsumerRecord prodRecord = createRecord("topic", 0, 100L,
+ headers("environment", "production"));
+ ConsumerRecord stagingRecord = createRecord("topic", 0, 101L,
+ headers("environment", "staging"));
+ ConsumerRecord noHeaderRecord = createRecord("topic", 0, 102L,
+ new RecordHeaders()); // No headers
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(prodRecord, stagingRecord, noHeaderRecord)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Should return all records (accepted + filtered markers)", 3, results.size());
+
+ // First record: production (accepted - has data)
+ Assert.assertNotNull("Production record should have data", results.get(0).getData());
+ Assert.assertFalse("Production record should have data", results.get(0).getData().isEmpty());
+ Assert.assertEquals(100L, (long) results.get(0).getSequenceNumber());
+
+ // Second record: staging (filtered - empty data for offset advancement)
+ Assert.assertTrue("Staging record should have empty data", results.get(1).getData().isEmpty());
+ Assert.assertEquals(101L, (long) results.get(1).getSequenceNumber());
+
+ // Third record: no-header (accepted - has data, permissive behavior)
+ Assert.assertNotNull("No-header record should have data", results.get(2).getData());
+ Assert.assertFalse("No-header record should have data", results.get(2).getData().isEmpty());
+ Assert.assertEquals(102L, (long) results.get(2).getSequenceNumber());
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testFilteredFlagTracking()
+ {
+ // Test that filtered records are properly marked with filtered flag
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ ConsumerRecord prodRecord = createRecord("topic", 0, 100L,
+ headers("environment", "production"));
+ ConsumerRecord stagingRecord = createRecord("topic", 0, 101L,
+ headers("environment", "staging"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(prodRecord, stagingRecord)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ // Verify records returned
+ Assert.assertEquals("Should return 2 records (accepted + filtered)", 2, results.size());
+
+ // Verify filtered flags
+ Assert.assertFalse("Production record should not be filtered", results.get(0).isFiltered());
+ Assert.assertTrue("Staging record should be filtered", results.get(1).isFiltered());
+
+ // Verify data presence
+ Assert.assertFalse("Production record should have data", results.get(0).getData().isEmpty());
+ Assert.assertTrue("Filtered record should have empty data", results.get(1).getData().isEmpty());
+
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testInHeaderFilterMultipleValues()
+ {
+ // Test filtering with in filter (multiple values)
+ InDimFilter filter = new InDimFilter("service", Arrays.asList("user-service", "payment-service"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ ConsumerRecord userServiceRecord = createRecord("topic", 0, 100L,
+ headers("service", "user-service"));
+ ConsumerRecord paymentServiceRecord = createRecord("topic", 0, 101L,
+ headers("service", "payment-service"));
+ ConsumerRecord orderServiceRecord = createRecord("topic", 0, 102L,
+ headers("service", "order-service"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(userServiceRecord, paymentServiceRecord, orderServiceRecord)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Should return all records (accepted + filtered markers)", 3, results.size());
+
+ // First record: user-service (accepted - has data)
+ Assert.assertNotNull("User-service record should have data", results.get(0).getData());
+ Assert.assertFalse("User-service record should have data", results.get(0).getData().isEmpty());
+ Assert.assertEquals(100L, (long) results.get(0).getSequenceNumber());
+
+ // Second record: payment-service (accepted - has data)
+ Assert.assertNotNull("Payment-service record should have data", results.get(1).getData());
+ Assert.assertFalse("Payment-service record should have data", results.get(1).getData().isEmpty());
+ Assert.assertEquals(101L, (long) results.get(1).getSequenceNumber());
+
+ // Third record: order-service (filtered - empty data for offset advancement marker)
+ Assert.assertTrue("Order-service record should have empty data", results.get(2).getData().isEmpty());
+ Assert.assertEquals(102L, (long) results.get(2).getSequenceNumber());
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testInFilterWithMultipleHeaders()
+ {
+ // Test InDimFilter with multiple possible values
+ InDimFilter serviceFilter = new InDimFilter("service", Arrays.asList("user-service", "payment-service"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(serviceFilter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ ConsumerRecord userServiceRecord = createRecord("topic", 0, 100L,
+ headers("service", "user-service"));
+ ConsumerRecord paymentServiceRecord = createRecord("topic", 0, 101L,
+ headers("service", "payment-service"));
+ ConsumerRecord orderServiceRecord = createRecord("topic", 0, 102L,
+ headers("service", "order-service"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(userServiceRecord, paymentServiceRecord, orderServiceRecord)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Should return all records (accepted + filtered markers)", 3, results.size());
+
+ // First record: user-service (accepted - has data)
+ Assert.assertNotNull("User-service record should have data", results.get(0).getData());
+ Assert.assertFalse("User-service record should have data", results.get(0).getData().isEmpty());
+ Assert.assertEquals(100L, (long) results.get(0).getSequenceNumber());
+
+ // Second record: payment-service (accepted - has data)
+ Assert.assertNotNull("Payment-service record should have data", results.get(1).getData());
+ Assert.assertFalse("Payment-service record should have data", results.get(1).getData().isEmpty());
+ Assert.assertEquals(101L, (long) results.get(1).getSequenceNumber());
+
+ // Third record: order-service (filtered - empty data for offset advancement marker)
+ Assert.assertTrue("Order-service record should have empty data", results.get(2).getData().isEmpty());
+ Assert.assertEquals(102L, (long) results.get(2).getSequenceNumber());
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testMultiplePolls()
+ {
+ // Test filtering behavior across multiple consecutive polls
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ // First poll
+ ConsumerRecord prodRecord1 = createRecord("topic", 0, 100L,
+ headers("environment", "production"));
+ ConsumerRecord stagingRecord1 = createRecord("topic", 0, 101L,
+ headers("environment", "staging"));
+
+ // Second poll
+ ConsumerRecord prodRecord2 = createRecord("topic", 0, 102L,
+ headers("environment", "production"));
+ ConsumerRecord stagingRecord2 = createRecord("topic", 0, 103L,
+ headers("environment", "staging"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(prodRecord1, stagingRecord1)));
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(prodRecord2, stagingRecord2)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results1 =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("First poll should return 2 records (accepted + filtered marker)", 2, results1.size());
+ Assert.assertNotNull("Production record should have data", results1.get(0).getData());
+ Assert.assertFalse("Production record should have data", results1.get(0).getData().isEmpty());
+ Assert.assertTrue("Staging record should have empty data", results1.get(1).getData().isEmpty());
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results2 =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Second poll should return 2 records (accepted + filtered marker)", 2, results2.size());
+ Assert.assertNotNull("Production record should have data", results2.get(0).getData());
+ Assert.assertFalse("Production record should have data", results2.get(0).getData().isEmpty());
+ Assert.assertTrue("Staging record should have empty data", results2.get(1).getData().isEmpty());
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testEmptyPoll()
+ {
+ // Test that an empty poll returns an empty result
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Collections.emptyList()));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Empty poll should return empty list", 0, results.size());
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testAllRecordsFilteredStillAdvanceOffsets()
+ {
+ // CRITICAL TEST: Verify that when ALL records are filtered out, we still return
+ // filtered record markers to prevent infinite loop
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ // All records have "staging" environment - none should pass the "production" filter
+ ConsumerRecord stagingRecord1 = createRecord("topic", 0, 100L,
+ headers("environment", "staging"));
+ ConsumerRecord stagingRecord2 = createRecord("topic", 0, 101L,
+ headers("environment", "staging"));
+ ConsumerRecord stagingRecord3 = createRecord("topic", 0, 102L,
+ headers("environment", "staging"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(stagingRecord1, stagingRecord2, stagingRecord3)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ // CRITICAL: Even though all records were filtered, we should still get record markers
+ // to advance offsets and prevent infinite loop
+ Assert.assertEquals("Should return filtered record markers for offset advancement", 3, results.size());
+
+ // Verify that all returned records have filtered record markers
+ for (OrderedPartitionableRecord result : results) {
+ Assert.assertTrue("Filtered record should have empty data", result.getData().isEmpty());
+ }
+
+ // Verify offsets are correct
+ Assert.assertEquals(100L, (long) results.get(0).getSequenceNumber());
+ Assert.assertEquals(101L, (long) results.get(1).getSequenceNumber());
+ Assert.assertEquals(102L, (long) results.get(2).getSequenceNumber());
+
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testMixedFilteredAndAcceptedRecords()
+ {
+ // Test that mix of filtered and accepted records works correctly
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, false, headerFilter);
+
+ ConsumerRecord prodRecord = createRecord("topic", 0, 100L,
+ headers("environment", "production"));
+ ConsumerRecord stagingRecord = createRecord("topic", 0, 101L,
+ headers("environment", "staging"));
+ ConsumerRecord prodRecord2 = createRecord("topic", 0, 102L,
+ headers("environment", "production"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(prodRecord, stagingRecord, prodRecord2)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Should return all records (accepted + filtered markers)", 3, results.size());
+
+ // First record: accepted (has data)
+ Assert.assertNotNull("Accepted record should have data", results.get(0).getData());
+ Assert.assertEquals(100L, (long) results.get(0).getSequenceNumber());
+
+ // Second record: filtered (empty data for offset advancement marker)
+ Assert.assertTrue("Filtered record should have empty data", results.get(1).getData().isEmpty());
+ Assert.assertEquals(101L, (long) results.get(1).getSequenceNumber());
+
+ // Third record: accepted (has data)
+ Assert.assertNotNull("Accepted record should have data", results.get(2).getData());
+ Assert.assertEquals(102L, (long) results.get(2).getSequenceNumber());
+
+ EasyMock.verify(mockConsumer);
+ }
+
+ @Test
+ public void testMultiTopic()
+ {
+ // Test header filtering with multi-topic configuration
+ InDimFilter filter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig headerFilter = new KafkaHeaderBasedFilterConfig(filter, null, null);
+
+ recordSupplier = new KafkaRecordSupplier(mockConsumer, true, headerFilter); // multiTopic = true
+
+ ConsumerRecord topic1Record = createRecord("topic1", 0, 100L,
+ headers("environment", "production"));
+ ConsumerRecord topic2Record = createRecord("topic2", 0, 101L,
+ headers("environment", "staging"));
+
+ EasyMock.expect(mockConsumer.poll(EasyMock.anyObject(Duration.class)))
+ .andReturn(createConsumerRecords(Arrays.asList(topic1Record, topic2Record)));
+ EasyMock.replay(mockConsumer);
+
+ List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> results =
+ recordSupplier.poll(1000);
+
+ Assert.assertEquals("Should return both records (accepted + filtered marker)", 2, results.size());
+
+ // First record: accepted
+ Assert.assertNotNull("Production record should have data", results.get(0).getData());
+ Assert.assertEquals("topic1", results.get(0).getStream());
+ Assert.assertTrue("Should be multi-topic partition",
+ results.get(0).getPartitionId().isMultiTopicPartition());
+
+ // Second record: filtered marker
+ Assert.assertTrue("Staging record should have empty data", results.get(1).getData().isEmpty());
+ Assert.assertEquals("topic2", results.get(1).getStream());
+
+ EasyMock.verify(mockConsumer);
+ }
+
+ // Helper methods
+
+ private ConsumerRecord createRecord(String topic, int partition, long offset, RecordHeaders headers)
+ {
+ ConsumerRecord record = new ConsumerRecord<>(
+ topic,
+ partition,
+ offset,
+ "test-key".getBytes(StandardCharsets.UTF_8),
+ "test-value".getBytes(StandardCharsets.UTF_8)
+ );
+
+ // Set headers using reflection since ConsumerRecord headers are final
+ try {
+ Field headersField = ConsumerRecord.class.getDeclaredField("headers");
+ headersField.setAccessible(true);
+ headersField.set(record, headers);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to set headers on test record", e);
+ }
+
+ return record;
+ }
+
+ private RecordHeaders headers(String... keyValuePairs)
+ {
+ if (keyValuePairs.length % 2 != 0) {
+ throw new IllegalArgumentException("Header key-value pairs require an even number of arguments");
+ }
+
+ RecordHeaders headers = new RecordHeaders();
+ for (int i = 0; i < keyValuePairs.length; i += 2) {
+ String key = keyValuePairs[i];
+ String value = keyValuePairs[i + 1];
+ headers.add(new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8)));
+ }
+ return headers;
+ }
+
+ private ConsumerRecords<byte[], byte[]> createConsumerRecords(List<ConsumerRecord> records)
+ {
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
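+ // Group records by TopicPartition to mirror the structure returned by KafkaConsumer.poll().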
+ for (ConsumerRecord record : records) {
+ TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ recordsMap.computeIfAbsent(tp, k -> new ArrayList<>()).add(record);
+ }
+ return new ConsumerRecords<>(recordsMap);
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 5e21fd0aa7f8..dacaac2832a2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -168,6 +168,7 @@ public void testSample()
null,
null,
null,
+ null,
false
),
null,
@@ -224,6 +225,7 @@ public void testSampleWithTopicPattern()
null,
null,
null,
+ null,
false
),
null,
@@ -289,6 +291,7 @@ public void testSampleKafkaInputFormat()
null,
null,
null,
+ null,
false
),
null,
@@ -396,6 +399,7 @@ public void testWithInputRowParser() throws IOException
null,
null,
null,
+ null,
false
),
null,
@@ -583,6 +587,7 @@ public void testInvalidKafkaConfig()
null,
null,
null,
+ null,
false
),
null,
@@ -642,6 +647,7 @@ public void testGetInputSourceResources()
null,
null,
null,
+ null,
false
),
null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index b46b2b5bbd66..5cc2ccab79c3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -147,7 +147,7 @@ private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId, String to
kafkaServer.consumerProperties(),
null, null, null, null, null,
true,
- null, null, null, null, null, null, null, null
+ null, null, null, null, null, null, null, null, null
),
null, null, null, null, null, null, null, null, null, null, null
);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaHeaderBasedFilterConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaHeaderBasedFilterConfigTest.java
new file mode 100644
index 000000000000..5d031af629a6
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaHeaderBasedFilterConfigTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.filter.AndDimFilter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
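+/**
+ * Tests serde, equality, defaults, and validation of KafkaHeaderBasedFilterConfig: InDimFilter is
+ * accepted, while SelectorDimFilter, AndDimFilter, and NotDimFilter are rejected at construction time.
+ * The default encoding is UTF-8 and the default string-decoding cache size is 10,000 entries.
+ */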
+public class KafkaHeaderBasedFilterConfigTest
+{
+ private final ObjectMapper objectMapper = new DefaultObjectMapper();
+
+ @BeforeClass
+ public static void setUpStatic()
+ {
+ ExpressionProcessing.initializeForTests();
+ }
+
+ @Test
+ public void testInFilterSingleValue()
+ {
+ InDimFilter dimFilter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig filter = new KafkaHeaderBasedFilterConfig(dimFilter, null, null);
+
+ Assert.assertEquals(dimFilter, filter.getFilter());
+ Assert.assertEquals("UTF-8", filter.getEncoding());
+ Assert.assertEquals(10_000, filter.getStringDecodingCacheSize());
+ }
+
+ @Test
+ public void testInFilterMultipleValues()
+ {
+ InDimFilter dimFilter = new InDimFilter("service", Arrays.asList("user-service", "payment-service"), null);
+ KafkaHeaderBasedFilterConfig filter = new KafkaHeaderBasedFilterConfig(dimFilter, "ISO-8859-1", null);
+
+ Assert.assertEquals(dimFilter, filter.getFilter());
+ Assert.assertEquals("ISO-8859-1", filter.getEncoding());
+ Assert.assertEquals(10_000, filter.getStringDecodingCacheSize());
+ }
+
+ @Test
+ public void testInFilterWithCustomCacheSize()
+ {
+ InDimFilter dimFilter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig filter = new KafkaHeaderBasedFilterConfig(dimFilter, null, 50_000);
+
+ Assert.assertEquals(dimFilter, filter.getFilter());
+ Assert.assertEquals("UTF-8", filter.getEncoding());
+ Assert.assertEquals(50_000, filter.getStringDecodingCacheSize());
+ }
+
+ @Test
+ public void testSelectorFilterRejected()
+ {
+ SelectorDimFilter dimFilter = new SelectorDimFilter("environment", "production", null);
+ try {
+ new KafkaHeaderBasedFilterConfig(dimFilter, null, null);
+ Assert.fail("Expected DruidException for SelectorDimFilter");
+ }
+ catch (DruidException e) {
+ Assert.assertTrue("Should mention unsupported filter type", e.getMessage().contains("Unsupported filter type"));
+ Assert.assertTrue("Should mention SelectorDimFilter", e.getMessage().contains("SelectorDimFilter"));
+ }
+ }
+
+ @Test
+ public void testAndFilterRejected()
+ {
+ SelectorDimFilter envFilter = new SelectorDimFilter("environment", "production", null);
+ SelectorDimFilter serviceFilter = new SelectorDimFilter("service", "user-service", null);
+ AndDimFilter andFilter = new AndDimFilter(Arrays.asList(envFilter, serviceFilter));
+ try {
+ new KafkaHeaderBasedFilterConfig(andFilter, null, null);
+ Assert.fail("Expected DruidException for AndDimFilter");
+ }
+ catch (DruidException e) {
+ Assert.assertTrue("Should mention unsupported filter type", e.getMessage().contains("Unsupported filter type"));
+ Assert.assertTrue("Should mention AndDimFilter", e.getMessage().contains("AndDimFilter"));
+ }
+ }
+
+ @Test
+ public void testNotFilterRejected()
+ {
+ SelectorDimFilter debugFilter = new SelectorDimFilter("debug-mode", "true", null);
+ NotDimFilter notFilter = new NotDimFilter(debugFilter);
+ try {
+ new KafkaHeaderBasedFilterConfig(notFilter, null, null);
+ Assert.fail("Expected DruidException for NotDimFilter");
+ }
+ catch (DruidException e) {
+ Assert.assertTrue("Should mention unsupported filter type", e.getMessage().contains("Unsupported filter type"));
+ Assert.assertTrue("Should mention NotDimFilter", e.getMessage().contains("NotDimFilter"));
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullFilter()
+ {
+ new KafkaHeaderBasedFilterConfig(null, null, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidEncoding()
+ {
+ InDimFilter dimFilter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ new KafkaHeaderBasedFilterConfig(dimFilter, "INVALID-ENCODING", null);
+ }
+
+ @Test
+ public void testSerialization() throws Exception
+ {
+ InDimFilter dimFilter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig originalFilter = new KafkaHeaderBasedFilterConfig(dimFilter, "UTF-16", null);
+
+ // Serialize to JSON
+ String json = objectMapper.writeValueAsString(originalFilter);
+
+ // Deserialize back
+ KafkaHeaderBasedFilterConfig deserializedFilter = objectMapper.readValue(json, KafkaHeaderBasedFilterConfig.class);
+
+ Assert.assertEquals(originalFilter.getFilter(), deserializedFilter.getFilter());
+ Assert.assertEquals(originalFilter.getEncoding(), deserializedFilter.getEncoding());
+ Assert.assertEquals(originalFilter.getStringDecodingCacheSize(), deserializedFilter.getStringDecodingCacheSize());
+ }
+
+ @Test
+ public void testEquals()
+ {
+ InDimFilter dimFilter1 = new InDimFilter("environment", Collections.singletonList("production"), null);
+ InDimFilter dimFilter2 = new InDimFilter("environment", Collections.singletonList("production"), null);
+ InDimFilter dimFilter3 = new InDimFilter("environment", Collections.singletonList("staging"), null);
+
+ KafkaHeaderBasedFilterConfig filter1 = new KafkaHeaderBasedFilterConfig(dimFilter1, "UTF-8", null);
+ KafkaHeaderBasedFilterConfig filter2 = new KafkaHeaderBasedFilterConfig(dimFilter2, "UTF-8", null);
+ KafkaHeaderBasedFilterConfig filter3 = new KafkaHeaderBasedFilterConfig(dimFilter3, "UTF-8", null);
+ KafkaHeaderBasedFilterConfig filter4 = new KafkaHeaderBasedFilterConfig(dimFilter1, "UTF-16", null);
+ KafkaHeaderBasedFilterConfig filter5 = new KafkaHeaderBasedFilterConfig(dimFilter1, "UTF-8", 5000);
+
+ Assert.assertEquals(filter1, filter2);
+ Assert.assertNotEquals(filter1, filter3);
+ Assert.assertNotEquals(filter1, filter4);
+ Assert.assertNotEquals(filter1, filter5); // Different cache size
+ Assert.assertNotEquals(filter1, null);
+ Assert.assertNotEquals(filter1, "string");
+ }
+
+ @Test
+ public void testToString()
+ {
+ InDimFilter dimFilter = new InDimFilter("environment", Collections.singletonList("production"), null);
+ KafkaHeaderBasedFilterConfig filter = new KafkaHeaderBasedFilterConfig(dimFilter, "UTF-8", null);
+
+ String toString = filter.toString();
+ Assert.assertTrue(toString.contains("KafkaheaderBasedFilterConfig"));
+ Assert.assertTrue(toString.contains("filter="));
+ Assert.assertTrue(toString.contains("encoding='UTF-8'"));
+ Assert.assertTrue(toString.contains("stringDecodingCacheSize=10000"));
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 2da46aaf0e2b..d0554f44c26f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -340,6 +340,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
null,
null,
null,
+ null,
false
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
@@ -375,6 +376,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
null,
null,
null,
+ null,
false
);
Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue());
@@ -425,6 +427,7 @@ public void testIdleConfigSerde() throws JsonProcessingException
null,
null,
null,
+ null,
mapper.convertValue(idleConfig, IdleConfig.class),
null,
false
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index d7bb9acfb7f6..92343233918c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -702,6 +702,7 @@ public void test_validateSpecUpdateTo()
null,
null,
null,
+ null,
false
),
Map.of(
@@ -752,6 +753,7 @@ private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
null,
null,
null,
+ null,
false
),
null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index bf07a2dec644..78de2aa59e8f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -327,6 +327,7 @@ public SeekableStreamIndexTaskClient build(
null,
null,
null,
+ null,
new IdleConfig(true, 1000L),
1,
false
@@ -509,7 +510,8 @@ public void testGetTaskRunnerType() throws JsonProcessingException
null,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
),
new KafkaIndexTaskTuningConfig(
null,
@@ -5343,6 +5345,7 @@ private TestableKafkaSupervisor getTestableSupervisor(
earlyMessageRejectionPeriod,
null,
null,
+ null,
idleConfig,
null,
true
@@ -5462,6 +5465,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent(
null,
null,
null,
+ null,
false
);
@@ -5582,6 +5586,7 @@ private KafkaSupervisor createSupervisor(
null,
null,
null,
+ null,
false
);
@@ -5728,7 +5733,8 @@ private KafkaIndexTask createKafkaIndexTask(
maximumMessageTime,
INPUT_FORMAT,
null,
- Duration.standardHours(2).getStandardMinutes()
+ Duration.standardHours(2).getStandardMinutes(),
+ null
),
Collections.emptyMap(),
OBJECT_MAPPER
diff --git a/extensions-core/kinesis-indexing-service/pom.xml b/extensions-core/kinesis-indexing-service/pom.xml
index 316d8df3ed22..6c17c7e7f54b 100644
--- a/extensions-core/kinesis-indexing-service/pom.xml
+++ b/extensions-core/kinesis-indexing-service/pom.xml
@@ -29,7 +29,7 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
- <version>34.0.0-SNAPSHOT</version>
+ <version>34.0.0</version>
<relativePath>../../pom.xml</relativePath>
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 98dfea4333cc..ff9cbc8aab8c 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -1201,7 +1201,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception
RowIngestionMeters.PROCESSED_BYTES, 763,
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
RowIngestionMeters.UNPARSEABLE, 4,
- RowIngestionMeters.THROWN_AWAY, 0
+ RowIngestionMeters.THROWN_AWAY, 0,
+ RowIngestionMeters.FILTERED, 0
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1291,7 +1292,8 @@ public void testMultipleParseExceptionsFailure() throws Exception
RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes,
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.UNPARSEABLE, 3,
- RowIngestionMeters.THROWN_AWAY, 0
+ RowIngestionMeters.THROWN_AWAY, 0,
+ RowIngestionMeters.FILTERED, 0
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
diff --git a/extensions-core/kubernetes-extensions/pom.xml b/extensions-core/kubernetes-extensions/pom.xml
index 0e6b7f32f88e..9e9ddfd45886 100644
--- a/extensions-core/kubernetes-extensions/pom.xml
+++ b/extensions-core/kubernetes-extensions/pom.xml
@@ -18,8 +18,7 @@
~ under the License.
-->
-
+
4.0.0
org.apache.druid.extensions
@@ -30,7 +29,7 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
- <version>34.0.0-SNAPSHOT</version>
+ <version>34.0.0</version>
<relativePath>../../pom.xml</relativePath>
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
index 03a1b6914a85..a3f0a06fbe57 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
@@ -36,6 +36,9 @@
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.net.SocketTimeoutException;
@@ -76,7 +79,8 @@ public void patchPod(String podName, String podNamespace, String jsonPatchStr)
public DiscoveryDruidNodeList listPods(
String podNamespace,
String labelSelector,
- NodeRole nodeRole
+ NodeRole nodeRole,
+ @Nullable Duration terminatingStateCheckDuration
)
{
try {
@@ -85,6 +89,26 @@ public DiscoveryDruidNodeList listPods(
Map<String, DiscoveryDruidNode> allNodes = new HashMap<>();
for (V1Pod podDef : podList.getItems()) {
+ // irrespective of the grace period, we skip the pod if it has been in termination for more than terminatingStateCheckDuration
+ if (podDef.getMetadata() != null && podDef.getMetadata().getDeletionTimestamp() != null) {
+ long deletionTimestamp = podDef.getMetadata().getDeletionTimestamp().toInstant().toEpochMilli();
+ long currentTimestamp = System.currentTimeMillis();
+ long terminationGracePeriod = podDef.getSpec().getTerminationGracePeriodSeconds() != null ?
+ podDef.getSpec().getTerminationGracePeriodSeconds() * 1000L : 30 * 1000L; // Default to 30s if graceperiod is not set
+ long checkDuration = terminatingStateCheckDuration != null ?
+ terminatingStateCheckDuration.getMillis() : 30 * 1000L; // Default to 30s if not specified
+
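+ // Kubernetes sets deletionTimestamp to the deletion request time plus the grace period, so adding
+ // the grace period back yields how long the pod has been in termination since the request.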
+ if (currentTimestamp - deletionTimestamp + terminationGracePeriod > checkDuration) {
+ LOGGER.info(
+ "Skipping pod %s/%s from discovery as it has been in termination for more than grace period + %d seconds",
+ podDef.getMetadata().getNamespace(),
+ podDef.getMetadata().getName(),
+ checkDuration / 1000
+ );
+ continue;
+ }
+ }
+
DiscoveryDruidNode node = getDiscoveryDruidNodeFromPodDef(nodeRole, podDef);
allNodes.put(node.getDruidNode().getHostAndPortToUse(), node);
}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sApiClient.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sApiClient.java
index 1e61677420c8..4f4efebdbbb8 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sApiClient.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sApiClient.java
@@ -20,6 +20,9 @@
package org.apache.druid.k8s.discovery;
import org.apache.druid.discovery.NodeRole;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
/**
* Interface to abstract pod read/update with K8S API Server to allow unit tests with mock impl.
@@ -28,7 +31,7 @@ public interface K8sApiClient
{
void patchPod(String podName, String namespace, String jsonPatchStr);
- DiscoveryDruidNodeList listPods(String namespace, String labelSelector, NodeRole nodeRole);
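+ // Pods that have been terminating for longer than terminatingStateCheckDuration are omitted; a null value falls back to the 30-second default.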
+ DiscoveryDruidNodeList listPods(String namespace, String labelSelector, NodeRole nodeRole, @Nullable Duration terminatingStateCheckDuration);
/**
* @return NULL if history not available or else return the {@link WatchResult} object
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java
index 998b8641c83a..ab9302a5c420 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java
@@ -60,6 +60,12 @@ public class K8sDiscoveryConfig
@JsonProperty
private final Duration retryPeriod;
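+
+ // Pods that have been in terminating state for longer than this duration are excluded from discovery (defaults to 30 seconds).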
+ @JsonProperty
+ private final Duration terminatingStateCheckDuration;
+
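+ // Interval at which pods are periodically re-listed (defaults to one minute).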
+ @JsonProperty
+ private final Duration periodicListInterval;
+
@JsonCreator
public K8sDiscoveryConfig(
@JsonProperty("clusterIdentifier") String clusterIdentifier,
@@ -69,7 +75,9 @@ public K8sDiscoveryConfig(
@JsonProperty("overlordLeaderElectionConfigMapNamespace") String overlordLeaderElectionConfigMapNamespace,
@JsonProperty("leaseDuration") Duration leaseDuration,
@JsonProperty("renewDeadline") Duration renewDeadline,
- @JsonProperty("retryPeriod") Duration retryPeriod
+ @JsonProperty("retryPeriod") Duration retryPeriod,
+ @JsonProperty("terminatingStateCheckDuration") Duration terminatingStateCheckDuration,
+ @JsonProperty("periodicListInterval") Duration periodicListInterval
)
{
Preconditions.checkArgument(clusterIdentifier != null && !clusterIdentifier.isEmpty(), "null/empty clusterIdentifier");
@@ -97,6 +105,8 @@ public K8sDiscoveryConfig(
this.leaseDuration = leaseDuration == null ? Duration.millis(60000) : leaseDuration;
this.renewDeadline = renewDeadline == null ? Duration.millis(17000) : renewDeadline;
this.retryPeriod = retryPeriod == null ? Duration.millis(5000) : retryPeriod;
+ this.terminatingStateCheckDuration = terminatingStateCheckDuration == null ? Duration.standardSeconds(30) : terminatingStateCheckDuration;
+ this.periodicListInterval = periodicListInterval == null ? Duration.standardMinutes(1) : periodicListInterval;
}
@JsonProperty
@@ -147,6 +157,18 @@ public Duration getRetryPeriod()
return retryPeriod;
}
+ @JsonProperty
+ public Duration getTerminatingStateCheckDuration()
+ {
+ return terminatingStateCheckDuration;
+ }
+
+ @JsonProperty
+ public Duration getPeriodicListInterval()
+ {
+ return periodicListInterval;
+ }
+
@Override
public String toString()
{
@@ -159,6 +181,8 @@ public String toString()
", leaseDuration=" + leaseDuration +
", renewDeadline=" + renewDeadline +
", retryPeriod=" + retryPeriod +
+ ", terminatingStateCheckDuration=" + terminatingStateCheckDuration +
+ ", periodicListInterval=" + periodicListInterval +
'}';
}
@@ -185,7 +209,9 @@ public boolean equals(Object o)
) &&
Objects.equals(leaseDuration, that.leaseDuration) &&
Objects.equals(renewDeadline, that.renewDeadline) &&
- Objects.equals(retryPeriod, that.retryPeriod);
+ Objects.equals(retryPeriod, that.retryPeriod) &&
+ Objects.equals(terminatingStateCheckDuration, that.terminatingStateCheckDuration) &&
+ Objects.equals(periodicListInterval, that.periodicListInterval);
}
@Override
@@ -199,7 +225,9 @@ public int hashCode()
overlordLeaderElectionConfigMapNamespace,
leaseDuration,
renewDeadline,
- retryPeriod
+ retryPeriod,
+ terminatingStateCheckDuration,
+ periodicListInterval
);
}
}
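Both new properties are optional; as the constructor above shows, they default to 30 seconds and 1 minute respectively. A short sketch of that defaulting, with the remaining arguments passed as null in the same way the existing tests do:

// Hypothetical usage sketch: omitting both new durations falls back to the defaults.
K8sDiscoveryConfig config = new K8sDiscoveryConfig(
    "druid-cluster", null, null, null, null, null, null, null,
    null,  // terminatingStateCheckDuration -> Duration.standardSeconds(30)
    null   // periodicListInterval          -> Duration.standardMinutes(1)
);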
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
index d2472a9fde4d..8d5dd9221d5c 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
@@ -97,7 +97,7 @@ public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
return () -> k8sApiClient.listPods(
podInfo.getPodNamespace(),
K8sDruidNodeAnnouncer.getLabelSelectorForNode(discoveryConfig, nodeRole, node),
- nodeRole
+ nodeRole,
+ discoveryConfig.getTerminatingStateCheckDuration()
).getDruidNodes().containsKey(node.getHostAndPortToUse());
}
@@ -223,9 +223,32 @@ private void watch()
return;
}
+ // Create a scheduled executor for periodic listing
+ ScheduledExecutorService periodicListExecutor = Execs.scheduledSingleThreaded(
+ "K8sDruidNodeDiscoveryProvider-PeriodicList-" + nodeRole.getJsonName()
+ );
+
+ // Schedule periodic listing at the configured periodicListInterval (default: 1 minute), starting after a fixed 2 minute initial delay
+ periodicListExecutor.scheduleAtFixedRate(() -> {
+ try {
+ if (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+ LOGGER.info("Performing periodic pod listing for NodeRole [%s]", nodeRole);
+ DiscoveryDruidNodeList list = k8sApiClient.listPods(
+ podInfo.getPodNamespace(),
+ labelSelector,
+ nodeRole,
+ discoveryConfig.getTerminatingStateCheckDuration()
+ );
+ baseNodeRoleWatcher.resetNodes(list.getDruidNodes());
+ }
+ }
+ catch (Throwable ex) {
+ LOGGER.error(ex, "Error during periodic pod listing for NodeRole [%s]", nodeRole);
+ }
+ }, 120000, discoveryConfig.getPeriodicListInterval().getMillis(), TimeUnit.MILLISECONDS);
+
while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
- DiscoveryDruidNodeList list = k8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, nodeRole);
+ DiscoveryDruidNodeList list = k8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, nodeRole, discoveryConfig.getTerminatingStateCheckDuration());
baseNodeRoleWatcher.resetNodes(list.getDruidNodes());
if (!cacheInitialized) {
@@ -246,6 +269,7 @@ private void watch()
}
}
+ periodicListExecutor.shutdownNow();
LOGGER.info("Exited Watch for role[%s].", nodeRole);
}
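In isolation, the new scheduling call behaves as sketched below: the first periodic re-list fires after the hard-coded two-minute initial delay, then repeats every periodicListInterval until shutdownNow() is invoked when the watch loop exits. The class and task body here are hypothetical and only spell out the scheduleAtFixedRate parameters:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicListSketch
{
  public static void main(String[] args)
  {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    long periodMillis = 60_000L; // discoveryConfig.getPeriodicListInterval().getMillis(), default PT1M
    executor.scheduleAtFixedRate(
        () -> System.out.println("re-list pods and reset the watcher's node cache"),
        120_000L,       // initial delay: 2 minutes, hard-coded in the patch
        periodMillis,   // period: the configured periodicListInterval
        TimeUnit.MILLISECONDS
    );
  }
}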
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
index e7752d757b73..9b8c8f96c890 100644
--- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
@@ -52,7 +52,7 @@ public class K8sAnnouncerAndDiscoveryIntTest
private final PodInfo podInfo = new PodInfo("busybox", "default");
- private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null);
+ private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null, null, null);
@Test(timeout = 30000L)
public void testAnnouncementAndDiscoveryWorkflow() throws Exception
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfigTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfigTest.java
index b76ae4b62462..41ef2bc37408 100644
--- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfigTest.java
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfigTest.java
@@ -34,7 +34,7 @@ public void testDefaultValuesSerde() throws Exception
{
testSerde(
"{\"clusterIdentifier\": \"test-cluster\"}\n",
- new K8sDiscoveryConfig("test-cluster", null, null, null, null, null, null, null)
+ new K8sDiscoveryConfig("test-cluster", null, null, null, null, null, null, null, null, null)
);
}
@@ -50,8 +50,10 @@ public void testCustomizedValuesSerde() throws Exception
+ " \"overlordLeaderElectionConfigMapNamespace\": \"overlordns\",\n"
+ " \"leaseDuration\": \"PT3S\",\n"
+ " \"renewDeadline\": \"PT2S\",\n"
- + " \"retryPeriod\": \"PT1S\"\n"
- + "}\n",
+ + " \"retryPeriod\": \"PT1S\",\n"
+ + " \"terminatingStateCheckDuration\": \"PT30S\",\n"
+ + " \"periodicListInterval\": \"PT20S\"\n"
+ + "\n}",
new K8sDiscoveryConfig(
"test-cluster",
"PODNAMETEST",
@@ -60,7 +62,9 @@ public void testCustomizedValuesSerde() throws Exception
"overlordns",
Duration.millis(3000),
Duration.millis(2000),
- Duration.millis(1000)
+ Duration.millis(1000),
+ Duration.millis(30000),
+ Duration.millis(20000)
)
);
}
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java
index 168c0625cda3..7e19c5efd968 100644
--- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java
@@ -54,7 +54,7 @@ public class K8sDruidLeaderElectionIntTest
);
private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, "default", "default",
- Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000));
+ Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000), null, null);
private final ApiClient k8sApiClient;
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelectorTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelectorTest.java
index a500502524fb..96f66b7ffbd3 100644
--- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelectorTest.java
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelectorTest.java
@@ -38,7 +38,7 @@ public class K8sDruidLeaderSelectorTest
);
private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null,
- "default", "default", Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000));
+ "default", "default", Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000), null, null);
private final String lockResourceName = "druid-leader-election";
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeAnnouncerTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeAnnouncerTest.java
index fc0304009f73..30c4373ea423 100644
--- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeAnnouncerTest.java
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeAnnouncerTest.java
@@ -47,7 +47,7 @@ public class K8sDruidNodeAnnouncerTest
private final PodInfo podInfo = new PodInfo("testpod", "testns");
- private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null);
+ private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null, null, null);
@Test
public void testAnnounce() throws Exception
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
index 12a3a6118da8..0eb5dfa28158 100644
--- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
@@ -30,6 +30,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import org.easymock.EasyMock;
+import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
@@ -76,14 +77,14 @@ public class K8sDruidNodeDiscoveryProviderTest
private final PodInfo podInfo = new PodInfo("testpod", "testns");
- private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null);
+ private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null, null, null);
@Test(timeout = 60_000)
public void testGetForNodeRole() throws Exception
{
String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
- EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER, Duration.millis(30000))).andReturn(
new DiscoveryDruidNodeList(
"v1",
ImmutableMap.of(
@@ -94,7 +95,7 @@ public void testGetForNodeRole() throws Exception
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn(null);
- EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER, Duration.millis(30000))).andReturn(
new DiscoveryDruidNodeList(
"v2",
ImmutableMap.of(
@@ -168,7 +169,7 @@ public void testNodeRoleWatcherHandlesNullFromAPIByRestarting() throws Exception
{
String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
- EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER, Duration.millis(30000))).andReturn(
new DiscoveryDruidNodeList(
"v1",
ImmutableMap.of(
@@ -187,7 +188,7 @@ public void testNodeRoleWatcherHandlesNullFromAPIByRestarting() throws Exception
false
)
);
- EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER, Duration.millis(30000))).andReturn(
new DiscoveryDruidNodeList(
"v2",
ImmutableMap.of(
@@ -231,7 +232,7 @@ public void testNodeRoleWatcherLoopOnNullItems() throws Exception
{
String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
- EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER, Duration.millis(30000))).andReturn(
new DiscoveryDruidNodeList(
"v1",
ImmutableMap.of(
diff --git a/extensions-core/kubernetes-overlord-extensions/pom.xml b/extensions-core/kubernetes-overlord-extensions/pom.xml
index ec731a590349..30fefc46274f 100644
--- a/extensions-core/kubernetes-overlord-extensions/pom.xml
+++ b/extensions-core/kubernetes-overlord-extensions/pom.xml
@@ -18,8 +18,7 @@
~ under the License.
-->
-
+
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.druid.extensions</groupId>
@@ -30,13 +29,24 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
- <version>34.0.0-SNAPSHOT</version>
+ <version>34.0.0</version>
<relativePath>../../pom.xml</relativePath>
7.2.0
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.squareup.okio</groupId>
+ <artifactId>okio</artifactId>
+ <version>1.17.6</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
diff --git a/extensions-core/lookups-cached-global/pom.xml b/extensions-core/lookups-cached-global/pom.xml
index 798a29d5d6af..b26b0b9595b4 100644
--- a/extensions-core/lookups-cached-global/pom.xml
+++ b/extensions-core/lookups-cached-global/pom.xml
@@ -28,7 +28,7 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
- <version>34.0.0-SNAPSHOT</version>
+ <version>34.0.0</version>
<relativePath>../../pom.xml</relativePath>
diff --git a/extensions-core/lookups-cached-single/pom.xml b/extensions-core/lookups-cached-single/pom.xml
index 283d19ee114b..1114c1e72407 100644
--- a/extensions-core/lookups-cached-single/pom.xml
+++ b/extensions-core/lookups-cached-single/pom.xml
@@ -28,7 +28,7 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
- <version>34.0.0-SNAPSHOT</version>
+ <version>34.0.0</version>
<relativePath>../../pom.xml</relativePath>
diff --git a/extensions-core/multi-stage-query/pom.xml b/extensions-core/multi-stage-query/pom.xml
index 6fd8eba6c2f0..e99f38bd4400 100644
--- a/extensions-core/multi-stage-query/pom.xml
+++ b/extensions-core/multi-stage-query/pom.xml
@@ -19,8 +19,7 @@
~ under the License.
-->
-
+
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.druid.extensions</groupId>
@@ -31,7 +30,7 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
- <version>34.0.0-SNAPSHOT</version>
+ <version>34.0.0</version>
<relativePath>../../pom.xml</relativePath>
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index 879386e87071..45127f6eed64 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -178,6 +178,7 @@ protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment
Objects.requireNonNull(mappedSegment.as(CursorFactory.class)),
mappedSegment.as(TimeBoundaryInspector.class),
bufferPool,
+ null,
null
);
@@ -213,6 +214,7 @@ protected ReturnOrAwait runWithInputChannel(
Objects.requireNonNull(mappedSegment.as(CursorFactory.class)),
mappedSegment.as(TimeBoundaryInspector.class),
bufferPool,
+ null,
null
);
diff --git a/extensions-core/mysql-metadata-storage/pom.xml b/extensions-core/mysql-metadata-storage/pom.xml
index e957efb66408..41b6c0b0c1bb 100644
--- a/extensions-core/mysql-metadata-storage/pom.xml
+++ b/extensions-core/mysql-metadata-storage/pom.xml
@@ -30,7 +30,7 @@