Commit f0f8901

add sniff, restructure args a bit, remove log-file in favour of just piping
1 parent: 4b1c8b5

File tree

2 files changed: 33 additions & 26 deletions


src/saluki/main.py

Lines changed: 13 additions & 26 deletions
```diff
@@ -4,6 +4,7 @@
 
 from saluki.consume import consume
 from saluki.listen import listen
+from saluki.sniff import sniff
 from saluki.utils import parse_kafka_uri
 
 logger = logging.getLogger("saluki")
@@ -22,26 +23,19 @@ def main() -> None:
         description="serialise/de-serialise flatbuffers and consume/produce from/to kafka",
     )
 
-    parent_parser = argparse.ArgumentParser(add_help=False)
+    topic_parser = argparse.ArgumentParser(add_help=False)
     #TODO this needs restructuring. consider having a topic_parser with partition options etc.
     # because -m and -g does not make sense for saluki listen.
-    parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
+    topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
 
-    parent_parser.add_argument(
+    topic_parser.add_argument(
         "-X",
         "--kafka-config",
         help="kafka options to pass through to librdkafka",
         required=False,
         default=None,
     )
-    parent_parser.add_argument(
-        "-l",
-        "--log-file",
-        help="filename to output all data to",
-        required=False,
-        default=None,
-        type=argparse.FileType("a"),
-    )
+    topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
 
     sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
 
```
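The restructure above leans on argparse's parser-inheritance mechanism: `topic_parser` is built with `add_help=False` and handed to subparsers via `parents=`, so the shared `topic`, `-X`, and `-p` arguments are declared once. A minimal standalone sketch of the pattern (the `demo` program and `show` subcommand are made-up names, not part of saluki):

```python
import argparse

# Parent parser holds the shared arguments; add_help=False prevents
# its -h/--help from clashing with the subparsers' own help flags.
shared = argparse.ArgumentParser(add_help=False)
shared.add_argument("topic", help="Kafka topic. format is broker<:port>/topic")
shared.add_argument("-p", "--partition", type=int, default=0)

parser = argparse.ArgumentParser(prog="demo")
subs = parser.add_subparsers(dest="command", required=True)
# Every subparser that lists `shared` in parents= inherits its arguments.
subs.add_parser("show", parents=[shared])

args = parser.parse_args(["show", "my-topic", "-p", "3"])
print(args.command, args.topic, args.partition)  # -> show my-topic 3
```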
```diff
@@ -58,7 +52,7 @@ def main() -> None:
     )
 
     consumer_mode_parser = sub_parsers.add_parser(
-        _CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser]
+        _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser]
     )
     consumer_mode_parser.add_argument(
         "-m",
@@ -71,17 +65,14 @@ def main() -> None:
     consumer_mode_parser.add_argument(
         "-o", "--offset", help="offset to consume from", type=int, required=False
     )
-    consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str)
     consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
-    consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
     consumer_mode_parser.add_argument("-f", "--filter", required=False, action="append")
 
-    listen_parser = sub_parsers.add_parser(
+    listen_parser = sub_parsers.add_parser( # noqa: F841
         _LISTEN,
         help="listen mode - listen until KeyboardInterrupt",
-        parents=[parent_parser, consumer_parser],
+        parents=[topic_parser, consumer_parser],
     )
-    listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)
 
     #### NEW FEATURES HERE PLZ
     # replay from, to offset
@@ -108,7 +99,7 @@ def main() -> None:
     play_parser = sub_parsers.add_parser(
         _PLAY,
         help="replay mode - replay data into another topic",
-        parents=[parent_parser],
+        parents=[topic_parser],
     )
     play_parser.add_argument("-o", "--offset", help="offsets to replay between (inclusive)", type=int, nargs=2)
     play_parser.add_argument("-t", "--timestamp", help="timestamps to replay between", type=str, nargs=2)
@@ -119,17 +110,14 @@ def main() -> None:
         sys.exit(1)
     args = parser.parse_args()
 
-    if args.kafka_config is not None:
+    if 'kafka_config' in args and args.kafka_config is not None:
         raise NotImplementedError("-X is not implemented yet.")
 
-    broker, topic = parse_kafka_uri(args.topic)
-
-    if args.log_file:
-        logger.addHandler(logging.FileHandler(args.log_file.name))
-
     if args.command == _LISTEN:
+        broker, topic = parse_kafka_uri(args.topic)
        listen(broker, topic, args.partition, args.filter)
     elif args.command == _CONSUME:
+        broker, topic = parse_kafka_uri(args.topic)
         consume(
             broker,
             topic,
```
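The widened guard `'kafka_config' in args` works because `argparse.Namespace` supports membership tests via `__contains__`: `in` reports whether the attribute was set at all, which it will not be for subcommands that do not inherit from `topic_parser`. A quick illustration (the attribute names here are just for demonstration):

```python
import argparse

ns = argparse.Namespace(kafka_config=None)
print("kafka_config" in ns)  # True: the attribute exists, even though it is None
print("log_file" in ns)      # False: the attribute was never set
```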
```diff
@@ -142,8 +130,7 @@ def main() -> None:
         pass
         #play(src_broker, src_topic, dest_broker, dest_topic, args.offset, args.timestamp)
     elif args.command == _SNIFF:
-        print(args.broker)
-        pass
+        sniff(args.broker)
     elif args.command == _BURY:
         pass
     elif args.command == _DIG:
```

src/saluki/sniff.py

Lines changed: 20 additions & 0 deletions
```diff
@@ -0,0 +1,20 @@
+from confluent_kafka import Consumer, TopicPartition
+from confluent_kafka.admin import AdminClient
+
+def sniff(broker: str):
+    a = AdminClient({'bootstrap.servers': broker})
+    c = Consumer({'bootstrap.servers': broker, 'group.id': 'saluki-sniff'})
+    t = a.list_topics(timeout=5)
+    print(f"Cluster ID: {t.cluster_id}")
+    print(f"Brokers:")
+    [print(f"\t{value}") for value in t.brokers.values()]
+
+    print(f"Topics:")
+
+    for k,v in t.topics.items():
+        partitions = v.partitions.keys()
+        print(f"\t{k}:")
+        for p in partitions:
+            tp = TopicPartition(k, p)
+            low, high = c.get_watermark_offsets(tp)
+            print(f"\t\tlow:{low}, high:{high}, num_messages:{high-low}")
```
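For reference, a sketch of how the new helper plugs together. `Consumer.get_watermark_offsets` returns a partition's low and high watermarks, so `high - low` counts the offsets between them; that equals the actual message count only on topics without compaction or retention-based deletion. The broker address and printed values below are illustrative assumptions, not part of the commit:

```python
# Hypothetical direct invocation of the new helper; "localhost:9092"
# is an assumed broker address.
from saluki.sniff import sniff

sniff("localhost:9092")
# Output shape (values illustrative):
# Cluster ID: abc-123
# Brokers:
#     localhost:9092/0
# Topics:
#     my-topic:
#         low:0, high:42, num_messages:42
```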
