Skip to content

Commit 859f7e5

Browse files
committed
ruff
1 parent ecd0d8f commit 859f7e5

File tree

3 files changed

+24
-8
lines changed

3 files changed

+24
-8
lines changed

src/saluki/consume.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ def consume(
4747
start = offset - num_messages + 1
4848
else:
4949
start = (
50-
c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1]
50+
c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[
51+
1
52+
]
5153
- num_messages
5254
)
5355

src/saluki/main.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ def main() -> None:
2121
)
2222

2323
parent_parser = argparse.ArgumentParser(add_help=False)
24-
parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
24+
parent_parser.add_argument(
25+
"topic", type=str, help="Kafka topic. format is broker<:port>/topic"
26+
)
2527

2628
parent_parser.add_argument(
2729
"-X",
@@ -39,7 +41,9 @@ def main() -> None:
3941
type=argparse.FileType("a"),
4042
)
4143

42-
sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
44+
sub_parsers = parser.add_subparsers(
45+
help="sub-command help", required=True, dest="command"
46+
)
4347

4448
consumer_parser = argparse.ArgumentParser(add_help=False)
4549
consumer_parser.add_argument(
@@ -64,16 +68,24 @@ def main() -> None:
6468
consumer_mode_parser.add_argument(
6569
"-o", "--offset", help="offset to consume from", type=int, required=False
6670
)
67-
consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str)
68-
consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
69-
consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
71+
consumer_mode_parser.add_argument(
72+
"-s", "--schema", required=False, default="auto", type=str
73+
)
74+
consumer_mode_parser.add_argument(
75+
"-g", "--go-forwards", required=False, action="store_true"
76+
)
77+
consumer_mode_parser.add_argument(
78+
"-p", "--partition", required=False, type=int, default=0
79+
)
7080

7181
listen_parser = sub_parsers.add_parser(
7282
_LISTEN,
7383
help="listen mode - listen until KeyboardInterrupt",
7484
parents=[parent_parser, consumer_parser],
7585
)
76-
listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)
86+
listen_parser.add_argument(
87+
"-p", "--partition", required=False, type=int, default=None
88+
)
7789

7890
if len(sys.argv) == 1:
7991
parser.print_help()

src/saluki/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ def _try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None]
2323
def fallback_deserialiser(payload: bytes) -> str:
2424
return payload.decode()
2525

26-
deserialiser = DESERIALISERS.get(schema if schema is not None else "", fallback_deserialiser)
26+
deserialiser = DESERIALISERS.get(
27+
schema if schema is not None else "", fallback_deserialiser
28+
)
2729
logger.debug(f"Deserialiser: {deserialiser}")
2830

2931
ret = deserialiser(payload)

0 commit comments

Comments
 (0)