diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index ab899b88..f6357894 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -12,8 +12,9 @@ env: OFFSET: 0 SUFFIX: ${{ secrets.SUFFIX }} ID: openaq - PSQL_HOST: ${{ secrets.PSQL_HOST }} + PSQL_HOST: ${{ secrets.PSQL_HOST }} API_URL: ${{ secrets.API_URL }} + TOPIC_ARN: ${{ secrets.TOPIC_ARN }} jobs: deploy: @@ -41,4 +42,4 @@ jobs: working-directory: ./cdk run: | npm install - cdk deploy openaq-fetcher --require-approval never + cdk deploy openaq-fetcher --require-approval never diff --git a/src/fetch.js b/src/fetch.js index 7eb2a8db..0eb096c2 100644 --- a/src/fetch.js +++ b/src/fetch.js @@ -125,9 +125,17 @@ export function handler (event, context) { return true; } + const fetchReport = { + itemsInserted: 0, + timeStarted: Date.now(), + results: {}, // placeholder for the results + errors: null, + timeEnded: NaN, + }; + return Promise.race([ - handleSigInt(runningSources), - handleProcessTimeout(processTimeout, runningSources), + handleSigInt(runningSources, fetchReport, env), + handleProcessTimeout(processTimeout, runningSources, fetchReport, env), handleUnresolvedPromises(strict), handleWarnings(['MaxListenersExceededWarning'], strict), (async function () { @@ -138,13 +146,6 @@ export function handler (event, context) { } else { log.info('--- Full fetch started. ---'); } - const fetchReport = { - itemsInserted: 0, - timeStarted: Date.now(), - results: null, - errors: null, - timeEnded: NaN, - }; log.info( `--- Running with ${maxParallelAdapters} parallel adapters ---` diff --git a/src/lib/adapters.js b/src/lib/adapters.js index bda569f4..e5a59726 100644 --- a/src/lib/adapters.js +++ b/src/lib/adapters.js @@ -161,7 +161,7 @@ export function prepareCompleteResultsMessage (stream, fetchReport, {dryrun}) { log.info(`complete results - ${Date.now()}`); return stream.map( measurements => { - log.debug(`Fetch results for ${measurements.source.name}`); + log.debug(`Fetched results for ${measurements.source.name}`); const result = measurements.resultsMessage; // Add to inserted count if response has a count, if there was a failure // response will not have a count @@ -169,6 +169,11 @@ export function prepareCompleteResultsMessage (stream, fetchReport, {dryrun}) { fetchReport.itemsInserted += result.count; } + + if (result.sourceName) { + fetchReport.results[result.sourceName] = result; + } + log.info(`${result.count} new measurements found for "${result.sourceName}" in ${result.duration}s`); for (let [error, count] of Object.entries(result.failures || {})) { diff --git a/src/lib/env.js b/src/lib/env.js index ac347d7b..cd75a492 100644 --- a/src/lib/env.js +++ b/src/lib/env.js @@ -60,6 +60,13 @@ const _argv = yargs alias: 'd', group: 'Main options:' }) + .options('local', { + boolean: true, + describe: + 'Run the fetch process locally, useful for testing.', + alias: 'l', + group: 'Main options:' + }) .options('source', { describe: 'Run the fetch process with only the defined source using source name.', @@ -118,7 +125,7 @@ const _env = process.env; * Read values from local file and add them to the global _env * this is to help with local testing */ -export const readEnvFromLocalFile = (envFile) => { +const readEnvFromLocalFile = (envFile) => { const envs = readFileSync(envFile, 'utf8'); envs.split('\n').forEach(function (e) { if (e) { @@ -148,6 +155,7 @@ export default () => { important, datetime, offset, + local, verbose: _verbose, quiet: _quiet, strict: _strict @@ -195,6 +203,7 @@ export default () => { strict, dryrun, deployments, + local, nofetch, debug, source, diff --git a/src/lib/errors.js b/src/lib/errors.js index 8d5bdeb7..cb5f610d 100644 --- a/src/lib/errors.js +++ b/src/lib/errors.js @@ -3,6 +3,7 @@ import sj from 'scramjet'; const { DataStream } = sj; import log from './logger.js'; +import { publish } from './notification.js'; // Symbol exports export const MEASUREMENT_ERROR = Symbol('Measurement error'); @@ -237,28 +238,40 @@ export function resolveOnTimeout (timeout, value) { return new Promise((resolve) => setTimeout(() => resolve(value), timeout)); } -export async function handleSigInt (runningSources) { - await (new Promise((resolve) => process.once('SIGINT', () => resolve()))); - const unfinishedSources = Object.entries(runningSources) - .filter(([, v]) => v !== 'finished' && v !== 'filtered') - .map(([k]) => k) - .join(', '); +async function publishAfterError(runningSources, fetchReport, env) { + const unfinished = []; + Object.entries(runningSources).forEach(([key, status]) => { + if(status != 'filtered' && !fetchReport.results[key]) { + fetchReport.results[key] = { + message: 'not finished', + count: 0, + locations: 0, + sourceName: key, + } + unfinished.push(key) + } + }); - log.warn(`Still running sources at interruption: [${unfinishedSources}]`); + if (!env.dryrun) { + await publish(fetchReport.results, 'fetcher/success'); + } else { + Object.values(fetchReport.results) + .filter(r =>r.parameters) + .map( r => log.info(`${r.locations} locations from ${r.from} - ${r.to} | Parameters for ${r.sourceName}`, r.parameters)); + } + log.warn(`Still running sources at interruption: [${unfinished}]`); +} +export async function handleSigInt (runningSources, fetchReport, env) { + await (new Promise((resolve) => process.once('SIGINT', () => resolve()))); + await publishAfterError(runningSources, fetchReport, env); throw new Error('Process interruped'); } -export async function handleProcessTimeout (processTimeout, runningSources) { +export async function handleProcessTimeout (processTimeout, runningSources, fetchReport, env) { await resolveOnTimeout(processTimeout); - - const unfinishedSources = Object.entries(runningSources) - .filter(([, v]) => v !== 'finished' && v !== 'filtered') - .map(([k]) => k); - - log.error(`Still running sources at time out: ${unfinishedSources}`); - + await publishAfterError(runningSources, fetchReport, env); throw new Error('Process timed out'); } diff --git a/src/lib/notification.js b/src/lib/notification.js index badb0238..90201df8 100644 --- a/src/lib/notification.js +++ b/src/lib/notification.js @@ -17,7 +17,7 @@ async function sendUpdatedWebhook (apiURL, webhookKey) { return promisify(request.post)(apiURL, { form: form }); } -async function publish(message, subject) { +export async function publish(message, subject) { // the following just looks better in the log if(process.env.TOPIC_ARN) { const cmd = new PublishCommand({ @@ -40,6 +40,7 @@ async function publish(message, subject) { */ export function reportAndRecordFetch (fetchReport, sources, env, apiURL, webhookKey) { return async (results) => { + // copy over the results for now fetchReport.results = results; fetchReport.timeEnded = Date.now(); fetchReport.errors = results.reduce((acc, {failures}) => { @@ -65,8 +66,8 @@ export function reportAndRecordFetch (fetchReport, sources, env, apiURL, webhook } else { // for dev purposes failures.map(r => console.warn(`No results`, r)); - fetchReport.results.map( r => log.debug(`${r.locations} locations from ${r.from} - ${r.to} | Parameters for ${r.sourceName}`, r.parameters)); + fetchReport.results.map( r => log.info(`${r.locations} locations from ${r.from} - ${r.to} | Parameters for ${r.sourceName}`, r.parameters)); } return 0; }; -} \ No newline at end of file +} diff --git a/src/lib/utils.js b/src/lib/utils.js index 0211694d..0e1801b5 100644 --- a/src/lib/utils.js +++ b/src/lib/utils.js @@ -29,7 +29,7 @@ export function convertUnits (input) { return input; } * @return {Array} An array of measurements converted to system-preferred units */ export function unifyMeasurementUnits (m) { - if (!m || typeof m.unit !== 'string' || isNaN(+m.value)) return; + if (!m || typeof m.unit !== 'string' || isNaN(+m.value)) return m; // ignore and pass through values that are known error codes if (m.value === -9999 || m.value === 9999) { diff --git a/src/scheduler.js b/src/scheduler.js index 608dbc33..06c9be14 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -56,7 +56,8 @@ export async function handler (event, context) { d.suffix = `${d.name}_`; d.sources = sources; let body = JSON.stringify(d) - if (env.dryrun) { + + if (env.dryrun || env.local) { console.log(`${d.name} with ${d.sources.length} sources`) let messageId = 'fake-message-id'; let event = { name: d.name, Records: [{ body, messageId }] }