Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion db/db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ CREATE TABLE `jobs` (
`ignoreErrors` boolean not null,
`destination_url` varchar(8192),
`service_name` varchar(255),
`provider_id` varchar(255)
`provider_id` varchar(255),
`original_data_size` double precision,
`output_data_size` double precision
);

CREATE TABLE `job_links` (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
exports.up = function up(knex) {
return knex.schema.alterTable('jobs', (t) => {
t.double('original_data_size');
t.double('output_data_size');
});
};

exports.down = function down(knex) {
return knex.schema.table('jobs', (t) => {
t.dropColumn('output_data_size');
t.dropColumn('original_data_size');
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { makeWorkScheduleRequest } from '../../backends/workflow-orchestration/w
import { Job, JobStatus } from '../../models/job';
import JobLink, { getJobDataLinkCount } from '../../models/job-link';
import JobMessage, {
getMessageCountForJob, getErrorMessagesForJob, getWarningMessagesForJob, JobMessageLevel,
getErrorMessagesForJob, getMessageCountForJob, getWarningMessagesForJob, JobMessageLevel,
} from '../../models/job-message';
import {
decrementRunningCount, deleteUserWorkForJob, incrementReadyAndDecrementRunningCounts,
Expand Down Expand Up @@ -324,6 +324,7 @@ async function createAggregatingWorkItem(
const itemLinks: StacItemLink[] = [];
const s3 = objectStoreForProtocol('s3');
// get all the previous results
// TODO if we start supporting work-item warnings that still have output data, this will need to include them as well
const workItemCount = await workItemCountForStep(tx, currentWorkItem.jobID, nextStep.stepIndex - 1, WorkItemStatus.SUCCESSFUL);
if (workItemCount < 1) return false; // nothing to aggregate
let page = 1;
Expand Down
68 changes: 60 additions & 8 deletions services/harmony/app/frontends/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,58 @@ export async function getJobsListing(
}
}

/**
 * Get a message explaining the change in size from the input to the output
 *
 * @param sizes - original and output sizes of the input in MiB (1024 x 1024 bytes)
 * @param precision - the number of decimal places to allow in the output
 * @returns a message explaining the size change as a percentage
 */
export function sizeChangeMessage(
  sizes: { originalSize: number; outputSize: number; },
  precision: number = 2): string {
  // percent change is undefined when either size is zero
  if (sizes.originalSize === 0) {
    return 'Original size is 0 - percent size change N/A';
  }
  if (sizes.outputSize === 0) {
    return 'Output size is 0 - percent size change N/A';
  }
  let result: string;
  const diff = sizes.originalSize - sizes.outputSize;
  if (diff < 0) {
    const percent = (-diff / sizes.originalSize * 100.0).toFixed(precision);
    result = `${percent}% increase`;
  } else if (diff > 0) {
    let percent = (diff / sizes.originalSize * 100.0).toFixed(precision);
    // due to JS precision issues, very large changes can round to a 100% reduction,
    // which is impossible when the output size is non-zero; cap at the largest value
    // below 100 that is representable at the requested precision (e.g. 99.99 for
    // precision 2, 99 for precision 0)
    if (percent === (100).toFixed(precision)) {
      percent = (100 - Math.pow(10, -precision)).toFixed(precision);
    }

    result = `${percent}% reduction`;
  } else {
    result = 'no change';
  }

  return result;
}

/**
 * Format a data size number as a string for human presentation
 * @param mibSize - the float size in MiB (1024x1024 bytes)
 * @param precision - the number of decimal places to allow in the output
 * @returns a string representing the size using B, KiB, MiB, etc., notation
 */
export function formatDataSize(mibSize: number, precision: number = 2): string {
  const units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'];
  // start from raw bytes and step up one unit for every factor of 1024,
  // stopping at the largest available unit
  let value = mibSize * 1024 * 1024;
  let unit = 0;
  for (; value >= 1024 && unit < units.length - 1; unit++) {
    value /= 1024;
  }

  return `${value.toFixed(precision)} ${units[unit]}`;
}

/**
* Express.js handler that returns job status for a single job `(/jobs/{jobID})`
*
Expand All @@ -189,17 +241,11 @@ export async function getJobStatus(
try {
validateJobId(jobID);
const { page, limit } = getPagingParams(req, env.defaultResultPageSize);
let job: Job;
let pagination;
let messages: JobMessage[];

await db.transaction(async (tx) => {
({ job, pagination } = await Job.byJobID(tx, jobID, true, true, false, page, limit));
messages = await getMessagesForJob(tx, jobID);
});
const { job, pagination } = await Job.byJobID(db, jobID, true, true, false, page, limit);
if (!job) {
throw new NotFoundError(`Unable to find job ${jobID}`);
}
const messages: JobMessage[] = await getMessagesForJob(db, jobID);
const isAdmin = await isAdminUser(req);
const isAdminOrOwner = job.belongsToOrIsAdmin(req.user, isAdmin);
const isJobShareable = await job.isShareable(req.accessToken);
Expand All @@ -210,6 +256,12 @@ export async function getJobStatus(
const pagingLinks = getPagingLinks(req, pagination).map((link) => new JobLink(link));
job.links = job.links.concat(pagingLinks);
const jobForDisplay = getJobForDisplay(job, urlRoot, linkType, messages);
if (job.original_data_size && job.output_data_size) {
jobForDisplay.originalDataSize = formatDataSize(job.original_data_size);
jobForDisplay.outputDataSize = formatDataSize(job.output_data_size);
jobForDisplay.dataSizePercentChange =
sizeChangeMessage({ originalSize: job.original_data_size, outputSize: job.output_data_size });
}
res.send(jobForDisplay);
} catch (e) {
req.context.logger.error(e);
Expand Down
23 changes: 22 additions & 1 deletion services/harmony/app/models/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import JobMessage from './job-message';
import { getLabelsForJob, JOBS_LABELS_TABLE, LABELS_TABLE, setLabelsForJob } from './label';
import DBRecord from './record';
import { setReadyCountToZero } from './user-work';
import { getTotalWorkItemSizesForJobID } from './work-item';
import WorkflowStep, { getWorkflowStepsByJobId } from './workflow-steps';

// how long data generated by this job will be available
Expand All @@ -34,7 +35,7 @@ let providerIdsSnapshot: string[];
export const jobRecordFields = [
'username', 'status', 'message', 'progress', 'createdAt', 'updatedAt', 'request',
'numInputGranules', 'jobID', 'requestId', 'batchesCompleted', 'isAsync', 'ignoreErrors', 'destination_url',
'service_name', 'provider_id',
'service_name', 'provider_id', 'original_data_size', 'output_data_size',
];

const stagingBucketTitle = `Results in AWS S3. Access from AWS ${awsDefaultRegion} with keys from /cloud-access.sh`;
Expand Down Expand Up @@ -84,6 +85,8 @@ export interface JobRecord {
provider_id?: string;
destination_url?: string;
service_name?: string,
original_data_size?: number;
output_data_size?: number;
}

/**
Expand Down Expand Up @@ -114,6 +117,12 @@ export class JobForDisplay {

numInputGranules: number;

originalDataSize?: string;

outputDataSize?: string;

dataSizePercentChange?: string;

errors?: JobMessage[];

warnings?: JobMessage[];
Expand Down Expand Up @@ -464,6 +473,10 @@ export class Job extends DBRecord implements JobRecord {

labels: string[];

original_data_size?: number;

output_data_size?: number;

/**
* Get the job message for the current status.
* @returns the message string describing the job
Expand Down Expand Up @@ -1100,6 +1113,14 @@ export class Job extends DBRecord implements JobRecord {
const truncatedFailureMessage = truncateString(this.getMessage(JobStatus.FAILED), TEXT_LIMIT - reservedMessageChars);
this.setMessage(truncatedFailureMessage, JobStatus.FAILED);
this.request = truncateString(this.request, TEXT_LIMIT);

// only get data reduction numbers when the job is complete and at least partially successful
if (this.status === JobStatus.SUCCESSFUL || this.status === JobStatus.COMPLETE_WITH_ERRORS) {
const workItemsSizes = await getTotalWorkItemSizesForJobID(tx, this.jobID);
this.original_data_size = workItemsSizes.originalSize;
this.output_data_size = workItemsSizes.outputSize;
}

const dbRecord: Record<string, unknown> = pick(this, jobRecordFields);
dbRecord.collectionIds = JSON.stringify(this.collectionIds || []);
dbRecord.message = JSON.stringify(this.statesToMessages || {});
Expand Down
23 changes: 15 additions & 8 deletions services/harmony/app/models/work-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
import { subMinutes } from 'date-fns';
import { ILengthAwarePagination } from 'knex-paginate';
import _ from 'lodash';
import logger from '../util/log';

import { getWorkSchedulerQueue } from '../../app/util/queue/queue-factory';
import { eventEmitter } from '../events';
import db, { Transaction } from '../util/db';
import DataOperation from './data-operation';
import env from '../util/env';
import logger from '../util/log';
import DataOperation from './data-operation';
import { Job, JobStatus } from './job';
import Record from './record';
import {
getStacLocation, WorkItemQuery, WorkItemRecord, WorkItemStatus,
} from './work-item-interface';
import WorkflowStep from './workflow-steps';
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery } from './work-item-interface';
import { eventEmitter } from '../events';
import { getWorkSchedulerQueue } from '../../app/util/queue/queue-factory';

// The step index for the query-cmr task. Right now query-cmr only runs as the first step -
// if this changes we will have to revisit this
Expand Down Expand Up @@ -69,7 +72,7 @@ export default class WorkItem extends Record implements WorkItemRecord {
// The location of the resulting STAC catalog(s) (not serialized)
results?: string[];

// The sum of the sizes of the granules associated with this work item
// The sum of the sizes of the outputs associated with this work item
totalItemsSize?: number;

// The size (in bytes) of each STAC item produced by this work item (used for batching)
Expand Down Expand Up @@ -106,7 +109,7 @@ export default class WorkItem extends Record implements WorkItemRecord {
/**
* Saves the work items to the database using a single SQL statement.
*
* @param transaction - The transaction to use for saving the job link
* @param transaction - The transaction to use for saving the work-items
* @param workItems - The work items to save
*/
static async insertBatch(transaction: Transaction, workItems: WorkItem[]): Promise<void> {
Expand Down Expand Up @@ -759,7 +762,9 @@ export async function getScrollIdForJob(
export async function getTotalWorkItemSizesForJobID(
tx: Transaction,
jobID: string,
): Promise<{ originalSize: number, outputSize: number }> {
): Promise<{ originalSize: number, outputSize: number; }> {
logger.info('timing.getTotalWorkItemSizesForJobID.start');
const startTime = new Date().getTime();
const workflowStepIndexResults = await tx(WorkflowStep.table)
.select()
.min('stepIndex')
Expand Down Expand Up @@ -799,6 +804,8 @@ export async function getTotalWorkItemSizesForJobID(
outputSize = Number(outputSizeResults[0]['sum(`totalItemsSize`)']);
}

const durationMs = new Date().getTime() - startTime;
logger.info('timing.getTotalWorkItemSizesForJobID.end', { durationMs });

return { originalSize, outputSize };
}
Expand Down
1 change: 1 addition & 0 deletions services/harmony/app/util/aggregation-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ async function createCatalogAndWorkItemForBatch(
batchItemUrls);

// create a work item for the batch
// TODO add calculation for source_items_size here
const newWorkItem = new WorkItem({
jobID,
serviceID,
Expand Down
17 changes: 11 additions & 6 deletions services/harmony/test/helpers/work-items.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import { Application } from 'express';
import _ from 'lodash';
import { afterEach, beforeEach } from 'mocha';
import request, { Test } from 'supertest';
import _ from 'lodash';

import { RecordConstructor } from '../../app/models/record';
import WorkItem from '../../app/models/work-item';
import {
getStacLocation, WorkItemRecord, WorkItemStatus,
} from '../../app/models/work-item-interface';
import db, { Transaction } from '../../app/util/db';
import { objectStoreForProtocol } from '../../app/util/object-store';
import { truncateAll } from './db';
import { hookBackendRequest } from './hooks';
import { buildWorkflowStep, hookWorkflowStepCreation, hookWorkflowStepCreationEach } from './workflow-steps';
import { RecordConstructor } from '../../app/models/record';
import { WorkItemStatus, WorkItemRecord, getStacLocation } from '../../app/models/work-item-interface';
import { objectStoreForProtocol } from '../../app/util/object-store';
import {
buildWorkflowStep, hookWorkflowStepCreation, hookWorkflowStepCreationEach,
} from './workflow-steps';

export const exampleWorkItemProps = {
jobID: '1',
Expand Down Expand Up @@ -45,7 +50,7 @@ export function buildWorkItem(fields: Partial<WorkItemRecord> = {}): WorkItem {

/**
* Save a work item without validating or updating createdAt/updatedAt
* @param tx - The transaction to use for saving the job
* @param tx - The transaction to use for saving the work item
* @param fields - The fields to save to the database, defaults to example values
* @returns The saved work item
* @throws Error - if the save to the database fails
Expand Down
Loading