Commit bc6a398

Merge pull request #21 from ISISComputingGroup/error_handling_and_tests

Error handling and tests

2 parents 766dd75 + ada5ec4 commit bc6a398

File tree: 11 files changed, +399 −101 lines

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -50,6 +50,7 @@ coverage.xml
 .hypothesis/
 .pytest_cache/
 cover/
+coverage_html_report/

 # Translations
 *.mo

pyproject.toml

Lines changed: 25 additions & 1 deletion

@@ -7,7 +7,8 @@ name = "saluki"
 dynamic = ["version"]
 dependencies = [
     "ess-streaming-data-types",
-    "confluent-kafka"
+    "confluent-kafka",
+    "tzdata"
 ]
 readme = {file = "README.md", content-type = "text/markdown"}
 license-files = ["LICENSE"]
@@ -25,6 +26,29 @@ dev = [
     "pyright",
     "ruff",
     "pytest",
+    "pytest-cov"
 ]

+[tool.coverage.run]
+branch = true
+source = ["src"]
+
+[tool.coverage.report]
+fail_under = 100
+exclude_lines = [
+    "pragma: no cover",
+    "if TYPE_CHECKING:",
+    "if typing.TYPE_CHECKING:",
+    "@abstractmethod",
+]
+omit = ["main.py"] # No logic here besides argparse
+
+
+[tool.coverage.html]
+directory = "coverage_html_report"
+
+[tool.pytest.ini_options]
+testpaths = "tests"
+addopts = "--cov --cov-report=html -vv"
+
 [tool.setuptools_scm]
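The `[tool.coverage.*]` tables enforce 100% branch coverage over `src`, with the HTML report written to `coverage_html_report/` (hence the new `.gitignore` entry above). The `exclude_lines` patterns tell coverage.py to ignore lines that never execute at runtime; when a matched line introduces a block (an `if` clause or a decorated function), coverage.py excludes the whole block. A hypothetical module, not part of this repo, illustrating what each pattern matches:

```python
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # matched by "if TYPE_CHECKING:" -- type-only imports never run
    from confluent_kafka import Message


class Deserialiser(ABC):
    @abstractmethod  # matched by "@abstractmethod" -- abstract bodies are never executed
    def deserialise(self, payload: bytes) -> str: ...


def hex_dump(payload: bytes) -> str:  # pragma: no cover -- explicitly opted out
    return payload.hex()
```

With `fail_under = 100` and `addopts = "--cov --cov-report=html -vv"`, a plain `pytest` run collects coverage automatically and is marked as failed whenever coverage drops below 100%.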

src/saluki/__init__.py

Lines changed: 0 additions & 59 deletions

@@ -1,60 +1 @@
-import logging
-from typing import Tuple, List
-import datetime
-from confluent_kafka import Message
-from streaming_data_types import DESERIALISERS
-from streaming_data_types.exceptions import StreamingDataTypesException
-from streaming_data_types.utils import get_schema

-logger = logging.getLogger("saluki")
-
-
-def _fallback_deserialiser(payload: bytes) -> str:
-    return payload.decode()
-
-
-def try_to_deserialise_message(payload: bytes) -> Tuple[str, str]:
-    logger.debug(f"got some data: {payload}")
-    schema = get_schema(payload)
-    deserialiser = (
-        _fallback_deserialiser  # Fall back to this if we need to so data isn't lost
-    )
-    try:
-        deserialiser = DESERIALISERS[schema]
-    except StreamingDataTypesException:
-        pass  # TODO
-    except KeyError:
-        pass
-    return schema, deserialiser(payload)
-
-
-def _deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None:
-    for msg in msgs:
-        if msg is None:
-            continue
-        if msg.error():
-            logger.error("Consumer error: {}".format(msg.error()))
-            continue
-        if partition is not None and msg.partition() != partition:
-            continue
-        schema, deserialised = try_to_deserialise_message(msg.value())
-        time = _parse_timestamp(msg)
-        logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
-
-
-def _parse_timestamp(msg: Message) -> str:
-    """
-    Parse a message timestamp.
-
-    See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
-    :param msg: the message to parse.
-    :return: either the string-formatted timestamp or "Unknown" if not able to parse.
-    """
-    timestamp_type, timestamp_ms_from_epoch = msg.timestamp()
-    if timestamp_type == 1:  # TIMESTAMP_CREATE_TIME
-        return datetime.datetime.fromtimestamp(timestamp_ms_from_epoch / 1000).strftime(
-            "%Y-%m-%d %H:%M:%S.%f"
-        )
-    else:
-        # TIMESTAMP_NOT_AVAILABLE or TIMESTAMP_LOG_APPEND_TIME
-        return "Unknown"

src/saluki/consume.py

Lines changed: 10 additions & 12 deletions

@@ -1,7 +1,8 @@
 import logging

 from confluent_kafka import Consumer, TopicPartition
-from saluki import _deserialise_and_print_messages
+
+from saluki.utils import deserialise_and_print_messages

 logger = logging.getLogger("saluki")

@@ -39,29 +40,26 @@ def consume(

     if go_forwards:
         if offset is None:
-            logger.error("Can't go forwards without an offset")
-            return
+            raise ValueError("Can't go forwards without an offset")
         start = offset
     else:
         if offset is not None:
             start = offset - num_messages + 1
         else:
             start = (
-                c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[
-                    1
-                ]
+                c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1]
                 - num_messages
             )

-    logger.info(f"starting at offset {start}")
+    logger.info(f"Starting at offset {start}")
     c.assign([TopicPartition(topic, partition, start)])

     try:
-        logger.info(f"consuming {num_messages} messages")
+        logger.info(f"Consuming {num_messages} messages")
         msgs = c.consume(num_messages)
-        _deserialise_and_print_messages(msgs, partition)
-    except Exception as e:
-        logger.error(e)
+        deserialise_and_print_messages(msgs, partition)
+    except Exception:
+        logger.exception("Got exception while consuming:")
     finally:
-        logger.debug(f"closing consumer {c}")
+        logger.debug(f"Closing consumer {c}")
         c.close()
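The offset arithmetic here is worth spelling out: when reading backwards from a known offset, `start` is chosen so that the requested offset is the *last* message returned, whereas with no offset the newest `num_messages` below the high watermark are taken. A minimal sketch (the helper name is illustrative, not repo code):

```python
def start_offset(num_messages: int, offset: int | None, high_watermark: int) -> int:
    if offset is not None:
        # Backwards from a known offset: make `offset` the last message read.
        return offset - num_messages + 1
    # No offset given: take the newest `num_messages` before the high watermark.
    return high_watermark - num_messages


# Mirrors the expectations in tests/test_consume.py below:
assert start_offset(3, 1234, high_watermark=0) == 1232      # reads 1232..1234
assert start_offset(3, None, high_watermark=2345) == 2342   # reads 2342..2344
```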

src/saluki/listen.py

Lines changed: 3 additions & 2 deletions

@@ -1,7 +1,8 @@
 import logging

 from confluent_kafka import Consumer, TopicPartition
-from saluki import _deserialise_and_print_messages
+
+from saluki.utils import deserialise_and_print_messages

 logger = logging.getLogger("saluki")

@@ -29,7 +30,7 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None:
         logger.info(f"listening to {broker}/{topic}")
         while True:
             msg = c.poll(1.0)
-            _deserialise_and_print_messages([msg], partition)
+            deserialise_and_print_messages([msg], partition)
     except KeyboardInterrupt:
         logger.debug("finished listening")
     finally:

src/saluki/main.py

Lines changed: 1 addition & 25 deletions

@@ -1,10 +1,10 @@
 import argparse
 import logging
 import sys
-from typing import Tuple

 from saluki.consume import consume
 from saluki.listen import listen
+from saluki.utils import parse_kafka_uri

 logger = logging.getLogger("saluki")
 logging.basicConfig(level=logging.INFO)
@@ -14,30 +14,6 @@
 _CONSUME = "consume"


-def parse_kafka_uri(uri: str) -> Tuple[str, str]:
-    """Parse Kafka connection URI.
-
-    A broker hostname/ip must be present.
-    If username is provided, a SASL mechanism must also be provided.
-    Any other validation must be performed in the calling code.
-    """
-    security_protocol, tail = uri.split("+") if "+" in uri else ("", uri)
-    sasl_mechanism, tail = tail.split("\\") if "\\" in tail else ("", tail)
-    username, tail = tail.split("@") if "@" in tail else ("", tail)
-    broker, topic = tail.split("/") if "/" in tail else (tail, "")
-    if not broker:
-        raise RuntimeError(
-            f"Unable to parse URI {uri}, broker not defined. URI should be of form"
-            f" [PROTOCOL+SASL_MECHANISM\\username@]broker:9092"
-        )
-    if username and not (security_protocol and sasl_mechanism):
-        raise RuntimeError(
-            f"Unable to parse URI {uri}, PROTOCOL or SASL_MECHANISM not defined."
-            f" URI should be of form [PROTOCOL+SASL_MECHANISM\\username@]broker:9092"
-        )
-    return broker, topic
-
-
 def main() -> None:
     parser = argparse.ArgumentParser(
         prog="saluki",

src/saluki/utils.py

Lines changed: 87 additions & 0 deletions

@@ -0,0 +1,87 @@
+import datetime
+import logging
+from typing import List, Tuple
+from zoneinfo import ZoneInfo
+
+from confluent_kafka import Message
+from streaming_data_types import DESERIALISERS
+from streaming_data_types.exceptions import ShortBufferException
+from streaming_data_types.utils import get_schema
+
+logger = logging.getLogger("saluki")
+
+
+def _try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None]:
+    logger.debug(f"got some data: {payload}")
+    try:
+        schema = get_schema(payload)
+    except ShortBufferException:
+        schema = None
+
+    logger.debug(f"schema: {schema}")
+
+    def fallback_deserialiser(payload: bytes) -> str:
+        return payload.decode()
+
+    deserialiser = DESERIALISERS.get(schema if schema is not None else "", fallback_deserialiser)
+    logger.debug(f"Deserialiser: {deserialiser}")
+
+    ret = deserialiser(payload)
+
+    return schema, ret
+
+
+def deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None:
+    for msg in msgs:
+        try:
+            if msg is None:
+                continue
+            if msg.error():
+                logger.error("Consumer error: {}".format(msg.error()))
+                continue
+            if partition is not None and msg.partition() != partition:
+                continue
+            schema, deserialised = _try_to_deserialise_message(msg.value())
+            time = _parse_timestamp(msg)
+            logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
+        except Exception as e:
+            logger.exception(f"Got error while deserialising: {e}")
+
+
+def _parse_timestamp(msg: Message) -> str:
+    """
+    Parse a message timestamp.
+
+    See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
+    :param msg: the message to parse.
+    :return: either the string-formatted timestamp or "Unknown" if not able to parse.
+    """
+    timestamp_type, timestamp_ms_from_epoch = msg.timestamp()
+    if timestamp_type == 1:  # TIMESTAMP_CREATE_TIME
+        return (
+            datetime.datetime.fromtimestamp(timestamp_ms_from_epoch / 1000)
+            .astimezone(ZoneInfo("UTC"))
+            .strftime("%Y-%m-%d %H:%M:%S.%f")
+        )
+    else:
+        # TIMESTAMP_NOT_AVAILABLE or TIMESTAMP_LOG_APPEND_TIME
+        return "Unknown"
+
+
+def parse_kafka_uri(uri: str) -> Tuple[str, str]:
+    """Parse Kafka connection URI.
+
+    A broker hostname/ip must be present.
+    If username is provided, a SASL mechanism must also be provided.
+    Any other validation must be performed in the calling code.
+    """
+    broker, topic = uri.split("/") if "/" in uri else (uri, "")
+    if not topic:
+        raise RuntimeError(
+            f"Unable to parse URI {uri}, topic not defined. URI should be of form"
+            f" broker[:port]/topic"
+        )
+    return (
+        broker,
+        topic,
+    )
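Two of the new helpers are easy to sanity-check in isolation. A rough sketch (the input values and inline result comments are illustrative, not repo code): `parse_kafka_uri` now only splits broker from topic, and `_parse_timestamp` renders Kafka's millisecond create-time in UTC via `zoneinfo`, which is presumably why `tzdata` was added to pyproject.toml.

```python
import datetime
from zoneinfo import ZoneInfo

from saluki.utils import parse_kafka_uri

# The simplified parser just splits broker from topic; the SASL-aware
# parsing that used to live in main.py is gone.
assert parse_kafka_uri("localhost:9092/some_topic") == ("localhost:9092", "some_topic")

try:
    parse_kafka_uri("localhost:9092")  # no "/topic" part
except RuntimeError as exc:
    print(exc)  # Unable to parse URI localhost:9092, topic not defined. ...

# The same conversion _parse_timestamp applies to a create-time timestamp:
ms_from_epoch = 1_700_000_000_000  # illustrative value
print(
    datetime.datetime.fromtimestamp(ms_from_epoch / 1000)
    .astimezone(ZoneInfo("UTC"))
    .strftime("%Y-%m-%d %H:%M:%S.%f")
)  # 2023-11-14 22:13:20.000000
```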

tests/test_consume.py

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+from unittest import mock
+from unittest.mock import patch
+
+import pytest
+from confluent_kafka import TopicPartition
+
+from saluki.consume import consume
+
+
+@patch("saluki.consume.Consumer")
+def test_go_forwards_with_no_offset_raises(_):
+    with pytest.raises(ValueError):
+        consume("broker", "topic", go_forwards=True, offset=None)
+
+
+@patch("saluki.consume.Consumer")
+def test_go_forwards_with_offset_assigns_at_offset(mock_consumer):
+    expected_topic = "topic"
+    expected_offset = 1234
+    expected_partition = 1
+    consume(
+        "broker",
+        expected_topic,
+        go_forwards=True,
+        offset=expected_offset,
+        partition=expected_partition,
+    )
+    mock_assign = mock_consumer.return_value.assign
+
+    mock_assign.assert_called_with(
+        [TopicPartition(expected_topic, expected_partition, expected_offset)]
+    )
+
+
+@patch("saluki.consume.Consumer")
+def test_consume_with_offset_and_num_of_messages_goes_back_offset_minus_messages(
+    mock_consumer,
+):
+    expected_offset = 1234
+    expected_topic = "sometopic"
+    num_messages = 3
+    expected_start_offset = expected_offset - num_messages + 1
+
+    consume("broker", expected_topic, offset=expected_offset, num_messages=num_messages)
+
+    mock_assign = mock_consumer.return_value.assign
+    mock_assign.assert_called_once()
+
+    mock_assign_call = mock_assign.call_args.args[0][0]
+    assert mock_assign_call.topic == expected_topic
+    assert mock_assign_call.offset == expected_start_offset
+
+
+@patch("saluki.consume.Consumer")
+def test_consume_with_no_offset_and_num_of_messages_goes_back_high_watermark_minus_messages(
+    mock_consumer,
+):
+    expected_topic = "sometopic"
+    num_messages = 3
+    high_watermark_offset = 2345
+    expected_start_offset = high_watermark_offset - num_messages
+
+    mock_consumer.return_value.get_watermark_offsets.return_value = (
+        None,
+        high_watermark_offset,
+    )
+
+    consume("broker", topic=expected_topic, num_messages=num_messages)
+    mock_assign = mock_consumer.return_value.assign
+    mock_assign.assert_called_once()
+
+    mock_assign_call = mock_assign.call_args.args[0][0]
+
+    assert mock_assign_call.topic == expected_topic
+    assert mock_assign_call.offset == expected_start_offset
+
+
+def test_consume_but_exception_thrown_consumer_is_closed():
+    with (
+        mock.patch("saluki.consume.Consumer") as c,
+    ):
+        c.return_value.consume.side_effect = Exception
+        consume("somebroker", "sometopic", num_messages=1)
+        c.return_value.close.assert_called_once()
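One detail worth noting in these tests: `mock.patch` must target the name where it is *looked up*, not where it is defined. Because `consume.py` does `from confluent_kafka import Consumer`, the name `Consumer` is bound into the `saluki.consume` namespace, so the tests patch `saluki.consume.Consumer` rather than `confluent_kafka.Consumer`. A standalone sketch of the same pattern:

```python
from unittest import mock

from saluki.consume import consume

# Patch the name in the module under test; calls inside consume() then hit the mock.
with mock.patch("saluki.consume.Consumer") as mock_consumer:
    consume("broker", "topic", num_messages=1)
    mock_consumer.assert_called_once()  # the Consumer was constructed inside consume()
```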

tests/test_deserialiser.py

Lines changed: 0 additions & 2 deletions
This file was deleted.
