diff --git a/components/snowflake/actions/execute-sql-query/execute-sql-query.mjs b/components/snowflake/actions/execute-sql-query/execute-sql-query.mjs index a98f79dd8ae5c..d70431184cc48 100644 --- a/components/snowflake/actions/execute-sql-query/execute-sql-query.mjs +++ b/components/snowflake/actions/execute-sql-query/execute-sql-query.mjs @@ -2,7 +2,7 @@ import snowflake from "../../snowflake.app.mjs"; export default { name: "Execute SQL Query", - version: "0.2.0", + version: "0.2.1", key: "snowflake-execute-sql-query", description: "Execute a custom Snowflake query. See [our docs](https://pipedream.com/docs/databases/working-with-sql) to learn more about working with SQL in Pipedream.", type: "action", diff --git a/components/snowflake/actions/insert-multiple-rows/insert-multiple-rows.mjs b/components/snowflake/actions/insert-multiple-rows/insert-multiple-rows.mjs index 8da7918bc2b50..543d45a6c0a00 100644 --- a/components/snowflake/actions/insert-multiple-rows/insert-multiple-rows.mjs +++ b/components/snowflake/actions/insert-multiple-rows/insert-multiple-rows.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-insert-multiple-rows", name: "Insert Multiple Rows", description: "Insert multiple rows into a table", - version: "0.1.2", + version: "0.1.3", props: { snowflake, database: { @@ -50,6 +50,31 @@ export default { "values", ], }, + batchSize: { + type: "integer", + label: "Batch Size", + description: "Maximum number of rows to insert per batch. The actual batch size is calculated from the estimated row size and **Max Payload Size (MB)**, then capped at this value and at `500` rows. Recommended: `50-200` for wide tables, `100-500` for narrow tables.", + optional: true, + default: 100, + min: 10, + max: 1000, + }, + maxPayloadSizeMB: { + type: "integer", + label: "Max Payload Size (MB)", + description: "Maximum payload size per batch in MB.
Helps prevent `413 Payload Too Large` errors.", + optional: true, + default: 5, + min: 1, + max: 10, + }, + enableBatching: { + type: "boolean", + label: "Enable Batch Processing", + description: "Enable automatic batch processing for large datasets. Disable only for small datasets (< 50 rows) or troubleshooting.", + optional: true, + default: true, + }, }, async run({ $ }) { let rows = this.values; @@ -57,13 +82,22 @@ export default { let inputValidated = true; if (!Array.isArray(rows)) { - rows = JSON.parse(rows); + try { + rows = JSON.parse(rows); + } catch (parseError) { + throw new ConfigurationError("The row data could not be parsed as JSON. Please ensure it's a valid JSON array of arrays."); + } } if (!rows || !rows.length || !Array.isArray(rows)) { inputValidated = false; } else { - rows.forEach((row) => { if (!Array.isArray(row)) { inputValidated = false; } }); + rows.forEach((row, index) => { + if (!Array.isArray(row)) { + console.log(`Row ${index + 1} is not an array:`, row); + inputValidated = false; + } + }); } // Throw an error if input validation failed @@ -71,8 +105,85 @@ export default { throw new ConfigurationError("The row data you passed is not an array of arrays. Please enter an array of arrays in the `Values` parameter above.
If you're trying to add a single row to Snowflake, select the **Insert Single Row** action."); } - const response = await this.snowflake.insertRows(this.tableName, this.columns, this.values); - $.export("$summary", `Successfully inserted ${this.values.length} rows in ${this.tableName}`); - return response; + const expectedColumnCount = this.columns.length; + // Remember the 1-based position of every mismatched row so the error can name them + const invalidRowNumbers = []; + rows.forEach((row, index) => { + if (row.length !== expectedColumnCount) { + console.error(`Row ${index + 1} has ${row.length} values but ${expectedColumnCount} columns specified`); + invalidRowNumbers.push(index + 1); + } + }); + + if (invalidRowNumbers.length > 0) { + // List only the first few offenders so the message stays readable for large inputs + const preview = invalidRowNumbers.slice(0, 10).join(", "); + const suffix = invalidRowNumbers.length > 10 ? ", ..." : ""; + throw new ConfigurationError(`${invalidRowNumbers.length} rows have a different number of values than the specified columns (row numbers: ${preview}${suffix}). Each row must have exactly ${expectedColumnCount} values to match the selected columns.`); + } + + // Add batch processing options + const batchOptions = { + batchSize: this.batchSize, + maxPayloadSizeMB: this.maxPayloadSizeMB, + enableBatching: this.enableBatching, + }; + + try { + const response = await this.snowflake.insertRows( + this.tableName, + this.columns, + rows, + batchOptions, + ); + + // Handle different response formats (batched vs single insert) + if (response.summary) { + // Batched response + const { summary } = response; + $.export("$summary", `Successfully inserted ${summary.totalRowsProcessed} rows into ${this.tableName} using ${summary.totalBatches} batches`); + + // Export detailed batch information + $.export("batchDetails", { + totalRows: summary.totalRows, + totalBatches: summary.totalBatches, + successfulBatches: summary.successfulBatches, + failedBatches: summary.failedBatches, + batchSize: summary.batchSize, + processingTime: new Date().toISOString(), + }); + + // Export batch results for debugging if needed + $.export("batchResults", summary.results); + + return response; + + } else { + // Single insert response (small dataset or batching disabled) + $.export("$summary", `Successfully inserted
${rows.length} rows into ${this.tableName}`); + return response; + } + + } catch (error) { + // Enhanced error handling for batch processing + if (error.summary) { + // Partial failure in batch processing + const { summary } = error; + $.export("$summary", `Partial success: ${summary.totalRowsProcessed}/${summary.totalRows} rows inserted. ${summary.failedBatches} batches failed.`); + $.export("batchDetails", summary); + $.export("failedBatches", summary.results.filter((r) => !r.success)); + } + + // Re-throw the error with additional context + if (error.message.includes("413") || error.message.includes("Payload Too Large")) { + throw new ConfigurationError( + `Payload too large error detected. Try reducing the batch size (current: ${this.batchSize}) or enable batching if disabled. ` + + `You're trying to insert ${rows.length} rows with ${this.columns.length} columns each. ` + + `Original error: ${error.message}`, + ); + } + + throw error; + } }, }; diff --git a/components/snowflake/actions/insert-row/insert-row.mjs b/components/snowflake/actions/insert-row/insert-row.mjs index fbe646fcab3dd..431f0fb79e80b 100644 --- a/components/snowflake/actions/insert-row/insert-row.mjs +++ b/components/snowflake/actions/insert-row/insert-row.mjs @@ -5,7 +5,7 @@ export default { key: "snowflake-insert-row", name: "Insert Single Row", description: "Insert a row into a table", - version: "1.1.2", + version: "1.1.3", props: { snowflake, database: { diff --git a/components/snowflake/common/utils.mjs b/components/snowflake/common/utils.mjs new file mode 100644 index 0000000000000..1e2f3c7dfa38a --- /dev/null +++ b/components/snowflake/common/utils.mjs @@ -0,0 +1,45 @@ +/** + * Estimate the size in bytes of `data` once serialized as JSON. + * @param {*} data - Value to measure (typically an array of rows) + * @returns {number} UTF-8 byte length of the JSON encoding, or a rough guess if it cannot be serialized + */ +function estimatePayloadSize(data) { + try { + const jsonString = JSON.stringify(data); + // JSON.stringify returns undefined (not a string) for undefined, functions and symbols + if (typeof jsonString !== "string") { + return 0; + } + // Use Buffer.byteLength for accurate size calculation in Node.js + return Buffer.byteLength(jsonString, "utf8"); + } catch (error) { + // Fallback estimation if JSON.stringify fails (e.g. BigInt values, circular references) + const itemCount = Array.isArray(data) ? data.length : 1; + return itemCount * 1000; // Conservative
estimate + } +} + +/** + * Split rows into consecutive batches of at most `batchSize` rows. + * @param {Array} rows - Rows to split + * @param {number} batchSize - Maximum rows per batch (fractions are rounded down) + * @returns {Array<Array>} Batches, preserving row order + * @throws {RangeError} If batchSize is not a number greater than or equal to 1 + */ +function createBatches(rows, batchSize) { + const size = Math.floor(batchSize); + // A zero, negative or NaN step would never advance the loop or would emit empty batches + if (!(size >= 1)) { + throw new RangeError(`batchSize must be at least 1, received ${batchSize}`); + } + const batches = []; + for (let i = 0; i < rows.length; i += size) { + batches.push(rows.slice(i, i + size)); + } + return batches; +} + +export default { + estimatePayloadSize, + createBatches, +}; diff --git a/components/snowflake/package.json b/components/snowflake/package.json index 6907b2812e781..a15bc6f4fc550 100644 --- a/components/snowflake/package.json +++ b/components/snowflake/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/snowflake", - "version": "0.13.0", + "version": "0.13.1", "description": "Pipedream Snowflake Components", "main": "snowflake.app.mjs", "keywords": [ diff --git a/components/snowflake/snowflake.app.mjs b/components/snowflake/snowflake.app.mjs index 7e5406ab67736..9342b01ca19eb 100644 --- a/components/snowflake/snowflake.app.mjs +++ b/components/snowflake/snowflake.app.mjs @@ -2,8 +2,9 @@ import { createPrivateKey } from "crypto"; import { snowflake } from "@pipedream/snowflake-sdk"; import { promisify } from "util"; import { - sqlProxy, sqlProp, + sqlProxy, sqlProp, ConfigurationError, } from "@pipedream/platform"; +import utils from "./common/utils.mjs"; snowflake.configure({ logLevel: "WARN", @@ -368,13 +369,130 @@ export default { }; return this.executeQuery(statement); }, - async insertRows(tableName, columns, binds) { - const sqlText = `INSERT INTO ${tableName} (${columns.join(",")}) VALUES (${columns.map(() => "?").join(", ")});`; + /** + * Insert all given rows with a single multi-row INSERT statement. + * @param {string} tableName - Target table; interpolated into the SQL text as-is + * @param {string[]} columns - Column names; interpolated into the SQL text as-is + * @param {Array<Array>} values - One array of bind values per row, in column order + * @returns {Promise<*>} Resolves with the result of `executeQuery` + */ + async _insertRowsOriginal(tableName, columns, values) { + // Create placeholders for all rows + const rowPlaceholders = values.map(() => + `(${columns.map(() => "?").join(", ")})`).join(", "); + + const sqlText = `INSERT INTO ${tableName} (${columns.join(",")}) VALUES ${rowPlaceholders}`; + + // Flatten all values into a single array for binding + const binds = values.flat(); + const statement = { sqlText, binds, }; + return this.executeQuery(statement); }, + async
insertRows(tableName, columns, values, options = {}) { + const { + batchSize = 100, + maxPayloadSizeMB = 5, + enableBatching = true, + } = options; + + // If batching is disabled or small dataset, use original approach + if (!enableBatching || values.length <= 50) { + return this._insertRowsOriginal(tableName, columns, values); + } + + // Estimate payload size for dynamic batch sizing + const sampleRowData = values.slice(0, Math.min(10, values.length)); + const sampleSize = utils.estimatePayloadSize(sampleRowData); + const avgRowSize = sampleSize / sampleRowData.length; + const maxSizeBytes = maxPayloadSizeMB * 1024 * 1024; + + // Calculate optimal batch size with safety margin + const calculatedBatchSize = Math.floor((maxSizeBytes * 0.8) / avgRowSize); + const optimalBatchSize = Math.min( + Math.max(calculatedBatchSize || 0, 10), // Minimum 10 rows per batch; a NaN estimate falls back to this minimum + Math.min(Math.max(Math.floor(batchSize) || 1, 1), 500), // Caller's limit clamped to 1-500 so the batch step is never 0, negative or NaN + ); + + console.log(`Processing ${values.length} rows in batches of ${optimalBatchSize}`); + + // Split into batches + const batches = utils.createBatches(values, optimalBatchSize); + + // Process batches sequentially + const results = []; + let totalRowsProcessed = 0; + let successfulBatches = 0; + let failedBatches = 0; + + for (let i = 0; i < batches.length; i++) { + const batch = batches[i]; + + try { + console.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} rows)`); + + const batchResult = await this._insertRowsOriginal(tableName, columns, batch); + + results.push({ + batchIndex: i + 1, + rowsProcessed: batch.length, + success: true, + result: batchResult, + }); + + totalRowsProcessed += batch.length; + successfulBatches++; + + // Small delay between batches to prevent overwhelming the server + if (i < batches.length - 1) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + } catch (error) { + console.log(`Batch ${i + 1} failed:`, error.message); + + results.push({ + batchIndex: i + 1, + rowsProcessed: 0, + success:
false, + error: error.message, + }); + + failedBatches++; + + // Continue processing remaining batches + } + } + + const summary = { + totalRows: values.length, + totalBatches: batches.length, + successfulBatches, + failedBatches, + totalRowsProcessed, + batchSize: optimalBatchSize, + results, + }; + + console.log(`Batch processing completed: ${totalRowsProcessed}/${values.length} rows processed`); + + if (failedBatches > 0) { + const error = new ConfigurationError( + `Batch insert partially failed: \`${totalRowsProcessed}/${values.length}\` rows inserted. \`${failedBatches}\` batches failed.`, + ); + error.summary = summary; + throw error; + } + + return { + success: true, + message: `Successfully inserted ${totalRowsProcessed} rows in ${batches.length} batches`, + summary, + }; + }, }, }; diff --git a/components/snowflake/sources/change-to-warehouse/change-to-warehouse.mjs b/components/snowflake/sources/change-to-warehouse/change-to-warehouse.mjs index 68ef4609ff0d7..95cd740384a07 100644 --- a/components/snowflake/sources/change-to-warehouse/change-to-warehouse.mjs +++ b/components/snowflake/sources/change-to-warehouse/change-to-warehouse.mjs @@ -17,7 +17,7 @@ export default { // eslint-disable-next-line name: "New, Updated, or Deleted Warehouse", description: "Emit new events when a warehouse is created, altered, or dropped", - version: "0.1.2", + version: "0.1.3", async run() { await this.watchObjectsAndEmitChanges("WAREHOUSE", this.warehouses, this.queryTypes); }, diff --git a/components/snowflake/sources/deleted-role/deleted-role.mjs b/components/snowflake/sources/deleted-role/deleted-role.mjs index 2255938778efd..29f12f1e6a8e6 100644 --- a/components/snowflake/sources/deleted-role/deleted-role.mjs +++ b/components/snowflake/sources/deleted-role/deleted-role.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-deleted-role", name: "New Deleted Role", description: "Emit new event when a role is deleted", - version: "0.1.2", + version: "0.1.3", methods: {
...common.methods, getSqlText() { diff --git a/components/snowflake/sources/deleted-user/deleted-user.mjs b/components/snowflake/sources/deleted-user/deleted-user.mjs index 9d6e36df1203d..7a40eba366c27 100644 --- a/components/snowflake/sources/deleted-user/deleted-user.mjs +++ b/components/snowflake/sources/deleted-user/deleted-user.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-deleted-user", name: "New Deleted User", description: "Emit new event when a user is deleted", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, getSqlText() { diff --git a/components/snowflake/sources/failed-task-in-schema/failed-task-in-schema.mjs b/components/snowflake/sources/failed-task-in-schema/failed-task-in-schema.mjs index ae1f3b03b1a17..86fc18c136de9 100644 --- a/components/snowflake/sources/failed-task-in-schema/failed-task-in-schema.mjs +++ b/components/snowflake/sources/failed-task-in-schema/failed-task-in-schema.mjs @@ -39,7 +39,7 @@ export default { // eslint-disable-next-line name: "Failed Task in Schema", description: "Emit new events when a task fails in a database schema", - version: "0.1.2", + version: "0.1.3", async run() { await this.emitFailedTasks({ database: this.database, diff --git a/components/snowflake/sources/new-database/new-database.mjs b/components/snowflake/sources/new-database/new-database.mjs index 0be27e16f77e8..b313fc0cab193 100644 --- a/components/snowflake/sources/new-database/new-database.mjs +++ b/components/snowflake/sources/new-database/new-database.mjs @@ -7,7 +7,7 @@ export default { key: "snowflake-new-database", name: "New Database", description: "Emit new event when a database is created", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, alwaysRunInSingleProcessMode() { diff --git a/components/snowflake/sources/new-role/new-role.mjs b/components/snowflake/sources/new-role/new-role.mjs index b55bc1cc07506..5ec3257b908a3 100644 --- a/components/snowflake/sources/new-role/new-role.mjs +++ 
b/components/snowflake/sources/new-role/new-role.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-new-role", name: "New Role", description: "Emit new event when a role is created", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, alwaysRunInSingleProcessMode() { diff --git a/components/snowflake/sources/new-row/new-row.mjs b/components/snowflake/sources/new-row/new-row.mjs index b3ed3fc35f2da..4a51bb313383c 100644 --- a/components/snowflake/sources/new-row/new-row.mjs +++ b/components/snowflake/sources/new-row/new-row.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-new-row", name: "New Row", description: "Emit new event when a row is added to a table", - version: "0.2.2", + version: "0.2.3", methods: { ...common.methods, async getStatement(lastResultId) { diff --git a/components/snowflake/sources/new-schema/new-schema.mjs b/components/snowflake/sources/new-schema/new-schema.mjs index b52708d5f8a88..c9e48efc5bccb 100644 --- a/components/snowflake/sources/new-schema/new-schema.mjs +++ b/components/snowflake/sources/new-schema/new-schema.mjs @@ -7,7 +7,7 @@ export default { key: "snowflake-new-schema", name: "New Schema", description: "Emit new event when a schema is created", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, alwaysRunInSingleProcessMode() { diff --git a/components/snowflake/sources/new-table/new-table.mjs b/components/snowflake/sources/new-table/new-table.mjs index e22a9820efcf0..7be89c141e254 100644 --- a/components/snowflake/sources/new-table/new-table.mjs +++ b/components/snowflake/sources/new-table/new-table.mjs @@ -7,7 +7,7 @@ export default { key: "snowflake-new-table", name: "New Table", description: "Emit new event when a table is created", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, alwaysRunInSingleProcessMode() { diff --git a/components/snowflake/sources/new-user/new-user.mjs b/components/snowflake/sources/new-user/new-user.mjs index 2ed4951da9176..94447a2f1756c 
100644 --- a/components/snowflake/sources/new-user/new-user.mjs +++ b/components/snowflake/sources/new-user/new-user.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-new-user", name: "New User", description: "Emit new event when a user is created", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, alwaysRunInSingleProcessMode() { diff --git a/components/snowflake/sources/query-results/query-results.mjs b/components/snowflake/sources/query-results/query-results.mjs index 00c31110c3516..f70b4743e89ec 100644 --- a/components/snowflake/sources/query-results/query-results.mjs +++ b/components/snowflake/sources/query-results/query-results.mjs @@ -8,7 +8,7 @@ export default { name: "New Query Results", // eslint-disable-next-line description: "Run a SQL query on a schedule, triggering a workflow for each row of results", - version: "0.2.2", + version: "0.2.3", props: { ...common.props, sqlQuery: { diff --git a/components/snowflake/sources/updated-role/updated-role.mjs b/components/snowflake/sources/updated-role/updated-role.mjs index 56ed51fc7d2dd..d53c7b828ab21 100644 --- a/components/snowflake/sources/updated-role/updated-role.mjs +++ b/components/snowflake/sources/updated-role/updated-role.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-updated-role", name: "New Update Role", description: "Emit new event when a role is updated", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, getLookUpKey() { diff --git a/components/snowflake/sources/updated-user/updated-user.mjs b/components/snowflake/sources/updated-user/updated-user.mjs index f89f88091857c..979ef6886b52e 100644 --- a/components/snowflake/sources/updated-user/updated-user.mjs +++ b/components/snowflake/sources/updated-user/updated-user.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-updated-user", name: "New Update User", description: "Emit new event when a user is updated", - version: "0.1.2", + version: "0.1.3", methods: { ...common.methods, getLookUpKey() { 
diff --git a/components/snowflake/sources/usage-monitor/usage-monitor.mjs b/components/snowflake/sources/usage-monitor/usage-monitor.mjs index 0df075de144fb..7949cc11d69eb 100644 --- a/components/snowflake/sources/usage-monitor/usage-monitor.mjs +++ b/components/snowflake/sources/usage-monitor/usage-monitor.mjs @@ -6,7 +6,7 @@ export default { key: "snowflake-usage-monitor", name: "New Usage Monitor", description: "Emit new event when a query is executed in the specified params", - version: "0.1.2", + version: "0.1.3", dedupe: "unique", props: { snowflake, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8e47a0ac47b05..628c5ddf91b34 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16144,7 +16144,7 @@ importers: version: 3.1.7 ts-jest: specifier: ^29.2.5 - version: 29.2.5(@babel/core@8.0.0-alpha.13)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@8.0.0-alpha.13))(jest@29.7.0(@types/node@20.17.30)(babel-plugin-macros@3.1.0))(typescript@5.7.2) + version: 29.2.5(@babel/core@7.26.0)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.26.0))(jest@29.7.0(@types/node@20.17.30)(babel-plugin-macros@3.1.0))(typescript@5.7.2) tsup: specifier: ^8.3.6 version: 8.3.6(@microsoft/api-extractor@7.47.12(@types/node@20.17.30))(jiti@2.4.2)(postcss@8.4.49)(tsx@4.19.4)(typescript@5.7.2)(yaml@2.6.1) @@ -16187,7 +16187,7 @@ importers: version: 3.1.0 jest: specifier: ^29.1.2 - version: 29.7.0(@types/node@20.17.30)(babel-plugin-macros@3.1.0) + version: 29.7.0(@types/node@20.17.6)(babel-plugin-macros@3.1.0) type-fest: specifier: ^4.15.0 version: 4.27.0 @@ -51376,7 +51376,7 @@ snapshots: ts-interface-checker@0.1.13: {} - ts-jest@29.2.5(@babel/core@8.0.0-alpha.13)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@8.0.0-alpha.13))(jest@29.7.0(@types/node@20.17.30)(babel-plugin-macros@3.1.0))(typescript@5.7.2): + 
ts-jest@29.2.5(@babel/core@7.26.0)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.26.0))(jest@29.7.0(@types/node@20.17.30)(babel-plugin-macros@3.1.0))(typescript@5.7.2): dependencies: bs-logger: 0.2.6 ejs: 3.1.10 @@ -51390,10 +51390,10 @@ snapshots: typescript: 5.7.2 yargs-parser: 21.1.1 optionalDependencies: - '@babel/core': 8.0.0-alpha.13 + '@babel/core': 7.26.0 '@jest/transform': 29.7.0 '@jest/types': 29.6.3 - babel-jest: 29.7.0(@babel/core@8.0.0-alpha.13) + babel-jest: 29.7.0(@babel/core@7.26.0) ts-jest@29.2.5(@babel/core@8.0.0-alpha.13)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@8.0.0-alpha.13))(jest@29.7.0(@types/node@20.17.6)(babel-plugin-macros@3.1.0))(typescript@5.6.3): dependencies: