Skip to content

Commit e1c3227

Browse files
authored
Header in serializing context (#1120)
SerializationContext doesn't support message headers before, add headers support to SerializationContext.
1 parent c3f1c76 commit e1c3227

File tree

6 files changed

+15
-7
lines changed

6 files changed

+15
-7
lines changed

src/confluent_kafka/deserializing_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def poll(self, timeout=-1):
130130
if msg.error() is not None:
131131
raise ConsumeError(msg.error(), kafka_message=msg)
132132

133-
ctx = SerializationContext(msg.topic(), MessageField.VALUE)
133+
ctx = SerializationContext(msg.topic(), MessageField.VALUE, msg.headers())
134134
value = msg.value()
135135
if self._value_deserializer is not None:
136136
try:

src/confluent_kafka/serialization/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ class SerializationContext(object):
5656
field (MessageField): Describes what part of the message is
5757
being serialized.
5858
59+
headers (list): List of message header tuples. Defaults to None.
60+
5961
"""
60-
def __init__(self, topic, field):
62+
def __init__(self, topic, field, headers=None):
6163
self.topic = topic
6264
self.field = field
65+
self.headers = headers
6366

6467

6568
class SerializationError(KafkaException):

src/confluent_kafka/serializing_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def produce(self, topic, key=None, value=None, partition=-1,
160160
KafkaException: For all other errors
161161
162162
"""
163-
ctx = SerializationContext(topic, MessageField.KEY)
163+
ctx = SerializationContext(topic, MessageField.KEY, headers)
164164
if self._key_serializer is not None:
165165
try:
166166
key = self._key_serializer(key, ctx)

tests/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ A python3 env suitable for running tests:
1818

1919
$ python3 -m venv venv_test
2020
$ source venv_test/bin/activate
21-
$ pip install -r test/requirements.txt
21+
$ pip install -r tests/requirements.txt
2222
$ python setup.py build
2323
$ python setup.py install
2424

tests/integration/schema_registry/test_avro_serializers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def test_delivery_report_serialization(kafka_cluster, load_file, avsc, data, rec
124124

125125
def assert_cb(err, msg):
126126
actual = value_deserializer(msg.value(),
127-
SerializationContext(topic, MessageField.VALUE))
127+
SerializationContext(topic, MessageField.VALUE, msg.headers()))
128128

129129
if record_type == "record":
130130
assert [v == actual[k] for k, v in data.items()]

tests/schema_registry/test_avro_serializer.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,11 @@ def test_avro_serializer_record_subject_name_strategy(load_avsc):
111111
conf={'subject.name.strategy':
112112
record_subject_name_strategy})
113113

114-
ctx = SerializationContext('test_subj', MessageField.VALUE)
114+
ctx = SerializationContext('test_subj', MessageField.VALUE, [])
115115
assert test_serializer._subject_name_func(ctx,
116116
test_serializer._schema_name) == 'python.test.basic'
117+
assert ctx is not None
118+
assert not ctx.headers
117119

118120

119121
def test_avro_serializer_record_subject_name_strategy_primitive(load_avsc):
@@ -127,9 +129,10 @@ def test_avro_serializer_record_subject_name_strategy_primitive(load_avsc):
127129
conf={'subject.name.strategy':
128130
record_subject_name_strategy})
129131

130-
ctx = SerializationContext('test_subj', MessageField.VALUE)
132+
ctx = SerializationContext('test_subj', MessageField.VALUE, [('header1', 'header value 1'), ])
131133
assert test_serializer._subject_name_func(ctx,
132134
test_serializer._schema_name) == 'int'
135+
assert ('header1', 'header value 1') in ctx.headers
133136

134137

135138
def test_avro_serializer_topic_record_subject_name_strategy(load_avsc):
@@ -162,6 +165,8 @@ def test_avro_serializer_topic_record_subject_name_strategy_primitive(load_avsc)
162165
ctx = SerializationContext('test_subj', MessageField.VALUE)
163166
assert test_serializer._subject_name_func(
164167
ctx, test_serializer._schema_name) == 'test_subj-int'
168+
assert ctx is not None
169+
assert ctx.headers is None
165170

166171

167172
def test_avro_serializer_subject_name_strategy_default(load_avsc):

0 commit comments

Comments
 (0)