Skip to content
Merged
9 changes: 9 additions & 0 deletions components/hubspot/hubspot.app.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,15 @@ export default {
...opts,
});
},
getListMembershipsByJoinOrder({
listId, ...opts
}) {
return this.makeRequest({
api: API_PATH.CRMV3,
endpoint: `/lists/${listId}/memberships/join-order`,
...opts,
});
},
batchGetObjects({
objectType, ...opts
}) {
Expand Down
2 changes: 1 addition & 1 deletion components/hubspot/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/hubspot",
"version": "1.6.0",
"version": "1.7.0",
"description": "Pipedream Hubspot Components",
"main": "hubspot.app.mjs",
"keywords": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
import common from "../common/common.mjs";
import {
DEFAULT_LIMIT,
DEFAULT_CONTACT_PROPERTIES,
} from "../../common/constants.mjs";
import sampleEmit from "./test-event.mjs";

export default {
...common,
key: "hubspot-new-contact-added-to-list",
name: "New Contact Added to List",
description:
"Emit new event when a contact is added to a HubSpot list. [See the documentation](https://developers.hubspot.com/docs/reference/api/crm/lists#get-%2Fcrm%2Fv3%2Flists%2F%7Blistid%7D%2Fmemberships%2Fjoin-order)",
version: "0.0.1",
type: "source",
dedupe: "unique",
props: {
...common.props,
info: {
type: "alert",
alertType: "info",
content: `Properties:\n\`${DEFAULT_CONTACT_PROPERTIES.join(", ")}\``,
},
lists: {
propDefinition: [
common.props.hubspot,
"lists",
],
description: "Select the lists to watch for new contacts.",
optional: false,
},
properties: {
propDefinition: [
common.props.hubspot,
"contactProperties",
() => ({
excludeDefaultProperties: true,
}),
],
label: "Additional contact properties to retrieve",
optional: true,
},
},
methods: {
...common.methods,
_getAfterToken(listId) {
const key = `list_${listId}_after_token`;
return this.db.get(key);
},
_setAfterToken(listId, afterToken) {
const key = `list_${listId}_after_token`;
this.db.set(key, afterToken);
},
_getLastRecordId(listId) {
const key = `list_${listId}_last_record_id`;
return this.db.get(key);
},
_setLastRecordId(listId, recordId) {
const key = `list_${listId}_last_record_id`;
this.db.set(key, recordId);
},
getTs() {
return Date.now();
},
generateMeta(membership, listInfo) {
const { recordId } = membership;
const ts = this.getTs();

return {
id: `${listInfo.listId}-${recordId}`,
summary: `Contact ${recordId} added to list: ${listInfo.name}`,
ts,
};
},
async getContactDetails(contactIds) {
if (!contactIds.length) return {};

const uniqueContactIds = [
...new Set(contactIds),
];

const { properties = [] } = this;
const allProperties = [
...new Set([
...DEFAULT_CONTACT_PROPERTIES,
...properties,
]),
];

const chunks = [];
const chunkSize = 100;
for (let i = 0; i < uniqueContactIds.length; i += chunkSize) {
chunks.push(uniqueContactIds.slice(i, i + chunkSize));
}

const contactMap = {};

const chunkPromises = chunks.map(async (chunk) => {
try {
const { results } = await this.hubspot.batchGetObjects({
objectType: "contacts",
data: {
inputs: chunk.map((id) => ({
id,
})),
properties: allProperties,
},
});
return results;
} catch (error) {
console.warn("Error fetching contact details for chunk:", error);
return [];
}
});

try {
const chunkResults = await Promise.all(chunkPromises);

chunkResults.forEach((results) => {
results.forEach((contact) => {
contactMap[contact.id] = contact;
});
});

return contactMap;
} catch (error) {
console.warn("Error processing contact details:", error);
return {};
}
},
async processListMemberships(listId, listInfo) {
const afterToken = this._getAfterToken(listId);
const lastRecordId = this._getLastRecordId(listId);
const newMemberships = [];

let params = {
limit: DEFAULT_LIMIT,
};

if (afterToken) {
params.after = afterToken;
}

try {
let hasMore = true;
let latestAfterToken = afterToken;
let latestRecordId = lastRecordId;

while (hasMore) {
const {
results, paging,
} =
await this.hubspot.getListMembershipsByJoinOrder({
listId,
params,
});

if (!results || results.length === 0) {
break;
}

for (const membership of results) {
if (lastRecordId && membership.recordId === lastRecordId) {
continue;
}

newMemberships.push({
membership,
listInfo,
});
latestRecordId = membership.recordId;
}

if (paging?.next?.after) {
latestAfterToken = paging.next.after;
params.after = paging.next.after;
} else {
hasMore = false;
}
}

if (latestAfterToken !== afterToken) {
this._setAfterToken(listId, latestAfterToken);
}
if (latestRecordId) {
this._setLastRecordId(listId, latestRecordId);
}
} catch (error) {
console.error(`Error processing list ${listId}:`, error);
}

return newMemberships;
},
async processResults() {
const { lists } = this;

if (!lists || lists.length === 0) {
console.warn("No lists selected to monitor");
return;
}

const allNewMemberships = [];

for (const listId of lists) {
try {
const listInfo = {
listId,
name: `List ${listId}`,
};

const newMemberships = await this.processListMemberships(
listId,
listInfo,
);
allNewMemberships.push(...newMemberships);
} catch (error) {
console.error(`Error processing list ${listId}:`, error);
}
}

if (allNewMemberships.length > 0) {
const contactIds = allNewMemberships.map(
({ membership }) => membership.recordId,
);
const contactDetails = await this.getContactDetails(contactIds);

for (const {
membership, listInfo,
} of allNewMemberships) {
const contactDetail = contactDetails[membership.recordId] || {};

const eventData = {
listId: listInfo.listId,
listName: listInfo.name,
contactId: membership.recordId,
contact: contactDetail,
membership,
addedAt: new Date().toISOString(),
};

const meta = this.generateMeta(membership, listInfo);
this.$emit(eventData, meta);
}
}
},
getParams() {
return {};
},
},
sampleEmit,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
export default {
listId: "123456789",
listName: "List 123456789",
contactId: "31612976545",
contact: {
id: "31612976545",
properties: {
address: null,
annualrevenue: null,
city: "San Francisco",
company: "Acme Corp",
country: "United States",
createdate: "2024-08-26T15:30:45.123Z",
email: "[email protected]",
fax: null,
firstname: "John",
hs_createdate: "2024-08-26T15:30:45.123Z",
hs_email_domain: "example.com",
hs_language: null,
hs_object_id: "31612976545",
hs_persona: null,
industry: "Technology",
jobtitle: "Software Engineer",
lastmodifieddate: "2024-08-26T15:32:15.456Z",
lastname: "Doe",
lifecyclestage: "lead",
mobilephone: null,
numemployees: null,
phone: "+1-555-123-4567",
salutation: null,
state: "California",
website: "https://example.com",
zip: "94102",
},
createdAt: "2024-08-26T15:30:45.123Z",
updatedAt: "2024-08-26T15:32:15.456Z",
archived: false,
},
membership: {
recordId: "31612976545",
},
addedAt: "2024-08-26T15:35:20.789Z",
};
16 changes: 4 additions & 12 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.