Skip to content

Commit b6f0506

Browse files
authored
feat: Hooks for the context rejection mechanism (#5901)
* feat: Hooks for the context rejection mechanism * feat: add context acceptor mechanism to the SubscriptionServer
1 parent cf9078a commit b6f0506

File tree

13 files changed

+150
-32
lines changed

13 files changed

+150
-32
lines changed

packages/cubejs-api-gateway/src/SubscriptionServer.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { v4 as uuidv4 } from 'uuid';
33
import { UserError } from './UserError';
44
import type { ApiGateway } from './gateway';
55
import type { LocalSubscriptionStore } from './LocalSubscriptionStore';
6-
import { ExtendedRequestContext } from './interfaces';
6+
import { ExtendedRequestContext, ContextAcceptorFn } from './interfaces';
77

88
const methodParams: Record<string, string[]> = {
99
load: ['query', 'queryType'],
@@ -26,6 +26,7 @@ export class SubscriptionServer {
2626
protected readonly apiGateway: ApiGateway,
2727
protected readonly sendMessage: WebSocketSendMessageFn,
2828
protected readonly subscriptionStore: LocalSubscriptionStore,
29+
protected readonly contextAcceptor: ContextAcceptorFn,
2930
) {
3031
}
3132

@@ -54,6 +55,11 @@ export class SubscriptionServer {
5455
if (message.authorization) {
5556
authContext = { isSubscription: true };
5657
await this.apiGateway.checkAuthFn(authContext, message.authorization);
58+
const acceptanceResult = this.contextAcceptor(authContext);
59+
if (!acceptanceResult.accepted) {
60+
this.sendMessage(connectionId, acceptanceResult.rejectMessage);
61+
return;
62+
}
5763
await this.subscriptionStore.setAuthContext(connectionId, authContext);
5864
this.sendMessage(connectionId, { handshake: true });
5965
return;

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ import {
5454
import {
5555
CheckAuthMiddlewareFn,
5656
RequestLoggerMiddlewareFn,
57+
ContextRejectionMiddlewareFn,
58+
ContextAcceptorFn,
5759
} from './interfaces';
5860
import { getRequestIdFromRequest, requestParser } from './requestParser';
5961
import { UserError } from './UserError';
@@ -126,6 +128,10 @@ class ApiGateway {
126128
protected readonly requestLoggerMiddleware: RequestLoggerMiddlewareFn;
127129

128130
protected readonly securityContextExtractor: SecurityContextExtractorFn;
131+
132+
protected readonly contextRejectionMiddleware: ContextRejectionMiddlewareFn;
133+
134+
protected readonly wsContextAcceptor: ContextAcceptorFn;
129135

130136
protected readonly releaseListeners: (() => any)[] = [];
131137

@@ -158,12 +164,15 @@ class ApiGateway {
158164
: this.checkAuth;
159165
this.securityContextExtractor = this.createSecurityContextExtractor(options.jwt);
160166
this.requestLoggerMiddleware = options.requestLoggerMiddleware || this.requestLogger;
167+
this.contextRejectionMiddleware = options.contextRejectionMiddleware || (async (req, res, next) => next());
168+
this.wsContextAcceptor = options.wsContextAcceptor || (() => ({ accepted: true }));
161169
}
162170

163171
public initApp(app: ExpressApplication) {
164172
const userMiddlewares: RequestHandler[] = [
165173
this.checkAuthMiddleware,
166174
this.requestContextMiddleware,
175+
this.contextRejectionMiddleware,
167176
this.logNetworkUsage,
168177
this.requestLoggerMiddleware
169178
];
@@ -279,6 +288,7 @@ class ApiGateway {
279288
const systemMiddlewares: RequestHandler[] = [
280289
this.checkAuthSystemMiddleware,
281290
this.requestContextMiddleware,
291+
this.contextRejectionMiddleware,
282292
this.requestLoggerMiddleware
283293
];
284294

@@ -386,7 +396,7 @@ class ApiGateway {
386396
}
387397

388398
public initSubscriptionServer(sendMessage: WebSocketSendMessageFn) {
389-
return new SubscriptionServer(this, sendMessage, this.subscriptionStore);
399+
return new SubscriptionServer(this, sendMessage, this.subscriptionStore, this.wsContextAcceptor);
390400
}
391401

392402
protected duration(requestStarted) {

packages/cubejs-api-gateway/src/interfaces.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ export type CheckAuthMiddlewareFn =
7575
next: ExpressNextFunction,
7676
) => void;
7777

78+
/**
79+
* Context rejection middleware.
80+
*/
81+
export type ContextRejectionMiddlewareFn =
82+
(
83+
req: Request,
84+
res: ExpressResponse,
85+
next: ExpressNextFunction,
86+
) => void;
87+
88+
/**
89+
* ContextAcceptorFn type that matches the ContextAcceptor.shouldAcceptWs
90+
* signature from the server-core package
91+
*/
92+
export type ContextAcceptorFn = (context: RequestContext) => { accepted: boolean; rejectMessage?: any };
93+
7894
/**
7995
* Logger middleware.
8096
* @deprecated

packages/cubejs-api-gateway/src/types/gateway.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import {
1616
import {
1717
CheckAuthMiddlewareFn,
1818
RequestLoggerMiddlewareFn,
19+
ContextRejectionMiddlewareFn,
20+
ContextAcceptorFn,
1921
} from '../interfaces';
2022

2123
type UserBackgroundContext = {
@@ -59,6 +61,8 @@ interface ApiGatewayOptions {
5961
enforceSecurityChecks?: boolean;
6062
playgroundAuthSecret?: string;
6163
serverCoreVersion?: string;
64+
contextRejectionMiddleware?: ContextRejectionMiddlewareFn;
65+
wsContextAcceptor?: ContextAcceptorFn;
6266
checkAuth?: CheckAuthFn;
6367
/**
6468
* @deprecated Use checkAuth property instead.

packages/cubejs-backend-shared/src/env.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ const variables: Record<string, (...args: any) => any> = {
190190
maxPartitionsPerCube: () => get('CUBEJS_MAX_PARTITIONS_PER_CUBE')
191191
.default('10000')
192192
.asInt(),
193+
scheduledRefreshBatchSize: () => get('CUBEJS_SCHEDULED_REFRESH_BATCH_SIZE')
194+
.default('1')
195+
.asInt(),
193196

194197
/** ****************************************************************
195198
* Common db options *

packages/cubejs-server-core/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
"lru-cache": "^5.1.1",
4949
"moment": "^2.29.1",
5050
"node-fetch": "^2.6.0",
51+
"p-limit": "^3.1.0",
5152
"promise-timeout": "^1.3.0",
5253
"ramda": "^0.27.0",
5354
"semver": "^6.3.0",

packages/cubejs-server-core/src/core/OptsHandler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ export class OptsHandler {
464464
dashboardAppPort: 3000,
465465
scheduledRefreshConcurrency:
466466
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10),
467+
scheduledRefreshBatchSize: getEnv('scheduledRefreshBatchSize'),
467468
preAggregationsSchema:
468469
getEnv('preAggregationsSchema') ||
469470
(this.isDevMode()

packages/cubejs-server-core/src/core/optionsValidate.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ const schemaOptions = Joi.object().keys({
8484
scheduledRefreshTimeZones: Joi.array().items(Joi.string()),
8585
scheduledRefreshContexts: Joi.func(),
8686
scheduledRefreshConcurrency: Joi.number().min(1).integer(),
87+
scheduledRefreshBatchSize: Joi.number().min(1).integer(),
8788
// Compiler cache
8889
compilerCacheSize: Joi.number().min(0).integer(),
8990
updateCompilerCacheKeepAlive: Joi.boolean(),

packages/cubejs-server-core/src/core/server.ts

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import crypto from 'crypto';
33
import fs from 'fs-extra';
44
import LRUCache from 'lru-cache';
55
import isDocker from 'is-docker';
6+
import pLimit from 'p-limit';
67

78
import { ApiGateway, UserBackgroundContext } from '@cubejs-backend/api-gateway';
89
import {
@@ -11,7 +12,7 @@ import {
1112
getEnv, assertDataSource, getRealType, internalExceptions, track,
1213
} from '@cubejs-backend/shared';
1314

14-
import type { Application as ExpressApplication } from 'express';
15+
import type { Application as ExpressApplication, NextFunction } from 'express';
1516

1617
import { BaseDriver, DriverFactoryByDataSource } from '@cubejs-backend/query-orchestrator';
1718
import { FileRepository, SchemaFileRepository } from './FileRepository';
@@ -47,7 +48,7 @@ import type {
4748
LoggerFn,
4849
DriverConfig,
4950
} from './types';
50-
import { ContextToOrchestratorIdFn } from './types';
51+
import { ContextToOrchestratorIdFn, ContextAcceptanceResult, ContextAcceptanceResultHttp, ContextAcceptanceResultWs, ContextAcceptor } from './types';
5152

5253
const { version } = require('../../../package.json');
5354

@@ -59,6 +60,20 @@ function wrapToFnIfNeeded<T, R>(possibleFn: T | ((a: R) => T)): (a: R) => T {
5960
return () => possibleFn;
6061
}
6162

63+
class AcceptAllAcceptor {
64+
public shouldAccept(): ContextAcceptanceResult {
65+
return { accepted: true };
66+
}
67+
68+
public shouldAcceptHttp(): ContextAcceptanceResultHttp {
69+
return { accepted: true };
70+
}
71+
72+
public shouldAcceptWs(): ContextAcceptanceResultWs {
73+
return { accepted: true };
74+
}
75+
}
76+
6277
export class CubejsServerCore {
6378
/**
6479
* Returns core version based on package.json.
@@ -67,13 +82,6 @@ export class CubejsServerCore {
6782
return version;
6883
}
6984

70-
/**
71-
* Create an instance of the core.
72-
*/
73-
public static create(options?: CreateOptions, systemOptions?: SystemOptions) {
74-
return new CubejsServerCore(options, systemOptions);
75-
}
76-
7785
/**
7886
* Resolve driver module name by db type.
7987
*/
@@ -142,6 +150,8 @@ export class CubejsServerCore {
142150

143151
public coreServerVersion: string | null = null;
144152

153+
private contextAcceptor: ContextAcceptor;
154+
145155
/**
146156
* Class constructor.
147157
*/
@@ -179,6 +189,8 @@ export class CubejsServerCore {
179189
this.standalone = false;
180190
}
181191

192+
this.contextAcceptor = this.createContextAcceptor();
193+
182194
if (this.options.contextToDataSourceId) {
183195
throw new Error('contextToDataSourceId has been deprecated and removed. Use contextToOrchestratorId instead.');
184196
}
@@ -309,6 +321,10 @@ export class CubejsServerCore {
309321
}
310322
}
311323

324+
protected createContextAcceptor(): ContextAcceptor {
325+
return new AcceptAllAcceptor();
326+
}
327+
312328
/**
313329
* Determines whether current instance is ready to process queries.
314330
*/
@@ -419,7 +435,7 @@ export class CubejsServerCore {
419435
return this.apiGatewayInstance;
420436
}
421437

422-
return this.apiGatewayInstance = new ApiGateway(
438+
return (this.apiGatewayInstance = new ApiGateway(
423439
this.options.apiSecret,
424440
this.getCompilerApi.bind(this),
425441
this.getOrchestratorApi.bind(this),
@@ -429,17 +445,34 @@ export class CubejsServerCore {
429445
dataSourceStorage: this.orchestratorStorage,
430446
basePath: this.options.basePath,
431447
checkAuthMiddleware: this.options.checkAuthMiddleware,
448+
contextRejectionMiddleware: this.contextRejectionMiddleware.bind(this),
449+
wsContextAcceptor: this.contextAcceptor.shouldAcceptWs,
432450
checkAuth: this.options.checkAuth,
433-
queryRewrite: this.options.queryRewrite || this.options.queryTransformer,
451+
queryRewrite:
452+
this.options.queryRewrite || this.options.queryTransformer,
434453
extendContext: this.options.extendContext,
435454
playgroundAuthSecret: getEnv('playgroundAuthSecret'),
436455
jwt: this.options.jwt,
437456
refreshScheduler: () => new RefreshScheduler(this),
438457
scheduledRefreshContexts: this.options.scheduledRefreshContexts,
439458
scheduledRefreshTimeZones: this.options.scheduledRefreshTimeZones,
440-
serverCoreVersion: this.coreServerVersion
459+
serverCoreVersion: this.coreServerVersion,
441460
}
442-
);
461+
));
462+
}
463+
464+
protected async contextRejectionMiddleware(req, res, next) {
465+
if (!this.standalone) {
466+
const result = this.contextAcceptor.shouldAcceptHttp(req.context);
467+
if (!result.accepted) {
468+
res.writeHead(result.rejectStatusCode!, result.rejectHeaders!);
469+
res.send();
470+
return;
471+
}
472+
}
473+
if (next) {
474+
next();
475+
}
443476
}
444477

445478
public getCompilerApi(context: RequestContext) {
@@ -648,22 +681,33 @@ export class CubejsServerCore {
648681
* @internal Please dont use this method directly, use refreshTimer
649682
*/
650683
public handleScheduledRefreshInterval = async (options) => {
651-
const contexts = await this.options.scheduledRefreshContexts();
684+
const contexts = (await this.options.scheduledRefreshContexts()).filter(
685+
(context) => this.contextAcceptor.shouldAccept(this.migrateBackgroundContext(context)).accepted
686+
);
652687
if (contexts.length < 1) {
653688
this.logger('Refresh Scheduler Error', {
654689
error: 'At least one context should be returned by scheduledRefreshContexts'
655690
});
656691
}
657692

658-
return Promise.all(contexts.map(async context => {
659-
const queryingOptions: any = { ...options, concurrency: this.options.scheduledRefreshConcurrency };
693+
const batchLimit = pLimit(this.options.scheduledRefreshBatchSize);
694+
return Promise.all(
695+
contexts
696+
.map((context) => async () => {
697+
const queryingOptions: any = {
698+
...options,
699+
concurrency: this.options.scheduledRefreshConcurrency,
700+
};
660701

661-
if (this.options.scheduledRefreshTimeZones) {
662-
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
663-
}
702+
if (this.options.scheduledRefreshTimeZones) {
703+
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
704+
}
664705

665-
return this.runScheduledRefresh(context, queryingOptions);
666-
}));
706+
return this.runScheduledRefresh(context, queryingOptions);
707+
})
708+
// Limit the number of refresh contexts we process per iteration
709+
.map(batchLimit)
710+
);
667711
};
668712

669713
protected getRefreshScheduler() {

packages/cubejs-server-core/src/core/types.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ export interface CreateOptions {
187187
scheduledRefreshTimeZones?: string[];
188188
scheduledRefreshContexts?: () => Promise<UserBackgroundContext[]>;
189189
scheduledRefreshConcurrency?: number;
190+
scheduledRefreshBatchSize?: number;
190191
compilerCacheSize?: number;
191192
maxCompilerCacheKeepAlive?: number;
192193
updateCompilerCacheKeepAlive?: boolean;
@@ -228,3 +229,23 @@ export type ServerCoreInitializedOptions = Required<
228229
export type SystemOptions = {
229230
isCubeConfigEmpty: boolean;
230231
};
232+
233+
// Types to support the ContextAcceptance mechanism
234+
export type ContextAcceptanceResult = {
235+
accepted: boolean;
236+
};
237+
238+
export type ContextAcceptanceResultHttp = ContextAcceptanceResult & {
239+
rejectHeaders?: { [key: string]: string };
240+
rejectStatusCode?: number;
241+
};
242+
243+
export type ContextAcceptanceResultWs = ContextAcceptanceResult & {
244+
rejectMessage?: any;
245+
};
246+
247+
export interface ContextAcceptor {
248+
shouldAccept(context: RequestContext | null): ContextAcceptanceResult;
249+
shouldAcceptHttp(context: RequestContext | null): ContextAcceptanceResultHttp;
250+
shouldAcceptWs(context: RequestContext | null): ContextAcceptanceResultWs;
251+
}

0 commit comments

Comments
 (0)