Skip to content

Commit ea9513e

Browse files
authored
Fix Protobuf field transform to use schema from SR (#138)
* Fix Protobuf field transform to use schema from SR * Minor refactoring * Minor formatting
1 parent c32cf88 commit ea9513e

File tree

3 files changed

+83
-45
lines changed

3 files changed

+83
-45
lines changed

schemaregistry/serde/avro.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ async function transform(ctx: RuleContext, schema: Type, msg: any, fieldTransfor
285285
const recordSchema = schema as RecordType
286286
const record = msg as Record<string, any>
287287
for (const field of recordSchema.fields) {
288-
await transformField(ctx, recordSchema, field, record, record[field.name], fieldTransform)
288+
await transformField(ctx, recordSchema, field, record, fieldTransform)
289289
}
290290
return record
291291
default:
@@ -304,13 +304,12 @@ async function transformField(
304304
recordSchema: RecordType,
305305
field: Field,
306306
record: Record<string, any>,
307-
val: any,
308307
fieldTransform: FieldTransform,
309308
): Promise<void> {
310309
const fullName = recordSchema.name + '.' + field.name
311310
try {
312311
ctx.enterField(
313-
val,
312+
record,
314313
fullName,
315314
field.name,
316315
getType(field.type),

schemaregistry/serde/json.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,7 @@ async function transform(ctx: RuleContext, schema: DereferencedJSONSchema, path:
350350
case FieldType.RECORD:
351351
if (schema.properties != null) {
352352
for (let [propName, propSchema] of Object.entries(schema.properties)) {
353-
let value = msg[propName]
354-
await transformField(ctx, path, propName, msg, value, propSchema, fieldTransform)
353+
await transformField(ctx, path, propName, msg, propSchema, fieldTransform)
355354
}
356355
}
357356
return msg
@@ -371,12 +370,13 @@ async function transform(ctx: RuleContext, schema: DereferencedJSONSchema, path:
371370
return msg
372371
}
373372

374-
async function transformField(ctx: RuleContext, path: string, propName: string, msg: any, value: any,
375-
propSchema: DereferencedJSONSchema,
376-
fieldTransform: FieldTransform): Promise<void> {
373+
async function transformField(ctx: RuleContext, path: string, propName: string, msg: any,
374+
propSchema: DereferencedJSONSchema,
375+
fieldTransform: FieldTransform): Promise<void> {
377376
const fullName = path + '.' + propName
378377
try {
379378
ctx.enterField(msg, fullName, propName, getType(propSchema), getInlineTags(propSchema))
379+
let value = msg[propName]
380380
const newVal = await transform(ctx, propSchema, fullName, value, fieldTransform)
381381
if (ctx.rule.kind === 'CONDITION') {
382382
if (newVal === false) {

schemaregistry/serde/protobuf.ts

Lines changed: 76 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ import {
3232
file_google_protobuf_field_mask,
3333
file_google_protobuf_source_context,
3434
file_google_protobuf_struct,
35-
file_google_protobuf_timestamp, file_google_protobuf_type, file_google_protobuf_wrappers,
35+
file_google_protobuf_timestamp,
36+
file_google_protobuf_type,
37+
file_google_protobuf_wrappers,
38+
FileDescriptorProto,
3639
FileDescriptorProtoSchema
3740
} from "@bufbuild/protobuf/wkt";
3841
import { BufferWrapper, MAX_VARINT_LEN_64 } from "./buffer-wrapper";
@@ -100,6 +103,7 @@ export type ProtobufSerializerConfig = SerializerConfig & {
100103
*/
101104
export class ProtobufSerializer extends Serializer implements ProtobufSerde {
102105
registry: MutableRegistry
106+
fileRegistry: FileRegistry
103107
schemaToDescCache: LRUCache<string, DescFile>
104108
descToSchemaCache: LRUCache<string, SchemaInfo>
105109

@@ -113,6 +117,7 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde {
113117
constructor(client: Client, serdeType: SerdeType, conf: ProtobufSerializerConfig, ruleRegistry?: RuleRegistry) {
114118
super(client, serdeType, conf, ruleRegistry)
115119
this.registry = conf.registry ?? createMutableRegistry()
120+
this.fileRegistry = createFileRegistry()
116121
this.schemaToDescCache = new LRUCache<string, DescFile>({ max: this.config().cacheCapacity ?? 1000 } )
117122
this.descToSchemaCache = new LRUCache<string, SchemaInfo>({ max: this.config().cacheCapacity ?? 1000 } )
118123
this.fieldTransformer = async (ctx: RuleContext, fieldTransform: FieldTransform, msg: any) => {
@@ -281,15 +286,41 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde {
281286
}
282287

283288
async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise<any> {
289+
const fileDesc = await this.toFileDesc(this.client, ctx.target)
284290
const typeName = msg.$typeName
285-
if (typeName == null) {
286-
throw new SerializationError('message type name is empty')
291+
const messageDesc = this.toMessageDescFromName(fileDesc, typeName)
292+
return await transform(ctx, messageDesc, msg, fieldTransform)
293+
}
294+
295+
async toFileDesc(client: Client, info: SchemaInfo): Promise<DescFile> {
296+
const value = this.schemaToDescCache.get(stringify(info.schema))
297+
if (value != null) {
298+
return value
287299
}
288-
const messageDesc = this.registry.getMessage(typeName)
289-
if (messageDesc == null) {
290-
throw new SerializationError('message descriptor not in registry')
300+
const fileDesc = await this.parseFileDesc(client, info)
301+
if (fileDesc == null) {
302+
throw new SerializationError('file descriptor not found')
291303
}
292-
return await transform(ctx, messageDesc, msg, fieldTransform)
304+
this.schemaToDescCache.set(stringify(info.schema), fileDesc)
305+
return fileDesc
306+
}
307+
308+
async parseFileDesc(client: Client, info: SchemaInfo): Promise<DescFile | undefined> {
309+
const deps = new Map<string, string>()
310+
await this.resolveReferences(client, info, deps, 'serialized')
311+
const fileDesc = fromBinary(FileDescriptorProtoSchema, Buffer.from(info.schema, 'base64'))
312+
const fileRegistry = newFileRegistry(fileDesc, deps)
313+
this.fileRegistry = createFileRegistry(this.fileRegistry, fileRegistry)
314+
return this.fileRegistry.getFile(fileDesc.name)
315+
}
316+
317+
toMessageDescFromName(fd: DescFile, msgName: string): DescMessage {
318+
for (let i = 0; i < fd.messages.length; i++) {
319+
if (fd.messages[i].typeName === msgName) {
320+
return fd.messages[i]
321+
}
322+
}
323+
throw new SerializationError('message descriptor not found')
293324
}
294325
}
295326

@@ -302,7 +333,7 @@ export type ProtobufDeserializerConfig = DeserializerConfig
302333
* ProtobufDeserializer is a deserializer for Protobuf messages.
303334
*/
304335
export class ProtobufDeserializer extends Deserializer implements ProtobufSerde {
305-
registry: FileRegistry
336+
fileRegistry: FileRegistry
306337
schemaToDescCache: LRUCache<string, DescFile>
307338

308339
/**
@@ -314,7 +345,7 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
314345
*/
315346
constructor(client: Client, serdeType: SerdeType, conf: ProtobufDeserializerConfig, ruleRegistry?: RuleRegistry) {
316347
super(client, serdeType, conf, ruleRegistry)
317-
this.registry = createFileRegistry()
348+
this.fileRegistry = createFileRegistry()
318349
this.schemaToDescCache = new LRUCache<string, DescFile>({ max: this.config().cacheCapacity ?? 1000 } )
319350
this.fieldTransformer = async (ctx: RuleContext, fieldTransform: FieldTransform, msg: any) => {
320351
return await this.fieldTransform(ctx, fieldTransform, msg)
@@ -340,7 +371,7 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
340371
const info = await this.getSchema(topic, payload, 'serialized')
341372
const fd = await this.toFileDesc(this.client, info)
342373
const [bytesRead, msgIndexes] = this.readMessageIndexes(payload.subarray(5))
343-
const messageDesc = this.toMessageDesc(fd, msgIndexes)
374+
const messageDesc = this.toMessageDescFromIndexes(fd, msgIndexes)
344375

345376
const subject = this.subjectName(topic, info)
346377
const readerMeta = await this.getReaderSchema(subject, 'serialized')
@@ -361,14 +392,9 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
361392
}
362393

363394
async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise<any> {
395+
const fileDesc = await this.toFileDesc(this.client, ctx.target)
364396
const typeName = msg.$typeName
365-
if (typeName == null) {
366-
throw new SerializationError('message type name is empty')
367-
}
368-
const messageDesc = this.registry.getMessage(typeName)
369-
if (messageDesc == null) {
370-
throw new SerializationError('message descriptor not in registry')
371-
}
397+
const messageDesc = this.toMessageDescFromName(fileDesc, typeName)
372398
return await transform(ctx, messageDesc, msg, fieldTransform)
373399
}
374400

@@ -389,26 +415,18 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
389415
const deps = new Map<string, string>()
390416
await this.resolveReferences(client, info, deps, 'serialized')
391417
const fileDesc = fromBinary(FileDescriptorProtoSchema, Buffer.from(info.schema, 'base64'))
392-
const resolve = (depName: string) => {
393-
if (isBuiltin(depName)) {
394-
const dep = builtinDeps.get(depName)
395-
if (dep == null) {
396-
throw new SerializationError(`dependency ${depName} not found`)
397-
}
398-
return dep
399-
} else {
400-
const dep = deps.get(depName)
401-
if (dep == null) {
402-
throw new SerializationError(`dependency ${depName} not found`)
403-
}
404-
const fileDesc = fromBinary(FileDescriptorProtoSchema, Buffer.from(dep, 'base64'))
405-
fileDesc.name = depName
406-
return fileDesc
418+
const fileRegistry = newFileRegistry(fileDesc, deps)
419+
this.fileRegistry = createFileRegistry(this.fileRegistry, fileRegistry)
420+
return this.fileRegistry.getFile(fileDesc.name)
421+
}
422+
423+
toMessageDescFromName(fd: DescFile, msgName: string): DescMessage {
424+
for (let i = 0; i < fd.messages.length; i++) {
425+
if (fd.messages[i].typeName === msgName) {
426+
return fd.messages[i]
407427
}
408428
}
409-
const fileRegistry = createFileRegistry(fileDesc, resolve)
410-
this.registry = createFileRegistry(this.registry, fileRegistry)
411-
return this.registry.getFile(fileDesc.name)
429+
throw new SerializationError('message descriptor not found')
412430
}
413431

414432
readMessageIndexes(payload: Buffer): [number, number[]] {
@@ -421,7 +439,7 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
421439
return [bw.pos, msgIndexes]
422440
}
423441

424-
toMessageDesc(fd: DescFile, msgIndexes: number[]): DescMessage {
442+
toMessageDescFromIndexes(fd: DescFile, msgIndexes: number[]): DescMessage {
425443
let index = msgIndexes[0]
426444
if (msgIndexes.length === 1) {
427445
return fd.messages[index]
@@ -438,6 +456,27 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde
438456
}
439457
}
440458

459+
function newFileRegistry(fileDesc: FileDescriptorProto, deps: Map<string, string>): FileRegistry {
460+
const resolve = (depName: string) => {
461+
if (isBuiltin(depName)) {
462+
const dep = builtinDeps.get(depName)
463+
if (dep == null) {
464+
throw new SerializationError(`dependency ${depName} not found`)
465+
}
466+
return dep
467+
} else {
468+
const dep = deps.get(depName)
469+
if (dep == null) {
470+
throw new SerializationError(`dependency ${depName} not found`)
471+
}
472+
const fileDesc = fromBinary(FileDescriptorProtoSchema, Buffer.from(dep, 'base64'))
473+
fileDesc.name = depName
474+
return fileDesc
475+
}
476+
}
477+
return createFileRegistry(fileDesc, resolve)
478+
}
479+
441480
async function transform(ctx: RuleContext, descriptor: DescMessage, msg: any, fieldTransform: FieldTransform): Promise<any> {
442481
if (msg == null || descriptor == null) {
443482
return msg
@@ -470,7 +509,7 @@ async function transform(ctx: RuleContext, descriptor: DescMessage, msg: any, fi
470509
}
471510

472511
async function transformField(ctx: RuleContext, fd: DescField, desc: DescMessage,
473-
msg: any, fieldTransform: FieldTransform) {
512+
msg: any, fieldTransform: FieldTransform) {
474513
try {
475514
ctx.enterField(
476515
msg,

0 commit comments

Comments
 (0)