diff --git a/schemaregistry/serde/serde.ts b/schemaregistry/serde/serde.ts index c210d86d..79abca4b 100644 --- a/schemaregistry/serde/serde.ts +++ b/schemaregistry/serde/serde.ts @@ -11,6 +11,8 @@ import {RuleRegistry} from "./rule-registry"; import {ClientConfig} from "../rest-service"; import {BufferWrapper, MAX_VARINT_LEN_64} from "./buffer-wrapper"; import {IHeaders} from "../../types/kafkajs"; +import {fromBinary} from "@bufbuild/protobuf"; +import {FileDescriptorProtoSchema} from "@bufbuild/protobuf/wkt"; export enum SerdeType { KEY = 'KEY', @@ -662,6 +664,112 @@ export type SubjectNameStrategyFunc = ( schema?: SchemaInfo, ) => string +/** + * Extract record name from an Avro schema + * @param schemaString - the Avro schema string + * @returns the fully qualified record name {namespace}.{name} or {name} if namespace is not present + * @throws SerializationError if schema is invalid or name is missing + */ +function getAvroRecordName(schemaString: string): string { + try { + const avroSchema = JSON.parse(schemaString) + if (!avroSchema.name) { + throw new SerializationError('Avro schema is missing required "name" field') + } + // Return fully qualified name if namespace exists + if (avroSchema.namespace) { + return `${avroSchema.namespace}.${avroSchema.name}` + } + return avroSchema.name + } catch (e) { + if (e instanceof SerializationError) { + throw e + } + throw new SerializationError(`Invalid Avro schema: ${e instanceof Error ? e.message : String(e)}`) + } +} + +/** + * Extract record name from a Protobuf schema + * @param schemaString - the Protobuf schema string (base64-encoded) + * @returns the fully qualified record name {package}.{messageName}, or {messageName} if package is not present + * @throws SerializationError if schema is invalid or has no message types + */ +function getProtobufRecordName(schemaString: string): string { + try { + // Protobuf schemas are stored as base64-encoded FileDescriptorProto + const fileDesc = fromBinary(FileDescriptorProtoSchema, Buffer.from(schemaString, 'base64')) + + // Get first message name + if (!fileDesc.messageType || fileDesc.messageType.length === 0) { + throw new SerializationError('Protobuf schema has no message types defined') + } + + const messageName = fileDesc.messageType[0].name + if (!messageName) { + throw new SerializationError('Protobuf message type is missing "name" field') + } + + // Get package name + const packageName = fileDesc.package || null + + // Return fully qualified name if package exists + return packageName ? `${packageName}.${messageName}` : messageName + } catch (e) { + if (e instanceof SerializationError) { + throw e + } + throw new SerializationError(`Invalid Protobuf schema: ${e instanceof Error ? e.message : String(e)}`) + } +} + +/** + * Extract record name from a JSON schema + * @param schemaString - the JSON schema string + * @returns the title field value + * @throws SerializationError if schema is invalid or title is missing + */ +function getJsonRecordName(schemaString: string): string { + try { + const jsonSchema = JSON.parse(schemaString) + if (!jsonSchema.title) { + throw new SerializationError('JSON schema is missing required "title" field') + } + // JSON Schema uses "title" as the record name + return jsonSchema.title + } catch (e) { + if (e instanceof SerializationError) { + throw e + } + throw new SerializationError(`Invalid JSON schema: ${e instanceof Error ? e.message : String(e)}`) + } +} + +/** + * Helper function to extract the record name from a schema + * @param schema - the schema info (schema and its associated information) + * @returns the record name + * @throws SerializationError if schema is invalid, missing, or lacks required fields + */ +function getRecordName(schema?: SchemaInfo): string { + if (!schema || !schema.schema) { + throw new SerializationError('Schema is required but was not provided') + } + + const schemaType = schema.schemaType?.toUpperCase() || 'AVRO' + + switch (schemaType) { + case 'AVRO': + return getAvroRecordName(schema.schema) + case 'PROTOBUF': + return getProtobufRecordName(schema.schema) + case 'JSON': + return getJsonRecordName(schema.schema) + default: + throw new SerializationError(`Unsupported schema type: ${schemaType}`) + } +} + /** * TopicNameStrategy creates a subject name by appending -[key|value] to the topic name. * @param topic - the topic name @@ -675,6 +783,22 @@ export const TopicNameStrategy: SubjectNameStrategyFunc = (topic: string, serdeT return topic + suffix } +/** + * RecordNameStrategy creates a subject name using only the record name. + * This allows schemas to be shared across topics, with compatibility per record type. + * @param _topic - the topic name + * @param _serdeType - the serde type + * @param schema - the schema info (used to extract record name) + * @throws SerializationError if schema is invalid or record name cannot be determined + */ +export const RecordNameStrategy: SubjectNameStrategyFunc = ( + _topic: string, + _serdeType: SerdeType, + schema?: SchemaInfo +) => { + return getRecordName(schema) +} + /** * SchemaIdSerializerFunc serializes a schema ID/GUID */ @@ -1013,7 +1137,7 @@ export class ErrorAction implements RuleAction { } async run(ctx: RuleContext, msg: any, err: Error): Promise { - throw new SerializationError(`rule ${ctx.rule.name} failed: ${err.message}`) + throw new SerializationError(err.message) } close(): void { diff --git a/schemaregistry/test/serde/serde.spec.ts b/schemaregistry/test/serde/serde.spec.ts index bd97b141..052fdc74 100644 --- a/schemaregistry/test/serde/serde.spec.ts +++ b/schemaregistry/test/serde/serde.spec.ts @@ -1,5 +1,8 @@ import { describe, expect, it } from '@jest/globals'; -import {SchemaId} from "../../serde/serde"; +import {RecordNameStrategy, SchemaId, SerdeType, TopicNameStrategy} from "../../serde/serde"; +import {SchemaInfo} from "../../schemaregistry-client"; +import {create, toBinary} from "@bufbuild/protobuf"; +import {FileDescriptorProtoSchema} from "@bufbuild/protobuf/wkt"; describe('SchemaGuid', () => { it('schema guid', () => { @@ -67,3 +70,195 @@ describe('SchemaGuid', () => { } }) }) + +describe('TopicNameStrategy', () => { + it('should create subject with -value suffix', () => { + const subject = TopicNameStrategy('test-topic', SerdeType.VALUE) + expect(subject).toBe('test-topic-value') + }) + + it('should create subject with -key suffix', () => { + const subject = TopicNameStrategy('test-topic', SerdeType.KEY) + expect(subject).toBe('test-topic-key') + }) +}) + +describe('RecordNameStrategy', () => { + describe('Avro schema', () => { + it('should extract record name with namespace', () => { + const schema: SchemaInfo = { + schema: JSON.stringify({ + type: 'record', + name: 'User', + namespace: 'com.example', + fields: [ + { name: 'name', type: 'string' }, + { name: 'age', type: 'int' } + ] + }), + schemaType: 'AVRO' + } + + const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + expect(subject).toBe('com.example.User') + }) + + it('should extract record name without namespace', () => { + const schema: SchemaInfo = { + schema: JSON.stringify({ + type: 'record', + name: 'User', + fields: [ + { name: 'name', type: 'string' } + ] + }), + schemaType: 'AVRO' + } + + const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + expect(subject).toBe('User') + }) + + it('should throw on invalid Avro schema', () => { + const schema: SchemaInfo = { + schema: 'invalid json', + schemaType: 'AVRO' + } + + expect(() => { + RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + }).toThrow('Invalid Avro schema') + }) + }) + + describe('JSON schema', () => { + it('should extract title from JSON schema', () => { + const schema: SchemaInfo = { + schema: JSON.stringify({ + title: 'com.example.User', + type: 'object', + properties: { + name: { type: 'string' }, + age: { type: 'number' } + } + }), + schemaType: 'JSON' + } + + const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + expect(subject).toBe('com.example.User') + }) + + it('should throw on JSON schema without title', () => { + const schema: SchemaInfo = { + schema: JSON.stringify({ + type: 'object', + properties: { + name: { type: 'string' } + } + }), + schemaType: 'JSON' + } + + expect(() => { + RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + }).toThrow('JSON schema is missing required "title" field') + }) + }) + + describe('Protobuf schema', () => { + it('should extract message name with package', () => { + // Create a simple FileDescriptorProto + const fileDescriptor = create(FileDescriptorProtoSchema, { + name: 'test.proto', + package: 'com.example', + messageType: [ + { name: 'User' } + ] + }) + + const base64Schema = Buffer.from( + toBinary(FileDescriptorProtoSchema, fileDescriptor) + ).toString('base64') + + const schema: SchemaInfo = { + schema: base64Schema, + schemaType: 'PROTOBUF' + } + + const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + expect(subject).toBe('com.example.User') + }) + + it('should extract message name without package', () => { + const fileDescriptor = create(FileDescriptorProtoSchema, { + name: 'test.proto', + messageType: [ + { name: 'User' } + ] + }) + + const base64Schema = Buffer.from( + toBinary(FileDescriptorProtoSchema, fileDescriptor) + ).toString('base64') + + const schema: SchemaInfo = { + schema: base64Schema, + schemaType: 'PROTOBUF' + } + + const subject = RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + expect(subject).toBe('User') + }) + + it('should throw on Protobuf schema without messages', () => { + const fileDescriptor = create(FileDescriptorProtoSchema, { + name: 'test.proto', + package: 'com.example' + }) + + const base64Schema = Buffer.from( + toBinary(FileDescriptorProtoSchema, fileDescriptor) + ).toString('base64') + + const schema: SchemaInfo = { + schema: base64Schema, + schemaType: 'PROTOBUF' + } + + expect(() => { + RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + }).toThrow('Protobuf schema has no message types defined') + }) + + it('should throw on invalid base64 Protobuf schema', () => { + const schema: SchemaInfo = { + schema: 'invalid-base64!!!', + schemaType: 'PROTOBUF' + } + + expect(() => { + RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + }).toThrow('Invalid Protobuf schema') + }) + }) + + describe('edge cases', () => { + it('should throw when schema is undefined', () => { + expect(() => { + RecordNameStrategy('test-topic', SerdeType.VALUE, undefined) + }).toThrow('Schema is required but was not provided') + }) + + it('should throw when schema.schema is empty', () => { + const schema: SchemaInfo = { + schema: '', + schemaType: 'AVRO' + } + + expect(() => { + RecordNameStrategy('test-topic', SerdeType.VALUE, schema) + }).toThrow('Schema is required but was not provided') + }) + }) +})