Skip to content

Commit a9f93dc

Browse files
committed
CNDB-poc-reader: customize replica plan iterator for topk
1 parent 8f31782 commit a9f93dc

File tree

8 files changed

+143
-56
lines changed

8 files changed

+143
-56
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,8 @@ public enum CassandraRelevantProperties
321321
/** Which class to use for token metadata provider */
322322
CUSTOM_TMD_PROVIDER_PROPERTY("cassandra.custom_token_metadata_provider_class"),
323323

324+
CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY("cassandra.custom_replica_plan_iterator_provider_class"),
325+
324326
/** Which class to use for failure detection */
325327
CUSTOM_FAILURE_DETECTOR_PROPERTY("cassandra.custom_failure_detector_class"),
326328

src/java/org/apache/cassandra/locator/ReplicaLayout.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(AbstractReplicationStra
341341
* @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
342342
* and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
343343
*/
344-
static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
344+
public static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
345345
{
346346
EndpointsForRange replicas = replicationStrategy.getNaturalReplicas(range.right);
347347
replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);

src/java/org/apache/cassandra/locator/ReplicaPlans.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public static boolean isSufficientLiveReplicasForRead(AbstractReplicationStrateg
138138
}
139139
}
140140

141-
static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
141+
public static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
142142
{
143143
assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, liveReplicas, consistencyLevel.blockFor(replicationStrategy), 1);
144144
}
@@ -659,7 +659,7 @@ public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey
659659
}
660660

661661

662-
private static <E extends Endpoints<E>> E candidatesForRead(Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, E liveNaturalReplicas)
662+
public static <E extends Endpoints<E>> E candidatesForRead(Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, E liveNaturalReplicas)
663663
{
664664
E replicas = consistencyLevel.isDatacenterLocal() ? liveNaturalReplicas.filter(InOurDcTester.replicas()) : liveNaturalReplicas;
665665

@@ -677,7 +677,7 @@ private static <E extends Endpoints<E>> E contactForEachQuorumRead(NetworkTopolo
677677
});
678678
}
679679

680-
private static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
680+
public static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
681681
{
682682
/*
683683
* If we are doing an each quorum query, we have to make sure that the endpoints we select

src/java/org/apache/cassandra/schema/KeyspaceMetadata.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.cassandra.db.marshal.UserType;
3636
import org.apache.cassandra.exceptions.ConfigurationException;
3737
import org.apache.cassandra.locator.AbstractReplicationStrategy;
38+
import org.apache.cassandra.locator.TokenMetadata;
3839
import org.apache.cassandra.schema.UserFunctions.FunctionsDiff;
3940
import org.apache.cassandra.schema.Tables.TablesDiff;
4041
import org.apache.cassandra.schema.Types.TypesDiff;
@@ -389,10 +390,15 @@ public void validate()
389390
}
390391

391392
public AbstractReplicationStrategy createReplicationStrategy()
393+
{
394+
return createReplicationStrategy(StorageService.instance.getTokenMetadataForKeyspace(name));
395+
}
396+
397+
public AbstractReplicationStrategy createReplicationStrategy(TokenMetadata tokenMetadata)
392398
{
393399
return AbstractReplicationStrategy.createReplicationStrategy(name,
394400
params.replication.klass,
395-
StorageService.instance.getTokenMetadataForKeyspace(name),
401+
tokenMetadata,
396402
DatabaseDescriptor.getEndpointSnitch(),
397403
params.replication.options);
398404
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.reads.range;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
26+
import org.apache.cassandra.db.PartitionPosition;
27+
import org.apache.cassandra.dht.AbstractBounds;
28+
import org.apache.cassandra.dht.Bounds;
29+
import org.apache.cassandra.dht.Token;
30+
import org.apache.cassandra.locator.ReplicaPlan;
31+
import org.apache.cassandra.locator.TokenMetadata;
32+
import org.apache.cassandra.utils.AbstractIterator;
33+
import org.apache.cassandra.utils.Pair;
34+
35+
public abstract class AbstractReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
36+
{
37+
public abstract int size();
38+
39+
/**
40+
* Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
41+
* so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
42+
*/
43+
public static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(TokenMetadata tokenMetadata, final AbstractBounds<PartitionPosition> queryRange)
44+
{
45+
// special case for bounds containing exactly 1 (non-minimum) token
46+
if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum())
47+
{
48+
return Collections.singletonList(queryRange);
49+
}
50+
51+
List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>();
52+
// divide the queryRange into pieces delimited by the ring and minimum tokens
53+
Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
54+
AbstractBounds<PartitionPosition> remainder = queryRange;
55+
while (ringIter.hasNext())
56+
{
57+
/*
58+
* remainder is a range/bounds of partition positions and we want to split it with a token. We want to split
59+
* using the key returned by token.maxKeyBound. For instance, if remainder is [DK(10, 'foo'), DK(20, 'bar')],
60+
* and we have 3 nodes with tokens 0, 15, 30, we want to split remainder to A=[DK(10, 'foo'), 15] and
61+
* B=(15, DK(20, 'bar')]. But since we can't mix tokens and keys at the same time in a range, we use
62+
* 15.maxKeyBound() to have A include all keys having 15 as token and B include none of those (since that is
63+
* what our node owns).
64+
*/
65+
Token upperBoundToken = ringIter.next();
66+
PartitionPosition upperBound = upperBoundToken.maxKeyBound();
67+
if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound))
68+
// no more splits
69+
break;
70+
Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> splits = remainder.split(upperBound);
71+
if (splits == null)
72+
continue;
73+
74+
ranges.add(splits.left);
75+
remainder = splits.right;
76+
}
77+
ranges.add(remainder);
78+
79+
return ranges;
80+
}
81+
}

src/java/org/apache/cassandra/service/reads/range/RangeCommands.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,7 @@ static RangeCommandIterator rangeCommandIterator(PartitionRangeReadCommand comma
7373
Tracing.trace("Computing ranges to query");
7474

7575
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
76-
ReplicaPlanIterator replicaPlans = new ReplicaPlanIterator(command.dataRange().keyRange(),
77-
command.indexQueryPlan(),
78-
keyspace,
79-
consistencyLevel);
76+
AbstractReplicaPlanIterator replicaPlans = ReplicaPlanIteratorProvider.instance.getReplicaPlanIterator(command, keyspace, consistencyLevel);
8077
if (command.isTopK())
8178
return new ScanAllRangesCommandIterator(keyspace, replicaPlans, command, replicaPlans.size(), queryStartNanoTime, readTracker);
8279

src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,9 @@
3636
import org.apache.cassandra.locator.ReplicaPlan;
3737
import org.apache.cassandra.locator.ReplicaPlans;
3838
import org.apache.cassandra.locator.TokenMetadata;
39-
import org.apache.cassandra.service.StorageService;
40-
import org.apache.cassandra.utils.AbstractIterator;
4139
import org.apache.cassandra.utils.Pair;
4240

43-
class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
41+
class ReplicaPlanIterator extends AbstractReplicaPlanIterator
4442
{
4543
private final Keyspace keyspace;
4644
private final ConsistencyLevel consistency;
@@ -69,7 +67,8 @@ class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
6967
/**
7068
* @return the number of {@link ReplicaPlan.ForRangeRead}s in this iterator
7169
*/
72-
int size()
70+
@Override
71+
public int size()
7372
{
7473
return rangeCount;
7574
}
@@ -82,47 +81,4 @@ protected ReplicaPlan.ForRangeRead computeNext()
8281

8382
return ReplicaPlans.forRangeRead(keyspace, indexQueryPlan, consistency, ranges.next(), 1);
8483
}
85-
86-
/**
87-
* Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
88-
* so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
89-
*/
90-
private static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(TokenMetadata tokenMetadata, final AbstractBounds<PartitionPosition> queryRange)
91-
{
92-
// special case for bounds containing exactly 1 (non-minimum) token
93-
if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum())
94-
{
95-
return Collections.singletonList(queryRange);
96-
}
97-
98-
List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>();
99-
// divide the queryRange into pieces delimited by the ring and minimum tokens
100-
Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
101-
AbstractBounds<PartitionPosition> remainder = queryRange;
102-
while (ringIter.hasNext())
103-
{
104-
/*
105-
* remainder is a range/bounds of partition positions and we want to split it with a token. We want to split
106-
* using the key returned by token.maxKeyBound. For instance, if remainder is [DK(10, 'foo'), DK(20, 'bar')],
107-
* and we have 3 nodes with tokens 0, 15, 30, we want to split remainder to A=[DK(10, 'foo'), 15] and
108-
* B=(15, DK(20, 'bar')]. But since we can't mix tokens and keys at the same time in a range, we use
109-
* 15.maxKeyBound() to have A include all keys having 15 as token and B include none of those (since that is
110-
* what our node owns).
111-
*/
112-
Token upperBoundToken = ringIter.next();
113-
PartitionPosition upperBound = upperBoundToken.maxKeyBound();
114-
if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound))
115-
// no more splits
116-
break;
117-
Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> splits = remainder.split(upperBound);
118-
if (splits == null)
119-
continue;
120-
121-
ranges.add(splits.left);
122-
remainder = splits.right;
123-
}
124-
ranges.add(remainder);
125-
126-
return ranges;
127-
}
12884
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.reads.range;
20+
21+
import org.apache.cassandra.db.ConsistencyLevel;
22+
import org.apache.cassandra.db.Keyspace;
23+
import org.apache.cassandra.db.PartitionRangeReadCommand;
24+
import org.apache.cassandra.utils.FBUtilities;
25+
26+
import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY;
27+
28+
public interface ReplicaPlanIteratorProvider
29+
{
30+
ReplicaPlanIteratorProvider instance = CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY.isPresent()
31+
? FBUtilities.construct(CUSTOM_REPLICA_PLAN_ITERATOR_PROVIDER_PROPERTY.getString(),
32+
"Replica Plan Iterator Provider")
33+
: new DefaultReplicaPlanIteratorProvider();
34+
35+
AbstractReplicaPlanIterator getReplicaPlanIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistencyLevel);
36+
37+
class DefaultReplicaPlanIteratorProvider implements ReplicaPlanIteratorProvider
38+
{
39+
@Override
40+
public AbstractReplicaPlanIterator getReplicaPlanIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistencyLevel)
41+
{
42+
return new ReplicaPlanIterator(command.dataRange().keyRange(), command.indexQueryPlan(), keyspace, consistencyLevel);
43+
}
44+
}
45+
}

0 commit comments

Comments
 (0)