Skip to content

Commit 2b5ab6f

Browse files
authored
Merging pull request #18467
* bigquery improvements
* pnpm-lock.yaml
* updates
1 parent 8b6d4e3 commit 2b5ab6f

File tree

5 files changed

+74
-39
lines changed

5 files changed

+74
-39
lines changed

components/google_cloud/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@pipedream/google_cloud",
3-
"version": "0.6.0",
3+
"version": "0.6.1",
44
"description": "Pipedream Google_cloud Components",
55
"main": "google_cloud.app.mjs",
66
"keywords": [
@@ -20,7 +20,7 @@
2020
"@google-cloud/logging": "^10.0.3",
2121
"@google-cloud/pubsub": "^3.0.1",
2222
"@google-cloud/storage": "^6.0.1",
23-
"@pipedream/platform": "^0.10.0",
23+
"@pipedream/platform": "^3.1.0",
2424
"crypto": "^1.0.1",
2525
"lodash-es": "^4.17.21",
2626
"uuid": "^8.3.2"

components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export default {
99
// eslint-disable-next-line pipedream/source-name
1010
name: "BigQuery - New Row",
1111
description: "Emit new events when a new row is added to a table",
12-
version: "0.1.6",
12+
version: "0.1.7",
1313
dedupe: "unique",
1414
type: "source",
1515
props: {

components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export default {
88
// eslint-disable-next-line pipedream/source-name
99
name: "BigQuery - Query Results",
1010
description: "Emit new events with the results of an arbitrary query",
11-
version: "0.1.5",
11+
version: "0.1.6",
1212
dedupe: "unique",
1313
type: "source",
1414
props: {

components/google_cloud/sources/common/bigquery.mjs

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,33 @@ export default {
3030
},
3131
},
3232
methods: {
33-
async getRowsForQuery(queryOpts) {
33+
async createQueryJob(queryOpts) {
3434
const client = this.googleCloud
3535
.getBigQueryClient()
3636
.dataset(this.datasetId);
3737
const [
3838
job,
3939
] = await client.createQueryJob(queryOpts);
40+
return job;
41+
},
42+
async getRowsForQuery(job, pageSize = 1000, pageToken = null) {
43+
const options = {
44+
maxResults: pageSize,
45+
};
46+
47+
if (pageToken) {
48+
options.pageToken = pageToken;
49+
}
50+
4051
const [
4152
rows,
42-
] = await job.getQueryResults();
43-
return rows;
53+
queryResults,
54+
] = await job.getQueryResults(options);
55+
const nextPageToken = queryResults?.pageToken;
56+
return {
57+
rows,
58+
pageToken: nextPageToken,
59+
};
4460
},
4561
_updateLastResultId(rows) {
4662
const lastRow = rows.pop();
@@ -53,26 +69,49 @@ export default {
5369
this.db.set("lastResultId", newLastResultId);
5470
},
5571
async processCollection(queryOpts, timestamp) {
56-
const rows = await this.getRowsForQuery(queryOpts);
57-
chunk(rows, this.eventSize)
58-
.forEach((rows) => {
59-
const meta = this.generateMetaForCollection(rows, timestamp);
60-
const rowCount = rows.length;
61-
const data = {
62-
rows,
63-
rowCount,
64-
};
65-
this.$emit(data, meta);
72+
const job = await this.createQueryJob(queryOpts);
73+
74+
const pageSize = 1000, maxPages = 10;
75+
let pageToken = null;
76+
let allProcessed = false;
77+
let pageCount = 0;
78+
79+
while (!allProcessed) {
80+
const {
81+
rows, pageToken: nextPageToken,
82+
} = await this.getRowsForQuery(job, pageSize, pageToken);
83+
84+
if (rows.length === 0) {
85+
allProcessed = true;
86+
break;
87+
}
88+
89+
chunk(rows, this.eventSize).forEach((batch) => {
90+
if (this.eventSize === 1) {
91+
const meta = this.generateMeta(batch[0], timestamp);
92+
this.$emit(batch[0], meta);
93+
} else {
94+
const meta = this.generateMetaForCollection(batch, timestamp);
95+
const data = {
96+
rows: batch,
97+
rowCount: batch.length,
98+
};
99+
this.$emit(data, meta);
100+
}
66101
});
67-
if (this.uniqueKey) this._updateLastResultId(rows);
68-
},
69-
async processSingle(queryOpts, timestamp) {
70-
const rows = await this.getRowsForQuery(queryOpts);
71-
rows.forEach((row) => {
72-
const meta = this.generateMeta(row, timestamp);
73-
this.$emit(row, meta);
74-
});
75-
if (this.uniqueKey) this._updateLastResultId(rows);
102+
103+
pageCount++;
104+
if (pageCount >= maxPages) {
105+
allProcessed = true;
106+
}
107+
if (this.uniqueKey) {
108+
this._updateLastResultId(rows);
109+
}
110+
if (!nextPageToken) {
111+
break;
112+
}
113+
pageToken = nextPageToken;
114+
}
76115
},
77116
getInitialEventCount() {
78117
return 10;
@@ -93,8 +132,6 @@ export default {
93132
run(event) {
94133
const { timestamp } = event;
95134
const queryOpts = this.getQueryOpts(event);
96-
return (this.eventSize === 1) ?
97-
this.processSingle(queryOpts, timestamp) :
98-
this.processCollection(queryOpts, timestamp);
135+
return this.processCollection(queryOpts, timestamp);
99136
},
100137
};

pnpm-lock.yaml

Lines changed: 8 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)