Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ env:
OFFSET: 0
SUFFIX: ${{ secrets.SUFFIX }}
ID: openaq
PSQL_HOST: ${{ secrets.PSQL_HOST }}
PSQL_HOST: ${{ secrets.PSQL_HOST }}
API_URL: ${{ secrets.API_URL }}
TOPIC_ARN: ${{ secrets.TOPIC_ARN }}

jobs:
deploy:
Expand Down Expand Up @@ -41,4 +42,4 @@ jobs:
working-directory: ./cdk
run: |
npm install
cdk deploy openaq-fetcher --require-approval never
cdk deploy openaq-fetcher --require-approval never
19 changes: 10 additions & 9 deletions src/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,17 @@ export function handler (event, context) {
return true;
}

const fetchReport = {
itemsInserted: 0,
timeStarted: Date.now(),
results: {}, // placeholder for the results
errors: null,
timeEnded: NaN,
};

return Promise.race([
handleSigInt(runningSources),
handleProcessTimeout(processTimeout, runningSources),
handleSigInt(runningSources, fetchReport, env),
handleProcessTimeout(processTimeout, runningSources, fetchReport, env),
handleUnresolvedPromises(strict),
handleWarnings(['MaxListenersExceededWarning'], strict),
(async function () {
Expand All @@ -138,13 +146,6 @@ export function handler (event, context) {
} else {
log.info('--- Full fetch started. ---');
}
const fetchReport = {
itemsInserted: 0,
timeStarted: Date.now(),
results: null,
errors: null,
timeEnded: NaN,
};

log.info(
`--- Running with ${maxParallelAdapters} parallel adapters ---`
Expand Down
7 changes: 6 additions & 1 deletion src/lib/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,19 @@ export function prepareCompleteResultsMessage (stream, fetchReport, {dryrun}) {
log.info(`complete results - ${Date.now()}`);
return stream.map(
measurements => {
log.debug(`Fetch results for ${measurements.source.name}`);
log.debug(`Fetched results for ${measurements.source.name}`);
const result = measurements.resultsMessage;
// Add to inserted count if response has a count, if there was a failure
// response will not have a count
if (result.count > 0) {
fetchReport.itemsInserted += result.count;
}


if (result.sourceName) {
fetchReport.results[result.sourceName] = result;
}

log.info(`${result.count} new measurements found for "${result.sourceName}" in ${result.duration}s`);

for (let [error, count] of Object.entries(result.failures || {})) {
Expand Down
11 changes: 10 additions & 1 deletion src/lib/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ const _argv = yargs
alias: 'd',
group: 'Main options:'
})
.options('local', {
boolean: true,
describe:
'Run the fetch process locally, useful for testing.',
alias: 'l',
group: 'Main options:'
})
.options('source', {
describe:
'Run the fetch process with only the defined source using source name.',
Expand Down Expand Up @@ -118,7 +125,7 @@ const _env = process.env;
* Read values from local file and add them to the global _env
* this is to help with local testing
*/
export const readEnvFromLocalFile = (envFile) => {
const readEnvFromLocalFile = (envFile) => {
const envs = readFileSync(envFile, 'utf8');
envs.split('\n').forEach(function (e) {
if (e) {
Expand Down Expand Up @@ -148,6 +155,7 @@ export default () => {
important,
datetime,
offset,
local,
verbose: _verbose,
quiet: _quiet,
strict: _strict
Expand Down Expand Up @@ -195,6 +203,7 @@ export default () => {
strict,
dryrun,
deployments,
local,
nofetch,
debug,
source,
Expand Down
43 changes: 28 additions & 15 deletions src/lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import sj from 'scramjet';
const { DataStream } = sj;

import log from './logger.js';
import { publish } from './notification.js';

// Symbol exports
export const MEASUREMENT_ERROR = Symbol('Measurement error');
Expand Down Expand Up @@ -237,28 +238,40 @@ export function resolveOnTimeout (timeout, value) {
return new Promise((resolve) => setTimeout(() => resolve(value), timeout));
}

export async function handleSigInt (runningSources) {
await (new Promise((resolve) => process.once('SIGINT', () => resolve())));

const unfinishedSources = Object.entries(runningSources)
.filter(([, v]) => v !== 'finished' && v !== 'filtered')
.map(([k]) => k)
.join(', ');
async function publishAfterError(runningSources, fetchReport, env) {
const unfinished = [];
Object.entries(runningSources).forEach(([key, status]) => {
if(status != 'filtered' && !fetchReport.results[key]) {
fetchReport.results[key] = {
message: 'not finished',
count: 0,
locations: 0,
sourceName: key,
}
unfinished.push(key)
}
});

log.warn(`Still running sources at interruption: [${unfinishedSources}]`);
if (!env.dryrun) {
await publish(fetchReport.results, 'fetcher/success');
} else {
Object.values(fetchReport.results)
.filter(r =>r.parameters)
.map( r => log.info(`${r.locations} locations from ${r.from} - ${r.to} | Parameters for ${r.sourceName}`, r.parameters));
}
log.warn(`Still running sources at interruption: [${unfinished}]`);
}

export async function handleSigInt (runningSources, fetchReport, env) {
await (new Promise((resolve) => process.once('SIGINT', () => resolve())));
await publishAfterError(runningSources, fetchReport, env);
throw new Error('Process interruped');
}

export async function handleProcessTimeout (processTimeout, runningSources) {
export async function handleProcessTimeout (processTimeout, runningSources, fetchReport, env) {
await resolveOnTimeout(processTimeout);

const unfinishedSources = Object.entries(runningSources)
.filter(([, v]) => v !== 'finished' && v !== 'filtered')
.map(([k]) => k);

log.error(`Still running sources at time out: ${unfinishedSources}`);

await publishAfterError(runningSources, fetchReport, env);
throw new Error('Process timed out');
}

Expand Down
7 changes: 4 additions & 3 deletions src/lib/notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async function sendUpdatedWebhook (apiURL, webhookKey) {
return promisify(request.post)(apiURL, { form: form });
}

async function publish(message, subject) {
export async function publish(message, subject) {
// the following just looks better in the log
if(process.env.TOPIC_ARN) {
const cmd = new PublishCommand({
Expand All @@ -40,6 +40,7 @@ async function publish(message, subject) {
*/
export function reportAndRecordFetch (fetchReport, sources, env, apiURL, webhookKey) {
return async (results) => {
// copy over the results for now
fetchReport.results = results;
fetchReport.timeEnded = Date.now();
fetchReport.errors = results.reduce((acc, {failures}) => {
Expand All @@ -65,8 +66,8 @@ export function reportAndRecordFetch (fetchReport, sources, env, apiURL, webhook
} else {
// for dev purposes
failures.map(r => console.warn(`No results`, r));
fetchReport.results.map( r => log.debug(`${r.locations} locations from ${r.from} - ${r.to} | Parameters for ${r.sourceName}`, r.parameters));
fetchReport.results.map( r => log.info(`${r.locations} locations from ${r.from} - ${r.to} | Parameters for ${r.sourceName}`, r.parameters));
}
return 0;
};
}
}
2 changes: 1 addition & 1 deletion src/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export function convertUnits (input) { return input; }
* @return {Array} An array of measurements converted to system-preferred units
*/
export function unifyMeasurementUnits (m) {
if (!m || typeof m.unit !== 'string' || isNaN(+m.value)) return;
if (!m || typeof m.unit !== 'string' || isNaN(+m.value)) return m;

// ignore and pass through values that are known error codes
if (m.value === -9999 || m.value === 9999) {
Expand Down
3 changes: 2 additions & 1 deletion src/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ export async function handler (event, context) {
d.suffix = `${d.name}_`;
d.sources = sources;
let body = JSON.stringify(d)
if (env.dryrun) {

if (env.dryrun || env.local) {
console.log(`${d.name} with ${d.sources.length} sources`)
let messageId = 'fake-message-id';
let event = { name: d.name, Records: [{ body, messageId }] }
Expand Down