Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/gmail/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/gmail",
"version": "0.2.0",
"version": "0.2.1",
"description": "Pipedream Gmail Components",
"main": "gmail.app.mjs",
"keywords": [
Expand Down
51 changes: 33 additions & 18 deletions components/gmail/sources/common/polling-history.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ export default {
const params = {
maxResults: constants.HISTORICAL_EVENTS,
};
if (this.label) {
params.labelIds = this.label;
if (this.labels?.length) {
params.labelIds = this.labelIds;
}
let { messages } = await this.gmail.listMessages({
params,
...params,
});
if (!messages?.length) {
return;
Expand All @@ -48,30 +48,42 @@ export default {
startHistoryId,
historyTypes: this.getHistoryTypes(),
};
if (this.label) {
opts.labelId = this.label;
}

const {
history, historyId,
} = await this.gmail.listHistory(opts);
const length = this.labels?.length > 0
? this.labels.length
: 1;
let maxHistoryId = 0;

if (!history) {
return;
}
this._setLastHistoryId(historyId);
for (let i = 0; i < length; i++) {
if (this.labels) {
opts.labelId = this.labels[i];
}

const responseArray = this.filterHistory(history);
for (const item of responseArray) {
await this.emitFullMessage(item.messages[0].id);
const {
history, historyId,
} = await this.gmail.listHistory(opts);

if (!history) {
continue;
}

maxHistoryId = Math.max(maxHistoryId, historyId);
const responseArray = this.filterHistory(history);

for (const item of responseArray) {
await this.emitFullMessage(item.messages[0].id);
}
}
if (maxHistoryId > 0) {
this._setLastHistoryId(maxHistoryId);
}
},
async emitRecentMessages() {
const opts = {
maxResults: constants.HISTORICAL_EVENTS,
};
if (this.label) {
opts.labelIds = this.label;
if (this.labels?.length) {
opts.labelIds = this.labels;
}
let { messages } = await this.gmail.listMessages(opts);
if (!messages?.length) {
Expand All @@ -88,6 +100,9 @@ export default {
message = await this.gmail.getMessage({
id,
});
if (this.excludeLabels && message.labelIds.some((i) => this.excludeLabels.includes(i))) {
return;
}
} catch {
console.log(`Message ${id} not found`);
}
Expand Down
91 changes: 60 additions & 31 deletions components/gmail/sources/new-email-received/new-email-received.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
name: "New Email Received",
description: "Emit new event when a new email is received.",
type: "source",
version: "0.1.9",
version: "0.1.10",
dedupe: "unique",
props: {
gmail,
Expand All @@ -31,14 +31,14 @@
"Configuring this source as a `webhook` (instant) trigger requires a custom OAuth client. [Refer to the guide here to get started](https://pipedream.com/apps/gmail/#getting-started).",
reloadProps: true,
},
serviceAccountKeyJson: {

Check warning on line 34 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop serviceAccountKeyJson must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "string",
label: "Service Account Key JSON",
optional: true,
hidden: true,
reloadProps: true,
},
serviceAccountKeyJsonInstructions: {

Check warning on line 41 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop serviceAccountKeyJsonInstructions must have a label. See https://pipedream.com/docs/components/guidelines/#props

Check warning on line 41 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop serviceAccountKeyJsonInstructions must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "alert",
alertType: "info",
content: `1) [Create a service account in GCP](https://cloud.google.com/iam/docs/creating-managing-service-accounts) and set the following permission: **Pub/Sub Admin**
Expand Down Expand Up @@ -72,16 +72,31 @@
hidden: true,
reloadProps: true,
},
label: {
labels: {
propDefinition: [
gmail,
"label",
],
default: "INBOX",
type: "string[]",
label: "Labels",
default: [
"INBOX",
],
optional: true,
hidden: true,
},
excludeLabels: {
propDefinition: [
gmail,
"label",
],
type: "string[]",
label: "Exclude Labels",
description: "Emails with the specified labels will be excluded from results",
optional: true,
hidden: true,
},
permissionAlert: {

Check warning on line 99 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop permissionAlert must have a label. See https://pipedream.com/docs/components/guidelines/#props

Check warning on line 99 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop permissionAlert must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "alert",
alertType: "error",
content: `Unable to grant publish permission to Gmail API service account.
Expand All @@ -94,7 +109,7 @@
`,
hidden: true,
},
latencyWarningAlert: {

Check warning on line 112 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop latencyWarningAlert must have a label. See https://pipedream.com/docs/components/guidelines/#props

Check warning on line 112 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop latencyWarningAlert must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "alert",
alertType: "warning",
content:
Expand Down Expand Up @@ -221,7 +236,8 @@
};
}
}
props.label.hidden = false;
props.labels.hidden = false;
props.excludeLabels.hidden = false;
return newProps;
},
hooks: {
Expand Down Expand Up @@ -354,8 +370,8 @@
path: "/users/me/watch",
data: {
topicName,
labelIds: [
this.label || "INBOX",
labelIds: this.labels || [
"INBOX",
],
},
});
Expand Down Expand Up @@ -396,14 +412,18 @@
};
},
filterHistory(history) {
return this.label
? history.filter(
(item) =>
item.messagesAdded?.length &&
item.messagesAdded[0].message.labelIds &&
item.messagesAdded[0].message.labelIds.includes(this.label),
)
: history.filter((item) => item.messagesAdded?.length);
let filteredHistory = history.filter((item) => item.messagesAdded?.length);
if (this.labels) {
filteredHistory = filteredHistory.filter((item) =>
item.messagesAdded[0].message.labelIds &&
item.messagesAdded[0].message.labelIds.some((i) => this.labels.includes(i)));
}
if (this.excludeLabels) {
filteredHistory = filteredHistory.filter((item) =>
item.messagesAdded[0].message.labelIds &&
!(item.messagesAdded[0].message.labelIds.some((i) => this.excludeLabels.includes(i))));
}
return filteredHistory;
},
async getMessageDetails(ids) {
const messages = await Promise.all(ids.map(async (id) => {
Expand All @@ -419,14 +439,19 @@
}));
return messages;
},
getHistoryResponse(startHistoryId) {
return this.gmail.listHistory({
startHistoryId,
historyTypes: [
"messageAdded",
],
labelId: this.label,
});
async getHistoryResponses(startHistoryId) {
const historyResponses = [];
for (const labelId of this.labels) {
const response = await this.gmail.listHistory({
startHistoryId,
historyTypes: [
"messageAdded",
],
labelId,
});
historyResponses.push(response);
}
return historyResponses;
},
},
async run(event) {
Expand Down Expand Up @@ -495,9 +520,9 @@
console.log("Using startHistoryId:", startHistoryId);

// Fetch the history
let historyResponse;
let historyResponses;
try {
historyResponse = await this.getHistoryResponse(startHistoryId);
historyResponses = await this.getHistoryResponses(startHistoryId);
} catch {
// catch error thrown if startHistoryId is invalid or expired

Expand All @@ -507,19 +532,20 @@
// set startHistoryId to the historyId received from the webhook
startHistoryId = parseInt(receivedHistoryId);
console.log("Using startHistoryId:", startHistoryId);
historyResponse = await this.getHistoryResponse(startHistoryId);
historyResponses = await this.getHistoryResponses(startHistoryId);
}

console.log(
"History response:",
JSON.stringify(historyResponse, null, 2),
"History responses:",
JSON.stringify(historyResponses, null, 2),
);

// Process history to find new messages
const newMessages = [];
if (historyResponse.history) {
for (const historyItem of historyResponse.history) {
if (historyItem.messagesAdded) {
for (const historyResponse of historyResponses) {
if (historyResponse.history) {
const historyResponseFiltered = this.filterHistory(historyResponse.history);
for (const historyItem of historyResponseFiltered) {
newMessages.push(
...historyItem.messagesAdded.map((msg) => msg.message),
);
Expand All @@ -540,7 +566,10 @@
console.log("Fetched message details count:", messageDetails.length);

// Store the latest historyId in the db
const latestHistoryId = historyResponse.historyId || receivedHistoryId;
let latestHistoryId = receivedHistoryId;
for (const historyResponse of historyResponses) {
latestHistoryId = Math.max(latestHistoryId, historyResponse.historyId);
}
this._setLastProcessedHistoryId(latestHistoryId);
console.log("Updated lastProcessedHistoryId:", latestHistoryId);

Expand Down
14 changes: 8 additions & 6 deletions components/gmail/sources/new-labeled-email/new-labeled-email.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ export default {
name: "New Labeled Email",
description: "Emit new event when a new email is labeled.",
type: "source",
version: "0.0.6",
version: "0.0.7",
dedupe: "unique",
props: {
...common.props,
gmail,
label: {
labels: {
propDefinition: [
gmail,
"label",
],
type: "string[]",
label: "Labels",
},
},
methods: {
Expand All @@ -30,17 +32,17 @@ export default {
},
generateMeta(message) {
return {
id: `${message.id}-${this.label}`,
summary: `A new message with ID: ${message.id} was labeled with "${this.label}"`,
id: `${message.id}-${message.historyId}`,
summary: `A new message with ID: ${message.id} was labeled"`,
ts: +message.internalDate,
};
},
filterHistory(history) {
return history.filter((item) =>
(item.labelsAdded && item.labelsAdded[0].labelIds.includes(this.label))
(item.labelsAdded && item.labelsAdded[0].labelIds.some((i) => this.labels.includes(i)))
|| (item.messagesAdded
&& item.messagesAdded[0].message.labelIds
&& item.messagesAdded[0].message.labelIds.includes(this.label)));
&& item.messagesAdded[0].message.labelIds.some((i) => this.labels.includes(i))));
},
},
sampleEmit,
Expand Down
Loading