Skip to content

Commit 22a2503

Browse files
committed
RemoteToLocalVirtualKeyspace: supporting access to all nodes' local virtual tables from any node in the cluster
1 parent cb64a1f commit 22a2503

File tree

8 files changed

+706
-13
lines changed

8 files changed

+706
-13
lines changed

src/java/org/apache/cassandra/db/filter/ColumnFilter.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.cassandra.io.util.DataOutputPlus;
3434
import org.apache.cassandra.schema.ColumnMetadata;
3535
import org.apache.cassandra.schema.TableMetadata;
36+
import org.apache.cassandra.utils.btree.BTree;
3637

3738
/**
3839
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -64,7 +65,6 @@
6465
*/
6566
public abstract class ColumnFilter
6667
{
67-
6868
public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);
6969

7070
public static final Serializer serializer = new Serializer();
@@ -305,6 +305,11 @@ public boolean isWildcard()
305305
return false;
306306
}
307307

308+
/**
309+
* Rebinds matching columns into a new filter; ignores any missing but fails if any are a different type
310+
*/
311+
public abstract ColumnFilter rebind(TableMetadata newTable);
312+
308313
/**
309314
* Returns the CQL string corresponding to this {@code ColumnFilter}.
310315
*
@@ -630,6 +635,12 @@ public boolean isWildcard()
630635
return true;
631636
}
632637

638+
@Override
639+
public ColumnFilter rebind(TableMetadata newTable)
640+
{
641+
return new WildCardColumnFilter(ColumnFilter.rebind(newTable, fetchedAndQueried));
642+
}
643+
633644
@Override
634645
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
635646
{
@@ -779,6 +790,17 @@ public Tester newTester(ColumnMetadata column)
779790
return new Tester(fetchingStrategy.fetchesAllColumns(column.isStatic()), s.iterator());
780791
}
781792

793+
@Override
794+
public ColumnFilter rebind(TableMetadata newTable)
795+
{
796+
RegularAndStaticColumns queried = ColumnFilter.rebind(newTable, this.queried);
797+
RegularAndStaticColumns fetched = this.queried == this.fetched ? queried : ColumnFilter.rebind(newTable, this.fetched);
798+
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = this.subSelections;
799+
if (subSelections != null)
800+
subSelections = TreeMultimap.create(subSelections);
801+
return new SelectionColumnFilter(fetchingStrategy, queried, fetched, subSelections);
802+
}
803+
782804
@Override
783805
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
784806
{
@@ -1003,4 +1025,26 @@ private long subSelectionsSerializedSize(SortedSetMultimap<ColumnIdentifier, Col
10031025
return size;
10041026
}
10051027
}
1028+
1029+
private static RegularAndStaticColumns rebind(TableMetadata newTable, RegularAndStaticColumns columns)
1030+
{
1031+
return new RegularAndStaticColumns(rebind(newTable, columns.statics), rebind(newTable, columns.regulars));
1032+
}
1033+
1034+
private static Columns rebind(TableMetadata newTable, Columns columns)
1035+
{
1036+
if (columns.isEmpty())
1037+
return columns;
1038+
1039+
try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
1040+
{
1041+
for (ColumnMetadata in : columns)
1042+
{
1043+
ColumnMetadata out = newTable.getColumn(in.name);
1044+
if (out != null)
1045+
builder.add(out);
1046+
}
1047+
return Columns.from(builder);
1048+
}
1049+
}
10061050
}

src/java/org/apache/cassandra/db/filter/RowFilter.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void addCustomIndexExpression(TableMetadata metadata, IndexMetadata targe
141141
add(new CustomExpression(metadata, targetIndex, value));
142142
}
143143

144-
private void add(Expression expression)
144+
public void add(Expression expression)
145145
{
146146
expression.validate();
147147
expressions.add(expression);
@@ -549,6 +549,28 @@ public void validateForIndexing()
549549
"Index expression values may not be larger than 64K");
550550
}
551551

552+
/**
553+
* Rebind this expression to a table metadata that is expected to have equivalent columns.
554+
* If any referenced column is missing, returns null;
555+
* if any referenced column has a different type throws an exception
556+
*/
557+
public Expression rebind(TableMetadata newTable)
558+
{
559+
throw new UnsupportedOperationException("Expression " + toString(true) + " does not support rebinding to another table definition");
560+
}
561+
562+
protected static ColumnMetadata rebind(ColumnMetadata in, TableMetadata newTable)
563+
{
564+
ColumnMetadata out = newTable.getColumn(in.name);
565+
if (out == null)
566+
return null;
567+
568+
if (!out.type.equals(in.type) && !out.type.isCompatibleWith(in.type) || !in.type.isCompatibleWith(out.type))
569+
throw new IllegalArgumentException("The provided TableMetadata is not compatible with the expression");
570+
571+
return out;
572+
}
573+
552574
/**
553575
* Returns whether the provided row satisfied this expression or not.
554576
*
@@ -734,6 +756,16 @@ public static class SimpleExpression extends Expression
734756
super(column, operator, value);
735757
}
736758

759+
@Override
760+
public Expression rebind(TableMetadata newTable)
761+
{
762+
ColumnMetadata out = rebind(column, newTable);
763+
if (out == null)
764+
return null;
765+
766+
return new SimpleExpression(out, operator, value);
767+
}
768+
737769
@Override
738770
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
739771
{
@@ -853,6 +885,16 @@ public void validate() throws InvalidRequestException
853885
checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
854886
}
855887

888+
@Override
889+
public Expression rebind(TableMetadata newTable)
890+
{
891+
ColumnMetadata out = rebind(column, newTable);
892+
if (out == null)
893+
return null;
894+
895+
return new MapElementExpression(out, key, operator, value);
896+
}
897+
856898
@Override
857899
public ByteBuffer getIndexValue()
858900
{
@@ -978,6 +1020,12 @@ protected Kind kind()
9781020
return Kind.CUSTOM;
9791021
}
9801022

1023+
@Override
1024+
public Expression rebind(TableMetadata newTable)
1025+
{
1026+
return new CustomExpression(table, targetIndex, value);
1027+
}
1028+
9811029
// Filtering by custom expressions isn't supported yet, so just accept any row
9821030
@Override
9831031
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db.virtual;
20+
21+
import org.apache.cassandra.schema.SchemaConstants;
22+
23+
public class AccordDebugRemoteKeyspace extends RemoteToLocalVirtualKeyspace
24+
{
25+
public static final AccordDebugRemoteKeyspace instance = new AccordDebugRemoteKeyspace(SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.instance);
26+
27+
public AccordDebugRemoteKeyspace(String name, VirtualKeyspace wrap)
28+
{
29+
super(name, wrap);
30+
}
31+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db.virtual;
20+
21+
import java.util.stream.Collectors;
22+
23+
public class RemoteToLocalVirtualKeyspace extends VirtualKeyspace
24+
{
25+
public RemoteToLocalVirtualKeyspace(String name, VirtualKeyspace wrap)
26+
{
27+
super(name, wrap.tables().stream().map(vt -> new RemoteToLocalVirtualTable(name, vt)).collect(Collectors.toList()));
28+
}
29+
}

0 commit comments

Comments
 (0)