Skip to content

Commit a4754c7

Browse files
Add Kafka Oauth implementation (#74) (#119)
1 parent f40df20 commit a4754c7

File tree

5 files changed

+153
-13
lines changed

5 files changed

+153
-13
lines changed

package-lock.json

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

schemaregistry-examples/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"description": "",
1010
"devDependencies": {
1111
"@confluentinc/kafka-javascript": "^0.2.0",
12-
"@confluentinc/schemaregistry": "^v0.1.17.6-devel",
12+
"@confluentinc/schemaregistry": "^v0.2.1",
13+
"axios": "^1.7.7",
1314
"uuid": "^10.0.0"
1415
}
1516
}

schemaregistry-examples/src/constants.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import { BasicAuthCredentials } from '@confluentinc/schemaregistry';
22

33
const issuerEndpointUrl = '<your-issuer-endpoint-url>'; // e.g. 'https://dev-123456.okta.com/oauth2/default/v1/token';
4-
const clientId = '<your-client-id>';
5-
const clientSecret = '<your-client-secret>';
4+
const oauthClientId = '<your-client-id>';
5+
const oauthClientSecret = '<your-client-secret>';
66
const scope = '<your-scope>'; // e.g. 'schemaregistry';
77
const identityPoolId = '<your-pool>'; // e.g. pool-Gx30
8-
const logicalCluster = '<your-logical-cluster>'; //e.g. lsrc-a6m5op
8+
const kafkaLogicalCluster = '<your-logical-cluster>'; // e.g. lkc-12345
9+
const schemaRegistryLogicalCluster = '<your-logical-cluster>'; // e.g. lsrc-a6m5op
910
const baseUrl = '<your-schema-registry-url>'; // e.g. 'https://psrc-3amt5nj.us-east-1.aws.confluent.cloud'
1011
const clusterBootstrapUrl = '<your-cluster-bootstrap-url>'; // e.g. "pkc-p34xa.us-east-1.aws.confluent.cloud:9092"
1112
const clusterApiKey = '<your-cluster-api-key>';
@@ -22,6 +23,6 @@ const basicAuthCredentials: BasicAuthCredentials = {
2223
};
2324

2425
export {
25-
issuerEndpointUrl, clientId, clientSecret, scope, identityPoolId, logicalCluster, baseUrl,
26-
clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials
26+
issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, identityPoolId, kafkaLogicalCluster, schemaRegistryLogicalCluster,
27+
baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials
2728
};
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import {
2+
AvroSerializer, AvroSerializerConfig, SerdeType,
3+
ClientConfig, SchemaRegistryClient, SchemaInfo, BearerAuthCredentials
4+
} from "@confluentinc/schemaregistry";
5+
import { CreateAxiosDefaults } from "axios";
6+
import { KafkaJS } from '@confluentinc/kafka-javascript';
7+
import {
8+
clusterBootstrapUrl,
9+
baseUrl,
10+
issuerEndpointUrl, oauthClientId, oauthClientSecret, scope,
11+
identityPoolId, schemaRegistryLogicalCluster, kafkaLogicalCluster
12+
} from "./constants";
13+
import axios from 'axios';
14+
15+
// Only showing the producer, will be the same implementation for the consumer
16+
17+
async function token_refresh() {
18+
try {
19+
// Make a POST request to get the access token
20+
const response = await axios.post(issuerEndpointUrl, new URLSearchParams({
21+
grant_type: 'client_credentials',
22+
client_id: oauthClientId,
23+
client_secret: oauthClientSecret,
24+
scope: scope
25+
}), {
26+
headers: {
27+
'Content-Type': 'application/x-www-form-urlencoded'
28+
}
29+
});
30+
31+
// Extract the token and expiration time from the response
32+
const token = response.data.access_token;
33+
const exp_seconds = Math.floor(Date.now() / 1000) + response.data.expires_in;
34+
const exp_ms = exp_seconds * 1000;
35+
36+
const principal = 'admin'; // You can adjust this based on your needs
37+
const extensions = {
38+
traceId: '123',
39+
logicalCluster: kafkaLogicalCluster,
40+
identityPoolId: identityPoolId
41+
};
42+
43+
return { value: token, lifetime: exp_ms, principal, extensions };
44+
} catch (error) {
45+
console.error('Failed to retrieve OAuth token:', error);
46+
throw new Error('Failed to retrieve OAuth token');
47+
}
48+
}
49+
50+
async function kafkaProducerAvro() {
51+
52+
const createAxiosDefaults: CreateAxiosDefaults = {
53+
timeout: 10000
54+
};
55+
56+
const bearerAuthCredentials: BearerAuthCredentials = {
57+
credentialsSource: 'OAUTHBEARER',
58+
issuerEndpointUrl: issuerEndpointUrl,
59+
clientId: oauthClientId,
60+
clientSecret: oauthClientSecret,
61+
scope: scope,
62+
identityPoolId: identityPoolId,
63+
logicalCluster: schemaRegistryLogicalCluster
64+
}
65+
66+
const clientConfig: ClientConfig = {
67+
baseURLs: [baseUrl],
68+
createAxiosDefaults: createAxiosDefaults,
69+
cacheCapacity: 512,
70+
cacheLatestTtlSecs: 60,
71+
bearerAuthCredentials: bearerAuthCredentials
72+
};
73+
74+
const schemaRegistryClient = new SchemaRegistryClient(clientConfig);
75+
76+
const kafka: KafkaJS.Kafka = new KafkaJS.Kafka({
77+
kafkaJS: {
78+
brokers: [clusterBootstrapUrl],
79+
ssl: true,
80+
sasl: {
81+
mechanism: 'oauthbearer',
82+
oauthBearerProvider: token_refresh
83+
},
84+
},
85+
});
86+
87+
const producer: KafkaJS.Producer = kafka.producer({
88+
kafkaJS: {
89+
allowAutoTopicCreation: true,
90+
acks: 1,
91+
compression: KafkaJS.CompressionTypes.GZIP,
92+
}
93+
});
94+
95+
console.log("Producer created");
96+
97+
const schemaString: string = JSON.stringify({
98+
type: 'record',
99+
name: 'User',
100+
fields: [
101+
{ name: 'name', type: 'string' },
102+
{ name: 'age', type: 'int' },
103+
],
104+
});
105+
106+
const schemaInfo: SchemaInfo = {
107+
schemaType: 'AVRO',
108+
schema: schemaString,
109+
};
110+
111+
const userTopic = 'example-user-topic';
112+
await schemaRegistryClient.register(userTopic + "-value", schemaInfo);
113+
114+
const userInfo = { name: 'Alice N Bob', age: 30 };
115+
116+
const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true };
117+
118+
const serializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig);
119+
120+
const outgoingMessage = {
121+
key: "1",
122+
value: await serializer.serialize(userTopic, userInfo)
123+
};
124+
125+
console.log("Outgoing message: ", outgoingMessage);
126+
127+
await producer.connect();
128+
129+
await producer.send({
130+
topic: userTopic,
131+
messages: [outgoingMessage]
132+
});
133+
134+
await producer.disconnect();
135+
}
136+
// Surface failures explicitly instead of dying with an unhandled promise
// rejection, and signal failure through the exit code.
kafkaProducerAvro().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

schemaregistry-examples/src/oauth-schemaregistry.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
import { SchemaRegistryClient, BearerAuthCredentials, ClientConfig } from '@confluentinc/schemaregistry';
22
import { CreateAxiosDefaults } from 'axios';
33
import {
4-
issuerEndpointUrl, clientId, clientSecret, scope,
5-
identityPoolId, logicalCluster, baseUrl
4+
issuerEndpointUrl, oauthClientId, oauthClientSecret, scope,
5+
identityPoolId, schemaRegistryLogicalCluster, baseUrl
66
} from './constants';
77

88
async function oauthSchemaRegistry() {
99

1010
const bearerAuthCredentials: BearerAuthCredentials = {
1111
credentialsSource: 'OAUTHBEARER',
1212
issuerEndpointUrl: issuerEndpointUrl,
13-
clientId: clientId,
14-
clientSecret: clientSecret,
13+
clientId: oauthClientId,
14+
clientSecret: oauthClientSecret,
1515
scope: scope,
1616
identityPoolId: identityPoolId,
17-
logicalCluster: logicalCluster
17+
logicalCluster: schemaRegistryLogicalCluster
1818
}
1919

2020
const createAxiosDefaults: CreateAxiosDefaults = {

0 commit comments

Comments
 (0)