Skip to content

Commit 000df6d

Browse files
committed
backing up - add parser for timestamps
1 parent 62f5810 commit 000df6d

File tree

5 files changed

+85
-27
lines changed

5 files changed

+85
-27
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ dynamic = ["version"]
88
dependencies = [
99
"ess-streaming-data-types",
1010
"confluent-kafka",
11+
"python-dateutil",
1112
"tzdata"
1213
]
1314
readme = {file = "README.md", content-type = "text/markdown"}

src/saluki/main.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import sys
44

5+
from dateutil.parser import parse, ParserError
56
from saluki.consume import consume
67
from saluki.listen import listen
78
from saluki.play import play
@@ -17,6 +18,16 @@
1718
_SNIFF = "sniff"
1819

1920

21+
def _dateutil_parsable_or_unix_timestamp(inp: str) -> float:
    """
    Convert a command-line timestamp argument to seconds since the epoch.

    Accepts either a date/time string parsable by ``dateutil.parser.parse``
    (e.g. ISO8601/RFC3339, "2025-11-17 07:00:00") or a raw unix timestamp
    such as "1731826800" or "1731826800.5".

    :param inp: The raw command-line value.
    :return: The timestamp as seconds since the epoch.
    :raises argparse.ArgumentTypeError: If *inp* is neither parsable by
        dateutil nor convertible to a float.
    """
    try:
        # Try a human-readable date/time first.
        return parse(inp).timestamp()
    except ValueError:
        # ParserError subclasses ValueError; fall back to a raw unix timestamp.
        try:
            return float(inp)
        except ValueError as err:
            # Chain the cause so the original conversion failure is visible.
            raise argparse.ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp") from err
29+
30+
2031
def main() -> None:
2132
parser = argparse.ArgumentParser(
2233
prog="saluki",
@@ -47,7 +58,7 @@ def main() -> None:
4758
sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
4859

4960
sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata")
50-
sniff_parser.add_argument("broker", type=str)
61+
sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name")
5162

5263
consumer_parser = argparse.ArgumentParser(add_help=False)
5364
consumer_parser.add_argument(
@@ -94,9 +105,7 @@ def main() -> None:
94105
type=int,
95106
nargs=2,
96107
)
97-
g.add_argument(
98-
"-t", "--timestamps", help="unix timestamps to replay between", type=str, nargs=2
99-
)
108+
g.add_argument("-t", "--timestamps", help='timestamps to replay between in ISO8601 or RFC3339 format ie. "2025-11-17 07:00:00" ', type=_dateutil_parsable_or_unix_timestamp, nargs=2)
100109

101110
if len(sys.argv) == 1:
102111
parser.print_help()
@@ -130,7 +139,11 @@ def main() -> None:
130139
args.timestamps,
131140
)
132141
elif args.command == _SNIFF:
133-
sniff(args.broker)
142+
try:
143+
broker, topic = parse_kafka_uri(args.broker)
144+
sniff(broker, topic)
145+
except RuntimeError:
146+
sniff(args.broker)
134147

135148

136149
if __name__ == "__main__":

src/saluki/sniff.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,31 @@
77
logger = logging.getLogger("saluki")
88

99

10-
def sniff(broker: str) -> None:
10+
def sniff(broker: str, topic: str | None = None) -> None:
1111
"""
1212
Prints the broker and topic metadata for a given broker.
13+
If a topic is given, only this topic's partitions and watermarks will be printed.
1314
:param broker: The broker address including port number.
15+
:param topic: Optional topic to filter information to.
1416
"""
1517
a = AdminClient({"bootstrap.servers": broker})
1618
c = Consumer({"bootstrap.servers": broker, "group.id": f"saluki-sniff-{uuid.uuid4()}"})
1719
t = a.list_topics(timeout=5)
18-
logger.info(f"Cluster ID: {t.cluster_id}")
19-
logger.info("Brokers:")
20-
for value in t.brokers.values():
21-
logger.info(f"\t{value}")
20+
if topic is not None and topic not in t.topics.keys():
21+
logger.warning(f"Topic {topic} not found on broker {broker}")
22+
return
2223

23-
logger.info("Topics:")
24+
if topic is None:
25+
logger.info(f"Cluster ID: {t.cluster_id}")
26+
logger.info("Brokers:")
27+
for value in t.brokers.values():
28+
logger.info(f"\t{value}")
29+
30+
logger.info("Topics:")
2431

2532
for k, v in t.topics.items():
33+
if topic is not None and k != topic:
34+
continue
2635
partitions = v.partitions.keys()
2736
logger.info(f"\t{k}:")
2837
for p in partitions:

src/saluki/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ def parse_kafka_uri(uri: str) -> Tuple[str, str]:
7979
If username is provided, a SASL mechanism must also be provided.
8080
Any other validation must be performed in the calling code.
8181
"""
82-
broker, topic = uri.split("/") if "/" in uri else (uri, "")
83-
if not topic:
82+
broker, topic = uri.split("/") if "/" in uri else (uri, None)
83+
if topic is None:
8484
raise RuntimeError(
8585
f"Unable to parse URI {uri}, topic not defined. URI should be of form"
8686
f" broker[:port]/topic"

tests/test_sniff.py

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,38 @@
1+
import pytest
12
from unittest.mock import patch
23

34
from confluent_kafka.admin import BrokerMetadata, ClusterMetadata, TopicMetadata
45

56
from saluki.sniff import sniff
67

8+
@pytest.fixture()
def fake_cluster_md():
    """
    Build fake cluster metadata for sniff tests: a single broker plus two
    topics — "topic1" with one partition and "topic2" with two.
    """
    md = ClusterMetadata()

    # One broker on the cluster.
    broker = BrokerMetadata()
    broker.id = "id1"  # type: ignore
    broker.host = "mybroker"  # type: ignore
    broker.port = 9093
    md.brokers = {0: broker}

    # Two topics with differing partition counts.
    one_partition = TopicMetadata()
    one_partition.partitions = {0: {}}
    two_partitions = TopicMetadata()
    two_partitions.partitions = {0: {}, 1: {}}
    md.topics = {"topic1": one_partition, "topic2": two_partitions}

    return md
29+
30+
def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md):
931
with (
1032
patch("saluki.sniff.AdminClient") as a,
1133
patch("saluki.sniff.Consumer") as c,
1234
patch("saluki.sniff.logger") as logger,
1335
):
14-
fake_cluster_md = ClusterMetadata()
15-
broker1 = BrokerMetadata()
16-
broker1.id = "id1" # type: ignore
17-
broker1.host = "mybroker" # type: ignore
18-
broker1.port = 9093
19-
fake_cluster_md.brokers = {0: broker1}
20-
21-
topic1 = TopicMetadata()
22-
topic1.partitions = {0: {}}
23-
topic2 = TopicMetadata()
24-
topic2.partitions = {0: {}, 1: {}}
25-
26-
fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2}
2736
a().list_topics.return_value = fake_cluster_md
2837
c().get_watermark_offsets.return_value = 1, 2
2938
sniff("whatever")
@@ -40,3 +49,29 @@ def test_sniff_with_two_partitions_in_a_topic():
4049

4150
topic2_call2 = logger.info.call_args_list[8]
4251
assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0]
52+
53+
def test_sniff_with_single_topic(fake_cluster_md):
    """When a topic is given, sniff logs only that topic and its watermarks."""
    with (
        patch("saluki.sniff.AdminClient") as admin_client,
        patch("saluki.sniff.Consumer") as consumer,
        patch("saluki.sniff.logger") as logger,
    ):
        admin_client().list_topics.return_value = fake_cluster_md
        consumer().get_watermark_offsets.return_value = 1, 2

        sniff("mybroker:9093", "topic1")

        # First log line is the topic header, second the partition watermarks.
        assert "\ttopic1" in logger.info.call_args_list[0].args[0]
        assert "\t\t0 - low:1, high:2, num_messages:1" in logger.info.call_args_list[1].args[0]
66+
67+
68+
def test_sniff_with_single_nonexistent_topic():
    """Sniffing a topic that the broker does not have warns and prints nothing."""
    with (
        patch("saluki.sniff.AdminClient") as a,
        patch("saluki.sniff.Consumer"),
        patch("saluki.sniff.logger") as logger,
    ):
        # Deliberately blank cluster metadata ie. no topics
        a().list_topics.return_value = ClusterMetadata()
        sniff("somebroker:9092", "sometopic")
        # Exactly one warning, and no metadata output at all.
        logger.warning.assert_called_once_with("Topic sometopic not found on broker somebroker:9092")
        logger.info.assert_not_called()

0 commit comments

Comments
 (0)