|
3 | 3 | package com.azure.cosmos.spark
|
4 | 4 |
|
5 | 5 | import com.azure.core.management.AzureEnvironment
|
| 6 | +import com.azure.cosmos.changeFeedMetrics.ChangeFeedMetricsTracker |
6 | 7 | import com.azure.cosmos.{ReadConsistencyStrategy, spark}
|
7 | 8 | import org.apache.spark.sql.connector.read.streaming.ReadLimit
|
8 | 9 |
|
9 | 10 | import java.time.Instant
|
10 | 11 | import java.util.UUID
|
| 12 | +import java.util.concurrent.ConcurrentHashMap |
11 | 13 | import java.util.concurrent.atomic.AtomicLong
|
12 | 14 |
|
13 | 15 | class CosmosPartitionPlannerSpec extends UnitSpec {
|
@@ -484,4 +486,135 @@ class CosmosPartitionPlannerSpec extends UnitSpec {
|
484 | 486 | calculate(0).endLsn.get shouldEqual 2150
|
485 | 487 | calculate(1).endLsn.get shouldEqual 2150
|
486 | 488 | }
|
| 489 | + |
| 490 | + it should "calculateEndLsn should distribute rate based on metrics with readLimit and multiple partitions" in { |
| 491 | + val clientConfig = spark.CosmosClientConfiguration( |
| 492 | + UUID.randomUUID().toString, |
| 493 | + UUID.randomUUID().toString, |
| 494 | + CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), |
| 495 | + None, |
| 496 | + UUID.randomUUID().toString, |
| 497 | + useGatewayMode = false, |
| 498 | + enforceNativeTransport = false, |
| 499 | + proactiveConnectionInitialization = None, |
| 500 | + proactiveConnectionInitializationDurationInSeconds = 120, |
| 501 | + httpConnectionPoolSize = 1000, |
| 502 | + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, |
| 503 | + disableTcpConnectionEndpointRediscovery = false, |
| 504 | + preferredRegionsList = Option.empty, |
| 505 | + subscriptionId = None, |
| 506 | + tenantId = None, |
| 507 | + resourceGroupName = None, |
| 508 | + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, |
| 509 | + sparkEnvironmentInfo = "", |
| 510 | + clientBuilderInterceptors = None, |
| 511 | + clientInterceptors = None, |
| 512 | + sampledDiagnosticsLoggerConfig = None, |
| 513 | + azureMonitorConfig = None |
| 514 | + ) |
| 515 | + |
| 516 | + val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) |
| 517 | + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) |
| 518 | + val docSizeInKB = rnd.nextInt() |
| 519 | + val maxRows = 10 |
| 520 | + val nowEpochMs = Instant.now.toEpochMilli |
| 521 | + val createdAt = new AtomicLong(nowEpochMs) |
| 522 | + val lastRetrievedAt = new AtomicLong(nowEpochMs) |
| 523 | + |
| 524 | + val metadata = PartitionMetadata( |
| 525 | + Map[String, String](), |
| 526 | + clientConfig, |
| 527 | + None, |
| 528 | + containerConfig, |
| 529 | + normalizedRange, |
| 530 | + documentCount = 2150, |
| 531 | + docSizeInKB, |
| 532 | + firstLsn = Some(0), |
| 533 | + latestLsn = 2150, |
| 534 | + startLsn = 2050, |
| 535 | + None, |
| 536 | + createdAt, |
| 537 | + lastRetrievedAt) |
| 538 | + |
| 539 | + val metricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() |
| 540 | + val metricsTracker = new ChangeFeedMetricsTracker(0L, normalizedRange) |
| 541 | + // Simulate metrics showing 2 changes per LSN on average |
| 542 | + metricsTracker.track(10, 20) |
| 543 | + metricsMap.put(normalizedRange, metricsTracker) |
| 544 | + |
| 545 | + val calculate = CosmosPartitionPlanner.calculateEndLsn( |
| 546 | + Array[PartitionMetadata](metadata), |
| 547 | + ReadLimit.maxRows(maxRows), |
| 548 | + isChangeFeed = true, |
| 549 | + Some(metricsMap) |
| 550 | + ) |
| 551 | + |
| 552 | + // With 2 changes per LSN average from metrics and maxRows=10, should allow 5 LSN progress |
| 553 | + calculate(0).endLsn.get shouldEqual 2055 |
| 554 | + } |
| 555 | + |
| 556 | + it should "calculateEndLsn should handle when no progress is made even with metrics" in { |
| 557 | + val clientConfig = spark.CosmosClientConfiguration( |
| 558 | + UUID.randomUUID().toString, |
| 559 | + UUID.randomUUID().toString, |
| 560 | + CosmosMasterKeyAuthConfig(UUID.randomUUID().toString), |
| 561 | + None, |
| 562 | + UUID.randomUUID().toString, |
| 563 | + useGatewayMode = false, |
| 564 | + enforceNativeTransport = false, |
| 565 | + proactiveConnectionInitialization = None, |
| 566 | + proactiveConnectionInitializationDurationInSeconds = 120, |
| 567 | + httpConnectionPoolSize = 1000, |
| 568 | + readConsistencyStrategy = ReadConsistencyStrategy.EVENTUAL, |
| 569 | + disableTcpConnectionEndpointRediscovery = false, |
| 570 | + preferredRegionsList = Option.empty, |
| 571 | + subscriptionId = None, |
| 572 | + tenantId = None, |
| 573 | + resourceGroupName = None, |
| 574 | + azureEnvironmentEndpoints = AzureEnvironment.AZURE.getEndpoints, |
| 575 | + sparkEnvironmentInfo = "", |
| 576 | + clientBuilderInterceptors = None, |
| 577 | + clientInterceptors = None, |
| 578 | + sampledDiagnosticsLoggerConfig = None, |
| 579 | + azureMonitorConfig = None |
| 580 | + ) |
| 581 | + |
| 582 | + val containerConfig = CosmosContainerConfig(UUID.randomUUID().toString, UUID.randomUUID().toString) |
| 583 | + val normalizedRange = NormalizedRange(UUID.randomUUID().toString, UUID.randomUUID().toString) |
| 584 | + val docSizeInKB = rnd.nextInt() |
| 585 | + val maxRows = 10 |
| 586 | + val nowEpochMs = Instant.now.toEpochMilli |
| 587 | + val createdAt = new AtomicLong(nowEpochMs) |
| 588 | + val lastRetrievedAt = new AtomicLong(nowEpochMs) |
| 589 | + |
| 590 | + val metadata = PartitionMetadata( |
| 591 | + Map[String, String](), |
| 592 | + clientConfig, |
| 593 | + None, |
| 594 | + containerConfig, |
| 595 | + normalizedRange, |
| 596 | + documentCount = 2150, |
| 597 | + docSizeInKB, |
| 598 | + firstLsn = Some(0), |
| 599 | + latestLsn = 2050, // Latest LSN same as start LSN |
| 600 | + startLsn = 2050, |
| 601 | + None, |
| 602 | + createdAt, |
| 603 | + lastRetrievedAt) |
| 604 | + |
| 605 | + val metricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]() |
| 606 | + val metricsTracker = new ChangeFeedMetricsTracker(0L, normalizedRange) |
| 607 | + metricsTracker.track(2050, 100) |
| 608 | + metricsMap.put(normalizedRange, metricsTracker) |
| 609 | + |
| 610 | + val calculate = CosmosPartitionPlanner.calculateEndLsn( |
| 611 | + Array[PartitionMetadata](metadata), |
| 612 | + ReadLimit.maxRows(maxRows), |
| 613 | + isChangeFeed = true, |
| 614 | + Some(metricsMap) |
| 615 | + ) |
| 616 | + |
| 617 | + // Should stay at start LSN since no progress can be made |
| 618 | + calculate(0).endLsn.get shouldEqual 2050 |
| 619 | + } |
487 | 620 | }
|
0 commit comments