|
1 | | -import moment from "moment"; |
2 | 1 | import nocodb from "../../nocodb.app.mjs"; |
3 | 2 | import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; |
4 | 3 |
|
@@ -46,59 +45,53 @@ export default { |
46 | 45 | _setLastTime(lastTime) { |
47 | 46 | this.db.set("lastTime", lastTime); |
48 | 47 | }, |
49 | | - async processEvent({ |
50 | | - params, lastTime, |
51 | | - }) { |
| 48 | + getParams(timeField) { |
| 49 | + return { |
| 50 | + sort: `-${timeField}`, |
| 51 | + }; |
| 52 | + }, |
| 53 | + async getRows(records, timeField, lastTime) { |
| 54 | + const rows = []; |
| 55 | + for await (const row of records) { |
| 56 | + if (!lastTime || Date.parse(row[timeField]) >= Date.parse(lastTime)) { |
| 57 | + rows.push(row); |
| 58 | + } else { |
| 59 | + break; |
| 60 | + } |
| 61 | + } |
| 62 | + return rows.reverse(); |
| 63 | + }, |
| 64 | + async processEvent(max) { |
52 | 65 | const timeField = this.getTimeField(); |
| 66 | + const lastTime = this._getLastTime(); |
53 | 67 |
|
54 | 68 | const records = this.nocodb.paginate({ |
55 | 69 | fn: this.nocodb.listTableRow, |
56 | 70 | args: { |
57 | 71 | tableId: this.tableId.value, |
58 | | - params, |
| 72 | + params: this.getParams(timeField), |
59 | 73 | }, |
| 74 | + max, |
60 | 75 | }); |
61 | 76 |
|
62 | | - for await (const record of records) { |
63 | | - if (moment(record[timeField]).isAfter(lastTime)) this._setLastTime(record[timeField]); |
64 | | - this.$emit(record, this.getDataToEmit(record)); |
| 77 | + const rows = await this.getRows(records, timeField, lastTime); |
| 78 | + |
| 79 | + if (!rows.length) { |
| 80 | + return; |
65 | 81 | } |
| 82 | + |
| 83 | + this._setLastTime(rows[rows.length - 1][timeField]); |
| 84 | + |
| 85 | + rows.forEach((row) => this.$emit(row, this.getDataToEmit(row))); |
66 | 86 | }, |
67 | 87 | }, |
68 | 88 | hooks: { |
69 | | - async activate() { |
70 | | - const timeField = this.getTimeField(); |
71 | | - const lastTime = this._getLastTime(); |
72 | | - const { list } = await this.nocodb.listTableRow({ |
73 | | - tableId: this.tableId.value, |
74 | | - params: { |
75 | | - sort: `-${timeField}`, |
76 | | - }, |
77 | | - }); |
78 | | - |
79 | | - list.reverse(); |
80 | | - |
81 | | - for (const row of list) { |
82 | | - if (!lastTime || moment(lastTime).isAfter(row[timeField])) { |
83 | | - this._setLastTime(row[timeField]); |
84 | | - } |
85 | | - this.$emit(row, this.getDataToEmit(row)); |
86 | | - } |
| 89 | + async deploy() { |
| 90 | + await this.processEvent(25); |
87 | 91 | }, |
88 | 92 | }, |
89 | 93 | async run() { |
90 | | - const timeField = this.getTimeField(); |
91 | | - const lastTime = this._getLastTime(); |
92 | | - const params = { |
93 | | - sort: timeField, |
94 | | - }; |
95 | | - // moment is necessary because nocodb query doesn't filter equal datetime in 'greater than' |
96 | | - if (lastTime) params.where = `(${timeField},gte,${moment(lastTime).add(1, "ms") |
97 | | - .toISOString()})`; |
98 | | - return this.processEvent({ |
99 | | - params, |
100 | | - lastTime, |
101 | | - }); |
| 94 | + await this.processEvent(); |
102 | 95 | }, |
103 | 96 | }; |
104 | 97 |
|
0 commit comments