diff --git a/packages/cubejs-client-core/src/HttpTransport.ts b/packages/cubejs-client-core/src/HttpTransport.ts index decb7bdfd7731..3814076d6db51 100644 --- a/packages/cubejs-client-core/src/HttpTransport.ts +++ b/packages/cubejs-client-core/src/HttpTransport.ts @@ -1,5 +1,6 @@ import fetch from 'cross-fetch'; import 'url-search-params-polyfill'; +import { responseChunks } from './streaming'; export interface ErrorResponse { error: string; @@ -31,13 +32,38 @@ export type TransportOptions = { }; export interface ITransportResponse { - subscribe: (cb: (result: R | ErrorResponse, resubscribe: () => Promise) => CBResult) => Promise; + subscribe: ( + cb: ( + result: R | ErrorResponse, + resubscribe: () => Promise + ) => CBResult + ) => Promise; // Optional, supported in WebSocketTransport unsubscribe?: () => Promise; } +export interface ITransportStreamResponse { + stream(): Promise>; + unsubscribe?: () => Promise; +} + +export interface ITransportStreamParams = Record> { + method?: 'GET' | 'POST' | 'PUT' | 'PATCH'; + fetchTimeout?: number; + baseRequestId?: string; + signal?: AbortSignal; + params?: T; +} + export interface ITransport { - request(method: string, params: Record): ITransportResponse; + request( + method: string, + params: Record + ): ITransportResponse; + requestStream? = Record>( + method: string, + params: ITransportStreamParams + ): ITransportStreamResponse; authorization: TransportOptions['authorization']; } @@ -59,7 +85,17 @@ export class HttpTransport implements ITransport { private readonly signal: AbortSignal | undefined; - public constructor({ authorization, apiUrl, method, headers = {}, credentials, fetchTimeout, signal }: Omit & { headers?: TransportOptions['headers'] }) { + public constructor({ + authorization, + apiUrl, + method, + headers = {}, + credentials, + fetchTimeout, + signal, + }: Omit & { + headers?: TransportOptions['headers']; + }) { this.authorization = authorization; this.apiUrl = apiUrl; this.method = method; @@ -69,34 +105,55 @@ export class HttpTransport implements ITransport { this.signal = signal; } - public request(method: string, { baseRequestId, signal, ...params }: any): ITransportResponse { + public request( + apiMethod: string, + { method, fetchTimeout, baseRequestId, signal, ...params }: any + ): ITransportResponse { let spanCounter = 1; const searchParams = new URLSearchParams( - params && Object.keys(params) - .map(k => ({ [k]: typeof params[k] === 'object' ? JSON.stringify(params[k]) : params[k] })) - .reduce((a, b) => ({ ...a, ...b }), {}) + params && + Object.keys(params) + .map((k) => ({ + [k]: + typeof params[k] === 'object' + ? JSON.stringify(params[k]) + : params[k], + })) + .reduce((a, b) => ({ ...a, ...b }), {}) ); - let url = `${this.apiUrl}/${method}${searchParams.toString().length ? `?${searchParams}` : ''}`; + let url = `${this.apiUrl}/${apiMethod}${ + searchParams.toString().length ? `?${searchParams}` : '' + }`; - const requestMethod = this.method || (url.length < 2000 ? 'GET' : 'POST'); + const requestMethod = + method ?? this.method ?? (url.length < 2000 ? 'GET' : 'POST'); if (requestMethod === 'POST') { - url = `${this.apiUrl}/${method}`; + url = `${this.apiUrl}/${apiMethod}`; this.headers['Content-Type'] = 'application/json'; } + const effectiveFetchTimeout = fetchTimeout ?? this.fetchTimeout; + const actualSignal = + signal || + this.signal || + (effectiveFetchTimeout + ? AbortSignal.timeout(effectiveFetchTimeout) + : undefined); + // Currently, all methods make GET requests. If a method makes a request with a body payload, // remember to add {'Content-Type': 'application/json'} to the header. const runRequest = () => fetch(url, { method: requestMethod, headers: { Authorization: this.authorization, - 'x-request-id': baseRequestId && `${baseRequestId}-span-${spanCounter++}`, - ...this.headers + 'x-request-id': + baseRequestId && `${baseRequestId}-span-${spanCounter++}`, + ...this.headers, } as HeadersInit, credentials: this.credentials, body: requestMethod === 'POST' ? JSON.stringify(params) : null, - signal: signal || this.signal || (this.fetchTimeout ? AbortSignal.timeout(this.fetchTimeout) : undefined), + signal: actualSignal, }); return { @@ -105,11 +162,95 @@ export class HttpTransport implements ITransport { try { const result = await runRequest(); return callback(result, () => this.subscribe(callback)); - } catch (e) { - const result: ErrorResponse = { error: 'network Error' }; + } catch (e: any) { + let errorMessage = 'network Error'; + + if (e.name === 'AbortError') { + if ( + actualSignal?.reason === 'TimeoutError' || + actualSignal?.reason?.name === 'TimeoutError' + ) { + errorMessage = 'timeout'; + } else { + errorMessage = 'aborted'; + } + } + + const result: ErrorResponse = { error: errorMessage }; return callback(result, () => this.subscribe(callback)); } - } + }, + }; + } + + public requestStream = Record>( + apiMethod: string, + { method, fetchTimeout, baseRequestId, signal, params }: ITransportStreamParams + ): ITransportStreamResponse { + const processedParams: Record = {}; + + // Handle the generic params object + if (params) { + Object.keys(params).forEach((k) => { + const value = params[k]; + if (value !== undefined) { + processedParams[k] = typeof value === 'object' ? JSON.stringify(value) : String(value); + } + }); + } + + const searchParams = new URLSearchParams(processedParams); + + let url = `${this.apiUrl}/${apiMethod}${ + searchParams.toString().length ? `?${searchParams}` : '' + }`; + + const requestMethod = method ?? this.method ?? 'POST'; + if (requestMethod === 'POST') { + url = `${this.apiUrl}/${apiMethod}`; + this.headers['Content-Type'] = 'application/json'; + } + + const effectiveFetchTimeout = fetchTimeout ?? this.fetchTimeout; + + let controller: AbortController | undefined; + let actualSignal: AbortSignal | undefined = signal || this.signal; + + if (!actualSignal && effectiveFetchTimeout) { + controller = new AbortController(); + actualSignal = controller.signal; + setTimeout(() => controller?.abort(), effectiveFetchTimeout); + } + + return { + stream: async () => { + const response = await fetch(url, { + method: requestMethod, + headers: { + Authorization: this.authorization, + 'x-request-id': baseRequestId || 'stream-request', + ...this.headers, + } as HeadersInit, + credentials: this.credentials, + body: requestMethod === 'POST' ? JSON.stringify(params || {}) : null, + signal: actualSignal, + }); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + if (!response.body) { + throw new Error('No response body available for streaming'); + } + + return responseChunks(response); + }, + unsubscribe: async () => { + if (controller) { + controller.abort(); + } + }, }; } } diff --git a/packages/cubejs-client-core/src/index.ts b/packages/cubejs-client-core/src/index.ts index c2f32bb9cd063..3100713455c49 100644 --- a/packages/cubejs-client-core/src/index.ts +++ b/packages/cubejs-client-core/src/index.ts @@ -102,6 +102,34 @@ export type DryRunResponse = { transformedQueries: TransformedQuery[]; }; +export type CubeSqlOptions = LoadMethodOptions & { + /** + * Query timeout in milliseconds + */ + timeout?: number; +}; + +export type CubeSqlSchemaColumn = { + name: string; + columnType: string; +}; + +export type CubeSqlResult = { + schema: CubeSqlSchemaColumn[]; + data: (string | number | boolean | null)[][]; +}; + +export type CubeSqlStreamChunk = { + type: 'schema'; + schema: CubeSqlSchemaColumn[]; +} | { + type: 'data'; + data: (string | number | boolean | null)[]; +} | { + type: 'error'; + error: string; +}; + interface BodyResponse { error?: string; [key: string]: any; @@ -350,7 +378,7 @@ class CubeApi { await requestInstance.unsubscribe(); } - const error = new RequestError(body.error || '', body, response.status); + const error = new RequestError(body.error || (response as any).error || '', body, response.status); if (callback) { callback(error); } else { @@ -669,6 +697,161 @@ class CubeApi { callback ); } + + public cubeSql(sqlQuery: string, options?: CubeSqlOptions): Promise; + + public cubeSql(sqlQuery: string, options?: CubeSqlOptions, callback?: LoadMethodCallback): UnsubscribeObj; + + /** + * Execute a Cube SQL query against Cube SQL interface and return the results. + */ + public cubeSql(sqlQuery: string, options?: CubeSqlOptions, callback?: LoadMethodCallback): Promise | UnsubscribeObj { + return this.loadMethod( + () => { + const request = this.request('cubesql', { + query: sqlQuery, + method: 'POST', + signal: options?.signal, + fetchTimeout: options?.timeout + }); + + return request; + }, + (response: any) => { + // TODO: The response is sending both errors and successful results as `error` + if (!response || !response.error) { + throw new Error('Invalid response format'); + } + + // Check if this is a timeout or abort error from transport + if (response.error === 'timeout') { + const timeoutMs = options?.timeout || 5 * 60 * 1000; + throw new Error(`CubeSQL query timed out after ${timeoutMs}ms`); + } + + if (response.error === 'aborted') { + throw new Error('CubeSQL query was aborted'); + } + + const [schema, ...data] = response.error.split('\n'); + + try { + return { + schema: JSON.parse(schema).schema, + data: data + .filter((d: string) => d.trim().length) + .map((d: string) => JSON.parse(d).data) + .reduce((a: any, b: any) => a.concat(b), []), + }; + } catch (err) { + throw new Error(response.error); + } + }, + options, + callback + ); + } + + /** + * Execute a Cube SQL query against Cube SQL interface and return streaming results as an async generator. + * The server returns JSONL (JSON Lines) format with schema first, then data rows. + */ + public async* cubeSqlStream(sqlQuery: string, options?: CubeSqlOptions): AsyncGenerator { + if (!this.transport.requestStream) { + throw new Error('Transport does not support streaming'); + } + + const streamResponse = this.transport.requestStream('cubesql', { + method: 'POST', + signal: options?.signal, + fetchTimeout: options?.timeout, + baseRequestId: uuidv4(), + params: { + query: sqlQuery + } + }); + + const decoder = new TextDecoder(); + let buffer = ''; + + try { + const stream = await streamResponse.stream(); + + for await (const chunk of stream) { + buffer += decoder.decode(chunk, { stream: true }); + + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line.trim()) { + try { + const parsed = JSON.parse(line); + + if (parsed.schema) { + yield { + type: 'schema' as const, + schema: parsed.schema + }; + } else if (parsed.data) { + yield { + type: 'data' as const, + data: parsed.data + }; + } else if (parsed.error) { + yield { + type: 'error' as const, + error: parsed.error + }; + } + } catch (parseError) { + yield { + type: 'error' as const, + error: `Failed to parse JSON line: ${line}` + }; + } + } + } + } + + if (buffer.trim()) { + try { + const parsed = JSON.parse(buffer); + + if (parsed.schema) { + yield { + type: 'schema' as const, + schema: parsed.schema + }; + } else if (parsed.data) { + yield { + type: 'data' as const, + data: parsed.data + }; + } else if (parsed.error) { + yield { + type: 'error' as const, + error: parsed.error + }; + } + } catch (parseError) { + yield { + type: 'error' as const, + error: `Failed to parse remaining JSON: ${buffer}` + }; + } + } + } catch (error: any) { + if (error.name === 'AbortError') { + throw new Error('aborted'); + } + throw error; + } finally { + if (streamResponse.unsubscribe) { + await streamResponse.unsubscribe(); + } + } + } } export default (apiToken: string | (() => Promise), options: CubeApiOptions) => new CubeApi(apiToken, options); diff --git a/packages/cubejs-client-core/src/streaming.ts b/packages/cubejs-client-core/src/streaming.ts new file mode 100644 index 0000000000000..c63f54ff1641d --- /dev/null +++ b/packages/cubejs-client-core/src/streaming.ts @@ -0,0 +1,33 @@ +export async function* responseChunks(res: Response): AsyncIterable { + // eslint-disable-next-line prefer-destructuring + const body: any = res.body; + + if (body && typeof body.getReader === 'function') { + const reader = body.getReader(); // Browser / Node native fetch + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) yield value; // Uint8Array + } + } finally { + reader.releaseLock?.(); + } + return; + } + + // Node.js Readable (node-fetch v2 via cross-fetch) + if (body && Symbol.asyncIterator in body) { + for await (const chunk of body as AsyncIterable) { + if (typeof chunk === 'string') { + // Convert string chunks to bytes (rare, but safe) + yield new TextEncoder().encode(chunk); + } else { + yield new Uint8Array(chunk); + } + } + return; + } + + throw new Error('Unsupported response body type for streaming'); +}