Skip to content

Commit d070c4d

Browse files
committed
Add example listing committed offsets and lag
1 parent 624cdb8 commit d070c4d

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
The scripts in this directory provide code examples using Confluent's Python client:
22

33
* [adminapi.py](adminapi.py): collection of Kafka Admin API operations
4-
* [avro-cli.py](avro-cli.py): produces Avro messages with Confluent Schema Registry and then reads them back again
4+
* [avro-cli.py](avro-cli.py): produces Avro messages with Confluent Schema Registry and then reads them back again
55
* [consumer.py](consumer.py): reads messages from a Kafka topic
66
* [producer.py](producer.py): reads lines from stdin and sends them to Kafka
77
* [eos-transactions.py](eos-transactions.py): transactional producer with exactly once semantics (EOS)
@@ -12,6 +12,7 @@ The scripts in this directory provide code examples using Confluent's Python cli
1212
* [protobuf_producer.py](protobuf_producer.py): SerializingProducer with ProtobufSerializer
1313
* [protobuf_consumer.py](protobuf_consumer.py): DeserializingConsumer with ProtobufDeserializer
1414
* [sasl_producer.py](sasl_producer.py): SerializingProducer with SASL Authentication
15+
* [list_offsets.py](list_offsets.py): List committed offsets and consumer lag for group and topics
1516

1617
Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):
1718

examples/list_offsets.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright 2020 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
#
19+
# Show committed offsets and current lag for group and topic(s).
20+
#
21+
22+
23+
import sys
24+
import confluent_kafka
25+
26+
27+
if len(sys.argv) < 4:
28+
sys.stderr.write("Usage: {} <brokers> <group.id> <topic> <topic2..>\n".format(sys.argv[0]))
29+
sys.exit(1)
30+
31+
brokers, group = sys.argv[1:3]
32+
33+
# Create consumer.
34+
# This consumer will not join the group, but the group.id is required by
35+
# committed() to know which group to get offsets for.
36+
consumer = confluent_kafka.Consumer({'bootstrap.servers': brokers,
37+
'group.id': group})
38+
39+
40+
print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag"))
41+
print("=" * 72)
42+
43+
for topic in sys.argv[3:]:
44+
# Get the topic's partitions
45+
metadata = consumer.list_topics(topic, timeout=10)
46+
if metadata.topics[topic].error is not None:
47+
raise confluent_kafka.KafkaException(metadata.topics[topic].error)
48+
49+
# Construct TopicPartition list of partitions to query
50+
partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
51+
52+
# Query committed offsets for this group and the given partitions
53+
committed = consumer.committed(partitions, timeout=10)
54+
55+
for partition in committed:
56+
# Get the partitions low and high watermark offsets.
57+
(lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)
58+
59+
if partition.offset == confluent_kafka.OFFSET_INVALID:
60+
offset = "-"
61+
else:
62+
offset = "%d" % (partition.offset)
63+
64+
if hi < 0:
65+
lag = "no hwmark" # Unlikely
66+
elif partition.offset < 0:
67+
# No committed offset, show total message count as lag.
68+
# The actual message count may be lower due to compaction
69+
# and record deletions.
70+
lag = "%d" % (hi - lo)
71+
else:
72+
lag = "%d" % (hi - partition.offset)
73+
74+
print("%-50s %9s %9s" % (
75+
"{} [{}]".format(partition.topic, partition.partition), offset, lag))
76+
77+
78+
consumer.close()

0 commit comments

Comments
 (0)