Commit 0d170e0

Merge pull request #295734 from Rodrigossz/main: New authentication

2 parents 392194d + 0bc46fd

articles/synapse-analytics/synapse-link/how-to-query-analytical-store-spark-3.md

Lines changed: 116 additions & 18 deletions
@@ -5,49 +5,147 @@ author: Rodrigossz
ms.service: azure-synapse-analytics
ms.topic: quickstart
ms.subservice: synapse-link
ms.date: 03/04/2025
ms.author: rosouz
ms.custom: cosmos-db, mode-other
---

# Interact with Azure Cosmos DB using Apache Spark 3 in Azure Synapse Link

In this article, you learn how to interact with Azure Cosmos DB using Synapse Apache Spark 3. You can use Scala, Python, SparkSQL, and C# for analytics, data engineering, data science, and data exploration scenarios in [Azure Synapse Link for Azure Cosmos DB](/azure/cosmos-db/synapse-link).

The following capabilities are supported while interacting with Azure Cosmos DB:
* Synapse Apache Spark 3 allows you to analyze data in your Azure Cosmos DB containers that are enabled with Azure Synapse Link in near real-time, without impacting the performance of your transactional workloads. The following two options are available to query the Azure Cosmos DB [analytical store](/azure/cosmos-db/analytical-store-introduction) from Spark:
    + Load to Spark DataFrame
    + Create Spark table
* Synapse Apache Spark also allows you to ingest data into Azure Cosmos DB. Data is always ingested into Azure Cosmos DB containers through the transactional store. When Synapse Link is enabled, any new inserts, updates, and deletes are then automatically synced to the analytical store.
* Synapse Apache Spark also supports Spark structured streaming with Azure Cosmos DB as a source and a sink.

The following sections walk you through the syntax of these capabilities. You can also check out the Learn module on how to [Query Azure Cosmos DB with Apache Spark for Azure Synapse Analytics](/training/modules/query-azure-cosmos-db-with-apache-spark-for-azure-synapse-analytics/). Gestures in the Azure Synapse Analytics workspace are designed to provide an easy out-of-the-box experience to get started. Gestures are visible when you right-click an Azure Cosmos DB container in the **Data** tab of the Synapse workspace. With gestures, you can quickly generate code and tailor it to your needs. Gestures are also perfect for discovering data with a single click.

> [!IMPORTANT]
> Be aware of some constraints in the analytical schema that could lead to unexpected behavior in data loading operations.
> For example, only the first 1,000 properties from the transactional schema are available in the analytical schema, and properties with spaces aren't available. If you experience unexpected results, see the [analytical store schema constraints](/azure/cosmos-db/analytical-store-introduction#schema-constraints) for more details.

## Query Azure Cosmos DB analytical store

You can load analytical store data into Spark DataFrames or create Spark tables.

The difference in experience depends on whether you want changes to the underlying data in the Azure Cosmos DB container to be automatically reflected in the analysis performed in Spark. When a Spark DataFrame is registered or a Spark table is created against a container's analytical store, Spark fetches metadata about the current snapshot of the analytical store for efficient pushdown of subsequent analysis. Because Spark follows a lazy evaluation policy, actual data isn't fetched from the underlying container's analytical store until an action is invoked on the Spark DataFrame or a SparkSQL query is executed against the Spark table.

In the case of **loading to Spark DataFrame**, the fetched metadata is cached through the lifetime of the Spark session. Subsequent actions invoked on the DataFrame are therefore evaluated against the snapshot of the analytical store at the time of DataFrame creation.

On the other hand, in the case of **creating a Spark table**, the metadata of the analytical store state isn't cached in Spark and is reloaded on every SparkSQL query execution against the Spark table.

In summary, load to a Spark DataFrame to evaluate your analysis against a fixed snapshot of the analytical store, or create a Spark table to always query the latest snapshot.
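
The following sketch contrasts the two behaviors. It's illustrative only: the placeholder values and the table name `my_table` are assumptions, and the exact table option keys follow the pattern of the `create table` example later in this article.

```scala
// A minimal sketch contrasting snapshot semantics; placeholder values
// (<endpoint>, <key>, <db>, <container>) are illustrative, as in the examples below.
val config = Map(
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.accountKey" -> "<key>",
  "spark.cosmos.database" -> "<db>",
  "spark.cosmos.container" -> "<container>"
)

// Load to DataFrame: the analytical store snapshot is fixed at load time.
val dfSnapshot = spark.read.format("cosmos.olap").options(config).load()
dfSnapshot.count() // evaluated against the snapshot taken when load() ran

// Create a Spark table: metadata is reloaded on every SparkSQL query,
// so each query sees the latest analytical store snapshot.
spark.sql("""
  create table if not exists my_table using cosmos.olap options (
    spark.cosmos.accountEndpoint '<endpoint>',
    spark.cosmos.accountKey '<key>',
    spark.cosmos.database '<db>',
    spark.cosmos.container '<container>'
  )
""")
spark.sql("select count(*) from my_table").show() // reflects the latest snapshot
```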

> [!NOTE]
> To query Azure Cosmos DB for MongoDB accounts, learn more about the [full fidelity schema representation](/azure/cosmos-db/analytical-store-introduction#analytical-schema) in the analytical store and the extended property names to be used.

> [!NOTE]
> All `options` in the following commands are case sensitive.

## Authentication

Spark 3.x customers can now authenticate to the Azure Cosmos DB analytical store by using trusted identity access tokens or database account keys. Tokens are more secure because they're short lived and scoped to the required permissions through Azure Cosmos DB role-based access control (RBAC).

The connector now supports two authentication types, `MasterKey` and `AccessToken`, for the `spark.cosmos.auth.type` property.

### Master key authentication

Use the account key to read a DataFrame with Spark:

```scala
val config = Map(
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.accountKey" -> "<key>",
  "spark.cosmos.database" -> "<db>",
  "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
```

### Access token authentication

The new keyless authentication introduces support for access tokens:

```scala
val config = Map(
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.auth.type" -> "AccessToken",
  "spark.cosmos.auth.accessToken" -> "<accessToken>",
  "spark.cosmos.database" -> "<db>",
  "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
```

#### Access token authentication requires role assignment

To use the access token approach, you need to generate access tokens. Because access tokens are associated with Azure identities, the correct role-based access control (RBAC) must be assigned to the identity. The role assignment is at the data plane level, and you need at least minimal control plane permissions to perform the role assignment.

The Identity and Access Management (IAM) role assignments from the Azure portal are at the control plane level and don't affect the role assignments on the data plane. Data plane role assignments are only available through Azure CLI. The `readAnalytics` action is required to read data from the analytical store in Azure Cosmos DB, and it isn't part of any predefined role, so you must create a custom role definition. In addition to the `readAnalytics` action, add the actions required for Data Reader. Create a JSON file with the following content and name it *role_definition.json*:

```json
{
  "RoleName": "CosmosAnalyticsRole",
  "Type": "CustomRole",
  "AssignableScopes": ["/"],
  "Permissions": [{
    "DataActions": [
      "Microsoft.DocumentDB/databaseAccounts/readAnalytics",
      "Microsoft.DocumentDB/databaseAccounts/readMetadata",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
    ]
  }]
}
```

#### Access token authentication requires Azure CLI

- Sign in to Azure CLI: `az login`
- Set the default subscription that contains your Azure Cosmos DB account: `az account set --subscription <name or id>`
- Create the role definition in the desired Azure Cosmos DB account: `az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json`
- Copy the role definition `id` returned: `/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/<cosmos-account-name>/sqlRoleDefinitions/<a-random-generated-guid>`
- Get the principal ID of the identity that you want to assign the role to. The identity could be an Azure app registration, a virtual machine, or any other supported Azure resource. Assign the role to the principal by using: `az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"`

> [!NOTE]
> When you use an Azure app registration, use the `Object Id` as the service principal ID. Also, the principal ID and the Azure Cosmos DB account must be in the same tenant.

#### Generating the access token - Synapse Notebooks

The recommended method for Synapse Notebooks is to use a service principal with a certificate to generate access tokens. For more information, see [how to secure credentials with the TokenLibrary](../spark/apache-spark-secure-credentials-with-tokenlibrary.md).

```scala
// The following code snippet has been validated to work in a Synapse notebook.
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
  "https://<cosmos-account-name>.documents.azure.com/.default",
  "https://login.microsoftonline.com/" + tenantId, clientId, kvLinkedService, certName)
```
136+
137+
Now you can use the access token generated in this step to read data from analytical store when auth type is set to access token.
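
For example, here's a minimal sketch that plugs the token from the previous snippet into the access token configuration shown earlier; the placeholder values are illustrative:

```scala
// Reuses the `token` value generated by getSPTokenWithCertLS above;
// <endpoint>, <db>, and <container> are placeholders, as in the earlier examples.
val config = Map(
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.auth.type" -> "AccessToken",
  "spark.cosmos.auth.accessToken" -> token,
  "spark.cosmos.database" -> "<db>",
  "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
```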

> [!NOTE]
> When you use an Azure app registration, use the application (client) ID.

> [!NOTE]
> Currently, Synapse doesn't support generating access tokens by using the azure-identity package in notebooks. Furthermore, Synapse VHDs don't include the azure-identity package and its dependencies. For more information, see [the Synapse service identity documentation](../synapse-service-identity.md).

### Load to Spark DataFrame

In this example, you create a Spark DataFrame that points to the Azure Cosmos DB analytical store. You can then perform more analysis by invoking Spark actions against the DataFrame. This operation doesn't impact the transactional store.

The syntax in **Python** would be the following:
```python
@@ -60,7 +158,7 @@ df = spark.read.format("cosmos.olap")\
```

The equivalent syntax in **Scala** would be the following:
```scala
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
@@ -71,7 +169,7 @@ val df_olap = spark.read.format("cosmos.olap").

### Create Spark table

In this example, you create a Spark table that points to the Azure Cosmos DB analytical store. You can then perform additional analysis by invoking SparkSQL queries against the table. This operation doesn't impact the transactional store or incur any data movement. If you decide to delete this Spark table, the underlying Azure Cosmos DB container and the corresponding analytical store aren't affected.

This scenario is convenient when you want to reuse Spark tables through third-party tools and provide access to the underlying data at run time.

@@ -92,7 +190,7 @@ create table call_center using cosmos.olap options (

## Write Spark DataFrame to Azure Cosmos DB container

In this example, you write a Spark DataFrame into an Azure Cosmos DB container. This operation impacts the performance of transactional workloads and consumes request units provisioned on the Azure Cosmos DB container or the shared database.

The syntax in **Python** would be the following:
```python
@@ -120,12 +218,12 @@ df.write.format("cosmos.oltp").
```

## Load streaming DataFrame from container
In this gesture, you use the Spark streaming capability to load data from a container into a dataframe. The data is stored in the primary data lake account (and file system) that you connected to the workspace.
> [!NOTE]
> To reference external libraries in Synapse Apache Spark, learn more [here](../spark/apache-spark-azure-portal-add-libraries.md). For instance, to ingest a Spark DataFrame into a container of Azure Cosmos DB for MongoDB, you can use the [MongoDB connector for Spark](https://docs.mongodb.com/spark-connector/master/).
## Load streaming DataFrame from Azure Cosmos DB container
In this example, you use Spark's structured streaming to load data from an Azure Cosmos DB container into a Spark streaming DataFrame, using the change feed functionality in Azure Cosmos DB. The checkpoint data used by Spark is stored in the primary data lake account (and file system) that you connected to the workspace.

The syntax in **Python** would be the following:
```python
@@ -154,7 +252,7 @@ val dfStream = spark.readStream.
```

## Write streaming DataFrame to Azure Cosmos DB container
In this example, you write a streaming DataFrame into an Azure Cosmos DB container. This operation impacts the performance of transactional workloads and consumes request units provisioned on the Azure Cosmos DB container or shared database. If the folder */localWriteCheckpointFolder* isn't created (in the example below), it's automatically created.

The syntax in **Python** would be the following:

@@ -195,4 +293,4 @@ query.awaitTermination()
* [Samples to get started with Azure Synapse Link on GitHub](https://aka.ms/cosmosdb-synapselink-samples)
* [Learn what is supported in Azure Synapse Link for Azure Cosmos DB](./concept-synapse-link-cosmos-db-support.md)
* [Connect to Synapse Link for Azure Cosmos DB](../quickstart-connect-synapse-link-cosmos-db.md)
* Check out the Learn module on how to [Query Azure Cosmos DB with Apache Spark for Azure Synapse Analytics](/training/modules/query-azure-cosmos-db-with-apache-spark-for-azure-synapse-analytics/).
