@@ -400,12 +400,14 @@ describe("Connection V2", () => {
400400 setupMockServer ( server ) ;
401401 server . use (
402402 rest . post (
403- `https://some_system_engine.com/${ QUERY_URL } ?output_format=JSON_Compact ` ,
403+ `https://some_system_engine.com/${ QUERY_URL } ` ,
404404 async ( req , res , ctx ) => {
405405 const body = await req . text ( ) ;
406+ const urlParams = Object . fromEntries ( req . url . searchParams . entries ( ) ) ;
406407 if (
407408 body . includes ( "fb_GetAsyncStatus" ) &&
408- body . includes ( "async_query_token" )
409+ body . includes ( "async_query_token" ) &&
410+ urlParams [ "output_format" ] === "JSON_Compact"
409411 ) {
410412 return res (
411413 ctx . json ( {
@@ -445,4 +447,226 @@ describe("Connection V2", () => {
445447 await new Promise ( resolve => setTimeout ( resolve , 100 ) ) ; // somehow we need it to wait for the flag switch
446448 expect ( cancelQueryExecuted ) . toBe ( true ) ;
447449 } ) ;
450+
451+ it ( "streaming works as expected" , async ( ) => {
452+ const firebolt = Firebolt ( {
453+ apiEndpoint
454+ } ) ;
455+ const jsonLines = [
456+ JSON . stringify ( {
457+ message_type : "START" ,
458+ result_columns : [
459+ {
460+ name : "?column?" ,
461+ type : "integer"
462+ }
463+ ]
464+ } ) ,
465+ JSON . stringify ( {
466+ message_type : "DATA" ,
467+ data : [ [ 1 ] , [ 1 ] ]
468+ } ) ,
469+ JSON . stringify ( {
470+ message_type : "FINISH_SUCCESSFULLY"
471+ } )
472+ ] . join ( "\n" ) ;
473+
474+ setupMockServer ( server ) ;
475+ server . use (
476+ rest . post (
477+ `https://some_system_engine.com/${ QUERY_URL } ` ,
478+ async ( req , res , ctx ) => {
479+ const body = await req . text ( ) ;
480+ const urlParams = Object . fromEntries ( req . url . searchParams . entries ( ) ) ;
481+ if (
482+ body . includes ( "select" ) &&
483+ body . includes ( "generate_series" ) &&
484+ urlParams [ "output_format" ] === "JSONLines_Compact"
485+ ) {
486+ return res ( ctx . body ( jsonLines ) ) ;
487+ }
488+ }
489+ )
490+ ) ;
491+
492+ const connectionParams : ConnectionOptions = {
493+ auth : {
494+ client_id : "dummy" ,
495+ client_secret : "dummy"
496+ } ,
497+ account : "my_account"
498+ } ;
499+
500+ const connection = await firebolt . connect ( connectionParams ) ;
501+ const streamStatement = await connection . executeStream (
502+ "select 1 from generate_series(1, 2))"
503+ ) ;
504+ let rowCount = 0 ;
505+ const { data } = await streamStatement . streamResult ( ) ;
506+ data
507+ . on ( "meta" , meta => {
508+ expect ( meta ) . toEqual ( [
509+ {
510+ name : "?column?" ,
511+ type : "integer"
512+ }
513+ ] ) ;
514+ } )
515+ . on ( "data" , row => {
516+ expect ( row ) . toEqual ( [ 1 ] ) ;
517+ rowCount ++ ;
518+ } )
519+ . on ( "end" , ( ) => {
520+ expect ( rowCount ) . toBe ( 2 ) ;
521+ } ) ;
522+ } ) ;
523+
524+ it ( "streaming with normalization works as expected" , async ( ) => {
525+ const firebolt = Firebolt ( {
526+ apiEndpoint
527+ } ) ;
528+ const jsonLines = [
529+ JSON . stringify ( {
530+ message_type : "START" ,
531+ result_columns : [
532+ {
533+ name : "?column?" ,
534+ type : "integer"
535+ }
536+ ]
537+ } ) ,
538+ JSON . stringify ( {
539+ message_type : "DATA" ,
540+ data : [ [ 1 ] , [ 1 ] ]
541+ } ) ,
542+ JSON . stringify ( {
543+ message_type : "FINISH_SUCCESSFULLY"
544+ } )
545+ ] . join ( "\n" ) ;
546+
547+ setupMockServer ( server ) ;
548+ server . use (
549+ rest . post (
550+ `https://some_system_engine.com/${ QUERY_URL } ` ,
551+ async ( req , res , ctx ) => {
552+ const body = await req . text ( ) ;
553+ const urlParams = Object . fromEntries ( req . url . searchParams . entries ( ) ) ;
554+ if (
555+ body . includes ( "select" ) &&
556+ body . includes ( "generate_series" ) &&
557+ urlParams [ "output_format" ] === "JSONLines_Compact"
558+ ) {
559+ return res ( ctx . body ( jsonLines ) ) ;
560+ }
561+ }
562+ )
563+ ) ;
564+
565+ const connectionParams : ConnectionOptions = {
566+ auth : {
567+ client_id : "dummy" ,
568+ client_secret : "dummy"
569+ } ,
570+ account : "my_account"
571+ } ;
572+
573+ const connection = await firebolt . connect ( connectionParams ) ;
574+ const streamStatement = await connection . executeStream (
575+ "select 1 from generate_series(1, 2))" ,
576+ {
577+ response : {
578+ normalizeData : true
579+ }
580+ }
581+ ) ;
582+ let rowCount = 0 ;
583+ const { data } = await streamStatement . streamResult ( ) ;
584+ data
585+ . on ( "meta" , meta => {
586+ expect ( meta ) . toEqual ( [
587+ {
588+ name : "?column?" ,
589+ type : "integer"
590+ }
591+ ] ) ;
592+ } )
593+ . on ( "data" , row => {
594+ expect ( row ) . toEqual ( { "?column?" : 1 } ) ;
595+ rowCount ++ ;
596+ } )
597+ . on ( "end" , ( ) => {
598+ expect ( rowCount ) . toBe ( 2 ) ;
599+ } ) ;
600+ } ) ;
601+
602+ it ( "streaming fails with error" , async ( ) => {
603+ const firebolt = Firebolt ( {
604+ apiEndpoint
605+ } ) ;
606+ const jsonLines = [
607+ JSON . stringify ( {
608+ message_type : "START" ,
609+ result_columns : [
610+ {
611+ name : "?column?" ,
612+ type : "integer"
613+ }
614+ ]
615+ } ) ,
616+ JSON . stringify ( {
617+ message_type : "FINISH_WITH_ERROR"
618+ } )
619+ ] . join ( "\n" ) ;
620+
621+ setupMockServer ( server ) ;
622+ server . use (
623+ rest . post (
624+ `https://some_system_engine.com/${ QUERY_URL } ` ,
625+ async ( req , res , ctx ) => {
626+ const body = await req . text ( ) ;
627+ const urlParams = Object . fromEntries ( req . url . searchParams . entries ( ) ) ;
628+ if (
629+ body . includes ( "select" ) &&
630+ body . includes ( "generate_series" ) &&
631+ urlParams [ "output_format" ] === "JSONLines_Compact"
632+ ) {
633+ return res ( ctx . body ( jsonLines ) ) ;
634+ }
635+ }
636+ )
637+ ) ;
638+
639+ const connectionParams : ConnectionOptions = {
640+ auth : {
641+ client_id : "dummy" ,
642+ client_secret : "dummy"
643+ } ,
644+ account : "my_account"
645+ } ;
646+
647+ const connection = await firebolt . connect ( connectionParams ) ;
648+ const streamStatement = await connection . executeStream (
649+ "select 1 from generate_series(1, 2))"
650+ ) ;
651+ const { data } = await streamStatement . streamResult ( ) ;
652+ data
653+ . on ( "meta" , meta => {
654+ expect ( meta ) . toEqual ( [
655+ {
656+ name : "?column?" ,
657+ type : "integer"
658+ }
659+ ] ) ;
660+ } )
661+ . on ( "data" , row => {
662+ fail ( '"Data should not be emitted"' ) ;
663+ } )
664+ . on ( "error" , error => {
665+ expect ( error ) . toEqual (
666+ new Error (
667+ 'Result encountered an error: {"message_type":"FINISH_WITH_ERROR"}'
668+ )
669+ ) ;
670+ } ) ;
671+ } ) ;
448672} ) ;
0 commit comments