Skip to content

Commit 54579a4

Browse files
committed
add tests for utils and start for sniff
1 parent 18d81cf commit 54579a4

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

src/saluki/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ def deserialise_and_print_messages(
4444
if partition is not None and msg.partition() != partition:
4545
continue
4646
schema, deserialised = _try_to_deserialise_message(msg.value())
47-
if schemas_to_filter_to is not None and schema in schemas_to_filter_to:
48-
break
47+
if schemas_to_filter_to is not None and schema not in schemas_to_filter_to:
48+
continue
4949
time = _parse_timestamp(msg)
5050
logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")
5151
except Exception as e:

tests/test_sniff.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from unittest.mock import patch
2+
3+
from confluent_kafka.admin import ClusterMetadata, BrokerMetadata, TopicMetadata
4+
5+
from saluki.sniff import sniff
6+
7+
def test_sniff_with_two_partitions_in_a_topic():
8+
with patch("saluki.sniff.AdminClient") as a, patch("saluki.sniff.Consumer") as c, patch("saluki.sniff.logger") as logger:
9+
fake_cluster_md = ClusterMetadata()
10+
broker1 = BrokerMetadata()
11+
broker2 = BrokerMetadata()
12+
fake_cluster_md.brokers = {0: broker1, 1: broker2}
13+
14+
topic1 = TopicMetadata()
15+
topic2 = TopicMetadata()
16+
17+
fake_cluster_md.topics = {
18+
"topic1": topic1,
19+
"topic2": topic2
20+
}
21+
a.list_topics.return_value = fake_cluster_md
22+
sniff("whatever")
23+
24+
# TODO

tests/test_utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
import pytest
44
from confluent_kafka import Message
5+
from streaming_data_types import serialise_f144
56
from streaming_data_types.forwarder_config_update_fc00 import (
67
ConfigurationUpdate,
78
StreamInfo,
9+
serialise_fc00
810
)
911

1012
from saluki.utils import (
@@ -74,6 +76,21 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message):
7476
assert logger.info.call_count == 1
7577

7678

79+
def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message):
80+
with patch("saluki.utils.logger") as logger:
81+
ok_message = Mock(spec=Message)
82+
ok_message.value.return_value = serialise_fc00(config_change=1, streams=[])
83+
ok_message.error.return_value = False
84+
ok_message.timestamp.return_value = 2, 1
85+
86+
mock_message.value.return_value = serialise_f144(source_name="test", value=123)
87+
mock_message.error.return_value = False
88+
mock_message.timestamp.return_value = 2, 1
89+
90+
deserialise_and_print_messages([mock_message, ok_message], None, schemas_to_filter_to=["fc00"])
91+
assert logger.info.call_count == 1
92+
93+
7794
def test_message_that_has_valid_schema_but_empty_payload():
7895
with pytest.raises(Exception):
7996
# Empty fc00 message - valid schema but not valid payload

0 commit comments

Comments
 (0)