Commit f5e7381

Improve CosmosDB Live Migration (Azure#32270)

* Improve Cosmos DB Live migration
* cosmos-db-parallel-container-migration
* applied suggestions & bug fix
* Fixed media to reflect new notebook names

1 parent 29b798b commit f5e7381

22 files changed, +534 -112 lines changed
@@ -1,82 +1,181 @@
 // Databricks notebook source
-val cosmosEndpoint_cf = "" //enter the Cosmos DB Account URI of the source account
-val cosmosMasterKey_cf = "" //enter the Cosmos DB Account PRIMARY KEY of the source account
-val cosmosDatabaseName_cf = "database-v4" //replace database-v4 with the name of your source database
-val cosmosContainerName_cf = "customer" //replace customer with the name of the container you want to migrate
-val cosmosContainerName_throughputControl = "0.95" //targetThroughputThreshold defines target percentage (here it is 95%) of available throughput you want the migration to use
-
-val cosmosEndpoint_write = "" //enter the Cosmos DB Account URI of the target account
-val cosmosMasterKey_write = "" //enter the Cosmos DB Account PRIMARY KEY of the target account
-val cosmosDatabaseName_write = "database-v4" //replace this with the name of your target database
-val cosmosContainerName_write = "customer_v2" //replace this with what you want to name your target container
+dbutils.widgets.removeAll()
 
 // COMMAND ----------
 
-spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
-spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint_cf)
-spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey_cf)
+// global config
+dbutils.widgets.text("cosmosEndpoint", "") // enter the Cosmos DB Account URI of the source account
+dbutils.widgets.text("cosmosMasterKey", "") // enter the Cosmos DB Account PRIMARY KEY of the source account
+dbutils.widgets.text("cosmosRegion", "") // enter the Cosmos DB Region
+
+// source config
+dbutils.widgets.text("cosmosSourceDatabaseName", "") // enter the name of your source database
+dbutils.widgets.text("cosmosSourceContainerName", "") // enter the name of the container you want to migrate
+dbutils.widgets.text("cosmosSourceContainerThroughputControl", "") // targetThroughputThreshold defines target percentage of available throughput you want the migration to use
+
+// target config
+dbutils.widgets.text("cosmosTargetDatabaseName", "") // enter the name of your target database
+dbutils.widgets.text("cosmosTargetContainerName", "") // enter the name of the target container
+dbutils.widgets.text("cosmosTargetContainerPartitionKey", "") // enter the partition key for how data is stored in the target container
+dbutils.widgets.text("cosmosTargetContainerProvisionedThroughput", "") // enter the provisioned throughput of the target container
 
 // COMMAND ----------
 
-// MAGIC %sql
-// MAGIC /* NOTE: It is important to enable TTL (can be off/-1 by default) on the throughput control container */
-// MAGIC CREATE TABLE IF NOT EXISTS cosmosCatalog.`database-v4`.ThroughputControl -- replace database-v4 with source database name - ThroughputControl table will be created there
-// MAGIC USING cosmos.oltp
-// MAGIC OPTIONS(spark.cosmos.database = 'database-v4') -- replace database-v4 with the name of your source database
-// MAGIC TBLPROPERTIES(partitionKeyPath = '/groupId', autoScaleMaxThroughput = '4000', indexingPolicy = 'AllProperties', defaultTtlInSeconds = '-1');
-// MAGIC
-// MAGIC CREATE TABLE IF NOT EXISTS cosmosCatalog.`database-v4`.customer_v2 -- replace database-v4 with the name of your source database, and customer_v2 with what you want to name your target container - it will be created here
-// MAGIC USING cosmos.oltp
-// MAGIC -- replace /customerId with the name of the field that you want to be used as the partition key in the new version of the container
-// MAGIC TBLPROPERTIES(partitionKeyPath = '/customerId', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');
+val cosmosEndpoint = dbutils.widgets.get("cosmosEndpoint")
+val cosmosMasterKey = dbutils.widgets.get("cosmosMasterKey")
+val cosmosRegion = dbutils.widgets.get("cosmosRegion")
+
+val cosmosSourceDatabaseName = dbutils.widgets.get("cosmosSourceDatabaseName")
+val cosmosSourceContainerName = dbutils.widgets.get("cosmosSourceContainerName")
+val cosmosSourceContainerThroughputControl = dbutils.widgets.get("cosmosSourceContainerThroughputControl")
+
+val cosmosTargetDatabaseName = dbutils.widgets.get("cosmosTargetDatabaseName")
+val cosmosTargetContainerName = dbutils.widgets.get("cosmosTargetContainerName")
+val cosmosTargetContainerPartitionKey = dbutils.widgets.get("cosmosTargetContainerPartitionKey")
+val cosmosTargetContainerProvisionedThroughput = dbutils.widgets.get("cosmosTargetContainerProvisionedThroughput")
 
 // COMMAND ----------
 
-val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint_cf,
-  "spark.cosmos.applicationName" -> "LiveMigrationRead_",
-  "spark.cosmos.accountKey" -> cosmosMasterKey_cf,
-  "spark.cosmos.database" -> cosmosDatabaseName_cf,
-  "spark.cosmos.container" -> cosmosContainerName_cf,
-  "spark.cosmos.read.partitioning.strategy" -> "Default",
-  "spark.cosmos.read.inferSchema.enabled" -> "false",
-  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
-  "spark.cosmos.changeFeed.mode" -> "Incremental",
-  "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "50000",
-  "spark.cosmos.throughputControl.enabled" -> "true",
-  "spark.cosmos.throughputControl.name" -> "SourceContainerThroughputControl",
-  "spark.cosmos.throughputControl.targetThroughputThreshold" -> cosmosContainerName_throughputControl,
-  "spark.cosmos.throughputControl.globalControl.database" -> "database-v4", //replace database-v4 with the name of your source database
-  "spark.cosmos.throughputControl.globalControl.container" -> "ThroughputControl",
-  "spark.cosmos.preferredRegionsList" -> "[UK South]" //replace this with comma separate list of regions appropriate for your source container
-)
-
-//when running this notebook is stopped (or if a problem causes a crash) change feed processing will be picked up from last processed document
-//if you want to start from beginning, delete this folder or change checkpointLocation value
-val checkpointLocation = "/tmp/LiveMigration_checkpoint"
-
-val writeCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint_write,
-  "spark.cosmos.accountKey" -> cosmosMasterKey_write,
-  "spark.cosmos.applicationName" -> "LivemigrationWrite_",
-  "spark.cosmos.database" -> cosmosDatabaseName_write,
-  "spark.cosmos.container" -> cosmosContainerName_write,
-  "spark.cosmos.write.strategy" -> "ItemOverwrite", //default
+import org.apache.spark.sql.DataFrame
+
+def connectToCosmos(cosmosEndpoint: String, cosmosMasterKey: String) {
+  spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
+  spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
+  spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
+}
+
+def returnCosmosDBProperties(cosmosContainerProvisionedThroughput: String): String = {
+  if (cosmosTargetContainerProvisionedThroughput contains "shared") { return "WITH DBPROPERTIES (%s)".format(cosmosTargetContainerProvisionedThroughput.replace("sharedDB", "")) } else return ""
+}
+
+def returnCosmosContainerProperties(cosmosContainerProvisionedThroughput: String): String = {
+  if (!(cosmosTargetContainerProvisionedThroughput contains "shared")) { return "%s,".format(cosmosTargetContainerProvisionedThroughput) } else return ""
+}
+
+def createCosmosDB(cosmosDatabaseName: String, cosmosContainerProvisionedThroughput: String) {
+  val cosmosDatabaseOptions = returnCosmosDBProperties(cosmosContainerProvisionedThroughput)
+
+  val query = s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.`$cosmosDatabaseName` $cosmosDatabaseOptions;"
+  println("Executing create database...")
+  println(query.trim())
+  try {
+    spark.sql(query)
+  } catch {
+    case e: Exception => println(e)
+  }
+}
+
+def createThroughtputControlTable(cosmosDatabaseName: String) { /* NOTE: It is important to enable TTL (can be off/-1 by default) on the throughput control container */
+  val query = s"""
+    CREATE TABLE IF NOT EXISTS cosmosCatalog.`$cosmosDatabaseName`.`ThroughputControl` USING cosmos.oltp
+    OPTIONS (
+      spark.cosmos.database = '$cosmosDatabaseName'
+    )
+    TBLPROPERTIES(
+      partitionKeyPath = '/groupId',
+      autoScaleMaxThroughput = '4000',
+      indexingPolicy = 'AllProperties',
+      defaultTtlInSeconds = '-1'
+    );
+  """
+  println("Executing create ThroughputControl Container...")
+  println(query.trim())
+  try {
+    spark.sql(query)
+  } catch {
+    case e: Exception => println(e)
+  }
+}
+
+def createCosmosContainer(cosmosDatabaseName: String, cosmosContainerName: String, cosmosPartitionKey: String, cosmosContainerProvisionedThroughput: String) {
+  val cosmosContainerOptions = returnCosmosContainerProperties(cosmosContainerProvisionedThroughput)
+
+  val query = s"""
+    CREATE TABLE IF NOT EXISTS cosmosCatalog.`$cosmosDatabaseName`.`$cosmosContainerName` USING cosmos.oltp
+    TBLPROPERTIES (
+      partitionKeyPath = '$cosmosPartitionKey',
+      $cosmosContainerOptions
+      indexingPolicy = 'OnlySystemProperties'
+    );
+  """
+  println("Executing create Container...")
+  println(query.trim())
+  try {
+    spark.sql(query)
+  } catch {
+    case e: Exception => println(e)
+  }
+}
+
+def cosmosInitReadStream(cosmosEndpoint: String, cosmosMasterKey: String, cosmosRegion: String, cosmosDatabaseName: String, cosmosContainerName: String, cosmosContainerThroughputControl: String): DataFrame = {
+  val changeFeedConfig = Map(
+    "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+    "spark.cosmos.accountKey" -> cosmosMasterKey,
+    "spark.cosmos.applicationName" -> s"${cosmosDatabaseName}_${cosmosContainerName}_LiveMigrationRead_",
+    "spark.cosmos.database" -> cosmosDatabaseName,
+    "spark.cosmos.container" -> cosmosContainerName,
+    "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
+    "spark.cosmos.read.inferSchema.enabled" -> "false",
+    "spark.cosmos.read.maxItemCount" -> "5",
+    "spark.cosmos.changeFeed.startFrom" -> "Beginning",
+    "spark.cosmos.changeFeed.mode" -> "Incremental",
+    "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "50000",
+    "spark.cosmos.throughputControl.enabled" -> "true",
+    "spark.cosmos.throughputControl.name" -> "SourceContainerThroughputControl",
+    "spark.cosmos.throughputControl.targetThroughputThreshold" -> cosmosContainerThroughputControl,
+    "spark.cosmos.throughputControl.globalControl.database" -> cosmosDatabaseName,
+    "spark.cosmos.throughputControl.globalControl.container" -> "ThroughputControl",
+    "spark.cosmos.preferredRegionsList" -> cosmosRegion
+  )
+
+  val changeFeedDF = spark.readStream.format("cosmos.oltp.changeFeed")
+    .options(changeFeedConfig)
+    .load
+
+  /* this will preserve the source document fields and retain the "_etag" and "_ts" property values as "_origin_etag" and "_origin_ts" in the sink document */
+  return changeFeedDF.withColumnRenamed("_rawbody", "_origin_rawBody")
+}
+
+def cosmosInitWriteConfig(cosmosEndpoint: String, cosmosMasterKey: String, cosmosSourceDatabaseName: String, cosmosSourceContainerName: String, cosmosTargetDatabaseName: String, cosmosTargetContainerName: String): Map[String, String] = {
+  // when running this notebook is stopped (or if a problem causes a crash) change feed processing will be picked up from last processed document
+  // if you want to start from beginning, delete this folder or change checkpointLocation value
+  val checkpointLocation = s"/tmp/live_migration_checkpoint/${cosmosSourceDatabaseName}/${cosmosSourceContainerName}"
+  val applicationName = s"${cosmosSourceDatabaseName}_${cosmosSourceContainerName}_"
+
+  return Map(
+    "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+    "spark.cosmos.accountKey" -> cosmosMasterKey,
+    "spark.cosmos.applicationName" -> applicationName,
+    "spark.cosmos.database" -> cosmosTargetDatabaseName,
+    "spark.cosmos.container" -> cosmosTargetContainerName,
+    "spark.cosmos.write.strategy" -> "ItemOverwrite",
     "checkpointLocation" -> checkpointLocation
-)
+  )
+}
 
 // COMMAND ----------
 
-val changeFeedDF = spark.readStream.format("cosmos.oltp.changeFeed")
-  .options(changeFeedCfg)
-  .load
-/*this will preserve the source document fields and retain the "_etag" and "_ts" property values as "_origin_etag" and "_origin_ts" in the sink documnet*/
-val df_withAuditFields = changeFeedDF.withColumnRenamed("_rawbody", "_origin_rawBody")
+connectToCosmos(cosmosEndpoint = cosmosEndpoint, cosmosMasterKey = cosmosMasterKey)
+
+createCosmosDB(cosmosDatabaseName = cosmosTargetDatabaseName, cosmosContainerProvisionedThroughput = cosmosTargetContainerProvisionedThroughput)
+
+createThroughtputControlTable(cosmosDatabaseName = cosmosSourceDatabaseName)
+
+createCosmosContainer(cosmosDatabaseName = cosmosTargetDatabaseName, cosmosContainerName = cosmosTargetContainerName, cosmosPartitionKey = cosmosTargetContainerPartitionKey, cosmosContainerProvisionedThroughput = cosmosTargetContainerProvisionedThroughput)
+
+// COMMAND ----------
+
+val readStream = cosmosInitReadStream(cosmosEndpoint = cosmosEndpoint, cosmosMasterKey = cosmosMasterKey, cosmosRegion = cosmosRegion, cosmosDatabaseName = cosmosSourceDatabaseName, cosmosContainerName = cosmosSourceContainerName, cosmosContainerThroughputControl = cosmosSourceContainerThroughputControl)
+
+val writeConfig = cosmosInitWriteConfig(cosmosEndpoint = cosmosEndpoint, cosmosMasterKey = cosmosMasterKey, cosmosSourceDatabaseName = cosmosSourceDatabaseName, cosmosSourceContainerName = cosmosSourceContainerName, cosmosTargetDatabaseName = cosmosTargetDatabaseName, cosmosTargetContainerName = cosmosTargetContainerName)
+
+val runId = java.util.UUID.randomUUID.toString;
 
 // COMMAND ----------
 
-val runId = java.util.UUID.randomUUID.toString;
-df_withAuditFields.writeStream
-  .format("cosmos.oltp")
-  .queryName(runId)
-  .options(writeCfg)
-  .outputMode("append")
-  .start()
+readStream.writeStream
+  .format("cosmos.oltp")
+  .queryName(runId)
+  .options(writeConfig)
+  .outputMode("append")
+  .start()
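A note on the cosmosTargetContainerProvisionedThroughput widget introduced above: the new returnCosmosDBProperties / returnCosmosContainerProperties helpers branch on whether the widget value contains "shared". A value prefixed with sharedDB is stripped of that prefix and emitted as database-level DBPROPERTIES (shared database throughput), while any other value is injected directly into the container's TBLPROPERTIES. The snippet below is an illustration only and not part of this commit; the two example widget values are assumptions about the intended input format, and the helpers are standalone copies of the ones added in the diff so the snippet can run in any Scala REPL.

// Illustration only (not part of this commit): how the two assumed styles of
// cosmosTargetContainerProvisionedThroughput flow into the generated Spark SQL.

// Standalone copies of the helpers added in this commit.
def dbProperties(throughput: String): String =
  if (throughput contains "shared") "WITH DBPROPERTIES (%s)".format(throughput.replace("sharedDB", "")) else ""

def containerProperties(throughput: String): String =
  if (!(throughput contains "shared")) "%s,".format(throughput) else ""

// Assumed example widget values: dedicated container throughput vs. shared database throughput.
val dedicated = "autoScaleMaxThroughput = '100000'"
val shared = "sharedDB autoScaleMaxThroughput = '100000'"

for (value <- Seq(dedicated, shared)) {
  println(s"widget value: $value")
  // Database-level properties are only emitted for the sharedDB form.
  println(s"  CREATE DATABASE IF NOT EXISTS cosmosCatalog.`database-v4` ${dbProperties(value)};")
  // Container-level throughput is only injected for the dedicated form.
  println(s"  TBLPROPERTIES (partitionKeyPath = '/customerId', ${containerProperties(value)} indexingPolicy = 'OnlySystemProperties');")
}

In the sharedDB case the throughput settings end up on the database and the container's TBLPROPERTIES carry only the partition key and indexing policy, which appears to be the intent of the prefix convention.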
