diff --git a/README.md b/README.md index 8593ed4c..39c29ba8 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,11 @@ console.log(rows) * Execute Async Query * Check Async Query Status * Cancel Async Query + * Transaction management + * Transaction methods + * Basic transaction usage + * Error handling + * Transaction isolation * Engine management * getByName * Engine @@ -472,6 +477,140 @@ const token = statement.asyncQueryToken; // can only be fetched for async query await connection.cancelAsyncQuery(token); ``` + +## Transaction management + +Firebolt's Node.js SDK supports database transactions, allowing you to group multiple operations into atomic units of work. Transactions ensure data consistency and provide the ability to rollback changes if needed. + + +### Transaction methods + +The SDK provides three main methods for transaction management: + +```typescript +await connection.begin(); // Start a new transaction +await connection.commit(); // Commit the current transaction +await connection.rollback(); // Rollback the current transaction +``` + + +### Basic transaction usage + +The following example demonstrates a basic transaction that inserts data and commits the changes: + +```typescript +import { Firebolt } from 'firebolt-sdk' + +const firebolt = Firebolt(); +const connection = await firebolt.connect({ + auth: { + client_id: process.env.FIREBOLT_CLIENT_ID, + client_secret: process.env.FIREBOLT_CLIENT_SECRET, + }, + account: process.env.FIREBOLT_ACCOUNT, + database: process.env.FIREBOLT_DATABASE, + engineName: process.env.FIREBOLT_ENGINE_NAME +}); + +// Start a transaction +await connection.begin(); + +try { + // Perform multiple operations + await connection.execute(` + INSERT INTO users (id, name, age) VALUES (1, 'Alice', 30) + `); + + await connection.execute(` + INSERT INTO users (id, name, age) VALUES (2, 'Bob', 25) + `); + + // Commit the transaction + await connection.commit(); + console.log('Transaction committed successfully'); +} catch (error) { + // Rollback on error + await connection.rollback(); + console.error('Transaction rolled back due to error:', error); +} +``` + +#### Transaction with prepared statements + +Transactions work seamlessly with prepared statements: + +```typescript +await connection.begin(); + +try { + // Use prepared statements within transactions + await connection.execute( + 'INSERT INTO users (id, name, age) VALUES (?, ?, ?)', + { parameters: [4, 'Diana', 28] } + ); + + await connection.execute( + 'UPDATE users SET age = ? WHERE id = ?', + { parameters: [29, 4] } + ); + + await connection.commit(); +} catch (error) { + await connection.rollback(); + throw error; +} +``` + + +### Error handling + +#### Transaction state errors + +The SDK will throw errors for invalid transaction operations: + +```typescript +try { + // This will throw an error if no transaction is active + await connection.commit(); +} catch (error) { + console.error('Cannot commit: no transaction in progress'); +} + +try { + await connection.begin(); + // This will throw an error if a transaction is already active + await connection.begin(); +} catch (error) { + console.error('Cannot begin: transaction already in progress'); +} +``` + + +### Transaction isolation + +Transactions in Firebolt provide isolation between concurrent operations. Changes made within a transaction are not visible to other connections until the transaction is committed: + +```typescript +// Connection 1 - Start transaction and insert data +const connection1 = await firebolt.connect(connectionOptions); +await connection1.begin(); +await connection1.execute("INSERT INTO users (id, name) VALUES (5, 'Eve')"); + +// Connection 2 - Cannot see uncommitted data +const connection2 = await firebolt.connect(connectionOptions); +const statement = await connection2.execute('SELECT COUNT(*) FROM users WHERE id = 5'); +const { data } = await statement.fetchResult(); +console.log('Count from connection 2:', data[0][0]); // Should show 0 + +// Connection 1 - Commit transaction +await connection1.commit(); + +// Connection 2 - Now can see committed data +const statement2 = await connection2.execute('SELECT COUNT(*) FROM users WHERE id = 5'); +const { data: data2 } = await statement2.fetchResult(); +console.log('Count after commit:', data2[0][0]); // Should show 1 +``` + ## Server-side prepared statement diff --git a/src/connection/base.ts b/src/connection/base.ts index 0febfcab..3f5db279 100644 --- a/src/connection/base.ts +++ b/src/connection/base.ts @@ -22,9 +22,9 @@ export const defaultResponseSettings = { }; const updateParametersHeader = "Firebolt-Update-Parameters"; -const allowedUpdateParameters = ["database"]; const updateEndpointHeader = "Firebolt-Update-Endpoint"; const resetSessionHeader = "Firebolt-Reset-Session"; +const removeParametersHeader = "Firebolt-Remove-Parameters"; const immutableParameters = ["database", "account_id", "output_format"]; const testConnectionQuery = "SELECT 1"; @@ -108,9 +108,7 @@ export abstract class Connection { .split(",") .reduce((acc: Record, param) => { const [key, value] = param.split("="); - if (allowedUpdateParameters.includes(key)) { - acc[key] = value.trim(); - } + acc[key] = value.trim(); return acc; }, {}); this.parameters = { @@ -119,6 +117,14 @@ export abstract class Connection { }; } + private handleRemoveParametersHeader(headerValue: string) { + const removeParameters = headerValue.split(","); + + removeParameters.forEach(key => { + delete this.parameters[key.trim()]; + }); + } + private handleResetSessionHeader() { const remainingParameters: Record = {}; for (const key in this.parameters) { @@ -161,6 +167,11 @@ export abstract class Connection { if (updateEndpointValue) { await this.handleUpdateEndpointHeader(updateEndpointValue); } + + const removeParametersValue = headers.get(removeParametersHeader); + if (removeParametersValue) { + this.handleRemoveParametersHeader(removeParametersValue); + } } abstract executeAsync( diff --git a/src/connection/connection_v1.ts b/src/connection/connection_v1.ts index 7a5a31e7..132adb3f 100644 --- a/src/connection/connection_v1.ts +++ b/src/connection/connection_v1.ts @@ -103,4 +103,22 @@ export class ConnectionV1 extends BaseConnection { "Stream execution is not supported in this Firebolt version." ); } + + async begin(): Promise { + throw new Error( + "Transaction management is not supported in this Firebolt version." + ); + } + + async commit(): Promise { + throw new Error( + "Transaction management is not supported in this Firebolt version." + ); + } + + async rollback(): Promise { + throw new Error( + "Transaction management is not supported in this Firebolt version." + ); + } } diff --git a/src/connection/connection_v2.ts b/src/connection/connection_v2.ts index 0e848996..e76bfa4a 100644 --- a/src/connection/connection_v2.ts +++ b/src/connection/connection_v2.ts @@ -218,4 +218,16 @@ export class ConnectionV2 extends BaseConnection { const settings = { internal: [{ auto_start_stop_control: "ignore" }] }; await this.execute("select 1", { settings }); } + + async begin(): Promise { + await this.execute("BEGIN TRANSACTION"); + } + + async commit(): Promise { + await this.execute("COMMIT"); + } + + async rollback(): Promise { + await this.execute("ROLLBACK"); + } } diff --git a/src/http/node.ts b/src/http/node.ts index a564e8b0..5897a002 100644 --- a/src/http/node.ts +++ b/src/http/node.ts @@ -28,7 +28,7 @@ const DEFAULT_ERROR = "Server error"; const DEFAULT_USER_AGENT = systemInfoString(); const PROTOCOL_VERSION_HEADER = "Firebolt-Protocol-Version"; -const PROTOCOL_VERSION = "2.3"; +const PROTOCOL_VERSION = "2.4"; const createSocket = HttpsAgent.prototype.createSocket; const agentOptions = { diff --git a/test/integration/v1/index.test.ts b/test/integration/v1/index.test.ts index 4dfa1dfd..a8f90298 100644 --- a/test/integration/v1/index.test.ts +++ b/test/integration/v1/index.test.ts @@ -209,4 +209,42 @@ describe("integration test", () => { data.pipe(process.stdout); }); + + describe("Transaction methods", () => { + it("throws error for begin transaction", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + + await expect(connection.begin()).rejects.toThrow( + "Transaction management is not supported in this Firebolt version." + ); + }); + + it("throws error for commit", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + + await expect(connection.commit()).rejects.toThrow( + "Transaction management is not supported in this Firebolt version." + ); + }); + + it("throws error for rollback", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + + await expect(connection.rollback()).rejects.toThrow( + "Transaction management is not supported in this Firebolt version." + ); + }); + }); }); diff --git a/test/integration/v2/transaction.test.ts b/test/integration/v2/transaction.test.ts new file mode 100644 index 00000000..421fa4f7 --- /dev/null +++ b/test/integration/v2/transaction.test.ts @@ -0,0 +1,369 @@ +import { Firebolt } from "../../../src"; + +const connectionParams = { + auth: { + client_id: process.env.FIREBOLT_CLIENT_ID as string, + client_secret: process.env.FIREBOLT_CLIENT_SECRET as string + }, + account: process.env.FIREBOLT_ACCOUNT as string, + database: process.env.FIREBOLT_DATABASE as string, + engineName: process.env.FIREBOLT_ENGINE_NAME as string +}; + +jest.setTimeout(500000); + +describe("v2 transaction integration tests", () => { + beforeAll(async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + // Setup test table + const connection = await firebolt.connect(connectionParams); + await connection.execute("DROP TABLE IF EXISTS transaction_test"); + await connection.execute(` + CREATE FACT TABLE IF NOT EXISTS transaction_test ( + id LONG, + name TEXT + ) + `); + }); + + afterAll(async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + // Cleanup test table + const connection = await firebolt.connect(connectionParams); + try { + await connection.execute("DROP TABLE IF EXISTS transaction_test CASCADE"); + } catch (error) { + // Ignore cleanup errors + } + }); + + const checkRecordCountByIdInAnotherTransaction = async ( + id: number, + expected: number + ): Promise => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + const statement = await connection.execute( + `SELECT COUNT(*) FROM transaction_test WHERE id = ${id}` + ); + const { data } = await statement.fetchResult(); + const count = parseInt(data[0][0]); + expect(count).toBe(expected); + }; + + it("should commit transaction", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + await connection.execute("BEGIN TRANSACTION"); + await connection.execute("INSERT INTO transaction_test VALUES (1, 'test')"); + + await checkRecordCountByIdInAnotherTransaction(1, 0); + + await connection.execute("COMMIT"); + + await checkRecordCountByIdInAnotherTransaction(1, 1); + }); + + it("should rollback transaction", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + await connection.execute("BEGIN TRANSACTION"); + await connection.execute("INSERT INTO transaction_test VALUES (2, 'test')"); + + await checkRecordCountByIdInAnotherTransaction(2, 0); + + await connection.execute("ROLLBACK"); + + await checkRecordCountByIdInAnotherTransaction(2, 0); + }); + + it("should commit transaction using transaction control methods", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // In v2, auto commit is handled through explicit transaction control + await connection.begin(); + + await connection.execute("INSERT INTO transaction_test VALUES (3, 'test')"); + + await checkRecordCountByIdInAnotherTransaction(3, 0); + + await connection.commit(); + + await checkRecordCountByIdInAnotherTransaction(3, 1); + }); + + it("should rollback transaction using transaction control methods", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + await connection.begin(); + + // Start transaction + await connection.execute("INSERT INTO transaction_test VALUES (4, 'test')"); + + await checkRecordCountByIdInAnotherTransaction(4, 0); + + // Rollback + await connection.rollback(); + + await checkRecordCountByIdInAnotherTransaction(4, 0); + }); + + it("should handle sequential transactions", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // First transaction + await connection.begin(); + await connection.execute("INSERT INTO transaction_test VALUES (5, 'test')"); + await checkRecordCountByIdInAnotherTransaction(5, 0); + await connection.commit(); + + await checkRecordCountByIdInAnotherTransaction(5, 1); + + // Second transaction + await connection.begin(); + await connection.execute("INSERT INTO transaction_test VALUES (6, 'test')"); + await checkRecordCountByIdInAnotherTransaction(6, 0); + await connection.commit(); + + await checkRecordCountByIdInAnotherTransaction(5, 1); + await checkRecordCountByIdInAnotherTransaction(6, 1); + }); + + it("should work with prepared statements", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + await connection.begin(); + + await connection.execute( + "INSERT INTO transaction_test VALUES (?, 'test')", + { + parameters: [7] + } + ); + + await checkRecordCountByIdInAnotherTransaction(7, 0); + + await connection.commit(); + await checkRecordCountByIdInAnotherTransaction(7, 1); + }); + + it("should not commit transaction when connection closes", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + let connection = await firebolt.connect(connectionParams); + + await connection.execute("BEGIN TRANSACTION"); + await connection.execute("INSERT INTO transaction_test VALUES (8, 'test')"); + await checkRecordCountByIdInAnotherTransaction(8, 0); + + // Simulate connection close by creating a new connection + connection = await firebolt.connect(connectionParams); + + await checkRecordCountByIdInAnotherTransaction(8, 0); + }); + + it("should throw exception when starting transaction during transaction", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + await connection.execute("BEGIN TRANSACTION"); + + await expect(connection.execute("BEGIN TRANSACTION")).rejects.toThrow( + /cannot BEGIN transaction: a transaction is already in progress/i + ); + }); + + it("should throw exception when committing with no transaction", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + await expect(connection.execute("COMMIT")).rejects.toThrow( + /cannot COMMIT transaction: no transaction is in progress/i + ); + }); + + it("should throw exception when rollback with no transaction", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + await expect(connection.execute("ROLLBACK")).rejects.toThrow( + /Cannot ROLLBACK transaction: no transaction is in progress/i + ); + }); + + it("should commit table creation and data insertion", async () => { + const tableName = "transaction_commit_test"; + + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + await connection.execute(`DROP TABLE IF EXISTS ${tableName} CASCADE`); + + await connection.begin(); + + const createTableSQL = `CREATE FACT TABLE ${tableName} (id LONG, name TEXT)`; + const insertSQL = `INSERT INTO ${tableName} (id, name) VALUES (0, 'some_text')`; + const checkTableSQL = `SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '${tableName}'`; + const selectSQL = `SELECT * FROM ${tableName}`; + + await connection.execute(createTableSQL); + await connection.execute(insertSQL); + + // Check table doesn't exist in another connection + const checkConnection = await firebolt.connect(connectionParams); + const checkStatement = await checkConnection.execute(checkTableSQL); + const { data: checkData } = await checkStatement.fetchResult(); + const count = parseInt(checkData[0][0]); + expect(count).toBe(0); + + await connection.commit(); + + const statement = await connection.execute(selectSQL); + const { data } = await statement.fetchResult(); + + expect(data.length).toBe(1); + const row = data[0]; + const id = parseInt(row[0]); + const name = row[1] as string; + + expect(id).toBe(0); + expect(name).toBe("some_text"); + + // Cleanup + await connection.execute(`DROP TABLE IF EXISTS ${tableName} CASCADE`); + }); + + it("should rollback table creation and data insertion", async () => { + const tableName = "transaction_rollback_test"; + + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + await connection.execute(`DROP TABLE IF EXISTS ${tableName} CASCADE`); + + await connection.begin(); + + const createTableSQL = `CREATE FACT TABLE ${tableName} (id LONG, name TEXT)`; + const insertSQL = `INSERT INTO ${tableName} (id, name) VALUES (0, 'some_text')`; + const checkTableSQL = `SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '${tableName}'`; + + await connection.execute(createTableSQL); + await connection.execute(insertSQL); + + // Check table doesn't exist in another connection + const checkConnection = await firebolt.connect(connectionParams); + const checkStatement = await checkConnection.execute(checkTableSQL); + const { data: checkData } = await checkStatement.fetchResult(); + const count = parseInt(checkData[0][0]); + expect(count).toBe(0); + + await connection.rollback(); + + const statement = await connection.execute(checkTableSQL); + const { data } = await statement.fetchResult(); + const finalCount = parseInt(data[0][0]); + expect(finalCount).toBe(0); + }); + + it("should handle parallel transactions", async () => { + const tableName = "parallel_transactions_test"; + const dropTableSQL = `DROP TABLE IF EXISTS ${tableName} CASCADE`; + const createTableSQL = `CREATE FACT TABLE IF NOT EXISTS ${tableName} (id LONG, name TEXT)`; + const insertSQL = `INSERT INTO ${tableName} (id, name) VALUES (?, ?)`; + const selectSQL = `SELECT * FROM ${tableName} ORDER BY id`; + + const firstName = "first"; + const secondName = "second"; + + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const tx1 = await firebolt.connect(connectionParams); + const tx2 = await firebolt.connect(connectionParams); + + await tx1.execute(dropTableSQL); + await tx1.execute(createTableSQL); + + await tx1.begin(); + await tx2.begin(); + + await tx1.execute(insertSQL, { parameters: [1, firstName] }); + await tx2.execute(insertSQL, { parameters: [2, secondName] }); + + // Validate each transaction can see its own data + await validateSingleResult(tx1, selectSQL, 1, firstName); + await validateSingleResult(tx2, selectSQL, 2, secondName); + + await tx1.commit(); + await tx2.commit(); + + // Check final state + const connection = await firebolt.connect(connectionParams); + const statement = await connection.execute(selectSQL); + const { data } = await statement.fetchResult(); + + expect(data.length).toBe(2); + + const row1 = data[0]; + expect(parseInt(row1[0])).toBe(1); + expect(row1[1]).toBe(firstName); + + const row2 = data[1]; + expect(parseInt(row2[0])).toBe(2); + expect(row2[1]).toBe(secondName); + + // Cleanup + await connection.execute(dropTableSQL); + }); + + const validateSingleResult = async ( + connection: any, + selectSQL: string, + expectedId: number, + expectedName: string + ): Promise => { + const statement = await connection.execute(selectSQL); + const { data } = await statement.fetchResult(); + + expect(data.length).toBe(1); + const row = data[0]; + const id = parseInt(row[0]); + const name = row[1] as string; + expect(id).toBe(expectedId); + expect(name).toBe(expectedName); + }; +}); diff --git a/test/unit/connection.test.ts b/test/unit/connection.test.ts index 9a3e16f4..f655be96 100644 --- a/test/unit/connection.test.ts +++ b/test/unit/connection.test.ts @@ -33,6 +33,34 @@ const emptyResponse = { rows: 0 }; +class MockConnection extends ConnectionV2 { + updateParameters(params: Record) { + this.parameters = { + ...this.parameters, + ...params + }; + } +} + +// A hack to allow updating connection parameters stored internally +async function mockConnect(connectionOptions: ConnectionOptions) { + const context = { + logger: new Logger(), + httpClient: new NodeHttpClient(), + apiEndpoint + }; + const queryFormatter = new QueryFormatter(); + const auth = new Authenticator(context, connectionOptions); + const connection = new MockConnection( + queryFormatter, + context, + connectionOptions + ); + await auth.authenticate(); + await connection.resolveEngineEndpoint(); + return connection; +} + function resetServerHandlers(server: any) { server.use( rest.post(`https://id.fake.firebolt.io/oauth/token`, (req, res, ctx) => { @@ -291,7 +319,7 @@ INFO: SYNTAX_ERROR - Unexpected character at {"failingLine":42,"startOffset":120 expect(paramsUsed).toHaveProperty("engine", "dummy"); // Extra params in USE DATABASE are ignored // But in USE ENGINE, they are used - expect(paramsUsed).not.toHaveProperty("other_param"); + expect(paramsUsed).toHaveProperty("other_param", "2"); expect(paramsUsed).toHaveProperty("another_eng_param", "1"); }); @@ -420,34 +448,6 @@ INFO: SYNTAX_ERROR - Unexpected character at {"failingLine":42,"startOffset":120 }) ); - class MockConnection extends ConnectionV2 { - updateParameters(params: Record) { - this.parameters = { - ...this.parameters, - ...params - }; - } - } - - // A hack to allow updating connection parameters stored internally - async function mockConnect(connectionOptions: ConnectionOptions) { - const context = { - logger: new Logger(), - httpClient: new NodeHttpClient(), - apiEndpoint - }; - const queryFormatter = new QueryFormatter(); - const auth = new Authenticator(context, connectionOptions); - const connection = new MockConnection( - queryFormatter, - context, - connectionOptions - ); - await auth.authenticate(); - await connection.resolveEngineEndpoint(); - return connection; - } - const connection = await mockConnect(connectionParams); connection.updateParameters({ param: "value" }); @@ -462,6 +462,247 @@ INFO: SYNTAX_ERROR - Unexpected character at {"failingLine":42,"startOffset":120 expect(searchParamsUsed.get("database")).toEqual("dummy"); }); + it("handles remove parameters header", async () => { + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + database: "dummy", + engineName: "dummy", + account: "my_account" + }; + + let searchParamsUsed = new URLSearchParams(); + server.use( + rest.post(`https://some_engine.com`, async (req, res, ctx) => { + const body = await req.text(); + if (body.startsWith("SELECT 1")) { + // First query - return with remove parameters header + return res( + ctx.json(selectOneResponse), + ctx.set("Firebolt-Remove-Parameters", "param1,param2") + ); + } + if (body.startsWith("SELECT 2")) { + searchParamsUsed = req.url.searchParams; + return res(ctx.json(selectOneResponse)); + } + }) + ); + + const connection = await mockConnect(connectionParams); + connection.updateParameters({ + param1: "value1", + param2: "value2", + param3: "value3" + }); + + // Execute query that triggers remove parameters header + await connection.execute("SELECT 1"); + + // Execute another query to check parameters + await connection.execute("SELECT 2"); + + expect(searchParamsUsed.get("param1")).toEqual(null); + expect(searchParamsUsed.get("param2")).toEqual(null); + expect(searchParamsUsed.get("param3")).toEqual("value3"); + expect(searchParamsUsed.get("database")).toEqual("dummy"); + }); + + it("handles remove parameters header with whitespace", async () => { + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + database: "dummy", + engineName: "dummy", + account: "my_account" + }; + + let searchParamsUsed = new URLSearchParams(); + server.use( + rest.post(`https://some_engine.com`, async (req, res, ctx) => { + const body = await req.text(); + if (body.startsWith("SELECT 1")) { + // Return with remove parameters header containing whitespace + return res( + ctx.json(selectOneResponse), + ctx.set("Firebolt-Remove-Parameters", " param1 , param2 , param3 ") + ); + } + if (body.startsWith("SELECT 2")) { + searchParamsUsed = req.url.searchParams; + return res(ctx.json(selectOneResponse)); + } + }) + ); + + const connection = await mockConnect(connectionParams); + connection.updateParameters({ + param1: "value1", + param2: "value2", + param3: "value3", + param4: "value4" + }); + + // Execute query that triggers remove parameters header + await connection.execute("SELECT 1"); + + // Execute another query to check parameters + await connection.execute("SELECT 2"); + + expect(searchParamsUsed.get("param1")).toEqual(null); + expect(searchParamsUsed.get("param2")).toEqual(null); + expect(searchParamsUsed.get("param3")).toEqual(null); + expect(searchParamsUsed.get("param4")).toEqual("value4"); + expect(searchParamsUsed.get("database")).toEqual("dummy"); + }); + + it("handles remove parameters header with non-existent parameters", async () => { + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + database: "dummy", + engineName: "dummy", + account: "my_account" + }; + + let searchParamsUsed = new URLSearchParams(); + server.use( + rest.post(`https://some_engine.com`, async (req, res, ctx) => { + const body = await req.text(); + if (body.startsWith("SELECT 1")) { + // Remove both existing and non-existent parameters + return res( + ctx.json(selectOneResponse), + ctx.set( + "Firebolt-Remove-Parameters", + "existing_param,non_existent_param" + ) + ); + } + if (body.startsWith("SELECT 2")) { + searchParamsUsed = req.url.searchParams; + return res(ctx.json(selectOneResponse)); + } + }) + ); + + const connection = await mockConnect(connectionParams); + connection.updateParameters({ + existing_param: "value1", + other_param: "value2" + }); + + // Execute query that triggers remove parameters header + await connection.execute("SELECT 1"); + + // Execute another query to check parameters + await connection.execute("SELECT 2"); + + expect(searchParamsUsed.get("existing_param")).toEqual(null); + expect(searchParamsUsed.get("non_existent_param")).toEqual(null); + expect(searchParamsUsed.get("other_param")).toEqual("value2"); + expect(searchParamsUsed.get("database")).toEqual("dummy"); + }); + + it("handles remove parameters header with single parameter", async () => { + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + database: "dummy", + engineName: "dummy", + account: "my_account" + }; + + let searchParamsUsed = new URLSearchParams(); + server.use( + rest.post(`https://some_engine.com`, async (req, res, ctx) => { + const body = await req.text(); + if (body.startsWith("SELECT 1")) { + // Remove single parameter + return res( + ctx.json(selectOneResponse), + ctx.set("Firebolt-Remove-Parameters", "transaction_id") + ); + } + if (body.startsWith("SELECT 2")) { + searchParamsUsed = req.url.searchParams; + return res(ctx.json(selectOneResponse)); + } + }) + ); + + const connection = await mockConnect(connectionParams); + connection.updateParameters({ + transaction_id: "tx_123", + other_param: "value" + }); + + // Execute query that triggers remove parameters header + await connection.execute("SELECT 1"); + + // Execute another query to check parameters + await connection.execute("SELECT 2"); + + expect(searchParamsUsed.get("transaction_id")).toEqual(null); + expect(searchParamsUsed.get("other_param")).toEqual("value"); + expect(searchParamsUsed.get("database")).toEqual("dummy"); + }); + + it("handles empty remove parameters header", async () => { + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + database: "dummy", + engineName: "dummy", + account: "my_account" + }; + + let searchParamsUsed = new URLSearchParams(); + server.use( + rest.post(`https://some_engine.com`, async (req, res, ctx) => { + const body = await req.text(); + if (body.startsWith("SELECT 1")) { + // Empty remove parameters header + return res( + ctx.json(selectOneResponse), + ctx.set("Firebolt-Remove-Parameters", "") + ); + } + if (body.startsWith("SELECT 2")) { + searchParamsUsed = req.url.searchParams; + return res(ctx.json(selectOneResponse)); + } + }) + ); + + const connection = await mockConnect(connectionParams); + connection.updateParameters({ + param1: "value1", + param2: "value2" + }); + + // Execute query that triggers empty remove parameters header + await connection.execute("SELECT 1"); + + // Execute another query to check parameters + await connection.execute("SELECT 2"); + + // All parameters should still be present + expect(searchParamsUsed.get("param1")).toEqual("value1"); + expect(searchParamsUsed.get("param2")).toEqual("value2"); + expect(searchParamsUsed.get("database")).toEqual("dummy"); + }); + it("handles set statements correctly", async () => { let searchParamsUsed = new URLSearchParams(); server.use( diff --git a/test/unit/http.test.ts b/test/unit/http.test.ts index df2a6be0..5594489b 100644 --- a/test/unit/http.test.ts +++ b/test/unit/http.test.ts @@ -114,7 +114,7 @@ describe.each([ ); server.use( rest.post(`https://${apiEndpoint}/engines`, (req, res, ctx) => { - expect(req.headers.get("Firebolt-Protocol-Version")).toEqual("2.3"); + expect(req.headers.get("Firebolt-Protocol-Version")).toEqual("2.4"); return res(ctx.json({ ok: true })); }) ); @@ -398,7 +398,8 @@ describe.each([ calls++; return res( ctx.json({ - access_token: calls === 1 ? "fake_access_token_1" : "fake_access_token_2", + access_token: + calls === 1 ? "fake_access_token_1" : "fake_access_token_2", expires_in: 3600 // 1 hour }) ); diff --git a/test/unit/v1/connection.test.ts b/test/unit/v1/connection.test.ts index fce4de06..ec3d67f5 100644 --- a/test/unit/v1/connection.test.ts +++ b/test/unit/v1/connection.test.ts @@ -291,4 +291,29 @@ describe("Connection v1", () => { /.*not supported.*/ ); }); + + it("verify transaction methods throw an error", async () => { + const firebolt = Firebolt({ apiEndpoint }); + + const connectionParams: ConnectionOptions = { + auth: { + username: "user", + password: "pass" + }, + database: "dummy", + engineName: "dummy", + account: accountName + }; + + const connection = await firebolt.connect(connectionParams); + await expect(connection.begin()).rejects.toThrow( + "Transaction management is not supported in this Firebolt version." + ); + await expect(connection.commit()).rejects.toThrow( + "Transaction management is not supported in this Firebolt version." + ); + await expect(connection.rollback()).rejects.toThrow( + "Transaction management is not supported in this Firebolt version." + ); + }); }); diff --git a/test/unit/v2/connection.test.ts b/test/unit/v2/connection.test.ts index b9c51767..b8951842 100644 --- a/test/unit/v2/connection.test.ts +++ b/test/unit/v2/connection.test.ts @@ -274,6 +274,129 @@ describe("Connection V2", () => { expect(engineUrlCalls).toBe(2); }); + it("handles remove parameters header for transaction_id in v2", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + + let requestParameters = new URLSearchParams(); + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + requestParameters = req.url.searchParams; + + if (body === "COMMIT") { + // Simulate Firebolt removing transaction_id after COMMIT + return res( + ctx.json({ data: [], meta: [] }), + ctx.set("Firebolt-Remove-Parameters", "transaction_id") + ); + } + if (body === "BEGIN TRANSACTION") { + // Simulate Firebolt adding transaction_id after BEGIN + return res( + ctx.json({ data: [], meta: [] }), + ctx.set("Firebolt-Update-Parameters", "transaction_id=tx_12345") + ); + } + return res(ctx.json({ data: [], meta: [] })); + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + database: "dummy", + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + + // Start transaction - should add transaction_id + await connection.execute("BEGIN TRANSACTION"); + + // Execute a query to check parameters + await connection.execute("SELECT 1"); + expect(requestParameters.get("transaction_id")).toBe("tx_12345"); + + // Commit transaction - should remove transaction_id + await connection.execute("COMMIT"); + + // Execute another query to verify transaction_id is removed + await connection.execute("SELECT 2"); + expect(requestParameters.get("transaction_id")).toBeNull(); + }); + + it("handles remove parameters header with multiple parameters in v2", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + + let requestParameters = new URLSearchParams(); + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + requestParameters = req.url.searchParams; + + if (body === "ROLLBACK") { + // Simulate removing multiple transaction-related parameters + return res( + ctx.json({ data: [], meta: [] }), + ctx.set( + "Firebolt-Remove-Parameters", + "transaction_id,lock_timeout,isolation_level" + ) + ); + } + if (body === "SELECT 1") { + // Add multiple parameters + return res( + ctx.json({ data: [], meta: [] }), + ctx.set("Firebolt-Update-Parameters", "transaction_id=tx_abc") + ); + } + return res(ctx.json({ data: [], meta: [] })); + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + database: "dummy", + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + + // Add parameters + await connection.execute("SELECT 1"); + + // Verify parameters are present + await connection.execute("SELECT 2"); + expect(requestParameters.get("transaction_id")).toBe("tx_abc"); + + // Rollback - should remove multiple parameters + await connection.execute("ROLLBACK"); + + // Verify parameters are removed + await connection.execute("SELECT 3"); + expect(requestParameters.get("transaction_id")).toBeNull(); + // Database should still be present as it's not in the remove list + expect(requestParameters.get("database")).toBe("dummy"); + }); + it("executes async query successfully", async () => { const firebolt = Firebolt({ apiEndpoint @@ -1154,4 +1277,123 @@ describe("Connection V2", () => { "- Line 1, Column 17: Query referenced positional parameter $34, but it was not set" ); }); + + describe("Transaction methods", () => { + let transactionQueries: string[] = []; + + beforeEach(() => { + transactionQueries = []; + server.use( + rest.post( + `https://id.fake.firebolt.io/oauth/token`, + (req, res, ctx) => { + return res( + ctx.json({ + access_token: "fake_access_token" + }) + ); + } + ), + rest.get( + `https://api.fake.firebolt.io/web/v3/account/my_account/engineUrl`, + (req, res, ctx) => { + return res( + ctx.json({ + engineUrl: "https://some_system_engine.com" + }) + ); + } + ), + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + transactionQueries.push(body); + return res(ctx.json(engineUrlResponse)); + } + ) + ); + }); + + it("executes BEGIN TRANSACTION query", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + await connection.begin(); + + expect(transactionQueries).toContain("BEGIN TRANSACTION"); + }); + + it("executes COMMIT query", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + await connection.commit(); + + expect(transactionQueries).toContain("COMMIT"); + }); + + it("executes ROLLBACK query", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + await connection.rollback(); + + expect(transactionQueries).toContain("ROLLBACK"); + }); + + it("handles full transaction lifecycle", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + + await connection.begin(); + await connection.execute("INSERT INTO test_table VALUES (1)"); + await connection.commit(); + + expect(transactionQueries).toContain("BEGIN TRANSACTION"); + expect(transactionQueries).toContain("INSERT INTO test_table VALUES (1)"); + expect(transactionQueries).toContain("COMMIT"); + }); + }); });