Skip to content
Merged
57 changes: 50 additions & 7 deletions components/hubspot/hubspot.app.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1264,15 +1264,58 @@ export default {
...opts,
});
},
batchGetObjects({
async getListMembershipsByJoinOrder({
listId, ...opts
}) {
const MAX_RETRIES = 5;
const BASE_RETRY_DELAY = 500;
let success = false;
let retries = 0;
while (!success) {
try {
const response = await this.makeRequest({
api: API_PATH.CRMV3,
endpoint: `/lists/${listId}/memberships/join-order`,
...opts,
});
return response;
} catch (error) {
if (error.status === 429 && ++retries < MAX_RETRIES) {
const randomDelay = Math.floor(Math.random() * BASE_RETRY_DELAY);
const delay = BASE_RETRY_DELAY * (2 ** retries) + randomDelay;
await new Promise((resolve) => setTimeout(resolve, delay));
} else {
throw error;
}
}
}
},
async batchGetObjects({
objectType, ...opts
}) {
return this.makeRequest({
api: API_PATH.CRMV3,
endpoint: `/objects/${objectType}/batch/read`,
method: "POST",
...opts,
});
const MAX_RETRIES = 5;
const BASE_RETRY_DELAY = 500;
let success = false;
let retries = 0;
while (!success) {
try {
const response = await this.makeRequest({
api: API_PATH.CRMV3,
endpoint: `/objects/${objectType}/batch/read`,
method: "POST",
...opts,
});
return response;
} catch (error) {
if (error.status === 429 && ++retries < MAX_RETRIES) {
const randomDelay = Math.floor(Math.random() * BASE_RETRY_DELAY);
const delay = BASE_RETRY_DELAY * (2 ** retries) + randomDelay;
await new Promise((resolve) => setTimeout(resolve, delay));
} else {
throw error;
}
}
}
},
listNotes(opts = {}) {
return this.makeRequest({
Expand Down
2 changes: 1 addition & 1 deletion components/hubspot/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/hubspot",
"version": "1.6.2",
"version": "1.7.0",
"description": "Pipedream Hubspot Components",
"main": "hubspot.app.mjs",
"keywords": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import common from "../common/common.mjs";
import {
DEFAULT_LIMIT,
DEFAULT_CONTACT_PROPERTIES,
} from "../../common/constants.mjs";
import sampleEmit from "./test-event.mjs";

export default {
...common,
key: "hubspot-new-contact-added-to-list",
name: "New Contact Added to List",
description:
"Emit new event when a contact is added to a HubSpot list. [See the documentation](https://developers.hubspot.com/docs/reference/api/crm/lists#get-%2Fcrm%2Fv3%2Flists%2F%7Blistid%7D%2Fmemberships%2Fjoin-order)",
version: "0.0.1",
type: "source",
dedupe: "unique",
props: {
...common.props,
info: {

Check warning on line 19 in components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop info must have a description. See https://pipedream.com/docs/components/guidelines/#props

Check warning on line 19 in components/hubspot/sources/new-contact-added-to-list/new-contact-added-to-list.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop info must have a label. See https://pipedream.com/docs/components/guidelines/#props
type: "alert",
alertType: "info",
content: `Properties:\n\`${DEFAULT_CONTACT_PROPERTIES.join(", ")}\``,
},
listId: {
propDefinition: [
common.props.hubspot,
"listId",
],
type: "string",
description: "Select the list to watch for new contacts.",
optional: false,
},
properties: {
propDefinition: [
common.props.hubspot,
"contactProperties",
() => ({
excludeDefaultProperties: true,
}),
],
label: "Additional contact properties to retrieve",
optional: true,
},
},
methods: {
...common.methods,
_getKey(listId) {
return `list_${listId}_last_timestamp`;
},
_getLastMembershipTimestamp(listId) {
return this.db.get(this._getKey(listId));
},
_setLastMembershipTimestamp(listId, timestamp) {
this.db.set(this._getKey(listId), timestamp);
},
getTs() {
return Date.now();
},
generateMeta(membership, listInfo) {
const {
recordId, membershipTimestamp,
} = membership;
const ts = membershipTimestamp
? new Date(membershipTimestamp).getTime()
: this.getTs();

return {
id: `${listInfo.listId}-${recordId}-${ts}`,
summary: `Contact ${recordId} added to list: ${listInfo.name}`,
ts,
};
},
async getContactDetails(contactIds) {
if (!contactIds.length) return {};

const { properties = [] } = this;
const allProperties = [
...DEFAULT_CONTACT_PROPERTIES,
...properties,
];

const chunks = [];
const chunkSize = 100;
for (let i = 0; i < contactIds.length; i += chunkSize) {
chunks.push(contactIds.slice(i, i + chunkSize));
}

const contactMap = {};

try {
for (const chunk of chunks) {
try {
const { results } = await this.hubspot.batchGetObjects({
objectType: "contacts",
data: {
inputs: chunk.map((id) => ({
id,
})),
properties: allProperties,
},
});

results.forEach((contact) => {
contactMap[contact.id] = contact;
});
} catch (error) {
console.warn(
`Error fetching contact details for chunk of ${chunk.length} contacts:`,
error,
);
}
}

return contactMap;
} catch (error) {
console.warn("Error processing contact details:", error);
return {};
}
},
async processListMemberships(listId, listInfo) {
const lastMembershipTimestamp = this._getLastMembershipTimestamp(listId);
const newMemberships = [];

let params = {
limit: DEFAULT_LIMIT,
};

try {
let hasMore = true;
let latestMembershipTimestamp = lastMembershipTimestamp;

if (!lastMembershipTimestamp) {
const baselineTimestamp = new Date().toISOString();
this._setLastMembershipTimestamp(listId, baselineTimestamp);
return newMemberships;
}

while (hasMore) {
const {
results, paging,
} =
await this.hubspot.getListMembershipsByJoinOrder({
listId,
params,
});

if (!results) {
console.warn(
`No results returned from API for list ${listId} - possible API issue`,
);
break;
}

if (results.length === 0) {
break;
}

for (const membership of results) {
const { membershipTimestamp } = membership;

if (membershipTimestamp > lastMembershipTimestamp) {
newMemberships.push({
membership,
listInfo,
});

if (
!latestMembershipTimestamp ||
membershipTimestamp > latestMembershipTimestamp
) {
latestMembershipTimestamp = membershipTimestamp;
}
}
}

if (paging?.next?.after) {
params.after = paging.next.after;
} else {
hasMore = false;
}
}

if (latestMembershipTimestamp !== lastMembershipTimestamp) {
this._setLastMembershipTimestamp(listId, latestMembershipTimestamp);
}
} catch (error) {
console.error(`Error processing list ${listId}:`, error);
}

return newMemberships;
},
async processResults() {
const {
listId,
listInfo: { name },
} = this;

if (!listId) {
console.warn("No list selected to monitor");
return;
}

const listInfo = {
listId,
name: `List ${name}`,
};

try {
const newMemberships = await this.processListMemberships(
listId,
listInfo,
);

if (newMemberships.length > 0) {
const contactIds = newMemberships.map(
({ membership }) => membership.recordId,
);
const contactDetails = await this.getContactDetails(contactIds);

for (const {
membership, listInfo,
} of newMemberships) {
const contactDetail = contactDetails[membership.recordId] || {};

const eventData = {
listId: listInfo.listId,
listName: listInfo.name,
contactId: membership.recordId,
contact: contactDetail,
membership,
addedAt: membership.membershipTimestamp,
};

const meta = this.generateMeta(membership, listInfo);
this.$emit(eventData, meta);
}
}
} catch (error) {
console.error(`Error processing list ${listId}:`, error);
}
},
getParams() {
return {};
},
},
sampleEmit,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
export default {
listId: "123456789",
listName: "List 123456789",
contactId: "31612976545",
contact: {
id: "31612976545",
properties: {
address: null,
annualrevenue: null,
city: "San Francisco",
company: "Acme Corp",
country: "United States",
createdate: "2024-08-26T15:30:45.123Z",
email: "[email protected]",
fax: null,
firstname: "John",
hs_createdate: "2024-08-26T15:30:45.123Z",
hs_email_domain: "example.com",
hs_language: null,
hs_object_id: "31612976545",
hs_persona: null,
industry: "Technology",
jobtitle: "Software Engineer",
lastmodifieddate: "2024-08-26T15:32:15.456Z",
lastname: "Doe",
lifecyclestage: "lead",
mobilephone: null,
numemployees: null,
phone: "+1-555-123-4567",
salutation: null,
state: "California",
website: "https://example.com",
zip: "94102",
},
createdAt: "2024-08-26T15:30:45.123Z",
updatedAt: "2024-08-26T15:32:15.456Z",
archived: false,
},
membership: {
recordId: "31612976545",
},
addedAt: "2024-08-26T15:35:20.789Z",
};
3 changes: 1 addition & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading