Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion schemaregistry/serde/serde.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {RuleRegistry} from "./rule-registry";
import {ClientConfig} from "../rest-service";
import {BufferWrapper, MAX_VARINT_LEN_64} from "./buffer-wrapper";
import {IHeaders} from "../../types/kafkajs";
import {fromBinary} from "@bufbuild/protobuf";
import {FileDescriptorProtoSchema} from "@bufbuild/protobuf/wkt";

export enum SerdeType {
KEY = 'KEY',
Expand Down Expand Up @@ -662,6 +664,112 @@ export type SubjectNameStrategyFunc = (
schema?: SchemaInfo,
) => string

/**
* Extract record name from an Avro schema
* @param schemaString - the Avro schema string
* @returns the fully qualified record name {namespace}.{name} or {name} if namespace is not present
* @throws SerializationError if schema is invalid or name is missing
*/
function getAvroRecordName(schemaString: string): string {
try {
const avroSchema = JSON.parse(schemaString)
if (!avroSchema.name) {
throw new SerializationError('Avro schema is missing required "name" field')
}
// Return fully qualified name if namespace exists
if (avroSchema.namespace) {
return `${avroSchema.namespace}.${avroSchema.name}`
}
return avroSchema.name
} catch (e) {
if (e instanceof SerializationError) {
throw e
}
throw new SerializationError(`Invalid Avro schema: ${e instanceof Error ? e.message : String(e)}`)
}
}

/**
* Extract record name from a Protobuf schema
* @param schemaString - the Protobuf schema string (base64-encoded)
* @returns the fully qualified record name {package}.{messageName}, or {messageName} if package is not present
* @throws SerializationError if schema is invalid or has no message types
*/
function getProtobufRecordName(schemaString: string): string {
try {
// Protobuf schemas are stored as base64-encoded FileDescriptorProto
const fileDesc = fromBinary(FileDescriptorProtoSchema, Buffer.from(schemaString, 'base64'))

// Get first message name
if (!fileDesc.messageType || fileDesc.messageType.length === 0) {
throw new SerializationError('Protobuf schema has no message types defined')
}

const messageName = fileDesc.messageType[0].name
if (!messageName) {
throw new SerializationError('Protobuf message type is missing "name" field')
}

// Get package name
const packageName = fileDesc.package || null
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assignment || null is unnecessary since fileDesc.package is already optional. The ternary on line 717 already handles falsy values correctly. Consider simplifying to const packageName = fileDesc.package.

Suggested change
const packageName = fileDesc.package || null
const packageName = fileDesc.package

Copilot uses AI. Check for mistakes.

// Return fully qualified name if package exists
return packageName ? `${packageName}.${messageName}` : messageName
} catch (e) {
if (e instanceof SerializationError) {
throw e
}
throw new SerializationError(`Invalid Protobuf schema: ${e instanceof Error ? e.message : String(e)}`)
}
}

/**
* Extract record name from a JSON schema
* @param schemaString - the JSON schema string
* @returns the title field value
* @throws SerializationError if schema is invalid or title is missing
*/
function getJsonRecordName(schemaString: string): string {
try {
const jsonSchema = JSON.parse(schemaString)
if (!jsonSchema.title) {
throw new SerializationError('JSON schema is missing required "title" field')
}
// JSON Schema uses "title" as the record name
return jsonSchema.title
} catch (e) {
if (e instanceof SerializationError) {
throw e
}
throw new SerializationError(`Invalid JSON schema: ${e instanceof Error ? e.message : String(e)}`)
}
}

/**
* Helper function to extract the record name from a schema
* @param schema - the schema info (schema and its associated information)
* @returns the record name
* @throws SerializationError if schema is invalid, missing, or lacks required fields
*/
function getRecordName(schema?: SchemaInfo): string {
if (!schema || !schema.schema) {
throw new SerializationError('Schema is required but was not provided')
}

const schemaType = schema.schemaType?.toUpperCase() || 'AVRO'

switch (schemaType) {
case 'AVRO':
return getAvroRecordName(schema.schema)
case 'PROTOBUF':
return getProtobufRecordName(schema.schema)
case 'JSON':
return getJsonRecordName(schema.schema)
default:
throw new SerializationError(`Unsupported schema type: ${schemaType}`)
}
}

/**
* TopicNameStrategy creates a subject name by appending -[key|value] to the topic name.
* @param topic - the topic name
Expand All @@ -675,6 +783,22 @@ export const TopicNameStrategy: SubjectNameStrategyFunc = (topic: string, serdeT
return topic + suffix
}

/**
* RecordNameStrategy creates a subject name using only the record name.
* This allows schemas to be shared across topics, with compatibility per record type.
* @param _topic - the topic name
* @param _serdeType - the serde type
* @param schema - the schema info (used to extract record name)
* @throws SerializationError if schema is invalid or record name cannot be determined
*/
export const RecordNameStrategy: SubjectNameStrategyFunc = (
_topic: string,
_serdeType: SerdeType,
schema?: SchemaInfo
) => {
return getRecordName(schema)
}

/**
* SchemaIdSerializerFunc serializes a schema ID/GUID
*/
Expand Down Expand Up @@ -1013,7 +1137,7 @@ export class ErrorAction implements RuleAction {
}

async run(ctx: RuleContext, msg: any, err: Error): Promise<void> {
throw new SerializationError(`rule ${ctx.rule.name} failed: ${err.message}`)
throw new SerializationError(err.message)
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the rule context information (rule ${ctx.rule.name} failed:) makes error messages less informative for debugging. This change reduces the ability to identify which rule caused the failure. Consider preserving the rule name in the error message.

Suggested change
throw new SerializationError(err.message)
throw new SerializationError(`rule ${ctx.rule.name} failed: ${err.message}`)

Copilot uses AI. Check for mistakes.
}

close(): void {
Expand Down
197 changes: 196 additions & 1 deletion schemaregistry/test/serde/serde.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { describe, expect, it } from '@jest/globals';
import {SchemaId} from "../../serde/serde";
import {RecordNameStrategy, SchemaId, SerdeType, TopicNameStrategy} from "../../serde/serde";
import {SchemaInfo} from "../../schemaregistry-client";
import {create, toBinary} from "@bufbuild/protobuf";
import {FileDescriptorProtoSchema} from "@bufbuild/protobuf/wkt";

describe('SchemaGuid', () => {
it('schema guid', () => {
Expand Down Expand Up @@ -67,3 +70,195 @@ describe('SchemaGuid', () => {
}
})
})

describe('TopicNameStrategy', () => {
it('should create subject with -value suffix', () => {
const subject = TopicNameStrategy('test-topic', SerdeType.VALUE)
expect(subject).toBe('test-topic-value')
})

it('should create subject with -key suffix', () => {
const subject = TopicNameStrategy('test-topic', SerdeType.KEY)
expect(subject).toBe('test-topic-key')
})
})

describe('RecordNameStrategy', () => {
describe('Avro schema', () => {
it('should extract record name with namespace', () => {
const schema: SchemaInfo = {
schema: JSON.stringify({
type: 'record',
name: 'User',
namespace: 'com.example',
fields: [
{ name: 'name', type: 'string' },
{ name: 'age', type: 'int' }
]
}),
schemaType: 'AVRO'
}

const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
expect(subject).toBe('com.example.User')
})

it('should extract record name without namespace', () => {
const schema: SchemaInfo = {
schema: JSON.stringify({
type: 'record',
name: 'User',
fields: [
{ name: 'name', type: 'string' }
]
}),
schemaType: 'AVRO'
}

const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
expect(subject).toBe('User')
})

it('should throw on invalid Avro schema', () => {
const schema: SchemaInfo = {
schema: 'invalid json',
schemaType: 'AVRO'
}

expect(() => {
RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
}).toThrow('Invalid Avro schema')
})
})

describe('JSON schema', () => {
it('should extract title from JSON schema', () => {
const schema: SchemaInfo = {
schema: JSON.stringify({
title: 'com.example.User',
type: 'object',
properties: {
name: { type: 'string' },
age: { type: 'number' }
}
}),
schemaType: 'JSON'
}

const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
expect(subject).toBe('com.example.User')
})

it('should throw on JSON schema without title', () => {
const schema: SchemaInfo = {
schema: JSON.stringify({
type: 'object',
properties: {
name: { type: 'string' }
}
}),
schemaType: 'JSON'
}

expect(() => {
RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
}).toThrow('JSON schema is missing required "title" field')
})
})

describe('Protobuf schema', () => {
it('should extract message name with package', () => {
// Create a simple FileDescriptorProto
const fileDescriptor = create(FileDescriptorProtoSchema, {
name: 'test.proto',
package: 'com.example',
messageType: [
{ name: 'User' }
]
})

const base64Schema = Buffer.from(
toBinary(FileDescriptorProtoSchema, fileDescriptor)
).toString('base64')

const schema: SchemaInfo = {
schema: base64Schema,
schemaType: 'PROTOBUF'
}

const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
expect(subject).toBe('com.example.User')
})

it('should extract message name without package', () => {
const fileDescriptor = create(FileDescriptorProtoSchema, {
name: 'test.proto',
messageType: [
{ name: 'User' }
]
})

const base64Schema = Buffer.from(
toBinary(FileDescriptorProtoSchema, fileDescriptor)
).toString('base64')

const schema: SchemaInfo = {
schema: base64Schema,
schemaType: 'PROTOBUF'
}

const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
expect(subject).toBe('User')
})

it('should throw on Protobuf schema without messages', () => {
const fileDescriptor = create(FileDescriptorProtoSchema, {
name: 'test.proto',
package: 'com.example'
})

const base64Schema = Buffer.from(
toBinary(FileDescriptorProtoSchema, fileDescriptor)
).toString('base64')

const schema: SchemaInfo = {
schema: base64Schema,
schemaType: 'PROTOBUF'
}

expect(() => {
RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
}).toThrow('Protobuf schema has no message types defined')
})

it('should throw on invalid base64 Protobuf schema', () => {
const schema: SchemaInfo = {
schema: 'invalid-base64!!!',
schemaType: 'PROTOBUF'
}

expect(() => {
RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
}).toThrow('Invalid Protobuf schema')
})
})

describe('edge cases', () => {
it('should throw when schema is undefined', () => {
expect(() => {
RecordNameStrategy('test-topic', SerdeType.VALUE, undefined)
}).toThrow('Schema is required but was not provided')
})

it('should throw when schema.schema is empty', () => {
const schema: SchemaInfo = {
schema: '',
schemaType: 'AVRO'
}

expect(() => {
RecordNameStrategy('test-topic', SerdeType.VALUE, schema)
}).toThrow('Schema is required but was not provided')
})
})
})