
Commit a9c0e15

Commit message: ruff

Parent: 213340b

File tree

6 files changed: +17 -34 lines changed


src/saluki/consume.py

Lines changed: 2 additions & 3 deletions
@@ -1,6 +1,7 @@
 import logging
 
 from confluent_kafka import Consumer, TopicPartition
+
 from saluki.utils import _deserialise_and_print_messages
 
 logger = logging.getLogger("saluki")
@@ -46,9 +47,7 @@ def consume(
         start = offset - num_messages + 1
     else:
         start = (
-            c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[
-                1
-            ]
+            c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1]
             - num_messages
         )
 

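The reformatted expression in consume.py takes the partition's high watermark and steps back by num_messages to pick a start offset. A minimal sketch of that pattern with confluent_kafka (the broker address, group id, topic and counts are placeholders, not taken from this commit):

from confluent_kafka import Consumer, TopicPartition

# Placeholder configuration for illustration only.
c = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "saluki-example"})

topic, partition, num_messages = "example_topic", 0, 10

# get_watermark_offsets returns a (low, high) tuple; index [1] is the high
# watermark, so stepping back num_messages gives the start position.
high = c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1]
start = high - num_messages

c.assign([TopicPartition(topic, partition, start)])
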
src/saluki/listen.py

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 import logging
 
 from confluent_kafka import Consumer, TopicPartition
+
 from saluki.utils import _deserialise_and_print_messages
 
 logger = logging.getLogger("saluki")

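In both consume.py and listen.py the only change is a blank line separating the third-party imports from the first-party saluki import, which is the grouping ruff's import-sorting rules enforce. The resulting layout, reproduced from the diff with grouping comments added for illustration:

import logging  # standard library

from confluent_kafka import Consumer, TopicPartition  # third-party

from saluki.utils import _deserialise_and_print_messages  # first-party (local package)

logger = logging.getLogger("saluki")
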
src/saluki/main.py

Lines changed: 6 additions & 18 deletions
@@ -21,9 +21,7 @@ def main() -> None:
     )
 
     parent_parser = argparse.ArgumentParser(add_help=False)
-    parent_parser.add_argument(
-        "topic", type=str, help="Kafka topic. format is broker<:port>/topic"
-    )
+    parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
 
     parent_parser.add_argument(
         "-X",
@@ -41,9 +39,7 @@ def main() -> None:
         type=argparse.FileType("a"),
     )
 
-    sub_parsers = parser.add_subparsers(
-        help="sub-command help", required=True, dest="command"
-    )
+    sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
 
     consumer_parser = argparse.ArgumentParser(add_help=False)
     consumer_parser.add_argument(
@@ -68,24 +64,16 @@ def main() -> None:
     consumer_mode_parser.add_argument(
         "-o", "--offset", help="offset to consume from", type=int, required=False
     )
-    consumer_mode_parser.add_argument(
-        "-s", "--schema", required=False, default="auto", type=str
-    )
-    consumer_mode_parser.add_argument(
-        "-g", "--go-forwards", required=False, action="store_true"
-    )
-    consumer_mode_parser.add_argument(
-        "-p", "--partition", required=False, type=int, default=0
-    )
+    consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str)
+    consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
+    consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
 
     listen_parser = sub_parsers.add_parser(
         _LISTEN,
         help="listen mode - listen until KeyboardInterrupt",
         parents=[parent_parser, consumer_parser],
     )
-    listen_parser.add_argument(
-        "-p", "--partition", required=False, type=int, default=None
-    )
+    listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)
 
     if len(sys.argv) == 1:
         parser.print_help()

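All three main.py hunks collapse multi-line add_argument and add_subparsers calls onto single lines; the underlying structure is argparse's parent-parser pattern, where shared arguments are declared once and inherited by each subcommand. A condensed, self-contained sketch of that pattern (the prog name, the literal "listen" standing in for the _LISTEN constant, and the sample argv are assumptions for illustration):

import argparse

parser = argparse.ArgumentParser(prog="saluki")

# Shared arguments live on a helper parser created with add_help=False so
# subcommands can inherit them without a duplicate -h/--help.
parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")

sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")

# Each subcommand pulls in the shared arguments via parents=[...].
listen_parser = sub_parsers.add_parser(
    "listen",
    help="listen mode - listen until KeyboardInterrupt",
    parents=[parent_parser],
)
listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)

args = parser.parse_args(["listen", "localhost:9092/example_topic", "-p", "0"])
print(args.command, args.topic, args.partition)
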
src/saluki/utils.py

Lines changed: 2 additions & 6 deletions
@@ -1,14 +1,12 @@
 import datetime
 import logging
-from typing import Tuple, List
+from typing import List, Tuple
 
 from confluent_kafka import Message
-
 from streaming_data_types import DESERIALISERS
 from streaming_data_types.exceptions import ShortBufferException
 from streaming_data_types.utils import get_schema
 
-
 logger = logging.getLogger("saluki")
 
 
@@ -29,9 +27,7 @@ def __try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None
     def fallback_deserialiser(payload: bytes) -> str:
         return payload.decode()
 
-    deserialiser = (
-        fallback_deserialiser  # Fall back to this if we need to so data isn't lost
-    )
+    deserialiser = fallback_deserialiser  # Fall back to this if we need to so data isn't lost
 
     logger.debug(f"Deserialiser: {deserialiser}")
 

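The utils.py hunk only reformats the fallback assignment inside __try_to_deserialise_message, but the file's imports (DESERIALISERS, get_schema, ShortBufferException) suggest the surrounding logic looks up a deserialiser by schema id and keeps the raw-decode fallback so data isn't lost. A hedged sketch of that idea, not the file's actual implementation:

from streaming_data_types import DESERIALISERS
from streaming_data_types.utils import get_schema


def try_to_deserialise(payload: bytes) -> str:
    # Illustrative only: choose a deserialiser by schema id, else decode raw bytes.
    def fallback_deserialiser(payload: bytes) -> str:
        return payload.decode()

    deserialiser = fallback_deserialiser  # Fall back to this if we need to so data isn't lost

    try:
        schema = get_schema(payload)  # 4-character schema id embedded in the flatbuffer
        if schema in DESERIALISERS:
            deserialiser = DESERIALISERS[schema]
    except Exception:
        pass  # unknown schema or too-short payload: keep the fallback

    return str(deserialiser(payload))
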
tests/test_listen.py

Lines changed: 2 additions & 3 deletions
@@ -1,10 +1,9 @@
-from unittest.mock import MagicMock
 
-from confluent_kafka import TopicPartition
 
-from saluki.listen import listen
 from unittest import mock
 
+from saluki.listen import listen
+
 
 def test_listen_with_partition_assigns_to_partition():
     expected_partition = 123

tests/test_utils.py

Lines changed: 4 additions & 4 deletions
@@ -1,18 +1,18 @@
 from unittest.mock import Mock, patch
 
 import pytest
+from confluent_kafka import Message
 from streaming_data_types.forwarder_config_update_fc00 import (
     ConfigurationUpdate,
     StreamInfo,
 )
 
 from saluki.utils import (
-    parse_kafka_uri,
-    _parse_timestamp,
-    _deserialise_and_print_messages,
     __try_to_deserialise_message,
+    _deserialise_and_print_messages,
+    _parse_timestamp,
+    parse_kafka_uri,
 )
-from confluent_kafka import Message
 
 
 @pytest.fixture

0 commit comments