66} from "../types" ;
77import { Statement } from "../statement" ;
88import { generateUserAgent } from "../common/util" ;
9+ import { ConnectionError } from "../common/errors" ;
910
1011const defaultQuerySettings = {
1112 output_format : OutputFormat . COMPACT
@@ -15,19 +16,33 @@ const defaultResponseSettings = {
1516 normalizeData : false
1617} ;
1718
19+ interface AccountInfo {
20+ id : string ;
21+ infraVersion : number ;
22+ }
23+
1824const updateParametersHeader = "Firebolt-Update-Parameters" ;
1925const allowedUpdateParameters = [ "database" ] ;
26+ const updateEndpointHeader = "Firebolt-Update-Endpoint" ;
27+ const resetSessionHeader = "Firebolt-Reset-Session" ;
28+ const immutableParameters = [ "database" , "account_id" , "output_format" ] ;
2029
2130export abstract class Connection {
2231 protected context : Context ;
2332 protected options : ConnectionOptions ;
2433 protected userAgent : string ;
34+ protected parameters : Record < string , string > ;
35+ protected accountInfo : AccountInfo | undefined ;
2536 engineEndpoint ! : string ;
2637 activeRequests = new Set < { abort : ( ) => void } > ( ) ;
2738
2839 constructor ( context : Context , options : ConnectionOptions ) {
2940 this . context = context ;
3041 this . options = options ;
42+ this . parameters = {
43+ ...( options . database ? { database : options . database } : { } ) ,
44+ ...defaultQuerySettings
45+ } ;
3146 this . userAgent = generateUserAgent (
3247 options . additionalParameters ?. userClients ,
3348 options . additionalParameters ?. userDrivers
@@ -60,31 +75,82 @@ export abstract class Connection {
6075 executeQueryOptions : ExecuteQueryOptions
6176 ) : Record < string , string | undefined > {
6277 const { settings } = executeQueryOptions ;
63- const { database } = this . options ;
64- return { database, ...settings } ;
78+
79+ // convert all settings values to string
80+ const strSettings = Object . entries ( settings ?? { } ) . reduce <
81+ Record < string , string >
82+ > ( ( acc , [ key , value ] ) => {
83+ if ( value !== undefined ) {
84+ acc [ key ] = value . toString ( ) ;
85+ }
86+ return acc ;
87+ } , { } ) ;
88+
89+ return { ...this . parameters , ...strSettings } ;
90+ }
91+
92+ private handleUpdateParametersHeader ( headerValue : string ) {
93+ const updateParameters = headerValue
94+ . split ( "," )
95+ . reduce ( ( acc : Record < string , string > , param ) => {
96+ const [ key , value ] = param . split ( "=" ) ;
97+ if ( allowedUpdateParameters . includes ( key ) ) {
98+ acc [ key ] = value . trim ( ) ;
99+ }
100+ return acc ;
101+ } , { } ) ;
102+ this . parameters = {
103+ ...this . parameters ,
104+ ...updateParameters
105+ } ;
106+ }
107+
108+ private handleResetSessionHeader ( ) {
109+ const remainingParameters : Record < string , string > = { } ;
110+ for ( const key in this . parameters ) {
111+ if ( immutableParameters . includes ( key ) ) {
112+ remainingParameters [ key ] = this . parameters [ key ] ;
113+ }
114+ }
115+ this . parameters = remainingParameters ;
65116 }
66117
67- private processHeaders ( headers : Headers ) {
118+ private async handleUpdateEndpointHeader ( headerValue : string ) : Promise < void > {
119+ const url = new URL (
120+ headerValue . startsWith ( "http" ) ? headerValue : `https://${ headerValue } `
121+ ) ;
122+ const newParams = Object . fromEntries ( url . searchParams . entries ( ) ) ;
123+
124+ // Validate account_id if present
125+ const currentAccountId =
126+ this . accountInfo ?. id ?? ( await this . resolveAccountId ( ) ) ;
127+ if ( newParams . account_id && currentAccountId !== newParams . account_id ) {
128+ throw new ConnectionError ( {
129+ message : `Failed to execute USE ENGINE command. Account parameter mismatch. Contact support.`
130+ } ) ;
131+ }
132+
133+ // Remove url parameters and update engineEndpoint
134+ this . engineEndpoint = url . toString ( ) . replace ( url . search , "" ) ;
135+ this . parameters = {
136+ ...this . parameters ,
137+ ...newParams
138+ } ;
139+ }
140+
141+ private async processHeaders ( headers : Headers ) {
68142 const updateHeaderValue = headers . get ( updateParametersHeader ) ;
69143 if ( updateHeaderValue ) {
70- const updateParameters = updateHeaderValue
71- . split ( "," )
72- . reduce ( ( acc : Record < string , string > , param ) => {
73- const [ key , value ] = param . split ( "=" ) ;
74- if ( allowedUpdateParameters . includes ( key ) ) {
75- acc [ key ] = value . trim ( ) ;
76- }
77- return acc ;
78- } , { } ) ;
79-
80- if ( updateParameters . database ) {
81- this . options . database = updateParameters . database ;
82- delete updateParameters . database ;
83- }
84- this . options . additionalParameters = {
85- ...this . options . additionalParameters ,
86- ...updateParameters
87- } ;
144+ this . handleUpdateParametersHeader ( updateHeaderValue ) ;
145+ }
146+
147+ if ( headers . has ( resetSessionHeader ) ) {
148+ this . handleResetSessionHeader ( ) ;
149+ }
150+
151+ const updateEndpointValue = headers . get ( updateEndpointHeader ) ;
152+ if ( updateEndpointValue ) {
153+ await this . handleUpdateEndpointHeader ( updateEndpointValue ) ;
88154 }
89155 }
90156
@@ -94,11 +160,6 @@ export abstract class Connection {
94160 ) : Promise < Statement > {
95161 const { httpClient, queryFormatter } = this . context ;
96162
97- executeQueryOptions . settings = {
98- ...defaultQuerySettings ,
99- ...( executeQueryOptions . settings ?? { } )
100- } ;
101-
102163 executeQueryOptions . response = {
103164 ...defaultResponseSettings ,
104165 ...( executeQueryOptions . response ?? { } )
@@ -124,7 +185,7 @@ export abstract class Connection {
124185
125186 try {
126187 const response = await request . ready ( ) ;
127- this . processHeaders ( response . headers ) ;
188+ await this . processHeaders ( response . headers ) ;
128189 const statement = new Statement ( this . context , {
129190 query : formattedQuery ,
130191 request,
0 commit comments