Skip to content

Commit 611c68a

Browse files
committed
make deserialise_and_print_message public
1 parent 0fa3f91 commit 611c68a

File tree

5 files changed

+26
-26
lines changed

5 files changed

+26
-26
lines changed

src/saluki/consume.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from confluent_kafka import Consumer, TopicPartition
44

5-
from saluki.utils import _deserialise_and_print_messages
5+
from saluki.utils import deserialise_and_print_messages
66

77
logger = logging.getLogger("saluki")
88

@@ -57,7 +57,7 @@ def consume(
5757
try:
5858
logger.info(f"Consuming {num_messages} messages")
5959
msgs = c.consume(num_messages)
60-
_deserialise_and_print_messages(msgs, partition)
60+
deserialise_and_print_messages(msgs, partition)
6161
except Exception:
6262
logger.exception("Got exception while consuming:")
6363
finally:

src/saluki/listen.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from confluent_kafka import Consumer, TopicPartition
44

5-
from saluki.utils import _deserialise_and_print_messages
5+
from saluki.utils import deserialise_and_print_messages
66

77
logger = logging.getLogger("saluki")
88

@@ -30,7 +30,7 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None:
3030
logger.info(f"listening to {broker}/{topic}")
3131
while True:
3232
msg = c.poll(1.0)
33-
_deserialise_and_print_messages([msg], partition)
33+
deserialise_and_print_messages([msg], partition)
3434
except KeyboardInterrupt:
3535
logger.debug("finished listening")
3636
finally:

src/saluki/utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
logger = logging.getLogger("saluki")
1212

1313

14-
def __try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None]:
14+
def _try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None]:
1515
logger.debug(f"got some data: {payload}")
1616
try:
1717
schema = get_schema(payload)
@@ -34,7 +34,7 @@ def fallback_deserialiser(payload: bytes) -> str:
3434
return schema, ret
3535

3636

37-
def _deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None:
37+
def deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None:
3838
for msg in msgs:
3939
try:
4040
if msg is None:
@@ -44,7 +44,7 @@ def _deserialise_and_print_messages(msgs: List[Message], partition: int | None)
4444
continue
4545
if partition is not None and msg.partition() != partition:
4646
continue
47-
schema, deserialised = __try_to_deserialise_message(msg.value())
47+
schema, deserialised = _try_to_deserialise_message(msg.value())
4848
time = _parse_timestamp(msg)
4949
logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
5050
except Exception as e:

tests/test_listen.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ def test_listen_with_partition_assigns_to_partition():
88
topic = "sometopic"
99
with (
1010
mock.patch(
11-
"saluki.listen._deserialise_and_print_messages",
11+
"saluki.listen.deserialise_and_print_messages",
1212
side_effect=KeyboardInterrupt,
1313
),
1414
mock.patch("saluki.listen.Consumer") as c,
@@ -21,7 +21,7 @@ def test_listen_with_partition_assigns_to_partition():
2121
def test_keyboard_interrupt_causes_consumer_to_close():
2222
with (
2323
mock.patch(
24-
"saluki.listen._deserialise_and_print_messages",
24+
"saluki.listen.deserialise_and_print_messages",
2525
side_effect=KeyboardInterrupt,
2626
),
2727
mock.patch("saluki.listen.Consumer") as c,

tests/test_utils.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
)
99

1010
from saluki.utils import (
11-
__try_to_deserialise_message,
12-
_deserialise_and_print_messages,
11+
_try_to_deserialise_message,
12+
deserialise_and_print_messages,
1313
_parse_timestamp,
1414
parse_kafka_uri,
1515
)
@@ -21,42 +21,42 @@ def mock_message():
2121

2222

2323
def test_deserialising_message_with_no_message_continues():
24-
with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message:
25-
_deserialise_and_print_messages([None], None)
24+
with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message:
25+
deserialise_and_print_messages([None], None)
2626
mock_deserialise_message.assert_not_called()
2727

2828

2929
def test_deserialising_message_with_error_continues(mock_message):
3030
mock_message.error.return_value = "Some error"
31-
with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message:
32-
_deserialise_and_print_messages([mock_message], None)
31+
with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message:
32+
deserialise_and_print_messages([mock_message], None)
3333
mock_deserialise_message.assert_not_called()
3434

3535

3636
def test_deserialising_message_with_wrong_partition_continues(mock_message):
3737
noninteresting_partition = 123
3838
mock_message.error.return_value = False
3939
mock_message.partition.return_value = noninteresting_partition
40-
with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message:
41-
_deserialise_and_print_messages([mock_message], 234)
40+
with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message:
41+
deserialise_and_print_messages([mock_message], 234)
4242
mock_deserialise_message.assert_not_called()
4343

4444

4545
def test_deserialising_message_with_correct_partition_calls_deserialise(mock_message):
4646
partition = 123
4747
mock_message.error.return_value = False
4848
mock_message.partition.return_value = partition
49-
with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message:
50-
_deserialise_and_print_messages([mock_message], partition)
49+
with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message:
50+
deserialise_and_print_messages([mock_message], partition)
5151
mock_deserialise_message.assert_called_once()
5252

5353

5454
def test_deserialising_empty_message(mock_message):
55-
assert (None, "") == __try_to_deserialise_message(b"")
55+
assert (None, "") == _try_to_deserialise_message(b"")
5656

5757

5858
def test_deserialising_message_with_invalid_schema_falls_back_to_raw_bytes_decode():
59-
assert __try_to_deserialise_message(b"blah") == (None, "blah")
59+
assert _try_to_deserialise_message(b"blah") == (None, "blah")
6060

6161

6262
def test_deserialising_message_which_raises_does_not_stop_loop(mock_message):
@@ -70,27 +70,27 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message):
7070
mock_message.error.return_value = False
7171
mock_message.timestamp.return_value = 2, 1
7272

73-
_deserialise_and_print_messages([mock_message, ok_message], None)
73+
deserialise_and_print_messages([mock_message, ok_message], None)
7474
assert logger.info.call_count == 1
7575

7676

7777
def test_message_that_has_valid_schema_but_empty_payload():
7878
with pytest.raises(Exception):
7979
# Empty fc00 message - valid schema but not valid payload
80-
__try_to_deserialise_message(b" fc00")
80+
_try_to_deserialise_message(b" fc00")
8181

8282

8383
def test_schema_that_isnt_in_deserialiser_list(mock_message):
84-
assert __try_to_deserialise_message(b" blah123") == ("blah", " \t blah123")
84+
assert _try_to_deserialise_message(b" blah123") == ("blah", " \t blah123")
8585

8686

8787
def test_message_that_has_valid_schema_but_invalid_payload(mock_message):
8888
with pytest.raises(Exception):
89-
__try_to_deserialise_message(b" fc0012345")
89+
_try_to_deserialise_message(b" fc0012345")
9090

9191

9292
def test_message_that_has_valid_schema_and_valid_payload(mock_message):
93-
assert __try_to_deserialise_message(
93+
assert _try_to_deserialise_message(
9494
b"\x10\x00\x00\x00\x66\x63\x30\x30\x08\x00\x0c\x00\x06\x00\x08\x00\x08\x00\x00\x00\x00\x00\x01\x00\x04\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x2c\x00\x00\x00\x4c\x00\x00\x00\xea\xff\xff\xff\x00\x00\x00\x00\x7c\x00\x00\x00\x6c\x00\x00\x00\x50\x00\x00\x00\x01\x00\x0e\x00\x16\x00\x08\x00\x0c\x00\x10\x00\x14\x00\x04\x00\x0e\x00\x00\x00\x00\x00\x00\x00\x9c\x00\x00\x00\x8c\x00\x00\x00\x70\x00\x00\x00\x01\x00\x0e\x00\x18\x00\x08\x00\x0c\x00\x10\x00\x16\x00\x04\x00\x0e\x00\x00\x00\x00\x00\x00\x00\xbc\x00\x00\x00\xac\x00\x00\x00\x90\x00\x00\x00\x00\x00\x01\x00\x11\x00\x00\x00\x4e\x44\x57\x32\x36\x37\x32\x5f\x73\x61\x6d\x70\x6c\x65\x45\x6e\x76\x00\x00\x00\x04\x00\x00\x00\x66\x31\x34\x34\x00\x00\x00\x00\x1b\x00\x00\x00\x54\x45\x3a\x4e\x44\x57\x32\x36\x37\x32\x3a\x43\x53\x3a\x53\x42\x3a\x4d\x42\x42\x49\x5f\x42\x4c\x4f\x43\x4b\x00\x11\x00\x00\x00\x4e\x44\x57\x32\x36\x37\x32\x5f\x73\x61\x6d\x70\x6c\x65\x45\x6e\x76\x00\x00\x00\x04\x00\x00\x00\x66\x31\x34\x34\x00\x00\x00\x00\x19\x00\x00\x00\x54\x45\x3a\x4e\x44\x57\x32\x36\x37\x32\x3a\x43\x53\x3a\x53\x42\x3a\x42\x49\x5f\x42\x4c\x4f\x43\x4b\x00\x00\x00\x11\x00\x00\x00\x4e\x44\x57\x32\x36\x37\x32\x5f\x73\x61\x6d\x70\x6c\x65\x45\x6e\x76\x00\x00\x00\x04\x00\x00\x00\x66\x31\x34\x34\x00\x00\x00\x00\x1c\x00\x00\x00\x54\x45\x3a\x4e\x44\x57\x32\x36\x37\x32\x3a\x43\x53\x3a\x53\x42\x3a\x46\x4c\x4f\x41\x54\x5f\x42\x4c\x4f\x43\x4b\x00\x00\x00\x00"
9595
) == (
9696
"fc00",

0 commit comments

Comments
 (0)