-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.ts
More file actions
122 lines (108 loc) · 4.55 KB
/
task.ts
File metadata and controls
122 lines (108 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import { Feature } from '@tak-ps/node-cot'
import { Static, Type, TSchema } from '@sinclair/typebox';
import ETL, { Event, SchemaType, handler as internal, local, DataFlowType, InvocationType } from '@tak-ps/etl';
/**
 * TypeBox schema describing the configuration values this task reads:
 * the CoTrip API token, one on/off toggle per GeoJSON geometry family
 * (Point / LineString / Polygon, all enabled by default), and a DEBUG flag
 * (disabled by default).
 *
 * The same schema object is passed to `this.env(...)` in `Task.control()` and
 * returned from `Task.schema()` for the incoming/input case.
 */
const InputSchema = Type.Object({
'COTRIP_TOKEN': Type.String({ description: 'API Token for CoTrip' }),
'Point Geometries': Type.Boolean({ description: 'Allow point geometries', default: true }),
'LineString Geometries': Type.Boolean({ description: 'Allow LineString geometries', default: true }),
'Polygon Geometries': Type.Boolean({ description: 'Allow Polygon Geometries', default: true }),
'DEBUG': Type.Boolean({ description: 'Print GeoJSON Features in logs', default: false })
});
/**
 * ETL task that downloads sign features from the COTrip data API
 * (`/api/v1/signs`), flattens Multi* geometries into single-part features,
 * filters them by the geometry types enabled in the environment and submits
 * the result as a GeoJSON FeatureCollection.
 */
export default class Task extends ETL {
    static name = 'etl-cotrip-signs';
    static flow = [ DataFlowType.Incoming ];
    static invocation = [ InvocationType.Schedule ];

    /**
     * Return the schema for the requested schema type and data flow.
     *
     * @param type - Which schema is requested; defaults to `SchemaType.Input`.
     * @param flow - Which data flow is requested; defaults to `DataFlowType.Incoming`.
     * @returns `InputSchema` for incoming/input; an object schema listing the
     *          sign attribute fields for any other schema type on the incoming
     *          flow; an empty object schema for every other flow.
     */
    async schema(
        type: SchemaType = SchemaType.Input,
        flow: DataFlowType = DataFlowType.Incoming
    ): Promise<TSchema> {
        if (flow === DataFlowType.Incoming) {
            if (type === SchemaType.Input) {
                return InputSchema;
            } else {
                return Type.Object({
                    communicationStatus: Type.String(),
                    marker: Type.Number(),
                    messageText: Type.String(),
                    direction: Type.String(),
                    lastUpdated: Type.String(),
                    messagePreview: Type.String(),
                    displayStatus: Type.String(),
                    name: Type.String(),
                    id: Type.String(),
                    speed: Type.Number(),
                    routeName: Type.String(),
                    messageMarkup: Type.String(),
                    publicName: Type.String(),
                    submittedBy: Type.String(),
                    nativeId: Type.String(),
                    activationTime: Type.String(),
                });
            }
        } else {
            return Type.Object({});
        }
    }

    /**
     * Fetch every page of signs, convert them to single-geometry features and
     * submit the filtered collection.
     *
     * @throws Error when no `COTRIP_TOKEN` is configured, when the API answers
     *         with a non-2xx status, or when a page has no `features` array.
     */
    async control() {
        const env = await this.env(InputSchema);
        const api = 'https://data.cotrip.org/';
        if (!env.COTRIP_TOKEN) throw new Error('No COTrip API Token Provided');
        const token = env.COTRIP_TOKEN;

        // Page through the endpoint: each response's `next-offset` header is
        // fed back as the `offset` query parameter of the following request.
        const signs = [];
        let batch = 0;
        let nextOffset: string | null = null;
        do {
            console.log(`ok - fetching ${batch++} of signs`);
            const url = new URL('/api/v1/signs', api);
            url.searchParams.append('apiKey', String(token));
            if (nextOffset) url.searchParams.append('offset', nextOffset);

            const res = await fetch(url);
            // Fail loudly on HTTP errors instead of tripping over a missing
            // `features` property further down.
            if (!res.ok) {
                throw new Error(`COTrip API request failed: ${res.status} ${res.statusText}`);
            }

            const body = await res.json();
            if (!body || !Array.isArray(body.features)) {
                throw new Error('COTrip API response did not contain a features array');
            }
            signs.push(...body.features);

            nextOffset = res.headers.get('next-offset');
            // Stop when the header is absent, empty or the literal 'None'. An
            // empty value must end the loop: it would not be sent as an offset,
            // so the first page would be requested again indefinitely.
        } while (nextOffset && nextOffset !== 'None');
        console.log(`ok - fetched ${signs.length} signs`);

        const features = [];
        for (const sign of signs) {
            // Raw record dump, only when the DEBUG option is enabled.
            if (env.DEBUG) console.error(sign);

            // GeoJSON permits a null geometry; such a record cannot be placed
            // on a map and would crash the type checks below, so skip it.
            if (!sign.geometry) continue;

            // NOTE(review): `properties` is emitted empty even though schema()
            // declares sign attributes for the non-input schema type - confirm
            // whether `sign.properties` should be forwarded here.
            const feature = {
                id: sign.properties.id,
                type: 'Feature',
                properties: {
                },
                geometry: sign.geometry
            };

            if (feature.geometry.type.startsWith('Multi')) {
                // Split MultiPoint/MultiLineString/MultiPolygon into one
                // feature per member, suffixing the id with the member index.
                const serialized = JSON.stringify(feature);
                const type = feature.geometry.type.replace('Multi', '');
                let i = 0;
                for (const coordinates of feature.geometry.coordinates) {
                    // Parse a fresh copy per part so parts never share objects.
                    const part = JSON.parse(serialized);
                    part.geometry = { type, coordinates };
                    part.id = part.id + '-' + i;
                    features.push(part);
                    ++i;
                }
            } else {
                features.push(feature);
            }
        }

        // Geometry types enabled through the environment toggles.
        const allowed: string[] = [];
        if (env['Point Geometries']) allowed.push('Point');
        if (env['LineString Geometries']) allowed.push('LineString');
        if (env['Polygon Geometries']) allowed.push('Polygon');

        const fc: Static<typeof Feature.InputFeatureCollection> = {
            type: 'FeatureCollection',
            features: features.filter((feat) => {
                return allowed.includes(feat.geometry.type);
            })
        };

        await this.submit(fc);
    }
}
// Module-level side effect: initialise the Task with this module's URL and
// pass it, together with the same URL, to `local` from @tak-ps/etl.
// NOTE(review): presumably `local` only executes the task when this file is
// run directly rather than imported - confirm against @tak-ps/etl.
await local(await Task.init(import.meta.url), import.meta.url);
/**
 * Exported entry point: initialises the Task for this module and delegates
 * the event to the ETL library's `internal` handler.
 *
 * @param event - Invocation event; defaults to an empty object.
 * @returns Whatever the ETL library's handler resolves to.
 */
export async function handler(event: Event = {}) {
    const task = await Task.init(import.meta.url);
    const outcome = await internal(task, event);
    return outcome;
}