Skip to content

Commit 77036c2

Browse files
Fixing offset for Spark 2 -> Spark 3 offset conversion (Azure#33757)
1 parent f1f4b7e commit 77036c2

File tree

3 files changed

+3
-3
lines changed

3 files changed

+3
-3
lines changed

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra
277277

278278
// Spark 3 tracks the last LSN for which docs have been successfully processed
279279
// Spark 2 LSN is offset by 1 because in Spark 2 the next-to-be-sent-as-continuation LSN is tracked
280-
val lsn: Long = token._2 + 1
280+
val lsn: Long = token._2 - 1
281281

282282
val range: Range[String] = if (pkRangesByPkRangeId.contains(pkRangeId)) {
283283
pkRangesByPkRangeId.get(pkRangeId).get.toRange

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/CreateSpark2ContinuationsFromChangeFeedOffset.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class CreateSpark2ContinuationsFromChangeFeedOffset extends UDF2[Map[String, Str
112112
pkRange.getId.toInt,
113113
// Spark 3 tracks the last LSN for which docs have been successfully processed
114114
// Spark 2 LSN is offset by 1 because in Spark 2 the next-to-be-sent-as-continuation LSN is tracked
115-
Math.max(0, SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken) - 1))
115+
Math.max(0, SparkBridgeImplementationInternal.toLsn(minLsn.get.getToken) + 1))
116116
}
117117
})
118118

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ class SparkE2EChangeFeedITest
730730
val quotedToken = filteredCompositeContinuations.head.getToken
731731
// Spark 3 tracks the last LSN for which docs have been successfully processed
732732
// Spark 2 LSN is offset by 1 because in Spark 2 the next-to-be-sent-as-continuation LSN is tracked
733-
val lsn: Long = Math.max(0, quotedToken.substring(1, quotedToken.length - 1).toLong - 1)
733+
val lsn: Long = Math.max(0, quotedToken.substring(1, quotedToken.length - 1).toLong + 1)
734734

735735
tokenMap += (pkRangeId -> lsn)
736736
})

0 commit comments

Comments
 (0)