Skip to content

Commit b712cfb

Browse files
committed
RemoteToLocalVirtualKeyspace: supporting access to all nodes' local virtual tables from any node in the cluster
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20900
1 parent f7743fe commit b712cfb

File tree

15 files changed

+1025
-11
lines changed

15 files changed

+1025
-11
lines changed

src/java/org/apache/cassandra/db/filter/ColumnFilter.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.cassandra.io.util.DataOutputPlus;
3434
import org.apache.cassandra.schema.ColumnMetadata;
3535
import org.apache.cassandra.schema.TableMetadata;
36+
import org.apache.cassandra.utils.btree.BTree;
3637

3738
/**
3839
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -64,7 +65,6 @@
6465
*/
6566
public abstract class ColumnFilter
6667
{
67-
6868
public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);
6969

7070
public static final Serializer serializer = new Serializer();
@@ -305,6 +305,20 @@ public boolean isWildcard()
305305
return false;
306306
}
307307

308+
/**
309+
* Rebinds matching columns into a new filter; ignores any missing but fails if any are a different type
310+
*/
311+
abstract ColumnFilter rebind(TableMetadata newTable);
312+
313+
public static ColumnFilter rebindVirtual(ColumnFilter filter, TableMetadata newTable)
314+
{
315+
// review feedback; nothing actually preventing its use with other tables,
316+
// but unclear utility/rationale so just some protection against incorrect usage
317+
if (!newTable.isVirtual())
318+
throw new UnsupportedOperationException("This feature is intended only to be used with virtual keyspaces");
319+
return filter.rebind(newTable);
320+
}
321+
308322
/**
309323
* Returns the CQL string corresponding to this {@code ColumnFilter}.
310324
*
@@ -630,6 +644,12 @@ public boolean isWildcard()
630644
return true;
631645
}
632646

647+
@Override
648+
ColumnFilter rebind(TableMetadata newTable)
649+
{
650+
return new WildCardColumnFilter(ColumnFilter.rebind(newTable, fetchedAndQueried));
651+
}
652+
633653
@Override
634654
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
635655
{
@@ -779,6 +799,21 @@ public Tester newTester(ColumnMetadata column)
779799
return new Tester(fetchingStrategy.fetchesAllColumns(column.isStatic()), s.iterator());
780800
}
781801

802+
@Override
803+
ColumnFilter rebind(TableMetadata newTable)
804+
{
805+
RegularAndStaticColumns queried = ColumnFilter.rebind(newTable, this.queried);
806+
RegularAndStaticColumns fetched = this.queried == this.fetched ? queried : ColumnFilter.rebind(newTable, this.fetched);
807+
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
808+
if (this.subSelections != null)
809+
{
810+
subSelections = TreeMultimap.create();
811+
for (Map.Entry<ColumnIdentifier, ColumnSubselection> e : this.subSelections.entries())
812+
subSelections.put(e.getKey(), e.getValue().rebind(newTable));
813+
}
814+
return new SelectionColumnFilter(fetchingStrategy, queried, fetched, subSelections);
815+
}
816+
782817
@Override
783818
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
784819
{
@@ -1003,4 +1038,26 @@ private long subSelectionsSerializedSize(SortedSetMultimap<ColumnIdentifier, Col
10031038
return size;
10041039
}
10051040
}
1041+
1042+
private static RegularAndStaticColumns rebind(TableMetadata newTable, RegularAndStaticColumns columns)
1043+
{
1044+
return new RegularAndStaticColumns(rebind(newTable, columns.statics), rebind(newTable, columns.regulars));
1045+
}
1046+
1047+
private static Columns rebind(TableMetadata newTable, Columns columns)
1048+
{
1049+
if (columns.isEmpty())
1050+
return columns;
1051+
1052+
try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
1053+
{
1054+
for (ColumnMetadata in : columns)
1055+
{
1056+
ColumnMetadata out = newTable.getColumn(in.name);
1057+
if (out != null)
1058+
builder.add(out);
1059+
}
1060+
return Columns.from(builder);
1061+
}
1062+
}
10061063
}

src/java/org/apache/cassandra/db/filter/ColumnSubselection.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public ColumnMetadata column()
7676

7777
protected abstract CellPath comparisonPath();
7878

79+
protected abstract ColumnSubselection rebind(TableMetadata newTable);
80+
7981
public int compareTo(ColumnSubselection other)
8082
{
8183
assert other.column().name.equals(column().name);
@@ -118,6 +120,12 @@ public CellPath comparisonPath()
118120
return from;
119121
}
120122

123+
@Override
124+
protected ColumnSubselection rebind(TableMetadata newTable)
125+
{
126+
return new Slice(newTable.getColumn(column.name), from, to);
127+
}
128+
121129
public int compareInclusionOf(CellPath path)
122130
{
123131
Comparator<CellPath> cmp = column.cellPathComparator();
@@ -160,6 +168,12 @@ public CellPath comparisonPath()
160168
return element;
161169
}
162170

171+
@Override
172+
protected ColumnSubselection rebind(TableMetadata newTable)
173+
{
174+
return new Element(newTable.getColumn(column.name), element);
175+
}
176+
163177
public int compareInclusionOf(CellPath path)
164178
{
165179
return column.cellPathComparator().compare(path, element);

src/java/org/apache/cassandra/db/filter/RowFilter.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void addCustomIndexExpression(TableMetadata metadata, IndexMetadata targe
141141
add(new CustomExpression(metadata, targetIndex, value));
142142
}
143143

144-
private void add(Expression expression)
144+
public void add(Expression expression)
145145
{
146146
expression.validate();
147147
expressions.add(expression);
@@ -549,6 +549,28 @@ public void validateForIndexing()
549549
"Index expression values may not be larger than 64K");
550550
}
551551

552+
/**
553+
* Rebind this expression to a table metadata that is expected to have equivalent columns.
554+
* If any referenced column is missing, returns null;
555+
* if any referenced column has a different type throws an exception
556+
*/
557+
public Expression rebind(TableMetadata newTable)
558+
{
559+
throw new UnsupportedOperationException("Expression " + toString(true) + " does not support rebinding to another table definition");
560+
}
561+
562+
protected static ColumnMetadata rebind(ColumnMetadata in, TableMetadata newTable)
563+
{
564+
ColumnMetadata out = newTable.getColumn(in.name);
565+
if (out == null)
566+
return null;
567+
568+
if (!out.type.equals(in.type) && !out.type.isCompatibleWith(in.type) || !in.type.isCompatibleWith(out.type))
569+
throw new IllegalArgumentException("The provided TableMetadata is not compatible with the expression");
570+
571+
return out;
572+
}
573+
552574
/**
553575
* Returns whether the provided row satisfied this expression or not.
554576
*
@@ -734,6 +756,16 @@ public static class SimpleExpression extends Expression
734756
super(column, operator, value);
735757
}
736758

759+
@Override
760+
public Expression rebind(TableMetadata newTable)
761+
{
762+
ColumnMetadata out = rebind(column, newTable);
763+
if (out == null)
764+
return null;
765+
766+
return new SimpleExpression(out, operator, value);
767+
}
768+
737769
@Override
738770
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
739771
{
@@ -853,6 +885,16 @@ public void validate() throws InvalidRequestException
853885
checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
854886
}
855887

888+
@Override
889+
public Expression rebind(TableMetadata newTable)
890+
{
891+
ColumnMetadata out = rebind(column, newTable);
892+
if (out == null)
893+
return null;
894+
895+
return new MapElementExpression(out, key, operator, value);
896+
}
897+
856898
@Override
857899
public ByteBuffer getIndexValue()
858900
{
@@ -978,6 +1020,12 @@ protected Kind kind()
9781020
return Kind.CUSTOM;
9791021
}
9801022

1023+
@Override
1024+
public Expression rebind(TableMetadata newTable)
1025+
{
1026+
return new CustomExpression(table, targetIndex, value);
1027+
}
1028+
9811029
// Filtering by custom expressions isn't supported yet, so just accept any row
9821030
@Override
9831031
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db.virtual;
20+
21+
import org.apache.cassandra.schema.SchemaConstants;
22+
23+
public class AccordDebugRemoteKeyspace extends RemoteToLocalVirtualKeyspace
24+
{
25+
public static final AccordDebugRemoteKeyspace instance = new AccordDebugRemoteKeyspace(SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.instance);
26+
27+
public AccordDebugRemoteKeyspace(String name, VirtualKeyspace wrap)
28+
{
29+
super(name, wrap);
30+
}
31+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db.virtual;
20+
21+
import java.util.function.Predicate;
22+
import java.util.stream.Collectors;
23+
24+
public class RemoteToLocalVirtualKeyspace extends VirtualKeyspace
25+
{
26+
public RemoteToLocalVirtualKeyspace(String name, VirtualKeyspace wrap)
27+
{
28+
this(name, wrap, ignore -> true);
29+
}
30+
31+
public RemoteToLocalVirtualKeyspace(String name, VirtualKeyspace wrap, Predicate<VirtualTable> include)
32+
{
33+
super(name, wrap.tables().stream().filter(include).map(vt -> new RemoteToLocalVirtualTable(name, vt)).collect(Collectors.toList()));
34+
}
35+
}

0 commit comments

Comments
 (0)