Commit fa648e5

Add Kubeflow and Airflow examples; Fix minor typos

1 parent 74d6e31 commit fa648e5
29 files changed: +1567 -70 lines changed
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
# Apache Airflow Examples

This directory contains example [DAG](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#dags) definitions that show how NetApp data management functions can be incorporated into automated workflows that are orchestrated using the [Apache Airflow](https://airflow.apache.org) framework.

## Getting Started

### Instructions for Use

The Python files referenced in the [DAG Definitions](#dag-definitions) section contain Airflow DAG definitions. To use one of these example DAGs, define your parameters within the Python code as indicated in the comments, and then upload the Python file to Airflow. The method of uploading the file to Airflow depends on your specific Airflow deployment; typically, Airflow is configured to automatically pull DAG definitions from a specific Git repo or persistent volume.

### Prerequisites

These DAGs require the following prerequisites in order to function correctly.

- Airflow must be deployed within a Kubernetes cluster. These example DAGs do not support Airflow deployments that are not Kubernetes-based.
- Airflow must be configured to use the [Celery Executor](https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html). Although they may work with other executors, these DAGs have only been validated with the Celery Executor.
- [Trident](https://netapp.io/persistent-storage-provisioner-for-kubernetes/), NetApp's dynamic storage orchestrator for Kubernetes, must be installed within the Kubernetes cluster.
- A cluster role that has all of the required permissions for executing NetApp Data Science Toolkit for Kubernetes operations must be present in the Kubernetes cluster. For an example, see [cluster-role-ntap-dsutil.yaml](cluster-role-ntap-dsutil.yaml). This file contains the manifest for a Kubernetes ClusterRole named 'ntap-dsutil' that has all of the required permissions for executing toolkit operations within the cluster.
- Your Airflow [Kubernetes Pod Operator](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#kubernetespodoperator) service account must be bound to the previously mentioned cluster role within the namespace that you intend to execute the DAGs in. Note that the default Airflow Kubernetes Pod Operator service account is 'default'. For an example, see [role-binding-airflow-ntap-dsutil.yaml](role-binding-airflow-ntap-dsutil.yaml). This file contains the manifest for a Kubernetes RoleBinding named 'airflow-ntap-dsutil' that binds the 'default' ServiceAccount to the 'ntap-dsutil' cluster role within the 'airflow' namespace. (A programmatic equivalent is sketched just after this list.)
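If you prefer to create that role binding programmatically rather than applying the referenced manifest, a minimal sketch using the official `kubernetes` Python client is shown below. The namespace, service account, and binding name are assumptions that should be adjusted to match your deployment.

```python
# Sketch only: creates a RoleBinding equivalent to role-binding-airflow-ntap-dsutil.yaml.
# The namespace, service account, and names below are assumptions; adjust as needed.
from kubernetes import client, config

def bind_airflow_service_account(namespace: str = "airflow",
                                 service_account: str = "default",
                                 cluster_role: str = "ntap-dsutil") -> None:
    config.load_kube_config()  # use config.load_incluster_config() when running inside a pod
    body = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": "airflow-ntap-dsutil", "namespace": namespace},
        "subjects": [{"kind": "ServiceAccount", "name": service_account, "namespace": namespace}],
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "ClusterRole", "name": cluster_role},
    }
    client.RbacAuthorizationV1Api().create_namespaced_role_binding(namespace=namespace, body=body)

if __name__ == "__main__":
    bind_airflow_service_account()
```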
Some of the DAGs have additional prerequisites, which are noted under the specific DAG definitions below.
<a name="dag-definitions"></a>

## DAG Definitions

### [ai-training-run.py](ai-training-run.py)

#### Additional Prerequisites

In addition to the standard prerequisites outlined above, this DAG requires the following additional prerequisites in order to function correctly.

- Volume snapshots must be enabled within the Kubernetes cluster. Refer to the [Trident documentation](https://netapp-trident.readthedocs.io/en/latest/kubernetes/operations/tasks/volumes/snapshots.html) for more information on volume snapshots. (A quick verification sketch follows this list.)
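As a quick way to check that the CSI snapshot CRDs are available in your cluster, you could list the registered VolumeSnapshotClasses with the Kubernetes Python client. This is a sketch only; the group version (`v1`) is an assumption, and older external-snapshotter releases register `v1beta1` instead.

```python
# Sketch: confirm that the snapshot.storage.k8s.io CRDs are present by listing
# VolumeSnapshotClasses. The group version ('v1') is an assumption; older
# external-snapshotter releases register 'v1beta1' instead.
from kubernetes import client, config

config.load_kube_config()
snapshot_classes = client.CustomObjectsApi().list_cluster_custom_object(
    group="snapshot.storage.k8s.io", version="v1", plural="volumesnapshotclasses"
)
for item in snapshot_classes.get("items", []):
    print(item["metadata"]["name"], "->", item.get("driver"))
```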
#### Description

DAG definition for an AI/ML training run with built-in, near-instantaneous dataset and model versioning. This is intended to demonstrate how a data scientist could define an automated AI/ML workflow that incorporates automated dataset and model versioning, and dataset-to-model traceability.

#### Workflow Steps

1. Optional: Execute a data prep step.
2. Create a Snapshot copy, using NetApp Snapshot technology, of the dataset volume. This Snapshot copy is created for traceability purposes. Each time that this pipeline workflow is executed, a Snapshot copy is created. Therefore, as long as the Snapshot copy is not deleted, it is always possible to trace a specific training run back to the exact training dataset that was used for that run.
3. Execute a training step.
4. Create a Snapshot copy, using NetApp Snapshot technology, of the trained model volume. This Snapshot copy is created for versioning purposes. Each time that this pipeline workflow is executed, a Snapshot copy is created. Therefore, for each individual training run, a read-only versioned copy of the resulting trained model is automatically saved.
5. Execute a validation step.

### [clone-volume.py](clone-volume.py)

#### Description

DAG definition for a workflow that can be used to near-instantaneously and efficiently clone any Trident-managed volume within the Kubernetes cluster, regardless of size. This is intended to demonstrate how a data scientist or data engineer could define an automated workflow that incorporates the rapid cloning of datasets and/or models for use in workspaces, etc.

#### Workflow Steps

1. Create a clone, using NetApp FlexClone technology, of the source volume.

### [replicate-data-cloud-sync.py](replicate-data-cloud-sync.py)

#### Additional Prerequisites

In addition to the standard prerequisites outlined above, this DAG requires the following additional prerequisites in order to function correctly.

- An Airflow connection of type "http" containing your Cloud Sync API refresh token must exist within the Airflow connections database. This connection can be created via the Airflow UI dashboard by navigating to 'Admin' -> 'Connections' using the main menu. When creating this connection, enter your Cloud Sync API refresh token into the 'Password' field. (A sketch showing how a task can read this token at runtime follows this list.)
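For reference, a task in a DAG can read the stored refresh token at runtime through Airflow's connections API. The sketch below is illustrative only; the connection ID 'cloud_sync_refresh_token' is a placeholder for whatever connection ID you configured.

```python
# Illustrative only: read the Cloud Sync refresh token stored in the 'Password'
# field of an Airflow connection. 'cloud_sync_refresh_token' is a placeholder ID.
from airflow.hooks.base import BaseHook  # Airflow 2.x import path

def get_cloud_sync_refresh_token(conn_id: str = "cloud_sync_refresh_token") -> str:
    connection = BaseHook.get_connection(conn_id)
    return connection.password
```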
#### Description
#### Description

DAG definition for a workflow that can be used to perform a sync operation for an existing [Cloud Sync](https://cloudsync.netapp.com) relationship. This is intended to demonstrate how a data scientist or data engineer could define an automated AI/ML workflow that incorporates Cloud Sync for data movement between platforms (e.g. NFS, S3) and/or across environments (e.g. edge data center, core data center, private cloud, public cloud).

#### Workflow Steps

1. Perform a sync operation for the specified Cloud Sync relationship.

> Tip: If you do not know the Cloud Sync relationship ID for a specific relationship, you can retrieve it by using the NetApp Data Science Toolkit for Traditional Environments (refer to the 'list all Cloud Sync relationships' operation).

### [replicate-data-snapmirror.py](replicate-data-snapmirror.py)

#### Compatibility

This DAG is only compatible with ONTAP storage systems/instances running ONTAP 9.7 or above.

#### Additional Prerequisites

In addition to the standard prerequisites outlined above, this DAG requires the following additional prerequisites in order to function correctly.

- An Airflow connection of type "http" containing your ONTAP cluster or SVM admin account details must exist within the Airflow connections database. This connection can be created via the Airflow UI dashboard by navigating to 'Admin' -> 'Connections' using the main menu. When creating this connection, enter your ONTAP cluster or SVM management LIF into the 'Host' field, your ONTAP cluster/SVM admin username into the 'Login' field, and your ONTAP cluster/SVM admin password into the 'Password' field. (A sketch showing how these details can be used to trigger a transfer follows this list.)
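As a rough illustration of how those connection details can be used, the following sketch reads the stored credentials and triggers a transfer for an asynchronous SnapMirror relationship. It is a sketch under stated assumptions only: the connection ID 'ontap_admin' is a placeholder, and the ONTAP 9.7+ REST endpoint `/api/snapmirror/relationships/{uuid}/transfers` is assumed. The packaged DAG remains the reference implementation.

```python
# Illustrative sketch: trigger an incremental transfer for an asynchronous
# SnapMirror relationship via the ONTAP REST API, using credentials stored in
# an Airflow connection. 'ontap_admin' and the relationship UUID are placeholders.
import requests
from airflow.hooks.base import BaseHook  # Airflow 2.x import path

def trigger_snapmirror_update(relationship_uuid: str, conn_id: str = "ontap_admin") -> dict:
    conn = BaseHook.get_connection(conn_id)  # 'Host', 'Login', and 'Password' fields
    url = f"https://{conn.host}/api/snapmirror/relationships/{relationship_uuid}/transfers"
    # An empty POST body requests an incremental (update) transfer.
    # verify=False only if the cluster uses a self-signed certificate.
    response = requests.post(url, json={}, auth=(conn.login, conn.password), verify=False)
    response.raise_for_status()
    return response.json()
```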
#### Description
#### Description

DAG definition for a workflow that can be used to perform a sync operation for an existing asynchronous SnapMirror relationship. This is intended to demonstrate how a data scientist or data engineer could define an automated AI/ML workflow that incorporates SnapMirror replication for data movement across environments (e.g. edge data center, core data center, private cloud, public cloud).

#### Workflow Steps

1. Perform a sync operation for the specified asynchronous SnapMirror relationship.

> Tip: If you do not know the SnapMirror relationship UUID for a specific relationship, you can retrieve it by using the NetApp Data Science Toolkit for Traditional Environments (refer to the 'list all SnapMirror relationships' operation).

### [replicate-data-xcp.py](replicate-data-xcp.py)

#### Additional Prerequisites

In addition to the standard prerequisites outlined above, this DAG requires the following additional prerequisites in order to function correctly.

- An Airflow connection of type "SSH", containing SSH access details for a Linux host on which NetApp XCP is installed and configured, must exist within the Airflow connections database. This connection can be created via the Airflow UI dashboard by navigating to 'Admin' -> 'Connections' using the main menu. (A sketch showing how such a connection can be used from a DAG follows this list.)
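For illustration, an XCP invocation over such an SSH connection might look like the sketch below. The connection ID 'xcp_host' and the NFS export paths are placeholders; this is not the packaged DAG, it simply shows the SSHOperator pattern.

```python
# Illustrative sketch: run an XCP copy on the remote XCP host over SSH.
# 'xcp_host' and the NFS export paths below are placeholders.
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='xcp_copy_example', schedule_interval=None,
         start_date=days_ago(2), tags=['example']) as dag:

    xcp_copy = SSHOperator(
        task_id='xcp-copy',
        ssh_conn_id='xcp_host',
        command='xcp copy source-server:/export/dataset dest-server:/export/dataset'
    )
```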
#### Description
DAG definition for a workflow that invokes NetApp XCP to quickly and reliably replicate data between NFS endpoints. Potential use cases include the following:

- Replicating newly acquired sensor data gathered at the edge back to the core data center or to the cloud to be used for AI/ML model training or retraining.
- Replicating a newly trained or newly updated model from the core data center to the edge or to the cloud to be deployed as part of an inferencing application.
- Copying data from a Hadoop data lake (through Hadoop NFS Gateway) to a high-performance AI/ML training environment for use in the training of an AI/ML model.
- Copying NFS-accessible data from a legacy or non-NetApp system of record to a high-performance AI/ML training environment for use in the training of an AI/ML model.

#### Workflow Steps
1. Invoke an XCP copy or sync operation.
Lines changed: 180 additions & 0 deletions
@@ -0,0 +1,180 @@
# Airflow DAG Definition: AI Training Run
#
# Steps:
#   1. Data prep job
#   2. Dataset snapshot (for traceability)
#   3. Training job
#   4. Model snapshot (for versioning/baselining)
#   5. Inference validation job


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s
import uuid


##### DEFINE PARAMETERS: Modify parameter values in this section to match your environment #####

## Define default args for DAG
ai_training_run_dag_default_args = {
    'owner': 'NetApp'
}

## Define DAG details
ai_training_run_dag = DAG(
    dag_id='ai_training_run',
    default_args=ai_training_run_dag_default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['training']
)

# Define Kubernetes namespace to execute DAG in
namespace = 'airflow'

## Define volume details (change values as necessary to match your environment)

# Dataset volume
dataset_volume_pvc_existing = 'dataset-vol'
dataset_volume = k8s.V1Volume(
    name=dataset_volume_pvc_existing,
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=dataset_volume_pvc_existing),
)
dataset_volume_mount = k8s.V1VolumeMount(
    name=dataset_volume_pvc_existing,
    mount_path='/mnt/dataset',
    sub_path=None,
    read_only=False
)

# Model volume
model_volume_pvc_existing = 'airflow-model-vol'
model_volume = k8s.V1Volume(
    name=model_volume_pvc_existing,
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=model_volume_pvc_existing),
)
model_volume_mount = k8s.V1VolumeMount(
    name=model_volume_pvc_existing,
    mount_path='/mnt/model',
    sub_path=None,
    read_only=False
)

## Define job details (change values as needed)

# Data prep step
data_prep_step_container_image = "nvcr.io/nvidia/tensorflow:21.03-tf1-py3"
data_prep_step_command = ["echo", "'No data prep command entered'"]  # Replace this echo command with the data prep command that you wish to execute
data_prep_step_resources = {}  # Hint: To request that 1 GPU be allocated to job pod, change to: {'limit_gpu': 1}

# Training step
train_step_container_image = "nvcr.io/nvidia/tensorflow:21.03-tf1-py3"
train_step_command = ["echo", "'No training command entered'"]  # Replace this echo command with the training command that you wish to execute
train_step_resources = {}  # Hint: To request that 1 GPU be allocated to job pod, change to: {'limit_gpu': 1}

# Inference validation step
validate_step_container_image = "nvcr.io/nvidia/tensorflow:21.03-tf1-py3"
validate_step_command = ["echo", "'No inference validation command entered'"]  # Replace this echo command with the inference validation command that you wish to execute
validate_step_resources = {}  # Hint: To request that 1 GPU be allocated to job pod, change to: {'limit_gpu': 1}

################################################################################################


# Define DAG steps/workflow
with ai_training_run_dag as dag:

    # Define step to generate uuid for run
    generate_uuid = PythonOperator(
        task_id='generate-uuid',
        python_callable=lambda: str(uuid.uuid4())
    )

    # Define data prep step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    data_prep = KubernetesPodOperator(
        namespace=namespace,
        image=data_prep_step_container_image,
        cmds=data_prep_step_command,
        resources=data_prep_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-data-prep",
        task_id="data-prep",
        is_delete_operator_pod=True,
        hostnetwork=False
    )

    # Define step to take a snapshot of the dataset volume for traceability
    dataset_snapshot = KubernetesPodOperator(
        namespace=namespace,
        image="python:3",
        cmds=["/bin/bash", "-c"],
        arguments=["\
            python3 -m pip install ipython kubernetes pandas tabulate && \
            git clone https://github.com/NetApp/netapp-data-science-toolkit && \
            mv /netapp-data-science-toolkit/Kubernetes/ntap_dsutil_k8s.py / && \
            /ntap_dsutil_k8s.py create volume-snapshot --pvc-name=" + str(dataset_volume_pvc_existing) + " --snapshot-name=dataset-{{ task_instance.xcom_pull(task_ids='generate-uuid', dag_id='ai_training_run', key='return_value') }} --namespace=" + namespace],
        name="ai-training-run-dataset-snapshot",
        task_id="dataset-snapshot",
        is_delete_operator_pod=True,
        hostnetwork=False
    )

    # State that the dataset snapshot should be created after the data prep job completes and the uuid job completes
    data_prep >> dataset_snapshot
    generate_uuid >> dataset_snapshot

    # Define training step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    train = KubernetesPodOperator(
        namespace=namespace,
        image=train_step_container_image,
        cmds=train_step_command,
        resources=train_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-train",
        task_id="train",
        is_delete_operator_pod=True,
        hostnetwork=False
    )

    # State that training job should be executed after dataset volume snapshot is taken
    dataset_snapshot >> train

    # Define step to take a snapshot of the model volume for versioning/baselining
    model_snapshot = KubernetesPodOperator(
        namespace=namespace,
        image="python:3",
        cmds=["/bin/bash", "-c"],
        arguments=["\
            python3 -m pip install ipython kubernetes pandas tabulate && \
            git clone https://github.com/NetApp/netapp-data-science-toolkit && \
            mv /netapp-data-science-toolkit/Kubernetes/ntap_dsutil_k8s.py / && \
            /ntap_dsutil_k8s.py create volume-snapshot --pvc-name=" + str(model_volume_pvc_existing) + " --snapshot-name=model-{{ task_instance.xcom_pull(task_ids='generate-uuid', dag_id='ai_training_run', key='return_value') }} --namespace=" + namespace],
        name="ai-training-run-model-snapshot",
        task_id="model-snapshot",
        is_delete_operator_pod=True,
        hostnetwork=False
    )

    # State that the model snapshot should be created after the training job completes
    train >> model_snapshot

    # Define inference validation step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    validate = KubernetesPodOperator(
        namespace=namespace,
        image=validate_step_container_image,
        cmds=validate_step_command,
        resources=validate_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-validate",
        task_id="validate",
        is_delete_operator_pod=True,
        hostnetwork=False
    )

    # State that inference validation job should be executed after model volume snapshot is taken
    model_snapshot >> validate
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
# Airflow DAG Definition: Clone Volume
#
# Steps:
#   1. Clone source volume


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago


##### DEFINE PARAMETERS: Modify parameter values in this section to match your environment #####

## Define default args for DAG
clone_volume_dag_default_args = {
    'owner': 'NetApp'
}

## Define DAG details
clone_volume_dag = DAG(
    dag_id='clone_volume',
    default_args=clone_volume_dag_default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['vol-clone']
)

# Define Kubernetes namespace to execute DAG in (volume must be located in same namespace)
namespace = 'airflow'

## Define volume details (change values as necessary to match your environment)
source_volume_pvc_name = "gold-datavol"
new_volume_pvc_name = "datavol-clone-2"
clone_from_snapshot = True
source_volume_snapshot_name = "snap1"

################################################################################################


# Construct command args
arg = "\
    python3 -m pip install ipython kubernetes pandas tabulate && \
    git clone https://github.com/NetApp/netapp-data-science-toolkit && \
    mv /netapp-data-science-toolkit/Kubernetes/ntap_dsutil_k8s.py / && \
    /ntap_dsutil_k8s.py clone volume --namespace=" + str(namespace) + " --new-pvc-name=" + str(new_volume_pvc_name)
if clone_from_snapshot:
    arg += " --source-snapshot-name=" + str(source_volume_snapshot_name)
else:
    arg += " --source-pvc-name=" + str(source_volume_pvc_name)


# Define DAG steps/workflow
with clone_volume_dag as dag:

    # Define step to clone source volume
    clone_volume = KubernetesPodOperator(
        namespace=namespace,
        image="python:3",
        cmds=["/bin/bash", "-c"],
        arguments=[arg],
        name="clone-volume-clone-volume",
        task_id="clone-volume",
        is_delete_operator_pod=True,
        hostnetwork=False
    )
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: ntap-dsutil
rules:
- apiGroups: [""]
  resources: ["persistentvolumeclaims", "persistentvolumeclaims/status", "services"]
  verbs: ["get", "list", "create", "delete"]
- apiGroups: ["snapshot.storage.k8s.io"]
  resources: ["volumesnapshots", "volumesnapshots/status", "volumesnapshotcontents", "volumesnapshotcontents/status"]
  verbs: ["get", "list", "create", "delete"]
- apiGroups: ["apps", "extensions"]
  resources: ["deployments", "deployments/scale", "deployments/status"]
  verbs: ["get", "list", "create", "delete", "patch", "update"]
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["get", "list"]
