
Commit a0f8c2d

ruff
1 parent 7f31937 commit a0f8c2d

5 files changed, +56 −21 lines changed


src/saluki/consume.py

Lines changed: 2 additions & 4 deletions

@@ -16,7 +16,7 @@ def consume(
     offset: int | None = None,
     go_forwards: bool = False,
     schemas_to_filter_to: list[str] | None = None,
-    timestamp: int|None = None,
+    timestamp: int | None = None,
 ) -> None:
     """
     consume from a topic and deserialise each message
@@ -44,9 +44,7 @@ def consume(
     )

     if timestamp is not None:
-        offset = c.offsets_for_times(
-            [TopicPartition(topic, partition, timestamp)]
-        )[0].offset
+        offset = c.offsets_for_times([TopicPartition(topic, partition, timestamp)])[0].offset
         logger.debug(f"offset for timestamp {timestamp} is {offset}")

     if go_forwards:
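Note on the reflowed lookup: Consumer.offsets_for_times takes TopicPartition objects whose offset field carries a millisecond timestamp and returns them with offset replaced by the earliest offset whose message timestamp is at or after that time. A minimal sketch of the call, assuming placeholder broker, group id, and topic names (none of these values come from saluki):

from confluent_kafka import Consumer, TopicPartition

# Placeholder connection settings, not saluki's configuration.
c = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "example"})

ts_ms = 1731826800000  # example timestamp, in milliseconds since the Unix epoch
# The TopicPartition's offset argument carries the timestamp to query; the
# returned TopicPartition carries the resolved offset (-1 if nothing matches).
result = c.offsets_for_times([TopicPartition("my_topic", 0, ts_ms)], timeout=10)
print(result[0].offset)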

src/saluki/main.py

Lines changed: 34 additions & 8 deletions

@@ -6,7 +6,7 @@
 from saluki.listen import listen
 from saluki.play import play
 from saluki.sniff import sniff
-from saluki.utils import parse_kafka_uri, dateutil_parsable_or_unix_timestamp
+from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri

 logger = logging.getLogger("saluki")
 logging.basicConfig(level=logging.INFO)
@@ -23,7 +23,7 @@ def main() -> None:
         description="serialise/de-serialise flatbuffers and consume/produce from/to kafka",
     )
     common_options = argparse.ArgumentParser(add_help=False)
-    common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action='store_true')
+    common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action="store_true")
     common_options.add_argument(
         "-l",
         "--log-file",
@@ -48,8 +48,12 @@ def main() -> None:

     sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")

-    sniff_parser = sub_parsers.add_parser(_SNIFF, help="sniff - broker metadata", parents=[common_options])
-    sniff_parser.add_argument("broker", type=str, help="broker, optionally suffixed with a topic name to filter to")
+    sniff_parser = sub_parsers.add_parser(
+        _SNIFF, help="sniff - broker metadata", parents=[common_options]
+    )
+    sniff_parser.add_argument(
+        "broker", type=str, help="broker, optionally suffixed with a topic name to filter to"
+    )

     consumer_parser = argparse.ArgumentParser(add_help=False)
     consumer_parser.add_argument(
@@ -75,9 +79,17 @@ def main() -> None:
     consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
     cg = consumer_mode_parser.add_mutually_exclusive_group(required=False)
     cg.add_argument(
-        "-o", "--offset", help="offset to consume from", type=int,
+        "-o",
+        "--offset",
+        help="offset to consume from",
+        type=int,
+    )
+    cg.add_argument(
+        "-t",
+        "--timestamp",
+        help="timestamp to consume from",
+        type=dateutil_parsable_or_unix_timestamp,
     )
-    cg.add_argument("-t", "--timestamp", help="timestamp to consume from", type=dateutil_parsable_or_unix_timestamp)

     listen_parser = sub_parsers.add_parser(  # noqa: F841
         _LISTEN,
@@ -99,7 +111,14 @@ def main() -> None:
         type=int,
         nargs=2,
     )
-    g.add_argument("-t", "--timestamps", help='timestamps to replay between in ISO8601 or RFC3339 format ie. "2025-11-17 07:00:00 or as a unix timestamp" ', type=dateutil_parsable_or_unix_timestamp, nargs=2)
+    g.add_argument(
+        "-t",
+        "--timestamps",
+        help="timestamps to replay between in ISO8601 or RFC3339 format ie."
+        ' "2025-11-17 07:00:00 or as a unix timestamp" ',
+        type=dateutil_parsable_or_unix_timestamp,
+        nargs=2,
+    )

     if len(sys.argv) == 1:
         parser.print_help()
@@ -121,7 +140,14 @@ def main() -> None:
     elif args.command == _CONSUME:
         broker, topic = parse_kafka_uri(args.topic)
         consume(
-            broker, topic, args.partition, args.messages, args.offset, args.go_forwards, args.filter, args.timestamp
+            broker,
+            topic,
+            args.partition,
+            args.messages,
+            args.offset,
+            args.go_forwards,
+            args.filter,
+            args.timestamp,
         )
     elif args.command == _PLAY:
         src_broker, src_topic = parse_kafka_uri(args.topics[0])
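The -o/-t pair reformatted above sits in a mutually exclusive group, so argparse itself rejects a command line that supplies both an offset and a timestamp. A stand-alone sketch of the pattern, with option names mirroring the diff but an otherwise illustrative parser:

import argparse

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument("-o", "--offset", help="offset to consume from", type=int)
group.add_argument("-t", "--timestamp", help="timestamp to consume from", type=int)

args = parser.parse_args(["-o", "5"])
print(args.offset, args.timestamp)  # 5 None

# Supplying both options would exit with:
# "argument -t/--timestamp: not allowed with argument -o/--offset"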

src/saluki/play.py

Lines changed: 9 additions & 4 deletions

@@ -1,5 +1,6 @@
 import logging
 import uuid
+
 from confluent_kafka import Consumer, Producer, TopicPartition

 logger = logging.getLogger("saluki")
@@ -44,11 +45,13 @@ def play(
         start_offset = consumer.offsets_for_times(
             [
                 TopicPartition(src_topic, src_partition, timestamps[0]),
-
             ]
         )[0]
-        # See https://github.com/confluentinc/confluent-kafka-python/issues/1178 as to why offsets_for_times is called twice.
-        stop_offset = consumer.offsets_for_times([TopicPartition(src_topic, src_partition, timestamps[1])])[0]
+        # See https://github.com/confluentinc/confluent-kafka-python/issues/1178
+        # as to why offsets_for_times is called twice.
+        stop_offset = consumer.offsets_for_times(
+            [TopicPartition(src_topic, src_partition, timestamps[1])]
+        )[0]
     elif offsets is not None:
         start_offset = TopicPartition(src_topic, src_partition, offsets[0])
         stop_offset = TopicPartition(src_topic, src_partition, offsets[1])
@@ -66,7 +69,9 @@ def play(
         msgs = consumer.consume(num_messages)
         logger.debug(f"finished consuming {num_messages} messages")
         consumer.close()
-        producer.produce_batch(dest_topic, [{'key': message.key(), 'value': message.value()} for message in msgs])
+        producer.produce_batch(
+            dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs]
+        )
         logger.debug(f"flushing producer. len(p): {len(producer)}")
         producer.flush(timeout=10)
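The comment retained in the middle hunk explains why play() calls offsets_for_times twice: per the linked confluent-kafka-python issue #1178, one call cannot resolve two different timestamps for the same topic and partition. The surrounding flow is assign, consume, republish; here is a rough sketch of it with placeholder configuration, using the long-standing per-message produce() where the diff uses the batch variant produce_batch:

from confluent_kafka import Consumer, Producer, TopicPartition

# Placeholder configuration, not from this repository.
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "replay"})
producer = Producer({"bootstrap.servers": "localhost:9092"})

# Start reading src_topic partition 0 at a known offset and take a batch.
consumer.assign([TopicPartition("src_topic", 0, 5)])
msgs = consumer.consume(10, timeout=10.0)
consumer.close()

# Republish each message's key/value to the destination topic.
for m in msgs:
    producer.produce("dest_topic", key=m.key(), value=m.value())
producer.flush(timeout=10)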

src/saluki/utils.py

Lines changed: 7 additions & 3 deletions

@@ -5,7 +5,7 @@
 from zoneinfo import ZoneInfo

 from confluent_kafka import Message
-from dateutil.parser import parse, ParserError
+from dateutil.parser import ParserError, parse
 from streaming_data_types import DESERIALISERS
 from streaming_data_types.exceptions import ShortBufferException
 from streaming_data_types.utils import get_schema
@@ -102,7 +102,11 @@ def dateutil_parsable_or_unix_timestamp(inp: str) -> int:
     try:
         return int(round(parse(inp).timestamp() * 1000))
     except (ParserError, OverflowError):
-        logger.debug(f"Failed to parse {inp} as a dateutil parsable. Falling back to unix timestamp")
+        logger.debug(
+            f"Failed to parse {inp} as a dateutil parsable. Falling back to unix timestamp"
+        )
         return int(inp)
     except ValueError:
-        raise ArgumentTypeError(f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp")
+        raise ArgumentTypeError(
+            f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp"
+        )
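For reference, dateutil_parsable_or_unix_timestamp tries dateutil first and only then treats the input as a raw unix timestamp. A behaviourally similar sketch, restructured with a nested try so the fallback's own ValueError is what triggers ArgumentTypeError; treat it as an illustration, not the file's exact control flow:

from argparse import ArgumentTypeError

from dateutil.parser import ParserError, parse


def to_epoch_ms(inp: str) -> int:
    """Illustrative stand-in for dateutil_parsable_or_unix_timestamp."""
    try:
        # Anything dateutil can parse becomes milliseconds since the epoch.
        return int(round(parse(inp).timestamp() * 1000))
    except (ParserError, OverflowError):
        try:
            return int(inp)  # already a unix timestamp
        except ValueError:
            raise ArgumentTypeError(f"timestamp {inp} is not parsable")


print(to_epoch_ms("2025-11-17 07:00:00"))  # epoch ms; depends on local timezone
print(to_epoch_ms("1731826800000"))        # falls through to the integer branch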

tests/test_sniff.py

Lines changed: 4 additions & 2 deletions

@@ -1,10 +1,11 @@
-import pytest
 from unittest.mock import patch

+import pytest
 from confluent_kafka.admin import BrokerMetadata, ClusterMetadata, TopicMetadata

 from saluki.sniff import sniff

+
 @pytest.fixture()
 def fake_cluster_md():
     """
@@ -27,6 +28,7 @@ def fake_cluster_md():
     fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2}
     return fake_cluster_md

+
 def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md):
     with (
         patch("saluki.sniff.AdminClient") as a,
@@ -50,13 +52,13 @@ def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md):
         topic2_call2 = logger.info.call_args_list[8]
         assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0]

+
 def test_sniff_with_single_topic(fake_cluster_md):
     with (
         patch("saluki.sniff.AdminClient") as a,
         patch("saluki.sniff.Consumer") as c,
         patch("saluki.sniff.logger") as logger,
     ):
-
         a().list_topics.return_value = fake_cluster_md
         c().get_watermark_offsets.return_value = 1, 2
         sniff("mybroker:9093", "topic1")
