Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/google_cloud/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/google_cloud",
"version": "0.6.0",
"version": "0.6.1",
"description": "Pipedream Google_cloud Components",
"main": "google_cloud.app.mjs",
"keywords": [
Expand All @@ -20,7 +20,7 @@
"@google-cloud/logging": "^10.0.3",
"@google-cloud/pubsub": "^3.0.1",
"@google-cloud/storage": "^6.0.1",
"@pipedream/platform": "^0.10.0",
"@pipedream/platform": "^3.1.0",
"crypto": "^1.0.1",
"lodash-es": "^4.17.21",
"uuid": "^8.3.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export default {
// eslint-disable-next-line pipedream/source-name
name: "BigQuery - New Row",
description: "Emit new events when a new row is added to a table",
version: "0.1.6",
version: "0.1.7",
dedupe: "unique",
type: "source",
props: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default {
// eslint-disable-next-line pipedream/source-name
name: "BigQuery - Query Results",
description: "Emit new events with the results of an arbitrary query",
version: "0.1.5",
version: "0.1.6",
dedupe: "unique",
type: "source",
props: {
Expand Down
82 changes: 57 additions & 25 deletions components/google_cloud/sources/common/bigquery.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,33 @@ export default {
},
},
methods: {
async getRowsForQuery(queryOpts) {
async createQueryJob(queryOpts) {
const client = this.googleCloud
.getBigQueryClient()
.dataset(this.datasetId);
const [
job,
] = await client.createQueryJob(queryOpts);
return job;
},
async getRowsForQuery(job, pageSize = 1000, pageToken = null) {
const options = {
maxResults: pageSize,
};

if (pageToken) {
options.pageToken = pageToken;
}

const response = await job.getQueryResults(options);
const [
rows,
] = await job.getQueryResults();
return rows;
] = response;
const nextPageToken = response.pageToken;
return {
rows,
pageToken: nextPageToken,
};
},
_updateLastResultId(rows) {
const lastRow = rows.pop();
Expand All @@ -53,26 +69,44 @@ export default {
this.db.set("lastResultId", newLastResultId);
},
async processCollection(queryOpts, timestamp) {
const rows = await this.getRowsForQuery(queryOpts);
chunk(rows, this.eventSize)
.forEach((rows) => {
const meta = this.generateMetaForCollection(rows, timestamp);
const rowCount = rows.length;
const data = {
rows,
rowCount,
};
this.$emit(data, meta);
const job = await this.createQueryJob(queryOpts);

const pageSize = 1000, maxPages = 10;
let pageToken = null;
let allProcessed = false;
let pageCount = 0;

while (!allProcessed) {
const {
rows, pageToken: nextPageToken,
} = await this.getRowsForQuery(job, pageSize, pageToken);

if (rows.length === 0) {
allProcessed = true;
break;
}

chunk(rows, this.eventSize).forEach((batch) => {
if (this.eventSize === 1) {
const meta = this.generateMeta(batch[0], timestamp);
this.$emit(batch[0], meta);
} else {
const meta = this.generateMetaForCollection(batch, timestamp);
const data = {
rows: batch,
rowCount: batch.length,
};
this.$emit(data, meta);
}
});
if (this.uniqueKey) this._updateLastResultId(rows);
},
async processSingle(queryOpts, timestamp) {
const rows = await this.getRowsForQuery(queryOpts);
rows.forEach((row) => {
const meta = this.generateMeta(row, timestamp);
this.$emit(row, meta);
});
if (this.uniqueKey) this._updateLastResultId(rows);

pageToken = nextPageToken;
pageCount++;
if (pageCount >= maxPages) {
allProcessed = true;
}
if (this.uniqueKey) this._updateLastResultId(rows);
}
},
getInitialEventCount() {
return 10;
Expand All @@ -93,8 +127,6 @@ export default {
run(event) {
const { timestamp } = event;
const queryOpts = this.getQueryOpts(event);
return (this.eventSize === 1) ?
this.processSingle(queryOpts, timestamp) :
this.processCollection(queryOpts, timestamp);
return this.processCollection(queryOpts, timestamp);
},
};
18 changes: 8 additions & 10 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading