Skip to content

Commit b34c9c4

Browse files
committed
allow filtering on consume
1 parent 6a90c6e commit b34c9c4

File tree

5 files changed

+10
-21
lines changed

5 files changed

+10
-21
lines changed

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ See `saluki --help` for all options.
2020

2121
Use the `-g` flag to go the other way, i.e. in the above example, to consume the 9 messages _after_ offset 123456
2222

23-
### Consume X of a certain schema(s)
24-
25-
TODO
23+
You can also filter messages to specific schema(s) with the `-f` flag, as in the `listen` example above.
2624

2725
## `sniff` - List all topics and their high, low watermarks and number of messages
2826
`saluki sniff mybroker:9092`

src/saluki/consume.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ def consume(
1414
num_messages: int = 1,
1515
offset: int | None = None,
1616
go_forwards: bool = False,
17+
schemas_to_filter_to: list[str] | None = None,
1718
) -> None:
1819
"""
1920
consume from a topic and deserialise each message
@@ -24,6 +25,7 @@ def consume(
2425
:param num_messages: number of messages to consume
2526
:param offset: offset to consume from/to
2627
:param go_forwards: whether to consume forwards or backwards
28+
:param schemas_to_filter_to: schemas in messages to filter to
2729
:return: None
2830
"""
2931
c = Consumer(
@@ -57,7 +59,7 @@ def consume(
5759
try:
5860
logger.info(f"Consuming {num_messages} messages")
5961
msgs = c.consume(num_messages)
60-
deserialise_and_print_messages(msgs, partition)
62+
deserialise_and_print_messages(msgs, partition, schemas_to_filter_to)
6163
except Exception:
6264
logger.exception("Got exception while consuming:")
6365
finally:

src/saluki/listen.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ def listen(
1111
broker: str,
1212
topic: str,
1313
partition: int | None = None,
14-
schemas_to_filter_out: list[str] | None = None,
14+
schemas_to_filter_to: list[str] | None = None,
1515
) -> None:
1616
"""
1717
Listen to a topic and deserialise each message
1818
:param broker: the broker address, including the port
1919
:param topic: the topic to use
2020
:param partition: the partition to listen to (default is all partitions in a given topic)
21+
:param schemas_to_filter_to: schemas to filter when listening to messages
2122
:return: None
2223
"""
2324
c = Consumer(
@@ -36,7 +37,7 @@ def listen(
3637
while True:
3738
msg = c.poll(1.0)
3839
deserialise_and_print_messages(
39-
[msg], partition, schemas_to_filter_out=schemas_to_filter_out
40+
[msg], partition, schemas_to_filter_to=schemas_to_filter_to
4041
)
4142
except KeyboardInterrupt:
4243
logger.debug("finished listening")

src/saluki/main.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,6 @@ def main() -> None:
7272
parents=[topic_parser, consumer_parser],
7373
)
7474

75-
#### NEW FEATURES HERE PLZ
76-
# saluki consume x messages of y schema
77-
# saluki consume -f pl72 mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts
78-
79-
# saluki consume x messages of y or z schema
80-
# saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops#
81-
8275
play_parser = sub_parsers.add_parser(
8376
_PLAY,
8477
help="replay mode - replay data into another topic",
@@ -111,12 +104,7 @@ def main() -> None:
111104
elif args.command == _CONSUME:
112105
broker, topic = parse_kafka_uri(args.topic)
113106
consume(
114-
broker,
115-
topic,
116-
args.partition,
117-
args.messages,
118-
args.offset,
119-
args.go_forwards,
107+
broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter
120108
)
121109
elif args.command == _PLAY:
122110
src_broker, src_topic = parse_kafka_uri(args.topics[0])

src/saluki/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def fallback_deserialiser(payload: bytes) -> str:
3232

3333

3434
def deserialise_and_print_messages(
35-
msgs: List[Message], partition: int | None, schemas_to_filter_out: list[str] | None
35+
msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None
3636
) -> None:
3737
for msg in msgs:
3838
try:
@@ -44,7 +44,7 @@ def deserialise_and_print_messages(
4444
if partition is not None and msg.partition() != partition:
4545
continue
4646
schema, deserialised = _try_to_deserialise_message(msg.value())
47-
if schemas_to_filter_out is not None and schema in schemas_to_filter_out:
47+
if schemas_to_filter_to is not None and schema in schemas_to_filter_to:
4848
break
4949
time = _parse_timestamp(msg)
5050
logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")

0 commit comments

Comments
 (0)