Skip to content

Commit b9be537

Browse files
bugfix integ tests for registering -value (#71) (#115)
* bugfix integ tests for registering -value * Add autoRegister and useLatestVersion tests
1 parent e627f67 commit b9be537

File tree

3 files changed

+55
-85
lines changed

3 files changed

+55
-85
lines changed

e2e/schemaregistry/schemaregistry-avro.spec.ts

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ import stringify from 'json-stringify-deterministic';
1212
import { v4 } from 'uuid';
1313

1414
let schemaRegistryClient: SchemaRegistryClient;
15-
let producer: any;
15+
let serializerConfig: AvroSerializerConfig;
16+
let serializer: AvroSerializer;
17+
let deserializer: AvroDeserializer;
18+
let producer: KafkaJS.Producer;
19+
let consumer: KafkaJS.Consumer;
1620

1721
const kafkaBrokerList = 'localhost:9092';
1822
const kafka = new KafkaJS.Kafka({
@@ -48,11 +52,6 @@ const schemaInfo: SchemaInfo = {
4852
metadata: metadata
4953
};
5054

51-
let serializerConfig: AvroSerializerConfig;
52-
let serializer: AvroSerializer;
53-
let deserializer: AvroDeserializer;
54-
let consumer: KafkaJS.Consumer;
55-
5655
describe('Schema Registry Avro Integration Test', () => {
5756

5857
beforeEach(async () => {
@@ -78,15 +77,14 @@ describe('Schema Registry Avro Integration Test', () => {
7877

7978
afterEach(async () => {
8079
await producer.disconnect();
81-
producer = null;
8280
});
8381

8482
it("Should serialize and deserialize Avro", async () => {
85-
const testTopic = 'test-topic-' + v4();
83+
const testTopic = v4();
8684

87-
await schemaRegistryClient.register(testTopic, schemaInfo);
85+
await schemaRegistryClient.register(testTopic + "-value", schemaInfo);
8886

89-
serializerConfig = { autoRegisterSchemas: true };
87+
serializerConfig = { useLatestVersion: true };
9088
serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
9189
deserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
9290

@@ -123,28 +121,8 @@ describe('Schema Registry Avro Integration Test', () => {
123121
await consumer.disconnect();
124122
}, 30000);
125123

126-
it("Should serialize with UseLatestVersion enabled", async () => {
124+
it('Should fail to serialize with useLatestVersion enabled and autoRegisterSchemas disabled', async () => {
127125
const testTopic = v4();
128-
await schemaRegistryClient.register(testTopic, schemaInfo);
129-
130-
serializerConfig = { autoRegisterSchemas: true, useLatestVersion: true };
131-
serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
132-
133-
const outgoingMessage = {
134-
key: 'key',
135-
value: await serializer.serialize(testTopic, messageValue)
136-
};
137-
138-
await producer.send({
139-
topic: testTopic,
140-
messages: [outgoingMessage]
141-
});
142-
143-
}, 30000);
144-
145-
it('Should fail to serialize with UseLatestVersion enabled and autoRegisterSchemas disabled', async () => {
146-
const testTopic = v4();
147-
await schemaRegistryClient.register(testTopic, schemaInfo);
148126

149127
serializerConfig = { autoRegisterSchemas: false, useLatestVersion: true };
150128
serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
@@ -154,12 +132,11 @@ describe('Schema Registry Avro Integration Test', () => {
154132
await expect(serializer.serialize(testTopic, messageValue)).rejects.toThrowError();
155133
});
156134

157-
it('Should serialize with schemas registered, UseLatestVersion enabled and autoRegisterSchemas disabled', async () => {
135+
it('Should serialize with autoRegisterSchemas enabled and useLatestVersion disabled', async () => {
158136
const testTopic = v4();
159-
await schemaRegistryClient.register(testTopic, schemaInfo);
160-
await schemaRegistryClient.register(testTopic+'-value', schemaInfo);
137+
await schemaRegistryClient.register(testTopic +' -value', schemaInfo);
161138

162-
serializerConfig = { autoRegisterSchemas: false, useLatestVersion: true };
139+
serializerConfig = { autoRegisterSchemas: true, useLatestVersion: false };
163140
serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
164141

165142
const messageValue = { "name": "Bob Jones", "age": 25 };
@@ -257,7 +234,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
257234
}
258235
});
259236
await producer.connect();
260-
serializerConfig = { autoRegisterSchemas: true };
237+
serializerConfig = { useLatestVersion: true };
261238

262239
serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
263240
deserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
@@ -272,7 +249,6 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
272249

273250
afterEach(async () => {
274251
await producer.disconnect();
275-
producer = null;
276252
});
277253

278254
it('Should serialize and deserialize string', async () => {
@@ -287,7 +263,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
287263
metadata: metadata
288264
};
289265

290-
await schemaRegistryClient.register(stringTopic, stringSchemaInfo);
266+
await schemaRegistryClient.register(stringTopic + "-value", stringSchemaInfo);
291267

292268
const stringMessageValue = "Hello, World!";
293269
const outgoingStringMessage = {
@@ -335,7 +311,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
335311
metadata: metadata
336312
};
337313

338-
await schemaRegistryClient.register(topic, stringSchemaInfo);
314+
await schemaRegistryClient.register(topic + "-value", stringSchemaInfo);
339315

340316
const messageValue = Buffer.from("Hello, World!");
341317
const outgoingMessage = {
@@ -383,7 +359,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
383359
metadata: metadata
384360
};
385361

386-
await schemaRegistryClient.register(topic, stringSchemaInfo);
362+
await schemaRegistryClient.register(topic + "-value", stringSchemaInfo);
387363

388364
const messageValue = 25;
389365
const outgoingMessage = {
@@ -431,7 +407,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
431407
metadata: metadata
432408
};
433409

434-
await schemaRegistryClient.register(topic, stringSchemaInfo);
410+
await schemaRegistryClient.register(topic + "-value", stringSchemaInfo);
435411

436412
const messageValue = 25;
437413
const outgoingMessage = {
@@ -479,7 +455,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
479455
metadata: metadata
480456
};
481457

482-
await schemaRegistryClient.register(topic, stringSchemaInfo);
458+
await schemaRegistryClient.register(topic + "-value", stringSchemaInfo);
483459

484460
const messageValue = true;
485461
const outgoingMessage = {
@@ -527,7 +503,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
527503
metadata: metadata
528504
};
529505

530-
await schemaRegistryClient.register(topic, stringSchemaInfo);
506+
await schemaRegistryClient.register(topic + "-value", stringSchemaInfo);
531507

532508
const messageValue = 1.354;
533509
const outgoingMessage = {
@@ -575,7 +551,7 @@ describe('Schema Registry Avro Integration Test - Primitives', () => {
575551
metadata: metadata
576552
};
577553

578-
await schemaRegistryClient.register(topic, stringSchemaInfo);
554+
await schemaRegistryClient.register(topic + "-value", stringSchemaInfo);
579555

580556
const messageValue = 1.354;
581557
const outgoingMessage = {

e2e/schemaregistry/schemaregistry-client.spec.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
} from '../../schemaregistry/schemaregistry-client';
99
import { beforeEach, describe, expect, it } from '@jest/globals';
1010
import { clientConfig } from '../../test/schemaregistry/test-constants';
11+
import { v4 } from 'uuid';
1112

1213
/* eslint-disable @typescript-eslint/no-non-null-asserted-optional-chain */
1314

@@ -77,7 +78,7 @@ describe('SchemaRegistryClient Integration Test', () => {
7778
});
7879

7980
it("Should return RestError when retrieving non-existent schema", async () => {
80-
await expect(schemaRegistryClient.getBySubjectAndId(testSubject, 1)).rejects.toThrow();
81+
await expect(schemaRegistryClient.getLatestSchemaMetadata(v4())).rejects.toThrow();
8182
});
8283

8384
it('Should register, retrieve, and delete a schema', async () => {

e2e/schemaregistry/schemaregistry-json.spec.ts

Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ import stringify from 'json-stringify-deterministic';
1313
import { v4 } from 'uuid';
1414

1515
let schemaRegistryClient: SchemaRegistryClient;
16-
let producer: any;
16+
let serializerConfig: JsonSerializerConfig;
17+
let serializer: JsonSerializer;
18+
let deserializer: JsonDeserializer;
19+
let producer: KafkaJS.Producer;
20+
let consumer: KafkaJS.Consumer;
21+
1722

1823
const kafkaBrokerList = 'localhost:9092';
1924
const kafka = new KafkaJS.Kafka({
@@ -248,12 +253,6 @@ const schemaInfo: SchemaInfo = {
248253
schemaType: 'JSON'
249254
};
250255

251-
const customerSubject = 'Customer';
252-
const orderSubject = 'Order';
253-
const orderDetailsSubject = 'OrderDetails';
254-
255-
const subjectList = [orderSubject, orderDetailsSubject, customerSubject];
256-
257256
describe('SchemaRegistryClient json Integration Test', () => {
258257

259258
beforeEach(async () => {
@@ -267,35 +266,28 @@ describe('SchemaRegistryClient json Integration Test', () => {
267266
}
268267
});
269268
await producer.connect();
270-
const subjects: string[] = await schemaRegistryClient.getAllSubjects();
271-
272-
for (const subject of subjectList) {
273-
if (subjects && subjects.includes(subject)) {
274-
await schemaRegistryClient.deleteSubject(subject);
275-
await schemaRegistryClient.deleteSubject(subject, true);
276269

277-
const subjectValue = subject + '-value';
278-
if (subjects && subjects.includes(subjectValue)) {
279-
await schemaRegistryClient.deleteSubject(subjectValue);
280-
await schemaRegistryClient.deleteSubject(subjectValue, true);
281-
}
282-
}
283-
}
270+
consumer = kafka.consumer({
271+
kafkaJS: {
272+
groupId: 'test-group',
273+
fromBeginning: true,
274+
partitionAssigners: [KafkaJS.PartitionAssigners.roundRobin],
275+
},
276+
});
284277
});
285278

286279
afterEach(async () => {
287280
await producer.disconnect();
288-
producer = null;
289281
});
290282

291283
it("Should serialize and deserialize json", async () => {
292284
const testTopic = v4();
293285

294-
await schemaRegistryClient.register(testTopic, schemaInfo);
286+
await schemaRegistryClient.register(testTopic + "-value", schemaInfo);
295287

296-
const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true };
297-
const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
298-
const deserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
288+
serializerConfig = { useLatestVersion: true };
289+
serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
290+
deserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
299291

300292
const outgoingMessage = {
301293
key: 'key',
@@ -307,7 +299,7 @@ describe('SchemaRegistryClient json Integration Test', () => {
307299
messages: [outgoingMessage]
308300
});
309301

310-
let consumer = kafka.consumer({
302+
consumer = kafka.consumer({
311303
kafkaJS: {
312304
groupId: 'test-group',
313305
fromBeginning: true,
@@ -338,12 +330,11 @@ describe('SchemaRegistryClient json Integration Test', () => {
338330
await consumer.disconnect();
339331
}, 30000);
340332

341-
it("Should serialize with UseLatestVersion enabled", async () => {
333+
it("Should serialize with autoRegisterSchemas enabled and useLatestVersion disabled", async () => {
342334
const testTopic = v4();
343-
await schemaRegistryClient.register(testTopic, schemaInfo);
344335

345-
const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true, useLatestVersion: true };
346-
const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
336+
serializerConfig = { autoRegisterSchemas: true, useLatestVersion: false };
337+
serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
347338

348339
const outgoingMessage = {
349340
key: 'key',
@@ -359,10 +350,9 @@ describe('SchemaRegistryClient json Integration Test', () => {
359350

360351
it('Should fail to serialize with UseLatestVersion enabled and autoRegisterSchemas disabled', async () => {
361352
const testTopic = v4();
362-
await schemaRegistryClient.register(testTopic, schemaInfo);
363353

364-
const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: false, useLatestVersion: true };
365-
const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
354+
serializerConfig = { autoRegisterSchemas: false, useLatestVersion: true };
355+
serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
366356

367357
const messageValue = { "name": "Bob Jones", "age": 25 };
368358

@@ -371,9 +361,12 @@ describe('SchemaRegistryClient json Integration Test', () => {
371361

372362
it("Should serialize referenced schemas", async () => {
373363
const testTopic = v4();
374-
const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true };
375-
const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
376-
const deserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
364+
serializerConfig = { useLatestVersion: true };
365+
serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
366+
deserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
367+
368+
const customerSubject = v4();
369+
const orderDetailsSubject = v4();
377370

378371
await schemaRegistryClient.register(customerSubject, customerSchema);
379372
const customerIdVersion: number = (await schemaRegistryClient.getLatestSchemaMetadata(customerSubject)).version!;
@@ -395,7 +388,7 @@ describe('SchemaRegistryClient json Integration Test', () => {
395388
};
396389
orderSchema.references = [orderDetailsReference];
397390

398-
await schemaRegistryClient.register(orderSubject, orderSchema);
391+
await schemaRegistryClient.register(testTopic + "-value", orderSchema);
399392

400393
const order = {
401394
order_details: {
@@ -412,15 +405,15 @@ describe('SchemaRegistryClient json Integration Test', () => {
412405

413406
const outgoingMessage = {
414407
key: 'key',
415-
value: await serializer.serialize(orderSubject, order)
408+
value: await serializer.serialize(testTopic, order)
416409
};
417410

418411
await producer.send({
419412
topic: testTopic,
420413
messages: [outgoingMessage]
421414
});
422415

423-
let consumer = kafka.consumer({
416+
consumer = kafka.consumer({
424417
kafkaJS: {
425418
groupId: 'test-group',
426419
fromBeginning: true,
@@ -437,7 +430,7 @@ describe('SchemaRegistryClient json Integration Test', () => {
437430
eachMessage: async ({ message }) => {
438431
const decodedMessage = {
439432
...message,
440-
value: await deserializer.deserialize(orderSubject, message.value as Buffer)
433+
value: await deserializer.deserialize(testTopic, message.value as Buffer)
441434
};
442435
messageRcvd = true;
443436

0 commit comments

Comments
 (0)