-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.ts
More file actions
141 lines (124 loc) · 5.86 KB
/
task.ts
File metadata and controls
141 lines (124 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import moment from 'moment-timezone';
import { Feature } from '@tak-ps/node-cot'
import type { Feature as GeoJSONFeature } from 'geojson';
import { Static, Type, TSchema } from '@sinclair/typebox';
import ETL, { Event, SchemaType, handler as internal, local, DataFlowType, InvocationType } from '@tak-ps/etl';
/**
 * Environment options read by the task through `this.env(InputSchema)`.
 *
 * - `COTRIP_TOKEN` is sent as the `apiKey` query parameter on every request.
 * - The three `* Geometries` switches select which geometry types are kept
 *   in the submitted FeatureCollection.
 * - `DEBUG` is described as printing GeoJSON features in the logs.
 */
const InputSchema = Type.Object({
    COTRIP_TOKEN: Type.String({ description: 'API Token for CoTrip' }),
    'Point Geometries': Type.Boolean({
        description: 'Allow point geometries',
        default: true
    }),
    'LineString Geometries': Type.Boolean({
        description: 'Allow LineString geometries',
        default: true
    }),
    'Polygon Geometries': Type.Boolean({
        description: 'Allow Polygon Geometries',
        default: true
    }),
    DEBUG: Type.Boolean({
        description: 'Print GeoJSON Features in logs',
        default: false
    })
});
/**
 * Scheduled, incoming ETL task that downloads incidents from the CoTrip API,
 * converts each one into a CoT input feature and submits the result as a
 * single FeatureCollection.
 */
export default class Task extends ETL {
    static name = 'etl-cotrip-incidents';
    static flow = [ DataFlowType.Incoming ];
    static invocation = [ InvocationType.Schedule ];

    /**
     * Return the schema for the requested schema type and data flow.
     *
     * @param type - Which schema is requested; defaults to `SchemaType.Input`.
     * @param flow - Which data flow is requested; defaults to `DataFlowType.Incoming`.
     * @returns `InputSchema` for the incoming input schema, the incident
     *          metadata schema for any other incoming schema type, and an
     *          empty object schema for every other flow.
     */
    async schema(
        type: SchemaType = SchemaType.Input,
        flow: DataFlowType = DataFlowType.Incoming
    ): Promise<TSchema> {
        if (flow !== DataFlowType.Incoming) return Type.Object({});
        if (type === SchemaType.Input) return InputSchema;

        // Mirrors the `metadata` object attached to every feature in control().
        return Type.Object({
            incident_type: Type.String(),
            status: Type.String(),
            direction: Type.Number(),
            routeName: Type.String(),
            severity: Type.String(),
            responseLevel: Type.String(),
            category: Type.String(),
            startTime: Type.String(),
            startMarker: Type.Optional(Type.Number()),
            endMarker: Type.Optional(Type.Number()),
            lastUpdated: Type.String(),
            travelerInformationMessage: Type.String(),
        });
    }

    /**
     * Fetch every page of incidents, convert them to CoT input features,
     * split `Multi*` geometries into one feature per part, drop geometry
     * types that are disabled in the environment and submit the remainder.
     *
     * @throws Error when no API token is configured, when the API answers
     *         with a non-2xx status, or when a response body carries no
     *         `features` array.
     */
    async control() {
        const env = await this.env(InputSchema);
        const api = 'https://data.cotrip.org/';
        if (!env.COTRIP_TOKEN) throw new Error('No COTrip API Token Provided');
        const token = env.COTRIP_TOKEN;

        const incidents: GeoJSONFeature[] = [];
        let batch = 0;
        // Offset for the next page, taken from the previous response's
        // `next-offset` header; null means there is nothing more to fetch.
        let offset: string | null = null;
        do {
            console.log(`ok - fetching ${batch++} of incidents`);
            const url = new URL('/api/v1/incidents', api);
            url.searchParams.append('apiKey', String(token));
            if (offset) url.searchParams.append('offset', offset);

            const res = await fetch(url);
            // The URL is deliberately left out of the message: it contains the API key.
            if (!res.ok) {
                throw new Error(`COTrip API request failed: ${res.status} ${res.statusText}`);
            }

            const body = (await res.json()) as { features?: GeoJSONFeature[] } | null;
            const page = body?.features;
            if (!Array.isArray(page)) {
                throw new Error('COTrip API response did not include a features array');
            }
            incidents.push(...page);

            // A missing, empty or literal 'None' header ends pagination. Treating
            // the empty value as terminal avoids refetching the first page forever.
            const next = res.headers.get('next-offset');
            offset = next && next !== 'None' ? next : null;
        } while (offset);
        console.log(`ok - fetched ${incidents.length} incidents`);

        const zone = 'America/Denver';
        const stamp = 'YYYY-MM-DD HH:mm z';

        const features: Static<typeof Feature.InputFeature>[] = [];
        for (const incident of incidents) {
            // GeoJSON permits `properties: null` and `geometry: null`; neither
            // can be turned into a usable feature, so both are skipped.
            if (!incident.properties || !incident.geometry) continue;
            const props = incident.properties;

            const feature = {
                id: props.id,
                type: 'Feature',
                properties: {
                    remarks: props.travelerInformationMessage,
                    callsign: props.type,
                    type: 'a-f-G',
                    metadata: {
                        incident_type: props.type,
                        status: props.status,
                        direction: props.direction,
                        routeName: props.routeName,
                        severity: props.severity,
                        responseLevel: props.responseLevel,
                        category: props.category,
                        startMarker: props.startMarker,
                        endMarker: props.endMarker,
                        startTime: moment(props.startTime).tz(zone).format(stamp),
                        lastUpdated: moment(props.lastUpdated).tz(zone).format(stamp),
                        travelerInformationMessage: props.travelerInformationMessage
                    }
                },
                geometry: incident.geometry
            } as Static<typeof Feature.InputFeature>;

            if (!feature.geometry.type.startsWith('Multi')) {
                features.push(feature);
                continue;
            }

            // Split MultiPoint / MultiLineString / MultiPolygon into one feature
            // per member geometry. Each copy is a deep clone with a `-<index>`
            // suffix appended to the id.
            const serialized = JSON.stringify(feature);
            const type = feature.geometry.type.replace('Multi', '');
            let i = 0;
            for (const coordinates of feature.geometry.coordinates) {
                const part = JSON.parse(serialized);
                part.geometry = { type, coordinates };
                part.id = part.id + '-' + i;
                features.push(part);
                ++i;
            }
        }

        const allowed = new Set<string>();
        if (env['Point Geometries']) allowed.add('Point');
        if (env['LineString Geometries']) allowed.add('LineString');
        if (env['Polygon Geometries']) allowed.add('Polygon');

        const fc: Static<typeof Feature.InputFeatureCollection> = {
            type: 'FeatureCollection',
            features: features.filter((feat) => allowed.has(feat.geometry.type))
        };

        // Honour the DEBUG option declared in InputSchema.
        if (env.DEBUG) {
            for (const feat of fc.features) console.log(JSON.stringify(feat));
        }

        await this.submit(fc);
    }
}
// Top-level await: initialise a Task from this module's URL and hand it, together
// with the same URL, to the `local` helper imported from @tak-ps/etl.
// NOTE(review): presumably `local` only runs the task when this file is executed
// directly rather than imported — confirm against @tak-ps/etl.
await local(await Task.init(import.meta.url), import.meta.url);
/**
 * Exported event handler: initialises a Task from this module's URL and
 * delegates the event to the `handler` exported by @tak-ps/etl (imported
 * here as `internal`).
 *
 * @param event - Event forwarded unchanged to the ETL handler; defaults to `{}`.
 * @returns Whatever the ETL handler resolves to.
 */
export async function handler(event: Event = {}) {
    const task = await Task.init(import.meta.url);
    return await internal(task, event);
}