
Conversation


michelle0927 (Collaborator) commented Sep 25, 2025

Resolves #18352

Summary by CodeRabbit

  • New Features

    • BigQuery query sources now support paginated processing of large result sets, emitting events in batches for improved reliability.
  • Refactor

    • Unified BigQuery query processing into a single, paginated collection flow for more predictable behavior and stability.
  • Chores

    • Updated Google Cloud component dependency and bumped package versions.
    • Incremented versions for BigQuery - New Row and BigQuery - Query Results sources.


vercel bot commented Sep 25, 2025

The latest updates on your projects.

2 Skipped Deployments

Project                              Deployment  Preview  Updated (UTC)
pipedream-docs                       Ignored     Ignored  Sep 25, 2025 6:31pm
pipedream-docs-redirect-do-not-edit  Ignored     Ignored  Sep 25, 2025 6:31pm


coderabbitai bot commented Sep 25, 2025

Walkthrough

Adds explicit BigQuery pagination: introduces createQueryJob, changes getRowsForQuery to accept a job and page tokens, implements a paginated processCollection loop that emits per-row or batched events, removes processSingle, and bumps the package and source versions along with a dependency.
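A minimal sketch of the two reshaped helpers this walkthrough describes, assuming the Node.js @google-cloud/bigquery client (the standalone-function form and names like `client` are illustrative; in the component these methods hang off `this` in components/google_cloud/sources/common/bigquery.mjs):

import { BigQuery } from "@google-cloud/bigquery";

const client = new BigQuery(); // auth comes from the environment in this sketch

// Start the query as a job so rows can be fetched page by page
async function createQueryJob(datasetId, queryOpts) {
  const [ job ] = await client.dataset(datasetId).createQueryJob(queryOpts);
  return job;
}

// Fetch a single page of results for a previously created job;
// getQueryResults waits for the job to finish before returning rows
async function getRowsForQuery(job, pageSize, pageToken) {
  const [ rows, , response ] = await job.getQueryResults({
    maxResults: pageSize,
    pageToken,
    autoPaginate: false, // return one page plus the raw API response
  });
  // response.pageToken is undefined once the last page has been read
  return { rows, pageToken: response.pageToken };
}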

Changes

Package version & dependency
components/google_cloud/package.json
Bumps package version 0.6.0 → 0.6.1; updates dependency @pipedream/platform ^0.10.0 → ^3.1.0.

Source version metadata
components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs, components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs
Updated exported version strings: bigquery-new-row 0.1.6 → 0.1.7; bigquery-query-results 0.1.5 → 0.1.6.

BigQuery common pagination refactor
components/google_cloud/sources/common/bigquery.mjs
Adds createQueryJob(queryOpts); changes getRowsForQuery signature to accept a job and pagination params; implements processCollection(queryOpts, timestamp) with a paginated loop using pageToken, pageSize, and maxPages; removes processSingle; run always calls processCollection; several abstract methods now throw if unimplemented.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Src as Source
  participant BQ as BigQuery API

  rect rgb(242,248,255)
  note over Src: run() → getQueryOpts() → createQueryJob()
  Src->>Src: getQueryOpts()
  Src->>BQ: createQueryJob(queryOpts)
  BQ-->>Src: job
  end

  rect rgb(245,255,245)
  loop paginate (pageToken, maxPages)
    Src->>BQ: getRowsForQuery(job, pageSize, pageToken)
    BQ-->>Src: { rows[], nextPageToken }
    alt eventSize == 1
      Src->>Src: processEvent(row) / $emit(row, meta)
    else
      Src->>Src: generateMetaForCollection(rows) / $emit(rows, meta)
    end
    Src->>Src: update pageToken / increment page count
    alt stop conditions
      Src->>Src: stop on no rows or no nextPageToken or maxPages reached
    end
  end
  end
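Rendered as code, the loop in the diagram might look roughly like the sketch below. It reuses the hypothetical helpers from the walkthrough sketch; `source` is a stand-in for the component instance, whose eventSize, generateMeta, generateMetaForCollection, and $emit members it assumes:

// Hypothetical free-function rendering of the paginated flow
async function processCollection(source, queryOpts, timestamp) {
  const pageSize = 1000;
  const maxPages = 10; // safety cap on pages processed per run
  const job = await createQueryJob(source.datasetId, queryOpts);

  let pageToken = null;
  for (let page = 0; page < maxPages; page++) {
    const { rows, pageToken: nextPageToken } =
      await getRowsForQuery(job, pageSize, pageToken);
    if (rows.length === 0) break; // stop condition: no rows left

    if (source.eventSize === 1) {
      // one event per row
      for (const row of rows) {
        source.$emit(row, source.generateMeta(row, timestamp));
      }
    } else {
      // batched events of up to eventSize rows each
      for (let i = 0; i < rows.length; i += source.eventSize) {
        const batch = rows.slice(i, i + source.eventSize);
        source.$emit(
          { rows: batch, rowCount: batch.length },
          source.generateMetaForCollection(batch, timestamp),
        );
      }
    }

    if (!nextPageToken) break; // stop condition: no next page
    pageToken = nextPageToken;
  }
}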

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

I hop through pages, token by token,
Fetching rows so no memory's broken.
New job, new flow, I skip and bound—
Emitting batches, one hop, then round.
Version badges on my back, I’m proud and awoken. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

Description Check: ⚠️ Warning
The pull request description only contains the line "Resolves #18352" and does not follow the required template structure; it is missing the "## WHY" section and any details explaining the motivation or summary of the changes. Resolution: expand the description to include the "## WHY" section from the repository template, explaining the motivation for the changes, summarizing the updates, and linking to the issue context.

✅ Passed checks (4 passed)

Title Check: ✅ Passed
The title "Google Cloud BigQuery improvements" accurately reflects the nature of the changes, though it is somewhat generic and does not call out key updates such as pagination and memory-management enhancements.

Linked Issues Check: ✅ Passed
The code implements pagination and streaming of query results by introducing createQueryJob and an explicit paging loop in processCollection, directly addressing the out-of-memory issues described in linked issue #18352.

Out of Scope Changes Check: ✅ Passed
All modifications, including version bumps and refactored BigQuery methods, are directly related to implementing streaming pagination and releasing the updated components, with no unrelated changes detected.

Docstring Coverage: ✅ Passed
No functions found in the changes; docstring coverage check skipped.

coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs (1)

103-126: Restore compatibility with updated getRowsForQuery signature

Line 116 still passes queryOpts and treats the return value as an array, but getRowsForQuery now expects a query job and returns an object. The current call will throw (job.getQueryResults is not a function) and prevent the source from bootstrapping its lastResultId. Please create the job first and destructure the rows from the new return shape.

-      const rows = await this.getRowsForQuery(queryOpts, this.datasetId);
+      const job = await this.createQueryJob(queryOpts);
+      const { rows } = await this.getRowsForQuery(job, limit);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 214e592 and 2b9144b.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (4)
  • components/google_cloud/package.json (2 hunks)
  • components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs (1 hunks)
  • components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs (1 hunks)
  • components/google_cloud/sources/common/bigquery.mjs (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-12-12T19:23:09.039Z
Learnt from: jcortes
PR: PipedreamHQ/pipedream#14935
File: components/sailpoint/package.json:15-18
Timestamp: 2024-12-12T19:23:09.039Z
Learning: When developing Pipedream components, do not add built-in Node.js modules like `fs` to `package.json` dependencies, as they are native modules provided by the Node.js runtime.

Applied to files:

  • components/google_cloud/package.json
🧬 Code graph analysis (1)
components/google_cloud/sources/common/bigquery.mjs (1)
components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs (2)
  • queryOpts (110-115)
  • rows (116-116)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: pnpm publish
  • GitHub Check: Publish TypeScript components
  • GitHub Check: Verify TypeScript components
  • GitHub Check: Lint Code Base
🔇 Additional comments (2)
components/google_cloud/package.json (1)

23-23: Confirm compatibility with @pipedream/platform@^3.1.0

This jumps the SDK from the pre-1.0 line to the 3.x series, which may introduce breaking changes to helper APIs and runtime behavior. Please re-run the affected Google Cloud sources end-to-end (or at least rebuild and dry-run them) to confirm nothing regresses before publishing. If you've already validated this, just note it in the PR.

components/google_cloud/sources/bigquery-query-results/bigquery-query-results.mjs (1)

11-11: Version bump matches the BigQuery pagination work

Updating the source version to 0.1.6 lines up with the underlying pagination changes—looks good.

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
components/google_cloud/sources/common/bigquery.mjs (3)

74-78: Make page size and max pages configurable (avoids OOMs and fits PR objective).

Hard-coded pageSize = 1000 and maxPages = 10 may still cause memory pressure for large rows. Expose them as props and use them here.

Apply within this segment:

-      const pageSize = 1000, maxPages = 10;
+      const pageSize = this.pageSize ?? 1000;
+      const maxPages = this.maxPages ?? 10;

Then add props (outside this range):

// In props:
pageSize: {
  type: "integer",
  label: "Rows per page",
  description: "Max rows fetched per BigQuery page (lower values reduce memory).",
  default: 1000,
  min: 1,
},
maxPages: {
  type: "integer",
  label: "Max pages per run",
  description: "Safety cap on pages processed per run (prevents long/expensive pulls).",
  default: 10,
  min: 1,
}

74-114: Simplify pagination loop with a bounded for-loop.

The allProcessed flag is unnecessary. A bounded for-loop is clearer and avoids state juggling.

-      let pageToken = null;
-      let allProcessed = false;
-      let pageCount = 0;
-
-      while (!allProcessed) {
+      let pageToken = null;
+      for (let pageCount = 0; pageCount < maxPages; pageCount++) {
         const {
           rows, pageToken: nextPageToken,
         } = await this.getRowsForQuery(job, pageSize, pageToken);

         if (rows.length === 0) {
-          allProcessed = true;
           break;
         }

         chunk(rows, this.eventSize).forEach((batch) => {
           if (this.eventSize === 1) {
             const meta = this.generateMeta(batch[0], timestamp);
             this.$emit(batch[0], meta);
           } else {
             const meta = this.generateMetaForCollection(batch, timestamp);
             const data = {
               rows: batch,
               rowCount: batch.length,
             };
             this.$emit(data, meta);
           }
         });

-        pageCount++;
-        if (pageCount >= maxPages) {
-          allProcessed = true;
-        }
         if (this.uniqueKey) {
           this._updateLastResultId(rows);
         }
         if (!nextPageToken) {
           break;
         }
         pageToken = nextPageToken;
-      }
+      }

79-114: Add contextual error handling to improve OOM reporting.

Wrap page fetch/emit in try/catch and include page index, pageSize, and row counts in the error to aid debugging (aligns with issue #18352).

-      while (!allProcessed) {
+      while (!allProcessed) {
+        try {
           const {
             rows, pageToken: nextPageToken,
           } = await this.getRowsForQuery(job, pageSize, pageToken);
 
           if (rows.length === 0) {
             allProcessed = true;
             break;
           }
 
           chunk(rows, this.eventSize).forEach((batch) => {
             if (this.eventSize === 1) {
               const meta = this.generateMeta(batch[0], timestamp);
               this.$emit(batch[0], meta);
             } else {
               const meta = this.generateMetaForCollection(batch, timestamp);
               const data = {
                 rows: batch,
                 rowCount: batch.length,
               };
               this.$emit(data, meta);
             }
           });
 
           pageCount++;
           if (pageCount >= maxPages) {
             allProcessed = true;
           }
           if (this.uniqueKey) {
             this._updateLastResultId(rows);
           }
           if (!nextPageToken) {
             break;
           }
           pageToken = nextPageToken;
+        } catch (err) {
+          const context = {
+            pageCount,
+            pageSize,
+            pageToken,
+            eventSize: this.eventSize,
+          };
+          err.message = `BigQuery pagination error: ${err.message} | context=${JSON.stringify(context)}`;
+          throw err;
+        }
       }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2b9144b and cdf1870.

📒 Files selected for processing (1)
  • components/google_cloud/sources/common/bigquery.mjs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/google_cloud/sources/common/bigquery.mjs (1)
components/google_cloud/sources/bigquery-new-row/bigquery-new-row.mjs (2)
  • queryOpts (110-115)
  • rows (116-116)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: pnpm publish
  • GitHub Check: Lint Code Base
  • GitHub Check: Verify TypeScript components
  • GitHub Check: Publish TypeScript components
🔇 Additional comments (3)
components/google_cloud/sources/common/bigquery.mjs (3)

110-114: Break on missing next page token — pagination termination fixed.

Exiting when nextPageToken is falsy prevents re-reading the first page. Good.


33-41: Dataset.createQueryJob is supported — available on both the BigQuery client and Dataset instance; no changes needed.


42-59: Confirm getRowsForQuery usages updated
Ripgrep found no remaining JS/TS references, but default type filters may not cover .mjs files; double-check with an explicit glob (for example, rg "getRowsForQuery" -g "*.mjs") for any lingering old-signature calls.

GTFalcao (Collaborator) left a comment

LGTM!

vunguyenhung merged commit 2b5ab6f into master Sep 26, 2025
10 checks passed
vunguyenhung deleted the issue-18352 branch September 26, 2025 07:59


Development

Successfully merging this pull request may close these issues.

[BUG] BigQuery - New Query Result throws out of memory error

4 participants