Duplicated MLFlow runs when submitting argo-kedro jobs with kedro-mlflow #29

@alexeistepa

Currently, the argo-kedro package is effectively incompatible with kedro-mlflow, because each Argo node starts a new MLFlow run with the same name but a different run ID.

One way to get around this is to modify argo_kedro/templates/argo_wf_spec.tmpl as shown further down. The modification:

  • Introduces a new Argo node, init-mlflow-run, which starts a single MLFlow run
  • Passes the run ID on to all subsequent Argo nodes, which log to that run instead of creating their own

We also add the following lines to conf/cloud/mlflow.yml:
tracking:
  experiment:
    name: gnn
  run:
    id: ${oc.env:MLFLOW_RUN_ID, null}
    name: ${oc.env:WORKFLOW_ID, dummy}
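
To make the two interpolations above concrete, here is a stdlib-only sketch of what they evaluate to. It only emulates the documented behaviour of OmegaConf's `oc.env` resolver; the helpers `resolve_env` and `resolved_run_config` are hypothetical names introduced for illustration and are not part of OmegaConf or kedro-mlflow.

```python
import os


def resolve_env(var, default):
    """Emulate `${oc.env:VAR, default}`.

    Returns the environment value if VAR is set. Otherwise returns the
    default, where a default written as `null` becomes None and any other
    default is kept as a string.
    """
    value = os.environ.get(var)
    if value is not None:
        return value
    return None if default == "null" else str(default)


def resolved_run_config():
    """Values the `run` section of conf/cloud/mlflow.yml would resolve to."""
    return {
        "id": resolve_env("MLFLOW_RUN_ID", "null"),
        "name": resolve_env("WORKFLOW_ID", "dummy"),
    }
```

Outside Argo, where neither variable is set, the run ID resolves to None and the name to "dummy", so a local `kedro run` should still create its own run. Inside an Argo pod, both values come from the environment set by the workflow template.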

Things to figure out:

  • When kedro-mlflow is not in use, we do not want this Argo node to be included
  • How do we correctly set the experiment name? It should be passed on from the mlflow.yml file; in the template below, the experiment name "gnn" is hardcoded as the default.
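
One possible direction for both open points, as a minimal sketch rather than a proposed implementation: detect whether the kedro-mlflow plugin is importable, and read the experiment name from the already-loaded mlflow.yml mapping. The function names `mlflow_enabled` and `experiment_name` are hypothetical, and how the plugin loads mlflow.yml and passes values into the template is an assumption left open here.

```python
import importlib.util


def mlflow_enabled() -> bool:
    """Return True when the kedro-mlflow plugin is importable."""
    return importlib.util.find_spec("kedro_mlflow") is not None


def experiment_name(mlflow_config: dict, default: str = "Default") -> str:
    """Read tracking.experiment.name from a loaded mlflow.yml mapping.

    Falls back to `default` when the key is missing or empty.
    """
    tracking = mlflow_config.get("tracking") or {}
    experiment = tracking.get("experiment") or {}
    return experiment.get("name") or default
```

The results could then be passed to the template as context variables (for example a hypothetical `use_mlflow` flag wrapping the init-mlflow-run node in a Jinja `{% if %}` block, and an experiment name replacing the hardcoded "gnn").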

Modifications to the argo_kedro/templates/argo_wf_spec.tmpl file:

{# <project_root>/templates/argo_spec.tmpl #}
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: {{ workflow_name }}-
  namespace: {{ namespace }}
spec:
  workflowMetadata:
    labels:
      plugin: argo-kedro
  entrypoint: "pipeline"
  templates:
  - name: kedro
    metadata:
      labels:
        app: argo-kedro
    inputs:
      parameters:
      - name: pipeline
      - name: kedro_nodes
      - name: mem
      - name: cpu
      - name: num_gpu
      - name: mlflow_run_id
    podSpecPatch: |
      containers:
        - name: main
          resources:
            requests:
              memory: {% raw %} "{{inputs.parameters.mem}}Gi"
              {% endraw %}
              cpu: {% raw %} "{{inputs.parameters.cpu}}"
              {% endraw %}
              nvidia.com/gpu: {% raw %} "{{inputs.parameters.num_gpu}}"
              {% endraw %}
            limits:
              memory: {% raw %} "{{inputs.parameters.mem}}Gi"
              {% endraw %}
              cpu: {% raw %} "{{inputs.parameters.cpu}}"
              {% endraw %}
              nvidia.com/gpu: {% raw %} "{{inputs.parameters.num_gpu}}"
              {% endraw %}
    container:
      image: {{ image }}
      command: ["kedro"]
      imagePullPolicy: Always
      env:
        - name: WORKFLOW_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.labels['workflows.argoproj.io/workflow']
        - name: MLFLOW_RUN_ID
          value: "{{ '{{inputs.parameters.mlflow_run_id}}' }}"
      {% for env in template.environment %}
        - name: {{ env.name }}
          valueFrom:
            secretKeyRef:
              name: {{ env.secret_ref.name }}
              key: {{ env.secret_ref.key }}
      {% endfor %}
      args:
      - "run"
      - "--pipeline"
      - "{{ '{{inputs.parameters.pipeline}}' }}"
      - "--nodes"
      - "{{ '{{inputs.parameters.kedro_nodes}}' }}"
      - "--env"
      - "{{ environment }}"

  - name: init-mlflow-run
    metadata:
      labels:
        app: argo-kedro
    container:
      image: {{ image }}
      command: ["python", "-c"]
      imagePullPolicy: Always
      env:
        - name: WORKFLOW_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.labels['workflows.argoproj.io/workflow']
      {% for env in template.environment %}
        - name: {{ env.name }}
          valueFrom:
            secretKeyRef:
              name: {{ env.secret_ref.name }}
              key: {{ env.secret_ref.key }}
      {% endfor %}
      args:
      - |
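        import os
        import mlflow
        # Create one MLFlow run for the whole workflow, named after the Argo workflow.
        mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
        mlflow.set_experiment(os.getenv("MLFLOW_EXPERIMENT_NAME", "gnn"))
        run = mlflow.start_run(run_name=os.getenv("WORKFLOW_ID", "dummy"))
        # Write the run ID to a file so Argo can expose it as an output parameter.
        with open("/tmp/mlflow_run_id", "w", encoding="utf-8") as f:
            f.write(run.info.run_id)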
    outputs:
      parameters:
      - name: mlflow_run_id
        valueFrom:
          path: /tmp/mlflow_run_id

  - name: pipeline
    dag:
      tasks:
      - name: init-mlflow-run
        template: init-mlflow-run
      {% for task in pipeline_tasks %}
      - name: {{ task.name }}
        template: {{ task.get('template', 'kedro') }}
        dependencies:
          - init-mlflow-run
        {% if task.deps %}
        {% for dep in task.deps %}
          - {{ dep }}
        {% endfor %}
        {% endif %}
        {% if task.num_gpu > 0 %}
        podSpecPatch: |
          tolerations:
            - key: "nvidia.com/gpu"
              value: "present"
              effect: "NoSchedule"
        {% endif %}
        arguments:
          parameters:
          - name: pipeline
            value: {{ pipeline_name }}
          - name: kedro_nodes
            value: {{ task.nodes }}
          - name: num_gpu
            value: {{ task.num_gpu }}
          - name: mem
            value: {{ task.mem }}
          - name: cpu
            value: {{ task.cpu }}
          - name: mlflow_run_id
            value: "{% raw %}{{tasks.init-mlflow-run.outputs.parameters.mlflow_run_id}}{% endraw %}"
      {% endfor %}
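
The dependency wiring in the `pipeline` DAG above can be checked with a small stdlib sketch. It mirrors the template's loop by adding init-mlflow-run to every task's dependencies and uses `graphlib` (Python 3.9+) to confirm the init node is scheduled first. The task dictionaries are made-up examples, and `build_dag` and `execution_order` are hypothetical helpers, not part of argo-kedro.

```python
from graphlib import TopologicalSorter

INIT_TASK = "init-mlflow-run"


def build_dag(pipeline_tasks):
    """Map each task name to its dependencies, always including the init task."""
    dag = {INIT_TASK: set()}
    for task in pipeline_tasks:
        deps = task.get("deps") or []
        dag[task["name"]] = {INIT_TASK, *deps}
    return dag


def execution_order(pipeline_tasks):
    """Return one valid execution order for the DAG."""
    return list(TopologicalSorter(build_dag(pipeline_tasks)).static_order())


if __name__ == "__main__":
    example_tasks = [
        {"name": "train", "deps": ["preprocess"]},
        {"name": "preprocess", "deps": []},
    ]
    print(execution_order(example_tasks))
```

Because every task depends on the init node, any valid order starts with init-mlflow-run, so the run ID exists before the first Kedro node reads it.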

I have successfully tested this. This is what it looks like in the Argo UI:
[Image: screenshot of the workflow in the Argo UI]
