Skip to content

Commit 731d0f7

Browse files
committed
Adding timer source
1 parent c586933 commit 731d0f7

File tree

3 files changed

+114
-7
lines changed

3 files changed

+114
-7
lines changed

components/airtop/airtop.app.mjs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export default {
1515

1616
const data = await this.listSessions({
1717
params: {
18+
status: "running",
1819
limit,
1920
offset,
2021
},
@@ -124,15 +125,9 @@ export default {
124125
...args,
125126
});
126127
},
127-
async listSessions({
128-
params, ...args
129-
}) {
128+
async listSessions(args = {}) {
130129
return this._makeRequest({
131130
url: "/sessions",
132-
params: {
133-
status: "running",
134-
...params,
135-
},
136131
...args,
137132
});
138133
},
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
2+
import airtop from "../../airtop.app.mjs";
3+
4+
export default {
5+
props: {
6+
airtop,
7+
db: "$.service.db",
8+
timer: {
9+
type: "$.interface.timer",
10+
default: {
11+
intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
12+
},
13+
},
14+
},
15+
methods: {
16+
_getLastTs() {
17+
return this.db.get("lastTs");
18+
},
19+
_setLastTs(lastTs) {
20+
this.db.set("lastTs", lastTs);
21+
},
22+
isNew(resource, lastTs) {
23+
if (!resource.dateCreated || !lastTs) {
24+
return true;
25+
}
26+
return new Date(resource.dateCreated).getTime() > lastTs;
27+
},
28+
getResources() {
29+
throw new Error("getResources is not implemented");
30+
},
31+
generateMeta() {
32+
throw new Error("generateMeta is not implemented");
33+
},
34+
},
35+
async run() {
36+
let lastTs = this._getLastTs();
37+
38+
const resources = await this.getResources(lastTs);
39+
for (const resource of resources) {
40+
const { dateCreated } = resource;
41+
if (!lastTs || (dateCreated && new Date(dateCreated).getTime() > lastTs)) {
42+
lastTs = new Date(dateCreated).getTime();
43+
}
44+
const meta = this.generateMeta(resource);
45+
this.$emit(resource, meta);
46+
}
47+
48+
if (lastTs) {
49+
this._setLastTs(lastTs);
50+
}
51+
},
52+
};
53+
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import common from "../common/polling.mjs";
2+
3+
export default {
4+
...common,
5+
key: "airtop-new-session-created",
6+
name: "New Session Created",
7+
description: "Emit new event when a new session is created in Airtop. [See the documentation](https://docs.airtop.ai/api-reference/airtop-api/sessions/list)",
8+
version: "0.0.1",
9+
type: "source",
10+
dedupe: "unique",
11+
methods: {
12+
...common.methods,
13+
async getResources(lastTs) {
14+
const resources = [];
15+
let hasMore = true;
16+
let offset = 0;
17+
const limit = 25;
18+
const isFirstRun = !lastTs;
19+
20+
while (hasMore) {
21+
const data = await this.airtop.listSessions({
22+
params: {
23+
limit,
24+
offset,
25+
},
26+
});
27+
28+
if (!data?.sessions?.length) {
29+
break;
30+
}
31+
32+
for (const resource of data.sessions) {
33+
const isNewResource = this.isNew(resource, lastTs);
34+
if (isNewResource) {
35+
resources.push(resource);
36+
}
37+
}
38+
39+
hasMore = data.pagination?.hasMore;
40+
offset = data.pagination?.nextOffset;
41+
42+
// Stop on first run or on last page
43+
if (isFirstRun || !hasMore) {
44+
break;
45+
}
46+
}
47+
48+
return resources;
49+
},
50+
generateMeta(session) {
51+
return {
52+
id: session.id,
53+
summary: `New Session: ${session.id.slice(0, 8)}`,
54+
ts: new Date(session.dateCreated).getTime(),
55+
};
56+
},
57+
},
58+
};
59+

0 commit comments

Comments
 (0)