Skip to content

Commit 7d5d246

Browse files
authored
Enhance docs (#108)
* Update kafkajs README and sr example * Minor formatting
1 parent bd4fad6 commit 7d5d246

File tree

6 files changed

+32
-30
lines changed

6 files changed

+32
-30
lines changed

README.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ pace with core Apache Kafka and components of the [Confluent Platform](https://w
2323

2424
This library leverages the work and concepts from two popular Apache Kafka JavaScript clients: [node-rdkafka](https://github.com/Blizzard/node-rdkafka) and [KafkaJS](https://github.com/tulios/kafkajs). The core is heavily based on the node-rdkafka library, which uses our own [librdkafka](https://github.com/confluentinc/librdkafka) library for core client functionality. However, we leverage a promisified API and a more idiomatic interface, similar to the one in KafkaJS, making it easy for developers to migrate and adopt this client depending on the patterns and interface they prefer. We're very happy to have been able to leverage the excellent work of the many authors of these libraries!
2525

26-
### This library is currently in limited-availability - it is supported for all usage but for the schema-registry client.
26+
### This library is currently in limited-availability
2727

28-
To use **Schema Registry**, use the existing [kafkajs/confluent-schema-registry](https://github.com/kafkajs/confluent-schema-registry) library that is compatible with this library. For a simple schema registry example, see [sr.js](https://github.com/confluentinc/confluent-kafka-javascript/blob/dev_early_access_development_branch/examples/kafkajs/sr.js).
29-
30-
**DISCLAIMER:** Although it is compatible with **confluent-kafka-javascript**, Confluent does not own or maintain kafkajs/confluent-schema-registry, and the use and functionality of the library should be considered "as is".
28+
To use **Schema Registry**, use the existing [@confluentinc/schemaregistry](https://www.npmjs.com/package/@confluentinc/schemaregistry) library that is compatible with this library. For a simple schema registry example, see [sr.js](https://github.com/confluentinc/confluent-kafka-javascript/blob/dev_early_access_development_branch/examples/kafkajs/sr.js).
3129

3230

3331
## Requirements

examples/kafkajs/sr.js

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
22
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
33

4-
// Note: The kafkajs/confluent-schema-registry will need to be installed separately to run this example,
4+
// Note: The @confluentinc/schemaregistry will need to be installed separately to run this example,
55
// as it isn't a dependency of confluent-kafka-javascript.
6-
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');
6+
const { SchemaRegistryClient, SerdeType, AvroSerializer, AvroDeserializer} = require('@confluentinc/schemaregistry');
77

8-
const registry = new SchemaRegistry({ host: '<fill>' })
8+
const registry = new SchemaRegistryClient({ baseURLs: ['<fill>'] })
99
const kafka = new Kafka({
1010
kafkaJS: {
1111
brokers: ['<fill>'],
@@ -17,11 +17,12 @@ const kafka = new Kafka({
1717
},
1818
}
1919
});
20+
2021
let consumer = kafka.consumer({
21-
kafkaJS: {
22-
groupId: "test-group",
23-
fromBeginning: true,
24-
},
22+
kafkaJS: {
23+
groupId: "test-group",
24+
fromBeginning: true,
25+
},
2526
});
2627
let producer = kafka.producer();
2728

@@ -43,40 +44,42 @@ const schemaB = {
4344
};
4445

4546
const topicName = 'test-topic';
47+
const subjectName = topicName + '-value';
4648

4749
const run = async () => {
4850
// Register schemaB.
4951
await registry.register(
52+
'avro-b',
5053
{
51-
type: SchemaType.AVRO,
54+
schemaType: 'AVRO',
5255
schema: JSON.stringify(schemaB),
53-
},
54-
{ subject: 'Avro:B' },
56+
}
5557
);
56-
const response = await registry.api.Subject.latestVersion({ subject: 'Avro:B' });
57-
const { version } = JSON.parse(response.responseData);
58+
const response = await registry.getLatestSchemaMetadata('avro-b');
59+
const version = response.version
5860

5961
// Register schemaA, which references schemaB.
60-
const { id } = await registry.register(
62+
const id = await registry.register(
63+
subjectName,
6164
{
62-
type: SchemaType.AVRO,
65+
schemaType: 'AVRO',
6366
schema: JSON.stringify(schemaA),
6467
references: [
6568
{
6669
name: 'test.B',
67-
subject: 'Avro:B',
70+
subject: 'avro-b',
6871
version,
6972
},
7073
],
71-
},
72-
{ subject: 'Avro:A' },
74+
}
7375
)
7476

7577
// Produce a message with schemaA.
7678
await producer.connect()
79+
const ser = new AvroSerializer(registry, SerdeType.VALUE, { useLatestVersion: true });
7780
const outgoingMessage = {
7881
key: 'key',
79-
value: await registry.encode(id, { id: 1, b: { id: 2 } })
82+
value: await ser.serialize(topicName, { id: 1, b: { id: 2 } }),
8083
}
8184
await producer.send({
8285
topic: topicName,
@@ -90,11 +93,12 @@ const run = async () => {
9093
await consumer.subscribe({ topic: topicName })
9194

9295
let messageRcvd = false;
96+
const deser = new AvroDeserializer(registry, SerdeType.VALUE, {});
9397
await consumer.run({
9498
eachMessage: async ({ message }) => {
9599
const decodedMessage = {
96100
...message,
97-
value: await registry.decode(message.value)
101+
value: await deser.deserialize(topicName, message.value)
98102
};
99103
console.log("Consumer received message.\nBefore decoding: " + JSON.stringify(message) + "\nAfter decoding: " + JSON.stringify(decodedMessage));
100104
messageRcvd = true;

schemaregistry/rest-service.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { RestError } from './rest-error';
1313
export interface BasicAuthCredentials {
1414
credentialsSource: 'USER_INFO' | 'URL' | 'SASL_INHERIT',
1515
userInfo?: string,
16-
saslInfo?: SaslInfo
16+
sasl?: SaslInfo
1717
}
1818

1919
export interface SaslInfo {
@@ -75,13 +75,13 @@ export class RestService {
7575
this.setAuth(toBase64(basicAuthCredentials.userInfo!));
7676
break;
7777
case 'SASL_INHERIT':
78-
if (!basicAuthCredentials.saslInfo) {
78+
if (!basicAuthCredentials.sasl) {
7979
throw new Error('Sasl info not provided');
8080
}
81-
if (basicAuthCredentials.saslInfo.mechanism?.toUpperCase() === 'GSSAPI') {
81+
if (basicAuthCredentials.sasl.mechanism?.toUpperCase() === 'GSSAPI') {
8282
throw new Error('SASL_INHERIT support PLAIN and SCRAM SASL mechanisms only');
8383
}
84-
this.setAuth(toBase64(`${basicAuthCredentials.saslInfo.username}:${basicAuthCredentials.saslInfo.password}`));
84+
this.setAuth(toBase64(`${basicAuthCredentials.sasl.username}:${basicAuthCredentials.sasl.password}`));
8585
break;
8686
case 'URL':
8787
if (!basicAuthCredentials.userInfo) {

schemaregistry/rules/encryption/tink/bytes.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ export function fromBase64(encoded: string, opt_webSafe?: boolean): Uint8Array {
130130
* @param bytes - the byte array input
131131
* @param opt_webSafe - True indicates we should use the alternative
132132
* alphabet, which does not require escaping for use in URLs.
133-
* @returns base64 - output
133+
* @returns base64 output
134134
*/
135135
export function toBase64(bytes: Uint8Array, opt_webSafe?: boolean): string {
136136
const encoded = window

schemaregistry/serde/json-util.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { deepStrictEqual } from 'assert';
1212
* Generate JSON schema from value.
1313
*
1414
* @param value - Value.
15-
* @returns - JSON schema.
15+
* @returns JSON schema.
1616
*/
1717
export function generateSchema(value: any): any {
1818
switch (true) {

schemaregistry/serde/wildcard-matcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*
1919
* @param str - the string to match on
2020
* @param wildcardMatcher - the wildcard string to match against
21-
* @returns true - if the string matches the wildcard string
21+
* @returns true if the string matches the wildcard string
2222
*/
2323
export function match(str: string, wildcardMatcher: string): boolean {
2424
let re = wildcardToRegexp(wildcardMatcher, '.')

0 commit comments

Comments
 (0)