1- import stream , { TransformCallback } from "stream" ;
2- import { Firebolt } from "../../../src/index" ;
1+ import { Firebolt } from "../../../src" ;
32
43const connectionParams = {
54 auth : {
@@ -11,44 +10,68 @@ const connectionParams = {
1110 engineName : process . env . FIREBOLT_ENGINE_NAME as string
1211} ;
1312
14- jest . setTimeout ( 40000 ) ;
13+ jest . setTimeout ( 250000 ) ;
1514
1615describe ( "streams" , ( ) => {
17- it ( "stream transformters" , async ( ) => {
18- class SerializeRowStream extends stream . Transform {
19- public constructor ( ) {
20- super ( {
21- objectMode : true ,
22- transform (
23- row : any ,
24- encoding : BufferEncoding ,
25- callback : TransformCallback
26- ) {
27- const transformed = JSON . stringify ( row ) ;
28- this . push ( transformed ) ;
29- this . push ( "\n" ) ;
30- callback ( ) ;
31- }
32- } ) ;
33- }
34- }
35-
16+ it ( "check sum from stream result" , async ( ) => {
3617 const firebolt = Firebolt ( {
3718 apiEndpoint : process . env . FIREBOLT_API_ENDPOINT as string
3819 } ) ;
3920
40- const serializedStream = new SerializeRowStream ( ) ;
21+ const connection = await firebolt . connect ( connectionParams ) ;
22+
23+ const statement = await connection . executeStream (
24+ `select 1 from generate_series(1, 2500000)` //~1 GB response
25+ ) ;
26+
27+ const { data } = await statement . streamResult ( ) ;
28+ let sum = 0 ;
29+
30+ data
31+ . on ( "meta" , meta => {
32+ console . log ( "Meta:" , meta ) ;
33+ } )
34+ . on ( "data" , row => {
35+ sum += row [ 0 ] ;
36+ } ) ;
37+
38+ await new Promise ( resolve => {
39+ data . on ( "end" , ( ) => {
40+ expect ( sum ) . toEqual ( 2500000 ) ;
41+ resolve ( null ) ;
42+ } ) ;
43+ } ) ;
44+ } ) ;
45+ it ( "check normalized data" , async ( ) => {
46+ const firebolt = Firebolt ( {
47+ apiEndpoint : process . env . FIREBOLT_API_ENDPOINT as string
48+ } ) ;
4149
4250 const connection = await firebolt . connect ( connectionParams ) ;
4351
44- const statement = await connection . execute ( "select 1 union all select 2" ) ;
52+ const statement = await connection . executeStream (
53+ `select 1 from generate_series(1, 250000)` , //~1 GB response
54+ {
55+ response : {
56+ normalizeData : true
57+ }
58+ }
59+ ) ;
4560
4661 const { data } = await statement . streamResult ( ) ;
62+ let sum = 0 ;
4763
48- data . pipe ( serializedStream ) . pipe ( process . stdout ) ;
64+ data
65+ . on ( "meta" , meta => {
66+ console . log ( "Meta:" , meta ) ;
67+ } )
68+ . on ( "data" , row => {
69+ sum += row [ "?column?" ] ;
70+ } ) ;
4971
5072 await new Promise ( resolve => {
5173 data . on ( "end" , ( ) => {
74+ expect ( sum ) . toEqual ( 250000 ) ;
5275 resolve ( null ) ;
5376 } ) ;
5477 } ) ;
0 commit comments