Skip to content

Commit d945720

Browse files
rnpridgeonRyan P
authored andcommitted
ProtobufSerializer/ProtobufDeserializer for Generic Serdes API
1 parent 8d66ff8 commit d945720

32 files changed

+6172
-2
lines changed

confluent_kafka/schema_registry/protobuf.py

Lines changed: 464 additions & 0 deletions
Large diffs are not rendered by default.

examples/Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
user_pb2.py: user.proto
2+
protoc -I=. --python_out=. ./user.proto;
3+
4+
clean:
5+
rm -f $(TARGET_DIR)/*_pb2.py

examples/protobuf_consumer.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
#
20+
# This is a simple example of the SerializingProducer using protobuf.
21+
#
22+
# To regenerate Protobuf classes you must first install the protobuf
23+
# compiler. Once installed you may call protoc directly or use make.
24+
#
25+
# See the protocol buffer docs for instructions on installing and using protoc.
26+
# https://developers.google.com/protocol-buffers/docs/pythontutorial
27+
#
28+
# After installing protoc execute the following command from the examples
29+
# directory to regenerate the user_pb2 module.
30+
# `make`
31+
#
32+
import argparse
33+
34+
# Protobuf generated class; resides at ./user_pb2.py
35+
import user_pb2
36+
from confluent_kafka import DeserializingConsumer
37+
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
38+
from confluent_kafka.serialization import StringDeserializer
39+
40+
41+
def main(args):
42+
topic = args.topic
43+
44+
protobuf_deserializer = ProtobufDeserializer(user_pb2.User)
45+
string_deserializer = StringDeserializer('utf_8')
46+
47+
consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
48+
'key.deserializer': string_deserializer,
49+
'value.deserializer': protobuf_deserializer,
50+
'group.id': args.group,
51+
'auto.offset.reset': "earliest"}
52+
53+
consumer = DeserializingConsumer(consumer_conf)
54+
consumer.subscribe([topic])
55+
56+
while True:
57+
try:
58+
# SIGINT can't be handled when polling, limit timeout to 1 second.
59+
msg = consumer.poll(1.0)
60+
if msg is None:
61+
continue
62+
63+
user = msg.value()
64+
if user is not None:
65+
print("User record {}: name: {}\n"
66+
"\tfavorite_number: {}\n"
67+
"\tfavorite_color: {}\n"
68+
.format(msg.key(), user.name,
69+
user.favorite_color,
70+
user.favorite_number))
71+
except KeyboardInterrupt:
72+
break
73+
74+
consumer.close()
75+
76+
77+
if __name__ == '__main__':
78+
parser = argparse.ArgumentParser(description="DeserializingConsumer Example")
79+
parser.add_argument('-b', dest="bootstrap_servers", required=True,
80+
help="Bootstrap broker(s) (host[:port])")
81+
parser.add_argument('-s', dest="schema_registry", required=True,
82+
help="Schema Registry (http(s)://host[:port]")
83+
parser.add_argument('-t', dest="topic", default="example_serde_protobuf",
84+
help="Topic name")
85+
parser.add_argument('-g', dest="group", default="example_serde_protobuf",
86+
help="Consumer group")
87+
88+
main(parser.parse_args())

examples/protobuf_producer.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
#
20+
# This is a simple example of the SerializingProducer using protobuf.
21+
#
22+
# To regenerate Protobuf classes you must first install the protobuf
23+
# compiler. Once installed you may call protoc directly or use make.
24+
#
25+
# See the protocol buffer docs for instructions on installing and using protoc.
26+
# https://developers.google.com/protocol-buffers/docs/pythontutorial
27+
#
28+
# After installing protoc execute the following command from the examples
29+
# directory to regenerate the user_pb2 module.
30+
# `make`
31+
#
32+
import argparse
33+
from uuid import uuid4
34+
35+
from six.moves import input
36+
37+
# Protobuf generated class; resides at ./user_pb2.py
38+
import user_pb2
39+
from confluent_kafka import SerializingProducer
40+
from confluent_kafka.serialization import StringSerializer
41+
from confluent_kafka.schema_registry import SchemaRegistryClient
42+
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
43+
44+
45+
def delivery_report(err, msg):
46+
"""
47+
Reports the failure or success of a message delivery.
48+
49+
Args:
50+
err (KafkaError): The error that occurred on None on success.
51+
52+
msg (Message): The message that was produced or failed.
53+
54+
Note:
55+
In the delivery report callback the Message.key() and Message.value()
56+
will be the binary format as encoded by any configured Serializers and
57+
not the same object that was passed to produce().
58+
If you wish to pass the original object(s) for key and value to delivery
59+
report callback we recommend a bound callback or lambda where you pass
60+
the objects along.
61+
62+
"""
63+
if err is not None:
64+
print("Delivery failed for User record {}: {}".format(msg.key(), err))
65+
return
66+
print('User record {} successfully produced to {} [{}] at offset {}'.format(
67+
msg.key(), msg.topic(), msg.partition(), msg.offset()))
68+
69+
70+
def main(args):
71+
topic = args.topic
72+
73+
schema_registry_conf = {'url': args.schema_registry}
74+
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
75+
76+
protobuf_serializer = ProtobufSerializer(user_pb2.User,
77+
schema_registry_client)
78+
79+
producer_conf = {'bootstrap.servers': args.bootstrap_servers,
80+
'key.serializer': StringSerializer('utf_8'),
81+
'value.serializer': protobuf_serializer}
82+
83+
producer = SerializingProducer(producer_conf)
84+
85+
print("Producing user records to topic {}. ^C to exit.".format(topic))
86+
while True:
87+
# Serve on_delivery callbacks from previous calls to produce()
88+
producer.poll(0.0)
89+
try:
90+
user_name = input("Enter name: ")
91+
user_favorite_number = int(input("Enter favorite number: "))
92+
user_favorite_color = input("Enter favorite color: ")
93+
user = user_pb2.User(name=user_name,
94+
favorite_color=user_favorite_color,
95+
favorite_number=user_favorite_number)
96+
producer.produce(topic=topic, key=str(uuid4()), value=user,
97+
on_delivery=delivery_report)
98+
except KeyboardInterrupt:
99+
break
100+
except ValueError:
101+
print("Invalid input, discarding record...")
102+
continue
103+
104+
print("\nFlushing records...")
105+
producer.flush()
106+
107+
108+
if __name__ == '__main__':
109+
parser = argparse.ArgumentParser(description="SerializingProducer Example")
110+
parser.add_argument('-b', dest="bootstrap_servers", required=True,
111+
help="Bootstrap broker(s) (host[:port])")
112+
parser.add_argument('-s', dest="schema_registry", required=True,
113+
help="Schema Registry (http(s)://host[:port]")
114+
parser.add_argument('-t', dest="topic", default="example_serde_protobuf",
115+
help="Topic name")
116+
117+
main(parser.parse_args())

examples/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ pydantic
44
uvicorn
55
six
66
jsonschema
7+
protobuf

examples/user.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
syntax = "proto3";
2+
3+
message User {
4+
string name = 1;
5+
int64 favorite_number = 2;
6+
string favorite_color = 3;
7+
}

examples/user_pb2.py

Lines changed: 83 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
syntax = "proto3";
2+
3+
import "NestedTestProto.proto";
4+
import "PublicTestProto.proto";
5+
6+
package tests.integration.serialization.data;
7+
8+
option java_package = "io.confluent.kafka.serializers.protobuf.test";
9+
10+
message DependencyMessage {
11+
NestedMessage nested_message = 1;
12+
bool is_active = 2;
13+
TestMessage test_messsage = 3;
14+
}

tests/integration/schema_registry/data/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ compile: $(PROTOS)
1313
done
1414

1515
clean:
16-
rm $(TARGET_DIR)/*_pb2.py
16+
rm -f $(TARGET_DIR)/*_pb2.py
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
syntax = "proto3";
2+
3+
import "google/protobuf/timestamp.proto";
4+
5+
package tests.integration.serialization.data;
6+
7+
option java_package = "io.confluent.kafka.serializers.protobuf.test";
8+
option java_outer_classname = "NestedTestProto";
9+
option java_multiple_files = false;
10+
11+
message UserId {
12+
oneof user_id {
13+
string kafka_user_id = 1;
14+
int32 other_user_id = 2;
15+
MessageId another_id = 3;
16+
}
17+
}
18+
19+
message MessageId {
20+
string id = 1;
21+
}
22+
23+
enum Status {
24+
ACTIVE = 0;
25+
INACTIVE = 1;
26+
}
27+
28+
message ComplexType {
29+
oneof some_val {
30+
string one_id = 1;
31+
int32 other_id = 2;
32+
}
33+
bool is_active = 3;
34+
}
35+
36+
/*
37+
* Complex message using nested protos and repeated fields
38+
*/
39+
message NestedMessage {
40+
UserId user_id = 1;
41+
bool is_active = 2;
42+
repeated string experiments_active = 3;
43+
Status status = 5;
44+
ComplexType complex_type = 6;
45+
map<string, string> map_type = 7;
46+
InnerMessage inner = 8;
47+
48+
message InnerMessage {
49+
string id = 1 [json_name="id"];
50+
repeated int32 ids = 2 [packed=true];
51+
}
52+
53+
enum InnerEnum {
54+
option allow_alias = true;
55+
ZERO = 0;
56+
ALSO_ZERO = 0;
57+
}
58+
59+
reserved 14, 15, 9 to 11;
60+
reserved "foo", "bar";
61+
}

0 commit comments

Comments
 (0)