Skip to content

Commit f23e8f2

Browse files
authored
Support for schema id in header (#303)
* DGS-20366 Support schema ID in header * Use id serdes * Minor cleanup * Add test * Minor renaming * Minor fix * Add tests * Fix get id with response
1 parent 53b72d3 commit f23e8f2

15 files changed

+617
-105
lines changed

package-lock.json

Lines changed: 15 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

schemaregistry/e2e/schemaregistry-client.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ describe('SchemaRegistryClient Integration Test', () => {
129129

130130
const schemaMetadata: SchemaMetadata = {
131131
id: registerResponse?.id!,
132+
guid: registerResponse?.guid!,
132133
version: schemaVersion,
133134
schema: schemaInfo.schema,
134135
subject: testSubject,
@@ -137,6 +138,7 @@ describe('SchemaRegistryClient Integration Test', () => {
137138

138139
const schemaMetadata2: SchemaMetadata = {
139140
id: registerResponse2?.id!,
141+
guid: registerResponse2?.guid!,
140142
version: registerResponse2?.version!,
141143
schema: backwardCompatibleSchemaInfo.schema,
142144
subject: testSubject,

schemaregistry/mock-schemaregistry-client.ts

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import {
88
ServerConfig
99
} from './schemaregistry-client';
1010
import stringify from "json-stringify-deterministic";
11+
import {v4} from "uuid";
1112
import {ClientConfig} from "./rest-service";
1213
import {RestError} from "./rest-error";
14+
import {SchemaId} from "./serde/serde";
1315

1416
interface VersionCacheEntry {
1517
version: number;
@@ -45,6 +47,7 @@ class MockClient implements Client {
4547
private clientConfig?: ClientConfig;
4648
private infoToSchemaCache: Map<string, MetadataCacheEntry>;
4749
private idToSchemaCache: Map<string, InfoCacheEntry>;
50+
private guidToSchemaCache: Map<string, InfoCacheEntry>;
4851
private schemaToVersionCache: Map<string, VersionCacheEntry>;
4952
private configCache: Map<string, ServerConfig>;
5053
private counter: Counter;
@@ -53,6 +56,7 @@ class MockClient implements Client {
5356
this.clientConfig = config
5457
this.infoToSchemaCache = new Map();
5558
this.idToSchemaCache = new Map();
59+
this.guidToSchemaCache = new Map();
5660
this.schemaToVersionCache = new Map();
5761
this.configCache = new Map();
5862
this.counter = new Counter();
@@ -67,7 +71,7 @@ class MockClient implements Client {
6771
if (!metadata) {
6872
throw new RestError("Failed to register schema", 422, 42200);
6973
}
70-
return metadata.id;
74+
return metadata.id!;
7175
}
7276

7377
async registerFullResponse(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise<SchemaMetadata> {
@@ -78,20 +82,19 @@ class MockClient implements Client {
7882
return cacheEntry.metadata;
7983
}
8084

81-
const id = await this.getIDFromRegistry(subject, schema);
82-
if (id === -1) {
85+
const schemaId = await this.getIDFromRegistry(subject, schema);
86+
if (schemaId.id === -1) {
8387
throw new RestError("Failed to retrieve schema ID from registry", 422, 42200);
8488
}
8589

86-
const metadata: SchemaMetadata = { ...schema, id };
90+
const metadata: SchemaMetadata = { id: schemaId.id!, guid: schemaId.guid!, ...schema };
8791
this.infoToSchemaCache.set(cacheKey, { metadata, softDeleted: false });
8892

8993
return metadata;
9094
}
9195

92-
private async getIDFromRegistry(subject: string, schema: SchemaInfo): Promise<number> {
96+
private async getIDFromRegistry(subject: string, schema: SchemaInfo): Promise<SchemaId> {
9397
let id = -1;
94-
9598
for (const [key, value] of this.idToSchemaCache.entries()) {
9699
const parsedKey = JSON.parse(key);
97100
if (parsedKey.subject === subject && this.schemasEqual(value.info, schema)) {
@@ -100,14 +103,24 @@ class MockClient implements Client {
100103
}
101104
}
102105

106+
let guid = "";
107+
for (const [key, value] of this.guidToSchemaCache.entries()) {
108+
if (this.schemasEqual(value.info, schema)) {
109+
guid = key;
110+
break;
111+
}
112+
}
113+
103114
await this.generateVersion(subject, schema);
104115
if (id < 0) {
105116
id = this.counter.increment();
106117
const idCacheKey = stringify({ subject, id });
107118
this.idToSchemaCache.set(idCacheKey, { info: schema, softDeleted: false });
119+
guid = v4()
120+
this.guidToSchemaCache.set(guid, { info: schema, softDeleted: false });
108121
}
109122

110-
return id;
123+
return new SchemaId("", id, guid);
111124
}
112125

113126
private async generateVersion(subject: string, schema: SchemaInfo): Promise<void> {
@@ -134,13 +147,27 @@ class MockClient implements Client {
134147
return cacheEntry.info;
135148
}
136149

150+
async getByGuid(guid: string, format?: string): Promise<SchemaInfo> {
151+
const cacheEntry = this.guidToSchemaCache.get(guid);
152+
153+
if (!cacheEntry || cacheEntry.softDeleted) {
154+
throw new RestError("Schema not found", 404, 40400);
155+
}
156+
return cacheEntry.info;
157+
}
158+
137159
async getId(subject: string, schema: SchemaInfo): Promise<number> {
160+
const metadata = await this.getIdFullResponse(subject, schema);
161+
return metadata.id!;
162+
}
163+
164+
async getIdFullResponse(subject: string, schema: SchemaInfo): Promise<SchemaMetadata> {
138165
const cacheKey = stringify({ subject, schema: minimize(schema) });
139166
const cacheEntry = this.infoToSchemaCache.get(cacheKey);
140167
if (!cacheEntry || cacheEntry.softDeleted) {
141168
throw new RestError("Schema not found", 404, 40400);
142169
}
143-
return cacheEntry.metadata.id;
170+
return cacheEntry.metadata;
144171
}
145172

146173
async getLatestSchemaMetadata(subject: string, format?: string): Promise<SchemaMetadata> {
@@ -158,6 +185,7 @@ class MockClient implements Client {
158185
const parsedKey = JSON.parse(key);
159186
if (parsedKey.subject === subject && value.version === version) {
160187
json = parsedKey;
188+
break
161189
}
162190
}
163191

@@ -170,14 +198,26 @@ class MockClient implements Client {
170198
const parsedKey = JSON.parse(key);
171199
if (parsedKey.subject === subject && value.info.schema === json.schema.schema) {
172200
id = parsedKey.id;
201+
break
173202
}
174203
}
175204
if (id === -1) {
176205
throw new RestError("Schema not found", 404, 40400);
177206
}
207+
let guid: string = "";
208+
for (const [key, value] of this.guidToSchemaCache.entries()) {
209+
if (value.info.schema === json.schema.schema) {
210+
guid = key
211+
break
212+
}
213+
}
214+
if (guid === "") {
215+
throw new RestError("Schema not found", 404, 40400);
216+
}
178217

179218
return {
180219
id,
220+
guid,
181221
version,
182222
subject,
183223
...json.schema,
@@ -226,6 +266,7 @@ class MockClient implements Client {
226266
const parsedKey = JSON.parse(key);
227267
if (parsedKey.subject === subject && value.info.schema === latest.schema) {
228268
id = parsedKey.id;
269+
break
229270
}
230271
}
231272
if (id === -1) {

schemaregistry/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
"node-gyp": "^9.3.1",
2727
"ts-jest": "^29.2.4",
2828
"typescript": "^5.5.4",
29-
"typescript-eslint": "^8.2.0",
30-
"uuid": "^10.0.0"
29+
"typescript-eslint": "^8.2.0"
3130
},
3231
"dependencies": {
3332
"@aws-sdk/client-kms": "^3.637.0",
@@ -53,6 +52,7 @@
5352
"lru-cache": "^10.4.3",
5453
"node-vault": "^0.10.2",
5554
"simple-oauth2": "^5.1.0",
55+
"uuid": "^11.0.0",
5656
"validator": "^13.12.0"
5757
},
5858
"scripts": {

schemaregistry/rules/encryption/encrypt-executor.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {
33
FieldRuleExecutor,
44
FieldTransform,
55
FieldType,
6-
MAGIC_BYTE,
6+
MAGIC_BYTE_V0,
77
RuleContext,
88
RuleError,
99
} from "../../serde/serde";
@@ -538,15 +538,15 @@ export class FieldEncryptionExecutorTransform implements FieldTransform {
538538
prefixVersion(version: number, ciphertext: Buffer): Buffer {
539539
const versionBuf = Buffer.alloc(4)
540540
versionBuf.writeInt32BE(version)
541-
return Buffer.concat([MAGIC_BYTE, versionBuf, ciphertext])
541+
return Buffer.concat([MAGIC_BYTE_V0, versionBuf, ciphertext])
542542
}
543543

544544
extractVersion(ciphertext: Buffer): number | null {
545545
let magicByte = ciphertext.subarray(0, 1)
546-
if (!magicByte.equals(MAGIC_BYTE)) {
546+
if (!magicByte.equals(MAGIC_BYTE_V0)) {
547547
throw new RuleError(
548548
`Message encoded with magic byte ${JSON.stringify(magicByte)}, expected ${JSON.stringify(
549-
MAGIC_BYTE,
549+
MAGIC_BYTE_V0,
550550
)}`,
551551
)
552552
}

0 commit comments

Comments
 (0)