Skip to content

Commit 65c601d

Browse files
Schemaregistry rebase (#33) (#80)
* rebase dev_early_access * Add OAuth Support to Rest Service (#25) * Add OAuth client support * Add optional chaining for token * Fix merge conflict * add simple-oauth2 dependency
1 parent f5e7f3e commit 65c601d

File tree

8 files changed

+2293
-2632
lines changed

8 files changed

+2293
-2632
lines changed

package-lock.json

Lines changed: 2167 additions & 2623 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,30 @@
4747
"typescript-eslint": "^8.2.0"
4848
},
4949
"dependencies": {
50+
"@aws-sdk/client-kms": "^3.637.0",
51+
"@azure/identity": "^4.4.1",
52+
"@azure/keyvault-keys": "^4.8.0",
53+
"@bufbuild/protobuf": "^2.0.0",
54+
"@criteria/json-schema": "^0.10.0",
55+
"@criteria/json-schema-validation": "^0.10.0",
56+
"@google-cloud/kms": "^4.5.0",
57+
"@hackbg/miscreant-esm": "^0.3.2-patch.3",
5058
"@mapbox/node-pre-gyp": "^1.0.11",
59+
"@smithy/types": "^3.3.0",
60+
"@types/simple-oauth2": "^5.0.7",
61+
"@types/validator": "^13.12.0",
62+
"ajv": "^8.17.1",
63+
"async-mutex": "^0.5.0",
64+
"avsc": "^5.7.7",
65+
"axios": "^1.7.3",
5166
"bindings": "^1.3.1",
52-
"nan": "^2.17.0"
67+
"json-stringify-deterministic": "^1.0.12",
68+
"lru-cache": "^11.0.0",
69+
"nan": "^2.17.0",
70+
"node-vault": "^0.10.2",
71+
"simple-oauth2": "^5.1.0",
72+
"ts-jest": "^29.2.4",
73+
"validator": "^13.12.0"
5374
},
5475
"engines": {
5576
"node": ">=18.0.0"

schemaregistry/oauth/oauth-client.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { ModuleOptions, ClientCredentials, ClientCredentialTokenConfig, AccessToken } from 'simple-oauth2';
2+
3+
const TOKEN_EXPIRATION_THRESHOLD_SECONDS = 30 * 60; // 30 minutes
4+
5+
export class OAuthClient {
6+
private client: ClientCredentials;
7+
private token: AccessToken | undefined;
8+
private tokenParams: ClientCredentialTokenConfig;
9+
10+
constructor(clientId: string, clientSecret: string, tokenHost: string, tokenPath: string, scope: string) {
11+
const clientConfig: ModuleOptions = {
12+
client: {
13+
id: clientId,
14+
secret: clientSecret,
15+
},
16+
auth: {
17+
tokenHost: tokenHost,
18+
tokenPath: tokenPath
19+
}
20+
}
21+
22+
this.tokenParams = { scope };
23+
24+
this.client = new ClientCredentials(clientConfig);
25+
}
26+
27+
async getAccessToken(): Promise<string> {
28+
if (!this.token || this.token.expired(TOKEN_EXPIRATION_THRESHOLD_SECONDS)) {
29+
await this.generateAccessToken();
30+
}
31+
32+
return this.getAccessTokenString();
33+
}
34+
35+
async generateAccessToken(): Promise<void> {
36+
try {
37+
const token = await this.client.getToken(this.tokenParams);
38+
this.token = token;
39+
} catch (error) {
40+
if (error instanceof Error) {
41+
throw new Error(`Failed to get token from server: ${error.message}`);
42+
}
43+
throw new Error(`Failed to get token from server: ${error}`);
44+
}
45+
}
46+
47+
async getAccessTokenString(): Promise<string> {
48+
const accessToken = this.token?.token?.['access_token'];
49+
50+
if (typeof accessToken === 'string') {
51+
return accessToken;
52+
}
53+
54+
throw new Error('Access token is not available');
55+
}
56+
}

schemaregistry/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@
3939
"jsonata": "^2.0.5",
4040
"lru-cache": "^11.0.0",
4141
"node-vault": "^0.10.2",
42+
"simple-oauth2": "^5.1.0",
4243
"validator": "^13.12.0"
4344
},
4445
"scripts": {
4546
"test:types": "tsc -p .",
4647
"test:schemaregistry": "make -f Makefile.schemaregistry test"
4748
},
48-
4949
"keywords": [
5050
"schemaregistry",
5151
"confluent"
@@ -54,6 +54,5 @@
5454
"type": "git",
5555
"url": "[email protected]:confluentinc/confluent-kafka-javascript.git"
5656
},
57-
5857
"license": "MIT"
5958
}

schemaregistry/rest-service.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse, CreateAxiosDefaults } from 'axios';
2+
import { OAuthClient } from './oauth/oauth-client';
23
import { RestError } from './rest-error';
34

45
/*
@@ -10,25 +11,51 @@ import { RestError } from './rest-error';
1011
* of the MIT license. See the LICENSE.txt file for details.
1112
*/
1213

14+
export interface BearerAuthCredentials {
15+
clientId: string,
16+
clientSecret: string,
17+
tokenHost: string,
18+
tokenPath: string,
19+
schemaRegistryLogicalCluster: string,
20+
identityPool: string,
21+
scope: string
22+
}
23+
24+
//TODO: Consider retry policy, may need additional libraries on top of Axios
1325
export interface ClientConfig {
1426
baseURLs: string[],
1527
cacheCapacity: number,
1628
cacheLatestTtlSecs?: number,
17-
isForward?: boolean
29+
isForward?: boolean,
1830
createAxiosDefaults?: CreateAxiosDefaults,
31+
bearerAuthCredentials?: BearerAuthCredentials,
1932
}
2033

2134
export class RestService {
2235
private client: AxiosInstance;
2336
private baseURLs: string[];
37+
private OAuthClient?: OAuthClient;
38+
private bearerAuth: boolean = false;
2439

25-
constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults) {
40+
constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults,
41+
bearerAuthCredentials?: BearerAuthCredentials) {
2642
this.client = axios.create(axiosDefaults);
2743
this.baseURLs = baseURLs;
2844

2945
if (isForward) {
3046
this.client.defaults.headers.common['X-Forward'] = 'true'
3147
}
48+
49+
if (bearerAuthCredentials) {
50+
this.bearerAuth = true;
51+
delete this.client.defaults.auth;
52+
this.setHeaders({
53+
'Confluent-Identity-Pool-Id': bearerAuthCredentials.identityPool,
54+
'target-sr-cluster': bearerAuthCredentials.schemaRegistryLogicalCluster
55+
});
56+
this.OAuthClient = new OAuthClient(bearerAuthCredentials.clientId, bearerAuthCredentials.clientSecret,
57+
bearerAuthCredentials.tokenHost, bearerAuthCredentials.tokenPath, bearerAuthCredentials.scope);
58+
}
3259
}
3360

3461
async handleRequest<T>(
@@ -38,6 +65,10 @@ export class RestService {
3865
config?: AxiosRequestConfig,
3966
): Promise<AxiosResponse<T>> {
4067

68+
if (this.bearerAuth) {
69+
await this.setBearerToken();
70+
}
71+
4172
for (let i = 0; i < this.baseURLs.length; i++) {
4273
try {
4374
this.setBaseURL(this.baseURLs[i]);
@@ -80,6 +111,15 @@ export class RestService {
80111
}
81112
}
82113

114+
async setBearerToken(): Promise<void> {
115+
if (!this.OAuthClient) {
116+
throw new Error('OAuthClient not initialized');
117+
}
118+
119+
const bearerToken: string = await this.OAuthClient.getAccessToken();
120+
this.setAuth(undefined, bearerToken);
121+
}
122+
83123
setTimeout(timeout: number): void {
84124
this.client.defaults.timeout = timeout
85125
}

schemaregistry/rules/encryption/dekregistry/dekregistry-client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class DekRegistryClient implements Client {
7878
}
7979

8080
static newClient(config: ClientConfig): Client {
81-
let url = config.baseURLs[0]
81+
const url = config.baseURLs[0];
8282
if (url.startsWith("mock://")) {
8383
return new MockDekRegistryClient()
8484
}

schemaregistry/rules/encryption/dekregistry/mock-dekregistry-client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class MockDekRegistryClient implements Client {
6868
algorithm: string, version: number = 1, deleted: boolean = false): Promise<Dek> {
6969
if (version === -1) {
7070
let latestVersion = 0;
71-
for (let key of this.dekCache.keys()) {
71+
for (const key of this.dekCache.keys()) {
7272
const parsedKey = JSON.parse(key);
7373
if (parsedKey.kekName === kekName && parsedKey.subject === subject
7474
&& parsedKey.algorithm === algorithm && parsedKey.deleted === deleted) {

schemaregistry/schemaregistry-client.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { AxiosResponse } from 'axios';
33
import stringify from "json-stringify-deterministic";
44
import { LRUCache } from 'lru-cache';
55
import { Mutex } from 'async-mutex';
6-
import {MockClient} from "./mock-schemaregistry-client";
6+
import { MockClient } from "./mock-schemaregistry-client";
77

88
/*
99
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
@@ -165,7 +165,8 @@ export class SchemaRegistryClient implements Client {
165165
...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 })
166166
};
167167

168-
this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults);
168+
this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults,
169+
config.bearerAuthCredentials);
169170

170171
this.schemaToIdCache = new LRUCache(cacheOptions);
171172
this.idToSchemaInfoCache = new LRUCache(cacheOptions);

0 commit comments

Comments
 (0)