Skip to content

Commit 1a63f2b

Browse files
authored
Merge pull request #710 from nasa/harmony-1963
Harmony 1963 - data size change information in job status page
2 parents e4edc2e + 6851e82 commit 1a63f2b

File tree

9 files changed

+274
-25
lines changed

9 files changed

+274
-25
lines changed

db/db.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ CREATE TABLE `jobs` (
1818
`ignoreErrors` boolean not null,
1919
`destination_url` varchar(8192),
2020
`service_name` varchar(255),
21-
`provider_id` varchar(255)
21+
`provider_id` varchar(255),
22+
`original_data_size` double precision,
23+
`output_data_size` double precision
2224
);
2325

2426
CREATE TABLE `job_links` (
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
exports.up = function up(knex) {
2+
return knex.schema.alterTable('jobs', (t) => {
3+
t.double('original_data_size');
4+
t.double('output_data_size');
5+
});
6+
};
7+
8+
exports.down = function down(knex) {
9+
return knex.schema.table('jobs', (t) => {
10+
t.dropColumn('output_data_size');
11+
t.dropColumn('original_data_size');
12+
});
13+
};

services/harmony/app/backends/workflow-orchestration/work-item-updates.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { makeWorkScheduleRequest } from '../../backends/workflow-orchestration/w
99
import { Job, JobStatus } from '../../models/job';
1010
import JobLink, { getJobDataLinkCount } from '../../models/job-link';
1111
import JobMessage, {
12-
getMessageCountForJob, getErrorMessagesForJob, getWarningMessagesForJob, JobMessageLevel,
12+
getErrorMessagesForJob, getMessageCountForJob, getWarningMessagesForJob, JobMessageLevel,
1313
} from '../../models/job-message';
1414
import {
1515
decrementRunningCount, deleteUserWorkForJob, incrementReadyAndDecrementRunningCounts,
@@ -324,6 +324,7 @@ async function createAggregatingWorkItem(
324324
const itemLinks: StacItemLink[] = [];
325325
const s3 = objectStoreForProtocol('s3');
326326
// get all the previous results
327+
// TODO if we start supporting work-item warnings that still have output data, this will need to include them as well
327328
const workItemCount = await workItemCountForStep(tx, currentWorkItem.jobID, nextStep.stepIndex - 1, WorkItemStatus.SUCCESSFUL);
328329
if (workItemCount < 1) return false; // nothing to aggregate
329330
let page = 1;

services/harmony/app/frontends/jobs.ts

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,58 @@ export async function getJobsListing(
171171
}
172172
}
173173

174+
/**
 * Get a message explaining the change in size from the input to the output
 *
 * @param sizes - the original (input) size and the output size; both must be expressed in
 * the same unit (the documented unit is MiB, i.e. 1024 x 1024 bytes)
 * @param precision - the number of decimal places to allow in the output
 * @returns a message explaining the size change as a percentage, 'no change' when the sizes
 * are equal, or an "N/A" message when either size is 0
 */
export function sizeChangeMessage(
  sizes: { originalSize: number; outputSize: number; },
  precision: number = 2): string {
  // a percentage relative to an original size of 0 is undefined
  if (sizes.originalSize === 0) {
    return 'Original size is 0 - percent size change N/A';
  }
  if (sizes.outputSize === 0) {
    return 'Output size is 0 - percent size change N/A';
  }
  let result: string;
  const diff = sizes.originalSize - sizes.outputSize;
  if (diff < 0) {
    // output is larger than the input
    const percent = (-diff / sizes.originalSize * 100.0).toFixed(precision);
    result = `${percent}% increase`;
  } else if (diff > 0) {
    let percent = (diff / sizes.originalSize * 100.0).toFixed(precision);
    // The output size is non-zero here, so a true 100% reduction is impossible, but rounding
    // a very large reduction to `precision` places can still produce "100". Clamp to the
    // largest value below 100 that is representable at the requested precision
    // (99 for 0 places, 99.9 for 1, 99.99 for 2, ...).
    if (percent === (100).toFixed(precision)) {
      percent = (100 - 10 ** -precision).toFixed(precision);
    }

    result = `${percent}% reduction`;
  } else {
    result = 'no change';
  }

  return result;
}
207+
208+
/**
 * Format a data size number as a string for human presentation
 * @param mibSize - the float size in MiB (1024x1024 bytes)
 * @param precision - the number of decimal places to allow in the output
 * @returns a string representing the size using B, KiB, MiB, etc., notation. Sizes beyond
 * the largest unit (PiB) are expressed as a larger number of PiB.
 */
export function formatDataSize(mibSize: number, precision: number = 2): string {
  const units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'];
  const lastUnitIndex = units.length - 1;
  // convert to bytes, then scale up one unit at a time
  let size = mibSize * 1024 * 1024;
  let unitIndex = 0;
  while (size >= 1024 && unitIndex < lastUnitIndex) {
    size /= 1024;
    unitIndex++;
  }

  // A value just below a unit boundary (e.g. 1023.9999 KiB) would round up to "1024.00 KiB";
  // promote it to the next unit so it is shown as "1.00 MiB" instead.
  if (unitIndex < lastUnitIndex && Number(size.toFixed(precision)) >= 1024) {
    size /= 1024;
    unitIndex++;
  }

  return `${size.toFixed(precision)} ${units[unitIndex]}`;
}
225+
174226
/**
175227
* Express.js handler that returns job status for a single job `(/jobs/{jobID})`
176228
*
@@ -189,17 +241,11 @@ export async function getJobStatus(
189241
try {
190242
validateJobId(jobID);
191243
const { page, limit } = getPagingParams(req, env.defaultResultPageSize);
192-
let job: Job;
193-
let pagination;
194-
let messages: JobMessage[];
195-
196-
await db.transaction(async (tx) => {
197-
({ job, pagination } = await Job.byJobID(tx, jobID, true, true, false, page, limit));
198-
messages = await getMessagesForJob(tx, jobID);
199-
});
244+
const { job, pagination } = await Job.byJobID(db, jobID, true, true, false, page, limit);
200245
if (!job) {
201246
throw new NotFoundError(`Unable to find job ${jobID}`);
202247
}
248+
const messages: JobMessage[] = await getMessagesForJob(db, jobID);
203249
const isAdmin = await isAdminUser(req);
204250
const isAdminOrOwner = job.belongsToOrIsAdmin(req.user, isAdmin);
205251
const isJobShareable = await job.isShareable(req.accessToken);
@@ -210,6 +256,12 @@ export async function getJobStatus(
210256
const pagingLinks = getPagingLinks(req, pagination).map((link) => new JobLink(link));
211257
job.links = job.links.concat(pagingLinks);
212258
const jobForDisplay = getJobForDisplay(job, urlRoot, linkType, messages);
259+
if (job.original_data_size && job.output_data_size) {
260+
jobForDisplay.originalDataSize = formatDataSize(job.original_data_size);
261+
jobForDisplay.outputDataSize = formatDataSize(job.output_data_size);
262+
jobForDisplay.dataSizePercentChange =
263+
sizeChangeMessage({ originalSize: job.original_data_size, outputSize: job.output_data_size });
264+
}
213265
res.send(jobForDisplay);
214266
} catch (e) {
215267
req.context.logger.error(e);

services/harmony/app/models/job.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import JobMessage from './job-message';
1919
import { getLabelsForJob, JOBS_LABELS_TABLE, LABELS_TABLE, setLabelsForJob } from './label';
2020
import DBRecord from './record';
2121
import { setReadyCountToZero } from './user-work';
22+
import { getTotalWorkItemSizesForJobID } from './work-item';
2223
import WorkflowStep, { getWorkflowStepsByJobId } from './workflow-steps';
2324

2425
// how long data generated by this job will be available
@@ -34,7 +35,7 @@ let providerIdsSnapshot: string[];
3435
export const jobRecordFields = [
3536
'username', 'status', 'message', 'progress', 'createdAt', 'updatedAt', 'request',
3637
'numInputGranules', 'jobID', 'requestId', 'batchesCompleted', 'isAsync', 'ignoreErrors', 'destination_url',
37-
'service_name', 'provider_id',
38+
'service_name', 'provider_id', 'original_data_size', 'output_data_size',
3839
];
3940

4041
const stagingBucketTitle = `Results in AWS S3. Access from AWS ${awsDefaultRegion} with keys from /cloud-access.sh`;
@@ -84,6 +85,8 @@ export interface JobRecord {
8485
provider_id?: string;
8586
destination_url?: string;
8687
service_name?: string,
88+
original_data_size?: number;
89+
output_data_size?: number;
8790
}
8891

8992
/**
@@ -114,6 +117,12 @@ export class JobForDisplay {
114117

115118
numInputGranules: number;
116119

120+
originalDataSize?: string;
121+
122+
outputDataSize?: string;
123+
124+
dataSizePercentChange?: string;
125+
117126
errors?: JobMessage[];
118127

119128
warnings?: JobMessage[];
@@ -464,6 +473,10 @@ export class Job extends DBRecord implements JobRecord {
464473

465474
labels: string[];
466475

476+
original_data_size?: number;
477+
478+
output_data_size?: number;
479+
467480
/**
468481
* Get the job message for the current status.
469482
* @returns the message string describing the job
@@ -1100,6 +1113,14 @@ export class Job extends DBRecord implements JobRecord {
11001113
const truncatedFailureMessage = truncateString(this.getMessage(JobStatus.FAILED), TEXT_LIMIT - reservedMessageChars);
11011114
this.setMessage(truncatedFailureMessage, JobStatus.FAILED);
11021115
this.request = truncateString(this.request, TEXT_LIMIT);
1116+
1117+
// only get data reduction numbers when the job is complete and at least partially successful
1118+
if (this.status === JobStatus.SUCCESSFUL || this.status === JobStatus.COMPLETE_WITH_ERRORS) {
1119+
const workItemsSizes = await getTotalWorkItemSizesForJobID(tx, this.jobID);
1120+
this.original_data_size = workItemsSizes.originalSize;
1121+
this.output_data_size = workItemsSizes.outputSize;
1122+
}
1123+
11031124
const dbRecord: Record<string, unknown> = pick(this, jobRecordFields);
11041125
dbRecord.collectionIds = JSON.stringify(this.collectionIds || []);
11051126
dbRecord.message = JSON.stringify(this.statesToMessages || {});

services/harmony/app/models/work-item.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@
22
import { subMinutes } from 'date-fns';
33
import { ILengthAwarePagination } from 'knex-paginate';
44
import _ from 'lodash';
5-
import logger from '../util/log';
5+
6+
import { getWorkSchedulerQueue } from '../../app/util/queue/queue-factory';
7+
import { eventEmitter } from '../events';
68
import db, { Transaction } from '../util/db';
7-
import DataOperation from './data-operation';
89
import env from '../util/env';
10+
import logger from '../util/log';
11+
import DataOperation from './data-operation';
912
import { Job, JobStatus } from './job';
1013
import Record from './record';
14+
import {
15+
getStacLocation, WorkItemQuery, WorkItemRecord, WorkItemStatus,
16+
} from './work-item-interface';
1117
import WorkflowStep from './workflow-steps';
12-
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery } from './work-item-interface';
13-
import { eventEmitter } from '../events';
14-
import { getWorkSchedulerQueue } from '../../app/util/queue/queue-factory';
1518

1619
// The step index for the query-cmr task. Right now query-cmr only runs as the first step -
1720
// if this changes we will have to revisit this
@@ -69,7 +72,7 @@ export default class WorkItem extends Record implements WorkItemRecord {
6972
// The location of the resulting STAC catalog(s) (not serialized)
7073
results?: string[];
7174

72-
// The sum of the sizes of the granules associated with this work item
75+
// The sum of the sizes of the outputs associated with this work item
7376
totalItemsSize?: number;
7477

7578
// The size (in bytes) of each STAC item produced by this work item (used for batching)
@@ -106,7 +109,7 @@ export default class WorkItem extends Record implements WorkItemRecord {
106109
/**
107110
* Saves the work items to the database using a single SQL statement.
108111
*
109-
* @param transaction - The transaction to use for saving the job link
112+
* @param transaction - The transaction to use for saving the work-items
110113
* @param workItems - The work items to save
111114
*/
112115
static async insertBatch(transaction: Transaction, workItems: WorkItem[]): Promise<void> {
@@ -759,7 +762,9 @@ export async function getScrollIdForJob(
759762
export async function getTotalWorkItemSizesForJobID(
760763
tx: Transaction,
761764
jobID: string,
762-
): Promise<{ originalSize: number, outputSize: number }> {
765+
): Promise<{ originalSize: number, outputSize: number; }> {
766+
logger.info('timing.getTotalWorkItemSizesForJobID.start');
767+
const startTime = new Date().getTime();
763768
const workflowStepIndexResults = await tx(WorkflowStep.table)
764769
.select()
765770
.min('stepIndex')
@@ -799,6 +804,8 @@ export async function getTotalWorkItemSizesForJobID(
799804
outputSize = Number(outputSizeResults[0]['sum(`totalItemsSize`)']);
800805
}
801806

807+
const durationMs = new Date().getTime() - startTime;
808+
logger.info('timing.getTotalWorkItemSizesForJobID.end', { durationMs });
802809

803810
return { originalSize, outputSize };
804811
}

services/harmony/app/util/aggregation-batch.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ async function createCatalogAndWorkItemForBatch(
195195
batchItemUrls);
196196

197197
// create a work item for the batch
198+
// TODO add calculation for source_items_size here
198199
const newWorkItem = new WorkItem({
199200
jobID,
200201
serviceID,

services/harmony/test/helpers/work-items.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
import { Application } from 'express';
2+
import _ from 'lodash';
23
import { afterEach, beforeEach } from 'mocha';
34
import request, { Test } from 'supertest';
4-
import _ from 'lodash';
5+
6+
import { RecordConstructor } from '../../app/models/record';
57
import WorkItem from '../../app/models/work-item';
8+
import {
9+
getStacLocation, WorkItemRecord, WorkItemStatus,
10+
} from '../../app/models/work-item-interface';
611
import db, { Transaction } from '../../app/util/db';
12+
import { objectStoreForProtocol } from '../../app/util/object-store';
713
import { truncateAll } from './db';
814
import { hookBackendRequest } from './hooks';
9-
import { buildWorkflowStep, hookWorkflowStepCreation, hookWorkflowStepCreationEach } from './workflow-steps';
10-
import { RecordConstructor } from '../../app/models/record';
11-
import { WorkItemStatus, WorkItemRecord, getStacLocation } from '../../app/models/work-item-interface';
12-
import { objectStoreForProtocol } from '../../app/util/object-store';
15+
import {
16+
buildWorkflowStep, hookWorkflowStepCreation, hookWorkflowStepCreationEach,
17+
} from './workflow-steps';
1318

1419
export const exampleWorkItemProps = {
1520
jobID: '1',
@@ -45,7 +50,7 @@ export function buildWorkItem(fields: Partial<WorkItemRecord> = {}): WorkItem {
4550

4651
/**
4752
* Save a work item without validating or updating createdAt/updatedAt
48-
* @param tx - The transaction to use for saving the job
53+
* @param tx - The transaction to use for saving the work item
4954
* @param fields - The fields to save to the database, defaults to example values
5055
* @returns The saved work item
5156
* @throws Error - if the save to the database fails

0 commit comments

Comments
 (0)