diff --git a/components/lobste_rs/lobste_rs.app.mjs b/components/lobste_rs/lobste_rs.app.mjs index f7b6d82bf7ea4..c2ed540a53aaf 100644 --- a/components/lobste_rs/lobste_rs.app.mjs +++ b/components/lobste_rs/lobste_rs.app.mjs @@ -1,11 +1,111 @@ +import { axios } from "@pipedream/platform"; +import FeedParser from "feedparser"; +import hash from "object-hash"; + export default { type: "app", app: "lobste_rs", propDefinitions: {}, methods: { - // this.$auth contains connected account data - authKeys() { - console.log(Object.keys(this.$auth)); + makeRequest({ + $ = this, ...config + }) { + return axios($, config); + }, + itemTs(item = {}) { + const { + pubdate, pubDate, date_published, + } = item; + const itemPubDate = pubdate ?? pubDate ?? date_published; + if (itemPubDate) { + return +new Date(itemPubDate); + } + return +new Date(); + }, + itemKey(item = {}) { + const { + id, guid, link, title, + } = item; + const itemId = id ?? guid ?? link ?? title; + if (itemId) { + // reduce itemId length for deduping + return itemId.length > 64 + ? itemId.slice(-64) + : itemId; + } + return hash(item); + }, + async fetchFeed(url) { + const res = await axios(this, { + url, + method: "GET", + headers: { + "accept": "text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8, application/json, application/feed+json", + }, + responseType: "stream", // stream is required for feedparser + returnFullResponse: true, + }); + return { + data: res.data, + contentType: res.headers["content-type"], + }; + }, + async parseFeed(stream) { + const feedparser = new FeedParser({ + addmeta: true, + }); + const items = []; + await new Promise((resolve, reject) => { + feedparser.on("error", reject); + feedparser.on("end", resolve); + feedparser.on("readable", function () { + let item = this.read(); + + while (item) { + for (const k in item) { + if (item[`rss:${k}`]) { + delete item[`rss:${k}`]; + continue; + } + const o = item[k]; + if (o == null || (typeof o === "object" && !Object.keys(o).length) || Array.isArray(o) && !o.length) { + delete item[k]; + continue; + } + } + items.push(item); + item = this.read(); + } + }); + stream.pipe(feedparser); + }); + return items; + }, + isJSONFeed(response) { + const acceptedJsonFeedMimes = [ + "application/feed+json", + "application/json", + ]; + return acceptedJsonFeedMimes.includes(response?.contentType?.toLowerCase()); + }, + async parseJSONFeed(stream) { + const buffer = await new Promise((resolve, reject) => { + const _buf = []; + stream.on("data", (chunk) => _buf.push(chunk)); + stream.on("end", () => resolve(Buffer.concat(_buf))); + stream.on("error", (err) => reject(err)); + }); + const contentString = buffer.toString(); + const feed = JSON.parse(contentString); + return feed?.items || []; + }, + async fetchAndParseFeed(url) { + const response = await this.fetchFeed(url); + if (this.isJSONFeed(response)) { + return await this.parseJSONFeed(response.data); + } else { + return await this.parseFeed(response.data); + } }, }, }; diff --git a/components/lobste_rs/package.json b/components/lobste_rs/package.json index 412690b3a7f9a..5372a3a2fb863 100644 --- a/components/lobste_rs/package.json +++ b/components/lobste_rs/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/lobste_rs", - "version": "0.0.1", + "version": "0.1.0", "description": "Pipedream lobste.rs Components", "main": "lobste_rs.app.mjs", "keywords": [ @@ -11,5 +11,10 @@ "author": "Pipedream (https://pipedream.com/)", "publishConfig": { "access": "public" + }, + "dependencies": { + "@pipedream/platform": "^3.0.3", + "feedparser": "^2.2.10", + "object-hash": "^3.0.0" } -} \ No newline at end of file +} diff --git a/components/lobste_rs/sources/new-comment-in-thread/new-comment-in-thread.mjs b/components/lobste_rs/sources/new-comment-in-thread/new-comment-in-thread.mjs new file mode 100644 index 0000000000000..e1dcec2a8776c --- /dev/null +++ b/components/lobste_rs/sources/new-comment-in-thread/new-comment-in-thread.mjs @@ -0,0 +1,43 @@ +import lobsters from "../../lobste_rs.app.mjs"; +import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; + +export default { + key: "lobste_rs-new-comment-in-thread", + name: "New Comment in Thread", + description: "Emit new event when a new comment is added to a thread.", + version: "0.0.1", + type: "source", + dedupe: "unique", + props: { + lobsters, + timer: { + type: "$.interface.timer", + default: { + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, + }, + }, + url: { + type: "string", + label: "URL", + description: "The URL of the comment thread to retrieve. E.g. `https://lobste.rs/s/yqjtvy/cloud_container_iceberg`", + }, + }, + methods: { + generateMeta(comment) { + return { + id: comment.short_id, + summary: comment.comment_plain.substring(0, 50), + ts: Date.parse(comment.created_at), + }; + }, + }, + async run() { + const { comments } = await this.lobsters.makeRequest({ + url: `${this.url}.json`, + }); + for (const comment of comments.reverse()) { + const meta = this.generateMeta(comment); + this.$emit(comment, meta); + } + }, +}; diff --git a/components/lobste_rs/sources/new-story-by-user/new-story-by-user.mjs b/components/lobste_rs/sources/new-story-by-user/new-story-by-user.mjs new file mode 100644 index 0000000000000..2a4b2228c0577 --- /dev/null +++ b/components/lobste_rs/sources/new-story-by-user/new-story-by-user.mjs @@ -0,0 +1,55 @@ +import lobsters from "../../lobste_rs.app.mjs"; +import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; + +export default { + key: "lobste_rs-new-story-by-user", + name: "New Story by User", + description: "Emit new event when a new story is posted by the specified user.", + version: "0.0.1", + type: "source", + dedupe: "unique", + props: { + lobsters, + timer: { + type: "$.interface.timer", + default: { + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, + }, + }, + username: { + type: "string", + label: "Username", + description: "The user to watch for stories from. E.g. `adamgordonbell`", + }, + publishedAfter: { + type: "string", + label: "Published After", + description: "Emit items published after the specified date in ISO 8601 format .e.g `2022-12-07T12:57:10+07:00`", + default: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(), + }, + }, + methods: { + generateMeta(item) { + return { + id: this.lobsters.itemKey(item), + summary: item.title, + ts: Date.now(), + }; + }, + }, + async run() { + const url = `https://lobste.rs/~${this.username}/stories.rss`; + + const items = await this.lobsters.fetchAndParseFeed(url); + for (const item of items.reverse()) { + const publishedAfter = +new Date(this.publishedAfter); + const ts = this.lobsters.itemTs(item); + if (Number.isNaN(publishedAfter) || publishedAfter > ts) { + continue; + } + + const meta = this.generateMeta(item); + this.$emit(item, meta); + } + }, +}; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b0d4136947893..9b6d6aa6baaf9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -5872,7 +5872,17 @@ importers: specifier: ^1.0.0 version: 1.3.5 - components/lobste_rs: {} + components/lobste_rs: + dependencies: + '@pipedream/platform': + specifier: ^3.0.3 + version: 3.0.3 + feedparser: + specifier: ^2.2.10 + version: 2.2.10 + object-hash: + specifier: ^3.0.0 + version: 3.0.0 components/lodgify: {}