diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategyTests.java new file mode 100644 index 000000000000..2d1c1369cd7e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategyTests.java @@ -0,0 +1,121 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.changefeed.common; + +import com.azure.cosmos.implementation.changefeed.Lease; +import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class EqualPartitionsBalancingStrategyTests { + + @Test(groups = "unit") + public void expiredLeases_legacyClampsToOneWhenMultipleWorkers() { + String hostName = "me"; + List allLeases = new ArrayList<>(); + + // Multiple workers exist because there are existing owners in the lease set. + for (int i = 0; i < 10; i++) { + allLeases.add(newLease("unowned-" + i, null)); + allLeases.add(newLease("old1-" + i, "old1", Instant.now())); + allLeases.add(newLease("old2-" + i, "old2", Instant.now())); + } + + EqualPartitionsBalancingStrategy strategy = + new EqualPartitionsBalancingStrategy(hostName, 0, 0, Duration.ofSeconds(60)); + + List leasesToTake = strategy.selectLeasesToTake(allLeases); + assertEquals(leasesToTake.size(), 1); + } + + @Test(groups = "unit") + public void expiredLeases_allowsMultipleWhenConfigured() { + String hostName = "me"; + List allLeases = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + allLeases.add(newLease("unowned-" + i, null)); + allLeases.add(newLease("old1-" + i, "old1", Instant.now())); + allLeases.add(newLease("old2-" + i, "old2", Instant.now())); + } + + EqualPartitionsBalancingStrategy strategy = + new EqualPartitionsBalancingStrategy(hostName, 0, 0, Duration.ofSeconds(60), 5); + + List leasesToTake = strategy.selectLeasesToTake(allLeases); + assertEquals(leasesToTake.size(), 5); + assertUniqueLeaseTokens(leasesToTake); + } + + @Test(groups = "unit") + public void stealLeases_legacyClampsToOne() { + String hostName = "me"; + List allLeases = new ArrayList<>(); + + // No expired leases: everything is owned by a single other worker. + for (int i = 0; i < 30; i++) { + allLeases.add(newLease("old-" + i, "old", Instant.now())); + } + + EqualPartitionsBalancingStrategy strategy = + new EqualPartitionsBalancingStrategy(hostName, 0, 0, Duration.ofSeconds(60)); + + List leasesToTake = strategy.selectLeasesToTake(allLeases); + assertEquals(leasesToTake.size(), 1); + assertEquals(leasesToTake.get(0).getOwner(), "old"); + } + + @Test(groups = "unit") + public void stealLeases_stillClampsToOneWhenConfigured() { + String hostName = "me"; + List allLeases = new ArrayList<>(); + + for (int i = 0; i < 30; i++) { + allLeases.add(newLease("old-" + i, "old", Instant.now())); + } + + EqualPartitionsBalancingStrategy strategy = + new EqualPartitionsBalancingStrategy(hostName, 0, 0, Duration.ofSeconds(60), 5); + + List leasesToTake = strategy.selectLeasesToTake(allLeases); + // Multi-acquire is only for unused/expired leases; stealing intentionally keeps the legacy 1-lease-per-cycle behavior. + assertEquals(leasesToTake.size(), 1); + assertUniqueLeaseTokens(leasesToTake); + + for (Lease lease : leasesToTake) { + assertEquals(lease.getOwner(), "old"); + } + } + + private static ServiceItemLeaseV1 newLease(String token, String owner) { + return newLease(token, owner, null); + } + + private static ServiceItemLeaseV1 newLease(String token, String owner, Instant timestamp) { + ServiceItemLeaseV1 lease = new ServiceItemLeaseV1() + .withLeaseToken(token) + .withOwner(owner); + if (timestamp != null) { + lease.withTimestamp(timestamp); + } + lease.setId("lease-" + token); + return lease; + } + + private static void assertUniqueLeaseTokens(List leases) { + Set tokens = new HashSet<>(); + for (Lease lease : leases) { + assertTrue(tokens.add(lease.getLeaseToken()), "Duplicate lease token: " + lease.getLeaseToken()); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java index 67d692919c08..68bc3441191e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -79,4 +80,57 @@ public void run(boolean loadBalancingSucceeded) throws InterruptedException { .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } + + @Test(groups = "unit") + public void run_multipleLeasesReturnedByStrategy_areAllAttempted() { + PartitionController partitionControllerMock = Mockito.mock(PartitionController.class); + LeaseContainer leaseContainerMock = Mockito.mock(LeaseContainer.class); + PartitionLoadBalancingStrategy partitionLoadBalancingStrategyMock = Mockito.mock(PartitionLoadBalancingStrategy.class); + + ServiceItemLeaseV1 lease1 = new ServiceItemLeaseV1().withLeaseToken("1"); + lease1.setId("TestLease-" + UUID.randomUUID()); + ServiceItemLeaseV1 lease2 = new ServiceItemLeaseV1().withLeaseToken("2"); + lease2.setId("TestLease-" + UUID.randomUUID()); + ServiceItemLeaseV1 lease3 = new ServiceItemLeaseV1().withLeaseToken("3"); + lease3.setId("TestLease-" + UUID.randomUUID()); + + List allLeases = Arrays.asList(lease1, lease2, lease3); + Mockito.when(leaseContainerMock.getAllLeases()).thenReturn(Flux.fromIterable(allLeases)); + + // PartitionLoadBalancer collects leases into a new list instance each cycle, so don't match on identity. + Mockito.when(partitionLoadBalancingStrategyMock.selectLeasesToTake(Mockito.anyList())) + .thenReturn(allLeases) + .thenReturn(Collections.emptyList()); + + // Ensure the "happy path" doesn't get swallowed by the load balancer's error handler. + Mockito.when(partitionControllerMock.addOrUpdateLease(Mockito.any())) + .thenAnswer(invocation -> Mono.just((Lease) invocation.getArgument(0))); + Mockito.when(partitionControllerMock.shutdown()).thenReturn(Mono.empty()); + + PartitionLoadBalancerImpl partitionLoadBalancerImpl = + new PartitionLoadBalancerImpl( + partitionControllerMock, + leaseContainerMock, + partitionLoadBalancingStrategyMock, + Duration.ofSeconds(1), + Schedulers.boundedElastic(), + null + ); + + partitionLoadBalancerImpl + .start() + .timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + + // Wait until the first balancing cycle attempts all leases returned by the strategy. + Mockito.verify(partitionControllerMock, Mockito.timeout(PARTITION_LOAD_BALANCER_TIMEOUT).times(allLeases.size())) + .addOrUpdateLease(Mockito.any()); + + partitionLoadBalancerImpl + .stop() + .timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImplTests.java index f9fa4f069a51..b2f0b95276ca 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImplTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImplTests.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -78,4 +79,54 @@ public void run(boolean loadBalancingSucceeded) throws InterruptedException { .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } + + @Test(groups = "unit") + public void run_multipleLeasesReturnedByStrategy_areAllAttempted() { + PartitionController partitionControllerMock = Mockito.mock(PartitionController.class); + LeaseContainer leaseContainerMock = Mockito.mock(LeaseContainer.class); + PartitionLoadBalancingStrategy partitionLoadBalancingStrategyMock = Mockito.mock(PartitionLoadBalancingStrategy.class); + + ServiceItemLease lease1 = new ServiceItemLease().withLeaseToken("1"); + lease1.setId("TestLease-" + UUID.randomUUID()); + ServiceItemLease lease2 = new ServiceItemLease().withLeaseToken("2"); + lease2.setId("TestLease-" + UUID.randomUUID()); + ServiceItemLease lease3 = new ServiceItemLease().withLeaseToken("3"); + lease3.setId("TestLease-" + UUID.randomUUID()); + + List allLeases = Arrays.asList(lease1, lease2, lease3); + Mockito.when(leaseContainerMock.getAllLeases()).thenReturn(Flux.fromIterable(allLeases)); + + // PartitionLoadBalancer collects leases into a new list instance each cycle, so don't match on identity. + Mockito.when(partitionLoadBalancingStrategyMock.selectLeasesToTake(Mockito.anyList())) + .thenReturn(allLeases) + .thenReturn(Collections.emptyList()); + + Mockito.when(partitionControllerMock.addOrUpdateLease(Mockito.any())) + .thenAnswer(invocation -> Mono.just((Lease) invocation.getArgument(0))); + Mockito.when(partitionControllerMock.shutdown()).thenReturn(Mono.empty()); + + PartitionLoadBalancerImpl partitionLoadBalancerImpl = + new PartitionLoadBalancerImpl( + partitionControllerMock, + leaseContainerMock, + partitionLoadBalancingStrategyMock, + Duration.ofSeconds(1), + Schedulers.boundedElastic() + ); + + partitionLoadBalancerImpl + .start() + .timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + + Mockito.verify(partitionControllerMock, Mockito.timeout(PARTITION_LOAD_BALANCER_TIMEOUT).times(allLeases.size())) + .addOrUpdateLease(Mockito.any()); + + partitionLoadBalancerImpl + .stop() + .timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index ce3b76a5d722..d5580bc343ee 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.77.0-beta.1 (Unreleased) #### Features Added +* Added `ChangeFeedProcessorOptions#setMaxLeasesToAcquirePerCycle(int)` to allow faster acquisition of unused/expired leases during scale-out and rolling deployments (default `0` preserves legacy behavior). #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java index 53827479d417..d5d78baa6934 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java @@ -25,8 +25,18 @@ public class EqualPartitionsBalancingStrategy implements PartitionLoadBalancingS private final int minPartitionCount; private final int maxPartitionCount; private final Duration leaseExpirationInterval; + private final int maxLeasesToAcquirePerCycle; public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, int maxPartitionCount, Duration leaseExpirationInterval) { + this(hostName, minPartitionCount, maxPartitionCount, leaseExpirationInterval, 0); + } + + public EqualPartitionsBalancingStrategy( + String hostName, + int minPartitionCount, + int maxPartitionCount, + Duration leaseExpirationInterval, + int maxLeasesToAcquirePerCycle) { if (hostName == null) { throw new IllegalArgumentException("hostName"); } @@ -35,6 +45,10 @@ public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, this.minPartitionCount = minPartitionCount; this.maxPartitionCount = maxPartitionCount; this.leaseExpirationInterval = leaseExpirationInterval; + if (maxLeasesToAcquirePerCycle < 0) { + throw new IllegalArgumentException("maxLeasesToAcquirePerCycle cannot be negative"); + } + this.maxLeasesToAcquirePerCycle = maxLeasesToAcquirePerCycle; } @Override @@ -57,43 +71,51 @@ public List selectLeasesToTake(List allLeases) { int partitionsNeededForMe = target - myCount; if (expiredLeases.size() > 0) { - // We should try to pick at least one expired lease even if already overbooked when maximum partition count is not set. - // If other CFP instances are running, limit the number of expired leases to acquire to maximum 1 (non-greedy acquiring). - if ((this.maxPartitionCount == 0 && partitionsNeededForMe <= 0) || (partitionsNeededForMe > 1 && workerToPartitionCount.size() > 1)) { - partitionsNeededForMe = 1; + // Determine how many unused/expired leases to attempt this cycle. + // 1) If maxScaleCount is not set (unlimited), try to pick at least one expired lease even if we're already overbooked. + // 2) If maxLeasesToAcquirePerCycle is configured, cap with it (overrides the legacy non-greedy clamp by design). + // 3) Otherwise, preserve the legacy non-greedy clamp (at most one lease per cycle when multiple workers exist). + int leasesToAcquire = partitionsNeededForMe; + if (this.maxPartitionCount == 0 && leasesToAcquire <= 0) { + leasesToAcquire = 1; + } + + if (this.maxLeasesToAcquirePerCycle > 0) { + leasesToAcquire = Math.min(leasesToAcquire, this.maxLeasesToAcquirePerCycle); + } else if (leasesToAcquire > 1 && workerToPartitionCount.size() > 1) { + leasesToAcquire = 1; + } + + if (leasesToAcquire <= 0) { + return new ArrayList<>(); } - if (partitionsNeededForMe == 1) { + Random random = new Random(); + + if (leasesToAcquire == 1) { // Try to minimize potential collisions between different CFP instances trying to pick the same lease. - Random random = new Random(); Lease expiredLease = expiredLeases.get(random.nextInt(expiredLeases.size())); this.logger.info("Found unused or expired lease {} (owner was {}); previous lease count for instance owner {} is {}, count of leases to target is {} and maxScaleCount {} ", - expiredLease.getLeaseToken(), expiredLease.getOwner(), this.hostName, myCount, partitionsNeededForMe, this.maxPartitionCount); + expiredLease.getLeaseToken(), expiredLease.getOwner(), this.hostName, myCount, leasesToAcquire, this.maxPartitionCount); return Collections.singletonList(expiredLease); - } else { - for (Lease lease : expiredLeases) { - this.logger.info("Found unused or expired lease {} (owner was {}); previous lease count for instance owner {} is {} and maxScaleCount {} ", - lease.getLeaseToken(), lease.getOwner(), this.hostName, myCount, this.maxPartitionCount); - } } - // If we reach here with partitionsNeededForMe < 0, then it means the change feed processor instances has owned leases >= the maxScaleCount. - // Then in this case, the change feed processor instance will not pick up any new leases. - if (partitionsNeededForMe <= 0) - { - return new ArrayList<>(); - } + // For multiple acquisitions, shuffle and take a random subset. + Collections.shuffle(expiredLeases, random); + this.logger.info("Found {} unused or expired leases; previous lease count for instance owner {} is {}, count of leases to target is {} and maxScaleCount {} ", + expiredLeases.size(), this.hostName, myCount, leasesToAcquire, this.maxPartitionCount); - return expiredLeases.subList(0, Math.min(partitionsNeededForMe, expiredLeases.size())); + return expiredLeases.subList(0, Math.min(leasesToAcquire, expiredLeases.size())); } - if (partitionsNeededForMe <= 0) - return new ArrayList(); + if (partitionsNeededForMe <= 0) { + return new ArrayList<>(); + } + // Intentionally keep the legacy behavior for stealing: attempt to steal at most 1 lease per cycle. Lease stolenLease = getLeaseToSteal(workerToPartitionCount, target, partitionsNeededForMe, allPartitions); List stolenLeases = new ArrayList<>(); - if (stolenLease != null) { stolenLeases.add(stolenLease); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java index 0b0748b3d0ac..c726d3d2cde6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java @@ -440,10 +440,11 @@ private Mono buildPartitionManager(LeaseStoreManager leaseStor if (this.loadBalancingStrategy == null) { this.loadBalancingStrategy = new EqualPartitionsBalancingStrategy( - this.hostName, - this.changeFeedProcessorOptions.getMinScaleCount(), - this.changeFeedProcessorOptions.getMaxScaleCount(), - this.changeFeedProcessorOptions.getLeaseExpirationInterval()); + this.hostName, + this.changeFeedProcessorOptions.getMinScaleCount(), + this.changeFeedProcessorOptions.getMaxScaleCount(), + this.changeFeedProcessorOptions.getLeaseExpirationInterval(), + this.changeFeedProcessorOptions.getMaxLeasesToAcquirePerCycle()); } PartitionController partitionController = new PartitionControllerImpl(leaseStoreManager, leaseStoreManager, partitionSupervisorFactory, synchronizer, scheduler); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java index 94ae7633c456..930a8b44bfb1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java @@ -472,7 +472,8 @@ private Mono buildPartitionManager(LeaseStoreManager leaseStor this.hostName, this.changeFeedProcessorOptions.getMinScaleCount(), this.changeFeedProcessorOptions.getMaxScaleCount(), - this.changeFeedProcessorOptions.getLeaseExpirationInterval()); + this.changeFeedProcessorOptions.getLeaseExpirationInterval(), + this.changeFeedProcessorOptions.getMaxLeasesToAcquirePerCycle()); } PartitionController partitionController = diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java index 6e35e910f3b9..e34b92f6244f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java @@ -50,6 +50,13 @@ public final class ChangeFeedProcessorOptions { private boolean startFromBeginning; private int minScaleCount; private int maxScaleCount; + /** + * Maximum number of leases the instance will try to acquire in a single load balancing cycle. + *

+ * A value of {@code 0} keeps the legacy behavior (which is intentionally conservative when multiple workers exist and + * typically attempts to acquire at most one lease per cycle). + */ + private int maxLeasesToAcquirePerCycle; private boolean leaseVerificationOnRestartEnabled; @@ -67,6 +74,8 @@ public ChangeFeedProcessorOptions() { this.leaseExpirationInterval = DEFAULT_EXPIRATION_INTERVAL; this.feedPollDelay = DEFAULT_FEED_POLL_DELAY; this.maxScaleCount = 0; // unlimited + // 0 -> legacy behavior (typically acquires at most one lease per balancing cycle when multiple CFP instances exist). + this.maxLeasesToAcquirePerCycle = 0; this.scheduler = Schedulers.boundedElastic(); this.feedPollThroughputControlGroupConfig = null; @@ -116,6 +125,37 @@ public ChangeFeedProcessorOptions setLeaseAcquireInterval(Duration leaseAcquireI return this; } + /** + * Gets the maximum number of leases the instance will try to acquire in a single load balancing cycle. + *

+ * A value of {@code 0} keeps the legacy behavior. + * + * @return maximum leases to acquire per cycle, or {@code 0} for legacy behavior. + */ + public int getMaxLeasesToAcquirePerCycle() { + return this.maxLeasesToAcquirePerCycle; + } + + /** + * Sets the maximum number of leases the instance will try to acquire in a single load balancing cycle. + *

+ * Use this to speed up acquisition of unused/expired leases after scale-out or rolling deployments when using + * ephemeral host names. + * A value of {@code 0} keeps the legacy behavior. + * Higher values may increase RU consumption in the lease container and can increase lease acquisition conflicts + * when many instances are starting at the same time. + * + * @param maxLeasesToAcquirePerCycle max leases to acquire per cycle, or {@code 0} for legacy behavior. + * @return the current ChangeFeedProcessorOptions instance. + */ + public ChangeFeedProcessorOptions setMaxLeasesToAcquirePerCycle(int maxLeasesToAcquirePerCycle) { + if (maxLeasesToAcquirePerCycle < 0) { + throw new IllegalArgumentException("maxLeasesToAcquirePerCycle cannot be negative"); + } + this.maxLeasesToAcquirePerCycle = maxLeasesToAcquirePerCycle; + return this; + } + /** * Gets the interval for which the lease is taken on a lease representing a partition. *