Skip to content

Commit 99b27c3

Browse files
CNDB-13403: Add source column to synthetic column's ColumnMetadata (#1641)
### What is the issue Fixes riptano/cndb#13403 by implementing one of the solutions. ### What does this PR fix and why was it fixed The current synthetic column logic relies on convention to know that the synthetic column's source column is the SAI ORDER BY column. This seems fragile and will break if we find a reason to introduce a new synthetic column. I propose that we serialize the source column name so that we can more easily support multiple synthetic columns. The urgency for this change is that we want to get it in before we are locked into a given serde protocol. Note however, that I haven't solved the primary issue described in riptano/cndb#13402. CNDB tests riptano/cndb#13418
1 parent 83a5b3a commit 99b27c3

File tree

6 files changed

+59
-22
lines changed

6 files changed

+59
-22
lines changed

src/java/org/apache/cassandra/cql3/statements/SelectStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1282,7 +1282,7 @@ private Map<ColumnMetadata, Ordering> getScoreOrdering(List<Ordering> orderings)
12821282

12831283
// Create synthetic score column
12841284
ColumnMetadata sourceColumn = expr.getColumn();
1285-
var cm = ColumnMetadata.syntheticColumn(sourceColumn.ksName, sourceColumn.cfName, ColumnMetadata.SYNTHETIC_SCORE_ID, FloatType.instance);
1285+
var cm = ColumnMetadata.syntheticScoreColumn(sourceColumn, FloatType.instance);
12861286
return Map.of(cm, orderings.get(0));
12871287
}
12881288

src/java/org/apache/cassandra/db/Columns.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ public void serialize(Columns columns, DataOutputPlus out) throws IOException
487487
if (column.isSynthetic())
488488
{
489489
ByteBufferUtil.writeWithVIntLength(column.name.bytes, out);
490+
ByteBufferUtil.writeWithVIntLength(column.sythenticSourceColumn.bytes, out);
490491
typeSerializer.serialize(column.type, out);
491492
}
492493
}
@@ -519,6 +520,7 @@ public long serializedSize(Columns columns)
519520
{
520521
syntheticCount++;
521522
size += ByteBufferUtil.serializedSizeWithVIntLength(column.name.bytes);
523+
size += ByteBufferUtil.serializedSizeWithVIntLength(column.sythenticSourceColumn.bytes);
522524
size += typeSerializer.serializedSize(column.type);
523525
}
524526
else
@@ -544,12 +546,22 @@ public Columns deserialize(DataInputPlus in, TableMetadata metadata) throws IOEx
544546
for (int i = 0; i < syntheticCount; i++)
545547
{
546548
ByteBuffer name = ByteBufferUtil.readWithVIntLength(in);
549+
ByteBuffer sourceColumnName = ByteBufferUtil.readWithVIntLength(in);
547550
AbstractType<?> type = typeSerializer.deserialize(in);
548551

549552
if (!name.equals(ColumnMetadata.SYNTHETIC_SCORE_ID.bytes))
550553
throw new IllegalStateException("Unknown synthetic column " + UTF8Type.instance.getString(name));
551554

552-
ColumnMetadata column = ColumnMetadata.syntheticColumn(metadata.keyspace, metadata.name, ColumnMetadata.SYNTHETIC_SCORE_ID, type);
555+
ColumnMetadata sourceColumn = metadata.getColumn(sourceColumnName);
556+
if (sourceColumn == null)
557+
{
558+
// If we don't find the definition, it could be we have data for a dropped column
559+
sourceColumn = metadata.getDroppedColumn(name);
560+
if (sourceColumn == null)
561+
throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization of " + metadata.keyspace + '.' + metadata.name);
562+
}
563+
564+
ColumnMetadata column = ColumnMetadata.syntheticScoreColumn(sourceColumn, type);
553565
builder.add(column);
554566
}
555567

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,8 @@ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
10621062
// synthetic columns sort first, so when we hit the first non-synthetic, we're done
10631063
if (!c.isSynthetic())
10641064
break;
1065-
tmb.addColumn(ColumnMetadata.syntheticColumn(c.ksName, c.cfName, c.name, c.type));
1065+
assert c.sythenticSourceColumn != null;
1066+
tmb.addColumn(c);
10661067
}
10671068
metadata = tmb.build();
10681069

src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.cassandra.db.PartitionPosition;
4444
import org.apache.cassandra.db.ReadCommand;
4545
import org.apache.cassandra.db.ReadExecutionController;
46-
import org.apache.cassandra.db.filter.ColumnFilter;
4746
import org.apache.cassandra.db.marshal.FloatType;
4847
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
4948
import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
@@ -124,7 +123,8 @@ public UnfilteredPartitionIterator search(ReadExecutionController executionContr
124123
assert !(keysIterator instanceof KeyRangeIterator);
125124
var scoredKeysIterator = (CloseableIterator<PrimaryKeyWithSortKey>) keysIterator;
126125
var result = new ScoreOrderedResultRetriever(scoredKeysIterator, filterTree, controller,
127-
executionController, queryContext, command.limits().count());
126+
executionController, queryContext, command.limits().count(),
127+
ordering.context.getDefinition());
128128
return new TopKProcessor(command).filter(result);
129129
}
130130
catch (QueryView.Builder.MissingIndexException e)
@@ -479,6 +479,10 @@ public static class ScoreOrderedResultRetriever extends AbstractIterator<Unfilte
479479
private final HashSet<PrimaryKey> processedKeys;
480480
private final Queue<UnfilteredRowIterator> pendingRows;
481481

482+
// Null indicates we are not sending the synthetic score column to the coordinator
483+
@Nullable
484+
private final ColumnMetadata syntheticScoreColumn;
485+
482486
// The limit requested by the query. We cannot load more than softLimit rows in bulk because we only want
483487
// to fetch the topk rows where k is the limit. However, we allow the iterator to fetch more rows than the
484488
// soft limit to avoid confusing behavior. When the softLimit is reached, the iterator will fetch one row
@@ -491,7 +495,8 @@ private ScoreOrderedResultRetriever(CloseableIterator<PrimaryKeyWithSortKey> sco
491495
QueryController controller,
492496
ReadExecutionController executionController,
493497
QueryContext queryContext,
494-
int limit)
498+
int limit,
499+
ColumnMetadata orderedColumn)
495500
{
496501
IndexContext context = controller.getOrderer().context;
497502
this.view = controller.getQueryView(context).view;
@@ -507,6 +512,13 @@ private ScoreOrderedResultRetriever(CloseableIterator<PrimaryKeyWithSortKey> sco
507512
this.processedKeys = new HashSet<>(limit);
508513
this.pendingRows = new ArrayDeque<>(limit);
509514
this.softLimit = limit;
515+
516+
// When +score is added on the coordinator side, it's represented as a PrecomputedColumnFilter
517+
// even in a 'SELECT *' because WCF is not capable of representing synthetic columns.
518+
// This can be simplified when we remove ANN_USE_SYNTHETIC_SCORE
519+
var tempColumn = ColumnMetadata.syntheticScoreColumn(orderedColumn, FloatType.instance);
520+
var isScoreFetched = controller.command().columnFilter().fetchesExplicitly(tempColumn);
521+
this.syntheticScoreColumn = isScoreFetched ? tempColumn : null;
510522
}
511523

512524
@Override
@@ -663,7 +675,8 @@ public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, List<Primar
663675
}
664676
}
665677
}
666-
return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row, sourceKeys, controller.command()) : null;
678+
return isRowValid ? new PrimaryKeyIterator(partition, staticRow, row, sourceKeys, syntheticScoreColumn)
679+
: null;
667680
}
668681
}
669682

@@ -684,7 +697,7 @@ public static class PrimaryKeyIterator extends AbstractUnfilteredRowIterator
684697
private boolean consumed = false;
685698
private final Unfiltered row;
686699

687-
public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfiltered content, List<PrimaryKeyWithSortKey> primaryKeysWithScore, ReadCommand command)
700+
public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfiltered content, List<PrimaryKeyWithSortKey> primaryKeysWithScore, ColumnMetadata syntheticScoreColumn)
688701
{
689702
super(partition.metadata(),
690703
partition.partitionKey(),
@@ -702,16 +715,8 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt
702715
return;
703716
}
704717

705-
// When +score is added on the coordinator side, it's represented as a PrecomputedColumnFilter
706-
// even in a 'SELECT *' because WCF is not capable of representing synthetic columns.
707-
// This can be simplified when we remove ANN_USE_SYNTHETIC_SCORE
708-
var tm = metadata();
709-
var scoreColumn = ColumnMetadata.syntheticColumn(tm.keyspace,
710-
tm.name,
711-
ColumnMetadata.SYNTHETIC_SCORE_ID,
712-
FloatType.instance);
713-
var isScoreFetched = command.columnFilter().fetchesExplicitly(scoreColumn);
714-
if (!isScoreFetched)
718+
719+
if (syntheticScoreColumn == null)
715720
{
716721
this.row = content;
717722
return;
@@ -724,7 +729,7 @@ public PrimaryKeyIterator(UnfilteredRowIterator partition, Row staticRow, Unfilt
724729

725730
// inject +score as a new column
726731
var pkWithScore = (PrimaryKeyWithScore) primaryKeysWithScore.get(0);
727-
columnData.add(BufferCell.live(scoreColumn,
732+
columnData.add(BufferCell.live(syntheticScoreColumn,
728733
FBUtilities.nowInSeconds(),
729734
FloatType.instance.decompose(pkWithScore.indexScore)));
730735

src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public TopKProcessor(ReadCommand command)
110110
else
111111
this.queryVector = null;
112112
this.limit = command.limits().count();
113-
this.scoreColumn = ColumnMetadata.syntheticColumn(indexContext.getKeyspace(), indexContext.getTable(), ColumnMetadata.SYNTHETIC_SCORE_ID, FloatType.instance);
113+
this.scoreColumn = ColumnMetadata.syntheticScoreColumn(expression.column(), FloatType.instance);
114114
}
115115

116116
/**

src/java/org/apache/cassandra/schema/ColumnMetadata.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ public boolean isPrimaryKeyKind()
117117
private final Comparator<Object> asymmetricCellPathComparator;
118118
private final Comparator<? super Cell<?>> cellComparator;
119119

120+
// When the kind is SYNTHETIC, this is the column from which the synthetic column is derived
121+
public final ColumnIdentifier sythenticSourceColumn;
122+
120123
private int hash;
121124

122125
/**
@@ -184,9 +187,9 @@ public static ColumnMetadata staticColumn(String keyspace, String table, String
184187
/**
185188
* Creates a new synthetic column metadata instance.
186189
*/
187-
public static ColumnMetadata syntheticColumn(String keyspace, String table, ColumnIdentifier id, AbstractType<?> type)
190+
public static ColumnMetadata syntheticScoreColumn(ColumnMetadata sourceColumn, AbstractType<?> type)
188191
{
189-
return new ColumnMetadata(keyspace, table, id, type, NO_POSITION, Kind.SYNTHETIC);
192+
return new ColumnMetadata(sourceColumn.ksName, sourceColumn.cfName, SYNTHETIC_SCORE_ID, type, NO_POSITION, Kind.SYNTHETIC, false, sourceColumn.name);
190193
}
191194

192195
/**
@@ -236,6 +239,18 @@ public ColumnMetadata(String ksName,
236239
int position,
237240
Kind kind,
238241
boolean isDropped)
242+
{
243+
this(ksName, cfName, name, type, position, kind, isDropped, null);
244+
}
245+
246+
public ColumnMetadata(String ksName,
247+
String cfName,
248+
ColumnIdentifier name,
249+
AbstractType<?> type,
250+
int position,
251+
Kind kind,
252+
boolean isDropped,
253+
ColumnIdentifier sythenticSourceColumnName)
239254
{
240255
super(ksName, cfName, name, type);
241256
assert name != null && type != null && kind != null;
@@ -263,6 +278,10 @@ public int compare(Object a, Object b)
263278
};
264279
this.comparisonOrder = comparisonOrder(kind, isComplex(), Math.max(0, position), name);
265280
this.isDropped = isDropped;
281+
282+
// Synthetic columns are the only ones that can have a source column
283+
assert kind == Kind.SYNTHETIC || sythenticSourceColumnName == null;
284+
this.sythenticSourceColumn = sythenticSourceColumnName;
266285
}
267286

268287
private static Comparator<CellPath> makeCellPathComparator(Kind kind, AbstractType<?> type)

0 commit comments

Comments
 (0)