Skip to content

Commit 618e1f9

Browse files
Schemaregistry oauth retry (#170)
* Add simple-oauth types to schema registry * Add oauth retry and unit tests for retry * Add oauth retry and unit tests for retry rename retry helper Rename retry helper * Added boom error * Add boom and boomify * remove retryHelper
1 parent e24a709 commit 618e1f9

File tree

9 files changed

+334
-26
lines changed

9 files changed

+334
-26
lines changed

package-lock.json

Lines changed: 51 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
"@bufbuild/buf": "^1.37.0",
3636
"@bufbuild/protoc-gen-es": "^2.0.0",
3737
"@eslint/js": "^9.9.0",
38+
"@hapi/boom": "^10.0.1",
3839
"@types/eslint__js": "^8.42.3",
3940
"@types/jest": "^29.5.13",
4041
"@types/node": "^20.16.1",
42+
"axios-mock-adapter": "^2.1.0",
4143
"bluebird": "^3.5.3",
4244
"eslint": "^8.57.0",
4345
"eslint-plugin-jest": "^28.6.0",
@@ -53,7 +55,7 @@
5355
"@mapbox/node-pre-gyp": "^1.0.11",
5456
"bindings": "^1.3.1",
5557
"nan": "^2.17.0"
56-
},
58+
},
5759
"engines": {
5860
"node": ">=18.0.0"
5961
},
@@ -62,4 +64,4 @@
6264
"schemaregistry",
6365
"schemaregistry-examples"
6466
]
65-
}
67+
}

schemaregistry/oauth/oauth-client.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
import { ModuleOptions, ClientCredentials, ClientCredentialTokenConfig, AccessToken } from 'simple-oauth2';
2+
import { sleep, fullJitter, isRetriable } from '../retry-helper';
3+
import { isBoom } from '@hapi/boom';
24

35
const TOKEN_EXPIRATION_THRESHOLD_SECONDS = 30 * 60; // 30 minutes
46

57
export class OAuthClient {
68
private client: ClientCredentials;
79
private token: AccessToken | undefined;
810
private tokenParams: ClientCredentialTokenConfig;
11+
private maxRetries: number;
12+
private retriesWaitMs: number;
13+
private retriesMaxWaitMs: number;
914

10-
constructor(clientId: string, clientSecret: string, tokenHost: string, tokenPath: string, scope: string) {
15+
constructor(clientId: string, clientSecret: string, tokenHost: string, tokenPath: string, scope: string,
16+
maxRetries: number, retriesWaitMs: number, retriesMaxWaitMs: number
17+
) {
1118
const clientConfig: ModuleOptions = {
1219
client: {
1320
id: clientId,
@@ -22,6 +29,10 @@ export class OAuthClient {
2229
this.tokenParams = { scope };
2330

2431
this.client = new ClientCredentials(clientConfig);
32+
33+
this.maxRetries = maxRetries;
34+
this.retriesWaitMs = retriesWaitMs;
35+
this.retriesMaxWaitMs = retriesMaxWaitMs;
2536
}
2637

2738
async getAccessToken(): Promise<string> {
@@ -33,14 +44,21 @@ export class OAuthClient {
3344
}
3445

3546
async generateAccessToken(): Promise<void> {
36-
try {
37-
const token = await this.client.getToken(this.tokenParams);
38-
this.token = token;
39-
} catch (error) {
40-
if (error instanceof Error) {
41-
throw new Error(`Failed to get token from server: ${error.message}`);
47+
for (let i = 0; i < this.maxRetries + 1; i++) {
48+
try {
49+
const token = await this.client.getToken(this.tokenParams);
50+
this.token = token;
51+
} catch (error: any) {
52+
if (isBoom(error) && i < this.maxRetries) {
53+
const statusCode = error.output.statusCode;
54+
if (isRetriable(statusCode)) {
55+
const waitTime = fullJitter(this.retriesWaitMs, this.retriesMaxWaitMs, i);
56+
await sleep(waitTime);
57+
continue;
58+
}
59+
}
60+
throw new Error(`Failed to get token from server: ${error}`);
4261
}
43-
throw new Error(`Failed to get token from server: ${error}`);
4462
}
4563
}
4664

@@ -54,3 +72,4 @@ export class OAuthClient {
5472
throw new Error('Access token is not available');
5573
}
5674
}
75+

schemaregistry/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
"@criteria/json-schema-validation": "^0.10.0",
3939
"@google-cloud/kms": "^4.5.0",
4040
"@hackbg/miscreant-esm": "^0.3.2-patch.3",
41+
"@hapi/boom": "^10.0.1",
4142
"@smithy/types": "^3.3.0",
43+
"@types/simple-oauth2": "^5.0.7",
4244
"@types/validator": "^13.12.0",
4345
"ajv": "^8.17.1",
4446
"async-mutex": "^0.5.0",

schemaregistry/rest-service.ts

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse, CreateAxiosDef
22
import { OAuthClient } from './oauth/oauth-client';
33
import { RestError } from './rest-error';
44
import axiosRetry from "axios-retry";
5+
import { fullJitter, isRetriable } from './retry-helper';
56
/*
67
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
78
*
@@ -62,10 +63,10 @@ export class RestService {
6263
axiosRetry(this.client, {
6364
retries: maxRetries ?? 2,
6465
retryDelay: (retryCount) => {
65-
return this.fullJitter(retriesWaitMs ?? 1000, retriesMaxWaitMs ?? 20000, retryCount - 1)
66+
return fullJitter(retriesWaitMs ?? 1000, retriesMaxWaitMs ?? 20000, retryCount - 1)
6667
},
6768
retryCondition: (error) => {
68-
return this.isRetriable(error.response?.status ?? 0);
69+
return isRetriable(error.response?.status ?? 0);
6970
}
7071
});
7172
this.baseURLs = baseURLs;
@@ -76,16 +77,7 @@ export class RestService {
7677
this.setHeaders({ 'Content-Type': 'application/vnd.schemaregistry.v1+json' });
7778

7879
this.handleBasicAuth(basicAuthCredentials);
79-
this.handleBearerAuth(bearerAuthCredentials);
80-
}
81-
82-
isRetriable(statusCode: number): boolean {
83-
return statusCode == 408 || statusCode == 429
84-
|| statusCode == 500 || statusCode == 502 || statusCode == 503 || statusCode == 504;
85-
}
86-
87-
fullJitter(baseDelayMs: number, maxDelayMs: number, retriesAttempted: number): number {
88-
return Math.random() * Math.min(maxDelayMs, baseDelayMs * 2 ** retriesAttempted)
80+
this.handleBearerAuth(maxRetries ?? 2, retriesWaitMs ?? 1000, retriesMaxWaitMs ?? 20000, bearerAuthCredentials);
8981
}
9082

9183
handleBasicAuth(basicAuthCredentials?: BasicAuthCredentials): void {
@@ -119,7 +111,8 @@ export class RestService {
119111
}
120112
}
121113

122-
handleBearerAuth(bearerAuthCredentials?: BearerAuthCredentials): void {
114+
handleBearerAuth(maxRetries: number,
115+
retriesWaitMs: number, retriesMaxWaitMs: number, bearerAuthCredentials?: BearerAuthCredentials): void {
123116
if (bearerAuthCredentials) {
124117
delete this.client.defaults.auth;
125118

@@ -157,7 +150,8 @@ export class RestService {
157150
}
158151
const issuerEndPointUrl = new URL(bearerAuthCredentials.issuerEndpointUrl!);
159152
this.oauthClient = new OAuthClient(bearerAuthCredentials.clientId!, bearerAuthCredentials.clientSecret!,
160-
issuerEndPointUrl.origin, issuerEndPointUrl.pathname, bearerAuthCredentials.scope!);
153+
issuerEndPointUrl.origin, issuerEndPointUrl.pathname, bearerAuthCredentials.scope!,
154+
maxRetries, retriesWaitMs, retriesMaxWaitMs);
161155
break;
162156
default:
163157
throw new Error('Invalid bearer auth credentials source');

schemaregistry/retry-helper.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
2+
3+
function fullJitter(baseDelayMs: number, maxDelayMs: number, retriesAttempted: number): number {
4+
return Math.random() * Math.min(maxDelayMs, baseDelayMs * 2 ** retriesAttempted)
5+
}
6+
7+
function isRetriable(statusCode: number): boolean {
8+
return statusCode == 408 || statusCode == 429
9+
|| statusCode == 500 || statusCode == 502 || statusCode == 503 || statusCode == 504;
10+
}
11+
12+
export { sleep, fullJitter, isRetriable };

0 commit comments

Comments
 (0)