diff --git a/tools/updatedb.js b/tools/updatedb.js index 6bfc458f..130a171b 100644 --- a/tools/updatedb.js +++ b/tools/updatedb.js @@ -31,6 +31,8 @@ const { Address6, Address4 } = require('ip-address'); // ============================================================================ // Logging utility for consistent and readable output +const numberFormatter = new Intl.NumberFormat('en-US'); + const log = { info: (msg, ...logArgs) => console.log('[INFO]', msg, ...logArgs), success: (msg, ...logArgs) => console.log('[SUCCESS]', msg, ...logArgs), @@ -38,8 +40,32 @@ const log = { error: (msg, ...logArgs) => console.error('[ERROR]', msg, ...logArgs), progress: (msg) => process.stdout.write(`[INFO] ${msg}... `), done: () => console.log('Done'), + progressCount: (msg, count, unit = 'entries processed') => { + console.log('[INFO]', `${msg} (${numberFormatter.format(count)} ${unit})`); + }, }; +function startProcessingStage(label, unit = 'entries', intervalMs = 5000) { + log.info(`Processing ${label} (this may take a moment)...`); + let lastLog = Date.now(); + return { + progress(count) { + const now = Date.now(); + if (now - lastLog >= intervalMs) { + lastLog = now; + log.progressCount(`Processing ${label}`, count, `${unit} processed`); + } + }, + complete(total) { + if (typeof total === 'number') { + log.info(`${label} processed (${numberFormatter.format(total)} ${unit})`); + } else { + log.info(`${label} processed`); + } + }, + }; +} + // ============================================================================ // Configuration // ============================================================================ @@ -333,17 +359,14 @@ function processLookupCountry(src, cb) { // Data Processing Functions // ============================================================================ -async function processCountryData(src, dest) { +async function processCountryData(src, dest, label = 'Country data') { let lines = 0; const dataFile = path.join(dataPath, dest); const tmpDataFile = path.join(tmpPath, src); rimraf(dataFile); mkdir(dataFile); - - process.stdout.write('\n'); - log.progress('Processing country data (this may take a moment)'); - let tstart = Date.now(); + const stage = startProcessingStage(label); const datFile = fs.createWriteStream(dataFile); function processLine(line) { @@ -351,6 +374,7 @@ async function processCountryData(src, dest) { if (!fields || fields.length < 6) return log.warn('Malformed line detected:', line); lines++; + stage.progress(lines); let sip; let eip; @@ -390,10 +414,6 @@ async function processCountryData(src, dest) { } b.write(cc, bsz - 2); - if (Date.now() - tstart > 5000) { - tstart = Date.now(); - process.stdout.write(`\nStill working (${lines})...`); - } if (datFile._writableState.needDrain) { return new Promise(resolve => { @@ -405,27 +425,62 @@ async function processCountryData(src, dest) { } } - const rl = readline.createInterface({ input: fs.createReadStream(tmpDataFile), crlfDelay: Infinity }); - let i = 0; - for await (const line of rl) { - i++; - if (i === 1) continue; - await processLine(line); - } + await new Promise((resolve, reject) => { + const rl = readline.createInterface({ input: fs.createReadStream(tmpDataFile), crlfDelay: Infinity }); + let settled = false; + let i = 0; + + function finish(err) { + if (settled) return; + settled = true; + if (!rl.closed) rl.close(); + if (err) reject(err); + else resolve(); + } + + function resume() { + if (!settled && !rl.closed) rl.resume(); + } + + rl.on('line', line => { + rl.pause(); + i++; + if (i === 1) { + resume(); + return; + } + + let result; + try { + result = processLine(line); + } catch (err) { + finish(err); + return; + } + + if (result && typeof result.then === 'function') { + result.then(() => { + resume(); + }).catch(finish); + } else { + resume(); + } + }); + + rl.on('close', () => finish()); + rl.on('error', finish); + }); datFile.close(); - log.done(); + stage.complete(lines); } -async function processCityData(src, dest) { +async function processCityData(src, dest, label = 'City data') { let lines = 0; const dataFile = path.join(dataPath, dest); const tmpDataFile = path.join(tmpPath, src); rimraf(dataFile); - - process.stdout.write('\n'); - log.progress('Processing city data (this may take a moment)'); - let tstart = Date.now(); + const stage = startProcessingStage(label); const datFile = fs.createWriteStream(dataFile); async function processLine(line) { @@ -446,6 +501,7 @@ async function processCityData(src, dest) { let i; lines++; + stage.progress(lines); if (fields[0].match(/:/)) { // IPv6 @@ -500,11 +556,6 @@ async function processCityData(src, dest) { b.writeInt32BE(area, 20); } - if (Date.now() - tstart > 5000) { - tstart = Date.now(); - process.stdout.write('\n[INFO] Processing... (' + lines + ' entries) '); - } - if (datFile._writableState.needDrain) { return new Promise((resolve) => { datFile.write(b, resolve); @@ -514,14 +565,53 @@ async function processCityData(src, dest) { } } - const rl = readline.createInterface({ input: fs.createReadStream(tmpDataFile), crlfDelay: Infinity }); - let i = 0; - for await (const line of rl) { - i++; - if (i === 1) continue; - await processLine(line); - } + await new Promise((resolve, reject) => { + const rl = readline.createInterface({ input: fs.createReadStream(tmpDataFile), crlfDelay: Infinity }); + let settled = false; + let i = 0; + + function finish(err) { + if (settled) return; + settled = true; + if (!rl.closed) rl.close(); + if (err) reject(err); + else resolve(); + } + + function resume() { + if (!settled && !rl.closed) rl.resume(); + } + + rl.on('line', line => { + rl.pause(); + i++; + if (i === 1) { + resume(); + return; + } + + let result; + try { + result = processLine(line); + } catch (err) { + finish(err); + return; + } + + if (result && typeof result.then === 'function') { + result.then(() => { + resume(); + }).catch(finish); + } else { + resume(); + } + }); + + rl.on('close', () => finish()); + rl.on('error', finish); + }); datFile.close(); + stage.complete(lines); } function processCityDataNames(src, dest, cb) { @@ -533,6 +623,7 @@ function processCityDataNames(src, dest, cb) { rimraf(dataFile); const datFile = fs.openSync(dataFile, 'w'); + const stage = startProcessingStage('City names', 'records'); function processLine(line) { if (line.match(/^Copyright/) || !line.match(/\d/)) return; @@ -567,6 +658,7 @@ function processCityDataNames(src, dest, cb) { fs.writeSync(datFile, b, 0, b.length, null); linesCount++; + stage.progress(linesCount); } const rl = readline.createInterface({ input: fs.createReadStream(tmpDataFile).pipe(decodeStream('utf-8')), output: process.stdout, terminal: false }); @@ -578,7 +670,10 @@ function processCityDataNames(src, dest, cb) { lineCount++; }); - rl.on('close', cb); + rl.on('close', () => { + stage.complete(linesCount); + cb(); + }); } // ============================================================================ @@ -595,26 +690,22 @@ function processData(database, cb) { if (type === 'country') { if (Array.isArray(src)) { processLookupCountry(src[0], () => { - processCountryData(src[1], dest[1]).then(() => { - return processCountryData(src[2], dest[2]); + processCountryData(src[1], dest[1], 'Country IPv4 data').then(() => { + return processCountryData(src[2], dest[2], 'Country IPv6 data'); }).then(() => { cb(null, database); }); }); - } - else { - processCountryData(src, dest, () => { + } else { + processCountryData(src, dest, 'Country data').then(() => { cb(null, database); }); } } else if (type === 'city') { processCityDataNames(src[0], dest[0], () => { - processCityData(src[1], dest[1]).then(() => { - process.stdout.write('\n'); - log.info('City IPv4 data processed'); - return processCityData(src[2], dest[2]); + processCityData(src[1], dest[1], 'City IPv4 data').then(() => { + return processCityData(src[2], dest[2], 'City IPv6 data'); }).then(() => { - log.info('City IPv6 data processed'); cb(null, database); }); });