Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/google_cloud/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/google_cloud",
"version": "0.6.2",
"version": "0.6.3",
"description": "Pipedream Google_cloud Components",
"main": "google_cloud.app.mjs",
"keywords": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export default {
// eslint-disable-next-line pipedream/source-name
name: "BigQuery - New Row",
description: "Emit new events when a new row is added to a table",
version: "0.1.8",
version: "0.1.9",
dedupe: "unique",
type: "source",
props: {
Expand Down Expand Up @@ -56,9 +56,6 @@ export default {
const lastResultId = await this._getIdOfLastRow();
this._setLastResultId(lastResultId);
},
deactivate() {
this._setLastResultId(null);
},
},
methods: {
...common.methods,
Expand Down Expand Up @@ -113,7 +110,15 @@ export default {
limit,
},
};
const rows = await this.getRowsForQuery(queryOpts, this.datasetId);
const client = this.googleCloud
.getBigQueryClient()
.dataset(this.datasetId);
const [
job,
] = await client.createQueryJob(queryOpts);
const [
rows,
] = await job.getQueryResults();
if (rows.length === 0) {
console.log(`
No records found in the target table, will start scanning from the beginning
Expand All @@ -126,18 +131,35 @@ export default {
},
getQueryOpts() {
const lastResultId = this._getLastResultId();
const query = `
const maxRowsPerExecution = this.maxRowsPerExecution || 1000;

let query = `
SELECT *
FROM \`${this.tableId}\`
WHERE \`${this.uniqueKey}\` >= @lastResultId
ORDER BY \`${this.uniqueKey}\` ASC
`;
if (lastResultId) {
query += ` WHERE \`${this.uniqueKey}\` >= @lastResultId`;
}
query += ` ORDER BY \`${this.uniqueKey}\` ASC`;
query += " LIMIT @maxRows";

const params = {
lastResultId,
maxRows: maxRowsPerExecution,
...(lastResultId
? {
lastResultId,
}
: {}),
};

return {
query,
params,
jobConfig: {
// Add timeout to prevent runaway queries
jobTimeoutMs: 300000, // 5 minutes
maximumBytesBilled: "1000000000", // 1GB limit to prevent excessive costs
},
};
},
generateMeta(row, ts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default {
// eslint-disable-next-line pipedream/source-name
name: "BigQuery - Query Results",
description: "Emit new events with the results of an arbitrary query",
version: "0.1.7",
version: "0.1.8",
dedupe: "unique",
type: "source",
props: {
Expand Down
34 changes: 25 additions & 9 deletions components/google_cloud/sources/common/bigquery.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ export default {
maxRowsPerExecution: {
type: "integer",
label: "Max Rows Per Execution",
description: "Maximum number of rows to process in a single execution to prevent memory issues",
default: 5000,
description: "Maximum number of rows to process in a single execution to prevent memory issues. Reduce this value if you experience out of memory errors.",
default: 1000,
min: 100,
max: 50000,
max: 10000,
optional: true,
},
datasetId: {
Expand All @@ -54,12 +54,23 @@ export default {
const client = this.googleCloud
.getBigQueryClient()
.dataset(this.datasetId);

// Merge jobConfig if provided for better resource management
const jobOptions = {
...queryOpts,
...(queryOpts.jobConfig || {}),
};

const [
job,
] = await client.createQueryJob(queryOpts);
] = await client.createQueryJob(jobOptions);

// Ensure job completion before proceeding
console.log("BigQuery job created, processing results...");
await job.promise();

const pageSize = 100;
const maxRowsPerExecution = this.maxRowsPerExecution || 5000;
const maxRowsPerExecution = this.maxRowsPerExecution || 1000;
const maxPages = Math.ceil(maxRowsPerExecution / pageSize);
let pageToken = null;
let allProcessed = false;
Expand Down Expand Up @@ -124,8 +135,13 @@ export default {
]); // Pass a copy to avoid mutation issues
}

// Clear reference to help with garbage collection
rows.length = 0;
// Clear references to help with garbage collection
rows.splice(0, rows.length); // More efficient than rows.length = 0

// Force garbage collection if available (Node.js with --expose-gc)
if (global.gc && pageCount % 10 === 0) {
global.gc();
}

pageCount++;
if (pageCount >= maxPages) {
Expand Down Expand Up @@ -163,9 +179,9 @@ export default {
throw new Error("processEvent is not implemented");
},
},
run(event) {
async run(event) {
const { timestamp } = event;
const queryOpts = this.getQueryOpts(event);
return this.processCollection(queryOpts, timestamp);
return await this.processCollection(queryOpts, timestamp);
},
};
Loading