Commit cd835d7

updates (#18490)
1 parent f4b15db commit cd835d7

4 files changed, +102 -71 lines changed

components/google_cloud/package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
   "name": "@pipedream/google_cloud",
-  "version": "0.6.1",
+  "version": "0.6.2",
   "description": "Pipedream Google_cloud Components",
   "main": "google_cloud.app.mjs",
   "keywords": [

components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ export default {
   // eslint-disable-next-line pipedream/source-name
   name: "BigQuery - New Row",
   description: "Emit new events when a new row is added to a table",
-  version: "0.1.7",
+  version: "0.1.8",
   dedupe: "unique",
   type: "source",
   props: {

components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs

Lines changed: 7 additions & 10 deletions
@@ -8,7 +8,7 @@ export default {
   // eslint-disable-next-line pipedream/source-name
   name: "BigQuery - Query Results",
   description: "Emit new events with the results of an arbitrary query",
-  version: "0.1.6",
+  version: "0.1.7",
   dedupe: "unique",
   type: "source",
   props: {
@@ -21,11 +21,7 @@ export default {
     dedupeKey: {
       type: "string",
       label: "De-duplication Key",
-      description: `
-        The name of a column in the table to use for deduplication. See [the
-        docs](https://github.com/PipedreamHQ/pipedream/tree/master/components/google_cloud/sources/bigquery-query-results#technical-details)
-        for more info.
-      `,
+      description: "The name of a column in the table to use for deduplication. See [the docs](https://github.com/PipedreamHQ/pipedream/tree/master/components/google_cloud/sources/bigquery-query-results#technical-details) for more info.",
       optional: true,
     },
   },
@@ -47,10 +43,11 @@ export default {
     },
     generateMetaForCollection(rows, ts) {
       const hash = crypto.createHash("sha1");
-      rows
-        .map((i) => i[this.dedupeKey] || uuidv4())
-        .map((i) => i.toString())
-        .forEach((i) => hash.update(i));
+      // Process rows incrementally to avoid memory accumulation
+      for (const row of rows) {
+        const key = row[this.dedupeKey] || uuidv4();
+        hash.update(key.toString());
+      }
       const id = hash.digest("base64");
       const summary = `New event (ID: ${id})`;
       return {
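
For context on the hunk above: the new `generateMetaForCollection` body feeds each row's dedupe key into the SHA-1 hash one at a time instead of building two intermediate arrays first. A minimal standalone sketch of the same pattern (the sample `rows` and `dedupeKey` values are hypothetical; the imports assume Node's built-in `crypto` and the `uuid` package this component already relies on):

```js
import crypto from "crypto";
import { v4 as uuidv4 } from "uuid";

// Hypothetical inputs, for illustration only
const rows = [
  { id: "a1", value: 10 },
  { id: "a2", value: 20 },
];
const dedupeKey = "id";

// Hash each key as it is visited; no intermediate arrays are created
const hash = crypto.createHash("sha1");
for (const row of rows) {
  const key = row[dedupeKey] || uuidv4(); // fall back to a random UUID if the column is missing
  hash.update(key.toString());
}
console.log(hash.digest("base64")); // deduplication ID for this batch of rows
```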

components/google_cloud/sources/common/bigquery.mjs

Lines changed: 93 additions & 59 deletions
@@ -21,6 +21,16 @@ export default {
       description: "The number of rows to include in a single event (by default, emits 1 event per row)",
       default: 1,
       min: 1,
+      max: 1000,
+    },
+    maxRowsPerExecution: {
+      type: "integer",
+      label: "Max Rows Per Execution",
+      description: "Maximum number of rows to process in a single execution to prevent memory issues",
+      default: 5000,
+      min: 100,
+      max: 50000,
+      optional: true,
     },
     datasetId: {
       propDefinition: [
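
The new `maxRowsPerExecution` prop added above feeds the page budget that `processCollection` computes further down in this diff. A quick sketch of that arithmetic with the defaults shown here (100 rows per page, 5,000 rows per run), purely for illustration:

```js
// Defaults taken from this commit
const pageSize = 100;
const maxRowsPerExecution = 5000; // used when the prop is left unset
const maxPages = Math.ceil(maxRowsPerExecution / pageSize);
console.log(maxPages); // 50 pages fetched at most per execution
```
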
@@ -30,34 +40,6 @@ export default {
     },
   },
   methods: {
-    async createQueryJob(queryOpts) {
-      const client = this.googleCloud
-        .getBigQueryClient()
-        .dataset(this.datasetId);
-      const [
-        job,
-      ] = await client.createQueryJob(queryOpts);
-      return job;
-    },
-    async getRowsForQuery(job, pageSize = 1000, pageToken = null) {
-      const options = {
-        maxResults: pageSize,
-      };
-
-      if (pageToken) {
-        options.pageToken = pageToken;
-      }
-
-      const [
-        rows,
-        queryResults,
-      ] = await job.getQueryResults(options);
-      const nextPageToken = queryResults?.pageToken;
-      return {
-        rows,
-        pageToken: nextPageToken,
-      };
-    },
     _updateLastResultId(rows) {
       const lastRow = rows.pop();
       if (!lastRow) {
6951
this.db.set("lastResultId", newLastResultId);
7052
},
7153
async processCollection(queryOpts, timestamp) {
72-
const job = await this.createQueryJob(queryOpts);
54+
const client = this.googleCloud
55+
.getBigQueryClient()
56+
.dataset(this.datasetId);
57+
const [
58+
job,
59+
] = await client.createQueryJob(queryOpts);
7360

74-
const pageSize = 1000, maxPages = 10;
61+
const pageSize = 100;
62+
const maxRowsPerExecution = this.maxRowsPerExecution || 5000;
63+
const maxPages = Math.ceil(maxRowsPerExecution / pageSize);
7564
let pageToken = null;
7665
let allProcessed = false;
7766
let pageCount = 0;
67+
let totalRowsProcessed = 0;
68+
69+
console.log(`Starting BigQuery processing with max ${maxRowsPerExecution} rows per execution`);
7870

7971
while (!allProcessed) {
80-
const {
81-
rows, pageToken: nextPageToken,
82-
} = await this.getRowsForQuery(job, pageSize, pageToken);
72+
try {
73+
const options = {
74+
maxResults: pageSize,
75+
};
8376

84-
if (rows.length === 0) {
85-
allProcessed = true;
86-
break;
87-
}
77+
if (pageToken) {
78+
options.pageToken = pageToken;
79+
}
8880

89-
chunk(rows, this.eventSize).forEach((batch) => {
90-
if (this.eventSize === 1) {
91-
const meta = this.generateMeta(batch[0], timestamp);
92-
this.$emit(batch[0], meta);
93-
} else {
94-
const meta = this.generateMetaForCollection(batch, timestamp);
95-
const data = {
96-
rows: batch,
97-
rowCount: batch.length,
98-
};
99-
this.$emit(data, meta);
81+
const [
82+
rows,
83+
queryResults,
84+
] = await job.getQueryResults(options);
85+
const nextPageToken = queryResults?.pageToken;
86+
87+
if (rows.length === 0) {
88+
allProcessed = true;
89+
break;
10090
}
101-
});
10291

103-
pageCount++;
104-
if (pageCount >= maxPages) {
105-
allProcessed = true;
106-
}
107-
if (this.uniqueKey) {
108-
this._updateLastResultId(rows);
109-
}
110-
if (!nextPageToken) {
111-
break;
92+
// Check memory limits before processing
93+
totalRowsProcessed += rows.length;
94+
if (totalRowsProcessed > maxRowsPerExecution) {
95+
console.log(`Reached max rows limit (${maxRowsPerExecution}). Stopping processing to prevent memory issues.`);
96+
allProcessed = true;
97+
break;
98+
}
99+
100+
// Process rows immediately and in small chunks to reduce memory usage
101+
chunk(rows, this.eventSize).forEach((batch) => {
102+
try {
103+
if (this.eventSize === 1) {
104+
const meta = this.generateMeta(batch[0], timestamp);
105+
this.$emit(batch[0], meta);
106+
} else {
107+
const meta = this.generateMetaForCollection(batch, timestamp);
108+
const data = {
109+
rows: batch,
110+
rowCount: batch.length,
111+
};
112+
this.$emit(data, meta);
113+
}
114+
} catch (error) {
115+
console.error("Error processing batch:", error);
116+
throw error;
117+
}
118+
});
119+
120+
// Update last result ID before clearing rows
121+
if (this.uniqueKey && rows.length > 0) {
122+
this._updateLastResultId([
123+
...rows,
124+
]); // Pass a copy to avoid mutation issues
125+
}
126+
127+
// Clear reference to help with garbage collection
128+
rows.length = 0;
129+
130+
pageCount++;
131+
if (pageCount >= maxPages) {
132+
console.log(`Reached max pages limit (${maxPages}). Stopping processing.`);
133+
allProcessed = true;
134+
}
135+
if (!nextPageToken) {
136+
break;
137+
}
138+
pageToken = nextPageToken;
139+
} catch (error) {
140+
console.error("Error in BigQuery processing:", error);
141+
if (error.message && error.message.includes("memory")) {
142+
throw new Error(`Memory error in BigQuery processing. Consider reducing maxRowsPerExecution or eventSize. Original error: ${error.message}`);
143+
}
144+
throw error;
112145
}
113-
pageToken = nextPageToken;
114146
}
147+
148+
console.log(`BigQuery processing completed. Processed ${totalRowsProcessed} rows in ${pageCount} pages.`);
115149
},
116150
getInitialEventCount() {
117151
return 10;
