Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/belliottsmith/cassandra-accord.git
branch = rebootstrap-sq
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 94 files
+1 −5 accord-core/src/main/java/accord/api/Agent.java
+8 −1 accord-core/src/main/java/accord/api/ConfigurationService.java
+9 −1 accord-core/src/main/java/accord/api/DataStore.java
+6 −1 accord-core/src/main/java/accord/api/Journal.java
+39 −0 accord-core/src/main/java/accord/api/OwnershipEventListener.java
+10 −5 accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+1 −1 accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+6 −5 accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
+20 −15 accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+16 −4 accord-core/src/main/java/accord/coordinate/FetchRoute.java
+14 −10 accord-core/src/main/java/accord/coordinate/Infer.java
+35 −8 accord-core/src/main/java/accord/coordinate/Invalidate.java
+36 −7 accord-core/src/main/java/accord/coordinate/KeyBarriers.java
+26 −54 accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+17 −10 accord-core/src/main/java/accord/impl/CommandChange.java
+12 −16 accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+1 −4 accord-core/src/main/java/accord/impl/progresslog/PackedKeyTracker.java
+34 −22 accord-core/src/main/java/accord/local/Bootstrap.java
+32 −8 accord-core/src/main/java/accord/local/Cleanup.java
+1 −1 accord-core/src/main/java/accord/local/Command.java
+224 −60 accord-core/src/main/java/accord/local/CommandStore.java
+126 −178 accord-core/src/main/java/accord/local/CommandStores.java
+13 −7 accord-core/src/main/java/accord/local/Commands.java
+39 −0 accord-core/src/main/java/accord/local/LogUnavailableException.java
+68 −0 accord-core/src/main/java/accord/local/MapReduceCommandStores.java
+95 −0 accord-core/src/main/java/accord/local/MapReduceConsumeCommandStores.java
+16 −52 accord-core/src/main/java/accord/local/Node.java
+126 −137 accord-core/src/main/java/accord/local/RedundantBefore.java
+89 −112 accord-core/src/main/java/accord/local/RedundantStatus.java
+24 −6 accord-core/src/main/java/accord/local/SafeCommandStore.java
+65 −65 accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+18 −18 accord-core/src/main/java/accord/local/cfk/PostProcess.java
+12 −12 accord-core/src/main/java/accord/local/cfk/Pruning.java
+19 −19 accord-core/src/main/java/accord/local/cfk/Serialize.java
+21 −21 accord-core/src/main/java/accord/local/cfk/Updating.java
+1 −0 accord-core/src/main/java/accord/local/durability/DurabilityService.java
+1 −2 accord-core/src/main/java/accord/local/durability/GlobalDurability.java
+0 −1 accord-core/src/main/java/accord/local/durability/ShardDurability.java
+10 −18 accord-core/src/main/java/accord/messages/Accept.java
+5 −6 accord-core/src/main/java/accord/messages/Apply.java
+3 −3 accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+44 −31 accord-core/src/main/java/accord/messages/Await.java
+6 −9 accord-core/src/main/java/accord/messages/BeginInvalidation.java
+4 −3 accord-core/src/main/java/accord/messages/BeginRecovery.java
+20 −29 accord-core/src/main/java/accord/messages/CheckStatus.java
+68 −25 accord-core/src/main/java/accord/messages/Commit.java
+3 −3 accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
+3 −3 accord-core/src/main/java/accord/messages/GetLatestDeps.java
+3 −3 accord-core/src/main/java/accord/messages/GetMaxConflict.java
+3 −3 accord-core/src/main/java/accord/messages/InformDurable.java
+21 −12 accord-core/src/main/java/accord/messages/NoWaitRequest.java
+15 −59 accord-core/src/main/java/accord/messages/ParticipantsRequest.java
+3 −3 accord-core/src/main/java/accord/messages/PreAccept.java
+28 −15 accord-core/src/main/java/accord/messages/Propagate.java
+26 −16 accord-core/src/main/java/accord/messages/ReadData.java
+11 −8 accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+78 −0 accord-core/src/main/java/accord/messages/RouteRequest.java
+11 −7 accord-core/src/main/java/accord/messages/SetShardDurable.java
+10 −2 accord-core/src/main/java/accord/messages/StableThenRead.java
+1 −1 accord-core/src/main/java/accord/primitives/LatestDeps.java
+10 −8 accord-core/src/main/java/accord/primitives/Ranges.java
+6 −0 accord-core/src/main/java/accord/utils/DeterministicSet.java
+6 −0 accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
+1 −10 accord-core/src/main/java/accord/utils/MapReduceConsume.java
+10 −7 accord-core/src/main/java/accord/utils/async/AsyncResults.java
+9 −11 accord-core/src/test/java/accord/burn/BurnTestBase.java
+3 −10 accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+0 −26 accord-core/src/test/java/accord/burn/TopologyUpdates.java
+4 −5 accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+3 −3 accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+62 −0 accord-core/src/test/java/accord/impl/AbstractTestConfigurationService.java
+3 −3 accord-core/src/test/java/accord/impl/MessageListener.java
+8 −1 accord-core/src/test/java/accord/impl/TestAgent.java
+86 −51 accord-core/src/test/java/accord/impl/basic/Cluster.java
+0 −12 accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+20 −4 accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+2 −2 accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+74 −28 accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+24 −9 accord-core/src/test/java/accord/impl/list/ListAgent.java
+2 −1 accord-core/src/test/java/accord/impl/list/ListData.java
+1 −1 accord-core/src/test/java/accord/impl/list/ListStore.java
+2 −2 accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
+1 −1 accord-core/src/test/java/accord/impl/mock/MockStore.java
+6 −11 accord-core/src/test/java/accord/local/RedundantBeforeTest.java
+9 −2 accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+1 −3 accord-core/src/test/java/accord/messages/ReadDataTest.java
+11 −11 accord-core/src/test/java/accord/messages/RouteRequestScopeTest.java
+1 −1 accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+105 −26 accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+2 −2 accord-core/src/test/java/accord/utils/ExtendedAssertions.java
+1 −1 accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+8 −1 accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+1 −1 accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
+2 −1 accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ public static StatementRestrictions empty(StatementType type, TableMetadata tabl
return new StatementRestrictions(type, table, IndexHints.NONE, false);
}

private StatementRestrictions(StatementType type, TableMetadata table, IndexHints indexHints, boolean allowFiltering)
private StatementRestrictions(StatementType type, TableMetadata table, IndexHints indexHints, boolean allowFilteringOfPrimaryKeys)
{
this.type = type;
this.table = table;
this.indexHints = indexHints;
this.partitionKeyRestrictions = new PartitionKeyRestrictions(table.partitionKeyAsClusteringComparator());
this.clusteringColumnsRestrictions = new ClusteringColumnRestrictions(table, allowFiltering);
this.clusteringColumnsRestrictions = new ClusteringColumnRestrictions(table, allowFilteringOfPrimaryKeys);
this.nonPrimaryKeyRestrictions = RestrictionSet.empty();
this.notNullColumns = new HashSet<>();
}
Expand Down Expand Up @@ -370,7 +370,7 @@ else if (operator.requiresIndexing())
}
else
{
if (!allowFiltering && requiresAllowFilteringIfNotSpecified(table))
if (!allowFiltering && requiresAllowFilteringIfNotSpecified(table, false))
throw invalidRequest(allowFilteringMessage(state));
}

Expand All @@ -381,14 +381,14 @@ else if (operator.requiresIndexing())
validateSecondaryIndexSelections();
}

public static boolean requiresAllowFilteringIfNotSpecified(TableMetadata metadata)
public static boolean requiresAllowFilteringIfNotSpecified(TableMetadata metadata, boolean isPrimaryKey)
{
if (!metadata.isVirtual())
return true;

VirtualTable tableNullable = VirtualKeyspaceRegistry.instance.getTableNullable(metadata.id);
assert tableNullable != null;
return !tableNullable.allowFilteringImplicitly();
return isPrimaryKey ? !tableNullable.allowFilteringPrimaryKeysImplicitly() : !tableNullable.allowFilteringImplicitly();
}

private void addRestriction(Restriction restriction, IndexRegistry indexRegistry, IndexHints indexHints)
Expand Down Expand Up @@ -593,7 +593,7 @@ private void processPartitionKeyRestrictions(ClientState state, boolean hasQueri
// components must have a EQ. Only the last partition key component can be in IN relation.
if (partitionKeyRestrictions.needFiltering())
{
if (!allowFiltering && !forView && !hasQueriableIndex && requiresAllowFilteringIfNotSpecified(table))
if (!allowFiltering && !forView && !hasQueriableIndex && requiresAllowFilteringIfNotSpecified(table, true))
throw new InvalidRequestException(allowFilteringMessage(state));

isKeyRange = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ private StatementRestrictions prepareRestrictions(ClientState state,
boundNames,
orderings,
selectsOnlyStaticColumns,
parameters.allowFiltering || !requiresAllowFilteringIfNotSpecified(metadata),
parameters.allowFiltering || !requiresAllowFilteringIfNotSpecified(metadata, true),
forView);
}

Expand Down Expand Up @@ -1700,7 +1700,7 @@ private void checkNeedsFiltering(TableMetadata table, StatementRestrictions rest
{
// We will potentially filter data if the row filter is not the identity and there isn't any index group
// supporting all the expressions in the filter.
if (requiresAllowFilteringIfNotSpecified(table))
if (requiresAllowFilteringIfNotSpecified(table, true))
checkFalse(restrictions.needFiltering(table), StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ public PartitionIterator execute(ConsistencyLevel consistency, ClientState state
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
UnfilteredPartitionIterator resultIterator = view.select(dataRange, columnFilter(), rowFilter());
UnfilteredPartitionIterator resultIterator = view.select(dataRange, columnFilter(), rowFilter(), limits());
return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ public PartitionIterator execute(ConsistencyLevel consistency, ClientState state
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
UnfilteredPartitionIterator resultIterator = view.select(partitionKey, clusteringIndexFilter, columnFilter(), rowFilter());
UnfilteredPartitionIterator resultIterator = view.select(partitionKey, clusteringIndexFilter, columnFilter(), rowFilter(), limits());
return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
}

Expand Down
46 changes: 45 additions & 1 deletion src/java/org/apache/cassandra/db/filter/ColumnFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.btree.BTree;

/**
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
Expand Down Expand Up @@ -64,7 +65,6 @@
*/
public abstract class ColumnFilter
{

public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);

public static final Serializer serializer = new Serializer();
Expand Down Expand Up @@ -305,6 +305,11 @@ public boolean isWildcard()
return false;
}

/**
* Rebinds matching columns into a new filter; ignores any missing but fails if any are a different type
*/
public abstract ColumnFilter rebind(TableMetadata newTable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a dangerous feature to generally expose. Could we add an assert that this is for virtual tables only, and add a comment that this is not for general use?


/**
* Returns the CQL string corresponding to this {@code ColumnFilter}.
*
Expand Down Expand Up @@ -630,6 +635,12 @@ public boolean isWildcard()
return true;
}

@Override
public ColumnFilter rebind(TableMetadata newTable)
{
return new WildCardColumnFilter(ColumnFilter.rebind(newTable, fetchedAndQueried));
}

@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
{
Expand Down Expand Up @@ -779,6 +790,17 @@ public Tester newTester(ColumnMetadata column)
return new Tester(fetchingStrategy.fetchesAllColumns(column.isStatic()), s.iterator());
}

@Override
public ColumnFilter rebind(TableMetadata newTable)
{
RegularAndStaticColumns queried = ColumnFilter.rebind(newTable, this.queried);
RegularAndStaticColumns fetched = this.queried == this.fetched ? queried : ColumnFilter.rebind(newTable, this.fetched);
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = this.subSelections;
if (subSelections != null)
subSelections = TreeMultimap.create(subSelections);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate, why we need to re-wrap this? I thought this should be effectively immutable.

return new SelectionColumnFilter(fetchingStrategy, queried, fetched, subSelections);
}

@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
{
Expand Down Expand Up @@ -1003,4 +1025,26 @@ private long subSelectionsSerializedSize(SortedSetMultimap<ColumnIdentifier, Col
return size;
}
}

private static RegularAndStaticColumns rebind(TableMetadata newTable, RegularAndStaticColumns columns)
{
return new RegularAndStaticColumns(rebind(newTable, columns.statics), rebind(newTable, columns.regulars));
}

private static Columns rebind(TableMetadata newTable, Columns columns)
{
if (columns.isEmpty())
return columns;

try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
{
for (ColumnMetadata in : columns)
{
ColumnMetadata out = newTable.getColumn(in.name);
if (out != null)
builder.add(out);
}
return Columns.from(builder);
}
}
}
50 changes: 49 additions & 1 deletion src/java/org/apache/cassandra/db/filter/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void addCustomIndexExpression(TableMetadata metadata, IndexMetadata targe
add(new CustomExpression(metadata, targetIndex, value));
}

private void add(Expression expression)
public void add(Expression expression)
{
expression.validate();
expressions.add(expression);
Expand Down Expand Up @@ -549,6 +549,28 @@ public void validateForIndexing()
"Index expression values may not be larger than 64K");
}

/**
* Rebind this expression to a table metadata that is expected to have equivalent columns.
* If any referenced column is missing, returns null;
* if any referenced column has a different type throws an exception
*/
public Expression rebind(TableMetadata newTable)
{
throw new UnsupportedOperationException("Expression " + toString(true) + " does not support rebinding to another table definition");
}

protected static ColumnMetadata rebind(ColumnMetadata in, TableMetadata newTable)
{
ColumnMetadata out = newTable.getColumn(in.name);
if (out == null)
return null;

if (!out.type.equals(in.type) && !out.type.isCompatibleWith(in.type) || !in.type.isCompatibleWith(out.type))
throw new IllegalArgumentException("The provided TableMetadata is not compatible with the expression");

return out;
}

/**
* Returns whether the provided row satisfied this expression or not.
*
Expand Down Expand Up @@ -734,6 +756,16 @@ public static class SimpleExpression extends Expression
super(column, operator, value);
}

@Override
public Expression rebind(TableMetadata newTable)
{
ColumnMetadata out = rebind(column, newTable);
if (out == null)
return null;

return new SimpleExpression(out, operator, value);
}

@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
{
Expand Down Expand Up @@ -853,6 +885,16 @@ public void validate() throws InvalidRequestException
checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
}

@Override
public Expression rebind(TableMetadata newTable)
{
ColumnMetadata out = rebind(column, newTable);
if (out == null)
return null;

return new MapElementExpression(out, key, operator, value);
}

@Override
public ByteBuffer getIndexValue()
{
Expand Down Expand Up @@ -978,6 +1020,12 @@ protected Kind kind()
return Kind.CUSTOM;
}

@Override
public Expression rebind(TableMetadata newTable)
{
return new CustomExpression(table, targetIndex, value);
}

// Filtering by custom expressions isn't supported yet, so just accept any row
@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/marshal/TxnIdUtf8Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public <V> void validate(V value, ValueAccessor<V> accessor) throws MarshalExcep

String describe() { return "TxnId"; }

@Override
public boolean isEmptyValueMeaningless()
{
return true;
}

@Override
public TypeSerializer<String> getSerializer()
{
Expand Down
Loading