Skip to content

[POC] Reader: customize replica plan iterator for topk #1674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ public enum CassandraRelevantProperties
/** Which class to use for token metadata provider */
CUSTOM_TMD_PROVIDER_PROPERTY("cassandra.custom_token_metadata_provider_class"),

/** Which class to use for the replica plan iterator provider */
CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY("cassandra.custom_replica_plan_iterator_provider_class"),

/** Which class to use for failure detection */
CUSTOM_FAILURE_DETECTOR_PROPERTY("cassandra.custom_failure_detector_class"),

Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/locator/ReplicaLayout.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(AbstractReplicationStra
* @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
* and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
*/
static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
public static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
{
EndpointsForRange replicas = replicationStrategy.getNaturalReplicas(range.right);
replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/locator/ReplicaPlans.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static boolean isSufficientLiveReplicasForRead(AbstractReplicationStrateg
}
}

static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
public static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
{
assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, liveReplicas, consistencyLevel.blockFor(replicationStrategy), 1);
}
Expand Down Expand Up @@ -659,7 +659,7 @@ public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey
}


private static <E extends Endpoints<E>> E candidatesForRead(Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, E liveNaturalReplicas)
public static <E extends Endpoints<E>> E candidatesForRead(Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, E liveNaturalReplicas)
{
E replicas = consistencyLevel.isDatacenterLocal() ? liveNaturalReplicas.filter(InOurDcTester.replicas()) : liveNaturalReplicas;

Expand All @@ -677,7 +677,7 @@ private static <E extends Endpoints<E>> E contactForEachQuorumRead(NetworkTopolo
});
}

private static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
public static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
{
/*
* If we are doing an each quorum query, we have to make sure that the endpoints we select
Expand Down
8 changes: 7 additions & 1 deletion src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.UserFunctions.FunctionsDiff;
import org.apache.cassandra.schema.Tables.TablesDiff;
import org.apache.cassandra.schema.Types.TypesDiff;
Expand Down Expand Up @@ -389,10 +390,15 @@ public void validate()
}

public AbstractReplicationStrategy createReplicationStrategy()
{
return createReplicationStrategy(StorageService.instance.getTokenMetadataForKeyspace(name));
}

public AbstractReplicationStrategy createReplicationStrategy(TokenMetadata tokenMetadata)
{
return AbstractReplicationStrategy.createReplicationStrategy(name,
params.replication.klass,
StorageService.instance.getTokenMetadataForKeyspace(name),
tokenMetadata,
DatabaseDescriptor.getEndpointSnitch(),
params.replication.options);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service.reads.range;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.Pair;

/**
 * Base type for iterators that yield one {@link ReplicaPlan.ForRangeRead} per step, together with a shared helper
 * for cutting a query range into pieces along ring token boundaries.
 */
public abstract class AbstractReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
{
    /**
     * @return the number of {@link ReplicaPlan.ForRangeRead}s this iterator reports
     */
    public abstract int size();

    /**
     * Splits {@code queryRange} into the ordered list of sub-ranges that will be queried individually. Because a
     * node may be a replica for several ranges, each scan has to be limited to exactly one of these sub-ranges;
     * otherwise the same results would be returned more than once.
     *
     * @param tokenMetadata supplies the sorted ring tokens used as split points
     * @param queryRange    the range of partition positions requested by the query
     * @return the pieces of {@code queryRange}, in the order they were cut; always contains at least one element
     */
    public static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(TokenMetadata tokenMetadata, final AbstractBounds<PartitionPosition> queryRange)
    {
        // A Bounds whose two ends are the same non-minimum position needs no splitting at all.
        boolean coversSinglePosition = queryRange instanceof Bounds
                                       && queryRange.left.equals(queryRange.right)
                                       && !queryRange.left.isMinimum();
        if (coversSinglePosition)
            return Collections.singletonList(queryRange);

        List<AbstractBounds<PartitionPosition>> pieces = new ArrayList<>();
        AbstractBounds<PartitionPosition> unsplit = queryRange;

        // Walk the ring starting at the token of the range's left end, including the minimum token.
        Iterator<Token> tokens = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
        while (tokens.hasNext())
        {
            /*
             * What is left to split is expressed in partition positions, whereas the ring gives us tokens, and the
             * two cannot be mixed inside one range. We therefore cut at token.maxKeyBound(). Example: with nodes at
             * tokens 0, 15 and 30 and a remaining range [DK(10, 'foo'), DK(20, 'bar')], the desired pieces are
             * A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. Cutting at 15.maxKeyBound() puts every key whose
             * token is 15 into A and none of them into B, which matches what the node owns.
             */
            PartitionPosition cutPoint = tokens.next().maxKeyBound();

            // Once the cut point is neither the left end nor inside the remaining range, nothing more can be split.
            if (!(unsplit.left.equals(cutPoint) || unsplit.contains(cutPoint)))
            {
                break;
            }

            Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> halves = unsplit.split(cutPoint);
            if (halves != null)
            {
                pieces.add(halves.left);
                unsplit = halves.right;
            }
        }

        // Whatever was never cut off is the final piece.
        pieces.add(unsplit);
        return pieces;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ static RangeCommandIterator rangeCommandIterator(PartitionRangeReadCommand comma
Tracing.trace("Computing ranges to query");

Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
ReplicaPlanIterator replicaPlans = new ReplicaPlanIterator(command.dataRange().keyRange(),
command.indexQueryPlan(),
keyspace,
consistencyLevel);
AbstractReplicaPlanIterator replicaPlans = ReplicaPlanIteratorProvider.instance.getReplicaPlanIterator(command, keyspace, consistencyLevel);
if (command.isTopK())
return new ScanAllRangesCommandIterator(keyspace, replicaPlans, command, replicaPlans.size(), queryStartNanoTime, readTracker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.Pair;

class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
class ReplicaPlanIterator extends AbstractReplicaPlanIterator
{
private final Keyspace keyspace;
private final ConsistencyLevel consistency;
Expand Down Expand Up @@ -69,7 +67,8 @@ class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
/**
* @return the number of {@link ReplicaPlan.ForRangeRead}s in this iterator
*/
int size()
@Override
public int size()
{
return rangeCount;
}
Expand All @@ -82,47 +81,4 @@ protected ReplicaPlan.ForRangeRead computeNext()

return ReplicaPlans.forRangeRead(keyspace, indexQueryPlan, consistency, ranges.next(), 1);
}

/**
* Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
* so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
*/
private static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(TokenMetadata tokenMetadata, final AbstractBounds<PartitionPosition> queryRange)
{
// special case for bounds containing exactly 1 (non-minimum) token
if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum())
{
return Collections.singletonList(queryRange);
}

List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>();
// divide the queryRange into pieces delimited by the ring and minimum tokens
Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
AbstractBounds<PartitionPosition> remainder = queryRange;
while (ringIter.hasNext())
{
/*
* remainder is a range/bounds of partition positions and we want to split it with a token. We want to split
* using the key returned by token.maxKeyBound. For instance, if remainder is [DK(10, 'foo'), DK(20, 'bar')],
* and we have 3 nodes with tokens 0, 15, 30, we want to split remainder to A=[DK(10, 'foo'), 15] and
* B=(15, DK(20, 'bar')]. But since we can't mix tokens and keys at the same time in a range, we use
* 15.maxKeyBound() to have A include all keys having 15 as token and B include none of those (since that is
* what our node owns).
*/
Token upperBoundToken = ringIter.next();
PartitionPosition upperBound = upperBoundToken.maxKeyBound();
if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound))
// no more splits
break;
Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> splits = remainder.split(upperBound);
if (splits == null)
continue;

ranges.add(splits.left);
remainder = splits.right;
}
ranges.add(remainder);

return ranges;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.service.reads.range;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY;

/**
 * Supplies the {@link AbstractReplicaPlanIterator} for a partition range read command. The implementation in use
 * is chosen once, when {@link #instance} is initialised.
 */
public interface ReplicaPlanIteratorProvider
{
    /**
     * Builds the replica plan iterator for the given range read.
     *
     * @param command          the partition range read command being executed
     * @param keyspace         the keyspace the command targets
     * @param consistencyLevel the consistency level requested for the read
     * @return an iterator over the replica plans for {@code command}
     */
    AbstractReplicaPlanIterator getReplicaPlanIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistencyLevel);

    /**
     * Provider that hands out a plain {@link ReplicaPlanIterator} built from the command's key range and index
     * query plan.
     */
    public static class DefaultReplicaPlanIteratorProvider implements ReplicaPlanIteratorProvider
    {
        @Override
        public AbstractReplicaPlanIterator getReplicaPlanIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistencyLevel)
        {
            return new ReplicaPlanIterator(command.dataRange().keyRange(),
                                           command.indexQueryPlan(),
                                           keyspace,
                                           consistencyLevel);
        }
    }

    /**
     * The provider in use: a {@link DefaultReplicaPlanIteratorProvider} unless
     * {@code CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY} is present, in which case the class it names is
     * constructed through {@link FBUtilities#construct}.
     */
    ReplicaPlanIteratorProvider instance = !CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY.isPresent()
                                           ? new DefaultReplicaPlanIteratorProvider()
                                           : FBUtilities.construct(CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY.getString(),
                                                                   "Replica Plan Iterator Provider");
}