Skip to content

Commit 51bc713

Browse files
committed
add new message source
1 parent 9aace2c commit 51bc713

File tree

19 files changed

+103
-1212
lines changed

19 files changed

+103
-1212
lines changed

components/drift/common/utils.mjs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,27 @@ const removeNullEntries = (obj) =>
2222
: acc;
2323
}, {});
2424

25+
function doesContextMatch(inputContext, fetchedContext) {
26+
27+
if (typeof inputContext !== "object" || inputContext === null || Array.isArray(inputContext)) {
28+
throw new Error ("Message context is not an object");
29+
};
30+
31+
for (const key of Object.keys(inputContext)) {
32+
if (!(key in fetchedContext)) {
33+
console.log(`Invalid context field "${key}", emission skipped` );
34+
return false;
35+
}
36+
if (fetchedContext[key] !== inputContext[key]) {
37+
console.log(`Context values of "${key}" do not match, emission skipped` );
38+
return false;
39+
}
40+
}
41+
return true;
42+
};
43+
2544
export {
2645
removeNullEntries,
46+
doesContextMatch,
2747
};
48+

components/drift/drift.app.mjs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,27 @@ export default {
126126
const newest = arr.slice(0, firtsNew);
127127
return newest.reverse();
128128
},
129+
async getMessagesByConvId($, conversationId) {
130+
131+
const messages = [];
132+
let next;
133+
134+
do {
135+
const result = await this._makeRequest({
136+
$,
137+
path: `/conversations/${conversationId}/messages${next
138+
? `?next=${next}`
139+
: ""}`,
140+
});
141+
142+
messages.push(...result.data.messages);
143+
next = result?.pagination?.next;
144+
145+
} while (next);
146+
147+
return messages;
148+
},
149+
129150
parseIfJSONString(input) {
130151

131152
if (typeof input === "string") {

components/drift/sources/new-conversation/new-conversation.mjs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,6 @@ export default {
2020
},
2121
},
2222
},
23-
hooks: {
24-
async activate() {
25-
26-
const {
27-
drift,
28-
db,
29-
} = this;
30-
31-
await db.set("lastConversation", null); //reset
32-
33-
const result = await drift._makeRequest({
34-
path: "/conversations/list?limit=100&statusId=1",
35-
});
36-
37-
if (!result.data.length) {
38-
console.log("No conversations found.");
39-
return;
40-
};
41-
42-
await db.set("lastConversation", result.data[0].id);
43-
console.log(`Initialized with ID ${result.data[0].id}.`);
44-
45-
},
46-
},
4723

4824
async run({ $ }) {
4925
const {
Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,90 @@
11
import drift from "../../drift.app.mjs";
2+
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
3+
import { doesContextMatch } from "../../common/utils.mjs";
24

35
export default {
4-
key: "drift-new-message-instant",
5-
name: "New Message",
6-
description: "Emit new event when a new message is received in Drift. [See the docs](https://devdocs.drift.com/docs/webhook-events-1).",
7-
version: "0.0.1",
6+
key: "drift-new-message",
7+
name: "New Message in Conversation",
8+
description: "Emit new event when a message is received in a specific Drift conversation.",
9+
version: "0.0.3",
810
type: "source",
11+
dedupe: "unique",
12+
913
props: {
1014
drift,
11-
http: "$.interface.http",
15+
db: "$.service.db",
1216
conversationId: {
1317
type: "integer",
1418
label: "Conversation ID",
15-
description: "The ID of the conversation to monitor. Emits events for all new messages if not provided.",
16-
optional: true,
19+
description: "Enter the ID of the conversation",
1720
},
18-
emailOrId: {
19-
type: "string",
20-
label: "Email or ID",
21-
description: "Email or ID of the user to monitor. Emits events for all new messages if not provided.",
21+
messageContext: {
22+
type: "object",
23+
label: "Message Context",
24+
description: "Enter message context [See the documentation](https://devdocs.drift.com/docs/message-model).",
2225
optional: true,
2326
},
27+
timer: {
28+
type: "$.interface.timer",
29+
label: "Polling Interval",
30+
description: "How often to poll Drift for new messages.",
31+
default: {
32+
intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
33+
},
34+
},
2435
},
25-
async run(event) {
2636

27-
const { body } = event;
28-
const { drift } = this;
37+
async run({ $ }) {
38+
const {
39+
drift,
40+
db,
41+
conversationId,
42+
} = this;
2943

30-
if (body?.type !== "new_message") {
31-
console.log("Ignored non-new_message event:", body?.type);
44+
const messageContext = drift.parseIfJSONString(this.messageContext);
45+
46+
console.log(messageContext);
47+
const messages = await drift.getMessagesByConvId($, conversationId);
48+
49+
if (!messages?.length) {
50+
console.log("No messages found.");
3251
return;
3352
};
3453

35-
// If conversationId provided
36-
if (this.conversationId && !(this.conversationId === Number(body.data.conversationId))) {
37-
console.log(`Ignored. Wrong conversationId.
38-
Expected ${this.conversationId} got ${body.data.conversationId}`);
54+
let lastMessageId = await db.get("lastMessage");
55+
56+
const lastFetchedMsgId = messages[messages.length - 1].id;
57+
58+
if (!lastMessageId) {
59+
await db.set("lastMessage", lastFetchedMsgId);
60+
console.log(`Initialized with ID ${lastFetchedMsgId}.`);
3961
return;
4062
};
4163

42-
const contactId = body.data.author.id;
43-
44-
const result = await drift.getContactById({
45-
contactId,
46-
});
64+
if (lastMessageId === lastFetchedMsgId) {
65+
console.log("No new messages found");
66+
return;
67+
};
4768

48-
const email = result.data?.attributes?.email || "unknown";
69+
const lastMessageIndex = messages.findIndex((obj) => obj.id === lastMessageId);
4970

50-
if (this.emailOrId &&
51-
!(email === this.emailOrId || Number(contactId) === Number(this.emailOrId))) {
52-
console.log(`Ignored. Wrong emailOrId. Expected ${this.emailOrId}`);
71+
if (lastMessageIndex === -1) {
72+
console.log("Last message ID not found.");
5373
return;
5474
};
5575

56-
body.data.attributes = result.data.attributes;
76+
for (let i = lastMessageIndex + 1; i < messages.length; i++) {
77+
if (messageContext) {
78+
if (!doesContextMatch(messageContext, messages[i].context)) continue;
79+
}
80+
this.$emit(messages[i], {
81+
id: messages[i].id,
82+
summary: `New message with ID ${messages[i].id}`,
83+
ts: messages[i].createdAt,
84+
});
85+
};
86+
87+
await db.set("lastMessage", lastFetchedMsgId);
5788

58-
this.$emit(body, {
59-
summary: `New message from contact "${email} " ID "${contactId || "unknown"}"`,
60-
id: body.data.endUserId,
61-
ts: body.timeStamp,
62-
});
6389
},
6490
};

components/drift/tests/README.md

Lines changed: 0 additions & 44 deletions
This file was deleted.
-18.7 KB
Binary file not shown.

components/drift/tests/action-tests/create-contact.mjs

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)