Skip to content

Commit 1394513

Browse files
committed
docs, formatting, fix listen -f
1 parent 2f14433 commit 1394513

File tree

8 files changed

+104
-42
lines changed

8 files changed

+104
-42
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ Also allows replaying data in a topic.
88
# Usage
99
See `saluki --help` for all options.
1010

11-
## Listen to a topic for updates
11+
## `listen` - Listen to a topic for updates
1212
`saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`.
1313

1414
### Filter to specific schemas
1515

16-
TODO
16+
`saluki listen mybroker:9092/mytopic -f f144 -f f142` - This will listen for updates but ignore messages with schema IDs of `f142` or `f144`
1717

18-
## Consume from a topic
18+
## `consume` - Consume from a topic
1919
`saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print the 9 messages before (and including) offset `123456` of `mytopic` on `mybroker`, in partition 1.
2020

2121
Use the `-g` flag to go the other way, i.e., in the above example, to consume the 9 messages _after_ offset 123456.
@@ -24,10 +24,10 @@ Use the `-g` flag to go the other way, ie. in the above example to consume the 9
2424

2525
TODO
2626

27-
## List all topics and their high, low watermarks and number of messages
28-
TODO
27+
## `sniff` - List all topics and their high, low watermarks and number of messages
28+
`saluki sniff mybroker:9092`
2929

30-
## Replay data from one topic to another
30+
## `play` - Replay data from one topic to another
3131

3232
### Between offsets
3333

src/saluki/consume.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ def consume(
4747
start = offset - num_messages + 1
4848
else:
4949
start = (
50-
c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1]
50+
c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[
51+
1
52+
]
5153
- num_messages
5254
)
5355

src/saluki/listen.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77
logger = logging.getLogger("saluki")
88

99

10-
def listen(broker: str, topic: str, partition: int | None = None, filter: list[str] | None = None) -> None:
10+
def listen(
11+
broker: str,
12+
topic: str,
13+
partition: int | None = None,
14+
schemas_to_filter_out: list[str] | None = None,
15+
) -> None:
1116
"""
1217
Listen to a topic and deserialise each message
1318
:param broker: the broker address, including the port
@@ -30,7 +35,9 @@ def listen(broker: str, topic: str, partition: int | None = None, filter: list[s
3035
logger.info(f"listening to {broker}/{topic}")
3136
while True:
3237
msg = c.poll(1.0)
33-
deserialise_and_print_messages([msg], partition)
38+
deserialise_and_print_messages(
39+
[msg], partition, schemas_to_filter_out=schemas_to_filter_out
40+
)
3441
except KeyboardInterrupt:
3542
logger.debug("finished listening")
3643
finally:

src/saluki/main.py

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818
_BURY = "bury"
1919
_DIG = "dig"
2020

21+
2122
def main() -> None:
2223
parser = argparse.ArgumentParser(
2324
prog="saluki",
2425
description="serialise/de-serialise flatbuffers and consume/produce from/to kafka",
2526
)
2627

2728
topic_parser = argparse.ArgumentParser(add_help=False)
28-
#TODO this needs restructuring. consider having a topic_parser with partition options etc.
29-
# because -m and -g does not make sense for saluki listen.
30-
topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
29+
topic_parser.add_argument(
30+
"topic", type=str, help="Kafka topic. format is broker<:port>/topic"
31+
)
3132

3233
topic_parser.add_argument(
3334
"-X",
@@ -37,8 +38,11 @@ def main() -> None:
3738
default=None,
3839
)
3940
topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
41+
topic_parser.add_argument("-f", "--filter", required=False, action="append")
4042

41-
sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
43+
sub_parsers = parser.add_subparsers(
44+
help="sub-command help", required=True, dest="command"
45+
)
4246

4347
sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata")
4448
sniff_parser.add_argument("broker", type=str)
@@ -66,10 +70,11 @@ def main() -> None:
6670
consumer_mode_parser.add_argument(
6771
"-o", "--offset", help="offset to consume from", type=int, required=False
6872
)
69-
consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
70-
consumer_mode_parser.add_argument("-f", "--filter", required=False, action="append")
73+
consumer_mode_parser.add_argument(
74+
"-g", "--go-forwards", required=False, action="store_true"
75+
)
7176

72-
listen_parser = sub_parsers.add_parser( # noqa: F841
77+
listen_parser = sub_parsers.add_parser( # noqa: F841
7378
_LISTEN,
7479
help="listen mode - listen until KeyboardInterrupt",
7580
parents=[topic_parser, consumer_parser],
@@ -92,7 +97,7 @@ def main() -> None:
9297
# saluki bury mybroker:9092/topicname -p 0 -o startoffset finishoffset outputfile
9398
# saluki bury mybroker:9092/topicname -p 0 -t starttimestamp finishtimestamp outputfile
9499

95-
# saluki dig - push data from dump generated by saluki bury to topic
100+
# saluki dig - push data from dump generated by saluki bury to topic
96101
# saluki dig mybroker:9092/topicname -p 0 outputfile
97102

98103
play_parser = sub_parsers.add_parser(
@@ -102,16 +107,31 @@ def main() -> None:
102107
)
103108
play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic")
104109
g = play_parser.add_mutually_exclusive_group(required=True)
105-
g.add_argument("-o", "--offsets", help="offsets to replay between (inclusive)", type=int, nargs=2)
106-
g.add_argument("-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2)
107-
g.add_argument("-c", "--chunk", help="forward in chunks. ie to avoid storing a huge list in memory", default=0, type=int, required=False)
110+
g.add_argument(
111+
"-o",
112+
"--offsets",
113+
help="offsets to replay between (inclusive)",
114+
type=int,
115+
nargs=2,
116+
)
117+
g.add_argument(
118+
"-t", "--timestamps", help="timestamps to replay between", type=str, nargs=2
119+
)
120+
g.add_argument(
121+
"-c",
122+
"--chunk",
123+
help="forward in chunks. ie to avoid storing a huge list in memory",
124+
default=0,
125+
type=int,
126+
required=False,
127+
)
108128

109129
if len(sys.argv) == 1:
110130
parser.print_help()
111131
sys.exit(1)
112132
args = parser.parse_args()
113133

114-
if 'kafka_config' in args and args.kafka_config is not None:
134+
if "kafka_config" in args and args.kafka_config is not None:
115135
raise NotImplementedError("-X is not implemented yet.")
116136

117137
if args.command == _LISTEN:
@@ -131,14 +151,28 @@ def main() -> None:
131151
src_broker, src_topic = parse_kafka_uri(args.topics[0])
132152
dest_broker, dest_topic = parse_kafka_uri(args.topics[1])
133153

134-
print(f"SOURCE BROKER: {src_broker}, SOURCE TOPIC: {src_topic}, DEST BROKER: {dest_broker}, DEST TOPIC: {dest_topic} ")
154+
print(
155+
f"SOURCE BROKER: {src_broker}, SOURCE TOPIC: {src_topic}, DEST BROKER: {dest_broker}, DEST TOPIC: {dest_topic} "
156+
)
135157
if args.offsets is not None:
136-
print(f"Replaying {src_broker}/{src_topic} between offsets {args.offsets[0]} and {args.offsets[1]} to {dest_broker}/{dest_topic}")
158+
print(
159+
f"Replaying {src_broker}/{src_topic} between offsets {args.offsets[0]} and {args.offsets[1]} to {dest_broker}/{dest_topic}"
160+
)
137161
elif args.timestamps is not None:
138-
print(f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}")
139-
140-
if input("OK? (y/n)").lower() == 'y':
141-
play(src_broker, src_topic, dest_broker, dest_topic, args.offsets, args.timestamps, args.chunks)
162+
print(
163+
f"Replaying {src_broker}/{src_topic} between timestamps {args.timestamps[0]} and {args.timestamps[1]} to {dest_broker}/{dest_topic}"
164+
)
165+
166+
if input("OK? (y/n)").lower() == "y":
167+
play(
168+
src_broker,
169+
src_topic,
170+
dest_broker,
171+
dest_topic,
172+
args.offsets,
173+
args.timestamps,
174+
args.chunks,
175+
)
142176
print("replayed")
143177
elif args.command == _SNIFF:
144178
sniff(args.broker)
@@ -147,5 +181,6 @@ def main() -> None:
147181
elif args.command == _DIG:
148182
pass
149183

184+
150185
if __name__ == "__main__":
151186
main()

src/saluki/play.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
11
from confluent_kafka import Consumer, Producer
22

33

4-
def play(src_broker: str, src_topic: str, dest_broker: str, dest_topic: str, offsets:list[int]|None, timestamps:list[int]|None, chunks:int) -> None:
5-
consumer = Consumer( {
4+
def play(
5+
src_broker: str,
6+
src_topic: str,
7+
dest_broker: str,
8+
dest_topic: str,
9+
offsets: list[int] | None,
10+
timestamps: list[int] | None,
11+
chunks: int,
12+
) -> None:
13+
consumer = Consumer(
14+
{
615
"bootstrap.servers": src_broker,
716
"group.id": "saluki-play",
8-
})
9-
producer = Producer({
10-
"bootstrap.servers": dest_broker,
11-
})
17+
}
18+
)
19+
producer = Producer(
20+
{
21+
"bootstrap.servers": dest_broker,
22+
}
23+
)
1224
pass

src/saluki/sniff.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
from confluent_kafka import Consumer, TopicPartition
22
from confluent_kafka.admin import AdminClient
33

4+
45
def sniff(broker: str):
5-
a = AdminClient({'bootstrap.servers': broker})
6-
c = Consumer({'bootstrap.servers': broker, 'group.id': 'saluki-sniff'})
6+
a = AdminClient({"bootstrap.servers": broker})
7+
c = Consumer({"bootstrap.servers": broker, "group.id": "saluki-sniff"})
78
t = a.list_topics(timeout=5)
89
print(f"Cluster ID: {t.cluster_id}")
910
print(f"Brokers:")
1011
[print(f"\t{value}") for value in t.brokers.values()]
1112

1213
print(f"Topics:")
1314

14-
for k,v in t.topics.items():
15+
for k, v in t.topics.items():
1516
partitions = v.partitions.keys()
1617
print(f"\t{k}:")
1718
for p in partitions:
1819
tp = TopicPartition(k, p)
1920
low, high = c.get_watermark_offsets(tp)
20-
print(f"\t\tlow:{low}, high:{high}, num_messages:{high-low}")
21+
print(f"\t\tlow:{low}, high:{high}, num_messages:{high - low}")

src/saluki/utils.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,19 @@ def _try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None]
2323
def fallback_deserialiser(payload: bytes) -> str:
2424
return payload.decode()
2525

26-
deserialiser = DESERIALISERS.get(schema if schema is not None else "", fallback_deserialiser)
26+
deserialiser = DESERIALISERS.get(
27+
schema if schema is not None else "", fallback_deserialiser
28+
)
2729
logger.debug(f"Deserialiser: {deserialiser}")
2830

2931
ret = deserialiser(payload)
3032

3133
return schema, ret
3234

3335

34-
def deserialise_and_print_messages(msgs: List[Message], partition: int | None, filter: list[str] | None) -> None:
36+
def deserialise_and_print_messages(
37+
msgs: List[Message], partition: int | None, schemas_to_filter_out: list[str] | None
38+
) -> None:
3539
for msg in msgs:
3640
try:
3741
if msg is None:
@@ -42,8 +46,7 @@ def deserialise_and_print_messages(msgs: List[Message], partition: int | None, f
4246
if partition is not None and msg.partition() != partition:
4347
continue
4448
schema, deserialised = _try_to_deserialise_message(msg.value())
45-
if filter is not None and schema not in filter:
46-
# ignore
49+
if schemas_to_filter_out is not None and schema in schemas_to_filter_out:
4750
break
4851
time = _parse_timestamp(msg)
4952
logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")

tests/test_listen.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ def test_listen_with_partition_assigns_to_partition():
1616
mock.patch("saluki.listen.Consumer") as c,
1717
):
1818
listen("somebroker", "sometopic", partition=expected_partition)
19-
c.return_value.assign.assert_called_with([TopicPartition(topic, expected_partition)])
19+
c.return_value.assign.assert_called_with(
20+
[TopicPartition(topic, expected_partition)]
21+
)
2022

2123

2224
def test_keyboard_interrupt_causes_consumer_to_close():

0 commit comments

Comments
 (0)