Commit b003752

Merge pull request #259912 from sethiaarun/hdinsight-on-aks-spark-airflow

added spark orchestration using the azure data managed airflow

2 parents 8e0dd24 + 941c342

9 files changed (+171, -4 lines)

articles/hdinsight-aks/TOC.yml

Lines changed: 6 additions & 4 deletions

@@ -247,14 +247,16 @@ items:
       href: ./spark/create-spark-cluster.md
     - name: How-to guides
       items:
-      - name: Submit and manage jobs
-        href: ./spark/submit-manage-jobs.md
+      - name: Azure Data Factory Managed Airflow
+        href: ./spark/spark-job-orchestration.md
       - name: Configuration management
         href: ./spark/configuration-management.md
-      - name: Library management
-        href: ./spark/library-management.md
       - name: Connect to One Lake Storage
         href: ./spark/connect-to-one-lake-storage.md
+      - name: Library management
+        href: ./spark/library-management.md
+      - name: Submit and manage jobs
+        href: ./spark/submit-manage-jobs.md
       - name: Use Delta Lake scenario in Azure HDInsight on AKS cluster
         href: ./spark/azure-hdinsight-spark-on-aks-delta-lake.md
       - name: Use ML Notebook on Spark
7 image files added (147 KB, 36.3 KB, 5.71 KB, 26.7 KB, 15.7 KB, 184 KB, 13.5 KB); binary previews omitted.
Lines changed: 165 additions & 0 deletions

@@ -0,0 +1,165 @@
---
title: Azure Data Factory Managed Airflow with Apache Spark® on HDInsight on AKS
description: Learn how to perform Apache Spark® job orchestration using Azure Data Factory Managed Airflow
ms.service: hdinsight-aks
ms.topic: how-to
ms.date: 11/28/2023
---

# Apache Spark® job orchestration using Azure Data Factory Managed Airflow

[!INCLUDE [feature-in-preview](../includes/feature-in-preview.md)]
This article covers managing a Spark job using the [Apache Spark Livy API](https://livy.incubator.apache.org/docs/latest/rest-api.html) and orchestrating a data pipeline with Azure Data Factory Managed Airflow. The [Azure Data Factory Managed Airflow](/azure/data-factory/concept-managed-airflow) service is a simple and efficient way to create and manage [Apache Airflow](https://airflow.apache.org/) environments, enabling you to run data pipelines at scale easily.

Apache Airflow is an open-source platform that programmatically creates, schedules, and monitors complex data workflows. It allows you to define a set of tasks, called operators, that can be combined into directed acyclic graphs (DAGs) to represent data pipelines.

The following diagram shows the placement of Airflow, Key Vault, and HDInsight on AKS in Azure.

:::image type="content" source="./media/spark-job-orchestration/spark-airflow-azure.png" alt-text="Screenshot shows the placement of airflow, key vault, and HDInsight on AKS in Azure." lightbox="./media/spark-job-orchestration/spark-airflow-azure.png":::

Multiple Azure service principals are created, each scoped to limit the access it needs and to manage its client credential life cycle independently.

It's recommended to rotate access keys or secrets periodically; you can use various [design patterns](/azure/key-vault/secrets/tutorial-rotation-dual?tabs=azure-cli) to rotate secrets.

## Setup steps

1. [Set up a Spark cluster](create-spark-cluster.md).

1. Upload your Apache Spark application jar to the storage account. It can be the primary storage account associated with the Spark cluster or any other storage account; in that storage account, assign the "Storage Blob Data Owner" role to the user-assigned MSI used for the cluster.
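    For example, a minimal Azure CLI sketch of this step; the storage account, container, jar path, and MSI identifiers are placeholders to substitute with your own values:

    ```
    # Upload the Spark application jar (placeholder names).
    az storage blob upload --account-name <storageaccount> --container-name <filesystem> --name <app jar name>.jar --file <local path to jar> --auth-mode login

    # Assign "Storage Blob Data Owner" on the storage account to the cluster's user-assigned MSI.
    az role assignment create --assignee <MSI principal ID> --role "Storage Blob Data Owner" --scope <storage account resource ID>
    ```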
1. Azure Key Vault - You can follow [this tutorial to create a new Azure Key Vault](/azure/key-vault/general/quick-create-portal/) if you don't already have one.
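    A minimal Azure CLI sketch for creating one; the vault, resource group, and region names are placeholders:

    ```
    # Create a Key Vault to hold the Airflow connections and variables used in the later steps.
    az keyvault create --name <key vault name> --resource-group <resource group> --location <region>
    ```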
1. Create a [Microsoft Entra service principal](/cli/azure/ad/sp/) to access Key Vault. Grant it permission to access Azure Key Vault with the "Key Vault Secrets Officer" role, and make a note of 'appId', 'password', and 'tenant' from the response. We need these for Airflow to use Key Vault storage as the backend for storing sensitive information.

    ```
    az ad sp create-for-rbac -n <sp name> --role "Key Vault Secrets Officer" --scopes <key vault resource ID>
    ```

1. Create Managed Airflow enabled with [Azure Key Vault](/azure/data-factory/enable-azure-key-vault-for-managed-airflow) to store and manage your sensitive information in a secure and centralized manner. By doing this, you can use variables and connections, and they're automatically stored in Azure Key Vault. The names of connections and variables need to be prefixed by the variables_prefix defined in AIRFLOW__SECRETS__BACKEND_KWARGS. For example, if variables_prefix has a value of hdinsight-aks-variables, then for a variable key of hello, you would store your variable at hdinsight-aks-variables-hello.

    - Add the following settings for the Airflow configuration overrides in the integrated runtime properties:

      - AIRFLOW__SECRETS__BACKEND:
        `"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"`

      - AIRFLOW__SECRETS__BACKEND_KWARGS:
        `{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": "<your keyvault uri>"}`

    - Add the following settings for the environment variables configuration in the Airflow integrated runtime properties:

      - AZURE_CLIENT_ID = `<App ID from Create Azure AD Service Principal>`

      - AZURE_TENANT_ID = `<Tenant from Create Azure AD Service Principal>`

      - AZURE_CLIENT_SECRET = `<Password from Create Azure AD Service Principal>`

    Add the Airflow requirement [apache-airflow-providers-microsoft-azure](https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/stable/index.html).

    :::image type="content" source="./media/spark-job-orchestration/airflow-configuration-environment-variable.png" alt-text="Screenshot shows airflow configuration and environment variables." lightbox="./media/spark-job-orchestration/airflow-configuration-environment-variable.png":::

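    To illustrate the prefix mapping, a hypothetical variable named hello (not part of this setup) would be stored as the Key Vault secret hdinsight-aks-variables-hello and read in DAG code as `Variable.get("hello")`:

    ```
    # Hypothetical example of the variables_prefix naming convention.
    az keyvault secret set --vault-name <key vault name> --name hdinsight-aks-variables-hello --value "world"
    ```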
1. Create a [Microsoft Entra service principal](/cli/azure/ad/sp/) to access the HDInsight on AKS cluster and [grant it access to the cluster](/azure/hdinsight-aks/hdinsight-on-aks-manage-authorization-profile#how-to-grant-access). Make a note of appId, password, and tenant from the response.

    `az ad sp create-for-rbac -n <sp name>`

1. Create the following secrets in your key vault with the appId, password, and tenant values from the previous service principal, prefixed by the variables_prefix property defined in AIRFLOW__SECRETS__BACKEND_KWARGS. The DAG code can access any of these variables without the variables_prefix.

    - hdinsight-aks-variables-api-client-id=`<App ID from previous step>`

    - hdinsight-aks-variables-api-secret=`<Password from previous step>`

    - hdinsight-aks-variables-tenant-id=`<Tenant from previous step>`
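    A minimal Azure CLI sketch for creating these secrets; the vault name and values are placeholders:

    ```
    az keyvault secret set --vault-name <key vault name> --name hdinsight-aks-variables-api-client-id --value <App ID from previous step>
    az keyvault secret set --vault-name <key vault name> --name hdinsight-aks-variables-api-secret --value <Password from previous step>
    az keyvault secret set --vault-name <key vault name> --name hdinsight-aks-variables-tenant-id --value <Tenant from previous step>
    ```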
    In DAG code, such a secret can then be read as an Airflow variable without the prefix:

    ```python
    from airflow.models import Variable

    def retrieve_variable_from_akv():
        variable_value = Variable.get("api-client-id")
        print(variable_value)
    ```

## DAG definition

A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting tasks together, organized with dependencies and relationships that say how they should run.

There are three ways to declare a DAG:

- You can use a context manager, which adds the DAG to anything inside it implicitly

- You can use a standard constructor, passing the DAG into any operators you use

- You can use the @dag decorator to turn a function into a DAG generator (from airflow.decorators import dag)

DAGs are nothing without tasks to run, and these come in the form of Operators, Sensors, or TaskFlow.

You can read more details about DAGs, control flow, SubDAGs, TaskGroups, and more directly from [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html).
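As a minimal sketch of the first style, here's a DAG declared with a context manager; the DAG ID, schedule, and task are illustrative placeholders rather than part of the published sample:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    print("hello from an illustrative task")

# Declaring the DAG with a context manager adds every task created
# inside the block to this DAG implicitly.
with DAG(
    dag_id="minimal_example",
    start_date=datetime(2023, 11, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    hello_task = PythonOperator(task_id="say_hello", python_callable=say_hello)
```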

## DAG execution

Example code is available on [GitHub](https://github.com/sethiaarun/hdinsight-aks/blob/spark-airflow-example/spark/Airflow/airflow-python-example-code.py); download the code locally on your computer and upload the wordcount.py file to blob storage. Follow the [steps](/azure/data-factory/how-does-managed-airflow-work#steps-to-import) to import the DAG into the Managed Airflow environment created during setup.

The airflow-python-example-code.py file is an example of orchestrating a Spark job submission with HDInsight on AKS. The example is based on the [SparkPi](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala) example provided with Apache Spark.

The DAG has the following steps:

1. Get an `OAuth Token`

1. Invoke the Apache Spark Livy Batch API to submit a new job

The DAG expects the service principal to be set up for the OAuth client credential, as described during the setup process, and expects the following input configuration to be passed for the execution.
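A minimal sketch of those two steps is shown below; the token scope, Livy endpoint path, and payload fields are illustrative assumptions, so refer to the linked example code for the exact implementation:

```python
import requests
from airflow.models import Variable

def get_oauth_token() -> str:
    # OAuth client-credential flow against Microsoft Entra ID, using the
    # variables stored in Key Vault during setup. The scope is a placeholder.
    tenant_id = Variable.get("tenant-id")
    response = requests.post(
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
        data={
            "grant_type": "client_credentials",
            "client_id": Variable.get("api-client-id"),
            "client_secret": Variable.get("api-secret"),
            "scope": "<HDInsight on AKS resource scope>/.default",
        },
    )
    response.raise_for_status()
    return response.json()["access_token"]

def submit_livy_batch(spark_cluster_fqdn: str, app_jar_path: str, job_name: str) -> int:
    # Submit a batch job through the Apache Spark Livy batches API.
    # The URL path below is a placeholder; use the path from the example code.
    token = get_oauth_token()
    response = requests.post(
        f"https://{spark_cluster_fqdn}/<livy base path>/batches",
        headers={"Authorization": f"Bearer {token}"},
        json={
            "name": job_name,
            "file": f"{app_jar_path}/<your application jar>",
            "className": "<your main class>",
        },
    )
    response.raise_for_status()
    return response.json()["id"]
```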

### Execution steps

1. Execute the DAG from the [Airflow UI](https://airflow.apache.org/docs/apache-airflow/stable/ui.html). You can open the Azure Data Factory Managed Airflow UI by selecting the Monitor icon.

    :::image type="content" source="./media/spark-job-orchestration/airflow-user-interface-step-1.png" alt-text="Screenshot shows open the Azure data factory managed airflow UI by clicking on monitor icon." lightbox="./media/spark-job-orchestration/airflow-user-interface-step-1.png":::

1. Select the "SparkWordCountExample" DAG from the "DAGs" page.

    :::image type="content" source="./media/spark-job-orchestration/airflow-user-interface-step-2.png" alt-text="Screenshot shows select the Spark word count example." lightbox="./media/spark-job-orchestration/airflow-user-interface-step-2.png":::

1. Select the "execute" icon from the top-right corner and select "Trigger DAG w/ config".

    :::image type="content" source="./media/spark-job-orchestration/airflow-user-interface-step-3.png" alt-text="Screenshot shows select execute icon." lightbox="./media/spark-job-orchestration/airflow-user-interface-step-3.png":::

1. Pass the required configuration JSON:

    ```JSON
    {
      "spark_cluster_fqdn": "<<domain name>>.eastus2.hdinsightaks.net",
      "app_jar_path": "abfs://filesystem@<storageaccount>.dfs.core.windows.net",
      "job_name": "<job_name>"
    }
    ```
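    Inside a task, a common way to read this triggered configuration is through `dag_run.conf`; the sketch below is illustrative, and the sample code may consume the values differently:

    ```python
    def read_trigger_config(**kwargs):
        # Values passed via "Trigger DAG w/ config" are available on dag_run.conf.
        conf = kwargs["dag_run"].conf
        print(conf["spark_cluster_fqdn"], conf["app_jar_path"], conf["job_name"])
    ```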
1. Select the "Trigger" button to start the execution of the DAG.

1. You can visualize the status of the DAG tasks from the DAG run.

    :::image type="content" source="./media/spark-job-orchestration/dag-task-status.png" alt-text="Screenshot shows dag task status." lightbox="./media/spark-job-orchestration/dag-task-status.png":::

1. Validate the job from the "Apache Spark History Server".

    :::image type="content" source="./media/spark-job-orchestration/validate-job-execution.png" alt-text="Screenshot shows validate job execution." lightbox="./media/spark-job-orchestration/validate-job-execution.png":::

## Example code

This is an example of orchestrating a data pipeline using Airflow with HDInsight on AKS.

The example is based on the [SparkPi](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala) example provided with Apache Spark.

### Reference

- Refer to the [sample code](https://github.com/Azure-Samples/hdinsight-aks/blob/main/spark/Airflow/airflow-python-example-code.py).
- [Apache Spark Website](https://spark.apache.org/)
- Apache, Apache Airflow, Airflow, Apache Spark, Spark, and associated open source project names are [trademarks](../trademarks.md) of the [Apache Software Foundation](https://www.apache.org/) (ASF).
