Skip to content

Commit 423aadc

Browse files
authored
feat(client-core): introduce cubesql method (#9884)
* feat(client-core): introduce cubesql method * error propagation * cleanup * wip * streaming method * cleanup
1 parent 3c6fc89 commit 423aadc

File tree

3 files changed

+374
-17
lines changed

3 files changed

+374
-17
lines changed

packages/cubejs-client-core/src/HttpTransport.ts

Lines changed: 157 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fetch from 'cross-fetch';
22
import 'url-search-params-polyfill';
3+
import { responseChunks } from './streaming';
34

45
export interface ErrorResponse {
56
error: string;
@@ -31,13 +32,38 @@ export type TransportOptions = {
3132
};
3233

3334
export interface ITransportResponse<R> {
34-
subscribe: <CBResult>(cb: (result: R | ErrorResponse, resubscribe: () => Promise<CBResult>) => CBResult) => Promise<CBResult>;
35+
subscribe: <CBResult>(
36+
cb: (
37+
result: R | ErrorResponse,
38+
resubscribe: () => Promise<CBResult>
39+
) => CBResult
40+
) => Promise<CBResult>;
3541
// Optional, supported in WebSocketTransport
3642
unsubscribe?: () => Promise<void>;
3743
}
3844

45+
export interface ITransportStreamResponse {
46+
stream(): Promise<AsyncIterable<Uint8Array>>;
47+
unsubscribe?: () => Promise<void>;
48+
}
49+
50+
export interface ITransportStreamParams<T extends Record<string, unknown> = Record<string, unknown>> {
51+
method?: 'GET' | 'POST' | 'PUT' | 'PATCH';
52+
fetchTimeout?: number;
53+
baseRequestId?: string;
54+
signal?: AbortSignal;
55+
params?: T;
56+
}
57+
3958
export interface ITransport<R> {
40-
request(method: string, params: Record<string, unknown>): ITransportResponse<R>;
59+
request(
60+
method: string,
61+
params: Record<string, unknown>
62+
): ITransportResponse<R>;
63+
requestStream?<T extends Record<string, unknown> = Record<string, unknown>>(
64+
method: string,
65+
params: ITransportStreamParams<T>
66+
): ITransportStreamResponse;
4167
authorization: TransportOptions['authorization'];
4268
}
4369

@@ -59,7 +85,17 @@ export class HttpTransport implements ITransport<Response> {
5985

6086
private readonly signal: AbortSignal | undefined;
6187

62-
public constructor({ authorization, apiUrl, method, headers = {}, credentials, fetchTimeout, signal }: Omit<TransportOptions, 'headers'> & { headers?: TransportOptions['headers'] }) {
88+
public constructor({
89+
authorization,
90+
apiUrl,
91+
method,
92+
headers = {},
93+
credentials,
94+
fetchTimeout,
95+
signal,
96+
}: Omit<TransportOptions, 'headers'> & {
97+
headers?: TransportOptions['headers'];
98+
}) {
6399
this.authorization = authorization;
64100
this.apiUrl = apiUrl;
65101
this.method = method;
@@ -69,34 +105,55 @@ export class HttpTransport implements ITransport<Response> {
69105
this.signal = signal;
70106
}
71107

72-
public request(method: string, { baseRequestId, signal, ...params }: any): ITransportResponse<Response> {
108+
public request(
109+
apiMethod: string,
110+
{ method, fetchTimeout, baseRequestId, signal, ...params }: any
111+
): ITransportResponse<Response> {
73112
let spanCounter = 1;
74113
const searchParams = new URLSearchParams(
75-
params && Object.keys(params)
76-
.map(k => ({ [k]: typeof params[k] === 'object' ? JSON.stringify(params[k]) : params[k] }))
77-
.reduce((a, b) => ({ ...a, ...b }), {})
114+
params &&
115+
Object.keys(params)
116+
.map((k) => ({
117+
[k]:
118+
typeof params[k] === 'object'
119+
? JSON.stringify(params[k])
120+
: params[k],
121+
}))
122+
.reduce((a, b) => ({ ...a, ...b }), {})
78123
);
79124

80-
let url = `${this.apiUrl}/${method}${searchParams.toString().length ? `?${searchParams}` : ''}`;
125+
let url = `${this.apiUrl}/${apiMethod}${
126+
searchParams.toString().length ? `?${searchParams}` : ''
127+
}`;
81128

82-
const requestMethod = this.method || (url.length < 2000 ? 'GET' : 'POST');
129+
const requestMethod =
130+
method ?? this.method ?? (url.length < 2000 ? 'GET' : 'POST');
83131
if (requestMethod === 'POST') {
84-
url = `${this.apiUrl}/${method}`;
132+
url = `${this.apiUrl}/${apiMethod}`;
85133
this.headers['Content-Type'] = 'application/json';
86134
}
87135

136+
const effectiveFetchTimeout = fetchTimeout ?? this.fetchTimeout;
137+
const actualSignal =
138+
signal ||
139+
this.signal ||
140+
(effectiveFetchTimeout
141+
? AbortSignal.timeout(effectiveFetchTimeout)
142+
: undefined);
143+
88144
// Currently, all methods make GET requests. If a method makes a request with a body payload,
89145
// remember to add {'Content-Type': 'application/json'} to the header.
90146
const runRequest = () => fetch(url, {
91147
method: requestMethod,
92148
headers: {
93149
Authorization: this.authorization,
94-
'x-request-id': baseRequestId && `${baseRequestId}-span-${spanCounter++}`,
95-
...this.headers
150+
'x-request-id':
151+
baseRequestId && `${baseRequestId}-span-${spanCounter++}`,
152+
...this.headers,
96153
} as HeadersInit,
97154
credentials: this.credentials,
98155
body: requestMethod === 'POST' ? JSON.stringify(params) : null,
99-
signal: signal || this.signal || (this.fetchTimeout ? AbortSignal.timeout(this.fetchTimeout) : undefined),
156+
signal: actualSignal,
100157
});
101158

102159
return {
@@ -105,11 +162,95 @@ export class HttpTransport implements ITransport<Response> {
105162
try {
106163
const result = await runRequest();
107164
return callback(result, () => this.subscribe(callback));
108-
} catch (e) {
109-
const result: ErrorResponse = { error: 'network Error' };
165+
} catch (e: any) {
166+
let errorMessage = 'network Error';
167+
168+
if (e.name === 'AbortError') {
169+
if (
170+
actualSignal?.reason === 'TimeoutError' ||
171+
actualSignal?.reason?.name === 'TimeoutError'
172+
) {
173+
errorMessage = 'timeout';
174+
} else {
175+
errorMessage = 'aborted';
176+
}
177+
}
178+
179+
const result: ErrorResponse = { error: errorMessage };
110180
return callback(result, () => this.subscribe(callback));
111181
}
112-
}
182+
},
183+
};
184+
}
185+
186+
public requestStream<T extends Record<string, unknown> = Record<string, unknown>>(
187+
apiMethod: string,
188+
{ method, fetchTimeout, baseRequestId, signal, params }: ITransportStreamParams<T>
189+
): ITransportStreamResponse {
190+
const processedParams: Record<string, string> = {};
191+
192+
// Handle the generic params object
193+
if (params) {
194+
Object.keys(params).forEach((k) => {
195+
const value = params[k];
196+
if (value !== undefined) {
197+
processedParams[k] = typeof value === 'object' ? JSON.stringify(value) : String(value);
198+
}
199+
});
200+
}
201+
202+
const searchParams = new URLSearchParams(processedParams);
203+
204+
let url = `${this.apiUrl}/${apiMethod}${
205+
searchParams.toString().length ? `?${searchParams}` : ''
206+
}`;
207+
208+
const requestMethod = method ?? this.method ?? 'POST';
209+
if (requestMethod === 'POST') {
210+
url = `${this.apiUrl}/${apiMethod}`;
211+
this.headers['Content-Type'] = 'application/json';
212+
}
213+
214+
const effectiveFetchTimeout = fetchTimeout ?? this.fetchTimeout;
215+
216+
let controller: AbortController | undefined;
217+
let actualSignal: AbortSignal | undefined = signal || this.signal;
218+
219+
if (!actualSignal && effectiveFetchTimeout) {
220+
controller = new AbortController();
221+
actualSignal = controller.signal;
222+
setTimeout(() => controller?.abort(), effectiveFetchTimeout);
223+
}
224+
225+
return {
226+
stream: async () => {
227+
const response = await fetch(url, {
228+
method: requestMethod,
229+
headers: {
230+
Authorization: this.authorization,
231+
'x-request-id': baseRequestId || 'stream-request',
232+
...this.headers,
233+
} as HeadersInit,
234+
credentials: this.credentials,
235+
body: requestMethod === 'POST' ? JSON.stringify(params || {}) : null,
236+
signal: actualSignal,
237+
});
238+
239+
if (!response.ok) {
240+
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
241+
}
242+
243+
if (!response.body) {
244+
throw new Error('No response body available for streaming');
245+
}
246+
247+
return responseChunks(response);
248+
},
249+
unsubscribe: async () => {
250+
if (controller) {
251+
controller.abort();
252+
}
253+
},
113254
};
114255
}
115256
}

0 commit comments

Comments
 (0)