Task Orchestration

Matt Debont edited this page Mar 21, 2019 · 9 revisions

Acquisition

State files

Inputs

  • currently-available-from-source.json
  • previously-available-from-source.json <= previous currently-available-from-source.json
  • previously-downloaded.json <= previous downloaded.json
  • currently-downloadable-products.json <= (currently-available-from-source.json + previously-available-from-source.json) - previously-downloaded.json
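The state-file arithmetic above can be sketched as a pure set operation (treating each state file as a flat list of product IDs is an assumption; loading and saving the JSON is omitted):

```python
def downloadable(currently_available, previously_available, previously_downloaded):
    """(currently-available + previously-available) - previously-downloaded."""
    return (set(currently_available) | set(previously_available)) - set(previously_downloaded)
```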

Outputs

  • downloaded.json <= successful downloads
  • errored.json <= errored downloads (with error message)
  • download-retry-limit-reached.json <= errored downloads that have been retried > x times

Steps

  • Query the OpenSearch API for the latest data within the search window (now - 3 days)
  • Get previous run's currently-wanted => previously-wanted
  • Get previous run's downloaded => previously-downloaded
  • Get previous run's download-retry-limit-reached => download-retry-limit-reached
  • Combine these to get the currently-wanted list ((latest-data + previously-wanted) - previously-downloaded - download-retry-limit-reached)
  • For each product in currently-wanted attempt to download
    • On success
      • copy to medium term storage
      • add to downloaded
    • On failure
      • add to errored; if attempts > x, move to download-retry-limit-reached
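A minimal sketch of one acquisition pass, assuming each product is a dict with an "id" and the retry count is tracked on the product itself. The `download` callable is hypothetical and stands in for the real fetch plus the copy to medium-term storage:

```python
def acquire(currently_wanted, download, max_attempts=3):
    """One pass over currently-wanted; returns the three output state lists."""
    downloaded, errored, retry_limit_reached = [], [], []
    for product in currently_wanted:
        ok, error = download(product)  # hypothetical: fetch + copy to medium-term storage
        if ok:
            downloaded.append(product["id"])
        else:
            product["attempts"] = product.get("attempts", 0) + 1
            product["error"] = error
            if product["attempts"] > max_attempts:
                retry_limit_reached.append(product)
            else:
                errored.append(product)
    return downloaded, errored, retry_limit_reached
```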

Processing

If we assume we have a medium term storage blob which is filled with raw data products, how do we process that into a derived or final product?

State Folders

  • current - All current job-specs, this includes tasks which have been run to completion already
  • processing - Tasks which have started to be processed but not yet completed
  • processed - Tasks which have been completed
  • errored - Tasks which have hit a max retry limit
  • published - Potentially not part of this orchestration to publish but we should consider how this works
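A small sketch of how a task might work out which state(s) a job-spec is currently in, assuming each state is a sibling folder holding one file per spec (the function name is an illustration, not part of this design):

```python
from pathlib import Path

STATE_FOLDERS = ["current", "processing", "processed", "errored", "published"]

def states_of(spec_name, root="."):
    """Return which state folders currently contain the named job-spec file."""
    return [s for s in STATE_FOLDERS if (Path(root) / s / spec_name).exists()]
```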

Job Spec Creation

Steps

  • Get all existing raw products => available-list
  • Get all current job-specs and extract files in them => currently-processing
  • Figure out current to-process => available-list - currently-processing
  • Start to produce job-specs from to-process
    • S1 => single input => single job-spec
    • S2 => group a swath into one job-spec => completeness check? process after waiting x time for completeness?
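The to-process calculation above is another set difference; here job-specs are assumed to be dicts with an "inputFiles" list, as in the JSON representation below:

```python
def to_process(available, job_specs):
    """available-list minus files already referenced by current job-specs."""
    currently_processing = {f for spec in job_specs for f in spec["inputFiles"]}
    return set(available) - currently_processing
```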

Job-Spec JSON representation

{
    "processId": "process-id-assigned",
    "processor": "jncc/s2-ard-processor@0.0.1 => the processor to be used during processing",
    "staticFiles": {
        "staticFiles": "A key pair list of static files that are required for this processing step",
        "dem": "/path/to/dem/in/long-term-storage.kea",
        "projwkt": "/path/to/projwkt/in/long-term-storage.kea"
    },
    "inputFiles": [
        "a-list-of-all-input-files",
        "x",
        "y",
        "z",
        "..."
    ],
    "attempts": "0 => the number of attempts at processing this job",
    "errors": [
        {
            "errorObj": "a description of the error that has occurred in a previous run"
        }
    ]
}
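A helper for building a fresh job-spec with the shape above could look like this (field names come from this page; generating the processId with a uuid is an assumption):

```python
import uuid

def make_job_spec(processor, static_files, input_files):
    """Build a new job-spec dict with zero attempts and no errors."""
    return {
        "processId": str(uuid.uuid4()),
        "processor": processor,
        "staticFiles": static_files,
        "inputFiles": list(input_files),
        "attempts": 0,
        "errors": [],
    }
```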

Job Spec Runner

If we assume the previous step puts its job-specs into the current folder defined above (each job-spec is given a unique name so it cannot clash with other specs), the runner triages them as follows:

for spec in current:
    if spec in processed:
        # Do not progress job, we are already done
        continue
    if spec in errored:
        # Do not progress job, it has errored and requires manual intervention
        continue
    if spec in processing:
        if spec.attempts > processing_spec.attempts:
            # Append job spec to actionable jobs => we have attempted before but not finished
        else:
            # Do not progress job => already processing? how do we check this?
    else:
        # Append job spec to actionable jobs
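The triage can be made runnable if we assume each state is a mapping from spec name to spec dict (the open "already processing?" question from the pseudocode is preserved as a comment):

```python
def actionable(current, processed, errored, processing):
    """Filter current job-specs down to those the runner should act on."""
    jobs = []
    for name, spec in current.items():
        if name in processed or name in errored:
            continue  # already done, or needs manual intervention
        if name in processing and spec["attempts"] <= processing[name]["attempts"]:
            continue  # already processing? how do we check this?
        jobs.append(spec)
    return jobs
```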

For each actionable job we then need to set up the environment for running the process:

  • Copy job-spec in current to processing
    • Increment attempted count
  • Create machine
  • Move static files to relevant mounted /static folder
  • Move input files to the relevant mounted /raw folder (symlink files if possible; recover them from tape storage, or copy them into the folder if there is no other way)
  • Pull docker container [processor] to machine
  • Execute docker container with mounted files
    • On success
      • Remove input raw files from medium term storage
      • Move /output folder contents to long term storage
      • Move job-spec from current to processed
      • Remove job-spec from current
    • On failure
      • Update processing job-spec with error output
      • Copy changes to current
      • If attempted > x copy job-spec from current to errored
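The success/failure state transitions above amount to file moves between the state folders; a sketch, assuming the same folder layout as before (machine creation, mounts, and the docker run itself are omitted):

```python
import shutil
from pathlib import Path

def on_success(spec_name, root="."):
    """Move the job-spec from current to processed (removing it from current)."""
    root = Path(root)
    shutil.move(str(root / "current" / spec_name), str(root / "processed" / spec_name))

def on_failure(spec_name, attempts, max_attempts, root="."):
    """Copy the (error-annotated) job-spec to errored once the retry limit is hit."""
    root = Path(root)
    if attempts > max_attempts:
        shutil.copy(str(root / "current" / spec_name), str(root / "errored" / spec_name))
```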

Publishing

There are likely to be several end locations for each product, so tasks may split out further as appropriate:

  • For each job-spec in processed but not in published
  • Initiate publishing task(s) to move files to final resting locations
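The publishable set is one more difference over the state folders; a sketch, assuming both arguments are collections of job-spec names:

```python
def to_publish(processed, published):
    """Job-specs that have finished processing but are not yet published."""
    return [name for name in processed if name not in published]
```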
