Skip to content

Commit e11fcfb

Browse files
committed
Add examples and tests for OAUTHBEARER (promisified API)
1 parent 510584b commit e11fcfb

File tree

4 files changed

+349
-0
lines changed

4 files changed

+349
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
2+
var jwt = require('jsonwebtoken');
3+
4+
// This example uses the Producer for demonstration purposes.
5+
// It is the same whether you use a Consumer/AdminClient.
6+
7+
async function token_refresh(oauthbearer_config /* string - passed from config */) {
8+
console.log("Called token_refresh with given config: " + oauthbearer_config);
9+
// At this point, we can use the information in the token, make
10+
// some API calls, fetch something from a file...
11+
// For the illustration, everything is hard-coded.
12+
const principal = 'admin';
13+
// In seconds - needed by jsonwebtoken library
14+
const exp_seconds = Math.floor(Date.now() / 1000) + (60 * 60);
15+
// In milliseconds - needed by kafka-javascript.
16+
const exp_ms = exp_seconds * 1000;
17+
18+
// For illustration, we're not signing our JWT (algorithm: none).
19+
// For production uses-cases, it should be signed.
20+
const value = jwt.sign(
21+
{ 'sub': principal, exp: exp_seconds, 'scope': 'requiredScope' }, '', { algorithm: 'none' });
22+
23+
// SASL extensions can be passed as Map or key/value pairs in an object.
24+
const extensions = {
25+
traceId: '123'
26+
};
27+
28+
// The callback is called with the new token, its lifetime, and the principal.
29+
// The extensions are optional and may be omitted.
30+
console.log("Finished token_refresh, triggering callback: with value: " +
31+
value.slice(0, 10) + "..., lifetime: " + exp_ms +
32+
", principal: " + principal + ", extensions: " + JSON.stringify(extensions));
33+
34+
// If no token could be fetched or an error occurred, an Error can be thrown instead.
35+
return { value, lifetime: exp_ms, principal, extensions };
36+
}
37+
38+
async function run() {
39+
const kafka = new Kafka({});
40+
const producer = kafka.producer({
41+
kafkaJS: {
42+
brokers: ['localhost:46611'],
43+
sasl: {
44+
mechanism: 'oauthbearer',
45+
oauthBearerProvider: token_refresh,
46+
},
47+
},
48+
'sasl.oauthbearer.config': 'someConfigPropertiesKey=value',
49+
});
50+
51+
await producer.connect();
52+
console.log("Producer connected");
53+
54+
const deliveryReport = await producer.send({
55+
topic: 'topic',
56+
messages: [
57+
{ value: 'Hello world!' },
58+
],
59+
});
60+
console.log("Producer sent message", deliveryReport);
61+
62+
await producer.disconnect();
63+
}
64+
65+
run().catch(console.error);

examples/kafkajs/oauthbearer_calback_authentication/package-lock.json

Lines changed: 188 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"name": "oauthbearer_calback_authentication",
3+
"version": "1.0.0",
4+
"description": "",
5+
"main": "index.js",
6+
"scripts": {
7+
"test": "echo \"Error: no test specified\" && exit 1"
8+
},
9+
"keywords": [],
10+
"author": "",
11+
"license": "ISC",
12+
"dependencies": {
13+
"@confluentinc/kafka-javascript": "file:../../..",
14+
"jsonwebtoken": "^9.0.2"
15+
}
16+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// minimum 30s are needed for the connect timeouts of consumer/producer
2+
jest.setTimeout(35000);
3+
4+
const {
5+
createProducer,
6+
sleep,
7+
createConsumer,
8+
createAdmin,
9+
} = require('./testhelpers');
10+
11+
describe('Client > oauthbearer callback', () => {
12+
let oauthbearer_cb_called = 0;
13+
const oauthbearer_config = 'key=value';
14+
const providerCb = async (config) => {
15+
expect(config).toEqual(oauthbearer_config);
16+
oauthbearer_cb_called++;
17+
throw new Error('oauthbearer_cb error');
18+
};
19+
20+
beforeEach(async () => {
21+
oauthbearer_cb_called = 0;
22+
})
23+
24+
it('works for producer',
25+
async () => {
26+
const client = createProducer({
27+
sasl: {
28+
mechanism: 'OAUTHBEARER',
29+
oauthBearerProvider: providerCb,
30+
}
31+
}, {
32+
'sasl.oauthbearer.config': oauthbearer_config,
33+
});
34+
35+
await expect(client.connect()).rejects.toThrow('oauthbearer_cb error');
36+
expect(oauthbearer_cb_called).toEqual(1);
37+
await client.disconnect();
38+
}
39+
);
40+
41+
it('works for consumer',
42+
async () => {
43+
const client = createConsumer({
44+
groupId: 'gid',
45+
sasl: {
46+
mechanism: 'OAUTHBEARER',
47+
oauthBearerProvider: providerCb,
48+
}
49+
}, {
50+
'sasl.oauthbearer.config': oauthbearer_config,
51+
});
52+
53+
await expect(client.connect()).rejects.toThrow('oauthbearer_cb error');
54+
expect(oauthbearer_cb_called).toEqual(1);
55+
await client.disconnect();
56+
}
57+
);
58+
59+
it('works for admin',
60+
async () => {
61+
const client = createAdmin({
62+
sasl: {
63+
mechanism: 'OAUTHBEARER',
64+
oauthBearerProvider: providerCb,
65+
}
66+
}, {
67+
'sasl.oauthbearer.config': oauthbearer_config,
68+
});
69+
70+
// Unlike others, there is no actual connection establishment
71+
// within the admin client, so we can't test for the error here.
72+
await expect(client.connect()).resolves.toBeUndefined();
73+
74+
await sleep(2000); // Wait for the callback to be called
75+
expect(oauthbearer_cb_called).toEqual(1);
76+
await client.disconnect();
77+
}
78+
);
79+
80+
})

0 commit comments

Comments
 (0)