1919import sys
2020import base64
2121import struct
22+ import warnings
2223from collections import deque
2324
2425from google .protobuf .message import DecodeError
@@ -110,11 +111,6 @@ def _create_msg_index(msg_desc):
110111 if not found :
111112 raise ValueError ("MessageDescriptor not found in file" )
112113
113- # The root element at the 0 position does not need a length prefix.
114- if len (msg_idx ) == 1 and msg_idx [0 ] == 0 :
115- return [0 ]
116-
117- msg_idx .appendleft (len (msg_idx ))
118114 return list (msg_idx )
119115
120116
@@ -169,6 +165,17 @@ class ProtobufSerializer(object):
169165 | | | Schema Registry subject names for Schema References |
170166 | | | Defaults to reference_subject_name_strategy |
171167 +-------------------------------------+----------+------------------------------------------------------+
168+ | ``use.deprecated.format`` | bool | Specifies whether the Protobuf serializer should |
169+ | | | serialize message indexes without zig-zag encoding. |
170+ | | | This option must be explicitly configured as older |
171+ | | | and newer Protobuf producers are incompatible. |
172+ | | | If the consumers of the topic being produced to are |
173+ | | | using confluent-kafka-python <1.8 then this property |
174+ | | | must be set to True until all old consumers have |
175+ | | | been upgraded. |
176+ | | | Warning: This configuration property will be removed |
177+ | | | in a future version of the client. |
178+ +-------------------------------------+----------+------------------------------------------------------+
172179
173180 Schemas are registered to namespaces known as Subjects which define how a
174181 schema may evolve over time. By default the subject name is formed by
@@ -208,17 +215,27 @@ class ProtobufSerializer(object):
208215 __slots__ = ['_auto_register' , '_use_latest_version' , '_skip_known_types' ,
209216 '_registry' , '_known_subjects' ,
210217 '_msg_class' , '_msg_index' , '_schema' , '_schema_id' ,
211- '_ref_reference_subject_func' , '_subject_name_func' ]
218+ '_ref_reference_subject_func' , '_subject_name_func' ,
219+ '_use_deprecated_format' ]
212220 # default configuration
213221 _default_conf = {
214222 'auto.register.schemas' : True ,
215223 'use.latest.version' : False ,
216224 'skip.known.types' : False ,
217225 'subject.name.strategy' : topic_subject_name_strategy ,
218- 'reference.subject.name.strategy' : reference_subject_name_strategy
226+ 'reference.subject.name.strategy' : reference_subject_name_strategy ,
227+ 'use.deprecated.format' : False ,
219228 }
220229
221230 def __init__ (self , msg_type , schema_registry_client , conf = None ):
231+
232+ if conf is None or 'use.deprecated.format' not in conf :
233+ raise RuntimeError (
234+ "ProtobufSerializer: the 'use.deprecated.format' configuration "
235+ "property must be explicitly set due to backward incompatibility "
236+ "with older confluent-kafka-python Protobuf producers and consumers. "
237+ "See the release notes for more details" )
238+
222239 # handle configuration
223240 conf_copy = self ._default_conf .copy ()
224241 if conf is not None :
@@ -238,6 +255,19 @@ def __init__(self, msg_type, schema_registry_client, conf=None):
238255 if not isinstance (self ._skip_known_types , bool ):
239256 raise ValueError ("skip.known.types must be a boolean value" )
240257
258+ self ._use_deprecated_format = conf_copy .pop ('use.deprecated.format' )
259+ if not isinstance (self ._use_deprecated_format , bool ):
260+ raise ValueError ("use.deprecated.format must be a boolean value" )
261+ if not self ._use_deprecated_format :
262+ warnings .warn ("ProtobufSerializer: the 'use.deprecated.format' "
263+ "configuration property, and the ability to use the "
264+ "old incorrect Protobuf serializer heading format "
265+ "introduced in confluent-kafka-python v1.4.0, "
266+ "will be removed in an upcoming release in 2021 Q2. "
267+ "Please migrate your Python Protobuf producers and "
268+ "consumers to 'use.deprecated.format':True as "
269+ "soon as possible" )
270+
241271 self ._subject_name_func = conf_copy .pop ('subject.name.strategy' )
242272 if not callable (self ._subject_name_func ):
243273 raise ValueError ("subject.name.strategy must be callable" )
@@ -263,20 +293,46 @@ def __init__(self, msg_type, schema_registry_client, conf=None):
263293 schema_type = 'PROTOBUF' )
264294
265295 @staticmethod
266- def _encode_uvarints (buf , ints ):
def _write_varint(buf, val, zigzag=True):
    """
    Writes val to buf, either using zigzag or uvarint encoding.

    Args:
        buf (BytesIO): buffer to write to.
        val (int): integer to be encoded.
        zigzag (bool): whether to encode in zigzag or uvarint encoding

    Raises:
        ValueError: if val is negative and zigzag is False.
    """
    if zigzag:
        # Zigzag folds the sign into the lowest bit (0 -> 0, -1 -> 1,
        # 1 -> 2, ...) so the result is never negative.
        val = (val << 1) ^ (val >> 63)
    elif val < 0:
        # Right-shifting a negative int converges to -1, never 0, so the
        # loop below would not terminate. Reject the value up front.
        raise ValueError("cannot encode negative value {} as "
                         "uvarint".format(val))

    # Emit 7 bits per byte, least-significant group first; the high bit
    # (0x80) marks that more bytes follow.
    # NOTE(review): _bytes is defined elsewhere in this file; presumably it
    # turns an int in 0..255 into a single byte - confirm.
    while (val & ~0x7f) != 0:
        buf.write(_bytes((val & 0x7f) | 0x80))
        val >>= 7
    buf.write(_bytes(val))
313+
314+ @staticmethod
def _encode_varints(buf, ints, zigzag=True):
    """
    Encodes a message index onto buf as a length-prefixed list of varints.

    The index [0] is written as the single byte 0x00; any other index is
    written as its length followed by each element.

    Args:
        buf (BytesIO): buffer to write to.
        ints ([int]): ints to be encoded.
        zigzag (bool): whether to encode in zigzag or uvarint encoding

    Raises:
        ValueError: if ints is empty.
    """
    # Validate explicitly: an assert would be stripped under ``python -O``
    # and an empty index would then be written as a bare length of 0.
    if not ints:
        raise ValueError("message index must contain at least one element")

    # The root element at the 0 position does not need a length prefix.
    if len(ints) == 1 and ints[0] == 0:
        buf.write(_bytes(0x00))
        return

    ProtobufSerializer._write_varint(buf, len(ints), zigzag=zigzag)

    for value in ints:
        ProtobufSerializer._write_varint(buf, value, zigzag=zigzag)
280336
281337 def _resolve_dependencies (self , ctx , file_desc ):
282338 """
@@ -361,7 +417,8 @@ def __call__(self, message_type, ctx):
361417 # (big endian)
362418 fo .write (struct .pack ('>bI' , _MAGIC_BYTE , self ._schema_id ))
363419 # write the record index to the buffer
364- self ._encode_uvarints (fo , self ._msg_index )
420+ self ._encode_varints (fo , self ._msg_index ,
421+ zigzag = not self ._use_deprecated_format )
365422 # write the record itself
366423 fo .write (message_type .SerializeToString ())
367424 return fo .getvalue ()
@@ -374,28 +431,82 @@ class ProtobufDeserializer(object):
374431
375432 Args:
376433 message_type (GeneratedProtocolMessageType): Protobuf Message type.
434+ conf (dict): Configuration dictionary.
435+
436+ ProtobufDeserializer configuration properties:
437+
438+ +-------------------------------------+----------+------------------------------------------------------+
439+ | Property Name | Type | Description |
440+ +-------------------------------------+----------+------------------------------------------------------+
441+ | ``use.deprecated.format`` | bool | Specifies whether the Protobuf deserializer should |
442+ | | | deserialize message indexes without zig-zag encoding.|
443+ | | | This option must be explicitly configured as older |
444+ | | | and newer Protobuf producers are incompatible. |
445+ | | | If Protobuf messages in the topic to consume were |
446+ | | | produced with confluent-kafka-python <1.8 then this |
447+ | | | property must be set to True until all old messages |
448+ | | | have been processed and producers have been upgraded.|
449+ | | | Warning: This configuration property will be removed |
450+ | | | in a future version of the client. |
451+ +-------------------------------------+----------+------------------------------------------------------+
452+
377453
378454 See Also:
379455 `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_
380456
381457 """
382- __slots__ = ['_msg_class' , '_msg_index' ]
458+ __slots__ = ['_msg_class' , '_msg_index' , '_use_deprecated_format' ]
459+
460+ # default configuration
461+ _default_conf = {
462+ 'use.deprecated.format' : False ,
463+ }
464+
def __init__(self, message_type, conf=None):
    """
    Sets up the deserializer for a single Protobuf message type.

    Args:
        message_type (GeneratedProtocolMessageType): Protobuf Message type.
        conf (dict): Configuration dictionary. Must contain the
            'use.deprecated.format' property.

    Raises:
        RuntimeError: if 'use.deprecated.format' is not explicitly set.
        ValueError: if 'use.deprecated.format' is not a boolean value.
    """
    # Require use.deprecated.format to be explicitly configured
    # during a transitionary period since old/new format are
    # incompatible.
    if conf is None or 'use.deprecated.format' not in conf:
        raise RuntimeError(
            "ProtobufDeserializer: the 'use.deprecated.format' configuration "
            "property must be explicitly set due to backward incompatibility "
            "with older confluent-kafka-python Protobuf producers and consumers. "
            "See the release notes for more details")

    # handle configuration; conf is known to be non-None at this point
    conf_copy = self._default_conf.copy()
    conf_copy.update(conf)

    self._use_deprecated_format = conf_copy.pop('use.deprecated.format')
    if not isinstance(self._use_deprecated_format, bool):
        raise ValueError("use.deprecated.format must be a boolean value")
    # Warn only when the deprecated (non-zigzag) format is actually in use;
    # that is the behavior scheduled for removal, so the advice is to
    # switch the property to False.
    if self._use_deprecated_format:
        warnings.warn("ProtobufDeserializer: the 'use.deprecated.format' "
                      "configuration property, and the ability to use the "
                      "old incorrect Protobuf serializer heading format "
                      "introduced in confluent-kafka-python v1.4.0, "
                      "will be removed in an upcoming release in 2022 Q2. "
                      "Please migrate your Python Protobuf producers and "
                      "consumers to 'use.deprecated.format':False as "
                      "soon as possible")

    descriptor = message_type.DESCRIPTOR
    self._msg_index = _create_msg_index(descriptor)
    self._msg_class = MessageFactory().GetPrototype(descriptor)
388498
389499 @staticmethod
390- def _decode_uvarint (buf ):
500+ def _decode_varint (buf , zigzag = True ):
391501 """
392- Decodes a single uvarint from a buffer.
502+ Decodes a single varint from a buffer.
393503
394504 Args:
395505 buf (BytesIO): buffer to read from
506+ zigzag (bool): decode as zigzag or uvarint
396507
397508 Returns:
398- int: decoded uvarint
509+ int: decoded varint
399510
400511 Raises:
401512 EOFError: if buffer is empty
@@ -410,7 +521,12 @@ def _decode_uvarint(buf):
410521 value |= (i & 0x7f ) << shift
411522 shift += 7
412523 if not (i & 0x80 ):
413- return value
524+ break
525+
526+ if zigzag :
527+ value = (value >> 1 ) ^ - (value & 1 )
528+
529+ return value
414530
415531 except EOFError :
416532 raise EOFError ("Unexpected EOF while reading index" )
@@ -432,7 +548,7 @@ def _read_byte(buf):
432548 return ord (i )
433549
434550 @staticmethod
435- def _decode_index (buf ):
551+ def _decode_index (buf , zigzag = True ):
436552 """
437553 Extracts message index from Schema Registry Protobuf formatted bytes.
438554
@@ -443,10 +559,17 @@ def _decode_index(buf):
443559 [int]: Protobuf Message index.
444560
445561 """
446- size = ProtobufDeserializer ._decode_uvarint (buf )
447- msg_index = [size ]
562+ size = ProtobufDeserializer ._decode_varint (buf , zigzag = zigzag )
563+ if size < 0 or size > 100000 :
564+ raise DecodeError ("Invalid Protobuf msgidx array length" )
565+
566+ if size == 0 :
567+ return [0 ]
568+
569+ msg_index = []
448570 for _ in range (size ):
449- msg_index .append (ProtobufDeserializer ._decode_uvarint (buf ))
571+ msg_index .append (ProtobufDeserializer ._decode_varint (buf ,
572+ zigzag = zigzag ))
450573
451574 return msg_index
452575
@@ -486,7 +609,7 @@ def __call__(self, value, ctx):
486609
487610 # Protobuf Messages are self-describing; no need to query schema
488611 # Move the reader cursor past the index
489- _ = ProtobufDeserializer ._decode_index (payload )
612+ _ = self ._decode_index (payload , zigzag = not self . _use_deprecated_format )
490613 msg = self ._msg_class ()
491614 try :
492615 msg .ParseFromString (payload .read ())
0 commit comments