Commit 1742679

Merge pull request #193969 from FabianMeiswinkel/users/fabianm/CHangeSparkStreamingSamples
Modify streaming sample in Spark 2.4 to use forEachBatch
2 parents 014b77e + 6d4262b commit 1742679

File tree: 1 file changed (+25 additions, −10 deletions)

articles/synapse-analytics/synapse-link/how-to-query-analytical-store-spark.md

Lines changed: 25 additions & 10 deletions
```diff
@@ -177,14 +177,22 @@ The syntax in **Python** would be the following:
 
 # If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
 
+def writeBatchToCosmos(batchDF, batchId):
+    batchDF.persist()
+    print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
+    batchDF.write.format("cosmos.oltp")\
+        .option("spark.synapse.linkedService", "<enter linked service name>")\
+        .option("spark.cosmos.container", "<enter container name>")\
+        .option("spark.cosmos.write.upsertEnabled", "true")\
+        .mode('append')\
+        .save()
+    print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
+    batchDF.unpersist()
+
 streamQuery = dfStream\
     .writeStream\
-    .format("cosmos.oltp")\
-    .outputMode("append")\
+    .foreachBatch(writeBatchToCosmos) \
     .option("checkpointLocation", "/localWriteCheckpointFolder")\
-    .option("spark.synapse.linkedService", "<enter linked service name>")\
-    .option("spark.cosmos.container", "<enter container name>")\
-    .option("spark.cosmos.connection.mode", "Gateway")\
     .start()
 
 streamQuery.awaitTermination()
```
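The change above replaces the connector-level streaming sink with Structured Streaming's `foreachBatch`, which hands each micro-batch DataFrame and a monotonically increasing batch id to a user callback that performs the actual write. As a rough, Spark-free sketch of that control flow (the `write_batch` function and the `sink` list below are illustrative stand-ins, not part of the Spark or Cosmos DB connector APIs):

```python
# Minimal sketch of the foreachBatch pattern from the diff above, with plain
# Python lists standing in for Spark micro-batch DataFrames and the Cosmos DB
# container. All names here are illustrative, not real connector APIs.

sink = []  # stands in for the Cosmos DB container

def write_batch(batch_rows, batch_id):
    # Mirrors writeBatchToCosmos: log entry, write the batch, log completion.
    print("--> BatchId: {}, Document count: {}".format(batch_id, len(batch_rows)))
    sink.extend(batch_rows)  # stands in for batchDF.write...save()
    print("<-- BatchId: {}, Document count: {}".format(batch_id, len(batch_rows)))

# The streaming engine would invoke the callback once per micro-batch,
# analogous to .writeStream.foreachBatch(write_batch):
for batch_id, batch_rows in enumerate([[{"id": 1}, {"id": 2}], [{"id": 3}]]):
    write_batch(batch_rows, batch_id)
```

In the real sample, `persist()`/`unpersist()` bracket the callback body because the DataFrame is consumed twice (once by `count()` for logging, once by the write), and caching avoids recomputing the micro-batch.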
```diff
@@ -198,12 +206,19 @@ The equivalent syntax in **Scala** would be the following:
 
 val query = dfStream.
     writeStream.
-    format("cosmos.oltp").
-    outputMode("append").
+    foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+      batchDF.persist()
+      batchDF.write.format("cosmos.oltp").
+        option("spark.synapse.linkedService", "<enter linked service name>").
+        option("spark.cosmos.container", "<enter container name>").
+        option("spark.cosmos.write.upsertEnabled", "true").
+        mode(SaveMode.Overwrite).
+        save()
+      println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
+      batchDF.unpersist()
+      ()
+    }.
     option("checkpointLocation", "/localWriteCheckpointFolder").
-    option("spark.synapse.linkedService", "<enter linked service name>").
-    option("spark.cosmos.container", "<enter container name>").
-    option("spark.cosmos.connection.mode", "Gateway").
     start()
 
 query.awaitTermination()
```
