11import argparse
2- import sys
3-
4-
52import logging
6- from logging import FileHandler
3+ import sys
74from typing import Tuple
85
96from saluki .consume import consume
@@ -30,25 +27,25 @@ def parse_kafka_uri(uri: str) -> Tuple[str, str]:
3027 broker , topic = tail .split ("/" ) if "/" in tail else (tail , "" )
3128 if not broker :
3229 raise RuntimeError (
33- f"Unable to parse URI { uri } , broker not defined. URI should be of form [PROTOCOL+SASL_MECHANISM\\ username@]broker:9092"
30+ f"Unable to parse URI { uri } , broker not defined. URI should be of form"
31+ f" [PROTOCOL+SASL_MECHANISM\\ username@]broker:9092"
3432 )
3533 if username and not (security_protocol and sasl_mechanism ):
3634 raise RuntimeError (
37- f"Unable to parse URI { uri } , PROTOCOL or SASL_MECHANISM not defined. URI should be of form [PROTOCOL+SASL_MECHANISM\\ username@]broker:9092"
35+ f"Unable to parse URI { uri } , PROTOCOL or SASL_MECHANISM not defined."
36+ f" URI should be of form [PROTOCOL+SASL_MECHANISM\\ username@]broker:9092"
3837 )
3938 return broker , topic
4039
4140
42- def main ():
41+ def main () -> None :
4342 parser = argparse .ArgumentParser (
4443 prog = "saluki" ,
4544 description = "serialise/de-serialise flatbuffers and consume/produce from/to kafka" ,
4645 )
4746
4847 parent_parser = argparse .ArgumentParser (add_help = False )
49- parent_parser .add_argument (
50- "topic" , type = str , help = "Kafka topic. format is broker<:port>/topic"
51- )
48+ parent_parser .add_argument ("topic" , type = str , help = "Kafka topic. format is broker<:port>/topic" )
5249
5350 parent_parser .add_argument (
5451 "-X" ,
@@ -66,9 +63,7 @@ def main():
6663 type = argparse .FileType ("a" ),
6764 )
6865
69- sub_parsers = parser .add_subparsers (
70- help = "sub-command help" , required = True , dest = "command"
71- )
66+ sub_parsers = parser .add_subparsers (help = "sub-command help" , required = True , dest = "command" )
7267
7368 consumer_parser = argparse .ArgumentParser (add_help = False )
7469 consumer_parser .add_argument (
@@ -93,24 +88,16 @@ def main():
9388 consumer_mode_parser .add_argument (
9489 "-o" , "--offset" , help = "offset to consume from" , type = int , required = False
9590 )
96- consumer_mode_parser .add_argument (
97- "-s" , "--schema" , required = False , default = "auto" , type = str
98- )
99- consumer_mode_parser .add_argument (
100- "-g" , "--go-forwards" , required = False , action = "store_true"
101- )
102- consumer_mode_parser .add_argument (
103- "-p" , "--partition" , required = False , type = int , default = 0
104- )
91+ consumer_mode_parser .add_argument ("-s" , "--schema" , required = False , default = "auto" , type = str )
92+ consumer_mode_parser .add_argument ("-g" , "--go-forwards" , required = False , action = "store_true" )
93+ consumer_mode_parser .add_argument ("-p" , "--partition" , required = False , type = int , default = 0 )
10594
10695 listen_parser = sub_parsers .add_parser (
10796 _LISTEN ,
10897 help = "listen mode - listen until KeyboardInterrupt" ,
10998 parents = [parent_parser , consumer_parser ],
11099 )
111- listen_parser .add_argument (
112- "-p" , "--partition" , required = False , type = int , default = None
113- )
100+ listen_parser .add_argument ("-p" , "--partition" , required = False , type = int , default = None )
114101
115102 if len (sys .argv ) == 1 :
116103 parser .print_help ()
@@ -123,7 +110,7 @@ def main():
123110 broker , topic = parse_kafka_uri (args .topic )
124111
125112 if args .log_file :
126- logger .addHandler (FileHandler (args .log_file .name ))
113+ logger .addHandler (logging . FileHandler (args .log_file .name ))
127114
128115 if args .command == _LISTEN :
129116 listen (broker , topic , args .partition )
@@ -136,8 +123,6 @@ def main():
136123 args .offset ,
137124 args .go_forwards ,
138125 )
139- elif args .command == _PRODUCE :
140- raise NotImplementedError
141126
142127
143128if __name__ == "__main__" :
0 commit comments