Skip to content

Commit 7bdb2e2

Browse files
committed
Add checks to handle malformed data
Some examples of Malformed data: - invalid coordinates like Infinity, null - invalid undefined features - changesets with empty list of features
1 parent c5a0361 commit 7bdb2e2

File tree

3 files changed

+68
-24
lines changed

3 files changed

+68
-24
lines changed

pipeline/cli.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
const { program } = require('commander');
44

55
const { getHourlyReplicationFileURL, getChangesetIDs } = require('./src/utils');
6-
const { processChangesets } = require('./src/process');
6+
const { processChangesets, processChangeset } = require('./src/process');
77

88
/**
99
* Runs the process to retrieve and process changesets for a given date and hour.
@@ -19,9 +19,18 @@ async function run(date, hour) {
1919
processChangesets(changesets, date, hour);
2020
}
2121

22+
async function processSingleChangeset(changeset) {
23+
await processChangeset(changeset);
24+
}
25+
2226
program
2327
.command('process-hour <date> <hour>')
2428
.description('Process an hour of changesets starting from a given date and hour (in UTC) and combine the changed features into a single GeoJSON file.')
2529
.action(run);
2630

31+
program
32+
.command('process-changeset <changeset>')
33+
.description('Process a single changeset and save the features to a JSON file - for debugging purposes.')
34+
.action(processSingleChangeset);
35+
2736
program.parseAsync(process.argv);

pipeline/src/process.js

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const fs = require('fs');
44
const readline = require('readline');
55

66
const config = require('./config');
7+
const { containsInvalidCoordinate } = require('./utils');
78

89
/**
910
* Processes an array of changesets asynchronously.
@@ -16,12 +17,16 @@ async function processChangesets(changesets, date, hour) {
1617
// Array to store the file paths of the processed changesets
1718
const results = [];
1819

19-
// Process each changeset asynchronously
20-
await Promise.all(changesets.map(async (changeset) => {
21-
// Process the changeset and get the result
22-
const result = await processChangeset(changeset);
23-
results.push(result);
24-
}));
20+
// Process changesets in batches of 10
21+
const batchSize = 100;
22+
for (let i = 0; i < changesets.length; i += batchSize) {
23+
const batch = changesets.slice(i, i + batchSize);
24+
await Promise.all(batch.map(async (changeset) => {
25+
// Process the changeset and get the result
26+
const result = await processChangeset(changeset);
27+
results.push(result);
28+
}));
29+
}
2530

2631
// Combine the processed changesets into a single file
2732
combineResults(results, date, hour);
@@ -35,26 +40,38 @@ async function processChangesets(changesets, date, hour) {
3540
async function processChangeset(changeset) {
3641
// Process the changeset asynchronously and return the result
3742
const url = `https://real-changesets.s3.amazonaws.com/${changeset}.json`;
43+
// console.log(`Processing changeset ${changeset}`);
3844
try {
3945
const response = await axios.get(url);
4046
const data = response.data;
4147
const geojson = changesetParser(data);
48+
// console.log(`geojson: ${JSON.stringify(geojson)}`);
4249
const features = geojson.features;
50+
51+
if (features.length === 0) {
52+
console.log(`No features found in changeset ${changeset}`);
53+
return;
54+
}
4355
const filePath = `${config.DATA_PATH}/${changeset}_features.json`;
44-
await fs.writeFile(filePath, '', (error) => {
45-
if (error) {
46-
console.error(`Error writing to file: ${error}`);
47-
}
48-
});
49-
features.forEach(async (feature) => {
50-
const featureString = JSON.stringify(feature);
51-
52-
await fs.appendFile(filePath, `${featureString}\n`, (error) => {
53-
if (error) {
54-
console.error(`Error writing feature to file: ${error}`);
56+
57+
await Promise.all(features.map(async (feature) => {
58+
if (feature !== null && feature !== undefined) {
59+
if (containsInvalidCoordinate(feature.geometry.coordinates)) {
60+
console.log(`Dropping invalid feature ${feature.properties.id} in changeset ${changeset}`);
61+
console.log(`Feature geometry containing invalid co-ordinates: ${JSON.stringify(feature.geometry)}`);
62+
return;
5563
}
56-
});
57-
});
64+
const featureString = JSON.stringify(feature);
65+
66+
await fs.appendFile(filePath, `${featureString}\n`, (error) => {
67+
if (error) {
68+
console.error(`Error writing feature to file: ${error}`);
69+
}
70+
});
71+
} else {
72+
console.log(`undefined feature skipped in changeset ${changeset}`)
73+
}
74+
}));
5875
return filePath;
5976
} catch (error) {
6077
console.error(`Error processing changeset ${changeset}: ${error}`);
@@ -78,6 +95,9 @@ async function combineResults(results, date, hour) {
7895

7996
for (let i = 0; i < results.length; i++) {
8097
const filePath = results[i];
98+
if (!filePath) {
99+
continue;
100+
}
81101
const inputStream = fs.createReadStream(filePath);
82102

83103
const rl = readline.createInterface({
@@ -88,7 +108,7 @@ async function combineResults(results, date, hour) {
88108

89109
rl.on('line', (line) => {
90110
outputStream.write(divider);
91-
divider = ',';
111+
divider = ',\n';
92112
outputStream.write(line);
93113
});
94114

@@ -98,7 +118,7 @@ async function combineResults(results, date, hour) {
98118
});
99119

100120
rl.on('error', (error) => {
101-
reject(error);
121+
console.error(`Error reading file: ${error}`);
102122
});
103123
});
104124
}
@@ -108,4 +128,4 @@ async function combineResults(results, date, hour) {
108128
console.log(`Combined results written to ${outputStream.path}`);
109129
}
110130

111-
module.exports = { processChangesets };
131+
module.exports = { processChangesets, processChangeset };

pipeline/src/utils.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,19 @@ async function getChangesetIDs(replicationFileURL) {
7676
});
7777
}
7878

79-
module.exports = { getHourlyReplicationFileURL, getChangesetIDs };
79+
/**
80+
* Checks if the given coordinates are valid.
81+
* @param {number|number[]} coordinates - The coordinates to check.
82+
* @returns {boolean} true if the coordinates are invalid, false otherwise.
83+
*/
84+
function containsInvalidCoordinate(coordinates) {
85+
if (coordinates === null || coordinates === undefined || coordinates === Infinity || coordinates === -Infinity) {
86+
return true;
87+
} else if (Array.isArray(coordinates)) {
88+
return coordinates.some(containsInvalidCoordinate);
89+
} else {
90+
return false;
91+
}
92+
}
93+
94+
module.exports = { getHourlyReplicationFileURL, getChangesetIDs, containsInvalidCoordinate };

0 commit comments

Comments
 (0)