Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions components/airtable_oauth/common/utils.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,29 @@ function buildSingleCollaboratorField(value) {
};
}

function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}

async function withRetry(fn, {
retries = 2, baseDelay = 500,
} = {}) {
let attempt = 0;
while (attempt <= retries) {
try {
return await fn();
} catch (err) {
if (attempt === retries) throw err;
await sleep(baseDelay * (2 ** attempt));
attempt += 1;
}
}
}
Comment on lines +170 to +183
Copy link
Contributor

@coderabbitai coderabbitai bot Aug 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Retry policy retries on all errors; add shouldRetry, jitter, and max cap

Blindly retrying on 4xx (e.g., 404 for rapidly-deleted records) adds latency with no benefit. Add a shouldRetry predicate, jitter, and a maxDelay cap.

Apply:

-async function withRetry(fn, {
-  retries = 2, baseDelay = 500,
-} = {}) {
+async function withRetry(fn, {
+  retries = 2,
+  baseDelay = 500,
+  factor = 2,
+  maxDelay = 5_000,
+  jitter = 0.1,
+  shouldRetry,
+} = {}) {
   let attempt = 0;
   while (attempt <= retries) {
     try {
       return await fn();
     } catch (err) {
-      if (attempt === retries) throw err;
-      await sleep(baseDelay * (2 ** attempt));
-      attempt += 1;
+      const status = err?.statusCode ?? err?.response?.status;
+      const defaultShouldRetry = (e) => {
+        const s = e?.statusCode ?? e?.response?.status;
+        return s == null || s >= 500 || s === 429;
+      };
+      const should = (shouldRetry ?? defaultShouldRetry)(err, attempt);
+      if (!should || attempt === retries) throw err;
+      const backoff = Math.min(maxDelay, baseDelay * (factor ** attempt));
+      const rand = 1 + (Math.random() * 2 - 1) * jitter; // +/- jitter
+      const delay = Math.max(0, Math.floor(backoff * rand));
+      await sleep(delay);
+      attempt += 1;
     }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async function withRetry(fn, {
retries = 2, baseDelay = 500,
} = {}) {
let attempt = 0;
while (attempt <= retries) {
try {
return await fn();
} catch (err) {
if (attempt === retries) throw err;
await sleep(baseDelay * (2 ** attempt));
attempt += 1;
}
}
}
async function withRetry(fn, {
retries = 2,
baseDelay = 500,
factor = 2,
maxDelay = 5_000,
jitter = 0.1,
shouldRetry,
} = {}) {
let attempt = 0;
while (attempt <= retries) {
try {
return await fn();
} catch (err) {
const status = err?.statusCode ?? err?.response?.status;
const defaultShouldRetry = (e) => {
const s = e?.statusCode ?? e?.response?.status;
return s == null || s >= 500 || s === 429;
};
const should = (shouldRetry ?? defaultShouldRetry)(err, attempt);
if (!should || attempt === retries) throw err;
const backoff = Math.min(maxDelay, baseDelay * (factor ** attempt));
const rand = 1 + (Math.random() * 2 - 1) * jitter; // +/- jitter
const delay = Math.max(0, Math.floor(backoff * rand));
await sleep(delay);
attempt += 1;
}
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might as well have a 404 when we fetch "too early" and I think this was the case at least once in our testing. Hence the retry

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariczne That's a great point! If Airtable's API can return 404s for records that exist but aren't immediately available after webhook notifications (due to eventual consistency or propagation delays), then retrying on 404s makes perfect sense for this use case.

Your simpler retry implementation is appropriate here since the domain-specific behavior justifies retrying on all errors, including client errors like 404. Thanks for the clarification!


✏️ Learnings added
Learnt from: mariczne
PR: PipedreamHQ/pipedream#18221
File: components/airtable_oauth/common/utils.mjs:170-183
Timestamp: 2025-08-29T08:35:55.886Z
Learning: In Airtable webhook integrations, 404 errors can occur when fetching records "too early" after receiving webhook notifications, likely due to propagation delays between the webhook system and the API. Therefore, retrying on 404 errors is beneficial in this context.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


export {
fieldTypeToPropType,
fieldToProp,
makeFieldProps,
makeRecord,
withRetry,
};
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,19 @@ export default {
}
},
async emitEvent(payload) {
const changed = payload?.changedTablesById;
const tableEntry = changed && Object.entries(changed)[0];
if (!tableEntry) {
// Unknown / empty shape — emit normalized raw so consumers still get a consistent shape
this.$emit({
originalPayload: payload,
}, this.generateMeta(payload));
return;
}
const [
tableId,
tableData,
] = Object.entries(payload.changedTablesById)[0];
] = tableEntry;
const [
operation,
fieldObj,
Expand Down
35 changes: 29 additions & 6 deletions components/airtable_oauth/sources/common/common-webhook-record.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import common from "./common-webhook.mjs";
import { withRetry } from "../../common/utils.mjs";

export default {
...common,
Expand All @@ -10,10 +11,19 @@ export default {
];
},
async emitEvent(payload) {
const changed = payload?.changedTablesById;
const tableEntry = changed && Object.entries(changed)[0];
if (!tableEntry) {
// Unknown / empty shape — emit normalized raw so consumers still get a consistent shape
this.$emit({
originalPayload: payload,
}, this.generateMeta(payload));
return;
}
const [
tableId,
tableData,
] = Object.entries(payload.changedTablesById)[0];
] = tableEntry;
let [
operation,
recordObj,
Expand Down Expand Up @@ -52,11 +62,24 @@ export default {
? "created"
: "updated";

const { fields } = await this.airtable.getRecord({
baseId: this.baseId,
tableId,
recordId,
});
let fields = {};
try {
const res = await withRetry(
() => this.airtable.getRecord({
baseId: this.baseId,
tableId,
recordId,
}),
);

fields = res?.fields ?? {};
} catch (err) {
Comment on lines +65 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refine retries to avoid useless 4xx retries and add jitter

Pass shouldRetry to avoid retrying on 404/400s and to support 429/5xx/network only. This reduces latency when records are deleted between event and fetch.

-      try {
-        const res = await withRetry(
-          () => this.airtable.getRecord({
-            baseId: this.baseId,
-            tableId,
-            recordId,
-          }),
-        );
+      try {
+        const res = await withRetry(
+          () => this.airtable.getRecord({
+            baseId: this.baseId,
+            tableId,
+            recordId,
+          }),
+          {
+            retries: 2,
+            baseDelay: 500,
+            // Retry on 5xx / 429 / network only
+            shouldRetry: (err) => {
+              const status = err?.statusCode ?? err?.response?.status;
+              return status == null || status >= 500 || status === 429;
+            },
+          },
+        );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let fields = {};
try {
const res = await withRetry(
() => this.airtable.getRecord({
baseId: this.baseId,
tableId,
recordId,
}),
);
fields = res?.fields ?? {};
} catch (err) {
let fields = {};
try {
const res = await withRetry(
() => this.airtable.getRecord({
baseId: this.baseId,
tableId,
recordId,
}),
{
retries: 2,
baseDelay: 500,
// Retry on 5xx / 429 / network only
shouldRetry: (err) => {
const status = err?.statusCode ?? err?.response?.status;
return status == null || status >= 500 || status === 429;
},
},
);
fields = res?.fields ?? {};
} catch (err) {

console.log("Airtable getRecord failed; emitting empty fields", {
statusCode: err?.statusCode ?? err?.response?.status,
message: err?.message,
});
fields = {};
}

const summary = `Record ${updateType}: ${fields?.name ?? recordId}`;

Expand Down
6 changes: 5 additions & 1 deletion components/airtable_oauth/sources/common/common-webhook.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,12 @@ export default {
return this.changeTypes;
},
emitDefaultEvent(payload) {
// Normalize the fallback so consumers always get a consistent structure
// matching our other emitters (which include originalPayload).
const meta = this.generateMeta(payload);
this.$emit(payload, meta);
this.$emit({
originalPayload: payload,
}, meta);
},
Comment on lines +137 to 143
Copy link
Contributor

@coderabbitai coderabbitai bot Aug 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Potential breaking change: default emission now wrapped; provide BC path

Wrapping the payload under originalPayload can break existing workflows that read top-level fields. Consider dual-emitting the raw fields alongside originalPayload for a deprecation window.

-  const meta = this.generateMeta(payload);
-  this.$emit({
-    originalPayload: payload,
-  }, meta);
+  const meta = this.generateMeta(payload);
+  // Back-compat: spread raw keys for existing consumers; new consumers use originalPayload.
+  this.$emit({
+    originalPayload: payload,
+    ...payload,
+  }, meta);

If you prefer a hard switch, gate the spread behind an env flag (e.g., PD_AIRTABLE_EMIT_RAW_COMPAT !== "false") and plan a removal date in the changelog.


🏁 Script executed:

#!/bin/bash
set -e

echo "Listing files in components/airtable_oauth/sources/common:"
ls -1 components/airtable_oauth/sources/common

echo
echo "Inspecting all $emit calls in common directory:"
rg -n "\$emit" -C1 components/airtable_oauth/sources/common/*.mjs || true

echo
echo "Inspecting originalPayload usage in common directory:"
rg -n "originalPayload" -C1 components/airtable_oauth/sources/common/*.mjs || true

Length of output: 3278


Provide backward-compatibility by dual-emitting raw payload fields

The new emitters now nest the entire payload under originalPayload, dropping all top-level properties and breaking consumers that expect those fields. You must restore the old shape by spreading payload after originalPayload (or gate it behind an env flag for a deprecation window).

Update these emit calls:

  • components/airtable_oauth/sources/common/common-webhook.mjs (lines 140–143)
  • components/airtable_oauth/sources/common/common-webhook-record.mjs (lines 18, 43, 86)
  • components/airtable_oauth/sources/common/common-webhook-field.mjs (lines 32, 75)

Example diff for common-webhook.mjs:

- this.$emit({ originalPayload: payload }, meta);
+ this.$emit(
+   { originalPayload: payload, ...payload },
+   meta
+ );

If you prefer a toggle, wrap the spread in an env-flag check (e.g. PD_AIRTABLE_EMIT_RAW_COMPAT !== "false") and schedule its removal in your changelog.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Normalize the fallback so consumers always get a consistent structure
// matching our other emitters (which include originalPayload).
const meta = this.generateMeta(payload);
this.$emit(payload, meta);
this.$emit({
originalPayload: payload,
}, meta);
},
// Normalize the fallback so consumers always get a consistent structure
// matching our other emitters (which include originalPayload).
const meta = this.generateMeta(payload);
this.$emit(
{ originalPayload: payload, ...payload },
meta
);
},

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that what versioning is for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariczne You're absolutely right! Versioning is the cleaner approach here.

If you're planning to bump the component version for this structural change, then the clean payload wrapping under originalPayload makes perfect sense without the backward compatibility complexity I suggested.

A version bump would:

  • Give users control over when to migrate
  • Avoid the dual-emission overhead
  • Keep the new code clean and consistent
  • Signal that this is an intentional breaking change

Are you planning to version this as a major update? If so, the current implementation is much better than my suggested dual-emission approach.

🤔 Hmm, you make a good point about versioning...
(\ /)
( ..)
o
(")(")


🧠 Learnings used
Learnt from: GTFalcao
PR: PipedreamHQ/pipedream#12697
File: components/salesforce_rest_api/sources/common-webhook-methods.mjs:1-71
Timestamp: 2024-07-24T02:06:47.016Z
Learning: The `common-webhook-methods.mjs` object is designed to be extended, similar to an abstract class, and intentionally does not implement certain methods like `generateWebhookMeta` and `getEventType` to enforce implementation in subclasses.

Learnt from: GTFalcao
PR: PipedreamHQ/pipedream#12697
File: components/salesforce_rest_api/sources/common-webhook-methods.mjs:1-71
Timestamp: 2024-10-08T15:33:38.240Z
Learning: The `common-webhook-methods.mjs` object is designed to be extended, similar to an abstract class, and intentionally does not implement certain methods like `generateWebhookMeta` and `getEventType` to enforce implementation in subclasses.

async emitEvent(payload) {
// sources may call this to customize event emission, but it is
Expand Down
Loading