| 1 | +--- |
| 2 | +title: Working with Azure Cosmos DB for MongoDB vCore from Azure Databricks |
| 3 | +description: This article is the main page for Azure Cosmos DB for MongoDB vCore integration from Azure Databricks. |
| 4 | +author: Gary3207Lee |
| 5 | +ms.author: yongl |
| 6 | +ms.reviewer: krmeht |
| 7 | +ms.service: cosmos-db |
| 8 | +ms.subservice: mongodb-vcore |
| 9 | +ms.topic: how-to |
| 10 | +ms.date: 03/08/2024 |
| 11 | +--- |
| 12 | + |
| 13 | +# Connect to Azure Cosmos DB for MongoDB vCore from Azure Databricks |
| 14 | +This article explains how to connect to Azure Cosmos DB for MongoDB vCore from Azure Databricks. It walks through basic Data Manipulation Language (DML) operations such as reading, filtering, running SQL queries, running aggregation pipelines, and writing tables by using Python code. |
| 15 | + |
| 16 | +## Prerequisites |
| 17 | +* [Provision an Azure Cosmos DB for MongoDB vCore cluster.](quickstart-portal.md) |
| 18 | + |
| 19 | +* Provision a Spark environment in [Azure Databricks](/azure/databricks/scenarios/quickstart-create-databricks-workspace-portal). |
| 20 | + |
| 21 | +## Configure dependencies for connectivity |
| 22 | +The following are the dependencies required to connect to Azure Cosmos DB for MongoDB vCore from Azure Databricks: |
| 23 | +* **Spark connector for MongoDB** |
| 24 | + The Spark connector is used to connect to Azure Cosmos DB for MongoDB vCore. Identify and use the version of the connector in [Maven central](https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector) that is compatible with the Spark and Scala versions of your Spark environment. We recommend an environment that supports Spark 3.2.1 or higher, and the Spark connector available at Maven coordinates `org.mongodb.spark:mongo-spark-connector_2.12:3.0.1`. |
| 25 | + |
| 26 | +* **Azure Cosmos DB for MongoDB connection strings:** Your Azure Cosmos DB for MongoDB vCore connection string, user name, and password. |
| 27 | + |
| 28 | +## Provision an Azure Databricks cluster |
| 29 | + |
| 30 | +You can follow instructions to [provision an Azure Databricks cluster](/azure/databricks/scenarios/quickstart-create-databricks-workspace-portal). We recommend selecting Databricks runtime version 7.6, which supports Spark 3.0. |
| 31 | + |
| 32 | +:::image type="content" source="../media/migrate-databricks/databricks-cluster-creation.png" alt-text="Diagram of databricks new cluster creation."::: |
| 33 | + |
| 34 | + |
| 35 | +## Add dependencies |
| 36 | + |
| 37 | +Add the MongoDB Connector for Spark library to your cluster to connect to both native MongoDB and Azure Cosmos DB for MongoDB endpoints. In your cluster, select **Libraries** > **Install New** > **Maven**, and then add `org.mongodb.spark:mongo-spark-connector_2.12:3.0.1` Maven coordinates. |
| 38 | + |
| 39 | +:::image type="content" source="../media/migrate-databricks/databricks-cluster-dependencies.png" alt-text="Diagram of adding databricks cluster dependencies."::: |
| 40 | + |
| 41 | +Select **Install**, and then restart the cluster when installation is complete. |
| 42 | + |
| 43 | +> [!NOTE] |
| 44 | +> Make sure that you restart the Databricks cluster after the MongoDB Connector for Spark library has been installed. |
| 45 | + |
| 46 | +After that, you can create a Scala or Python notebook to connect to Azure Cosmos DB for MongoDB vCore. |
| 47 | + |
| 48 | +## Create Python notebook to connect to Azure Cosmos DB for MongoDB vCore |
| 49 | + |
| 50 | +Create a Python notebook in Databricks. Make sure to enter the right values for the variables before you run the following code. |
| 51 | + |
| 52 | +### Update Spark configuration with the Azure Cosmos DB for MongoDB connection string |
| 53 | + |
| 54 | +1. Note the connection string under **Settings** > **Connection strings** in your Azure Cosmos DB for MongoDB vCore resource in the Azure portal. It has the form "mongodb+srv://\<user>\:\<password>\@\<database_name>.mongocluster.cosmos.azure.com". |
| 55 | +2. Back in Databricks, in your cluster configuration under **Advanced Options** (bottom of the page), paste the connection string for both the `spark.mongodb.output.uri` and `spark.mongodb.input.uri` variables. Populate the user name and password fields appropriately. This way, all the notebooks that run on the cluster use this configuration. |
| 56 | +3. Alternatively, you can explicitly set the `option` when calling APIs, for example: `spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load()`. If you configure the variables in the cluster, you don't have to set the option. |
| 57 | + |
| 58 | +```python |
| 59 | +connectionString_vcore="mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000" |
| 60 | +database="<database_name>" |
| 61 | +collection="<collection_name>" |
| 62 | +``` |
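
User names and passwords that contain reserved characters such as `@`, `:`, or `/` generally need to be percent-encoded before they're placed in a MongoDB connection string. The following is a minimal sketch of one way to do that with the Python standard library. The helper name `build_vcore_uri` and the sample host and credentials are hypothetical and are used only for illustration.

```python
from urllib.parse import quote_plus


def build_vcore_uri(user: str, password: str, host: str) -> str:
    """Hypothetical helper: return a mongodb+srv URI with percent-encoded credentials."""
    # Same query parameters as the connection string shown above.
    params = "tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
    # quote_plus escapes reserved characters such as '@', ':' and '/'.
    return f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@{host}/?{params}"


# Placeholder values (assumptions); replace them with your own cluster details.
example_uri = build_vcore_uri("admin", "p@ss:word/1", "contoso.mongocluster.cosmos.azure.com")
print(example_uri)
```

The resulting string can be assigned to `connectionString_vcore` in place of a hand-edited value.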
| 63 | + |
| 64 | +### Data sample set |
| 65 | + |
| 66 | +For the purposes of this lab, we use the CSV 'Citibike2019' data set. You can import it from: |
| 67 | +[CitiBike Trip History 2019](https://citibikenyc.com/system-data). |
| 68 | +We loaded it into a database called "CitiBikeDB" and a collection called "CitiBike2019". |
| 69 | +We set the variables `database` and `collection` to point to the loaded data, and we use these variables in the examples. |
| 70 | +```python |
| 71 | +database="CitiBikeDB" |
| 72 | +collection="CitiBike2019" |
| 73 | +``` |
| 74 | + |
| 75 | +### Read data from Azure Cosmos DB for MongoDB vCore |
| 76 | + |
| 77 | +The general syntax looks like this: |
| 78 | +```python |
| 79 | +df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load() |
| 80 | +``` |
| 81 | + |
| 82 | +You can validate the data frame loaded as follows: |
| 83 | +```python |
| 84 | +df_vcore.printSchema() |
| 85 | +display(df_vcore) |
| 86 | +``` |
| 87 | + |
| 88 | +Let's see an example: |
| 89 | +```python |
| 90 | +df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load() |
| 91 | +df_vcore.printSchema() |
| 92 | +display(df_vcore) |
| 93 | +``` |
| 94 | + |
| 95 | +Output: |
| 96 | + |
| 97 | +**Schema** |
| 98 | + :::image type="content" source="./media/connect-from-databricks/print-schema.png" alt-text="Screenshot of the Print Schema."::: |
| 99 | + |
| 100 | +**DataFrame** |
| 101 | + :::image type="content" source="./media/connect-from-databricks/display-dataframe-vcore.png" alt-text="Screenshot of the Display DataFrame."::: |
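
The read call above repeats the same three options each time. As a possible convenience, the options can be collected in a dictionary and passed to the reader in one step. This is only a sketch: the helper name `mongo_read_options` is hypothetical, and the option keys simply mirror the ones used in this article.

```python
from typing import Dict, Optional


def mongo_read_options(uri: str, database: str, collection: str,
                       pipeline: Optional[str] = None) -> Dict[str, str]:
    """Hypothetical helper: collect the connector read options used in this article."""
    opts = {
        "spark.mongodb.input.uri": uri,
        "database": database,
        "collection": collection,
    }
    # The pipeline option is only added when an aggregation pipeline is supplied.
    if pipeline is not None:
        opts["pipeline"] = pipeline
    return opts


# Placeholder connection string; substitute your own values.
opts = mongo_read_options("mongodb+srv://<user>:<password>@<host>/", "CitiBikeDB", "CitiBike2019")
print(sorted(opts))

# In a Databricks notebook, where `spark` is predefined, the dictionary can be passed as:
# df_vcore = spark.read.format("mongo").options(**opts).load()
```

Keeping the options in one place makes it easier to reuse the same settings for the plain read and for the aggregation pipeline read.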
| 102 | + |
| 103 | +### Filter data from Azure Cosmos DB for MongoDB vCore |
| 104 | + |
| 105 | +The general syntax looks like this: |
| 106 | +```python |
| 107 | +df_v = df_vcore.filter(df_vcore[column number/column name] == [filter condition]) |
| 108 | +display(df_v) |
| 109 | +``` |
| 110 | + |
| 111 | +Let's see an example: |
| 112 | +```python |
| 113 | +df_v = df_vcore.filter(df_vcore[2] == 1970) |
| 114 | +display(df_v) |
| 115 | +``` |
| 116 | + |
| 117 | +Output: |
| 118 | + :::image type="content" source="./media/connect-from-databricks/display-filter.png" alt-text="Screenshot of the Display Filtered DataFrame."::: |
| 119 | + |
| 120 | +### Create a view or temporary table and run SQL queries against it |
| 121 | + |
| 122 | +The general syntax looks like this: |
| 123 | +```python |
| 124 | +df_[dataframename].createOrReplaceTempView("[View Name]") |
| 125 | +spark.sql("SELECT * FROM [View Name]") |
| 126 | +``` |
| 127 | + |
| 128 | +Let's see an example: |
| 129 | +```python |
| 130 | +df_vcore.createOrReplaceTempView("T_VCORE") |
| 131 | +df_v = spark.sql(" SELECT * FROM T_VCORE WHERE birth_year == 1970 and gender == 2 ") |
| 132 | +display(df_v) |
| 133 | +``` |
| 134 | + |
| 135 | +Output: |
| 136 | + :::image type="content" source="./media/connect-from-databricks/display-sql.png" alt-text="Screenshot of the Display SQL Query."::: |
| 137 | + |
| 138 | +### Write data to Azure Cosmos DB for MongoDB vCore |
| 139 | + |
| 140 | +The general syntax looks like this: |
| 141 | +```python |
| 142 | +df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database",database).option("collection","<collection_name>").mode("append").save() |
| 143 | +``` |
| 144 | + |
| 145 | +Let's see an example: |
| 146 | +```python |
| 147 | +df_vcore.write.format("mongo").option("spark.mongodb.output.uri", connectionString_vcore).option("database",database).option("collection","CitiBike2019").mode("append").save() |
| 148 | +``` |
| 149 | + |
| 150 | +This command doesn't produce output because it writes directly to the collection. You can verify that the records were written by using a read command. |
| 151 | + |
| 152 | +### Read data from Azure Cosmos DB for MongoDB vCore collection running an Aggregation Pipeline |
| 153 | + |
| 154 | +> [!NOTE] |
| 155 | +> [Aggregation Pipeline](../tutorial-aggregation.md) is a powerful capability that lets you preprocess and transform data within Azure Cosmos DB for MongoDB. It's a great match for real-time analytics, dashboards, and report generation with roll-ups, sums, and averages with server-side data post-processing. (There's a [whole book written about it](https://www.practical-mongodb-aggregations.com/front-cover.html).) |
| 156 | + |
| 157 | +Azure Cosmos DB for MongoDB even supports [rich secondary/compound indexes](../indexing.md) to extract, filter, and process only the data it needs. |
| 158 | + |
| 159 | +For example, you can analyze all customers located in a specific geography right within the database without first loading the full data set, which minimizes data movement and reduces latency. |
| 160 | + |
| 161 | +Here's an example of using an aggregate function: |
| 162 | + |
| 163 | +```python |
| 164 | +pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]" |
| 165 | +df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).option("pipeline", pipeline).load() |
| 166 | +display(df_vcore) |
| 167 | +``` |
| 168 | + |
| 169 | +Output: |
| 170 | + |
| 171 | + :::image type="content" source="./media/connect-from-databricks/display-aggregation-pipeline.png" alt-text="Screenshot of the Display Aggregate Data."::: |
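
Hand-written pipeline strings are easy to get wrong when quotes and braces nest. One alternative, sketched here under stated assumptions, is to build the stages as Python objects and serialize them with `json.dumps` before passing the result to the `pipeline` option. This sketch counts documents with the common `{"$sum": 1}` idiom rather than the `$count` accumulator used in the example above. That's a substitution made for illustration, not the article's original pipeline.

```python
import json

# Group by birth year, count documents, and total the trip duration.
# Field names (birth_year, tripduration) follow the CitiBike example above.
stages = [
    {
        "$group": {
            "_id": "$birth_year",
            "totaldocs": {"$sum": 1},  # counting idiom; an assumption, see lead-in
            "totalduration": {"$sum": "$tripduration"},
        }
    }
]

# Serialize the stages to the JSON string that the pipeline option expects.
pipeline_json = json.dumps(stages)
print(pipeline_json)
```

The resulting `pipeline_json` string can be supplied in place of the `pipeline` variable in the read call shown earlier.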
| 172 | + |
| 173 | +## Related content |
| 174 | + |
| 175 | +The following resources provide more information about the Spark connector and aggregation pipelines: |
| 176 | + |
| 177 | +* [Maven central](https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector) is where you can find the Spark connector. |
| 178 | +* [Aggregation Pipeline](../tutorial-aggregation.md) |