Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/notion/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/notion",
"version": "0.1.24",
"version": "0.2.0",
"description": "Pipedream Notion Components",
"main": "notion.app.mjs",
"keywords": [
Expand Down
122 changes: 75 additions & 47 deletions components/notion/sources/updated-page/updated-page.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import notion from "../../notion.app.mjs";
import sampleEmit from "./test-event.mjs";
import base from "../common/base.mjs";
import constants from "../common/constants.mjs";
import md5 from "md5";
import zlib from "zlib";

export default {
...base,
key: "notion-updated-page",
name: "Updated Page in Database", /* eslint-disable-line pipedream/source-name */
description: "Emit new event when a page in a database is updated. To select a specific page, use `Updated Page ID` instead",
version: "0.0.19",
version: "0.1.0",
type: "source",
dedupe: "unique",
props: {
Expand All @@ -20,6 +20,12 @@ export default {
"databaseId",
],
},
includeNewPages: {
type: "boolean",
label: "Include New Pages",
description: "Set to `true` to emit events when pages are created. Set to `false` to ignore new pages.",
default: true,
},
properties: {
propDefinition: [
notion,
Expand All @@ -32,35 +38,30 @@ export default {
description: "Only emit events when one or more of the selected properties have changed",
optional: true,
},
includeNewPages: {
type: "boolean",
label: "Include New Pages",
description: "Set to `true` to emit events when pages are created. Set to `false` to ignore new pages.",
default: true,
},
},
hooks: {
async deploy() {
const properties = await this.getProperties();
const propertiesToCheck = await this.getPropertiesToCheck();
const propertyValues = {};
const params = this.lastUpdatedSortParam();
const pagesStream = this.notion.getPages(this.databaseId, params);
let count = 0;
let lastUpdatedTimestamp = 0;
for await (const page of pagesStream) {
propertyValues[page.id] = {};
for (const propertyName of properties) {
const hash = this.calculateHash(page.properties[propertyName]);
propertyValues[page.id][propertyName] = hash;
for (const propertyName of propertiesToCheck) {
const currentValue = this.maybeRemoveFileSubItems(page.properties[propertyName]);
propertyValues[page.id] = {
...propertyValues[page.id],
[propertyName]: currentValue,
};
}
lastUpdatedTimestamp = Math.max(
lastUpdatedTimestamp,
Date.parse(page?.last_edited_time),
Date.parse(page.last_edited_time),
);
if (count < 25) {
if (count++ < 25) {
this.emitEvent(page);
}
count++;
}
this._setPropertyValues(propertyValues);
this.setLastUpdatedTimestamp(lastUpdatedTimestamp);
Expand All @@ -69,23 +70,23 @@ export default {
methods: {
...base.methods,
_getPropertyValues() {
return this.db.get("propertyValues");
const compressed = this.db.get("propertyValues");
const buffer = Buffer.from(compressed, "base64");
const decompressed = zlib.inflateSync(buffer).toString();
return JSON.parse(decompressed);
},
_setPropertyValues(propertyValues) {
this.db.set("propertyValues", propertyValues);
const string = JSON.stringify(propertyValues);
const compressed = zlib.deflateSync(string).toString("base64");
this.db.set("propertyValues", compressed);
},
async getProperties() {
async getPropertiesToCheck() {
if (this.properties?.length) {
return this.properties;
}
const { properties } = await this.notion.retrieveDatabase(this.databaseId);
return Object.keys(properties);
},
calculateHash(property) {
const clone = structuredClone(property);
this.maybeRemoveFileSubItems(clone);
return md5(JSON.stringify(clone));
},
maybeRemoveFileSubItems(property) {
// Files & Media type:
// `url` and `expiry_time` are constantly updated by Notion, so ignore these fields
Expand All @@ -96,20 +97,27 @@ export default {
}
}
}
return property;
},
generateMeta(obj, summary) {
const { id } = obj;
const title = this.notion.extractPageTitle(obj);
const ts = Date.now();
return {
id: `${id}-${ts}`,
summary: `${summary}: ${title} - ${id}`,
summary: `${summary}: ${title}`,
ts,
};
},
emitEvent(page) {
const meta = this.generateMeta(page, constants.summaries.PAGE_UPDATED);
this.$emit(page, meta);
emitEvent(page, changes = [], isNewPage = true) {
const meta = isNewPage
? this.generateMeta(page, constants.summaries.PAGE_ADDED)
: this.generateMeta(page, constants.summaries.PAGE_UPDATED);
const event = {
page,
changes,
};
this.$emit(event, meta);
},
},
async run() {
Expand All @@ -126,39 +134,59 @@ export default {
},
};
let newLastUpdatedTimestamp = lastCheckedTimestamp;
const properties = await this.getProperties();
const propertiesToCheck = await this.getPropertiesToCheck();
const pagesStream = this.notion.getPages(this.databaseId, params);

for await (const page of pagesStream) {
const changes = [];
let isNewPage = false;
let propertyHasChanged = false;

newLastUpdatedTimestamp = Math.max(
newLastUpdatedTimestamp,
Date.parse(page?.last_edited_time),
Date.parse(page.last_edited_time),
);

let propertyChangeFound = false;
for (const propertyName of properties) {
const hash = this.calculateHash(page.properties[propertyName]);
const dbValue = propertyValues[page.id]?.[propertyName];
if (!propertyValues[page.id] || hash !== dbValue) {
propertyChangeFound = true;
if (lastCheckedTimestamp > Date.parse(page.last_edited_time)) {
break;
}

for (const propertyName of propertiesToCheck) {
const previousValue = structuredClone(propertyValues[page.id]?.[propertyName]);
const currentValue = this.maybeRemoveFileSubItems(page.properties[propertyName]);

const pageExistsInDB = propertyValues[page.id] != null;
const propertyChanged = JSON.stringify(previousValue) !== JSON.stringify(currentValue);

if (pageExistsInDB && propertyChanged) {
propertyHasChanged = true;
propertyValues[page.id] = {
...propertyValues[page.id],
[propertyName]: hash,
[propertyName]: currentValue,
};
changes.push({
property: propertyName,
previousValue,
currentValue,
});
}
}
if (!propertyChangeFound && Date.parse(page?.last_edited_time) <= lastCheckedTimestamp) {
continue;
}

if (!this.includeNewPages && page?.last_edited_time === page?.created_time) {
continue;
if (!pageExistsInDB && this.includeNewPages) {
isNewPage = true;
propertyHasChanged = true;
propertyValues[page.id] = {
[propertyName]: currentValue,
};
changes.push({
property: propertyName,
previousValue,
currentValue,
});
}
}

this.emitEvent(page);

if (Date.parse(page?.last_edited_time) < lastCheckedTimestamp) {
break;
if (propertyHasChanged) {
this.emitEvent(page, changes, isNewPage);
}
}

Expand Down
Loading