
Commit f98b8ee

Merge pull request #37 from ISISComputingGroup/filtering
add `play`, `sniff` and `--filter` to `consume`&`listen`
2 parents f85675b + c083469 commit f98b8ee

12 files changed (+569 -44 lines)


README.md

Lines changed: 50 additions & 8 deletions

@@ -1,22 +1,64 @@
 ![](https://github.com/ISISComputingGroup/saluki/blob/main/resources/logo.png)
 
-Serialise/deserialise flatbuffers blobs from kafka.
-This currently deserialises https://github.com/ess-dmsc/python-streaming-data-types, but I am working to make it agnostic. Python bindings for the respective schema will need to be generated.
+ISIS-specific Kafka tools.
+Deserialises [the ESS flatbuffers blobs](https://github.com/ess-dmsc/python-streaming-data-types) from Kafka.
+
+Also allows replaying data in a topic.
 
 # Usage
+
+To run the latest version, install [uv](https://docs.astral.sh/uv/getting-started/installation/) and use `uvx saluki <args>`.
+
+Alternatively, you can `pip install saluki` and run it from a `venv`.
+
 See `saluki --help` for all options.
 
-## Listen to a topic for updates
+## `listen` - Listen to a topic for updates
 `saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`.
 
-## Consume from a topic
+### Filter to specific schemas
+
+`saluki listen mybroker:9092/mytopic -f f144 -f f142` - This will listen for updates but only show messages with schema IDs of `f144` or `f142`.
+
+## `consume` - Consume from a topic
 `saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print the message at offset `123456` of `mytopic` on `mybroker` (partition 1) and the 9 messages before it.
 
 Use the `-g` flag to go the other way, i.e. in the above example, to consume the 9 messages _after_ offset `123456`.
 
-# Install
-`pip install saluki`
+You can also filter to specific schema(s) with the `-f` flag, as in the `listen` example above.
+
+## `sniff` - List all topics with their low and high watermarks and number of messages
+`saluki sniff mybroker:9092`
+
+Output looks as follows:
+
+```
+$ saluki sniff mybroker:9092
+
+INFO:saluki:Cluster ID: redpanda.0faa4595-7298-407e-9db7-7e2758d1af1f
+INFO:saluki:Brokers:
+INFO:saluki:  192.168.0.111:9092/1
+INFO:saluki:  192.168.0.112:9092/2
+INFO:saluki:  192.168.0.113:9092/0
+INFO:saluki:Topics:
+INFO:saluki:  MERLIN_events:
+INFO:saluki:    0 - low:262322729, high:302663378, num_messages:40340649
+INFO:saluki:  MERLIN_runInfo:
+INFO:saluki:    0 - low:335, high:2516, num_messages:2181
+INFO:saluki:  MERLIN_monitorHistograms:
+INFO:saluki:    0 - low:7515, high:7551, num_messages:36
+```
+
+## `play` - Replay data from one topic to another
+
+### Between offsets
+
+`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -o 123 125` - This will forward the messages at offsets 123, 124 and 125 in `source_topic` to `dest_topic`.
+
+### Between timestamps
+
+`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward the messages between the two given timestamps.
 
-## Developer setup
-`pip install .[dev]`
+# Developer setup
+`pip install -e .[dev]`
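
A note on the `--filter` behaviour described in the new README text: ESS flatbuffer payloads carry their 4-character schema id at bytes 4..8 of the message body, so schema filtering can be decided without fully deserialising anything. The sketch below is illustrative only (the helper names are not saluki's), showing how such a check could look:

```python
# Illustrative only - not saluki's actual filtering code.
# Assumes ESS-style flatbuffer payloads, whose 4-character schema id
# (e.g. b"f144") sits at bytes 4..8 of the message body.

def extract_schema_id(payload: bytes | None) -> str | None:
    """Return the 4-character schema id, or None if the payload is too short."""
    if payload is None or len(payload) < 8:
        return None
    return payload[4:8].decode(errors="replace")


def keep_message(payload: bytes | None, schemas_to_filter_to: list[str] | None) -> bool:
    """Keep everything when no filter is given, otherwise keep only the listed schemas."""
    if not schemas_to_filter_to:
        return True
    return extract_schema_id(payload) in schemas_to_filter_to

# e.g. keep_message(msg.value(), ["f144", "f142"])
```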

pyproject.toml

Lines changed: 2 additions & 1 deletion

@@ -7,7 +7,8 @@ name = "saluki"
 dynamic = ["version"]
 dependencies = [
     "ess-streaming-data-types",
-    "confluent-kafka",
+    "confluent-kafka>=2.12.1",  # for produce_batch in play()
+    "python-dateutil",
     "tzdata"
 ]
 readme = {file = "README.md", content-type = "text/markdown"}
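
The `confluent-kafka>=2.12.1` bound is annotated as being needed for `produce_batch` in `play()`. The full `play()` implementation is not shown in this view; as a rough illustration of the replay idea, a per-message version using the long-standing `Producer.produce` API could look like this (the helper and its names are made up, not saluki's code):

```python
# Simplified per-message replay sketch. saluki's play() reportedly batches with
# produce_batch (hence confluent-kafka >= 2.12.1); this only shows the overall flow.
from confluent_kafka import Consumer, Producer, TopicPartition


def replay(src_broker: str, src_topic: str, dest_broker: str, dest_topic: str,
           partition: int, start_offset: int, end_offset: int) -> None:
    consumer = Consumer({
        "bootstrap.servers": src_broker,
        "group.id": "replay-sketch",       # throwaway group id for illustration
        "enable.auto.commit": False,
    })
    producer = Producer({"bootstrap.servers": dest_broker})

    # Start reading at the first offset to replay.
    consumer.assign([TopicPartition(src_topic, partition, start_offset)])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            if msg.offset() > end_offset:  # past the inclusive end of the range
                break
            producer.produce(dest_topic, value=msg.value(), key=msg.key())
            producer.poll(0)               # serve delivery callbacks as we go
        producer.flush()
    finally:
        consumer.close()
```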

src/saluki/consume.py

Lines changed: 11 additions & 2 deletions

@@ -1,4 +1,5 @@
 import logging
+import uuid
 
 from confluent_kafka import Consumer, TopicPartition
 
@@ -14,6 +15,8 @@ def consume(
     num_messages: int = 1,
     offset: int | None = None,
     go_forwards: bool = False,
+    schemas_to_filter_to: list[str] | None = None,
+    timestamp: int | None = None,
 ) -> None:
     """
     consume from a topic and deserialise each message
@@ -24,12 +27,14 @@
     :param num_messages: number of messages to consume
     :param offset: offset to consume from/to
     :param go_forwards: whether to consume forwards or backwards
+    :param schemas_to_filter_to: schemas in messages to filter to
+    :param timestamp: optionally a timestamp as a starting point
     :return: None
     """
     c = Consumer(
         {
             "bootstrap.servers": broker,
-            "group.id": "saluki",
+            "group.id": f"saluki-consume-{uuid.uuid4()}",
             "session.timeout.ms": 6000,
             "auto.offset.reset": "latest",
             "enable.auto.offset.store": False,
@@ -38,6 +43,10 @@
         }
     )
 
+    if timestamp is not None:
+        offset = c.offsets_for_times([TopicPartition(topic, partition, timestamp)])[0].offset
+        logger.debug(f"offset for timestamp {timestamp} is {offset}")
+
     if go_forwards:
         if offset is None:
             raise ValueError("Can't go forwards without an offset")
@@ -57,7 +66,7 @@
     try:
        logger.info(f"Consuming {num_messages} messages")
        msgs = c.consume(num_messages)
-        deserialise_and_print_messages(msgs, partition)
+        deserialise_and_print_messages(msgs, partition, schemas_to_filter_to)
     except Exception:
         logger.exception("Got exception while consuming:")
     finally:
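
The new `timestamp` branch above uses `offsets_for_times` to turn a timestamp into a starting offset. In confluent-kafka the timestamp travels in the `offset` field of a `TopicPartition` and is interpreted as milliseconds since the Unix epoch. A standalone sketch of that lookup (illustrative, not a copy of saluki's code):

```python
# Illustrative lookup of the earliest offset at or after a given time.
from confluent_kafka import Consumer, TopicPartition


def offset_for_timestamp(consumer: Consumer, topic: str, partition: int, timestamp_ms: int) -> int:
    # The timestamp (milliseconds since the epoch) is passed in the
    # TopicPartition's offset field, as librdkafka expects.
    result = consumer.offsets_for_times([TopicPartition(topic, partition, timestamp_ms)])
    # The returned offset may be a negative sentinel if no message exists
    # at or after the requested timestamp.
    return result[0].offset
```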

src/saluki/listen.py

Lines changed: 12 additions & 3 deletions

@@ -1,4 +1,5 @@
 import logging
+import uuid
 
 from confluent_kafka import Consumer, TopicPartition
 
@@ -7,18 +8,24 @@
 logger = logging.getLogger("saluki")
 
 
-def listen(broker: str, topic: str, partition: int | None = None) -> None:
+def listen(
+    broker: str,
+    topic: str,
+    partition: int | None = None,
+    schemas_to_filter_to: list[str] | None = None,
+) -> None:
     """
     Listen to a topic and deserialise each message
     :param broker: the broker address, including the port
     :param topic: the topic to use
     :param partition: the partition to listen to (default is all partitions in a given topic)
+    :param schemas_to_filter_to: schemas to filter when listening to messages
     :return: None
     """
     c = Consumer(
         {
             "bootstrap.servers": broker,
-            "group.id": "saluki",
+            "group.id": f"saluki-listen-{uuid.uuid4()}",
             "auto.offset.reset": "latest",
             "enable.auto.commit": False,
         }
@@ -30,7 +37,9 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None:
         logger.info(f"listening to {broker}/{topic}")
         while True:
             msg = c.poll(1.0)
-            deserialise_and_print_messages([msg], partition)
+            deserialise_and_print_messages(
+                [msg], partition, schemas_to_filter_to=schemas_to_filter_to
+            )
     except KeyboardInterrupt:
         logger.debug("finished listening")
     finally:
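
Both `consume` and `listen` now build a unique `group.id` per invocation (`saluki-consume-<uuid>` / `saluki-listen-<uuid>`), so concurrent saluki runs never share a consumer group and cannot trigger rebalances or steal offsets from each other. A minimal sketch of the same pattern, with made-up broker and topic names:

```python
# Minimal sketch: a uniquely named, read-only consumer that assigns a partition
# explicitly instead of subscribing, so no group rebalancing is involved.
import uuid

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "mybroker:9092",      # illustrative broker
    "group.id": f"my-tool-{uuid.uuid4()}",     # unique per run, never reused
    "auto.offset.reset": "latest",
    "enable.auto.commit": False,               # never commit offsets back
})
consumer.assign([TopicPartition("mytopic", 0)])  # explicit assignment, no group join
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is not None and not msg.error():
            print(msg.topic(), msg.partition(), msg.offset())
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```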

src/saluki/main.py

Lines changed: 97 additions & 26 deletions

@@ -4,42 +4,57 @@
 
 from saluki.consume import consume
 from saluki.listen import listen
-from saluki.utils import parse_kafka_uri
+from saluki.play import play
+from saluki.sniff import sniff
+from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri
 
 logger = logging.getLogger("saluki")
 logging.basicConfig(level=logging.INFO)
 
 _LISTEN = "listen"
 _CONSUME = "consume"
+_PLAY = "play"
+_SNIFF = "sniff"
 
 
 def main() -> None:
     parser = argparse.ArgumentParser(
         prog="saluki",
         description="serialise/de-serialise flatbuffers and consume/produce from/to kafka",
     )
+    common_options = argparse.ArgumentParser(add_help=False)
+    common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action="store_true")
+    common_options.add_argument(
+        "-l",
+        "--log-file",
+        help="filename to output all data to",
+        required=False,
+        default=None,
+        type=argparse.FileType("a"),
+    )
 
-    parent_parser = argparse.ArgumentParser(add_help=False)
-    parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
+    topic_parser = argparse.ArgumentParser(add_help=False)
+    topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
 
-    parent_parser.add_argument(
+    topic_parser.add_argument(
         "-X",
         "--kafka-config",
         help="kafka options to pass through to librdkafka",
         required=False,
         default=None,
     )
-    parent_parser.add_argument(
-        "-l",
-        "--log-file",
-        help="filename to output all data to",
-        required=False,
-        default=None,
-        type=argparse.FileType("a"),
-    )
+    topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
+    topic_parser.add_argument("-f", "--filter", required=False, action="append")
 
     sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
 
+    sniff_parser = sub_parsers.add_parser(
+        _SNIFF, help="sniff - broker metadata", parents=[common_options]
+    )
+    sniff_parser.add_argument(
+        "broker", type=str, help="broker, optionally suffixed with a topic name to filter to"
+    )
+
     consumer_parser = argparse.ArgumentParser(add_help=False)
     consumer_parser.add_argument(
         "-e",
@@ -50,7 +65,7 @@ def main() -> None:
     )
 
     consumer_mode_parser = sub_parsers.add_parser(
-        _CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser]
+        _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options]
     )
     consumer_mode_parser.add_argument(
         "-m",
@@ -60,44 +75,100 @@ def main() -> None:
         required=False,
         default=1,
     )
-    consumer_mode_parser.add_argument(
-        "-o", "--offset", help="offset to consume from", type=int, required=False
-    )
-    consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str)
+
     consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
-    consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
+    cg = consumer_mode_parser.add_mutually_exclusive_group(required=False)
+    cg.add_argument(
+        "-o",
+        "--offset",
+        help="offset to consume from",
+        type=int,
+    )
+    cg.add_argument(
+        "-t",
+        "--timestamp",
+        help="timestamp to consume from",
+        type=dateutil_parsable_or_unix_timestamp,
+    )
 
-    listen_parser = sub_parsers.add_parser(
+    listen_parser = sub_parsers.add_parser(  # noqa: F841
         _LISTEN,
         help="listen mode - listen until KeyboardInterrupt",
-        parents=[parent_parser, consumer_parser],
+        parents=[topic_parser, consumer_parser, common_options],
+    )
+
+    play_parser = sub_parsers.add_parser(
+        _PLAY,
+        help="replay mode - replay data into another topic",
+        parents=[common_options],
+    )
+    play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic")
+    g = play_parser.add_mutually_exclusive_group(required=True)
+    g.add_argument(
+        "-o",
+        "--offsets",
+        help="offsets to replay between (inclusive)",
+        type=int,
+        nargs=2,
+    )
+    g.add_argument(
+        "-t",
+        "--timestamps",
+        help="timestamps to replay between in ISO8601 or RFC3339 format ie."
+        ' "2025-11-17 07:00:00 or as a unix timestamp" ',
+        type=dateutil_parsable_or_unix_timestamp,
+        nargs=2,
     )
-    listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)
 
     if len(sys.argv) == 1:
         parser.print_help()
         sys.exit(1)
     args = parser.parse_args()
 
-    if args.kafka_config is not None:
-        raise NotImplementedError("-X is not implemented yet.")
-
-    broker, topic = parse_kafka_uri(args.topic)
+    if args.verbose:
+        logger.setLevel(logging.DEBUG)
 
     if args.log_file:
         logger.addHandler(logging.FileHandler(args.log_file.name))
 
+    if "kafka_config" in args and args.kafka_config is not None:
+        raise NotImplementedError("-X is not implemented yet.")
+
     if args.command == _LISTEN:
-        listen(broker, topic, args.partition)
+        broker, topic = parse_kafka_uri(args.topic)
+        listen(broker, topic, args.partition, args.filter)
     elif args.command == _CONSUME:
+        broker, topic = parse_kafka_uri(args.topic)
        consume(
            broker,
            topic,
            args.partition,
            args.messages,
            args.offset,
            args.go_forwards,
+            args.filter,
+            args.timestamp,
+        )
+    elif args.command == _PLAY:
+        src_broker, src_topic = parse_kafka_uri(args.topics[0])
+        dest_broker, dest_topic = parse_kafka_uri(args.topics[1])
+
+        play(
+            src_broker,
+            src_topic,
+            dest_broker,
+            dest_topic,
+            args.offsets,
+            args.timestamps,
         )
+    elif args.command == _SNIFF:
+        try:
+            broker, topic = parse_kafka_uri(args.broker)
+            logger.debug(f"Sniffing single topic {topic} on broker {broker}")
+            sniff(broker, topic)
+        except RuntimeError:
+            logger.debug(f"Sniffing whole broker {args.broker}")
+            sniff(args.broker)
 
 
 if __name__ == "__main__":
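
The `-t`/`--timestamp(s)` options are parsed with `dateutil_parsable_or_unix_timestamp` from `saluki.utils`, which is not part of the diffs shown here. A plausible shape for such a parser, assuming it normalises everything to milliseconds since the epoch (what `offsets_for_times` expects), might be:

```python
# Hypothetical sketch of the timestamp argument parser; the real function lives in
# src/saluki/utils.py and may differ (e.g. in the unit it returns).
import argparse

from dateutil import parser as dateutil_parser


def dateutil_parsable_or_unix_timestamp(value: str) -> int:
    try:
        # Bare integers are treated as unix timestamps in seconds.
        return int(value) * 1000
    except ValueError:
        pass
    try:
        # Anything else must be parsable by dateutil, e.g. "2025-11-17 07:00:00".
        return int(dateutil_parser.parse(value).timestamp() * 1000)
    except (ValueError, OverflowError) as exc:
        raise argparse.ArgumentTypeError(f"not a date or unix timestamp: {value!r}") from exc
```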
