Skip to content

Commit 379b574

Browse files
committed
[FIX] Airtable webhook sources intermittently emit the raw Airtable payload
1 parent 5040561 commit 379b574

File tree

10 files changed

+104
-49
lines changed

10 files changed

+104
-49
lines changed

components/airtable_oauth/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@pipedream/airtable_oauth",
3-
"version": "0.5.1",
3+
"version": "0.5.2",
44
"description": "Pipedream Airtable (OAuth) Components",
55
"main": "airtable_oauth.app.mjs",
66
"keywords": [
@@ -15,7 +15,9 @@
1515
"dependencies": {
1616
"@pipedream/platform": "^3.0.3",
1717
"airtable": "^0.11.1",
18+
"async-retry": "^1.3.3",
1819
"bottleneck": "^2.19.5",
20+
"crypto": "^1.0.1",
1921
"lodash.chunk": "^4.2.0",
2022
"lodash.isempty": "^4.4.0",
2123
"moment": "^2.30.1"

components/airtable_oauth/sources/common/common-webhook-field.mjs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ export default {
3838
fieldUpdateInfo,
3939
] = Object.entries(fieldObj)[0];
4040

41-
const timestamp = Date.parse(payload.timestamp);
42-
if (this.isDuplicateEvent(fieldId, timestamp)) return;
43-
this._setLastObjectId(fieldId);
44-
this._setLastTimestamp(timestamp);
45-
4641
const updateType = operation === "createdFieldsById"
4742
? "created"
4843
: "updated";

components/airtable_oauth/sources/common/common-webhook-record.mjs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import common from "./common-webhook.mjs";
2+
import retry from "async-retry";
23

34
export default {
45
...common,
@@ -9,6 +10,17 @@ export default {
910
"tableData",
1011
];
1112
},
13+
withRetries(apiCall, retries = 3) {
14+
return retry(async (bail) => {
15+
try {
16+
return await apiCall();
17+
} catch (err) {
18+
return bail(err);
19+
}
20+
}, {
21+
retries,
22+
});
23+
},
1224
async emitEvent(payload) {
1325
const [
1426
tableId,
@@ -43,20 +55,20 @@ export default {
4355
recordUpdateInfo,
4456
] = Object.entries(recordObj)[0];
4557

46-
const timestamp = Date.parse(payload.timestamp);
47-
if (this.isDuplicateEvent(recordId, timestamp)) return;
48-
this._setLastObjectId(recordId);
49-
this._setLastTimestamp(timestamp);
50-
5158
let updateType = operation === "createdRecordsById"
5259
? "created"
5360
: "updated";
5461

55-
const { fields } = await this.airtable.getRecord({
56-
baseId: this.baseId,
57-
tableId,
58-
recordId,
59-
});
62+
let fields = {};
63+
try {
64+
({ fields } = await this.withRetries(() => this.airtable.getRecord({
65+
baseId: this.baseId,
66+
tableId,
67+
recordId,
68+
})));
69+
} catch (e) {
70+
fields = {};
71+
}
6072

6173
const summary = `Record ${updateType}: ${fields?.name ?? recordId}`;
6274

components/airtable_oauth/sources/common/common-webhook.mjs

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { createHmac } from "crypto";
12
import airtable from "../../airtable_oauth.app.mjs";
23
import constants from "../common/constants.mjs";
34

@@ -48,7 +49,9 @@ export default {
4849
},
4950
hooks: {
5051
async activate() {
51-
const { id } = await this.airtable.createWebhook({
52+
const {
53+
id, macSecretBase64,
54+
} = await this.airtable.createWebhook({
5255
baseId: this.baseId,
5356
data: {
5457
notificationUrl: `${this.http.endpoint}/`,
@@ -76,6 +79,7 @@ export default {
7679
},
7780
});
7881
this._setHookId(id);
82+
this._setMacSecretBase64(macSecretBase64);
7983
},
8084
async deactivate() {
8185
const webhookId = this._getHookId();
@@ -94,28 +98,17 @@ export default {
9498
_setHookId(hookId) {
9599
this.db.set("hookId", hookId);
96100
},
97-
_getLastObjectId() {
98-
return this.db.get("lastObjectId");
99-
},
100-
async _setLastObjectId(id) {
101-
this.db.set("lastObjectId", id);
101+
_getMacSecretBase64() {
102+
return this.db.get("macSecretBase64");
102103
},
103-
_getLastTimestamp() {
104-
return this.db.get("lastTimestamp");
104+
_setMacSecretBase64(value) {
105+
this.db.set("macSecretBase64", value);
105106
},
106-
async _setLastTimestamp(ts) {
107-
this.db.set("lastTimestamp", ts);
107+
_setLastCursor(cursor) {
108+
this.db.set("lastCursor", cursor);
108109
},
109-
isDuplicateEvent(id, ts) {
110-
const lastId = this._getLastObjectId();
111-
const lastTs = this._getLastTimestamp();
112-
113-
if (id === lastId && (ts - lastTs < 5000 )) {
114-
console.log("Skipping trigger: another event was emitted for the same object within the last 5 seconds");
115-
return true;
116-
}
117-
118-
return false;
110+
_getLastCursor() {
111+
return this.db.get("lastCursor");
119112
},
120113
getSpecificationOptions() {
121114
throw new Error("getSpecificationOptions is not implemented");
@@ -135,7 +128,9 @@ export default {
135128
},
136129
emitDefaultEvent(payload) {
137130
const meta = this.generateMeta(payload);
138-
this.$emit(payload, meta);
131+
this.$emit({
132+
originalPayload: payload,
133+
}, meta);
139134
},
140135
async emitEvent(payload) {
141136
// sources may call this to customize event emission, but it is
@@ -147,30 +142,60 @@ export default {
147142
// and it can be silently ignored when not required
148143
return true;
149144
},
145+
isSignatureValid(signature, bodyRaw) {
146+
const macSecretBase64FromCreate = this._getMacSecretBase64();
147+
const macSecretDecoded = Buffer.from(macSecretBase64FromCreate, "base64");
148+
const body = Buffer.from(bodyRaw, "utf8");
149+
const hmac = createHmac("sha256", macSecretDecoded)
150+
.update(body.toString(), "ascii")
151+
.digest("hex");
152+
const expectedContentHmac = "hmac-sha256=" + hmac;
153+
return signature === expectedContentHmac;
154+
},
155+
payloadFilter() {
156+
return true;
157+
},
150158
},
151-
async run() {
159+
async run({
160+
bodyRaw, headers: { ["x-airtable-content-mac"]: signature },
161+
}) {
162+
const isValid = this.isSignatureValid(signature, bodyRaw);
163+
if (!isValid) {
164+
return this.http.respond({
165+
status: 401,
166+
});
167+
}
168+
152169
this.http.respond({
153170
status: 200,
154171
});
155172
// webhook pings source, we then fetch webhook events to emit
156173
const webhookId = this._getHookId();
157174
let hasMore = false;
158-
const params = {};
175+
159176
try {
160177
await this.saveAdditionalData();
161178
} catch (err) {
162179
console.log("Error fetching additional data, proceeding to event emission");
163180
console.log(err);
164181
}
182+
const params = {
183+
cursor: this._getLastCursor(),
184+
};
185+
165186
do {
166187
const {
167188
cursor, mightHaveMore, payloads,
168189
} = await this.airtable.listWebhookPayloads({
190+
debug: true,
169191
baseId: this.baseId,
170192
webhookId,
171193
params,
172194
});
173-
for (const payload of payloads) {
195+
196+
const filteredPayloads = payloads.filter(this.payloadFilter);
197+
198+
for (const payload of filteredPayloads) {
174199
try {
175200
await this.emitEvent(payload);
176201
} catch (err) {
@@ -182,5 +207,7 @@ export default {
182207
params.cursor = cursor;
183208
hasMore = mightHaveMore;
184209
} while (hasMore);
210+
211+
this._setLastCursor(params.cursor);
185212
},
186213
};

components/airtable_oauth/sources/new-field/new-field.mjs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ export default {
55
name: "New Field Created (Instant)",
66
description: "Emit new event when a field is created in the selected table. [See the documentation](https://airtable.com/developers/web/api/get-base-schema)",
77
key: "airtable_oauth-new-field",
8-
version: "1.0.3",
8+
version: "1.0.4",
99
type: "source",
1010
dedupe: "unique",
1111
methods: {
1212
...common.methods,
13+
payloadFilter(payload) {
14+
return !!payload.changedTablesById;
15+
},
1316
getChangeTypes() {
1417
return [
1518
"add",

components/airtable_oauth/sources/new-modified-or-deleted-records-instant/new-modified-or-deleted-records-instant.mjs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import common from "../common/common-webhook-record.mjs";
22
import constants from "../common/constants.mjs";
33
import sampleEmit from "./test-event.mjs";
4-
import airtable from "../../airtable_oauth.app.mjs";
54

65
export default {
76
...common,
87
name: "New Record Created, Updated or Deleted (Instant)",
98
description: "Emit new event when a record is added, updated, or deleted in a table or selected view.",
109
key: "airtable_oauth-new-modified-or-deleted-records-instant",
11-
version: "0.1.3",
10+
version: "0.1.4",
1211
type: "source",
1312
dedupe: "unique",
1413
props: {
@@ -27,7 +26,7 @@ export default {
2726
},
2827
watchDataInFieldIds: {
2928
propDefinition: [
30-
airtable,
29+
common.props.airtable,
3130
"sortFieldId",
3231
(c) => ({
3332
baseId: c.baseId,
@@ -42,6 +41,9 @@ export default {
4241
},
4342
methods: {
4443
...common.methods,
44+
payloadFilter(payload) {
45+
return !!payload.changedTablesById;
46+
},
4547
getDataTypes() {
4648
return [
4749
"tableData",

components/airtable_oauth/sources/new-or-modified-field/new-or-modified-field.mjs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ export default {
66
name: "New or Modified Field (Instant)",
77
description: "Emit new event when a field is created or updated in the selected table",
88
key: "airtable_oauth-new-or-modified-field",
9-
version: "1.0.3",
9+
version: "1.0.4",
1010
type: "source",
1111
dedupe: "unique",
1212
methods: {
1313
...common.methods,
14+
payloadFilter(payload) {
15+
return !!payload.changedTablesById;
16+
},
1417
getChangeTypes() {
1518
return [
1619
"add",

components/airtable_oauth/sources/new-or-modified-records/new-or-modified-records.mjs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import common from "../common/common-webhook-record.mjs";
2-
import airtable from "../../airtable_oauth.app.mjs";
32

43
export default {
54
...common,
65
name: "New or Modified Records (Instant)",
76
key: "airtable_oauth-new-or-modified-records",
87
description: "Emit new event for each new or modified record in a table or view",
9-
version: "1.0.3",
8+
version: "1.0.4",
109
type: "source",
1110
dedupe: "unique",
1211
methods: {
1312
...common.methods,
13+
payloadFilter(payload) {
14+
return !!payload.changedTablesById;
15+
},
1416
getChangeTypes() {
1517
return [
1618
"add",
@@ -22,7 +24,7 @@ export default {
2224
...common.props,
2325
watchDataInFieldIds: {
2426
propDefinition: [
25-
airtable,
27+
common.props.airtable,
2628
"sortFieldId",
2729
(c) => ({
2830
baseId: c.baseId,

components/airtable_oauth/sources/new-records/new-records.mjs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ export default {
55
name: "New Record(s) Created (Instant)",
66
description: "Emit new event for each new record in a table",
77
key: "airtable_oauth-new-records",
8-
version: "1.0.3",
8+
version: "1.0.4",
99
type: "source",
1010
dedupe: "unique",
1111
methods: {
1212
...common.methods,
13+
payloadFilter(payload) {
14+
return !!payload.changedTablesById;
15+
},
1316
getChangeTypes() {
1417
return [
1518
"add",

pnpm-lock.yaml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)