Skip to content

Commit 64ca26c

Browse files
committed
namespace the directory used for processing data by date and hour
1 parent fade83e commit 64ca26c

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

pipeline/src/process.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@ const { containsInvalidCoordinate } = require('./utils');
1616
async function processChangesets(changesets, date, hour) {
1717
// Array to store the file paths of the processed changesets
1818
const results = [];
19+
const dataPath = `${config.DATA_PATH}/${date}T${hour}`;
20+
// create the directory if it does not exist
21+
if (!fs.existsSync(dataPath)) {
22+
fs.mkdirSync(dataPath, { recursive: true });
23+
}
1924

2025
// Process changesets in batches of 10
2126
const batchSize = 100;
2227
for (let i = 0; i < changesets.length; i += batchSize) {
2328
const batch = changesets.slice(i, i + batchSize);
2429
await Promise.all(batch.map(async (changeset) => {
2530
// Process the changeset and get the result
26-
const result = await processChangeset(changeset);
31+
const result = await processChangeset(changeset, dataPath);
2732
results.push(result);
2833
}));
2934
}
@@ -35,9 +40,10 @@ async function processChangesets(changesets, date, hour) {
3540
/**
3641
* Processes a single changeset and returns the file path of the processed changeset.
3742
* @param {string} changeset - The changeset ID.
43+
* @param {string} dataPath - The path to the directory where the processed changesets will be stored.
3844
* @returns {Promise<string>} A promise that resolves to the file path of the processed changeset.
3945
*/
40-
async function processChangeset(changeset) {
46+
async function processChangeset(changeset, dataPath) {
4147
// Process the changeset asynchronously and return the result
4248
const url = `https://real-changesets.s3.amazonaws.com/${changeset}.json`;
4349
// console.log(`Processing changeset ${changeset}`);
@@ -52,7 +58,7 @@ async function processChangeset(changeset) {
5258
console.log(`No features found in changeset ${changeset}`);
5359
return;
5460
}
55-
const filePath = `${config.DATA_PATH}/${changeset}_features.json`;
61+
const filePath = `${dataPath}/${changeset}_features.json`;
5662

5763
await Promise.all(features.map(async (feature) => {
5864
if (feature !== null && feature !== undefined) {
@@ -87,7 +93,8 @@ async function processChangeset(changeset) {
8793
*/
8894
async function combineResults(results, date, hour) {
8995
console.log(`Combining results from ${results.length} changesets`);
90-
const outputStream = fs.createWriteStream(`${config.DATA_PATH}/${date}T${hour}:00.geojson`);
96+
const dataPath = `${config.DATA_PATH}/${date}T${hour}`;
97+
const outputStream = fs.createWriteStream(`${dataPath}/${date}T${hour}:00.geojson`);
9198

9299
outputStream.write('{"type":"FeatureCollection","features":[');
93100

0 commit comments

Comments
 (0)