Skip to content

Commit d6d151f

Browse files
committed
clean up and add pipeline
1 parent c9227d1 commit d6d151f

File tree

12 files changed

+281
-969
lines changed

12 files changed

+281
-969
lines changed

.github/workflows/pipeline.yaml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
name: Hourly Pipeline
2+
3+
on:
4+
workflow_dispatch:
5+
push:
6+
branches:
7+
- new-pipeline
8+
# schedule:
9+
# - cron: "0 * * * *"
10+
11+
12+
jobs:
13+
build:
14+
runs-on: ubuntu-latest
15+
16+
steps:
17+
- name: Checkout code
18+
uses: actions/checkout@v2
19+
20+
- name: Build docker image
21+
run: docker build -t gradient-pipeline ./pipeline
22+
23+
- name: Run the pipeline
24+
run: |
25+
current_date=$(date +%Y-%m-%d)
26+
current_hour=$(date +%H)
27+
hour_minus_two=$((current_hour - 2))
28+
if [ $hour_minus_two -lt 0 ]; then
29+
current_date=$(date -d "-1 day" +%Y-%m-%d)
30+
hour_minus_two=$((current_hour + 22))
31+
fi
32+
docker run -it -v ./data:/tmp gradient-pipeline sh -c "node cli.js process-hour $current_date $hour_minus_two"
33+
if [ $? -eq 0 ]; then
34+
# Add a leading zero if the hour is less than 10
35+
[[ $hour_minus_two -lt 10 && $hour_minus_two -ge 0 ]] && hour_minus_two="0$hour_minus_two"
36+
file_name="${current_date}T${hour_minus_two}"
37+
docker run -it -v ./data:/tmp gradient-pipeline sh -c "ogr2ogr -f FlatGeobuf /tmp/${file_name}.fgb /tmp/${file_name}.geojson -skipfailures"
38+
else
39+
echo "Previous command failed"
40+
exit 1
41+
fi
42+
43+
- name: Check if the pipeline ran successfully
44+
run: ls -lh ./data
45+
46+
# - name: Upload the FGB to Cloud Storage
47+
48+
49+

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,8 @@ node_modules/
22
.yarn
33
out
44
package-lock.json
5-
.pnp*
5+
.pnp*
6+
7+
pipeline/node_modules
8+
pipeline/.yarn
9+
pipeline/data

pipeline/Dockerfile

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,15 @@
1-
# Start with an official Node image
2-
FROM node:18
1+
FROM ghcr.io/osgeo/gdal:alpine-small-latest
32

4-
# Update the system and install necessary dependencies
5-
RUN apt-get update && \
6-
apt-get install -y parallel jq && \
7-
apt-get clean && \
8-
rm -rf /var/lib/apt/lists/*
3+
RUN apk add --no-cache nodejs yarn git
94

10-
# Create an app directory to hold the application code inside the image
11-
WORKDIR /usr/src/app
5+
WORKDIR /app
126

13-
# Copy your package.json and package-lock.json (if you have one) into the container
14-
COPY package*.json ./
7+
COPY package.json ./
8+
COPY yarn.lock ./
159

16-
# Install your Node dependencies
17-
RUN npm install
10+
RUN yarn install
1811

1912
# Copy your Node scripts into the container
20-
COPY fetchOsc.js ./
21-
COPY parser.js ./
22-
23-
# The main script to run the tasks
24-
COPY process.sh ./
25-
26-
# Install geojson-merge
27-
RUN npm install -g geojson-merge
28-
29-
# Give execute permissions to the script
30-
RUN chmod +x process.sh
31-
32-
# The command to run when the container starts
33-
ENTRYPOINT [ "./process.sh" ]
13+
COPY src/ ./src/
14+
COPY cli.js ./
15+
RUN chmod +x cli.js

pipeline/cli.js

Lines changed: 4 additions & 184 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
#!/usr/bin/env node
22

33
const { program } = require('commander');
4-
const axios = require('axios');
5-
const zlib = require('zlib');
6-
const sax = require('sax');
7-
const changesetParser = require("real-changesets-parser");
8-
const fs = require('fs');
9-
const readline = require('readline');
104

11-
const dataPath = process.env.DATA_PATH || 'data';
5+
const { getHourlyReplicationFileURL, getChangesetIDs } = require('./src/utils');
6+
const { processChangesets } = require('./src/process');
127

138
/**
149
* Runs the process to retrieve and process changesets for a given date and hour.
@@ -19,186 +14,11 @@ const dataPath = process.env.DATA_PATH || 'data';
1914
async function run(date, hour) {
2015
let url = getHourlyReplicationFileURL(date, hour);
2116
let changesets = await getChangesetIDs(url);
22-
// changesets = changesets.slice(0, 2);
23-
// console.log(changesets);
17+
changesets = changesets.slice(0, 2);
18+
console.log(changesets);
2419
processChangesets(changesets, date, hour);
2520
}
2621

27-
/**
28-
* Generates the URL for the hourly replication file based on the given date and hour.
29-
* @param {string} date - The date in the format 'YYYY-MM-DD'.
30-
* @param {number} hour - The hour of the day (0-23).
31-
* @returns {string} The URL for the hourly replication file.
32-
*/
33-
function getHourlyReplicationFileURL(date, hour) {
34-
// Add a leading zero to the hour if it is a single digit
35-
hour = hour.toString().padStart(2, '0');
36-
let startDate = new Date(`${date}T${hour}:00:00Z`);
37-
console.log(`Processing an hour of data starting from ${startDate}`);
38-
// Calculate the sequence number for the hourly replication file from the date
39-
let timestamp = startDate.getTime() / 1000;
40-
let sequenceNumber = timestamp/(60*60) - 374287;
41-
console.log(`Processing hourly replication sequence number ${sequenceNumber}`);
42-
43-
// Break the sequence number into 3 chunks
44-
let sequenceChunks = sequenceNumber.toString().padStart(9, '0').match(/(\d{3})(\d{3})(\d{3})/);
45-
let sequenceChunksFormatted = sequenceChunks.slice(1).join('/');
46-
47-
let url = `https://planet.openstreetmap.org/replication/hour/${sequenceChunksFormatted}.osc.gz`;
48-
return url;
49-
};
50-
51-
/**
52-
* Retrieves the unique changeset IDs from the hourly replication file.
53-
* @param {string} replicationFileURL - The URL of the hourly replication file.
54-
* @returns {Promise<string[]>} A promise that resolves to an array of unique changeset IDs.
55-
*/
56-
async function getChangesetIDs(replicationFileURL) {
57-
return new Promise((resolve, reject) => {
58-
// Process the XML file as a stream to avoid loading the entire file into memory
59-
// Create a SAX parser
60-
const parser = sax.createStream(true);
61-
62-
// Extract unique changeset ids
63-
const changesets = new Set();
64-
65-
// Handle the opening tag of an element
66-
parser.on('opentag', (node) => {
67-
if (node.name === 'node' || node.name === 'way' || node.name === 'relation') {
68-
const changeset = node.attributes.changeset;
69-
if (changeset) {
70-
changesets.add(changeset);
71-
}
72-
}
73-
});
74-
75-
// Handle the end of the XML document
76-
parser.on('end', () => {
77-
console.log(`Found ${changesets.size} unique changesets in the hourly replication file`);
78-
resolve(Array.from(changesets));
79-
});
80-
81-
// Handle errors
82-
parser.on('error', (error) => {
83-
reject(error);
84-
});
85-
86-
// Download the gz file and unzip it
87-
console.log(`Downloading and processing ${replicationFileURL}`);
88-
axios.get(replicationFileURL, { responseType: 'stream' })
89-
.then(response => {
90-
response.data
91-
.pipe(zlib.createGunzip())
92-
.pipe(parser);
93-
})
94-
.catch(error => {
95-
reject(error);
96-
});
97-
});
98-
}
99-
100-
/**
101-
* Processes an array of changesets asynchronously.
102-
* @param {string[]} changesets - An array of changeset IDs.
103-
* @param {string} date - The date in the format 'YYYY-MM-DD'.
104-
* @param {number} hour - The hour of the day (0-23).
105-
* @returns {void}
106-
*/
107-
async function processChangesets(changesets, date, hour) {
108-
// Array to store the file paths of the processed changesets
109-
const results = [];
110-
111-
// Process each changeset asynchronously
112-
await Promise.all(changesets.map(async (changeset) => {
113-
// Process the changeset and get the result
114-
const result = await processChangeset(changeset);
115-
results.push(result);
116-
}));
117-
118-
// Combine the processed changesets into a single file
119-
combineResults(results, date, hour);
120-
}
121-
122-
/**
123-
* Processes a single changeset and returns the file path of the processed changeset.
124-
* @param {string} changeset - The changeset ID.
125-
* @returns {Promise<string>} A promise that resolves to the file path of the processed changeset.
126-
*/
127-
async function processChangeset(changeset) {
128-
// Process the changeset asynchronously and return the result
129-
const url = `https://real-changesets.s3.amazonaws.com/${changeset}.json`;
130-
try {
131-
const response = await axios.get(url);
132-
const data = response.data;
133-
const geojson = changesetParser(data);
134-
const features = geojson.features;
135-
const filePath = `${dataPath}/${changeset}_features.json`;
136-
await fs.writeFile(filePath, '', (error) => {
137-
if (error) {
138-
console.error(`Error writing to file: ${error}`);
139-
}
140-
});
141-
features.forEach(async (feature) => {
142-
const featureString = JSON.stringify(feature);
143-
144-
await fs.appendFile(filePath, `${featureString}\n`, (error) => {
145-
if (error) {
146-
console.error(`Error writing feature to file: ${error}`);
147-
}
148-
});
149-
});
150-
return filePath;
151-
} catch (error) {
152-
console.error(`Error processing changeset ${changeset}: ${error}`);
153-
}
154-
}
155-
156-
/**
157-
* Combines the results of processed changesets into a single file.
158-
* @param {string[]} results - An array of file paths of processed changesets.
159-
* @param {string} date - The date in the format 'YYYY-MM-DD'.
160-
* @param {number} hour - The hour of the day (0-23).
161-
* @returns {void}
162-
*/
163-
async function combineResults(results, date, hour) {
164-
console.log(`Combining results from ${results.length} changesets`);
165-
const outputStream = fs.createWriteStream(`${dataPath}/${date}T${hour}:00.geojson`);
166-
167-
outputStream.write('{"type":"FeatureCollection","features":[');
168-
169-
let divider = '';
170-
171-
for (let i = 0; i < results.length; i++) {
172-
const filePath = results[i];
173-
const inputStream = fs.createReadStream(filePath);
174-
175-
const rl = readline.createInterface({
176-
input: inputStream,
177-
output: process.stdout,
178-
terminal: false
179-
});
180-
181-
rl.on('line', (line) => {
182-
outputStream.write(divider);
183-
divider = ',';
184-
outputStream.write(line);
185-
});
186-
187-
await new Promise((resolve, reject) => {
188-
rl.on('close', () => {
189-
resolve();
190-
});
191-
192-
rl.on('error', (error) => {
193-
reject(error);
194-
});
195-
});
196-
}
197-
198-
outputStream.write(']}');
199-
outputStream.end();
200-
}
201-
20222
program
20323
.command('process-hour <date> <hour>')
20424
.description('Process an hour of changesets starting from a given date and hour (in UTC) and combine the changed features into a single GeoJSON file.')

pipeline/fetchOsc.js

Lines changed: 0 additions & 29 deletions
This file was deleted.

pipeline/package.json

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
{
2-
"name": "osmgradient",
2+
"name": "osm-gradient-pipeline",
33
"version": "1.0.0",
44
"description": "Minutely metrics for OSM using FlatGeoBuff",
5-
"main": "index.js",
65
"scripts": {
76
"test": "echo \"Error: no test specified\" && exit 1"
87
},
@@ -11,11 +10,8 @@
1110
"dependencies": {
1211
"axios": "^1.5.0",
1312
"commander": "12.0.0",
14-
"osm-adiff-parser": "^1.1.0",
1513
"real-changesets-parser": "https://github.com/developmentseed/real-changesets-parser.git",
16-
"sax": "^1.3.0",
17-
"serve": "^14.2.1"
18-
14+
"sax": "^1.3.0"
1915
},
2016
"devDependencies": {
2117
"geojson-validation": "^1.0.2"

pipeline/parser.js

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)