Skip to content

Commit f968629

Browse files
committed
bigquery improvements
1 parent 214e592 commit f968629

File tree

4 files changed

+61
-29
lines changed

4 files changed

+61
-29
lines changed

components/google_cloud/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@pipedream/google_cloud",
3-
"version": "0.6.0",
3+
"version": "0.6.1",
44
"description": "Pipedream Google_cloud Components",
55
"main": "google_cloud.app.mjs",
66
"keywords": [
@@ -20,7 +20,7 @@
2020
"@google-cloud/logging": "^10.0.3",
2121
"@google-cloud/pubsub": "^3.0.1",
2222
"@google-cloud/storage": "^6.0.1",
23-
"@pipedream/platform": "^0.10.0",
23+
"@pipedream/platform": "^3.1.0",
2424
"crypto": "^1.0.1",
2525
"lodash-es": "^4.17.21",
2626
"uuid": "^8.3.2"

components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export default {
99
// eslint-disable-next-line pipedream/source-name
1010
name: "BigQuery - New Row",
1111
description: "Emit new events when a new row is added to a table",
12-
version: "0.1.6",
12+
version: "0.1.7",
1313
dedupe: "unique",
1414
type: "source",
1515
props: {

components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export default {
88
// eslint-disable-next-line pipedream/source-name
99
name: "BigQuery - Query Results",
1010
description: "Emit new events with the results of an arbitrary query",
11-
version: "0.1.5",
11+
version: "0.1.6",
1212
dedupe: "unique",
1313
type: "source",
1414
props: {

components/google_cloud/sources/common/bigquery.mjs

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,33 @@ export default {
3030
},
3131
},
3232
methods: {
33-
async getRowsForQuery(queryOpts) {
33+
async createQueryJob(queryOpts) {
3434
const client = this.googleCloud
3535
.getBigQueryClient()
3636
.dataset(this.datasetId);
3737
const [
3838
job,
3939
] = await client.createQueryJob(queryOpts);
40+
return job;
41+
},
42+
async getRowsForQuery(job, pageSize = 1000, pageToken = null) {
43+
const options = {
44+
maxResults: pageSize,
45+
};
46+
47+
if (pageToken) {
48+
options.pageToken = pageToken;
49+
}
50+
51+
const response = await job.getQueryResults(options);
4052
const [
4153
rows,
42-
] = await job.getQueryResults();
43-
return rows;
54+
] = response;
55+
const nextPageToken = response.pageToken;
56+
return {
57+
rows,
58+
pageToken: nextPageToken,
59+
};
4460
},
4561
_updateLastResultId(rows) {
4662
const lastRow = rows.pop();
@@ -53,26 +69,44 @@ export default {
5369
this.db.set("lastResultId", newLastResultId);
5470
},
5571
async processCollection(queryOpts, timestamp) {
56-
const rows = await this.getRowsForQuery(queryOpts);
57-
chunk(rows, this.eventSize)
58-
.forEach((rows) => {
59-
const meta = this.generateMetaForCollection(rows, timestamp);
60-
const rowCount = rows.length;
61-
const data = {
62-
rows,
63-
rowCount,
64-
};
65-
this.$emit(data, meta);
72+
const job = await this.createQueryJob(queryOpts);
73+
74+
const pageSize = 1000, maxPages = 10;
75+
let pageToken = null;
76+
let allProcessed = false;
77+
let pageCount = 0;
78+
79+
while (!allProcessed) {
80+
const {
81+
rows, pageToken: nextPageToken,
82+
} = await this.getRowsForQuery(job, pageSize, pageToken);
83+
84+
if (rows.length === 0) {
85+
allProcessed = true;
86+
break;
87+
}
88+
89+
chunk(rows, this.eventSize).forEach((batch) => {
90+
if (this.eventSize === 1) {
91+
const meta = this.generateMeta(batch[0], timestamp);
92+
this.$emit(batch[0], meta);
93+
} else {
94+
const meta = this.generateMetaForCollection(batch, timestamp);
95+
const data = {
96+
rows: batch,
97+
rowCount: batch.length,
98+
};
99+
this.$emit(data, meta);
100+
}
66101
});
67-
if (this.uniqueKey) this._updateLastResultId(rows);
68-
},
69-
async processSingle(queryOpts, timestamp) {
70-
const rows = await this.getRowsForQuery(queryOpts);
71-
rows.forEach((row) => {
72-
const meta = this.generateMeta(row, timestamp);
73-
this.$emit(row, meta);
74-
});
75-
if (this.uniqueKey) this._updateLastResultId(rows);
102+
103+
pageToken = nextPageToken;
104+
pageCount++;
105+
if (pageCount >= maxPages) {
106+
allProcessed = true;
107+
}
108+
if (this.uniqueKey) this._updateLastResultId(rows);
109+
}
76110
},
77111
getInitialEventCount() {
78112
return 10;
@@ -93,8 +127,6 @@ export default {
93127
run(event) {
94128
const { timestamp } = event;
95129
const queryOpts = this.getQueryOpts(event);
96-
return (this.eventSize === 1) ?
97-
this.processSingle(queryOpts, timestamp) :
98-
this.processCollection(queryOpts, timestamp);
130+
return this.processCollection(queryOpts, timestamp);
99131
},
100132
};

0 commit comments

Comments
 (0)