
Commit 507b1e2

Full Fidelity Change Feed Deletes JSON Change (Azure#43881)
* Make id nullable
* Add tests
* Fix test
* dummy commit to rerun pipeline
* revert dummy commit to rerun pipeline
1 parent e745564 commit 507b1e2


3 files changed: +129 -4 lines changed


sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedTable.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ private[spark] object ChangeFeedTable {
         StructField(RawJsonBodyAttributeName, StringType, nullable=true),
         StructField(IdAttributeName, StringType, nullable=false),
         StructField(TimestampAttributeName, LongType, nullable=false),
-        StructField(ETagAttributeName, StringType, nullable=false),
+        StructField(ETagAttributeName, StringType, nullable=true),
         StructField(LsnAttributeName, LongType, nullable=false),
         StructField(MetadataJsonBodyAttributeName, StringType, nullable=false),
         StructField(PreviousRawJsonBodyAttributeName, StringType, nullable=true),
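
Note on the change above: relaxing ETagAttributeName to nullable lets rows for full-fidelity delete events, which can arrive without a current-version etag, satisfy the default schema. A minimal sketch of the resulting shape, assuming the attribute-name constants resolve to the usual Cosmos system property names ("_rawBody", "id", "_ts", "_etag", "_lsn" are assumptions for illustration, not taken from this diff):

import org.apache.spark.sql.types._

object FullFidelitySchemaSketch extends App {
  // Column names are assumed here; the connector resolves them from its
  // *AttributeName constants.
  val schema = StructType(Seq(
    StructField("_rawBody", StringType, nullable = true),
    StructField("id", StringType, nullable = false),
    StructField("_ts", LongType, nullable = false),
    StructField("_etag", StringType, nullable = true), // relaxed from nullable = false by this commit
    StructField("_lsn", LongType, nullable = false)
  ))

  // A delete-event row can now carry null for _etag without violating the schema.
  schema.fields.foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
}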

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosRowConverterBase.scala

Lines changed: 9 additions & 1 deletion
@@ -712,7 +712,15 @@ private[cosmos] class CosmosRowConverterBase(
           case _ => null
         }
       } else {
-        null
+        objectNode.get(MetadataJsonBodyAttributeName) match {
+          case metadataNode: JsonNode =>
+            metadataNode.get(IdAttributeName) match {
+              case valueNode: JsonNode =>
+                Option(valueNode).fold(null: String)(v => v.asText(null))
+              case _ => null
+            }
+          case _ => null
+        }
       }
     }
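
The new else-branch falls back to the metadata JSON node to recover the document id, which full-fidelity delete events omit from the current version. A standalone sketch of the same pattern, assuming a hypothetical delete payload with the id nested under "metadata" (the real lookups go through the MetadataJsonBodyAttributeName and IdAttributeName constants):

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object DeleteIdFallbackSketch extends App {
  val mapper = new ObjectMapper()

  // Hypothetical full-fidelity delete event: no top-level "id"; only the
  // metadata node carries it (payload shape assumed for illustration).
  val deleteEvent: JsonNode = mapper.readTree(
    """{"metadata":{"id":"item-42","operationType":"delete","lsn":17}}""")

  // Same fallback pattern as the converter change above: a null lookup falls
  // through to the wildcard case, so a missing node yields null, not an NPE.
  val id: String = deleteEvent.get("metadata") match {
    case metadataNode: JsonNode =>
      metadataNode.get("id") match {
        case valueNode: JsonNode => Option(valueNode).fold(null: String)(_.asText(null))
        case _ => null
      }
    case _ => null
  }

  println(id) // prints: item-42
}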

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala

Lines changed: 119 additions & 2 deletions
@@ -5,18 +5,22 @@ package com.azure.cosmos.spark
 import com.azure.cosmos.SparkBridgeInternal
 import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState
 import com.azure.cosmos.implementation.{TestConfigurations, Utils}
-import com.azure.cosmos.models.PartitionKey
+import com.azure.cosmos.models.{ChangeFeedPolicy, CosmosContainerProperties, PartitionKey}
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue}
+import com.fasterxml.jackson.databind.node.ObjectNode
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.functions
+import org.apache.spark.sql.functions.{col, concat, lit}
 import org.apache.spark.sql.types._

 import java.io.{BufferedReader, InputStreamReader}
 import java.nio.file.Paths
+import java.time.Duration
 import java.util.UUID
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.asScalaBufferConverter

 class SparkE2EChangeFeedITest
   extends IntegrationSpec
@@ -522,7 +526,7 @@ class SparkE2EChangeFeedITest
     }

     // wait for the log store to get these changes
-    Thread.sleep(2000)
+    Thread.sleep(1000)

     val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
     val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
@@ -543,6 +547,119 @@
     })
   }

+  "spark change feed query streaming (full fidelity)" can "use default schema" in {
+
+    val cosmosEndpoint = TestConfigurations.HOST
+    val cosmosMasterKey = TestConfigurations.MASTER_KEY
+    val cosmosContainerName = s"${UUID.randomUUID().toString}"
+    val properties: CosmosContainerProperties =
+      new CosmosContainerProperties(cosmosContainerName, "/pk")
+    properties.setChangeFeedPolicy(
+      ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10)))
+    cosmosClient
+      .getDatabase(cosmosDatabase)
+      .createContainer(properties)
+      .block
+    val sinkContainerName = cosmosClient
+      .getDatabase(cosmosDatabase)
+      .createContainer(s"sink-${UUID.randomUUID().toString}", "/pk")
+      .block
+      .getProperties
+      .getId
+
+    val readCfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> cosmosContainerName,
+      "spark.cosmos.read.inferSchema.enabled" -> "false",
+      "spark.cosmos.changeFeed.mode" -> "FullFidelity",
+      "spark.cosmos.changeFeed.startFrom" -> "NOW",
+    )
+
+    val writeCfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> sinkContainerName,
+      "spark.cosmos.write.strategy" -> "ItemOverwrite",
+      "spark.cosmos.write.bulk.enabled" -> "true"
+    )
+
+    val changeFeedDF = spark
+      .readStream
+      .format("cosmos.oltp.changeFeed")
+      .options(readCfg)
+      .load()
+
+    val modifiedChangeFeedDF = changeFeedDF.withColumn("_rawBody", concat(lit("{\"id\":\""), col("id"), lit("\"}")))
+
+    val microBatchQuery = modifiedChangeFeedDF
+      .writeStream
+      .format("cosmos.oltp")
+      .options(writeCfg)
+      .option("checkpointLocation", "/tmp/" + UUID.randomUUID().toString)
+      .outputMode("append")
+      .start()
+
+    val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainerName)
+
+    val createdObjectIds = new mutable.HashMap[String, String]()
+    val replacedObjectIds = new mutable.HashMap[String, String]()
+    val deletedObjectIds = new mutable.HashMap[String, String]()
+    // Perform operations for change feed to capture
+    for (sequenceNumber <- 1 to 20) {
+      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+      objectNode.put("name", "Shrodigner's cat")
+      objectNode.put("type", "cat")
+      val pk = UUID.randomUUID().toString
+      objectNode.put("pk", pk)
+      objectNode.put("age", 20)
+      objectNode.put("sequenceNumber", sequenceNumber)
+      val id = UUID.randomUUID().toString
+      objectNode.put("id", id)
+      createdObjectIds.put(id, pk)
+      if (sequenceNumber % 2 == 0) {
+        replacedObjectIds.put(id, pk)
+      }
+      if (sequenceNumber % 3 == 0) {
+        deletedObjectIds.put(id, pk)
+      }
+      container.createItem(objectNode).block()
+    }
+
+    for (id <- replacedObjectIds.keys) {
+      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+      objectNode.put("name", "Shrodigner's cat")
+      objectNode.put("type", "dog")
+      objectNode.put("age", 25)
+      objectNode.put("id", id)
+      objectNode.put("pk", replacedObjectIds(id))
+      container.replaceItem(objectNode, id, new PartitionKey(replacedObjectIds(id))).block()
+    }
+
+    for (id <- deletedObjectIds.keys) {
+      container.deleteItem(id, new PartitionKey(deletedObjectIds(id))).block()
+    }
+    // wait for the log store to get these changes
+    Thread.sleep(1000)
+    changeFeedDF.schema.equals(
+      ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
+    microBatchQuery.processAllAvailable()
+    microBatchQuery.stop()
+
+    val sinkContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(sinkContainerName)
+    val feedResponses = sinkContainer.queryItems("SELECT * FROM c", classOf[ObjectNode]).byPage().collectList().block()
+
+    var numResults = 0
+    for (feedResponse <- feedResponses.asScala) {
+      numResults += feedResponse.getResults.size()
+    }
+
+    // Basic validation that something was written from bulk
+    numResults should be > 0
+  }
+
   "spark change feed query (incremental)" can "proceed with simulated Spark2 Checkpoint" in {
     val cosmosEndpoint = TestConfigurations.HOST
     val cosmosMasterKey = TestConfigurations.MASTER_KEY
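
For reference beyond the test harness, the same full-fidelity feed can also be consumed as a plain batch read using the option keys the test's readCfg exercises. A minimal sketch with placeholder account values (the endpoint, key, database, and container strings here are hypothetical, and "Beginning" as a startFrom value is an assumption rather than something shown in this diff):

import org.apache.spark.sql.SparkSession

object FullFidelityBatchReadSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ffcf-sketch")
    .getOrCreate()

  // Placeholder credentials; in the test these come from TestConfigurations.
  val cfg = Map(
    "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
    "spark.cosmos.accountKey" -> "<key>",
    "spark.cosmos.database" -> "<database>",
    "spark.cosmos.container" -> "<container>",
    "spark.cosmos.read.inferSchema.enabled" -> "false",
    "spark.cosmos.changeFeed.mode" -> "FullFidelity",
    "spark.cosmos.changeFeed.startFrom" -> "Beginning"
  )

  // Creates, replaces, and deletes all surface as rows; for deletes the id is
  // populated from the metadata JSON (the converter change in this commit).
  val df = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
  df.show(false)
}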
