Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ npm-debug.log*
.#*
coverage/
allure-results/
.idea
87 changes: 66 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -617,37 +617,55 @@ await database.delete();
<a id="streaming-results"></a>
### Streaming results

The recommended way to consume query results is by using streams.
Streaming can only be used with a v2 connection and only using the `executeStream` method.
The streamResult method for the normal `execute` method returns an in-memory stream of result, rather than dynamically fetching them from the response. This is further explained in the [in-memory stream](#in-memory-stream) section.

For convenience, `statement.streamResult` also returns `meta: Promise<Meta[]>` and `statistics: Promise<Statistics>`, which are wrappers over `data.on('metadata')` and `data.on('statistics')`.
The recommended way to consume query results is by using streams with standard events:
* `data.on('meta')`
* `data.on('data')`
* `data.on('end')`
* `data.on('error')`

```typescript
const firebolt = Firebolt();

const connection = await firebolt.connect(connectionParams);

const statement = await connection.execute("SELECT 1");
const statement = await connection.executeStream("SELECT 1");

const {
data,
meta: metaPromise,
statistics: statisticsPromise
} = await statement.streamResult();
const { data } = await statement.streamResult();

const rows: unknown[] = [];

const meta = await metaPromise;

for await (const row of data) {
rows.push(row);
}
const meta = await stream.once(data, "meta");

const statistics = await statisticsPromise
data.on("data", data => {;
rows.push(data);
});

console.log(meta);
console.log(statistics);
console.log(rows)
```
In case an errors occurs before streaming, or during the first packet, the error will be thrown by the executeStream method. If the error occurs during streaming, it will be emitted by the stream.
```typescript
try {
await connection.executeStream("select *1;");
} catch (error) {
//error is thrown directly since this is a syntax error
}

const statement = await connection.executeStream(
"select 1/(i-100000) as a from generate_series(1,100000) as i"
);
const { data } = await statement.streamResult();

data.on("error", error => {
//error is emitted by the stream after first chunk of results
console.log(error);
});

```


<a id="custom-stream-transformers"></a>
### Custom stream transformers
Expand Down Expand Up @@ -679,22 +697,49 @@ const serializedStream = new SerializeRowStream()

const firebolt = Firebolt();
const connection = await firebolt.connect(connectionParams);
const statement = await connection.execute("select 1 union all select 2");
const statement = await connection.executeStream("select * from generate_series(1, 1000)");

const { data } = await statement.streamResult();


data.pipe(serializedStream).pipe(process.stdout);
```

Or use `rowParser` that returns strings or Buffer:
<a id="in-memory-stream"></a>
### In-memory stream

When using the streamResult method on the object returned from a simple execute method, the driver will return
an in-memory stream of the result. This is useful for small result sets, but not recommended for large result sets.

In this case the whole result will be first fetched in memory and then made available via streamResult.
This is done for compatibility reasons and has no performance benefits compared to using fetchResult

```typescript
const { data } = await statement.streamResult({
rowParser: (row: string) => `${row}\n`
});
const firebolt = Firebolt();

const connection = await firebolt.connect(connectionParams);

const statement = await connection.execute("SELECT 1");

const {
data,
meta: metaPromise,
statistics: statisticsPromise
} = await statement.streamResult();

data.pipe(process.stdout);
const rows: unknown[] = [];

const meta = await metaPromise;

for await (const row of data) {
rows.push(row);
}

const statistics = await statisticsPromise

console.log(meta);
console.log(statistics);
console.log(rows)
```

## Development process
Expand Down
37 changes: 36 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/connection/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { CompositeError } from "../common/errors";
import JSONbig from "json-bigint";
import { QueryFormatter } from "../formatter/base";
import { AsyncStatement } from "../statement/async";
import { StreamStatement } from "../statement/stream";
import { Response, Headers } from "node-fetch";

const defaultQuerySettings = {
output_format: OutputFormat.COMPACT
Expand Down Expand Up @@ -166,6 +168,11 @@ export abstract class Connection {
executeQueryOptions?: ExecuteQueryOptions
): Promise<AsyncStatement>;

abstract executeStream(
query: string,
executeQueryOptions?: ExecuteQueryOptions
): Promise<StreamStatement>;

abstract isAsyncQueryRunning(token: string): Promise<boolean>;

abstract isAsyncQuerySuccessful(token: string): Promise<boolean | undefined>;
Expand Down
10 changes: 10 additions & 0 deletions src/connection/connection_v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Connection as BaseConnection } from "./base";
import { ResourceManager } from "../service";
import { ExecuteQueryOptions } from "../types";
import { AsyncStatement } from "../statement/async";
import { StreamStatement } from "../statement/stream";

export interface AccountInfo {
id: string;
Expand Down Expand Up @@ -93,4 +94,13 @@ export class ConnectionV1 extends BaseConnection {
"Asynchronous execution is not supported in this Firebolt version."
);
}

async executeStream(
query: string,
executeQueryOptions?: ExecuteQueryOptions
): Promise<StreamStatement> {
throw new Error(
"Stream execution is not supported in this Firebolt version."
);
}
}
21 changes: 20 additions & 1 deletion src/connection/connection_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api";

import { Connection as BaseConnection } from "./base";
import { Cache, inMemoryCache, noneCache } from "../common/tokenCache";
import { ExecuteQueryOptions } from "../types";
import { ExecuteQueryOptions, OutputFormat } from "../types";
import { AsyncStatement } from "../statement/async";
import { StreamStatement } from "../statement/stream";

export class ConnectionV2 extends BaseConnection {
private get account(): string {
Expand Down Expand Up @@ -103,6 +104,24 @@ export class ConnectionV2 extends BaseConnection {
});
}

async executeStream(
query: string,
executeQueryOptions: ExecuteQueryOptions = {}
): Promise<StreamStatement> {
const { response } = await this.prepareAndExecuteQuery(query, {
...executeQueryOptions,
settings: {
...executeQueryOptions?.settings,
output_format: OutputFormat.JSON_LINES
}
});

return new StreamStatement({
response,
executeQueryOptions
});
}

private async getAsyncQueryInfo(token: string) {
const query = `CALL fb_GetAsyncStatus('${token}')`;

Expand Down
2 changes: 0 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,5 @@ export {
QueryFormatter
} from "./formatter/base";

export { JSONParser } from "./statement/stream/parser";

export type { Connection } from "./connection";
export type { Meta } from "./meta";
8 changes: 6 additions & 2 deletions src/statement/dataTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ const typeMapping = {
decimal: "decimal",
decimal_ext: "decimal",
long: "long",
bigint: "long",
float: "float",
real: "float",
float32: "float",
float64: "double",
double: "double",
double32: "double",
double64: "double",
integer: "integer",
"double precision": "double",
integer: "int",
int: "int",
uint: "int",
int8: "int",
Expand All @@ -42,7 +45,7 @@ const getMappedType = (innerType: string) => {
) {
return typeMapping.timestamp;
}
if (innerType.match(/decimal(.+)/i)) {
if (innerType.match(/decimal(.+)/i) || innerType.match(/numeric(.+)/i)) {
return typeMapping.decimal;
}
};
Expand Down Expand Up @@ -83,6 +86,7 @@ export const STRING_TYPES = withNullableTypes(["string", "text"]);

export const BYTEA_TYPES = withNullableTypes(["bytea"]);

//todo fix nullable types FIR-45354
export const getFireboltType = (type: string): string => {
const key = type.toLowerCase();
const match = key.match(COMPLEX_TYPE);
Expand Down
30 changes: 24 additions & 6 deletions src/statement/normalizeResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {
QueryResponse,
OutputFormat,
ExecuteQueryOptions,
OutputFormat,
QueryResponse,
Row
} from "../types";
import { Meta } from "../meta";
Expand Down Expand Up @@ -35,11 +35,11 @@ export const normalizeColumn = (column: { name: string; type: string }) => {
return new Meta(column);
};

const getNormalizedMeta = (response: ParsedResponse): Meta[] => {
if (!response.meta) {
export const getNormalizedMeta = (meta: any[]): Meta[] => {
if (!meta) {
return [];
}
return response.meta.map(normalizeColumn);
return meta.map(normalizeColumn);
};

export const getNormalizedStatistics = (response: ParsedResponse) => {
Expand Down Expand Up @@ -71,7 +71,7 @@ export const normalizeResponse = (

const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;

const meta = getNormalizedMeta(response);
const meta = getNormalizedMeta(response.meta);

const statistics = getNormalizedStatistics(response);

Expand All @@ -96,3 +96,21 @@ export const normalizeResponse = (
statistics
};
};

export const normalizeResponseRowStreaming = (
data: any[],
executeQueryOptions: ExecuteQueryOptions,
meta: Meta[]
): Row[] => {
const { response: { normalizeData = false } = {} } = executeQueryOptions;

const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;

return data.map((row: Row) => {
const hydratedRow = hydrate(row, meta, executeQueryOptions);
if (normalizeData) {
return normalizeRow(hydratedRow, meta, executeQueryOptions);
}
return hydratedRow;
});
};
Loading
Loading