Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/belliottsmith/cassandra-accord.git
branch = rebootstrap-sq
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 94 files
+1 −5 accord-core/src/main/java/accord/api/Agent.java
+8 −1 accord-core/src/main/java/accord/api/ConfigurationService.java
+9 −1 accord-core/src/main/java/accord/api/DataStore.java
+6 −1 accord-core/src/main/java/accord/api/Journal.java
+39 −0 accord-core/src/main/java/accord/api/OwnershipEventListener.java
+10 −5 accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+1 −1 accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+6 −5 accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
+20 −15 accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+16 −4 accord-core/src/main/java/accord/coordinate/FetchRoute.java
+14 −10 accord-core/src/main/java/accord/coordinate/Infer.java
+35 −8 accord-core/src/main/java/accord/coordinate/Invalidate.java
+36 −7 accord-core/src/main/java/accord/coordinate/KeyBarriers.java
+26 −54 accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+17 −10 accord-core/src/main/java/accord/impl/CommandChange.java
+12 −16 accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+1 −4 accord-core/src/main/java/accord/impl/progresslog/PackedKeyTracker.java
+34 −22 accord-core/src/main/java/accord/local/Bootstrap.java
+32 −8 accord-core/src/main/java/accord/local/Cleanup.java
+1 −1 accord-core/src/main/java/accord/local/Command.java
+224 −60 accord-core/src/main/java/accord/local/CommandStore.java
+126 −178 accord-core/src/main/java/accord/local/CommandStores.java
+13 −7 accord-core/src/main/java/accord/local/Commands.java
+39 −0 accord-core/src/main/java/accord/local/LogUnavailableException.java
+68 −0 accord-core/src/main/java/accord/local/MapReduceCommandStores.java
+95 −0 accord-core/src/main/java/accord/local/MapReduceConsumeCommandStores.java
+16 −52 accord-core/src/main/java/accord/local/Node.java
+126 −137 accord-core/src/main/java/accord/local/RedundantBefore.java
+89 −112 accord-core/src/main/java/accord/local/RedundantStatus.java
+24 −6 accord-core/src/main/java/accord/local/SafeCommandStore.java
+65 −65 accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+18 −18 accord-core/src/main/java/accord/local/cfk/PostProcess.java
+12 −12 accord-core/src/main/java/accord/local/cfk/Pruning.java
+19 −19 accord-core/src/main/java/accord/local/cfk/Serialize.java
+21 −21 accord-core/src/main/java/accord/local/cfk/Updating.java
+1 −0 accord-core/src/main/java/accord/local/durability/DurabilityService.java
+1 −2 accord-core/src/main/java/accord/local/durability/GlobalDurability.java
+0 −1 accord-core/src/main/java/accord/local/durability/ShardDurability.java
+10 −18 accord-core/src/main/java/accord/messages/Accept.java
+5 −6 accord-core/src/main/java/accord/messages/Apply.java
+3 −3 accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+44 −31 accord-core/src/main/java/accord/messages/Await.java
+6 −9 accord-core/src/main/java/accord/messages/BeginInvalidation.java
+4 −3 accord-core/src/main/java/accord/messages/BeginRecovery.java
+20 −29 accord-core/src/main/java/accord/messages/CheckStatus.java
+68 −25 accord-core/src/main/java/accord/messages/Commit.java
+3 −3 accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
+3 −3 accord-core/src/main/java/accord/messages/GetLatestDeps.java
+3 −3 accord-core/src/main/java/accord/messages/GetMaxConflict.java
+3 −3 accord-core/src/main/java/accord/messages/InformDurable.java
+21 −12 accord-core/src/main/java/accord/messages/NoWaitRequest.java
+15 −59 accord-core/src/main/java/accord/messages/ParticipantsRequest.java
+3 −3 accord-core/src/main/java/accord/messages/PreAccept.java
+28 −15 accord-core/src/main/java/accord/messages/Propagate.java
+26 −16 accord-core/src/main/java/accord/messages/ReadData.java
+11 −8 accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+78 −0 accord-core/src/main/java/accord/messages/RouteRequest.java
+11 −7 accord-core/src/main/java/accord/messages/SetShardDurable.java
+10 −2 accord-core/src/main/java/accord/messages/StableThenRead.java
+1 −1 accord-core/src/main/java/accord/primitives/LatestDeps.java
+10 −8 accord-core/src/main/java/accord/primitives/Ranges.java
+6 −0 accord-core/src/main/java/accord/utils/DeterministicSet.java
+6 −0 accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
+1 −10 accord-core/src/main/java/accord/utils/MapReduceConsume.java
+10 −7 accord-core/src/main/java/accord/utils/async/AsyncResults.java
+9 −11 accord-core/src/test/java/accord/burn/BurnTestBase.java
+3 −10 accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+0 −26 accord-core/src/test/java/accord/burn/TopologyUpdates.java
+4 −5 accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+3 −3 accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+62 −0 accord-core/src/test/java/accord/impl/AbstractTestConfigurationService.java
+3 −3 accord-core/src/test/java/accord/impl/MessageListener.java
+8 −1 accord-core/src/test/java/accord/impl/TestAgent.java
+86 −51 accord-core/src/test/java/accord/impl/basic/Cluster.java
+0 −12 accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+20 −4 accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+2 −2 accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+74 −28 accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+24 −9 accord-core/src/test/java/accord/impl/list/ListAgent.java
+2 −1 accord-core/src/test/java/accord/impl/list/ListData.java
+1 −1 accord-core/src/test/java/accord/impl/list/ListStore.java
+2 −2 accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
+1 −1 accord-core/src/test/java/accord/impl/mock/MockStore.java
+6 −11 accord-core/src/test/java/accord/local/RedundantBeforeTest.java
+9 −2 accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+1 −3 accord-core/src/test/java/accord/messages/ReadDataTest.java
+11 −11 accord-core/src/test/java/accord/messages/RouteRequestScopeTest.java
+1 −1 accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+105 −26 accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+2 −2 accord-core/src/test/java/accord/utils/ExtendedAssertions.java
+1 −1 accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+8 −1 accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+1 −1 accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
+2 −1 accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ public static StatementRestrictions empty(StatementType type, TableMetadata tabl
return new StatementRestrictions(type, table, IndexHints.NONE, false);
}

private StatementRestrictions(StatementType type, TableMetadata table, IndexHints indexHints, boolean allowFiltering)
private StatementRestrictions(StatementType type, TableMetadata table, IndexHints indexHints, boolean allowFilteringOfPrimaryKeys)
{
this.type = type;
this.table = table;
this.indexHints = indexHints;
this.partitionKeyRestrictions = new PartitionKeyRestrictions(table.partitionKeyAsClusteringComparator());
this.clusteringColumnsRestrictions = new ClusteringColumnRestrictions(table, allowFiltering);
this.clusteringColumnsRestrictions = new ClusteringColumnRestrictions(table, allowFilteringOfPrimaryKeys);
this.nonPrimaryKeyRestrictions = RestrictionSet.empty();
this.notNullColumns = new HashSet<>();
}
Expand Down Expand Up @@ -370,7 +370,7 @@ else if (operator.requiresIndexing())
}
else
{
if (!allowFiltering && requiresAllowFilteringIfNotSpecified(table))
if (!allowFiltering && requiresAllowFilteringIfNotSpecified(table, false))
throw invalidRequest(allowFilteringMessage(state));
}

Expand All @@ -381,14 +381,14 @@ else if (operator.requiresIndexing())
validateSecondaryIndexSelections();
}

public static boolean requiresAllowFilteringIfNotSpecified(TableMetadata metadata)
public static boolean requiresAllowFilteringIfNotSpecified(TableMetadata metadata, boolean isPrimaryKey)
{
if (!metadata.isVirtual())
return true;

VirtualTable tableNullable = VirtualKeyspaceRegistry.instance.getTableNullable(metadata.id);
assert tableNullable != null;
return !tableNullable.allowFilteringImplicitly();
return isPrimaryKey ? !tableNullable.allowFilteringPrimaryKeysImplicitly() : !tableNullable.allowFilteringImplicitly();
}

private void addRestriction(Restriction restriction, IndexRegistry indexRegistry, IndexHints indexHints)
Expand Down Expand Up @@ -593,7 +593,7 @@ private void processPartitionKeyRestrictions(ClientState state, boolean hasQueri
// components must have a EQ. Only the last partition key component can be in IN relation.
if (partitionKeyRestrictions.needFiltering())
{
if (!allowFiltering && !forView && !hasQueriableIndex && requiresAllowFilteringIfNotSpecified(table))
if (!allowFiltering && !forView && !hasQueriableIndex && requiresAllowFilteringIfNotSpecified(table, true))
throw new InvalidRequestException(allowFilteringMessage(state));

isKeyRange = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ private StatementRestrictions prepareRestrictions(ClientState state,
boundNames,
orderings,
selectsOnlyStaticColumns,
parameters.allowFiltering || !requiresAllowFilteringIfNotSpecified(metadata),
parameters.allowFiltering || !requiresAllowFilteringIfNotSpecified(metadata, true),
forView);
}

Expand Down Expand Up @@ -1700,7 +1700,7 @@ private void checkNeedsFiltering(TableMetadata table, StatementRestrictions rest
{
// We will potentially filter data if the row filter is not the identity and there isn't any index group
// supporting all the expressions in the filter.
if (requiresAllowFilteringIfNotSpecified(table))
if (requiresAllowFilteringIfNotSpecified(table, true))
checkFalse(restrictions.needFiltering(table), StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ public PartitionIterator execute(ConsistencyLevel consistency, ClientState state
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
UnfilteredPartitionIterator resultIterator = view.select(dataRange, columnFilter(), rowFilter());
UnfilteredPartitionIterator resultIterator = view.select(dataRange, columnFilter(), rowFilter(), limits());
return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ public PartitionIterator execute(ConsistencyLevel consistency, ClientState state
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
VirtualTable view = VirtualKeyspaceRegistry.instance.getTableNullable(metadata().id);
UnfilteredPartitionIterator resultIterator = view.select(partitionKey, clusteringIndexFilter, columnFilter(), rowFilter());
UnfilteredPartitionIterator resultIterator = view.select(partitionKey, clusteringIndexFilter, columnFilter(), rowFilter(), limits());
return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
}

Expand Down
46 changes: 45 additions & 1 deletion src/java/org/apache/cassandra/db/filter/ColumnFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.btree.BTree;

/**
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
Expand Down Expand Up @@ -64,7 +65,6 @@
*/
public abstract class ColumnFilter
{

public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);

public static final Serializer serializer = new Serializer();
Expand Down Expand Up @@ -305,6 +305,11 @@ public boolean isWildcard()
return false;
}

/**
* Rebinds matching columns into a new filter; ignores any missing but fails if any are a different type
*/
public abstract ColumnFilter rebind(TableMetadata newTable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a dangerous feature to generally expose. Could we add an assert that this is for virtual tables only, and add a comment that this is not for general use?


/**
* Returns the CQL string corresponding to this {@code ColumnFilter}.
*
Expand Down Expand Up @@ -630,6 +635,12 @@ public boolean isWildcard()
return true;
}

@Override
public ColumnFilter rebind(TableMetadata newTable)
{
return new WildCardColumnFilter(ColumnFilter.rebind(newTable, fetchedAndQueried));
}

@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
{
Expand Down Expand Up @@ -779,6 +790,17 @@ public Tester newTester(ColumnMetadata column)
return new Tester(fetchingStrategy.fetchesAllColumns(column.isStatic()), s.iterator());
}

@Override
public ColumnFilter rebind(TableMetadata newTable)
{
RegularAndStaticColumns queried = ColumnFilter.rebind(newTable, this.queried);
RegularAndStaticColumns fetched = this.queried == this.fetched ? queried : ColumnFilter.rebind(newTable, this.fetched);
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = this.subSelections;
if (subSelections != null)
subSelections = TreeMultimap.create(subSelections);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate, why we need to re-wrap this? I thought this should be effectively immutable.

return new SelectionColumnFilter(fetchingStrategy, queried, fetched, subSelections);
}

@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections()
{
Expand Down Expand Up @@ -1003,4 +1025,26 @@ private long subSelectionsSerializedSize(SortedSetMultimap<ColumnIdentifier, Col
return size;
}
}

private static RegularAndStaticColumns rebind(TableMetadata newTable, RegularAndStaticColumns columns)
{
return new RegularAndStaticColumns(rebind(newTable, columns.statics), rebind(newTable, columns.regulars));
}

private static Columns rebind(TableMetadata newTable, Columns columns)
{
if (columns.isEmpty())
return columns;

try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
{
for (ColumnMetadata in : columns)
{
ColumnMetadata out = newTable.getColumn(in.name);
if (out != null)
builder.add(out);
}
return Columns.from(builder);
}
}
}
50 changes: 49 additions & 1 deletion src/java/org/apache/cassandra/db/filter/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void addCustomIndexExpression(TableMetadata metadata, IndexMetadata targe
add(new CustomExpression(metadata, targetIndex, value));
}

private void add(Expression expression)
public void add(Expression expression)
{
expression.validate();
expressions.add(expression);
Expand Down Expand Up @@ -549,6 +549,28 @@ public void validateForIndexing()
"Index expression values may not be larger than 64K");
}

/**
* Rebind this expression to a table metadata that is expected to have equivalent columns.
* If any referenced column is missing, returns null;
* if any referenced column has a different type throws an exception
*/
public Expression rebind(TableMetadata newTable)
{
throw new UnsupportedOperationException("Expression " + toString(true) + " does not support rebinding to another table definition");
}

protected static ColumnMetadata rebind(ColumnMetadata in, TableMetadata newTable)
{
ColumnMetadata out = newTable.getColumn(in.name);
if (out == null)
return null;

if (!out.type.equals(in.type) && !out.type.isCompatibleWith(in.type) || !in.type.isCompatibleWith(out.type))
throw new IllegalArgumentException("The provided TableMetadata is not compatible with the expression");

return out;
}

/**
* Returns whether the provided row satisfied this expression or not.
*
Expand Down Expand Up @@ -734,6 +756,16 @@ public static class SimpleExpression extends Expression
super(column, operator, value);
}

@Override
public Expression rebind(TableMetadata newTable)
{
ColumnMetadata out = rebind(column, newTable);
if (out == null)
return null;

return new SimpleExpression(out, operator, value);
}

@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
{
Expand Down Expand Up @@ -853,6 +885,16 @@ public void validate() throws InvalidRequestException
checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
}

@Override
public Expression rebind(TableMetadata newTable)
{
ColumnMetadata out = rebind(column, newTable);
if (out == null)
return null;

return new MapElementExpression(out, key, operator, value);
}

@Override
public ByteBuffer getIndexValue()
{
Expand Down Expand Up @@ -978,6 +1020,12 @@ protected Kind kind()
return Kind.CUSTOM;
}

@Override
public Expression rebind(TableMetadata newTable)
{
return new CustomExpression(table, targetIndex, value);
}

// Filtering by custom expressions isn't supported yet, so just accept any row
@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey, Row row, long nowInSec)
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/marshal/TxnIdUtf8Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public <V> void validate(V value, ValueAccessor<V> accessor) throws MarshalExcep

String describe() { return "TxnId"; }

@Override
public boolean isEmptyValueMeaningless()
{
return true;
}

@Override
public TypeSerializer<String> getSerializer()
{
Expand Down
Loading