Skip to content

Commit 834dee3

Browse files
committed
Add schema registry example
1 parent 2f8f7fe commit 834dee3

File tree

7 files changed

+130
-9
lines changed

7 files changed

+130
-9
lines changed

MIGRATION.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,12 @@ producerRun().then(consumerRun).catch(console.error);
301301
does not support `replicaAssignment`.
302302
* The `deleteTopics` method is fully supported.
303303
304+
### Using the Schema Registry
304305
305-
#### Error Handling
306+
In case you are using the Schema Registry client at `kafkajs/confluent-schema-registry`, you will not need to make any changes to the usage.
307+
An example is made available [here](../examples/kafkajs/sr.js).
308+
309+
### Error Handling
306310
307311
**Action**: Convert any checks based on `instanceof` and `error.name` or to error
308312
checks based on `error.code` or `error.type`.

examples/kafkajs/admin.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
const { Kafka } = require('../..').KafkaJS
2-
//const { Kafka } = require('kafkajs')
1+
// require('kafkajs') is replaced with require('confluent-kafka-js').KafkaJS.
2+
// Since this example is within the package itself, we use '../..', but code
3+
// will typically use 'confluent-kafka-js'.
4+
const { Kafka } = require('../..').KafkaJS;
35

46
async function adminStart() {
57
const kafka = new Kafka({

examples/kafkajs/consumer.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
const { Kafka } = require('../..').KafkaJS
2-
//const { Kafka } = require('kafkajs')
1+
// require('kafkajs') is replaced with require('confluent-kafka-js').KafkaJS.
2+
// Since this example is within the package itself, we use '../..', but code
3+
// will typically use 'confluent-kafka-js'.
4+
const { Kafka } = require('../..').KafkaJS;
35

46
async function consumerStart() {
57
let consumer;

examples/kafkajs/eos.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
const { Kafka } = require('../..').KafkaJS
2-
//const { Kafka } = require('kafkajs')
1+
// require('kafkajs') is replaced with require('confluent-kafka-js').KafkaJS.
2+
// Since this example is within the package itself, we use '../..', but code
3+
// will typically use 'confluent-kafka-js'.
4+
const { Kafka } = require('../..').KafkaJS;
35

46
async function eosStart() {
57
const kafka = new Kafka({

examples/kafkajs/producer.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
const { Kafka } = require('../..').KafkaJS
2-
//const { Kafka } = require('kafkajs')
1+
// require('kafkajs') is replaced with require('confluent-kafka-js').KafkaJS.
2+
// Since this example is within the package itself, we use '../..', but code
3+
// will typically use 'confluent-kafka-js'.
4+
const { Kafka } = require('../..').KafkaJS;
35

46
async function producerStart() {
57
const kafka = new Kafka({

examples/kafkajs/sr.js

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// require('kafkajs') is replaced with require('confluent-kafka-js').KafkaJS.
2+
// Since this example is within the package itself, we use '../..', but code
3+
// will typically use 'confluent-kafka-js'.
4+
const { Kafka } = require('../..').KafkaJS;
5+
6+
// Note: The kafkajs/confluent-schema-registry will need to be installed separately to run this example,
7+
// as it isn't a dependency of confluent-kafka-js.
8+
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');
9+
10+
const registry = new SchemaRegistry({ host: '<fill>' })
11+
const kafka = new Kafka({
12+
brokers: ['<fill>'],
13+
clientId: 'example-consumer',
14+
})
15+
let consumer = kafka.consumer({ groupId: 'test-group' , fromBeginning: true, } );
16+
let producer = kafka.producer();
17+
18+
const schemaA = {
19+
type: 'record',
20+
namespace: 'test',
21+
name: 'A',
22+
fields: [
23+
{ name: 'id', type: 'int' },
24+
{ name: 'b', type: 'test.B' },
25+
],
26+
};
27+
28+
const schemaB = {
29+
type: 'record',
30+
namespace: 'test',
31+
name: 'B',
32+
fields: [{ name: 'id', type: 'int' }],
33+
};
34+
35+
const topicName = 'test-topic';
36+
37+
const run = async () => {
38+
// Register schemaB.
39+
await registry.register(
40+
{
41+
type: SchemaType.AVRO,
42+
schema: JSON.stringify(schemaB),
43+
},
44+
{ subject: 'Avro:B' },
45+
);
46+
const response = await registry.api.Subject.latestVersion({ subject: 'Avro:B' });
47+
const { version } = JSON.parse(response.responseData);
48+
49+
// Register schemaA, which references schemaB.
50+
const { id } = await registry.register(
51+
{
52+
type: SchemaType.AVRO,
53+
schema: JSON.stringify(schemaA),
54+
references: [
55+
{
56+
name: 'test.B',
57+
subject: 'Avro:B',
58+
version,
59+
},
60+
],
61+
},
62+
{ subject: 'Avro:A' },
63+
)
64+
65+
// Produce a message with schemaA.
66+
await producer.connect()
67+
const outgoingMessage = {
68+
key: 'key',
69+
value: await registry.encode(id, { id: 1, b: { id: 2 } })
70+
}
71+
await producer.send({
72+
topic: topicName,
73+
messages: [outgoingMessage]
74+
});
75+
console.log("Producer sent its message.")
76+
await producer.disconnect();
77+
producer = null;
78+
79+
await consumer.connect()
80+
await consumer.subscribe({ topic: topicName })
81+
82+
let messageRcvd = false;
83+
await consumer.run({
84+
eachMessage: async ({ message }) => {
85+
const decodedMessage = {
86+
...message,
87+
value: await registry.decode(message.value)
88+
};
89+
console.log("Consumer recieved message.\nBefore decoding: " + JSON.stringify(message) + "\nAfter decoding: " + JSON.stringify(decodedMessage));
90+
messageRcvd = true;
91+
},
92+
});
93+
94+
// Wait around until we get a message, and then disconnect.
95+
while (!messageRcvd) {
96+
await new Promise((resolve) => setTimeout(resolve, 100));
97+
}
98+
99+
await consumer.disconnect();
100+
consumer = null;
101+
}
102+
103+
run().catch (async e => {
104+
console.error(e);
105+
consumer && await consumer.disconnect();
106+
producer && await producer.disconnect();
107+
process.exit(1);
108+
})

lib/kafkajs/_consumer.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -967,6 +967,7 @@ class Consumer {
967967

968968
/**
969969
* Disconnects and cleans up the consumer.
970+
* @note This cannot be called from within `eachMessage` callback of `Consumer.run`.
970971
* @returns {Promise<void>} a promise that resolves when the consumer has disconnected.
971972
*/
972973
async disconnect() {

0 commit comments

Comments
 (0)