-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.ts
More file actions
105 lines (90 loc) · 3.72 KB
/
task.ts
File metadata and controls
105 lines (90 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import ETL, { Event, SchemaType, handler as internal, local, DataFlowType, InvocationType } from '@tak-ps/etl';
import { Feature } from '@tak-ps/node-cot'
import { Static, Type, TSchema } from '@sinclair/typebox';
const InputSchema = Type.Object({
'COTRIP_TOKEN': Type.String({ description: 'API Token for CoTrip' }),
'DEBUG': Type.Boolean({ description: 'Print GeoJSON results in logs', default: false })
});
export default class Task extends ETL {
static name = 'etl-cotrip-weather';
static flow = [ DataFlowType.Incoming ];
static invocation = [ InvocationType.Schedule ];
async schema(
type: SchemaType = SchemaType.Input,
flow: DataFlowType = DataFlowType.Incoming
): Promise<TSchema> {
if (flow === DataFlowType.Incoming) {
if (type === SchemaType.Input) {
return InputSchema;
} else {
return Type.Object({
publicName: Type.String(),
direction: Type.String(),
nativeId: Type.String(),
communicationStatus: Type.String(),
marker: Type.String(),
routeName: Type.String(),
id: Type.String(),
lastUpdated: Type.String({ format: 'date-time' }),
name: Type.String(),
});
}
} else {
return Type.Object({});
}
}
async control() {
const env = await this.env(InputSchema);
const api = 'https://data.cotrip.org/';
if (!env.COTRIP_TOKEN) throw new Error('No COTrip API Token Provided');
const stations = [];
let batch = -1;
let res;
do {
console.log(`ok - fetching ${++batch} of weather stations`);
const url = new URL('/api/v1/weatherStations', api);
url.searchParams.append('apiKey', String(env.COTRIP_TOKEN));
if (res) {
const nextOffset = res.headers.get('next-offset');
if (nextOffset) url.searchParams.append('offset', nextOffset);
}
res = await fetch(url);
stations.push(...(await res.json()).features);
} while (res.headers.has('next-offset') && res.headers.get('next-offset') !== 'None');
console.log(`ok - fetched ${stations.length} stations`);
const features = [];
for (const feature of stations.map((station) => {
station.id = station.properties.id;
station.properties = {
metadata: station.properties
};
station.properties.callsign = station.properties.metadata.type;
station.properties.type = 'a-f-G';
return station;
})) {
if (feature.geometry.type.startsWith('Multi')) {
const feat = JSON.stringify(feature);
const type = feature.geometry.type.replace('Multi', '');
let i = 0;
for (const coordinates of feature.geometry.coordinates) {
const new_feat = JSON.parse(feat);
new_feat.geometry = { type, coordinates };
new_feat.id = new_feat.id + '-' + i;
features.push(new_feat);
++i;
}
} else {
features.push(feature);
}
}
const fc: Static<typeof Feature.InputFeatureCollection> = {
type: 'FeatureCollection',
features: features
};
await this.submit(fc);
}
}
await local(await Task.init(import.meta.url), import.meta.url);
export async function handler(event: Event = {}) {
return await internal(await Task.init(import.meta.url), event);
}