Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/gmail/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/gmail",
"version": "0.1.3",
"version": "0.1.4",
"description": "Pipedream Gmail Components",
"main": "gmail.app.mjs",
"keywords": [
Expand Down
6 changes: 4 additions & 2 deletions components/gmail/sources/common/verify-client-id.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
const restrictedClientId = "38931588176-fnd4m13k1mjb6djallp1m9kr7o8kslcu.apps.googleusercontent.com";
import { PD_OFFICIAL_GMAIL_OAUTH_CLIENT_ID } from "@pipedream/platform";

export default {
methods: {
async checkClientId() {
return this.gmail.$auth.oauth_client_id !== restrictedClientId;
return (
this.gmail.$auth.oauth_client_id !== PD_OFFICIAL_GMAIL_OAUTH_CLIENT_ID
);
},
},
};
119 changes: 74 additions & 45 deletions components/gmail/sources/new-email-received/new-email-received.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import gmail from "../../gmail.app.mjs";
import common from "../common/polling-history.mjs";
import {
axios, DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, ConfigurationError,
axios,
DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
ConfigurationError,
} from "@pipedream/platform";
import { PubSub } from "@google-cloud/pubsub";
import { v4 as uuidv4 } from "uuid";
Expand All @@ -13,7 +15,7 @@ export default {
name: "New Email Received",
description: "Emit new event when a new email is received.",
type: "source",
version: "0.1.0",
version: "0.1.1",
dedupe: "unique",
props: {
gmail,
Expand All @@ -25,7 +27,8 @@ export default {
"webhook",
"polling",
],
description: "Configuring this trigger as a `webhook` requires a Custom OAuth client, which is available on Pipedream's [Advanced plan or higher](https://pipedream.com/pricing?plan=Advanced). Refer to the getting started guide [here](https://pipedream.com/apps/gmail/triggers/new-email-received#getting-started).",
description:
"Configuring this source as a `webhook` (instant) trigger requires a custom OAuth client. [Refer to the guide here to get started](https://pipedream.com/apps/gmail/triggers/new-email-received#getting-started).",
reloadProps: true,
},
serviceAccountKeyJson: {
Expand All @@ -48,7 +51,8 @@ export default {
topicType: {
type: "string",
label: "Pub/Sub Topic",
description: "Do you have an existing Pub/Sub topic, or would you like to create a new one?",
description:
"Do you have an existing Pub/Sub topic, or would you like to create a new one?",
options: [
"existing",
"new",
Expand Down Expand Up @@ -93,7 +97,8 @@ export default {
latencyWarningAlert: {
type: "alert",
alertType: "warning",
content: "Please allow up to 1 minute for deployment. We're setting up your real-time email notifications behind the scenes.",
content:
"Please allow up to 1 minute for deployment. We're setting up your real-time email notifications behind the scenes.",
hidden: true,
},
},
Expand All @@ -111,7 +116,9 @@ export default {
// verify that a Custom OAuth client is being used
const isValidClientId = await this.checkClientId();
if (!isValidClientId) {
throw new ConfigurationError("You must use a custom OAuth client to use this component. Please see [here](https://pipedream.com/docs/connected-accounts/oauth-clients) for more details.");
throw new ConfigurationError(
"Configuring this source as a `webhook` (instant) trigger requires a custom OAuth client. [Refer to the guide here to get started](https://pipedream.com/apps/gmail/triggers/new-email-received#getting-started).",
);
}

newProps.http = "$.interface.http";
Expand Down Expand Up @@ -143,7 +150,9 @@ export default {
if (this.topicType === "new") {
const authKeyJSON = JSON.parse(this.serviceAccountKeyJson);
const { project_id: projectId } = authKeyJSON;
topicName = `projects/${projectId}/topics/${this.convertNameToValidPubSubTopicName(uuidv4())}`;
topicName = `projects/${projectId}/topics/${this.convertNameToValidPubSubTopicName(
uuidv4(),
)}`;
props.topic.default = topicName;
props.topic.reloadProps = false;
} else {
Expand All @@ -160,9 +169,14 @@ export default {
const [
policy,
] = await topic.iam.getPolicy();
hasPublisherRole = policy.bindings.find(({
members, role,
}) => members.includes("serviceAccount:[email protected]") && role === "roles/pubsub.publisher");
hasPublisherRole = policy.bindings.find(
({
members, role,
}) =>
members.includes(
"serviceAccount:[email protected]",
) && role === "roles/pubsub.publisher",
);
} catch {
console.log("Could not retrieve iam policy");
}
Expand Down Expand Up @@ -216,7 +230,8 @@ export default {

// Create subscription
const pushEndpoint = this.http.endpoint;
const subscriptionName = this.convertNameToValidPubSubTopicName(pushEndpoint);
const subscriptionName =
this.convertNameToValidPubSubTopicName(pushEndpoint);
const subscriptionOptions = {
pushConfig: {
pushEndpoint,
Expand Down Expand Up @@ -267,9 +282,7 @@ export default {
sdkParams() {
const authKeyJSON = JSON.parse(this.serviceAccountKeyJson);
const {
project_id: projectId,
client_email,
private_key,
project_id: projectId, client_email, private_key,
} = authKeyJSON;
const sdkParams = {
credentials: {
Expand All @@ -291,18 +304,18 @@ export default {
},
convertNameToValidPubSubTopicName(name) {
// For valid names, see https://cloud.google.com/pubsub/docs/admin#resource_names
return name
// Must not start with `goog`. We add a `pd-` at the beginning if that's the case.
.replace(/(^goog.*)/g, "pd-$1")
// Must start with a letter, otherwise we add `pd-` at the beginning.
.replace(/^(?![a-zA-Z]+)/, "pd-")
// Only certain characters are allowed, the rest will be replaced with a `-`.
.replace(/[^a-zA-Z0-9_\-.~+%]+/g, "-");
return (
name
// Must not start with `goog`. We add a `pd-` at the beginning if that's the case.
.replace(/(^goog.*)/g, "pd-$1")
// Must start with a letter, otherwise we add `pd-` at the beginning.
.replace(/^(?![a-zA-Z]+)/, "pd-")
// Only certain characters are allowed, the rest will be replaced with a `-`.
.replace(/[^a-zA-Z0-9_\-.~+%]+/g, "-")
);
},
makeRequest({
$ = this,
path,
...opts
$ = this, path, ...opts
}) {
return axios($, {
url: `https://gmail.googleapis.com/gmail/v1${path}`,
Expand Down Expand Up @@ -339,7 +352,8 @@ export default {
] = await pubSubClient.createTopic(topicName);
console.log(`Topic ${topicName} created.`);
} catch (error) {
if (error.code === 6) { // Already exists
if (error.code === 6) {
// Already exists
topic = pubSubClient.topic(topicName);
} else {
throw error;
Expand All @@ -354,7 +368,8 @@ export default {
return {
id: msg.id,
threadId: msg.threadId,
subject: headers.find((h) => h.name.toLowerCase() === "subject")?.value,
subject: headers.find((h) => h.name.toLowerCase() === "subject")
?.value,
from: headers.find((h) => h.name.toLowerCase() === "from")?.value,
to: headers.find((h) => h.name.toLowerCase() === "to")?.value,
date: headers.find((h) => h.name.toLowerCase() === "date")?.value,
Expand All @@ -370,16 +385,18 @@ export default {
generateMeta(message) {
return {
id: message.id,
summary: `A new message with ID: ${message.id} was received"`,
summary: message.snippet,
ts: message.internalDate,
};
},
filterHistory(history) {
return this.label
? history.filter((item) =>
item.messagesAdded?.length
&& item.messagesAdded[0].message.labelIds
&& item.messagesAdded[0].message.labelIds.includes(this.label))
? history.filter(
(item) =>
item.messagesAdded?.length &&
item.messagesAdded[0].message.labelIds &&
item.messagesAdded[0].message.labelIds.includes(this.label),
)
: history.filter((item) => item.messagesAdded?.length);
},
},
Expand Down Expand Up @@ -414,7 +431,9 @@ export default {
if (!pubsubMessage) {
return;
}
const decodedData = JSON.parse(Buffer.from(pubsubMessage.data, "base64").toString());
const decodedData = JSON.parse(
Buffer.from(pubsubMessage.data, "base64").toString(),
);

console.log("Decoded Pub/Sub data:", decodedData);

Expand All @@ -425,8 +444,10 @@ export default {
console.log("Last processed historyId:", lastProcessedHistoryId);

// Use the minimum of lastProcessedHistoryId and the received historyId
const startHistoryId =
Math.min(parseInt(lastProcessedHistoryId), parseInt(receivedHistoryId));
const startHistoryId = Math.min(
parseInt(lastProcessedHistoryId),
parseInt(receivedHistoryId),
);
console.log("Using startHistoryId:", startHistoryId);

// Fetch the history
Expand All @@ -438,14 +459,19 @@ export default {
labelId: this.label,
});

console.log("History response:", JSON.stringify(historyResponse, null, 2));
console.log(
"History response:",
JSON.stringify(historyResponse, null, 2),
);

// Process history to find new messages
const newMessages = [];
if (historyResponse.history) {
for (const historyItem of historyResponse.history) {
if (historyItem.messagesAdded) {
newMessages.push(...historyItem.messagesAdded.map((msg) => msg.message));
newMessages.push(
...historyItem.messagesAdded.map((msg) => msg.message),
);
}
}
}
Expand All @@ -466,15 +492,18 @@ export default {
console.log("Updated lastProcessedHistoryId:", latestHistoryId);

if (processedEmails?.length) {
this.$emit({
newEmailsCount: processedEmails.length,
emails: processedEmails,
lastProcessedHistoryId: latestHistoryId,
}, {
id: processedEmails[0].id,
summary: processedEmails[0].subject,
ts: Date.now(),
});
this.$emit(
{
newEmailsCount: processedEmails.length,
emails: processedEmails,
lastProcessedHistoryId: latestHistoryId,
},
{
id: processedEmails[0].id,
summary: processedEmails[0].subject,
ts: Date.now(),
},
);
}
}
},
Expand Down
3 changes: 2 additions & 1 deletion platform/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export const DEFAULT_POLLING_SOURCE_TIMER_INTERVAL = 60 * 15
export const DEFAULT_POLLING_SOURCE_TIMER_INTERVAL = 60 * 15;
export const PD_OFFICIAL_GMAIL_OAUTH_CLIENT_ID = "38931588176-fnd4m13k1mjb6djallp1m9kr7o8kslcu.apps.googleusercontent.com";
2 changes: 1 addition & 1 deletion platform/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/platform",
"version": "3.0.1",
"version": "3.0.2",
"description": "Pipedream platform globals (typing and runtime type checking)",
"homepage": "https://pipedream.com",
"main": "dist/index.js",
Expand Down
Loading