Skip to content

Commit 8021f2b

Browse files
committed
updates
1 parent 169cd95 commit 8021f2b

File tree

2 files changed

+43
-14
lines changed

2 files changed

+43
-14
lines changed

components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ export default {
131131
},
132132
getQueryOpts() {
133133
const lastResultId = this._getLastResultId();
134+
const maxRowsPerExecution = this.maxRowsPerExecution || 1000;
135+
134136
let query = `
135137
SELECT *
136138
FROM \`${this.tableId}\`
@@ -139,14 +141,25 @@ export default {
139141
query += ` WHERE \`${this.uniqueKey}\` >= @lastResultId`;
140142
}
141143
query += ` ORDER BY \`${this.uniqueKey}\` ASC`;
142-
const params = lastResultId
143-
? {
144-
lastResultId,
145-
}
146-
: {};
144+
query += " LIMIT @maxRows";
145+
146+
const params = {
147+
maxRows: maxRowsPerExecution,
148+
...(lastResultId
149+
? {
150+
lastResultId,
151+
}
152+
: {}),
153+
};
154+
147155
return {
148156
query,
149157
params,
158+
jobConfig: {
159+
// Add timeout to prevent runaway queries
160+
jobTimeoutMs: 300000, // 5 minutes
161+
maximumBytesBilled: "1000000000", // 1GB limit to prevent excessive costs
162+
},
150163
};
151164
},
152165
generateMeta(row, ts) {

components/google_cloud/sources/common/bigquery.mjs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ export default {
2626
maxRowsPerExecution: {
2727
type: "integer",
2828
label: "Max Rows Per Execution",
29-
description: "Maximum number of rows to process in a single execution to prevent memory issues",
30-
default: 5000,
29+
description: "Maximum number of rows to process in a single execution to prevent memory issues. Reduce this value if you experience out of memory errors.",
30+
default: 1000,
3131
min: 100,
32-
max: 50000,
32+
max: 10000,
3333
optional: true,
3434
},
3535
datasetId: {
@@ -54,12 +54,23 @@ export default {
5454
const client = this.googleCloud
5555
.getBigQueryClient()
5656
.dataset(this.datasetId);
57+
58+
// Merge jobConfig if provided for better resource management
59+
const jobOptions = {
60+
...queryOpts,
61+
...(queryOpts.jobConfig || {}),
62+
};
63+
5764
const [
5865
job,
59-
] = await client.createQueryJob(queryOpts);
66+
] = await client.createQueryJob(jobOptions);
67+
68+
// Ensure job completion before proceeding
69+
console.log("BigQuery job created, processing results...");
70+
await job.promise();
6071

6172
const pageSize = 100;
62-
const maxRowsPerExecution = this.maxRowsPerExecution || 5000;
73+
const maxRowsPerExecution = this.maxRowsPerExecution || 1000;
6374
const maxPages = Math.ceil(maxRowsPerExecution / pageSize);
6475
let pageToken = null;
6576
let allProcessed = false;
@@ -124,8 +135,13 @@ export default {
124135
]); // Pass a copy to avoid mutation issues
125136
}
126137

127-
// Clear reference to help with garbage collection
128-
rows.length = 0;
138+
// Clear references to help with garbage collection
139+
rows.splice(0, rows.length); // More efficient than rows.length = 0
140+
141+
// Force garbage collection if available (Node.js with --expose-gc)
142+
if (global.gc && pageCount % 10 === 0) {
143+
global.gc();
144+
}
129145

130146
pageCount++;
131147
if (pageCount >= maxPages) {
@@ -163,9 +179,9 @@ export default {
163179
throw new Error("processEvent is not implemented");
164180
},
165181
},
166-
run(event) {
182+
async run(event) {
167183
const { timestamp } = event;
168184
const queryOpts = this.getQueryOpts(event);
169-
return this.processCollection(queryOpts, timestamp);
185+
return await this.processCollection(queryOpts, timestamp);
170186
},
171187
};

0 commit comments

Comments
 (0)