
Commit 45b424c

updated samples for scala and added patch example
1 parent 6a85770 commit 45b424c

1 file changed: +166 -13 lines

articles/cosmos-db/sql/create-sql-api-spark.md

Lines changed: 166 additions & 13 deletions
> * [Go](create-sql-api-go.md)
>

This tutorial is a quick start guide that shows how to use the Cosmos DB Spark Connector to read from and write to Azure Cosmos DB. The Cosmos DB Spark Connector supports Spark 3.1.x and 3.2.x.

Throughout this quick tutorial, we rely on [Azure Databricks Runtime 10.4 with Spark 3.2.1](/azure/databricks/release-notes/runtime/10.4) and a Jupyter Notebook to show how to use the Cosmos DB Spark Connector.

You can use any other Spark offering (for example, Spark 3.1.1) as well. You should also be able to use any language supported by Spark (PySpark, Scala, Java, and so on) and any Spark interface you are familiar with (Jupyter Notebook, Livy, and so on).
## Prerequisites

* An active Azure account. If you don't have one, you can sign up for a [free account](https://azure.microsoft.com/try/cosmosdb/). Alternatively, you can use the [Azure Cosmos DB Emulator](../local-emulator.md) for development and testing.

* [Azure Databricks](/azure/databricks/release-notes/runtime/10.4) runtime 10.4 with Spark 3.2.1.

* (Optional) [SLF4J binding](https://www.slf4j.org/manual.html) is used to associate a specific logging framework with SLF4J.

SLF4J is only needed if you plan to use logging. In that case, also download an SLF4J binding, which links the SLF4J API with the logging implementation of your choice. See the [SLF4J user manual](https://www.slf4j.org/manual.html) for more information.

Install the Cosmos DB Spark Connector in your Spark cluster [using the latest version for Spark 3.2.x](https://aka.ms/azure-cosmos-spark-3-2-download).
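If you run the quickstart on a Spark environment that you manage yourself (rather than installing the connector as a Databricks cluster library), one way to attach the connector is as a Maven package when building the Spark session. The following is only a sketch; the Maven coordinate and version shown are assumptions, so confirm them on the download page above before using them.

```python
from pyspark.sql import SparkSession

# Sketch only: the Maven coordinate and version below are assumptions;
# check the Spark 3.2.x download page for the current artifact and release.
spark = (
    SparkSession.builder
    .appName("cosmos-db-spark-quickstart")
    .config("spark.jars.packages",
            "com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:<latest-version>")
    .getOrCreate()
)
```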
The getting started guide is based on PySpark and Scala. You can run the following code snippets in an Azure Databricks PySpark or Scala notebook.
## Create databases and containers

First, set the Cosmos DB account credentials, and the Cosmos DB database name and container name.

#### [Python](#tab/python)

```python
cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}
```

#### [Scala](#tab/scala)

```scala
val cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
val cosmosMasterKey = "REPLACEME"
val cosmosDatabaseName = "sampleDB"
val cosmosContainerName = "sampleContainer"

val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> cosmosDatabaseName,
  "spark.cosmos.container" -> cosmosContainerName
)
```
---
Next, you can use the new Catalog API to create a Cosmos DB database and container through Spark.

#### [Python](#tab/python)

```python
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))
```

#### [Scala](#tab/scala)

```scala
// Configure Catalog Api to be used
spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

// create a cosmos database using catalog api
spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.${cosmosDatabaseName};")

// create a cosmos container using catalog api
spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')")
```
---
When creating containers with the Catalog API, you can set the throughput and [partition key path](../partitioning-overview.md#choose-partitionkey) for the container to be created.
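For example, the container created above uses manual throughput. As a hedged sketch, assuming the catalog also honors an `autoScaleMaxThroughput` table property (verify the supported properties in the Catalog API documentation linked below), an autoscale container could be created like this; the container name here is only an illustrative placeholder:

```python
# Sketch only: assumes the Catalog API accepts autoScaleMaxThroughput as a TBLPROPERTY;
# verify the supported table properties in the Catalog API documentation before using it.
spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp "
           "TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '4000')")
          .format(cosmosDatabaseName, "autoscaleSampleContainer"))
```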
For more information, see the full [Catalog API](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/catalog-api.md) documentation.
## Ingest data

The name of the data source is `cosmos.oltp`, and the following example shows how you can write a memory dataframe consisting of two items to Cosmos DB:

#### [Python](#tab/python)

```python
spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive")\
  .write\
  .format("cosmos.oltp")\
  .options(**cfg)\
  .mode("APPEND")\
  .save()
```

#### [Scala](#tab/scala)

```scala
spark.createDataFrame(Seq(("cat-alive", "Schrodinger cat", 2, true), ("cat-dead", "Schrodinger cat", 2, false)))
  .toDF("id","name","age","isAlive")
  .write
  .format("cosmos.oltp")
  .options(cfg)
  .mode("APPEND")
  .save()
```
---
Note that `id` is a mandatory field for Cosmos DB.
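If your source data doesn't already include an `id` column, one common approach (a sketch, not something this quickstart prescribes) is to generate one before writing, for example with Spark's built-in `uuid()` SQL function; `sourceDf` below stands in for whatever DataFrame you're writing:

```python
from pyspark.sql.functions import expr

# Sketch only: add a generated unique id column when the source data has none.
dfWithId = sourceDf.withColumn("id", expr("uuid()"))
```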
For more information related to ingesting data, see the full [write configuration](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md#write-config) documentation.
## Query data

Using the same `cosmos.oltp` data source, we can query data and use `filter` to push down filters:
#### [Python](#tab/python)

```python
from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg).load()

df.filter(col("isAlive") == True)\
  .withColumn("age", col("age") + 1)\
  .show()
```

#### [Scala](#tab/scala)

```scala
import org.apache.spark.sql.functions.col

val df = spark.read.format("cosmos.oltp").options(cfg).load()

df.filter(col("isAlive") === true)
  .withColumn("age", col("age") + 1)
  .show()
```
---
For more information related to querying data, see the full [query configuration](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md#query-config) documentation.
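The query configuration also documents a way to push an entire SQL query down to Cosmos DB instead of relying on filter pushdown. The following is only a sketch that assumes the connector's `spark.cosmos.read.customQuery` option as described in that reference; verify the option name and behavior there before using it:

```python
# Sketch only: assumes the customQuery read option documented in the
# configuration reference; verify it there before relying on this.
dfCustom = (
    spark.read.format("cosmos.oltp")
    .options(**cfg)
    .option("spark.cosmos.read.customQuery",
            "SELECT c.id, c.name FROM c WHERE c.isAlive = true")
    .load()
)
dfCustom.show()
```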
## Partial document update using Patch

Using the same `cosmos.oltp` data source, we can do a partial update in Cosmos DB using the Patch API:

#### [Python](#tab/python)

```python
cfgSet = {"spark.cosmos.accountEndpoint": cosmosEndpoint,
          "spark.cosmos.accountKey": cosmosMasterKey,
          "spark.cosmos.database": cosmosDatabaseName,
          "spark.cosmos.container": cosmosContainerName,
          "spark.cosmos.write.strategy": "ItemPatch",
          "spark.cosmos.write.bulk.enabled": "false",
          "spark.cosmos.write.patch.defaultOperationType": "Set",
          "spark.cosmos.write.patch.columnConfigs": "[col(name).op(set)]"
          }

id = "<document-id>"
query = "select * from cosmosCatalog.{}.{} where id = '{}';".format(
    cosmosDatabaseName, cosmosContainerName, id)

dfBeforePatch = spark.sql(query)
print("document before patch operation")
dfBeforePatch.show()

data = [{"id": id, "name": "Joel Brakus"}]
patchDf = spark.createDataFrame(data)

patchDf.write.format("cosmos.oltp").mode("Append").options(**cfgSet).save()

dfAfterPatch = spark.sql(query)
print("document after patch operation")
dfAfterPatch.show()
```

For more samples related to partial document update, see the GitHub code sample [Patch Sample](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/Samples/Python/patch-sample.py).
#### [Scala](#tab/scala)

```scala
import spark.implicits._ // required for Seq(...).toDF(...)

val cfgSet = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> cosmosDatabaseName,
  "spark.cosmos.container" -> cosmosContainerName,
  "spark.cosmos.write.strategy" -> "ItemPatch",
  "spark.cosmos.write.bulk.enabled" -> "false",
  "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
)

val id = "<document-id>"
val query = s"select * from cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName} where id = '$id';"

val dfBeforePatch = spark.sql(query)
println("document before patch operation")
dfBeforePatch.show()

val patchDf = Seq(
  (id, "Joel Brakus")
).toDF("id", "name")

patchDf.write.format("cosmos.oltp").mode("Append").options(cfgSet).save()

val dfAfterPatch = spark.sql(query)
println("document after patch operation")
dfAfterPatch.show()
```

For more samples related to partial document update, see the GitHub code sample [Patch Sample](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/Samples/Scala/PatchSample.scala).

---
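The example above only sets a string property. The write configuration documents other patch operation types as well. As a hedged sketch that builds on the Python example's `cfgSet` and `id`, and assumes an `increment` operation is accepted in `columnConfigs` the same way `set` is (confirm the exact syntax in the write configuration reference), a numeric field could be patched like this:

```python
# Sketch only: assumes the connector accepts op(increment) in patch columnConfigs;
# confirm the syntax in the write configuration reference before relying on it.
cfgIncrement = dict(cfgSet)
cfgIncrement["spark.cosmos.write.patch.columnConfigs"] = "[col(age).op(increment)]"

incrementDf = spark.createDataFrame([(id, 1)], ["id", "age"])
incrementDf.write.format("cosmos.oltp").mode("Append").options(**cfgIncrement).save()
```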
## Schema inference
When querying data, the Spark Connector can infer the schema based on sampling existing items by setting `spark.cosmos.read.inferSchema.enabled` to `true`.
#### [Python](#tab/python)

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

df = spark.read.format("cosmos.oltp").options(**cfg)\
  .option("spark.cosmos.read.inferSchema.enabled", "true")\
  .load()

df.printSchema()

# Alternatively, you can pass the custom schema you want to be used to read the data:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
  .load()

df.printSchema()

# If no custom schema is specified and schema inference is disabled, the resulting data will contain the raw JSON content of the items:

df = spark.read.format("cosmos.oltp").options(**cfg)\
  .load()

df.printSchema()
```
#### [Scala](#tab/scala)

```scala
val cfgWithAutoSchemaInference = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> cosmosDatabaseName,
  "spark.cosmos.container" -> cosmosContainerName,
  "spark.cosmos.read.inferSchema.enabled" -> "true"
)

val df = spark.read.format("cosmos.oltp").options(cfgWithAutoSchemaInference).load()
df.printSchema()

df.show()
```
---
For more information related to schema inference, see the full [schema inference configuration](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md#schema-inference-config) documentation.

## Configuration reference
