Merged
Changes from 5 commits
12 changes: 10 additions & 2 deletions docs/ingestion/kafka-ingestion.md
@@ -120,7 +120,7 @@ For configuration properties shared across all streaming ingestion methods, refe

|Property|Type|Description|Required|Default|
|--------|----|-----------|--------|-------|
|`topic`|String|The Kafka topic to read from. To ingest data from multiple topic, use `topicPattern`. |Yes if `topicPattern` isn't set.||
|`topic`|String|The Kafka topic to read from. Note that once this value is established for a supervisor, updating it is not supported. To ingest data from multiple topics, use `topicPattern`. |Yes if `topicPattern` isn't set.||
|`topicPattern`|String|Multiple Kafka topics to read from, passed as a regex pattern. See [Ingest from multiple topics](#ingest-from-multiple-topics) for more information.|Yes if `topic` isn't set.||
|`consumerProperties`|String, Object|A map of properties to pass to the Kafka consumer. See [Consumer properties](#consumer-properties) for details.|Yes. At the minimum, you must set the `bootstrap.servers` property to establish the initial connection to the Kafka cluster.||
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100|
@@ -134,6 +134,14 @@ If you enable multi-topic ingestion for a datasource, downgrading to a version o
28.0.0 will cause the ingestion for that datasource to fail.
:::

:::info
Migrating an existing supervisor from `topic` to `topicPattern` is not supported. Changing the `topicPattern` of an existing supervisor to a different regex pattern is also not supported.

Contributor @kfaraz, Apr 29, 2025:

Thanks for adding this.

Maybe you can put it in a list format for better readability like

The following updates to a supervisor spec are not supported:
- item 1
- item 2
- item 3

Follow these steps instead: 
- Step 1
- Step 2

Contributor Author:

Updated with list

Contributor:

Thanks!

You can force the migration by doing the following:
1. Suspend the supervisor.
2. Reset the offsets.
3. Submit the updated supervisor spec.
:::
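
The three steps above map onto calls to the Supervisor API. The following is a minimal Java sketch that only builds the corresponding requests and does not send them. The base URL, the supervisor ID, and the class and method names are placeholders chosen for illustration. The endpoint paths are taken from the Supervisor API reference and should be checked against the Druid version in use.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

final class SupervisorMigrationSketch
{
  // Builds the suspend, reset, and submit requests in the order the docs list them.
  // baseUrl and supervisorId are assumptions supplied by the caller.
  static List<HttpRequest> migrationRequests(String baseUrl, String supervisorId, String updatedSpecJson)
  {
    String root = baseUrl + "/druid/indexer/v1/supervisor";

    // Step 1: suspend the running supervisor.
    HttpRequest suspend = HttpRequest.newBuilder(URI.create(root + "/" + supervisorId + "/suspend"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    // Step 2: reset the stored offsets.
    HttpRequest reset = HttpRequest.newBuilder(URI.create(root + "/" + supervisorId + "/reset"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    // Step 3: submit the updated supervisor spec as JSON.
    HttpRequest submit = HttpRequest.newBuilder(URI.create(root))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(updatedSpecJson))
        .build();

    return List.of(suspend, reset, submit);
  }

  public static void main(String[] args)
  {
    // Placeholder values; "{}" stands in for a real supervisor spec.
    for (HttpRequest request : migrationRequests("http://localhost:8888", "example_datasource", "{}")) {
      System.out.println(request.method() + " " + request.uri());
    }
  }
}
```

To actually run the migration, each request would be passed to a `java.net.http.HttpClient`, and each response checked before the next step is sent.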

You can ingest data from one or multiple topics.
When ingesting data from multiple topics, Druid assigns partitions based on the hashcode of the topic name and the ID of the partition within that topic. The partition assignment might not be uniform across all the tasks. Druid assumes that partitions across individual topics have similar load. If you want to ingest from both high-load and low-load topics in the same supervisor, give the high-load topic more partitions and the low-load topic fewer.
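
To see why the assignment might not be uniform, here is a small Java sketch of hash-based assignment. The formula (`31 * topic.hashCode() + partition`, reduced modulo the task count) and all names are illustrative assumptions, not necessarily the exact computation Druid performs.

```java
import java.util.Arrays;

final class PartitionAssignmentSketch
{
  // Hypothetical assignment: combine the topic-name hash with the partition ID,
  // then map the result onto one of taskCount tasks. floorMod keeps the result
  // non-negative even if the intermediate value overflows.
  static int taskFor(String topic, int partition, int taskCount)
  {
    return Math.floorMod(31 * topic.hashCode() + partition, taskCount);
  }

  // Adds the partitions of one topic to the per-task counters.
  static void assign(String topic, int partitions, int[] counts)
  {
    for (int p = 0; p < partitions; p++) {
      counts[taskFor(topic, p, counts.length)]++;
    }
  }

  public static void main(String[] args)
  {
    // Two made-up topics with different partition counts, spread over 4 tasks.
    int[] counts = new int[4];
    assign("orders", 6, counts);
    assign("clicks", 3, counts);
    System.out.println("partitions per task: " + Arrays.toString(counts));
  }
}
```

Because each topic starts at a hash-dependent offset, some tasks can end up with more partitions than others. Every partition counts equally in this scheme, which is why the per-partition load needs to be similar across topics.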

@@ -456,4 +464,4 @@ See the following topics for more information:
* [Supervisor API](../api-reference/supervisor-api.md) for how to manage and monitor supervisors using the API.
* [Supervisor](../ingestion/supervisor.md) for supervisor status and capacity planning.
* [Loading from Apache Kafka](../tutorials/tutorial-kafka.md) for a tutorial on streaming data from Apache Kafka.
* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the `kafka` input format.
* [Kafka input format](../ingestion/data-formats.md#kafka) to learn about the `kafka` input format.
@@ -24,14 +24,18 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -176,6 +180,47 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
    );
  }

  /**
   * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic.
   * <p>
   * getSource() returns the same string (exampleTopic) for "topicPattern=exampleTopic" and "topic=exampleTopic".
   * This override prevents this case from being considered a valid update.
   * </p>
   * @param proposedSpec the proposed supervisor spec
   * @throws DruidException if the proposed spec is not a Kafka spec or if the proposed spec changes from multi-topic to single-topic or vice versa
   */
  @Override
  public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidException
  {
    if (!(proposedSpec instanceof KafkaSupervisorSpec)) {
      throw InvalidInput.exception(
          StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName())
Contributor:

Suggested change
StringUtils.format("Cannot evolve to [%s] from [%s]", getClass().getName(), proposedSpec.getClass().getName())
"Cannot change spec from type[%s] to type[%s]", getClass().getName(), proposedSpec.getClass().getName()

      );
    }
    KafkaSupervisorSpec other = (KafkaSupervisorSpec) proposedSpec;
    if (this.getSpec().getIOConfig().isMultiTopic() != other.getSpec().getIOConfig().isMultiTopic()) {
      throw InvalidInput.exception(getIllegalInputSourceUpdateErrorMessage("(%s) %s", "(%s) %s"),
                                   this.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic",
                                   this.getSource(),
                                   other.getSpec().getIOConfig().isMultiTopic() ? "multi-topic" : "single-topic",
                                   other.getSource());
    }

    super.validateSpecUpdateTo(proposedSpec);
  }
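
The case this override guards against can be reproduced in isolation. The following is a self-contained sketch, not the Druid classes themselves: `IoConfig`, `validateUpdate`, and the exception messages are hypothetical stand-ins. The second check is an assumption about what the parent validation does, namely comparing the `getSource()` values, as the Javadoc above implies.

```java
final class TopicModeCheckSketch
{
  // Hypothetical stand-in for the Kafka IO config: exactly one field is non-null.
  static final class IoConfig
  {
    final String topic;
    final String topicPattern;

    IoConfig(String topic, String topicPattern)
    {
      this.topic = topic;
      this.topicPattern = topicPattern;
    }

    boolean isMultiTopic()
    {
      return topicPattern != null;
    }

    // Returns the same string whether it was configured as topic or topicPattern.
    String getSource()
    {
      return isMultiTopic() ? topicPattern : topic;
    }
  }

  static void validateUpdate(IoConfig current, IoConfig proposed)
  {
    // Mode check first: comparing sources alone cannot tell the two modes apart.
    if (current.isMultiTopic() != proposed.isMultiTopic()) {
      throw new IllegalArgumentException("Cannot switch between single-topic and multi-topic");
    }
    // Assumed parent behavior: reject any change of the source string.
    if (!current.getSource().equals(proposed.getSource())) {
      throw new IllegalArgumentException("Cannot change the source of an existing supervisor");
    }
  }

  public static void main(String[] args)
  {
    IoConfig single = new IoConfig("exampleTopic", null);
    IoConfig multi = new IoConfig(null, "exampleTopic");
    System.out.println("same source string: " + single.getSource().equals(multi.getSource()));
    try {
      validateUpdate(single, multi);
    }
    catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Without the mode check, the update from `topic=exampleTopic` to `topicPattern=exampleTopic` would pass the source comparison, which is the gap the override closes.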

  @Override
  protected String getIllegalInputSourceUpdateErrorMessage(String existingSource, String proposedSource)
Contributor:

We shouldn't need to override this method.
We can either use the constant defined in SeekableStreamSupervisorSpec as is or a different String altogether.
A cleaner way to include the type name in that String would be to have a placeholder for type and fill it with getType() while formatting.

  {
    return StringUtils.format(
        "Update of topic/topicPattern from [%s] to [%s] is not supported for a running Kafka supervisor."
        + "%nTo perform the update safely, follow these steps:"
        + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. "
        + "%n(2) Create a new supervisor with the new topic or topicPattern."
        + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.",
        existingSource, proposedSource);
  }
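
The call in `validateSpecUpdateTo` passes `"(%s) %s"` as both arguments of this method, so the message is formatted in two stages. The sketch below imitates that with plain `String.format`. It assumes that `InvalidInput.exception` applies its trailing arguments to the message as format arguments, which the call site relies on. The class and method names are hypothetical, and the message is shortened.

```java
import java.util.Locale;

final class TwoStageFormatSketch
{
  // Stage 1: substitute whatever is passed in. When the arguments are themselves
  // placeholders, the result is a new template with four %s slots.
  static String template(String existingSource, String proposedSource)
  {
    return String.format(
        Locale.ENGLISH,
        "Update of topic/topicPattern from [%s] to [%s] is not supported.",
        existingSource,
        proposedSource
    );
  }

  // Stage 2: fill the surviving placeholders with the mode labels and source names.
  static String message(String oldMode, String oldSource, String newMode, String newSource)
  {
    return String.format(
        Locale.ENGLISH,
        template("(%s) %s", "(%s) %s"),
        oldMode,
        oldSource,
        newMode,
        newSource
    );
  }

  public static void main(String[] args)
  {
    System.out.println(message("single-topic", "exampleTopic", "multi-topic", "exampleTopic"));
  }
}
```

The topic names enter only at the second stage, as arguments and not as part of the format string, so any `%` character in a name is not interpreted as a format specifier.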

  @Override
  public String toString()
  {