-import type { Message } from 'protobufjs';
+import { BufferReader, type Message } from 'protobufjs';
import { KafkaConsumerDeserializationError } from '../errors.js';
-import type { ProtobufMessage } from '../types/types.js';
+import type { ProtobufMessage, SchemaMetadata } from '../types/types.js';
+
+/**
+ * Default order of varint types to try when deserializing Confluent Schema Registry messages.
+ */
+const varintOrder: Array<'int32' | 'sint32'> = ['int32', 'sint32'];

/**
 * Deserialize a Protobuf message from a base64-encoded string.
@@ -10,15 +15,80 @@ import type { ProtobufMessage } from '../types/types.js';
 * @param data - The base64-encoded string representing the Protobuf binary data.
 * @param messageType - The Protobuf message type definition - see {@link Message | `Message`} from {@link https://www.npmjs.com/package/protobufjs | `protobufjs`}.
 */
-const deserialize = <T>(data: string, messageType: ProtobufMessage<T>): T => {
+const deserialize = <T>(
+  data: string,
+  messageType: ProtobufMessage<T>,
+  schemaMetadata: SchemaMetadata
+): T => {
+  const buffer = Buffer.from(data, 'base64');
  try {
-    const buffer = Buffer.from(data, 'base64');
-    return messageType.decode(buffer, buffer.length);
+    if (schemaMetadata.schemaId === undefined) {
+      return messageType.decode(buffer, buffer.length);
+    }
+    /**
+     * If `schemaId` is longer than 10 characters, it's a UUID; otherwise it's a numeric ID.
+     *
+     * A UUID means the schema comes from Glue Schema Registry, and the first byte of the
+     * buffer is a magic byte that we need to remove before decoding the message.
+     */
+    if (schemaMetadata.schemaId.length > 10) {
+      // remove the first byte from the buffer
+      const reader = new BufferReader(buffer);
+      reader.uint32();
+      return messageType.decode(reader);
+    }
  } catch (error) {
    throw new KafkaConsumerDeserializationError(
      `Failed to deserialize Protobuf message: ${error}, message: ${data}, messageType: ${messageType}`
    );
  }
+
+  /**
+   * If `schemaId` is numeric (inferred from its length), the message comes from Confluent Schema Registry,
+   * so we need to remove the MessageIndex bytes.
+   * We don't know the type of the index, so we try both `int32` and `sint32`. If both fail, we throw an error.
+   */
+  try {
+    const newBuffer = clipConfluentSchemaRegistryBuffer(buffer, varintOrder[0]);
+    return messageType.decode(newBuffer);
+  } catch (error) {
+    try {
+      const newBuffer = clipConfluentSchemaRegistryBuffer(
+        buffer,
+        varintOrder[1]
+      );
+      const decoded = messageType.decode(newBuffer);
+      // swap the varint order if the first attempt failed, so subsequent messages try the correct one first
+      varintOrder.reverse();
+      return decoded;
+    } catch {
+      throw new KafkaConsumerDeserializationError(
+        `Failed to deserialize Protobuf message: ${error}, message: ${data}, messageType: ${messageType}`
+      );
+    }
+  }
+};
+
+/**
+ * Clip the Confluent Schema Registry buffer to remove the index bytes.
+ *
+ * @param buffer - The buffer to clip.
+ * @param intType - The type of the integer to read from the buffer, either 'int32' or 'sint32'.
+ */
+const clipConfluentSchemaRegistryBuffer = (
+  buffer: Buffer,
+  intType: 'int32' | 'sint32'
+) => {
+  const reader = new BufferReader(buffer);
+  /**
+   * Read the first varint to get the index count (or 0).
+   * Doing so also advances the reader position to the byte after the index count.
+   */
+  const indexCount = intType === 'int32' ? reader.int32() : reader.sint32();
+  // Skip the index bytes
+  reader.skip(indexCount);
+  return reader;
};

export { deserialize };
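For illustration, a minimal usage sketch of the new `deserialize` signature (not part of the diff above). The `Greeting` type, payloads, and metadata values are hypothetical, and the sketch assumes that `SchemaMetadata` allows `schemaId` to be omitted (as the `undefined` check suggests) and that `ProtobufMessage` is structurally compatible with a `protobufjs` reflection `Type`.

// Hypothetical usage sketch; all names and values below are illustrative assumptions.
import { Field, Type } from 'protobufjs';

// Reflection-based message type with a single string field `message` (field number 1)
const Greeting = new Type('Greeting').add(new Field('message', 1, 'string'));

// No schemaId: the payload is decoded as plain Protobuf.
// 'CgVIZWxsbw==' is the base64 encoding of a Greeting with message = 'Hello'.
const plain = deserialize('CgVIZWxsbw==', Greeting, {});

// A schemaId longer than 10 characters (a UUID) takes the Glue path: the first byte of the
// payload (0x00 here) is read and discarded before decoding the rest.
const glue = deserialize('AAoFSGVsbG8=', Greeting, {
  schemaId: 'f7b4a7ab-91c9-4a2a-bfbd-6f3bbf8b2e6d',
});

// A short, numeric schemaId takes the Confluent path: the leading varint is read as the
// MessageIndex count (0 here, so nothing to skip) before decoding the remaining bytes.
const confluent = deserialize('AAoFSGVsbG8=', Greeting, { schemaId: '1234' });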