
Commit 6596567

committed
make a start on better error handling and tests
1 parent c10ced8 commit 6596567


9 files changed (+169, -99 lines)

src/saluki/__init__.py

Lines changed: 0 additions & 59 deletions
@@ -1,60 +1 @@
-import logging
-from typing import Tuple, List
-import datetime
-from confluent_kafka import Message
-from streaming_data_types import DESERIALISERS
-from streaming_data_types.exceptions import StreamingDataTypesException
-from streaming_data_types.utils import get_schema
 
-logger = logging.getLogger("saluki")
-
-
-def _fallback_deserialiser(payload: bytes) -> str:
-    return payload.decode()
-
-
-def try_to_deserialise_message(payload: bytes) -> Tuple[str, str]:
-    logger.debug(f"got some data: {payload}")
-    schema = get_schema(payload)
-    deserialiser = (
-        _fallback_deserialiser  # Fall back to this if we need to so data isn't lost
-    )
-    try:
-        deserialiser = DESERIALISERS[schema]
-    except StreamingDataTypesException:
-        pass  # TODO
-    except KeyError:
-        pass
-    return schema, deserialiser(payload)
-
-
-def _deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None:
-    for msg in msgs:
-        if msg is None:
-            continue
-        if msg.error():
-            logger.error("Consumer error: {}".format(msg.error()))
-            continue
-        if partition is not None and msg.partition() != partition:
-            continue
-        schema, deserialised = try_to_deserialise_message(msg.value())
-        time = _parse_timestamp(msg)
-        logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
-
-
-def _parse_timestamp(msg: Message) -> str:
-    """
-    Parse a message timestamp.
-
-    See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
-    :param msg: the message to parse.
-    :return: either the string-formatted timestamp or "Unknown" if not able to parse.
-    """
-    timestamp_type, timestamp_ms_from_epoch = msg.timestamp()
-    if timestamp_type == 1:  # TIMESTAMP_CREATE_TIME
-        return datetime.datetime.fromtimestamp(timestamp_ms_from_epoch / 1000).strftime(
-            "%Y-%m-%d %H:%M:%S.%f"
-        )
-    else:
-        # TIMESTAMP_NOT_AVAILABLE or TIMESTAMP_LOG_APPEND_TIME
-        return "Unknown"

src/saluki/consume.py

Lines changed: 6 additions & 6 deletions
@@ -1,7 +1,7 @@
 import logging
 
 from confluent_kafka import Consumer, TopicPartition
-from saluki import _deserialise_and_print_messages
+from saluki.utils import _deserialise_and_print_messages
 
 logger = logging.getLogger("saluki")
 
@@ -53,15 +53,15 @@ def consume(
         - num_messages
     )
 
-    logger.info(f"starting at offset {start}")
+    logger.info(f"Starting at offset {start}")
     c.assign([TopicPartition(topic, partition, start)])
 
     try:
-        logger.info(f"consuming {num_messages} messages")
+        logger.info(f"Consuming {num_messages} messages")
         msgs = c.consume(num_messages)
         _deserialise_and_print_messages(msgs, partition)
-    except Exception as e:
-        logger.error(e)
+    except Exception:
+        logger.exception("Got exception while consuming:")
     finally:
-        logger.debug(f"closing consumer {c}")
+        logger.debug(f"Closing consumer {c}")
         c.close()
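
The switch from logger.error(e) to logger.exception is more than a wording change: logger.exception logs at ERROR level and also records the active traceback, which logger.error(e) would drop. A minimal standalone illustration (not part of the commit):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("saluki")

try:
    raise ValueError("boom")
except Exception:
    # Emits the message followed by the full ValueError traceback.
    logger.exception("Got exception while consuming:")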

src/saluki/listen.py

Lines changed: 3 additions & 1 deletion
@@ -1,7 +1,7 @@
 import logging
 
 from confluent_kafka import Consumer, TopicPartition
-from saluki import _deserialise_and_print_messages
+from saluki.utils import _deserialise_and_print_messages
 
 logger = logging.getLogger("saluki")
 
@@ -32,6 +32,8 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None:
            _deserialise_and_print_messages([msg], partition)
     except KeyboardInterrupt:
         logger.debug("finished listening")
+    except Exception:
+        logger.exception("Got exception while listening:")
     finally:
         logger.debug(f"closing consumer {c}")
         c.close()

src/saluki/main.py

Lines changed: 19 additions & 31 deletions
@@ -1,10 +1,10 @@
 import argparse
 import logging
 import sys
-from typing import Tuple
 
 from saluki.consume import consume
 from saluki.listen import listen
+from saluki.utils import parse_kafka_uri
 
 logger = logging.getLogger("saluki")
 logging.basicConfig(level=logging.INFO)
@@ -14,38 +14,16 @@
 _CONSUME = "consume"
 
 
-def parse_kafka_uri(uri: str) -> Tuple[str, str]:
-    """Parse Kafka connection URI.
-
-    A broker hostname/ip must be present.
-    If username is provided, a SASL mechanism must also be provided.
-    Any other validation must be performed in the calling code.
-    """
-    security_protocol, tail = uri.split("+") if "+" in uri else ("", uri)
-    sasl_mechanism, tail = tail.split("\\") if "\\" in tail else ("", tail)
-    username, tail = tail.split("@") if "@" in tail else ("", tail)
-    broker, topic = tail.split("/") if "/" in tail else (tail, "")
-    if not broker:
-        raise RuntimeError(
-            f"Unable to parse URI {uri}, broker not defined. URI should be of form"
-            f" [PROTOCOL+SASL_MECHANISM\\username@]broker:9092"
-        )
-    if username and not (security_protocol and sasl_mechanism):
-        raise RuntimeError(
-            f"Unable to parse URI {uri}, PROTOCOL or SASL_MECHANISM not defined."
-            f" URI should be of form [PROTOCOL+SASL_MECHANISM\\username@]broker:9092"
-        )
-    return broker, topic
-
-
 def main() -> None:
     parser = argparse.ArgumentParser(
         prog="saluki",
         description="serialise/de-serialise flatbuffers and consume/produce from/to kafka",
     )
 
     parent_parser = argparse.ArgumentParser(add_help=False)
-    parent_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic")
+    parent_parser.add_argument(
+        "topic", type=str, help="Kafka topic. format is broker<:port>/topic"
+    )
 
     parent_parser.add_argument(
         "-X",
@@ -63,7 +41,9 @@ def main() -> None:
         type=argparse.FileType("a"),
     )
 
-    sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command")
+    sub_parsers = parser.add_subparsers(
+        help="sub-command help", required=True, dest="command"
+    )
 
     consumer_parser = argparse.ArgumentParser(add_help=False)
     consumer_parser.add_argument(
@@ -88,16 +68,24 @@ def main() -> None:
     consumer_mode_parser.add_argument(
         "-o", "--offset", help="offset to consume from", type=int, required=False
     )
-    consumer_mode_parser.add_argument("-s", "--schema", required=False, default="auto", type=str)
-    consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true")
-    consumer_mode_parser.add_argument("-p", "--partition", required=False, type=int, default=0)
+    consumer_mode_parser.add_argument(
+        "-s", "--schema", required=False, default="auto", type=str
+    )
+    consumer_mode_parser.add_argument(
+        "-g", "--go-forwards", required=False, action="store_true"
+    )
+    consumer_mode_parser.add_argument(
+        "-p", "--partition", required=False, type=int, default=0
+    )
 
     listen_parser = sub_parsers.add_parser(
         _LISTEN,
         help="listen mode - listen until KeyboardInterrupt",
         parents=[parent_parser, consumer_parser],
     )
-    listen_parser.add_argument("-p", "--partition", required=False, type=int, default=None)
+    listen_parser.add_argument(
+        "-p", "--partition", required=False, type=int, default=None
+    )
 
     if len(sys.argv) == 1:
         parser.print_help()
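
For orientation, a consume invocation built from the argparse setup above might look like this (hypothetical: it assumes the package installs a saluki entry point, which this diff does not show):

saluki consume localhost:9092/some_topic -o 100 -p 0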

src/saluki/utils.py

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+import datetime
+import logging
+from typing import Tuple, List
+
+from confluent_kafka import Message
+
+from streaming_data_types import DESERIALISERS
+from streaming_data_types.exceptions import ShortBufferException
+from streaming_data_types.utils import get_schema
+
+
+logger = logging.getLogger("saluki")
+
+
+def __try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None]:
+    logger.debug(f"got some data: {payload}")
+    try:
+        schema = get_schema(payload)
+    except ShortBufferException:
+        schema = None
+
+    logger.debug(f"schema: {schema}")
+
+    try:
+        deserialiser = DESERIALISERS[schema]
+    except KeyError:
+        logger.exception(f"Invalid schema: {schema}, falling back to raw bytes decode")
+
+        def fallback_deserialiser(payload: bytes) -> str:
+            return payload.decode()
+
+        deserialiser = (
+            fallback_deserialiser  # Fall back to this if we need to so data isn't lost
+        )
+
+    logger.debug(f"Deserialiser: {deserialiser}")
+
+    try:
+        ret = deserialiser(payload)
+    except Exception as e:
+        raise e
+
+    return schema, ret
+
+
+def _deserialise_and_print_messages(msgs: List[Message], partition: int | None) -> None:
+    for msg in msgs:
+        try:
+            if msg is None:
+                continue
+            if msg.error():
+                logger.error("Consumer error: {}".format(msg.error()))
+                continue
+            if partition is not None and msg.partition() != partition:
+                continue
+            schema, deserialised = __try_to_deserialise_message(msg.value())
+            time = _parse_timestamp(msg)
+            logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
+        except Exception as e:
+            logger.exception(f"Got error while deserialising: {e}")
+
+
+def _parse_timestamp(msg: Message) -> str:
+    """
+    Parse a message timestamp.
+
+    See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
+    :param msg: the message to parse.
+    :return: either the string-formatted timestamp or "Unknown" if not able to parse.
+    """
+    timestamp_type, timestamp_ms_from_epoch = msg.timestamp()
+    if timestamp_type == 1:  # TIMESTAMP_CREATE_TIME
+        return datetime.datetime.fromtimestamp(timestamp_ms_from_epoch / 1000).strftime(
+            "%Y-%m-%d %H:%M:%S.%f"
+        )
+    else:
+        # TIMESTAMP_NOT_AVAILABLE or TIMESTAMP_LOG_APPEND_TIME
+        return "Unknown"
+
+
+def parse_kafka_uri(uri: str) -> Tuple[str, str]:
+    """Parse Kafka connection URI.
+
+    A broker hostname/ip must be present.
+    If username is provided, a SASL mechanism must also be provided.
+    Any other validation must be performed in the calling code.
+    """
+    broker, topic = uri.split("/") if "/" in uri else (uri, "")
+    if not topic:
+        raise RuntimeError(
+            f"Unable to parse URI {uri}, topic not defined. URI should be of form"
+            f" broker[:port]/topic"
+        )
+    return (
+        broker,
+        topic,
+    )
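
The reworked parse_kafka_uri drops the old SASL/username parsing from main.py and only splits broker from topic. A short sketch of its behaviour, mirroring the tests below rather than adding anything new:

from saluki.utils import parse_kafka_uri

broker, topic = parse_kafka_uri("localhost:9092/some_topic")
assert (broker, topic) == ("localhost:9092", "some_topic")

# A URI with no "/topic" part raises RuntimeError.
parse_kafka_uri("localhost:9092")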

tests/test_consume.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+# test that tries go_forwards with no offset
+
+# test that tries going forwards that consumes from offset
+
+# test that checks start offset
+
+# test that catches exception

tests/test_deserialiser.py

Lines changed: 0 additions & 2 deletions
This file was deleted.

tests/test_listen.py

Whitespace-only changes.

tests/test_utils.py

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+import pytest
+
+from saluki.utils import parse_kafka_uri
+
+
+# test with normal payload
+
+# test with fallback serialiser when schema not found (empty payload)
+
+# test with schema that looks ok, but not in list of deserialisers
+
+# test exception while deserialising
+
+# test _parse_timestamp
+
+def test_uri_with_broker_name_and_topic_successfully_split():
+    test_broker = "localhost"
+    test_topic = "some_topic"
+    test_uri = f"{test_broker}/{test_topic}"
+    broker, topic = parse_kafka_uri(test_uri)
+    assert broker == test_broker
+    assert topic == test_topic
+
+
+def test_uri_with_port_after_broker_is_included_in_broker_output():
+    test_broker = "localhost:9092"
+    test_topic = "some_topic"
+    test_uri = f"{test_broker}/{test_topic}"
+    broker, topic = parse_kafka_uri(test_uri)
+    assert broker == test_broker
+    assert topic == test_topic
+
+
+def test_uri_with_no_topic():
+    test_broker = "some_broker"
+    with pytest.raises(RuntimeError):
+        parse_kafka_uri(test_broker)
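
One way the "# test _parse_timestamp" placeholder above could be filled in: since confluent_kafka.Message cannot be constructed directly, a small stand-in object with a timestamp() method is enough to exercise _parse_timestamp. A sketch (the _FakeMessage helper is hypothetical, not from the commit):

from saluki.utils import _parse_timestamp


class _FakeMessage:
    """Stand-in for confluent_kafka.Message; _parse_timestamp only calls timestamp()."""

    def __init__(self, timestamp_type: int, timestamp_ms: int) -> None:
        self._timestamp = (timestamp_type, timestamp_ms)

    def timestamp(self):
        return self._timestamp


def test_parse_timestamp_formats_create_time():
    msg = _FakeMessage(1, 1_000)  # 1 == TIMESTAMP_CREATE_TIME, one second past epoch
    assert _parse_timestamp(msg) != "Unknown"


def test_parse_timestamp_returns_unknown_when_not_available():
    msg = _FakeMessage(0, 0)  # 0 == TIMESTAMP_NOT_AVAILABLE
    assert _parse_timestamp(msg) == "Unknown"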
