4747 one_id = 'oneof_str' ,
4848 is_active = False )})
4949])
50- def test_protobuf_message_serialization (kafka_cluster , pb2 , data ):
50+ def test_protobuf_message_serialization (kafka_single_broker_cluster , pb2 , data ):
5151 """
5252 Validates that we get the same message back that we put in.
5353
5454 """
55- topic = kafka_cluster .create_topic ("serialization-proto" )
56- sr = kafka_cluster .schema_registry ()
55+ topic = kafka_single_broker_cluster .create_topic ("serialization-proto" )
56+ sr = kafka_single_broker_cluster .schema_registry ()
5757
5858 value_serializer = ProtobufSerializer (pb2 , sr , {'use.deprecated.format' : False })
5959 value_deserializer = ProtobufDeserializer (pb2 , {'use.deprecated.format' : False })
6060
61- producer = kafka_cluster .producer (value_serializer = value_serializer )
62- consumer = kafka_cluster .consumer (value_deserializer = value_deserializer )
61+ producer = kafka_single_broker_cluster .producer (value_serializer = value_serializer )
62+ consumer = kafka_single_broker_cluster .consumer (value_deserializer = value_deserializer )
6363 consumer .assign ([TopicPartition (topic , 0 )])
6464
6565 expect = pb2 (** data )
@@ -78,16 +78,16 @@ def test_protobuf_message_serialization(kafka_cluster, pb2, data):
7878 (DependencyMessage , ['NestedTestProto.proto' , 'PublicTestProto.proto' ]),
7979 (ClickCas , ['metadata_proto.proto' , 'common_proto.proto' ])
8080])
81- def test_protobuf_reference_registration (kafka_cluster , pb2 , expected_refs ):
81+ def test_protobuf_reference_registration (kafka_single_broker_cluster , pb2 , expected_refs ):
8282 """
8383 Registers multiple messages with dependencies then queries the Schema
8484 Registry to ensure the references match up.
8585
8686 """
87- sr = kafka_cluster .schema_registry ()
88- topic = kafka_cluster .create_topic ("serialization-proto-refs" )
87+ sr = kafka_single_broker_cluster .schema_registry ()
88+ topic = kafka_single_broker_cluster .create_topic ("serialization-proto-refs" )
8989 serializer = ProtobufSerializer (pb2 , sr , {'use.deprecated.format' : False })
90- producer = kafka_cluster .producer (key_serializer = serializer )
90+ producer = kafka_single_broker_cluster .producer (key_serializer = serializer )
9191
9292 producer .produce (topic , key = pb2 (), partition = 0 )
9393 producer .flush ()
@@ -97,19 +97,19 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs):
9797 assert expected_refs .sort () == [ref .name for ref in registered_refs ].sort ()
9898
9999
100- def test_protobuf_serializer_type_mismatch (kafka_cluster ):
100+ def test_protobuf_serializer_type_mismatch (kafka_single_broker_cluster ):
101101 """
102102 Ensures an Exception is raised when deserializing an unexpected type.
103103
104104 """
105105 pb2_1 = TestProto_pb2 .TestMessage
106106 pb2_2 = NestedTestProto_pb2 .NestedMessage
107107
108- sr = kafka_cluster .schema_registry ()
109- topic = kafka_cluster .create_topic ("serialization-proto-refs" )
108+ sr = kafka_single_broker_cluster .schema_registry ()
109+ topic = kafka_single_broker_cluster .create_topic ("serialization-proto-refs" )
110110 serializer = ProtobufSerializer (pb2_1 , sr , {'use.deprecated.format' : False })
111111
112- producer = kafka_cluster .producer (key_serializer = serializer )
112+ producer = kafka_single_broker_cluster .producer (key_serializer = serializer )
113113
114114 with pytest .raises (KafkaException ,
115115 match = r"message must be of type <class"
@@ -118,21 +118,21 @@ def test_protobuf_serializer_type_mismatch(kafka_cluster):
118118 producer .produce (topic , key = pb2_2 ())
119119
120120
121- def test_protobuf_deserializer_type_mismatch (kafka_cluster ):
121+ def test_protobuf_deserializer_type_mismatch (kafka_single_broker_cluster ):
122122 """
123123 Ensures an Exception is raised when deserializing an unexpected type.
124124
125125 """
126126 pb2_1 = PublicTestProto_pb2 .TestMessage
127127 pb2_2 = metadata_proto_pb2 .HDFSOptions
128128
129- sr = kafka_cluster .schema_registry ()
130- topic = kafka_cluster .create_topic ("serialization-proto-refs" )
129+ sr = kafka_single_broker_cluster .schema_registry ()
130+ topic = kafka_single_broker_cluster .create_topic ("serialization-proto-refs" )
131131 serializer = ProtobufSerializer (pb2_1 , sr , {'use.deprecated.format' : False })
132132 deserializer = ProtobufDeserializer (pb2_2 , {'use.deprecated.format' : False })
133133
134- producer = kafka_cluster .producer (key_serializer = serializer )
135- consumer = kafka_cluster .consumer (key_deserializer = deserializer )
134+ producer = kafka_single_broker_cluster .producer (key_serializer = serializer )
135+ consumer = kafka_single_broker_cluster .consumer (key_deserializer = deserializer )
136136 consumer .assign ([TopicPartition (topic , 0 )])
137137
138138 def dr (err , msg ):
0 commit comments