Skip to content

Commit c9227d1

Browse files
committed
New changeset parsing pipeline
1 parent 7dd8680 commit c9227d1

File tree

2 files changed

+210
-0
lines changed

2 files changed

+210
-0
lines changed

pipeline/cli.js

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
#!/usr/bin/env node
2+
3+
const { program } = require('commander');
4+
const axios = require('axios');
5+
const zlib = require('zlib');
6+
const sax = require('sax');
7+
const changesetParser = require("real-changesets-parser");
8+
const fs = require('fs');
9+
const readline = require('readline');
10+
11+
const dataPath = process.env.DATA_PATH || 'data';
12+
13+
/**
14+
* Runs the process to retrieve and process changesets for a given date and hour.
15+
* @param {string} date - The date in the format 'YYYY-MM-DD'.
16+
* @param {number} hour - The hour of the day (0-23).
17+
* @returns {void}
18+
*/
19+
async function run(date, hour) {
20+
let url = getHourlyReplicationFileURL(date, hour);
21+
let changesets = await getChangesetIDs(url);
22+
// changesets = changesets.slice(0, 2);
23+
// console.log(changesets);
24+
processChangesets(changesets, date, hour);
25+
}
26+
27+
/**
28+
* Generates the URL for the hourly replication file based on the given date and hour.
29+
* @param {string} date - The date in the format 'YYYY-MM-DD'.
30+
* @param {number} hour - The hour of the day (0-23).
31+
* @returns {string} The URL for the hourly replication file.
32+
*/
33+
function getHourlyReplicationFileURL(date, hour) {
34+
// Add a leading zero to the hour if it is a single digit
35+
hour = hour.toString().padStart(2, '0');
36+
let startDate = new Date(`${date}T${hour}:00:00Z`);
37+
console.log(`Processing an hour of data starting from ${startDate}`);
38+
// Calculate the sequence number for the hourly replication file from the date
39+
let timestamp = startDate.getTime() / 1000;
40+
let sequenceNumber = timestamp/(60*60) - 374287;
41+
console.log(`Processing hourly replication sequence number ${sequenceNumber}`);
42+
43+
// Break the sequence number into 3 chunks
44+
let sequenceChunks = sequenceNumber.toString().padStart(9, '0').match(/(\d{3})(\d{3})(\d{3})/);
45+
let sequenceChunksFormatted = sequenceChunks.slice(1).join('/');
46+
47+
let url = `https://planet.openstreetmap.org/replication/hour/${sequenceChunksFormatted}.osc.gz`;
48+
return url;
49+
};
50+
51+
/**
52+
* Retrieves the unique changeset IDs from the hourly replication file.
53+
* @param {string} replicationFileURL - The URL of the hourly replication file.
54+
* @returns {Promise<string[]>} A promise that resolves to an array of unique changeset IDs.
55+
*/
56+
async function getChangesetIDs(replicationFileURL) {
57+
return new Promise((resolve, reject) => {
58+
// Process the XML file as a stream to avoid loading the entire file into memory
59+
// Create a SAX parser
60+
const parser = sax.createStream(true);
61+
62+
// Extract unique changeset ids
63+
const changesets = new Set();
64+
65+
// Handle the opening tag of an element
66+
parser.on('opentag', (node) => {
67+
if (node.name === 'node' || node.name === 'way' || node.name === 'relation') {
68+
const changeset = node.attributes.changeset;
69+
if (changeset) {
70+
changesets.add(changeset);
71+
}
72+
}
73+
});
74+
75+
// Handle the end of the XML document
76+
parser.on('end', () => {
77+
console.log(`Found ${changesets.size} unique changesets in the hourly replication file`);
78+
resolve(Array.from(changesets));
79+
});
80+
81+
// Handle errors
82+
parser.on('error', (error) => {
83+
reject(error);
84+
});
85+
86+
// Download the gz file and unzip it
87+
console.log(`Downloading and processing ${replicationFileURL}`);
88+
axios.get(replicationFileURL, { responseType: 'stream' })
89+
.then(response => {
90+
response.data
91+
.pipe(zlib.createGunzip())
92+
.pipe(parser);
93+
})
94+
.catch(error => {
95+
reject(error);
96+
});
97+
});
98+
}
99+
100+
/**
101+
* Processes an array of changesets asynchronously.
102+
* @param {string[]} changesets - An array of changeset IDs.
103+
* @param {string} date - The date in the format 'YYYY-MM-DD'.
104+
* @param {number} hour - The hour of the day (0-23).
105+
* @returns {void}
106+
*/
107+
async function processChangesets(changesets, date, hour) {
108+
// Array to store the file paths of the processed changesets
109+
const results = [];
110+
111+
// Process each changeset asynchronously
112+
await Promise.all(changesets.map(async (changeset) => {
113+
// Process the changeset and get the result
114+
const result = await processChangeset(changeset);
115+
results.push(result);
116+
}));
117+
118+
// Combine the processed changesets into a single file
119+
combineResults(results, date, hour);
120+
}
121+
122+
/**
123+
* Processes a single changeset and returns the file path of the processed changeset.
124+
* @param {string} changeset - The changeset ID.
125+
* @returns {Promise<string>} A promise that resolves to the file path of the processed changeset.
126+
*/
127+
async function processChangeset(changeset) {
128+
// Process the changeset asynchronously and return the result
129+
const url = `https://real-changesets.s3.amazonaws.com/${changeset}.json`;
130+
try {
131+
const response = await axios.get(url);
132+
const data = response.data;
133+
const geojson = changesetParser(data);
134+
const features = geojson.features;
135+
const filePath = `${dataPath}/${changeset}_features.json`;
136+
await fs.writeFile(filePath, '', (error) => {
137+
if (error) {
138+
console.error(`Error writing to file: ${error}`);
139+
}
140+
});
141+
features.forEach(async (feature) => {
142+
const featureString = JSON.stringify(feature);
143+
144+
await fs.appendFile(filePath, `${featureString}\n`, (error) => {
145+
if (error) {
146+
console.error(`Error writing feature to file: ${error}`);
147+
}
148+
});
149+
});
150+
return filePath;
151+
} catch (error) {
152+
console.error(`Error processing changeset ${changeset}: ${error}`);
153+
}
154+
}
155+
156+
/**
157+
* Combines the results of processed changesets into a single file.
158+
* @param {string[]} results - An array of file paths of processed changesets.
159+
* @param {string} date - The date in the format 'YYYY-MM-DD'.
160+
* @param {number} hour - The hour of the day (0-23).
161+
* @returns {void}
162+
*/
163+
async function combineResults(results, date, hour) {
164+
console.log(`Combining results from ${results.length} changesets`);
165+
const outputStream = fs.createWriteStream(`${dataPath}/${date}T${hour}:00.geojson`);
166+
167+
outputStream.write('{"type":"FeatureCollection","features":[');
168+
169+
let divider = '';
170+
171+
for (let i = 0; i < results.length; i++) {
172+
const filePath = results[i];
173+
const inputStream = fs.createReadStream(filePath);
174+
175+
const rl = readline.createInterface({
176+
input: inputStream,
177+
output: process.stdout,
178+
terminal: false
179+
});
180+
181+
rl.on('line', (line) => {
182+
outputStream.write(divider);
183+
divider = ',';
184+
outputStream.write(line);
185+
});
186+
187+
await new Promise((resolve, reject) => {
188+
rl.on('close', () => {
189+
resolve();
190+
});
191+
192+
rl.on('error', (error) => {
193+
reject(error);
194+
});
195+
});
196+
}
197+
198+
outputStream.write(']}');
199+
outputStream.end();
200+
}
201+
202+
program
203+
.command('process-hour <date> <hour>')
204+
.description('Process an hour of changesets starting from a given date and hour (in UTC) and combine the changed features into a single GeoJSON file.')
205+
.action(run);
206+
207+
program.parseAsync(process.argv);

pipeline/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010
"license": "ISC",
1111
"dependencies": {
1212
"axios": "^1.5.0",
13+
"commander": "12.0.0",
1314
"osm-adiff-parser": "^1.1.0",
1415
"real-changesets-parser": "https://github.com/developmentseed/real-changesets-parser.git",
16+
"sax": "^1.3.0",
1517
"serve": "^14.2.1"
18+
1619
},
1720
"devDependencies": {
1821
"geojson-validation": "^1.0.2"

0 commit comments

Comments
 (0)