
Commit 2c173cc: "updates"

1 parent bb97b76

3 files changed (+85, -158 lines)


components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs

Lines changed: 21 additions & 58 deletions
@@ -114,76 +114,39 @@ export default {
         .getBigQueryClient()
         .dataset(this.datasetId);

-      let job;
-      try {
-        const [
-          createdJob,
-        ] = await client.createQueryJob(queryOpts);
-        job = createdJob;
-
-        const [
-          rows,
-        ] = await job.getQueryResults();
-
-        if (rows.length === 0) {
-          console.log(`
-            No records found in the target table, will start scanning from the beginning
-          `);
-          return;
-        }
+      const [
+        job,
+      ] = await client.createQueryJob(queryOpts);

-        const startingRow = rows.pop();
-        return startingRow[this.uniqueKey];
-      } finally {
-        // CRITICAL: Clean up ALL BigQuery resources to prevent memory leaks
-        if (job) {
-          try {
-            console.log("Cleaning up BigQuery job resources in _getIdOfLastRow...");
-            await job.cancel();
-            job.removeAllListeners && job.removeAllListeners();
+      const [
+        rows,
+      ] = await job.getQueryResults();

-            // Also clean up the BigQuery client instance
-            const bigQueryClient = this.googleCloud.getBigQueryClient();
-            if (bigQueryClient && typeof bigQueryClient.close === "function") {
-              await bigQueryClient.close();
-            }
-          } catch (cleanupError) {
-            console.warn("Warning: Error during BigQuery job cleanup in _getIdOfLastRow:", cleanupError.message);
-          }
-        }
+      if (rows.length === 0) {
+        console.log(`
+          No records found in the target table, will start scanning from the beginning
+        `);
+        return;
       }
+
+      const startingRow = rows.pop();
+      return startingRow[this.uniqueKey];
     },
     getQueryOpts() {
       const lastResultId = this._getLastResultId();
-      const maxRowsPerExecution = this.maxRowsPerExecution || 5000;
-
-      let query = `
-        SELECT *
-        FROM \`${this.tableId}\`
-      `;
+      let query = `SELECT * FROM \`${this.tableId}\``;
       if (lastResultId) {
         query += ` WHERE \`${this.uniqueKey}\` >= @lastResultId`;
       }
-      query += ` ORDER BY \`${this.uniqueKey}\` ASC`;
-      query += " LIMIT @maxRows";
-
-      const params = {
-        maxRows: maxRowsPerExecution,
-        ...(lastResultId
-          ? {
-            lastResultId,
-          }
-          : {}),
-      };
-
+      query += ` ORDER BY \`${this.uniqueKey}\` DESC`;
+      const params = lastResultId
+        ? {
+          lastResultId,
+        }
+        : {};
       return {
         query,
         params,
-        jobConfig: {
-          // Add timeout to prevent runaway queries
-          jobTimeoutMs: 300000, // 5 minutes
-          maximumBytesBilled: "1000000000", // 1GB limit to prevent excessive costs
-        },
       };
     },
     generateMeta(row, ts) {
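For reference, a minimal standalone sketch of the query the restored getQueryOpts() now builds. The buildQueryOpts name and the plain-argument signature are illustrative only; in the component these values come from props and the stored last-result ID.

// Hypothetical standalone version of the restored getQueryOpts() logic.
// tableId, uniqueKey, and lastResultId stand in for the component's props and saved state.
const buildQueryOpts = (tableId, uniqueKey, lastResultId) => {
  let query = `SELECT * FROM \`${tableId}\``;
  if (lastResultId) {
    query += ` WHERE \`${uniqueKey}\` >= @lastResultId`;
  }
  // Newest rows come first; the previous LIMIT @maxRows cap is gone.
  query += ` ORDER BY \`${uniqueKey}\` DESC`;
  const params = lastResultId
    ? {
      lastResultId,
    }
    : {};
  return {
    query,
    params,
  };
};

// Example: buildQueryOpts("dataset.table", "id", 42) returns
// { query: "SELECT * FROM `dataset.table` WHERE `id` >= @lastResultId ORDER BY `id` DESC",
//   params: { lastResultId: 42 } }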

components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ export default {
   // eslint-disable-next-line pipedream/source-name
   name: "BigQuery - Query Results",
   description: "Emit new events with the results of an arbitrary query",
-  version: "0.1.8",
+  version: "0.1.9",
   dedupe: "unique",
   type: "source",
   props: {

components/google_cloud/sources/common/bigquery.mjs

Lines changed: 63 additions & 99 deletions
@@ -54,20 +54,9 @@ export default {
       const client = this.googleCloud
         .getBigQueryClient()
         .dataset(this.datasetId);
-
-      // Merge jobConfig if provided for better resource management
-      const jobOptions = {
-        ...queryOpts,
-        ...(queryOpts.jobConfig || {}),
-      };
-
       const [
         job,
-      ] = await client.createQueryJob(jobOptions);
-
-      // Ensure job completion before proceeding
-      console.log("BigQuery job created, processing results...");
-      await job.promise();
+      ] = await client.createQueryJob(queryOpts);

       const pageSize = 100;
       const maxRowsPerExecution = this.maxRowsPerExecution || 5000;
@@ -79,105 +68,80 @@ export default {

       console.log(`Starting BigQuery processing with max ${maxRowsPerExecution} rows per execution`);

-      try {
-      while (!allProcessed) {
-        try {
-          const options = {
-            maxResults: pageSize,
-          };
+      while (!allProcessed) {
+        try {
+          const options = {
+            maxResults: pageSize,
+          };

-          if (pageToken) {
-            options.pageToken = pageToken;
-          }
+          if (pageToken) {
+            options.pageToken = pageToken;
+          }

-          const [
-            rows,
-            queryResults,
-          ] = await job.getQueryResults(options);
-          const nextPageToken = queryResults?.pageToken;
+          const [
+            rows,
+            queryResults,
+          ] = await job.getQueryResults(options);
+          const nextPageToken = queryResults?.pageToken;

-          if (rows.length === 0) {
-            allProcessed = true;
-            break;
-          }
+          if (rows.length === 0) {
+            allProcessed = true;
+            break;
+          }

-          // Check memory limits before processing
-          totalRowsProcessed += rows.length;
-          if (totalRowsProcessed > maxRowsPerExecution) {
-            console.log(`Reached max rows limit (${maxRowsPerExecution}). Stopping processing to prevent memory issues.`);
-            allProcessed = true;
-            break;
-          }
+          // Check memory limits before processing
+          totalRowsProcessed += rows.length;
+          if (totalRowsProcessed > maxRowsPerExecution) {
+            console.log(`Reached max rows limit (${maxRowsPerExecution}). Stopping processing to prevent memory issues.`);
+            allProcessed = true;
+            break;
+          }

-          // Process rows immediately and in small chunks to reduce memory usage
-          chunk(rows, this.eventSize).forEach((batch) => {
-            try {
-              if (this.eventSize === 1) {
-                const meta = this.generateMeta(batch[0], timestamp);
-                this.$emit(batch[0], meta);
-              } else {
-                const meta = this.generateMetaForCollection(batch, timestamp);
-                const data = {
-                  rows: batch,
-                  rowCount: batch.length,
-                };
-                this.$emit(data, meta);
-              }
-            } catch (error) {
-              console.error("Error processing batch:", error);
-              throw error;
+          // Process rows immediately and in small chunks to reduce memory usage
+          chunk(rows, this.eventSize).forEach((batch) => {
+            try {
+              if (this.eventSize === 1) {
+                const meta = this.generateMeta(batch[0], timestamp);
+                this.$emit(batch[0], meta);
+              } else {
+                const meta = this.generateMetaForCollection(batch, timestamp);
+                const data = {
+                  rows: batch,
+                  rowCount: batch.length,
+                };
+                this.$emit(data, meta);
               }
-            });
-
-            // Update last result ID before clearing rows
-            if (this.uniqueKey && rows.length > 0) {
-              this._updateLastResultId([
-                ...rows,
-              ]); // Pass a copy to avoid mutation issues
+            } catch (error) {
+              console.error("Error processing batch:", error);
+              throw error;
             }
+          });

-          // Clear references to help with garbage collection
-          rows.splice(0, rows.length); // More efficient than rows.length = 0
+          // Update last result ID before clearing rows
+          if (this.uniqueKey && rows.length > 0) {
+            this._updateLastResultId([
+              ...rows,
+            ]); // Pass a copy to avoid mutation issues
+          }

-          // Force garbage collection if available (Node.js with --expose-gc)
-          if (global.gc && pageCount % 10 === 0) {
-            global.gc();
-          }
+          // Clear reference to help with garbage collection
+          rows.length = 0;

-          pageCount++;
-          if (pageCount >= maxPages) {
-            console.log(`Reached max pages limit (${maxPages}). Stopping processing.`);
-            allProcessed = true;
-          }
-          if (!nextPageToken) {
-            break;
-          }
-          pageToken = nextPageToken;
-        } catch (error) {
-          console.error("Error in BigQuery processing:", error);
-          if (error.message && error.message.includes("memory")) {
-            throw new Error(`Memory error in BigQuery processing. Consider reducing maxRowsPerExecution or eventSize. Original error: ${error.message}`);
-          }
-          throw error;
+          pageCount++;
+          if (pageCount >= maxPages) {
+            console.log(`Reached max pages limit (${maxPages}). Stopping processing.`);
+            allProcessed = true;
           }
-        }
-      } finally {
-        // CRITICAL: Clean up ALL BigQuery resources to prevent memory leaks
-        try {
-          console.log("Cleaning up BigQuery job resources...");
-          // Cancel the job if it's still running (shouldn't be, but just in case)
-          await job.cancel();
-          // Clear any internal references
-          job.removeAllListeners && job.removeAllListeners();
-
-          // Also clean up the BigQuery client instance to ensure connections are closed
-          const bigQueryClient = this.googleCloud.getBigQueryClient();
-          if (bigQueryClient && typeof bigQueryClient.close === "function") {
-            await bigQueryClient.close();
+          if (!nextPageToken) {
+            break;
+          }
+          pageToken = nextPageToken;
+        } catch (error) {
+          console.error("Error in BigQuery processing:", error);
+          if (error.message && error.message.includes("memory")) {
+            throw new Error(`Memory error in BigQuery processing. Consider reducing maxRowsPerExecution or eventSize. Original error: ${error.message}`);
           }
-        } catch (cleanupError) {
-          console.warn("Warning: Error during BigQuery resource cleanup:", cleanupError.message);
-          // Don't throw - cleanup errors shouldn't fail the main process
+          throw error;
         }
       }

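For context, a minimal sketch of the page-by-page loop that common/bigquery.mjs drives against a query job after this change. It assumes the @google-cloud/bigquery Node client; the processQuery name, the hard-coded page size, and the handleRows callback are illustrative only, not part of the component.

import { BigQuery } from "@google-cloud/bigquery";

// Hypothetical driver mirroring the pagination in processCollection:
// fetch results one page at a time via getQueryResults({ maxResults, pageToken }).
const processQuery = async (query, handleRows) => {
  const bigquery = new BigQuery();
  const [
    job,
  ] = await bigquery.createQueryJob({
    query,
  });

  const pageSize = 100;
  let pageToken;
  do {
    const options = {
      maxResults: pageSize,
    };
    if (pageToken) {
      options.pageToken = pageToken;
    }
    // As in processCollection, the second result element exposes the
    // pageToken for the next page, when there is one.
    const [
      rows,
      nextQuery,
    ] = await job.getQueryResults(options);
    if (rows.length === 0) {
      break;
    }
    handleRows(rows);
    pageToken = nextQuery?.pageToken;
  } while (pageToken);
};

// Usage (placeholder query):
// await processQuery("SELECT 1 AS n", (rows) => console.log(rows.length));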