diff --git a/components/google_cloud/package.json b/components/google_cloud/package.json
index 01508632ca235..2593e22b65e60 100644
--- a/components/google_cloud/package.json
+++ b/components/google_cloud/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@pipedream/google_cloud",
-  "version": "0.6.1",
+  "version": "0.6.2",
   "description": "Pipedream Google_cloud Components",
   "main": "google_cloud.app.mjs",
   "keywords": [
diff --git a/components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs b/components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs
index 9e20e95c25c6c..07a0efbc29f89 100644
--- a/components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs
+++ b/components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs
@@ -9,7 +9,7 @@ export default {
   // eslint-disable-next-line pipedream/source-name
   name: "BigQuery - New Row",
   description: "Emit new events when a new row is added to a table",
-  version: "0.1.7",
+  version: "0.1.8",
   dedupe: "unique",
   type: "source",
   props: {
diff --git a/components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs b/components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs
index 56af8ea87f529..99b8368725f4d 100644
--- a/components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs
+++ b/components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs
@@ -8,7 +8,7 @@ export default {
   // eslint-disable-next-line pipedream/source-name
   name: "BigQuery - Query Results",
   description: "Emit new events with the results of an arbitrary query",
-  version: "0.1.6",
+  version: "0.1.7",
   dedupe: "unique",
   type: "source",
   props: {
@@ -21,11 +21,7 @@ export default {
     dedupeKey: {
       type: "string",
       label: "De-duplication Key",
-      description: `
-        The name of a column in the table to use for deduplication. See [the
-        docs](https://github.com/PipedreamHQ/pipedream/tree/master/components/google_cloud/sources/bigquery-query-results#technical-details)
-        for more info.
-      `,
+      description: "The name of a column in the table to use for deduplication. See [the docs](https://github.com/PipedreamHQ/pipedream/tree/master/components/google_cloud/sources/bigquery-query-results#technical-details) for more info.",
       optional: true,
     },
   },
@@ -47,10 +43,11 @@ export default {
   },
   generateMetaForCollection(rows, ts) {
     const hash = crypto.createHash("sha1");
-    rows
-      .map((i) => i[this.dedupeKey] || uuidv4())
-      .map((i) => i.toString())
-      .forEach((i) => hash.update(i));
+    // Process rows incrementally to avoid memory accumulation
+    for (const row of rows) {
+      const key = row[this.dedupeKey] || uuidv4();
+      hash.update(key.toString());
+    }
     const id = hash.digest("base64");
     const summary = `New event (ID: ${id})`;
     return {
diff --git a/components/google_cloud/sources/common/bigquery.mjs b/components/google_cloud/sources/common/bigquery.mjs
index 4ae2b1a6f3fd8..b5df544d6ad38 100644
--- a/components/google_cloud/sources/common/bigquery.mjs
+++ b/components/google_cloud/sources/common/bigquery.mjs
@@ -21,6 +21,16 @@ export default {
       description: "The number of rows to include in a single event (by default, emits 1 event per row)",
       default: 1,
       min: 1,
+      max: 1000,
+    },
+    maxRowsPerExecution: {
+      type: "integer",
+      label: "Max Rows Per Execution",
+      description: "Maximum number of rows to process in a single execution to prevent memory issues",
+      default: 5000,
+      min: 100,
+      max: 50000,
+      optional: true,
     },
     datasetId: {
       propDefinition: [
@@ -30,34 +40,6 @@ export default {
     },
   },
   methods: {
-    async createQueryJob(queryOpts) {
-      const client = this.googleCloud
-        .getBigQueryClient()
-        .dataset(this.datasetId);
-      const [
-        job,
-      ] = await client.createQueryJob(queryOpts);
-      return job;
-    },
-    async getRowsForQuery(job, pageSize = 1000, pageToken = null) {
-      const options = {
-        maxResults: pageSize,
-      };
-
-      if (pageToken) {
-        options.pageToken = pageToken;
-      }
-
-      const [
-        rows,
-        queryResults,
-      ] = await job.getQueryResults(options);
-      const nextPageToken = queryResults?.pageToken;
-      return {
-        rows,
-        pageToken: nextPageToken,
-      };
-    },
     _updateLastResultId(rows) {
       const lastRow = rows.pop();
       if (!lastRow) {
@@ -69,49 +51,101 @@ export default {
       this.db.set("lastResultId", newLastResultId);
     },
     async processCollection(queryOpts, timestamp) {
-      const job = await this.createQueryJob(queryOpts);
+      const client = this.googleCloud
+        .getBigQueryClient()
+        .dataset(this.datasetId);
+      const [
+        job,
+      ] = await client.createQueryJob(queryOpts);
 
-      const pageSize = 1000, maxPages = 10;
+      const pageSize = 100;
+      const maxRowsPerExecution = this.maxRowsPerExecution || 5000;
+      const maxPages = Math.ceil(maxRowsPerExecution / pageSize);
       let pageToken = null;
       let allProcessed = false;
       let pageCount = 0;
+      let totalRowsProcessed = 0;
+
+      console.log(`Starting BigQuery processing with max ${maxRowsPerExecution} rows per execution`);
 
       while (!allProcessed) {
-        const {
-          rows, pageToken: nextPageToken,
-        } = await this.getRowsForQuery(job, pageSize, pageToken);
+        try {
+          const options = {
+            maxResults: pageSize,
+          };
 
-        if (rows.length === 0) {
-          allProcessed = true;
-          break;
-        }
+          if (pageToken) {
+            options.pageToken = pageToken;
+          }
 
-        chunk(rows, this.eventSize).forEach((batch) => {
-          if (this.eventSize === 1) {
-            const meta = this.generateMeta(batch[0], timestamp);
-            this.$emit(batch[0], meta);
-          } else {
-            const meta = this.generateMetaForCollection(batch, timestamp);
-            const data = {
-              rows: batch,
-              rowCount: batch.length,
-            };
-            this.$emit(data, meta);
+          const [
+            rows,
+            queryResults,
+          ] = await job.getQueryResults(options);
+          const nextPageToken = queryResults?.pageToken;
+
+          if (rows.length === 0) {
+            allProcessed = true;
+            break;
           }
-        });
 
-        pageCount++;
-        if (pageCount >= maxPages) {
-          allProcessed = true;
-        }
-        if (this.uniqueKey) {
-          this._updateLastResultId(rows);
-        }
-        if (!nextPageToken) {
-          break;
+          // Check memory limits before processing
+          totalRowsProcessed += rows.length;
+          if (totalRowsProcessed > maxRowsPerExecution) {
+            console.log(`Reached max rows limit (${maxRowsPerExecution}). Stopping processing to prevent memory issues.`);
+            allProcessed = true;
+            break;
+          }
+
+          // Process rows immediately and in small chunks to reduce memory usage
+          chunk(rows, this.eventSize).forEach((batch) => {
+            try {
+              if (this.eventSize === 1) {
+                const meta = this.generateMeta(batch[0], timestamp);
+                this.$emit(batch[0], meta);
+              } else {
+                const meta = this.generateMetaForCollection(batch, timestamp);
+                const data = {
+                  rows: batch,
+                  rowCount: batch.length,
+                };
+                this.$emit(data, meta);
+              }
+            } catch (error) {
+              console.error("Error processing batch:", error);
+              throw error;
+            }
+          });
+
+          // Update last result ID before clearing rows
+          if (this.uniqueKey && rows.length > 0) {
+            this._updateLastResultId([
+              ...rows,
+            ]); // Pass a copy to avoid mutation issues
+          }
+
+          // Clear reference to help with garbage collection
+          rows.length = 0;
+
+          pageCount++;
+          if (pageCount >= maxPages) {
+            console.log(`Reached max pages limit (${maxPages}). Stopping processing.`);
+            allProcessed = true;
+          }
+          if (!nextPageToken) {
+            break;
+          }
+          pageToken = nextPageToken;
+        } catch (error) {
+          console.error("Error in BigQuery processing:", error);
+          if (error.message && error.message.includes("memory")) {
+            throw new Error(`Memory error in BigQuery processing. Consider reducing maxRowsPerExecution or eventSize. Original error: ${error.message}`);
+          }
+          throw error;
         }
-        pageToken = nextPageToken;
       }
+
+      console.log(`BigQuery processing completed. Processed ${totalRowsProcessed} rows in ${pageCount} pages.`);
     },
     getInitialEventCount() {
       return 10;
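
Note on the generateMetaForCollection change: feeding each key into the running SHA-1 as it is visited avoids building the two intermediate arrays the old .map()/.map()/.forEach() chain allocated. A minimal standalone sketch of the same pattern, using Node's built-in crypto; the sample rows and column name are hypothetical stand-ins for a BigQuery page and the user-selected dedupeKey:

    import crypto from "crypto";
    import { v4 as uuidv4 } from "uuid";

    // Hypothetical input; in the component, rows come from a BigQuery
    // result page and dedupeKey names a column chosen by the user.
    const rows = [ { id: 1 }, { id: 2 }, {} ];
    const dedupeKey = "id";

    const hash = crypto.createHash("sha1");
    for (const row of rows) {
      // A row missing the key still contributes a unique value
      const key = row[dedupeKey] || uuidv4();
      hash.update(key.toString());
    }
    console.log(hash.digest("base64")); // digest used as the event ID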
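Note on the processCollection rewrite: it combines page-by-page fetching (maxResults plus pageToken) with a per-execution row budget, so a single run can no longer buffer an unbounded result set. A minimal standalone sketch of that pattern against @google-cloud/bigquery; the query and limits are placeholders, error handling is omitted, and unlike the source it passes autoPaginate: false so the client returns one page at a time along with the next-page token:

    import { BigQuery } from "@google-cloud/bigquery";

    const PAGE_SIZE = 100;   // rows per getQueryResults call
    const ROW_BUDGET = 5000; // per-run cap, like maxRowsPerExecution

    async function runBoundedQuery(query) {
      const bigquery = new BigQuery();
      const [ job ] = await bigquery.createQueryJob({ query });

      let pageToken = null;
      let total = 0;

      for (;;) {
        const options = { maxResults: PAGE_SIZE, autoPaginate: false };
        if (pageToken) options.pageToken = pageToken;

        // With autoPaginate: false, the second element carries the
        // options (including pageToken) for fetching the next page.
        const [ rows, nextQuery ] = await job.getQueryResults(options);
        if (rows.length === 0) break;

        total += rows.length;
        if (total > ROW_BUDGET) break; // stop before buffering too much

        for (const row of rows) {
          // process each row immediately instead of accumulating pages
          console.log(row);
        }

        pageToken = nextQuery?.pageToken;
        if (!pageToken) break; // no more pages
      }
      return total;
    }

    // Usage sketch: await runBoundedQuery("SELECT * FROM `dataset.table`");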