|
1 | 1 | import drift from "../../drift.app.mjs"; |
| 2 | +import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; |
| 3 | +import { doesContextMatch } from "../../common/utils.mjs"; |
2 | 4 |
|
3 | 5 | export default { |
4 | | - key: "drift-new-message-instant", |
5 | | - name: "New Message", |
6 | | - description: "Emit new event when a new message is received in Drift. [See the docs](https://devdocs.drift.com/docs/webhook-events-1).", |
7 | | - version: "0.0.1", |
| 6 | + key: "drift-new-message-test2", |
| 7 | + name: "New Message in Conversation", |
| 8 | + description: "Emit new event when a message is received in a specific Drift conversation.", |
| 9 | + version: "0.0.3", |
8 | 10 | type: "source", |
| 11 | + dedupe: "unique", |
| 12 | + |
9 | 13 | props: { |
10 | 14 | drift, |
11 | | - http: "$.interface.http", |
| 15 | + db: "$.service.db", |
12 | 16 | conversationId: { |
13 | 17 | type: "integer", |
14 | 18 | label: "Conversation ID", |
15 | | - description: "The ID of the conversation to monitor. Emits events for all new messages if not provided.", |
16 | | - optional: true, |
| 19 | + description: "Enter the ID of the conversation", |
17 | 20 | }, |
18 | | - emailOrId: { |
19 | | - type: "string", |
20 | | - label: "Email or ID", |
21 | | - description: "Email or ID of the user to monitor. Emits events for all new messages if not provided.", |
| 21 | + messageContext: { |
| 22 | + type: "object", |
| 23 | + label: "Message Context", |
| 24 | + description: "Enter message context [See the documentation](https://devdocs.drift.com/docs/message-model).", |
22 | 25 | optional: true, |
23 | 26 | }, |
| 27 | + timer: { |
| 28 | + type: "$.interface.timer", |
| 29 | + label: "Polling Interval", |
| 30 | + description: "How often to poll Drift for new messages.", |
| 31 | + default: { |
| 32 | + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, |
| 33 | + }, |
| 34 | + }, |
24 | 35 | }, |
25 | | - async run(event) { |
26 | 36 |
|
27 | | - const { body } = event; |
28 | | - const { drift } = this; |
| 37 | + async run({ $ }) { |
| 38 | + const { |
| 39 | + drift, |
| 40 | + db, |
| 41 | + conversationId, |
| 42 | + } = this; |
29 | 43 |
|
30 | | - if (body?.type !== "new_message") { |
31 | | - console.log("Ignored non-new_message event:", body?.type); |
| 44 | + const messageContext = drift.parseIfJSONString(this.messageContext); |
| 45 | + |
| 46 | + console.log(messageContext); |
| 47 | + const messages = await drift.getMessagesByConvId($, conversationId); |
| 48 | + |
| 49 | + if (!messages?.length) { |
| 50 | + console.log("No messages found."); |
32 | 51 | return; |
33 | 52 | }; |
34 | 53 |
|
35 | | - // If conversationId provided |
36 | | - if (this.conversationId && !(this.conversationId === Number(body.data.conversationId))) { |
37 | | - console.log(`Ignored. Wrong conversationId. |
38 | | - Expected ${this.conversationId} got ${body.data.conversationId}`); |
| 54 | + let lastMessageId = await db.get("lastMessage"); |
| 55 | + |
| 56 | + const lastFetchedMsgId = messages[messages.length - 1].id; |
| 57 | + |
| 58 | + if (!lastMessageId) { |
| 59 | + await db.set("lastMessage", lastFetchedMsgId); |
| 60 | + console.log(`Initialized with ID ${lastFetchedMsgId}.`); |
39 | 61 | return; |
40 | 62 | }; |
41 | 63 |
|
42 | | - const contactId = body.data.author.id; |
43 | | - |
44 | | - const result = await drift.getContactById({ |
45 | | - contactId, |
46 | | - }); |
| 64 | + if (lastMessageId === lastFetchedMsgId) { |
| 65 | + console.log("No new messages found"); |
| 66 | + return; |
| 67 | + }; |
47 | 68 |
|
48 | | - const email = result.data?.attributes?.email || "unknown"; |
| 69 | + const lastMessageIndex = messages.findIndex((obj) => obj.id === lastMessageId); |
49 | 70 |
|
50 | | - if (this.emailOrId && |
51 | | - !(email === this.emailOrId || Number(contactId) === Number(this.emailOrId))) { |
52 | | - console.log(`Ignored. Wrong emailOrId. Expected ${this.emailOrId}`); |
| 71 | + if (lastMessageIndex === -1) { |
| 72 | + console.log("Last message ID not found."); |
53 | 73 | return; |
54 | 74 | }; |
55 | 75 |
|
56 | | - body.data.attributes = result.data.attributes; |
| 76 | + for (let i = lastMessageIndex + 1; i < messages.length; i++) { |
| 77 | + if (messageContext) { |
| 78 | + if (!doesContextMatch(messageContext, messages[i].context)) continue; |
| 79 | + } |
| 80 | + this.$emit(messages[i], { |
| 81 | + id: messages[i].id, |
| 82 | + summary: `New message with ID ${messages[i].id}`, |
| 83 | + ts: messages[i].createdAt, |
| 84 | + }); |
| 85 | + }; |
| 86 | + |
| 87 | + await db.set("lastMessage", lastFetchedMsgId); |
57 | 88 |
|
58 | | - this.$emit(body, { |
59 | | - summary: `New message from contact "${email} " ID "${contactId || "unknown"}"`, |
60 | | - id: body.data.endUserId, |
61 | | - ts: body.timeStamp, |
62 | | - }); |
63 | 89 | }, |
64 | 90 | }; |
0 commit comments