diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 0681a7f70499..d4616721a259 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -321,6 +321,8 @@ public enum CassandraRelevantProperties /** Which class to use for token metadata provider */ CUSTOM_TMD_PROVIDER_PROPERTY("cassandra.custom_token_metadata_provider_class"), + CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY("cassandra.custom_replica_plan_iterator_provider_class"), + /** Which class to use for failure detection */ CUSTOM_FAILURE_DETECTOR_PROPERTY("cassandra.custom_failure_detector_class"), diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java index 3a2dcc40e9db..99b5190a873e 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java +++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java @@ -341,7 +341,7 @@ static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(AbstractReplicationStra * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending * and not marked down by the failure detector. 
these are reverse sorted by the badness score of the configured snitch */ - static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range) + public static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range) { EndpointsForRange replicas = replicationStrategy.getNaturalReplicas(range.right); replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas); diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 2c4186a89bd6..4b745029a345 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -138,7 +138,7 @@ public static boolean isSufficientLiveReplicasForRead(AbstractReplicationStrateg } } - static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException + public static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException { assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, liveReplicas, consistencyLevel.blockFor(replicationStrategy), 1); } @@ -659,7 +659,7 @@ public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey } - private static <E extends Endpoints<E>> E candidatesForRead(Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, E liveNaturalReplicas) + public static <E extends Endpoints<E>> E candidatesForRead(Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, E liveNaturalReplicas) { E replicas = consistencyLevel.isDatacenterLocal() ?
liveNaturalReplicas.filter(InOurDcTester.replicas()) : liveNaturalReplicas; @@ -677,7 +677,7 @@ private static <E extends Endpoints<E>> E contactForEachQuorumRead(NetworkTopolo }); } - private static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates) + public static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates) { /* * If we are doing an each quorum query, we have to make sure that the endpoints we select diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java index 79185d10b27d..0dc801bd3cb8 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java @@ -35,6 +35,7 @@ import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.UserFunctions.FunctionsDiff; import org.apache.cassandra.schema.Tables.TablesDiff; import org.apache.cassandra.schema.Types.TypesDiff; @@ -389,10 +390,15 @@ public void validate() } public AbstractReplicationStrategy createReplicationStrategy() + { + return createReplicationStrategy(StorageService.instance.getTokenMetadataForKeyspace(name)); + } + + public AbstractReplicationStrategy createReplicationStrategy(TokenMetadata tokenMetadata) { return AbstractReplicationStrategy.createReplicationStrategy(name, params.replication.klass, - StorageService.instance.getTokenMetadataForKeyspace(name), + tokenMetadata, DatabaseDescriptor.getEndpointSnitch(), params.replication.options); } diff --git a/src/java/org/apache/cassandra/service/reads/range/AbstractReplicaPlanIterator.java
b/src/java/org/apache/cassandra/service/reads/range/AbstractReplicaPlanIterator.java new file mode 100644 index 000000000000..9886fba3efae --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/range/AbstractReplicaPlanIterator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.range; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.Pair; + +public abstract class AbstractReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead> +{ + public abstract int size(); + + /** + * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges, + * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
+ */ + public static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(TokenMetadata tokenMetadata, final AbstractBounds<PartitionPosition> queryRange) + { + // special case for bounds containing exactly 1 (non-minimum) token + if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum()) + { + return Collections.singletonList(queryRange); + } + + List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>(); + // divide the queryRange into pieces delimited by the ring and minimum tokens + Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true); + AbstractBounds<PartitionPosition> remainder = queryRange; + while (ringIter.hasNext()) + { + /* + * remainder is a range/bounds of partition positions and we want to split it with a token. We want to split + * using the key returned by token.maxKeyBound. For instance, if remainder is [DK(10, 'foo'), DK(20, 'bar')], + * and we have 3 nodes with tokens 0, 15, 30, we want to split remainder to A=[DK(10, 'foo'), 15] and + * B=(15, DK(20, 'bar')]. But since we can't mix tokens and keys at the same time in a range, we use + * 15.maxKeyBound() to have A include all keys having 15 as token and B include none of those (since that is + * what our node owns).
+ */ + Token upperBoundToken = ringIter.next(); + PartitionPosition upperBound = upperBoundToken.maxKeyBound(); + if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound)) + // no more splits + break; + Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> splits = remainder.split(upperBound); + if (splits == null) + continue; + + ranges.add(splits.left); + remainder = splits.right; + } + ranges.add(remainder); + + return ranges; + } +} diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java index 63291c34a943..c5d4b0005646 100644 --- a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java +++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java @@ -73,10 +73,7 @@ static RangeCommandIterator rangeCommandIterator(PartitionRangeReadCommand comma Tracing.trace("Computing ranges to query"); Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - ReplicaPlanIterator replicaPlans = new ReplicaPlanIterator(command.dataRange().keyRange(), - command.indexQueryPlan(), - keyspace, - consistencyLevel); + AbstractReplicaPlanIterator replicaPlans = ReplicaPlanIteratorProvider.instance.getReplicaPlanIterator(command, keyspace, consistencyLevel); if (command.isTopK()) return new ScanAllRangesCommandIterator(keyspace, replicaPlans, command, replicaPlans.size(), queryStartNanoTime, readTracker); diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java index 605afa4db632..7636047418e2 100644 --- a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java +++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java @@ -36,11 +36,9 @@ import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.locator.TokenMetadata; -import
org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Pair; -class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead> +class ReplicaPlanIterator extends AbstractReplicaPlanIterator { private final Keyspace keyspace; private final ConsistencyLevel consistency; @@ -69,7 +67,8 @@ class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead> /** * @return the number of {@link ReplicaPlan.ForRangeRead}s in this iterator */ - int size() + @Override + public int size() { return rangeCount; } @@ -82,47 +81,4 @@ protected ReplicaPlan.ForRangeRead computeNext() return ReplicaPlans.forRangeRead(keyspace, indexQueryPlan, consistency, ranges.next(), 1); } - - /** - * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges, - * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results. - */ - private static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(TokenMetadata tokenMetadata, final AbstractBounds<PartitionPosition> queryRange) - { - // special case for bounds containing exactly 1 (non-minimum) token - if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum()) - { - return Collections.singletonList(queryRange); - } - - List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>(); - // divide the queryRange into pieces delimited by the ring and minimum tokens - Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true); - AbstractBounds<PartitionPosition> remainder = queryRange; - while (ringIter.hasNext()) - { - /* - * remainder is a range/bounds of partition positions and we want to split it with a token. We want to split - * using the key returned by token.maxKeyBound. For instance, if remainder is [DK(10, 'foo'), DK(20, 'bar')], - * and we have 3 nodes with tokens 0, 15, 30, we want to split remainder to A=[DK(10, 'foo'), 15] and - * B=(15, DK(20, 'bar')].
But since we can't mix tokens and keys at the same time in a range, we use - * 15.maxKeyBound() to have A include all keys having 15 as token and B include none of those (since that is - * what our node owns). - */ - Token upperBoundToken = ringIter.next(); - PartitionPosition upperBound = upperBoundToken.maxKeyBound(); - if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound)) - // no more splits - break; - Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> splits = remainder.split(upperBound); - if (splits == null) - continue; - - ranges.add(splits.left); - remainder = splits.right; - } - ranges.add(remainder); - - return ranges; - } } diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorProvider.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorProvider.java new file mode 100644 index 000000000000..5097de9807c7 --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.service.reads.range; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY; + +public interface ReplicaPlanIteratorProvider +{ + ReplicaPlanIteratorProvider instance = CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY.isPresent() + ? FBUtilities.construct(CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY.getString(), + "Replica Plan Iterator Provider") + : new DefaultReplicaPlanIteratorProvider(); + + AbstractReplicaPlanIterator getReplicaPlanIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistencyLevel); + + class DefaultReplicaPlanIteratorProvider implements ReplicaPlanIteratorProvider + { + @Override + public AbstractReplicaPlanIterator getReplicaPlanIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistencyLevel) + { + return new ReplicaPlanIterator(command.dataRange().keyRange(), command.indexQueryPlan(), keyspace, consistencyLevel); + } + } +}