Commit a72ad7d: "backing up again"
1 parent 000df6d
File tree: 3 files changed (+65, -26 lines)

src/saluki/main.py

Lines changed: 15 additions & 19 deletions
```diff
@@ -2,12 +2,11 @@
 import logging
 import sys
 
-from dateutil.parser import parse, ParserError
 from saluki.consume import consume
 from saluki.listen import listen
 from saluki.play import play
 from saluki.sniff import sniff
-from saluki.utils import parse_kafka_uri
+from saluki.utils import parse_kafka_uri, dateutil_parsable_or_unix_timestamp
 
 logger = logging.getLogger("saluki")
 logging.basicConfig(level=logging.INFO)
@@ -18,22 +17,14 @@
 _SNIFF = "sniff"
 
 
-def _dateutil_parsable_or_unix_timestamp(inp: str) -> float:
-    try:
-        try:
-            return parse(inp).timestamp()
-        except ParserError:
-            return float(inp)
-    except ValueError:
-        raise argparse.ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp")
-
-
 def main() -> None:
     parser = argparse.ArgumentParser(
         prog="saluki",
         description="serialise/de-serialise flatbuffers and consume/produce from/to kafka",
     )
-    parser.add_argument(
+    common_options = argparse.ArgumentParser(add_help=False)
+    common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action='store_true')
+    common_options.add_argument(
         "-l",
         "--log-file",
         help="filename to output all data to",
@@ -57,8 +48,8 @@ def main() -> None:
 
     sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
 
-    sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata")
-    sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name")
+    sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata", parents=[common_options])
+    sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name to filter to")
 
     consumer_parser = argparse.ArgumentParser(add_help=False)
     consumer_parser.add_argument(
@@ -70,7 +61,7 @@ def main() -> None:
     )
 
     consumer_mode_parser = sub_parsers.add_parser(
-        _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser]
+        _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options]
     )
     consumer_mode_parser.add_argument(
         "-m",
@@ -88,13 +79,13 @@ def main() -> None:
     listen_parser = sub_parsers.add_parser(  # noqa: F841
         _LISTEN,
         help="listen mode - listen until KeyboardInterrupt",
-        parents=[topic_parser, consumer_parser],
+        parents=[topic_parser, consumer_parser, common_options],
     )
 
     play_parser = sub_parsers.add_parser(
         _PLAY,
         help="replay mode - replay data into another topic",
-        parents=[],
+        parents=[common_options],
     )
     play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic")
     g = play_parser.add_mutually_exclusive_group(required=True)
@@ -105,13 +96,16 @@ def main() -> None:
         type=int,
         nargs=2,
     )
-    g.add_argument("-t", "--timestamps", help='timestamps to replay between in ISO8601 or RFC3339 format ie. "2025-11-17 07:00:00" ', type=_dateutil_parsable_or_unix_timestamp, nargs=2)
+    g.add_argument("-t", "--timestamps", help='timestamps to replay between in ISO8601 or RFC3339 format ie. "2025-11-17 07:00:00 or as a unix timestamp" ', type=dateutil_parsable_or_unix_timestamp, nargs=2)
 
     if len(sys.argv) == 1:
         parser.print_help()
         sys.exit(1)
     args = parser.parse_args()
 
+    if args.verbose:
+        logger.setLevel(logging.DEBUG)
+
     if args.log_file:
         logger.addHandler(logging.FileHandler(args.log_file.name))
 
@@ -141,8 +135,10 @@ def main() -> None:
     elif args.command == _SNIFF:
         try:
             broker, topic = parse_kafka_uri(args.broker)
+            logger.debug(f"Sniffing single topic {topic} on broker {broker}")
             sniff(broker, topic)
         except RuntimeError:
+            logger.debug(f"Sniffing whole broker {args.broker}")
             sniff(args.broker)
```
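The thrust of this change: the per-subcommand flags (`-v/--verbose`, `-l/--log-file`) are defined once on a `common_options` parser and inherited by every subcommand through argparse's `parents=` mechanism, while the timestamp helper moves into `saluki.utils`. A minimal standalone sketch of the `parents=` pattern; the `demo`/`greet` names are illustrative, not from this codebase:

```python
import argparse

# Parent parser holds the shared flags. add_help=False stops a duplicate
# -h/--help being registered on every subparser that inherits from it.
common = argparse.ArgumentParser(add_help=False)
common.add_argument("-v", "--verbose", action="store_true", help="show DEBUG logs")

parser = argparse.ArgumentParser(prog="demo")
subs = parser.add_subparsers(dest="command", required=True)

# The subparser lists the parent, so it accepts -v without redefining it.
greet = subs.add_parser("greet", parents=[common])
greet.add_argument("name")

args = parser.parse_args(["greet", "-v", "world"])
print(args.verbose, args.name)  # True world
```

One consequence of this layout is that the shared flags now belong to each subparser rather than the top-level parser, so they must come after the subcommand on the command line (e.g. `saluki sniff -v BROKER`).
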
src/saluki/play.py

Lines changed: 32 additions & 6 deletions
```diff
@@ -1,6 +1,6 @@
 import logging
 import uuid
-
+from time import sleep
 from confluent_kafka import Consumer, Producer, TopicPartition
 
 logger = logging.getLogger("saluki")
@@ -40,27 +40,53 @@ def play(
         src_partition = 0
 
     if timestamps is not None:
-        start_offset, stop_offset = consumer.offsets_for_times(
+        logger.debug(f"getting offsets for times: {timestamps[0]} and {timestamps[1]}")
+        start_offset = consumer.offsets_for_times(
             [
                 TopicPartition(src_topic, src_partition, timestamps[0]),
-                TopicPartition(src_topic, src_partition, timestamps[1]),
+
             ]
-        )
+        )[0]
+        # See https://github.com/confluentinc/confluent-kafka-python/issues/1178 as to why offsets_for_times is called twice.
+        stop_offset = consumer.offsets_for_times([TopicPartition(src_topic, src_partition, timestamps[1])])[0]
     elif offsets is not None:
         start_offset = TopicPartition(src_topic, src_partition, offsets[0])
         stop_offset = TopicPartition(src_topic, src_partition, offsets[1])
     else:
         raise ValueError("offsets and timestamps cannot both be None")
 
+    logger.debug(f"start_offset: {start_offset.offset}, stop_offset: {stop_offset.offset}")
+
+    logger.debug(f"assigning to offset {start_offset.offset}")
     consumer.assign([start_offset])
 
     num_messages = stop_offset.offset - start_offset.offset + 1
 
+    def delivery_report(err, msg):
+        """ Called once for each message produced to indicate delivery result.
+        Triggered by poll() or flush()."""
+        if err is not None:
+            logger.error('Message delivery failed: {}'.format(err))
+        else:
+            logger.debug('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
+
     try:
         msgs = consumer.consume(num_messages)
+        logger.debug(f"finished consuming {num_messages} messages")
+        consumer.close()
+        # logger.debug(f"{msgs}")
         for message in msgs:
-            producer.produce(dest_topic, message.value(), message.key())
-        producer.flush()
+            producer.poll(0)
+            producer.produce(dest_topic, message.value(), message.key(), callback=delivery_report)
+        # producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs])
+        # producer.poll()
+        logger.debug(f"flushing producer. len(p): {len(producer)}")
+        # while len(producer): producer.flush()
+
+        producer.flush(timeout=10)
+
+        logger.debug(f"length after flushing: {len(producer)}")
+
     except Exception:
         logger.exception("Got exception while replaying:")
     finally:
```
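Two changes of note here. First, `offsets_for_times` is now called once per timestamp, apparently because a single call listing the same (topic, partition) twice does not return one result per entry (the diff comment points to confluent-kafka-python issue #1178 for the details). Second, the produce loop now calls `poll(0)` to service delivery callbacks as it goes and ends with a bounded `flush(timeout=10)`, instead of flushing after every message. A minimal sketch of that poll/produce/flush pattern; the broker address and `dest-topic` name are illustrative assumptions:

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # illustrative address

def delivery_report(err, msg):
    # Invoked from poll() or flush(), once per produced message.
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}]")

for value in (b"a", b"b", b"c"):
    producer.poll(0)  # serve callbacks for previously produced messages
    producer.produce("dest-topic", value, callback=delivery_report)

still_queued = producer.flush(timeout=10)  # returns the number of undelivered messages
```

`len(producer)` reports how many messages are still awaiting delivery, which is what the debug lines before and after the flush are checking.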

src/saluki/utils.py

Lines changed: 18 additions & 1 deletion
```diff
@@ -1,9 +1,11 @@
 import datetime
 import logging
+from argparse import ArgumentTypeError
 from typing import List, Tuple
 from zoneinfo import ZoneInfo
 
 from confluent_kafka import Message
+from dateutil.parser import parse, ParserError
 from streaming_data_types import DESERIALISERS
 from streaming_data_types.exceptions import ShortBufferException
 from streaming_data_types.utils import get_schema
@@ -47,7 +49,7 @@ def deserialise_and_print_messages(
             if schemas_to_filter_to is not None and schema not in schemas_to_filter_to:
                 continue
             time = _parse_timestamp(msg)
-            logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
+            logger.info(f"(o:{msg.offset()},t:{time},s:{schema}) {deserialised}")
         except Exception as e:
             logger.exception(f"Got error while deserialising: {e}")
 
@@ -89,3 +91,18 @@ def parse_kafka_uri(uri: str) -> Tuple[str, str]:
         broker,
         topic,
     )
+
+
+def dateutil_parsable_or_unix_timestamp(inp: str) -> int:
+    """
+    Parse a dateutil string, if this fails then try to parse a unix timestamp.
+    This returns a unix timestamp as an int
+    """
+    try:
+        try:
+            return int(round(parse(inp).timestamp() * 1000))
+        except ParserError:
+            logger.debug(f"Failed to parse {inp} as a dateutil parsable. Falling back to unix timestamp")
+            return int(inp)
+    except ValueError:
+        raise ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp")
```
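The relocated helper now returns milliseconds since the epoch as an `int` (the unit `offsets_for_times` expects) rather than the old float seconds, and failures surface as `ArgumentTypeError` so argparse can print a clean usage error. A behaviour sketch; the epoch values assume the string parses as UTC (dateutil treats naive strings as local time, so exact numbers vary by machine):

```python
from saluki.utils import dateutil_parsable_or_unix_timestamp

dateutil_parsable_or_unix_timestamp("2025-11-17 07:00:00")
# -> 1763362800000 if interpreted as UTC (milliseconds since the epoch)

dateutil_parsable_or_unix_timestamp("1763362800000")
# -> 1763362800000, assuming dateutil rejects the 13-digit string and the
#    int() fallback runs; the value is returned unchanged

dateutil_parsable_or_unix_timestamp("not-a-date")
# -> raises argparse.ArgumentTypeError
```

Note the asymmetry in the fallback: the dateutil branch multiplies by 1000 while the `int(inp)` branch does not, so a bare number is assumed to already be in milliseconds, and a caller supplying a seconds-based unix timestamp would get offsets a factor of 1000 off.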
