2 changes: 1 addition & 1 deletion components/google_cloud/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@pipedream/google_cloud",
-  "version": "0.6.1",
+  "version": "0.6.2",
   "description": "Pipedream Google_cloud Components",
   "main": "google_cloud.app.mjs",
   "keywords": [
components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs
@@ -9,7 +9,7 @@ export default {
   // eslint-disable-next-line pipedream/source-name
   name: "BigQuery - New Row",
   description: "Emit new events when a new row is added to a table",
-  version: "0.1.7",
+  version: "0.1.8",
   dedupe: "unique",
   type: "source",
   props: {
components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs
@@ -8,7 +8,7 @@ export default {
   // eslint-disable-next-line pipedream/source-name
   name: "BigQuery - Query Results",
   description: "Emit new events with the results of an arbitrary query",
-  version: "0.1.6",
+  version: "0.1.7",
   dedupe: "unique",
   type: "source",
   props: {
@@ -21,11 +21,7 @@ export default {
     dedupeKey: {
       type: "string",
       label: "De-duplication Key",
-      description: `
-        The name of a column in the table to use for deduplication. See [the
-        docs](https://github.com/PipedreamHQ/pipedream/tree/master/components/google_cloud/sources/bigquery-query-results#technical-details)
-        for more info.
-      `,
+      description: "The name of a column in the table to use for deduplication. See [the docs](https://github.com/PipedreamHQ/pipedream/tree/master/components/google_cloud/sources/bigquery-query-results#technical-details) for more info.",
       optional: true,
     },
   },
@@ -47,10 +43,11 @@ export default {
     },
     generateMetaForCollection(rows, ts) {
       const hash = crypto.createHash("sha1");
-      rows
-        .map((i) => i[this.dedupeKey] || uuidv4())
-        .map((i) => i.toString())
-        .forEach((i) => hash.update(i));
+      // Process rows incrementally to avoid memory accumulation
+      for (const row of rows) {
+        const key = row[this.dedupeKey] || uuidv4();
+        hash.update(key.toString());
+      }
       const id = hash.digest("base64");
       const summary = `New event (ID: ${id})`;
       return {
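For context on the hunk above: the collection ID is a SHA-1 digest built from each row's de-duplication key (or a random UUID when the key is missing), updated one row at a time instead of through intermediate arrays. A minimal standalone sketch of that approach, assuming Node's built-in crypto module and the uuid package this component already imports; the collectionId helper and the sample rows are hypothetical, not part of the PR:

import crypto from "crypto";
import { v4 as uuidv4 } from "uuid";

// Hash each row's dedupe key into one SHA-1 digest so a batch of rows
// yields a single, stable event ID (mirrors generateMetaForCollection above).
function collectionId(rows, dedupeKey) {
  const hash = crypto.createHash("sha1");
  for (const row of rows) {
    // Fold rows in one at a time rather than building intermediate arrays.
    const key = row[dedupeKey] || uuidv4();
    hash.update(key.toString());
  }
  return hash.digest("base64");
}

// Example: the same rows and the same key column always produce the same ID.
const rows = [
  { id: 1, name: "a" },
  { id: 2, name: "b" },
];
console.log(collectionId(rows, "id"));

Rows that lack the chosen key fall back to a random UUID, so they never collapse into a repeated ID across runs.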
152 changes: 93 additions & 59 deletions components/google_cloud/sources/common/bigquery.mjs
@@ -21,6 +21,16 @@ export default {
       description: "The number of rows to include in a single event (by default, emits 1 event per row)",
       default: 1,
       min: 1,
+      max: 1000,
+    },
+    maxRowsPerExecution: {
+      type: "integer",
+      label: "Max Rows Per Execution",
+      description: "Maximum number of rows to process in a single execution to prevent memory issues",
+      default: 5000,
+      min: 100,
+      max: 50000,
+      optional: true,
     },
     datasetId: {
       propDefinition: [
@@ -30,34 +40,6 @@ export default {
     },
   },
   methods: {
-    async createQueryJob(queryOpts) {
-      const client = this.googleCloud
-        .getBigQueryClient()
-        .dataset(this.datasetId);
-      const [
-        job,
-      ] = await client.createQueryJob(queryOpts);
-      return job;
-    },
-    async getRowsForQuery(job, pageSize = 1000, pageToken = null) {
-      const options = {
-        maxResults: pageSize,
-      };
-
-      if (pageToken) {
-        options.pageToken = pageToken;
-      }
-
-      const [
-        rows,
-        queryResults,
-      ] = await job.getQueryResults(options);
-      const nextPageToken = queryResults?.pageToken;
-      return {
-        rows,
-        pageToken: nextPageToken,
-      };
-    },
     _updateLastResultId(rows) {
       const lastRow = rows.pop();
       if (!lastRow) {
@@ -69,49 +51,101 @@ export default {
       this.db.set("lastResultId", newLastResultId);
     },
     async processCollection(queryOpts, timestamp) {
-      const job = await this.createQueryJob(queryOpts);
+      const client = this.googleCloud
+        .getBigQueryClient()
+        .dataset(this.datasetId);
+      const [
+        job,
+      ] = await client.createQueryJob(queryOpts);
 
-      const pageSize = 1000, maxPages = 10;
+      const pageSize = 100;
+      const maxRowsPerExecution = this.maxRowsPerExecution || 5000;
+      const maxPages = Math.ceil(maxRowsPerExecution / pageSize);
       let pageToken = null;
       let allProcessed = false;
       let pageCount = 0;
+      let totalRowsProcessed = 0;
+
+      console.log(`Starting BigQuery processing with max ${maxRowsPerExecution} rows per execution`);
+
       while (!allProcessed) {
-        const {
-          rows, pageToken: nextPageToken,
-        } = await this.getRowsForQuery(job, pageSize, pageToken);
+        try {
+          const options = {
+            maxResults: pageSize,
+          };
 
-        if (rows.length === 0) {
-          allProcessed = true;
-          break;
-        }
+          if (pageToken) {
+            options.pageToken = pageToken;
+          }
 
-        chunk(rows, this.eventSize).forEach((batch) => {
-          if (this.eventSize === 1) {
-            const meta = this.generateMeta(batch[0], timestamp);
-            this.$emit(batch[0], meta);
-          } else {
-            const meta = this.generateMetaForCollection(batch, timestamp);
-            const data = {
-              rows: batch,
-              rowCount: batch.length,
-            };
-            this.$emit(data, meta);
+          const [
+            rows,
+            queryResults,
+          ] = await job.getQueryResults(options);
+          const nextPageToken = queryResults?.pageToken;
+
+          if (rows.length === 0) {
+            allProcessed = true;
+            break;
           }
-        });
 
-        pageCount++;
-        if (pageCount >= maxPages) {
-          allProcessed = true;
-        }
-        if (this.uniqueKey) {
-          this._updateLastResultId(rows);
-        }
-        if (!nextPageToken) {
-          break;
+          // Check memory limits before processing
+          totalRowsProcessed += rows.length;
+          if (totalRowsProcessed > maxRowsPerExecution) {
+            console.log(`Reached max rows limit (${maxRowsPerExecution}). Stopping processing to prevent memory issues.`);
+            allProcessed = true;
+            break;
           }
+
+          // Process rows immediately and in small chunks to reduce memory usage
+          chunk(rows, this.eventSize).forEach((batch) => {
+            try {
+              if (this.eventSize === 1) {
+                const meta = this.generateMeta(batch[0], timestamp);
+                this.$emit(batch[0], meta);
+              } else {
+                const meta = this.generateMetaForCollection(batch, timestamp);
+                const data = {
+                  rows: batch,
+                  rowCount: batch.length,
+                };
+                this.$emit(data, meta);
+              }
+            } catch (error) {
+              console.error("Error processing batch:", error);
+              throw error;
+            }
+          });
+
+          // Update last result ID before clearing rows
+          if (this.uniqueKey && rows.length > 0) {
+            this._updateLastResultId([
+              ...rows,
+            ]); // Pass a copy to avoid mutation issues
+          }
+
+          // Clear reference to help with garbage collection
+          rows.length = 0;
+
+          pageCount++;
+          if (pageCount >= maxPages) {
+            console.log(`Reached max pages limit (${maxPages}). Stopping processing.`);
+            allProcessed = true;
+          }
+          if (!nextPageToken) {
+            break;
+          }
+          pageToken = nextPageToken;
+        } catch (error) {
+          console.error("Error in BigQuery processing:", error);
+          if (error.message && error.message.includes("memory")) {
+            throw new Error(`Memory error in BigQuery processing. Consider reducing maxRowsPerExecution or eventSize. Original error: ${error.message}`);
+          }
+          throw error;
+        }
-        pageToken = nextPageToken;
       }
 
+      console.log(`BigQuery processing completed. Processed ${totalRowsProcessed} rows in ${pageCount} pages.`);
     },
     getInitialEventCount() {
       return 10;
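For context on the refactor above: processCollection now derives a page budget from maxRowsPerExecution (maxPages = Math.ceil(maxRowsPerExecution / pageSize)) and pages through the query job with getQueryResults, passing maxResults and pageToken and stopping at the row or page cap. A rough standalone sketch of that paging pattern outside the Pipedream component; the dataset ID, query, and processing step are placeholders, and the client calls simply mirror the ones in the diff rather than an independently verified recipe:

import { BigQuery } from "@google-cloud/bigquery";

// Placeholder limits, matching the component defaults above.
const pageSize = 100;
const maxRowsPerExecution = 5000;
const maxPages = Math.ceil(maxRowsPerExecution / pageSize); // at most 50 pages

// The component scopes the client to a dataset before creating the query job.
const bigquery = new BigQuery();
const dataset = bigquery.dataset("my_dataset"); // placeholder dataset ID
const [
  job,
] = await dataset.createQueryJob({
  query: "SELECT * FROM `my_dataset.my_table`", // placeholder query
});

let pageToken = null;
let totalRowsProcessed = 0;

for (let page = 0; page < maxPages; page++) {
  const options = {
    maxResults: pageSize,
  };
  if (pageToken) {
    options.pageToken = pageToken;
  }

  // Each call returns one page of rows plus a token for the next page, if any.
  const [
    rows,
    queryResults,
  ] = await job.getQueryResults(options);
  if (rows.length === 0) {
    break;
  }

  totalRowsProcessed += rows.length;
  // ...emit or otherwise process this page of rows, then drop the reference...

  const nextPageToken = queryResults?.pageToken;
  if (totalRowsProcessed >= maxRowsPerExecution || !nextPageToken) {
    break;
  }
  pageToken = nextPageToken;
}

console.log(`Processed ${totalRowsProcessed} rows in at most ${maxPages} pages.`);

Capping both the page count and the total row count keeps a single run's memory footprint bounded even when the query matches far more rows than one execution can handle.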