Skip to content

Commit 4b1c8b5

Browse files
committed
backing up
1 parent 27ca560 commit 4b1c8b5

File tree

4 files changed

+66
-18
lines changed

4 files changed

+66
-18
lines changed

README.md

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,42 @@
11
![](https://github.com/ISISComputingGroup/saluki/blob/main/resources/logo.png)
22

3-
Serialise/deserialise flatbuffers blobs from kafka.
4-
This currently deserialises https://github.com/ess-dmsc/python-streaming-data-types, but I am working to make it agnostic. Python bindings for the respective schema will need to be generated.
3+
ISIS-specific Kafka tools.
4+
Deserialises [the ESS flatbuffers blobs](https://github.com/ess-dmsc/python-streaming-data-types) from Kafka.
5+
6+
Also allows replaying data in a topic.
57

68
# Usage
79
See `saluki --help` for all options.
810

911
## Listen to a topic for updates
1012
`saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`.
1113

14+
### Filter to specific schemas
15+
16+
TODO
17+
1218
## Consume from a topic
1319
`saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print 9 messages before (and inclusively the offset specified) offset `123456` of `mytopic` on `mybroker`, in partition 1.
1420

1521
Use the `-g` flag to go the other way, ie. in the above example to consume the 9 messages _after_ offset 123456
1622

23+
### Consume X of a certain schema(s)
24+
25+
TODO
26+
27+
## List all topics and their high, low watermarks and number of messages
28+
TODO
29+
30+
## Replay data from one topic to another
31+
32+
### Between offsets
33+
34+
TODO
35+
36+
### Between timestamps
37+
38+
TODO
39+
1740
# Install
1841
`pip install saluki`
1942

src/saluki/listen.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
logger = logging.getLogger("saluki")
88

99

10-
def listen(broker: str, topic: str, partition: int | None = None) -> None:
10+
def listen(broker: str, topic: str, partition: int | None = None, filter: list[str] | None = None) -> None:
1111
"""
1212
Listen to a topic and deserialise each message
1313
:param broker: the broker address, including the port

src/saluki/main.py

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111

1212
_LISTEN = "listen"
1313
_CONSUME = "consume"
14-
_REPLAY = "replay"
15-
14+
_PLAY = "play"
15+
_SNIFF = "sniff"
16+
_BURY = "bury"
17+
_DIG = "dig"
1618

1719
def main() -> None:
1820
parser = argparse.ArgumentParser(
@@ -21,6 +23,8 @@ def main() -> None:
2123
)
2224

2325
parent_parser = argparse.ArgumentParser(add_help=False)
26+
#TODO this needs restructuring. consider having a topic_parser with partition options etc.
27+
# because -m and -g does not make sense for saluki listen.
2428
parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
2529

2630
parent_parser.add_argument(
@@ -41,6 +45,9 @@ def main() -> None:
4145

4246
sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
4347

48+
sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata")
49+
sniff_parser.add_argument("broker", type=str)
50+
4451
consumer_parser = argparse.ArgumentParser(add_help=False)
4552
consumer_parser.add_argument(
4653
"-e",
@@ -67,37 +74,44 @@ def main() -> None:
6774
consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str)
6875
consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
6976
consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
70-
# TODO make this allow multiple comma-split args
71-
consumer_mode_parser.add_argument("-f", "--filter", required=False, type=str, nargs='+')
77+
consumer_mode_parser.add_argument("-f", "--filter", required=False, action="append")
7278

7379
listen_parser = sub_parsers.add_parser(
7480
_LISTEN,
7581
help="listen mode - listen until KeyboardInterrupt",
7682
parents=[parent_parser, consumer_parser],
7783
)
7884
listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)
79-
# TODO make filtering work for this as well
8085

8186
#### NEW FEATURES HERE PLZ
8287
# replay from, to offset
83-
# saluki replay -o FROMOFFSET TOOFFSET srcbroker/srctopic destbroker/desttopic
88+
# saluki play -o FROMOFFSET TOOFFSET srcbroker/srctopic destbroker/desttopic
8489

8590
# replay from, to timestamp
86-
# saluki replay -t FROMTIMESTAMP TOTIMESTAMP srcbroker/srctopic destbroker/desttopic
91+
# saluki play -t FROMTIMESTAMP TOTIMESTAMP srcbroker/srctopic destbroker/desttopic
8792

8893
# saluki consume x messages of y schema
8994
# saluki consume -f pl72 mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts
9095

9196
# saluki consume x messages of y or z schema
92-
# saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops
97+
# saluki consume -f pl72,6s4t mybroker:9092/XXX_runInfo -m 10 # get the last pl72 run starts or 6s4t run stops#
98+
99+
# saluki bury - dump data on topic to file
100+
# saluki bury mybroker:9092/topicname -p 0 -f offsetortimestamp -t offsetortimestamp outputfile
101+
102+
# saluki dig - push data from dump generated by saluki bury to topic
103+
# saluki dig mybroker:9092/topicname -p 0 outputfile
93104

94-
replay_parser = sub_parsers.add_parser(
95-
_REPLAY,
105+
# saluki sniff - broker metadata ie. topic watermarks and num_messages.
106+
# saluki sniff mybroker:9092
107+
108+
play_parser = sub_parsers.add_parser(
109+
_PLAY,
96110
help="replay mode - replay data into another topic",
97111
parents=[parent_parser],
98112
)
99-
replay_parser.add_argument("-o", "--offset", help="replay between offsets", type=bool, required=False, default="store_false")
100-
replay_parser.add_argument("-t", "--timestamp", help="replay between timestamps", type=bool, required=False)
113+
play_parser.add_argument("-o", "--offset", help="offsets to replay between (inclusive)", type=int, nargs=2)
114+
play_parser.add_argument("-t", "--timestamp", help="timestamps to replay between", type=str, nargs=2)
101115

102116

103117
if len(sys.argv) == 1:
@@ -114,7 +128,7 @@ def main() -> None:
114128
logger.addHandler(logging.FileHandler(args.log_file.name))
115129

116130
if args.command == _LISTEN:
117-
listen(broker, topic, args.partition)
131+
listen(broker, topic, args.partition, args.filter)
118132
elif args.command == _CONSUME:
119133
consume(
120134
broker,
@@ -124,7 +138,15 @@ def main() -> None:
124138
args.offset,
125139
args.go_forwards,
126140
)
127-
elif args.command == _REPLAY:
141+
elif args.command == _PLAY:
142+
pass
143+
#play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp)
144+
elif args.command == _SNIFF:
145+
print(args.broker)
146+
pass
147+
elif args.command == _BURY:
148+
pass
149+
elif args.command == _DIG:
128150
pass
129151

130152
if __name__ == "__main__":

src/saluki/utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def fallback_deserialiser(payload: bytes) -> str:
3131
return schema, ret
3232

3333

34-
def deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None:
34+
def deserialise_and_print_messages(msgs: List[Message], partition: int | None, filter: list[str] | None) -> None:
3535
for msg in msgs:
3636
try:
3737
if msg is None:
@@ -42,6 +42,9 @@ def deserialise_and_print_messages(msgs: List[Message], partition: int | None) -
4242
if partition is not None and msg.partition() != partition:
4343
continue
4444
schema, deserialised = _try_to_deserialise_message(msg.value())
45+
if filter is not None and schema not in filter:
46+
# ignore
47+
break
4548
time = _parse_timestamp(msg)
4649
logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
4750
except Exception as e:

0 commit comments

Comments
 (0)