|
4 | 4 | // tag::single-source[] |
5 | 5 | :description: pass:q[Redpanda uses an internal topic, `__consumer_offsets`, to store committed offsets from each Kafka consumer that is attached to Redpanda.] |
6 | 6 |
|
7 | | -Redpanda supports `pass:[__consumer_offsets]`, which is a private topic on a Redpanda broker. The `pass:[__consumer_offsets]` topic stores committed offsets from each Kafka consumer that is attached to Redpanda. |
8 | 7 |
|
9 | | -Redpanda exposes the `pass:[__consumer_offsets]` key to enable the many tools in the Kafka ecosystem that rely on this value for their operation, providing greater ecosystem interoperability with environments and applications. |
| 8 | +In Redpanda, all messages are organized by glossterm:topic[] and distributed across multiple partitions, based on a https://www.redpanda.com/guides/kafka-tutorial-kafka-partition-strategy[partition strategy^]. For example, when using the round robin strategy, a producer writing to a topic with five partitions would distribute approximately 20% of the messages to each glossterm:partition[]. |
10 | 9 |
|
11 | | -Kafka consumer tracks the maximum offset it has consumed in each partition and has the capability to commit offsets so that it can resume from those offsets in the event of a restart. Kafka provides the option to store all the offsets for a given consumer group in a designated broker (for that group) called the "group coordinator". In other words, any consumer instance in that consumer group should send its offset commits and fetches to that group coordinator (broker). |
| 10 | +Within a partition, each message (once accepted and acknowledged by the partition leader) is permanently assigned a unique sequence number called an glossterm:offset[]. Offsets enable consumers to resume processing from a specific point, such as after an application outage. If an outage prevents your application from receiving events, you can use the consumer offset to retrieve only the events that occurred during the downtime. By default, the first message in a partition is assigned offset 0, the next is offset 1, and so on. You can manually specify a specific start value for offsets if needed. Once assigned, offsets are immutable, ensuring that the order of messages within a partition is preserved. |
12 | 11 |
|
13 | | -NOTE: A consumer group is a set of consumers that cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. |
14 | 12 |
|
15 | | -When the group coordinator receives an OffsetCommitRequest, it appends the request to a special https://kafka.apache.org/documentation/#compaction[compacted^] Kafka topic named `pass:[__consumer_offsets]`. |
| 13 | +== How consumers use offsets |
| 14 | + |
| 15 | +As a consumer reads messages from Redpanda, it can save its progress by “committing the offset” (known as an glossterm:offset commit[]), an action initiated by the consumer, not Redpanda. Kafka client libraries provide an API for committing offsets, which communicates with Redpanda using the glossterm:consumer group[] API. Each committed offset is stored as a message in the `pass:[__consumer_offsets]` topic, which is a private Redpanda topic that stores committed offsets from each Kafka consumer attached to Redpanda, allowing the consumer to resume processing from the last committed point. Redpanda exposes the `pass:[__consumer_offsets]` key to enable the many tools in the Kafka ecosystem that rely on this value for their operation, providing greater ecosystem interoperability with environments and applications. |
| 16 | + |
| 17 | +When a consumer group works together to consume data from topics, the partitions are divided among the consumers in the group. For example, if a topic has 12 partitions, and there are two consumers, each consumer would be assigned six partitions to consume. If a new consumer starts later and joins this consumer group, a rebalance occurs, such that each consumer ends up with four partitions to consume. You specify a consumer group by setting the `group.id` property to a unique name for the group. |
| 18 | + |
| 19 | +Kafka tracks the maximum offset it has consumed in each partition and can commit offsets to ensure it can resume processing from the same point in the event of a restart. Kafka allows offsets for a consumer group to be stored on a designated broker, known as the group coordinator. All consumers in the group send their offset commits and fetch requests to this group coordinator. |
| 20 | + |
| 21 | +NOTE: More advanced consumers can read data from Redpanda without using a consumer group by requesting to read a specific topic, partition, and offset range. This pattern is often used by stream processing systems such as Apache Spark and Apache Flink, which have their own mechanisms for assigning work to consumers. |
| 22 | + |
| 23 | +When the group coordinator receives an OffsetCommitRequest, it appends the request to the https://kafka.apache.org/documentation/#compaction[compacted^] Kafka topic `pass:[__consumer_offsets]`. |
16 | 24 | The broker sends a successful offset commit response to the consumer only after all the replicas of the offsets topic receive the offsets. |
17 | 25 | If the offsets fail to replicate within a configurable timeout, the offset commit fails and the consumer may retry the commit after backing off. |
18 | | -The brokers periodically compact the offsets topic, because it only needs to maintain the most recent offset commit for each partition. |
| 26 | +The brokers periodically compact the `pass:[__consumer_offsets]` topic, because it only needs to maintain the most recent offset commit for each partition. |
19 | 27 | The coordinator also caches the offsets in an in-memory table to serve offset fetches quickly. |
20 | 28 |
|
21 | | -When stored in a partition, a sequential number is assigned to each event for order. This number is the _offset_. The offset allows consumer event processing to start or continue from where it left off. So, if you have a consumer application outage that prevents your application from receiving events, you can use the consumer offset to select and send only the events that occurred when the application was down. By default, the offset starts at 0, but you can manually specify a specific start value. |
| 29 | +== Commit strategies |
| 30 | + |
| 31 | +There are several strategies for managing offset commits: |
| 32 | + |
| 33 | +=== Automatic offset commit |
| 34 | + |
| 35 | +Auto commit is the default commit strategy, where the client automatically commits offsets at regular intervals. This is set with the `enable.auto.commit` property. The client then commits offsets every `auto.commit.interval.ms` milliseconds. |
| 36 | + |
| 37 | +The primary advantage of the auto commit approach is its simplicity. After it is configured, the consumer requires no additional effort. Commits are managed in the background. However, the consumer is unaware of what was committed or when. As a result, after an application restart, some messages may be reprocessed (since consumption resumes from the last committed offset, which may include already-processed messages). The strategy guarantees at-least-once delivery. |
| 38 | + |
| 39 | +NOTE: If your consume configuration is set up to consume and write to another data store, and the write to that datastore fails, the consumer might not recover when it is auto-committed. It may not only duplicate messages, but could also drop messages intended to be in another datastore. Make sure you understand the trade-off possibilities associated with this default behavior. |
| 40 | + |
| 41 | +=== Manual offset commit |
| 42 | + |
| 43 | +The manual offset commit strategy gives consumers greater control over when commits occur. This approach is typically used when a consumer needs to align commits with an external system, such as database transactions in an RDBMS. The main advantage of manual commits is that they allow you to decide exactly when a record is considered consumed. You can use two API calls for this: `commitSync` and `commitAsync`, which differ in their blocking behavior. |
| 44 | + |
| 45 | +==== Synchronous commit |
| 46 | + |
| 47 | +The advantage of synchronous commits is that consumers can take appropriate action before continuing to consume messages, albeit at the expense of increased latency (while waiting for the commit to return). The commit (`commitSync`) will also retry automatically, until it either succeeds or receives an unrecoverable error. The following example shows a synchronous commit: |
| 48 | + |
| 49 | +[source,java] |
| 50 | +---- |
| 51 | +consumer.subscribe(Arrays.asList("foo", "bar")); |
| 52 | +while (true) { |
| 53 | + ConsumerRecords<String, String> records = consumer.poll(100); |
| 54 | + for (ConsumerRecord<String, String> record : records) { |
| 55 | + // process records here ... |
| 56 | + |
| 57 | + // ... and at the appropriate point, call commit (not after every message) |
| 58 | + consumer.commitSync(); |
| 59 | + } |
| 60 | +} |
| 61 | +---- |
| 62 | + |
| 63 | +==== Asynchronous commit |
| 64 | + |
| 65 | +The advantage of asynchronous commits is lower latency, because the consumer does not pause to wait for the commit response. However, there is no automatic retry of the commit (`commitAsync`) if it fails. There is also increased coding complexity (due to the asynchronous callbacks). The following example shows an asynchronous commit in which the consumer will not block. Instead, the commit call registers a callback, which is executed once the commit returns: |
| 66 | + |
| 67 | +[source,java] |
| 68 | +---- |
| 69 | +void callback() { |
| 70 | + // executed when the commit returns |
| 71 | +} |
| 72 | +
|
| 73 | +consumer.subscribe(Arrays.asList("foo", "bar")); |
| 74 | +while (true) { |
| 75 | + ConsumerRecords<String, String> records = consumer.poll(100); |
| 76 | + for (ConsumerRecord<String, String> record : records) { |
| 77 | + // process records here ... |
| 78 | + |
| 79 | + // ... and at the appropriate point, call commit |
| 80 | + consumer.commitAsync(callback); |
| 81 | + } |
| 82 | +} |
| 83 | +---- |
| 84 | + |
| 85 | +=== External offset management |
| 86 | + |
| 87 | +The external offset management strategy allows consumers to manage offsets independently of Redpanda. In this approach: |
| 88 | + |
| 89 | +- Consumers bypass the consumer group API and directly assign partitions instead of subscribing to a topic. |
| 90 | +- Offsets are not committed to Redpanda, but are instead stored in an external storage system. |
| 91 | + |
| 92 | +To implement an external offset management strategy: |
| 93 | + |
| 94 | +. Set `enable.auto.commit` to `false`. |
| 95 | +. Use `assign(Collection<TopicPartition>)` to assign partitions. |
| 96 | +. Use the offset provided with each ConsumerRecord to save your position. |
| 97 | +. Upon restart, use `seek(TopicPartition, long)` to restore the position of the consumer. |
| 98 | + |
| 99 | +=== Hybrid offset management |
| 100 | + |
| 101 | +The hybrid offset management strategy allows consumers to handle their own consumer rebalancing while still leveraging Redpanda's offset commit functionality. In this approach: |
| 102 | + |
| 103 | +- Consumers bypass the consumer group API and directly assign partitions instead of subscribing to a topic. |
| 104 | +- Offsets are committed to Redpanda. |
| 105 | + |
| 106 | +== Offset commit best practices |
| 107 | + |
| 108 | +Follow these best practices to optimize offset commits. |
| 109 | + |
| 110 | +=== Avoid over-committing |
| 111 | + |
| 112 | +The purpose of a commit is to save consumer progress. More frequent commits reduce the amount of data to re-read after an application restart, as the commit interval directly affects the recovery point objective (RPO). Because a lower RPO is desirable, application designers may believe that committing frequently is a good design choice. |
| 113 | + |
| 114 | +However, committing too frequently can result in adverse consequences. While individually small, each commit still results in a message being written to the `pass:[__consumer_offsets]` topic, because the position of the consumer against every partition must be recorded. At high commit rates, this workload can become a bottleneck for both the client and the server. Additionally, many Kafka client implementations do not coalesce offset commits, meaning redundant commits in a backlog still need to be processed. |
| 115 | + |
| 116 | +In many Kafka client implementations, offset commits aren't coalesced at the client; so if a backlog of commits forms (when using the asynchronous commit API), the earlier commits still need to be processed, even though they are effectively redundant. |
| 117 | + |
| 118 | +*Best practice*: Monitor commit latency to ensure commits are timely. If you notice performance issues, commit less frequently. |
| 119 | + |
| 120 | +=== Use unique consumer groups |
| 121 | + |
| 122 | +Like many topics, the consumer group topic has multiple partitions to help with performance. When writing commit messages, Redpanda groups all of the commits for a consumer group into a specific partition to maintain ordering. Reusing a consumer group across multiple applications, even for different topics, forces all commits to use a single partition, negating the benefits of partitioning. |
| 123 | + |
| 124 | +*Best practice*: Assign a unique consumer group to each application to distribute the commit load across all partitions. |
| 125 | + |
| 126 | +=== Tune the consumer group |
| 127 | + |
| 128 | +In highly parallel applications, frequent consumer group heartbeats can create unnecessary overhead. For example, 3,200 consumers checking every 500 milliseconds generate 6,400 heartbeats per second. You can optimize this behavior by increasing the `heartbeat.interval.ms` (along with `session.timeout.ms`). |
| 129 | + |
| 130 | +*Best practice*: Adjust heartbeat and session timeout settings to reduce unnecessary overhead in large-scale applications. |
22 | 131 |
|
23 | 132 | // end::single-source[] |
0 commit comments