2 changes: 2 additions & 0 deletions client/dive-common/constants.ts
@@ -124,6 +124,7 @@ const zipFileTypes = [

const stereoPipelineMarker = 'measurement';
const multiCamPipelineMarkers = ['2-cam', '3-cam'];
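// Pipeline types whose output is imported back into DIVE as a new dataset.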
const pipelineCreatesDatasetMarkers = ['transcode', 'filter'];

const JsonMetaRegEx = /^.*\.?(meta|config)\.json$/;

@@ -153,6 +154,7 @@ export {
zipFileTypes,
stereoPipelineMarker,
multiCamPipelineMarkers,
pipelineCreatesDatasetMarkers,
JsonMetaRegEx,
simplifyTrainingName,
};
47 changes: 44 additions & 3 deletions client/platform/desktop/backend/native/viame.ts
@@ -10,6 +10,7 @@ import {
import { cleanString } from 'platform/desktop/sharedUtils';
import { serialize } from 'platform/desktop/backend/serializers/viame';
import { observeChild } from 'platform/desktop/backend/native/processManager';
import sendToRenderer from 'platform/desktop/background';

import { MultiType, stereoPipelineMarker, multiCamPipelineMarkers } from 'dive-common/constants';
import * as common from './common';
@@ -129,12 +130,27 @@ async function runPipeline(
}
}

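// Filter pipelines write their output imagery to a new directory under the
// user's data path so that it can be re-imported as a standalone dataset.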
let outputDir: string | undefined;
const DIVE_OUTPUT_PROJECT_DIR = 'DIVE_Jobs_Output';
if (runPipelineArgs.pipeline.type === 'filter') {
command.push(`-s kwa_writer:output_directory="${npath.join(jobWorkDir, 'output')}"`);
command.push(`-s image_writer:file_name_prefix="${jobWorkDir}/"`);
if (runPipelineArgs.outputDatasetName) {
outputDir = runPipelineArgs.outputDatasetName;
} else {
const timestamp = (new Date()).toISOString().replace(/[:.]/g, '-');
outputDir = `${runPipelineArgs.pipeline.name}_${runPipelineArgs.datasetId}_${timestamp}`;
}
outputDir = npath.join(settings.dataPath, DIVE_OUTPUT_PROJECT_DIR, outputDir);
if (outputDir !== jobWorkDir) {
await fs.mkdir(outputDir, { recursive: true });
}
command.push(`-s kwa_writer:output_directory="${outputDir}/"`);
command.push(`-s image_writer:file_name_prefix="${outputDir}/"`);
}

let transcodedFilename: string | undefined;
if (runPipelineArgs.pipeline.type === 'transcode') {
command.push(`-s video_writer:video_filename="${npath.join(jobWorkDir, `${datasetId}.mp4`)}"`);
transcodedFilename = npath.join(jobWorkDir, `${datasetId}.mp4`);
command.push(`-s video_writer:video_filename="${transcodedFilename}"`);
}

if (requiresInput && !stereoOrMultiCam) {
@@ -211,6 +227,31 @@ async function runPipeline(
});
});

if (['filter', 'transcode'].includes(runPipelineArgs.pipeline.type)) {
job.on('exit', async (code) => {
if (runPipelineArgs.pipeline.type === 'transcode') {
// TODO: work must be done to find the video file
return;
}
if (code === 0) {
// Ingest the output into a new dataset
updater({
...jobBase,
body: ['Creating dataset from output...'],
exitCode: code,
endTime: new Date(),
});
// transcodedFilename is only assigned for transcode pipelines, which
// currently return early above, so filter runs import from outputDir.
const importSource = transcodedFilename || outputDir;
if (importSource) {
const importPayload = await common.beginMediaImport(importSource);
const conversionJobArgs = await common.finalizeMediaImport(settings, importPayload);
sendToRenderer('filter-complete', conversionJobArgs.meta);
}
}
});
}

return jobBase;
}

6 changes: 6 additions & 0 deletions client/platform/desktop/background.ts
@@ -133,3 +133,9 @@ if (process.platform === 'win32') {
cleanup();
});
}

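/**
 * Send an IPC message from the Electron main process to the renderer
 * window, if one is currently open.
 */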
export default function sendToRenderer(channel: string, payload?: unknown) {
if (win) {
win.webContents.send(channel, payload);
}
}
1 change: 1 addition & 0 deletions client/platform/desktop/constants.ts
@@ -170,6 +170,7 @@ export interface RunPipeline extends JobArgs {
type: JobType.RunPipeline;
datasetId: string;
pipeline: Pipe;
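/** When set, the pipeline's output is imported as a new dataset with this name. */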
outputDatasetName?: string;
}

export interface ExportTrainedPipeline extends JobArgs {
11 changes: 11 additions & 0 deletions client/platform/desktop/frontend/api.ts
@@ -100,6 +100,16 @@ async function runPipeline(itemId: string, pipeline: Pipe): Promise<void> {
gpuJobQueue.enqueue(args);
}

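// Same as runPipeline, but also names the new dataset that the pipeline's output will be imported into.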
async function runPipelineWithOutput(itemId: string, pipeline: Pipe, outputDatasetName: string): Promise<void> {
const args: RunPipeline = {
type: JobType.RunPipeline,
pipeline,
datasetId: itemId,
outputDatasetName,
};
gpuJobQueue.enqueue(args);
}

async function exportTrainedPipeline(path: string, pipeline: Pipe): Promise<void> {
const args: ExportTrainedPipeline = {
type: JobType.ExportTrainedPipeline,
@@ -277,4 +287,5 @@ export {
openLink,
nvidiaSmi,
cancelJob,
runPipelineWithOutput,
};
38 changes: 36 additions & 2 deletions client/platform/desktop/frontend/components/MultiPipeline.vue
@@ -9,10 +9,12 @@ import {
import { DataTableHeader } from 'vuetify';
import { useRouter } from 'vue-router/composables';
import { Pipe, Pipelines, useApi } from 'dive-common/apispec';
import { runPipelineWithOutput } from 'platform/desktop/frontend/api';
import {
itemsPerPageOptions,
stereoPipelineMarker,
multiCamPipelineMarkers,
pipelineCreatesDatasetMarkers,
MultiType,
} from 'dive-common/constants';
import { usePrompt } from 'dive-common/vue-utilities/prompt-service';
@@ -77,6 +79,23 @@ const stagedDatasetHeaders: DataTableHeader[] = headersTmpl.concat([
width: 80,
},
]);
const createNewDatasetHeaders: DataTableHeader[] = headersTmpl.concat([
{
text: 'Output Dataset Name',
value: 'output',
sortable: false,
},
{
text: 'Remove',
value: 'remove',
sortable: false,
width: 80,
},
]);
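// Mirrors the backend's fallback naming: pipeline name, dataset name, and a filesystem-safe timestamp.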
function computeOutputDatasetName(item: JsonMetaCache) {
const timeStamp = (new Date()).toISOString().replace(/[:.]/g, '-');
return `${selectedPipeline.value?.name}_${item.name}_${timeStamp}`;
}
function getAvailableItems(): JsonMetaCache[] {
if (!selectedPipelineType.value || !selectedPipeline.value) {
return [];
@@ -108,7 +127,16 @@ function toggleStaged(item: JsonMetaCache) {
async function runPipelineForDatasets() {
if (selectedPipeline.value !== null) {
const results = await Promise.allSettled(
stagedDatasetIds.value.map((datasetId: string) => runPipeline(datasetId, selectedPipeline.value!)),
stagedDatasetIds.value.map((datasetId: string) => {
if (pipelineCreatesDatasetMarkers.includes(selectedPipeline.value?.type || '')) {
const datasetMeta = availableItems.value.find((item: JsonMetaCache) => item.id === datasetId);
if (!datasetMeta) {
throw new Error(`Attempted to run pipeline on nonexistent dataset ${datasetId}`);
}
return runPipelineWithOutput(datasetId, selectedPipeline.value!, computeOutputDatasetName(datasetMeta));
}
return runPipeline(datasetId, selectedPipeline.value!);
}),
);
const failed = results
.map((result, i) => ({ result, datasetId: stagedDatasetIds.value[i] }))
Expand Down Expand Up @@ -178,7 +206,10 @@ onBeforeMount(async () => {
<v-card-title>Datasets staged for selected pipeline</v-card-title>
<v-data-table
dense
v-bind="{ headers: stagedDatasetHeaders, items: stagedDatasets }"
v-bind="{
headers: pipelineCreatesDatasetMarkers.includes(selectedPipelineType || '') ? createNewDatasetHeaders : stagedDatasetHeaders,
items: stagedDatasets,
}"
:items-per-page.sync="clientSettings.rowsPerPage"
hide-default-footer
:hide-default-header="stagedDatasets.length === 0"
@@ -193,6 +224,9 @@
<v-icon>mdi-minus</v-icon>
</v-btn>
</template>
<template #[`item.output`]="{ item }">
<b>{{ computeOutputDatasetName(item) }}</b>
</template>
</v-data-table>
</div>
<v-row class="mt-7">
5 changes: 5 additions & 0 deletions client/platform/desktop/frontend/store/jobs.ts
@@ -15,9 +15,11 @@ import {
JobType,
RunPipeline,
RunTraining,
JsonMeta,
} from 'platform/desktop/constants';
import AsyncGpuJobQueue from './queues/asyncGpuJobQueue';
import AsyncCpuJobQueue from './queues/asyncCpuJobQueue';
import { setRecents } from './dataset';

interface DesktopJobHistory {
job: DesktopJob;
@@ -94,6 +96,9 @@ function init() {
...args, body: ['Job cancelled by user'], exitCode: cancelledJobExitCode, endTime: new Date(), cancelledJob: true,
});
});
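// The backend emits 'filter-complete' with the new dataset's metadata once the
// pipeline output has been imported; register it so it shows up in Recents.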
ipcRenderer.on('filter-complete', (event, args: JsonMeta) => {
setRecents(args);
});
}

init();