diff --git a/kernel-default/src/main/java/io/delta/core/data/JsonRow.java b/kernel-default/src/main/java/io/delta/core/data/JsonRow.java
index 117ee7489..30bd508a7 100644
--- a/kernel-default/src/main/java/io/delta/core/data/JsonRow.java
+++ b/kernel-default/src/main/java/io/delta/core/data/JsonRow.java
@@ -114,21 +114,21 @@ private static Object decodeElement(JsonNode jsonValue, DataType dataType) {
}
private static Object decodeField(ObjectNode rootNode, StructField field) {
- if (rootNode.get(field.name) == null) {
- if (field.nullable) {
+ if (rootNode.get(field.getName()) == null) {
+ if (field.isNullable()) {
return null;
}
throw new RuntimeException(
String.format(
"Root node at key %s is null but field isn't nullable. Root node: %s",
- field.name,
+ field.getName(),
rootNode
)
);
}
- return decodeElement(rootNode.get(field.name), field.dataType);
+ return decodeElement(rootNode.get(field.getName()), field.getDataType());
}
////////////////////////////////////////////////////////////////////////////////
@@ -216,7 +216,7 @@ public String toString() {
////////////////////////////////////////
private void assertType(int ordinal, DataType expectedType) {
- final String actualTypeName = readSchema.at(ordinal).dataType.typeName();
+ final String actualTypeName = readSchema.at(ordinal).getDataType().typeName();
if (!actualTypeName.equals(expectedType.typeName()) &&
!actualTypeName.equals(UnresolvedDataType.INSTANCE.typeName())) {
throw new RuntimeException(
diff --git a/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala b/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala
index e2f9f7431..4cf21298c 100644
--- a/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala
+++ b/kernel-default/src/test/scala/io/delta/core/internal/TableSuite.scala
@@ -19,6 +19,7 @@ package io.delta.core.internal
import scala.collection.JavaConverters._
import io.delta.core.Table
+import io.delta.core.expressions.{And, EqualTo, Literal}
import io.delta.core.helpers.DefaultTableHelper
import io.delta.core.util.GoldenTableUtils
import org.scalatest.funsuite.AnyFunSuite
@@ -75,4 +76,35 @@ class TableSuite extends AnyFunSuite with GoldenTableUtils {
snapshot.getAddFiles.forEachRemaining(x => println(x))
}
}
+
+ test("can perform partition pruning - basic - no checkpoint") {
+ withGoldenTable("basic-partitioned-no-checkpoint") { path =>
+ val table = Table.forPath(path, new DefaultTableHelper())
+ val snapshot = table.getLatestSnapshot.asInstanceOf[SnapshotImpl]
+ val schema = snapshot.getSchema
+
+ val partitionFilter1 = new EqualTo(schema.column("part_a"), Literal.of(0L));
+ val scan1 = snapshot.getScanBuilder().withFilter(partitionFilter1).build()
+ scan1
+ .getTasks
+ .asScala
+ .map(task => task.asInstanceOf[ScanTaskImpl].getAddFile)
+ .foreach(add => assert(add.getPartitionValues.get("part_a").toLong == 0))
+
+ val partitionFilter2 = new And(
+ new EqualTo(schema.column("part_a"), Literal.of(0L)),
+ new EqualTo(schema.column("part_b"), Literal.of(0L))
+ )
+ val scan2 = snapshot.getScanBuilder().withFilter(partitionFilter2).build()
+ scan2
+ .getTasks
+ .asScala
+ .map(task => task.asInstanceOf[ScanTaskImpl].getAddFile)
+ .foreach { add => assert(
+ add.getPartitionValues.get("part_a").toLong == 0 &&
+ add.getPartitionValues.get("part_b").toLong == 0
+ )
+ }
+ }
+ }
}
diff --git a/kernel/src/main/java/io/delta/core/expressions/And.java b/kernel/src/main/java/io/delta/core/expressions/And.java
new file mode 100644
index 000000000..72a89e020
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/And.java
@@ -0,0 +1,46 @@
+package io.delta.core.expressions;
+
+import java.util.Collection;
+
+import io.delta.core.types.BooleanType;
+
+/**
+ * Evaluates logical {@code expr1} AND {@code expr2} for {@code new And(expr1, expr2)}.
+ *
+ * Requires both left and right input expressions evaluate to booleans.
+ */
+public final class And extends BinaryOperator implements Predicate {
+
+ public static And apply(Collection conjunctions) {
+ if (conjunctions.size() == 0) {
+ throw new IllegalArgumentException("And.apply must be called with at least 1 element");
+ }
+
+ return (And) conjunctions
+ .stream()
+ // we start off with And(true, true)
+ // then we get the 1st expression: And(And(true, true), expr1)
+ // then we get the 2nd expression: And(And(true, true), expr1), expr2) etc.
+ .reduce(new And(Literal.TRUE, Literal.TRUE), And::new);
+ }
+
+ public And(Expression left, Expression right) {
+ super(left, right, "&&");
+ if (!(left.dataType() instanceof BooleanType) ||
+ !(right.dataType() instanceof BooleanType)) {
+
+ throw new IllegalArgumentException(
+ String.format(
+ "'And' requires expressions of type boolean. Got %s and %s.",
+ left.dataType().typeName(),
+ right.dataType().typeName()
+ )
+ );
+ }
+ }
+
+ @Override
+ public Object nullSafeEval(Object leftResult, Object rightResult) {
+ return (boolean) leftResult && (boolean) rightResult;
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/expressions/BinaryComparison.java b/kernel/src/main/java/io/delta/core/expressions/BinaryComparison.java
new file mode 100644
index 000000000..f7b5a4301
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/BinaryComparison.java
@@ -0,0 +1,23 @@
+package io.delta.core.expressions;
+
+import java.util.Comparator;
+
+/**
+ * A {@link BinaryOperator} that compares the left and right {@link Expression}s and evaluates to a
+ * boolean value.
+ */
+public abstract class BinaryComparison extends BinaryOperator implements Predicate {
+ private final Comparator comparator;
+
+ protected BinaryComparison(Expression left, Expression right, String symbol) {
+ super(left, right, symbol);
+
+ // super asserted that left and right DataTypes were the same
+
+ comparator = CastingComparator.forDataType(left.dataType());
+ }
+
+ protected int compare(Object leftResult, Object rightResult) {
+ return comparator.compare(leftResult, rightResult);
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/expressions/BinaryExpression.java b/kernel/src/main/java/io/delta/core/expressions/BinaryExpression.java
new file mode 100644
index 000000000..ac6975fee
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/BinaryExpression.java
@@ -0,0 +1,61 @@
+package io.delta.core.expressions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import io.delta.core.data.Row;
+
+/**
+ * An {@link Expression} with two inputs and one output. The output is by default evaluated to null
+ * if either input is evaluated to null.
+ */
+public abstract class BinaryExpression implements Expression {
+ protected final Expression left;
+ protected final Expression right;
+
+ protected BinaryExpression(Expression left, Expression right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public Expression getLeft() {
+ return left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+ @Override
+ public final Object eval(Row row) {
+ Object leftResult = left.eval(row);
+ if (null == leftResult) return null;
+
+ Object rightResult = right.eval(row);
+ if (null == rightResult) return null;
+
+ return nullSafeEval(leftResult, rightResult);
+ }
+
+ protected abstract Object nullSafeEval(Object leftResult, Object rightResult);
+
+ @Override
+ public List children() {
+ return Arrays.asList(left, right);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ BinaryExpression that = (BinaryExpression) o;
+ return Objects.equals(left, that.left) &&
+ Objects.equals(right, that.right);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(left, right);
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java b/kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java
new file mode 100644
index 000000000..e3fc9661b
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/BinaryOperator.java
@@ -0,0 +1,30 @@
+package io.delta.core.expressions;
+
+/**
+ * A {@link BinaryExpression} that is an operator, meaning the string representation is
+ * {@code x symbol y}, rather than {@code funcName(x, y)}.
+ *
+ * Requires both inputs to be of the same data type.
+ */
+public abstract class BinaryOperator extends BinaryExpression {
+ protected final String symbol;
+
+ protected BinaryOperator(Expression left, Expression right, String symbol) {
+ super(left, right);
+ this.symbol = symbol;
+
+ if (!left.dataType().equivalent(right.dataType())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "BinaryOperator left and right DataTypes must be the same. Found %s and %s.",
+ left.dataType().typeName(),
+ right.dataType().typeName())
+ );
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(%s %s %s)", left.toString(), symbol, right.toString());
+ }
+}
\ No newline at end of file
diff --git a/kernel/src/main/java/io/delta/core/expressions/CastingComparator.java b/kernel/src/main/java/io/delta/core/expressions/CastingComparator.java
new file mode 100644
index 000000000..3d4d22381
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/CastingComparator.java
@@ -0,0 +1,42 @@
+package io.delta.core.expressions;
+
+import java.util.Comparator;
+
+import io.delta.core.types.*;
+
+public class CastingComparator> implements Comparator {
+
+ public static Comparator forDataType(DataType dataType) {
+ if (dataType instanceof IntegerType) {
+ return new CastingComparator();
+ }
+
+ if (dataType instanceof BooleanType) {
+ return new CastingComparator();
+ }
+
+ if (dataType instanceof LongType) {
+ return new CastingComparator();
+ }
+
+ if (dataType instanceof StringType) {
+ return new CastingComparator();
+ }
+
+ throw new IllegalArgumentException(
+ String.format("Unsupported DataType: %s", dataType.typeName())
+ );
+ }
+
+ private final Comparator comparator;
+
+ public CastingComparator() {
+ comparator = Comparator.naturalOrder();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(Object a, Object b) {
+ return comparator.compare((T) a, (T) b);
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/expressions/Column.java b/kernel/src/main/java/io/delta/core/expressions/Column.java
new file mode 100644
index 000000000..9d20b2a5d
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/Column.java
@@ -0,0 +1,92 @@
+package io.delta.core.expressions;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+import io.delta.core.data.Row;
+import io.delta.core.types.*;
+
+/**
+ * A column whose row-value will be computed based on the data in a {@link io.delta.core.data.Row}.
+ *
+ * It is recommended that you instantiate using an existing table schema
+ * {@link io.delta.core.types.StructType} with {@link StructType#column(int)}.
+ *
+ * Only supports primitive data types, see
+ * Delta Transaction Log Protocol: Primitive Types .
+ */
+public final class Column extends LeafExpression {
+ private final int ordinal;
+ private final String name;
+ private final DataType dataType;
+ private final RowEvaluator evaluator;
+
+ public Column(int ordinal, String name, DataType dataType) {
+ this.ordinal = ordinal;
+ this.name = name;
+ this.dataType = dataType;
+
+ if (dataType instanceof IntegerType) {
+ evaluator = (row -> row.getInt(ordinal));
+ } else if (dataType instanceof BooleanType) {
+ evaluator = (row -> row.getBoolean(ordinal));
+ } else if (dataType instanceof LongType) {
+ evaluator = (row -> row.getLong(ordinal));
+ } else if (dataType instanceof StringType) {
+ evaluator = (row -> row.getString(ordinal));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "The data type %s of column %s at ordinal %s is not supported",
+ dataType.typeName(),
+ name,
+ ordinal)
+ );
+ }
+ }
+
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Object eval(Row row) {
+ return row.isNullAt(ordinal) ? null : evaluator.nullSafeEval(row);
+ }
+
+ @Override
+ public DataType dataType() {
+ return dataType;
+ }
+
+ @Override
+ public String toString() {
+ return "Column(" + name + ")";
+ }
+
+ @Override
+ public Set references() {
+ return Collections.singleton(name);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Column column = (Column) o;
+ return Objects.equals(ordinal, column.ordinal) &&
+ Objects.equals(name, column.name) &&
+ Objects.equals(dataType, column.dataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, dataType);
+ }
+
+ @FunctionalInterface
+ private interface RowEvaluator {
+ Object nullSafeEval(Row row);
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/expressions/EqualTo.java b/kernel/src/main/java/io/delta/core/expressions/EqualTo.java
new file mode 100644
index 000000000..09160c4da
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/EqualTo.java
@@ -0,0 +1,17 @@
+package io.delta.core.expressions;
+
+/**
+ * Evaluates {@code expr1} = {@code expr2} for {@code new EqualTo(expr1, expr2)}.
+ */
+public final class EqualTo extends BinaryComparison implements Predicate {
+
+ public EqualTo(Expression left, Expression right) {
+ super(left, right, "=");
+ }
+
+ @Override
+ protected Object nullSafeEval(Object leftResult, Object rightResult) {
+ return compare(leftResult, rightResult) == 0;
+ }
+}
+
diff --git a/kernel/src/main/java/io/delta/core/expressions/Expression.java b/kernel/src/main/java/io/delta/core/expressions/Expression.java
index 4a117af4d..95f9ba48c 100644
--- a/kernel/src/main/java/io/delta/core/expressions/Expression.java
+++ b/kernel/src/main/java/io/delta/core/expressions/Expression.java
@@ -1,4 +1,44 @@
package io.delta.core.expressions;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import io.delta.core.data.Row;
+import io.delta.core.types.DataType;
+
+/**
+ * Generic interface for all Expressions
+ */
public interface Expression {
+
+ /**
+ * @param row the input row to evaluate.
+ * @return the result of evaluating this expression on the given input {@link Row}.
+ */
+ Object eval(Row row);
+
+ /**
+ * @return the {@link DataType} of the result of evaluating this expression.
+ */
+ DataType dataType();
+
+ /**
+ * @return the String representation of this expression.
+ */
+ String toString();
+
+ /**
+ * @return a {@link List} of the immediate children of this node
+ */
+ List children();
+
+ /**
+ * @return the names of columns referenced by this expression.
+ */
+ default Set references() {
+ Set result = new HashSet<>();
+ children().forEach(child -> result.addAll(child.references()));
+ return result;
+ }
}
diff --git a/kernel/src/main/java/io/delta/core/expressions/LeafExpression.java b/kernel/src/main/java/io/delta/core/expressions/LeafExpression.java
new file mode 100644
index 000000000..fb6598bb5
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/LeafExpression.java
@@ -0,0 +1,27 @@
+package io.delta.core.expressions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An {@link Expression} with no children.
+ */
+public abstract class LeafExpression implements Expression {
+
+ protected LeafExpression() {}
+
+ @Override
+ public List children() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Set references() {
+ return Collections.emptySet();
+ }
+
+ public abstract boolean equals(Object o);
+
+ public abstract int hashCode();
+}
diff --git a/kernel/src/main/java/io/delta/core/expressions/Literal.java b/kernel/src/main/java/io/delta/core/expressions/Literal.java
new file mode 100644
index 000000000..e5d53355d
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/Literal.java
@@ -0,0 +1,95 @@
+package io.delta.core.expressions;
+
+import java.util.Objects;
+
+import io.delta.core.data.Row;
+import io.delta.core.types.*;
+
+/**
+ * A literal value.
+ *
+ * Only supports primitive data types, see
+ * Delta Transaction Log Protocol: Primitive Types .
+ */
+public final class Literal extends LeafExpression {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Static Fields / Methods
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public static final Literal TRUE = Literal.of(true);
+ public static final Literal FALSE = Literal.of(false);
+
+ /**
+ * @return a {@link Literal} with data type {@link IntegerType}
+ */
+ public static Literal of(int value) {
+ return new Literal(value, IntegerType.INSTANCE);
+ }
+
+ /**
+ * @return a {@link Literal} with data type {@link BooleanType}
+ */
+ public static Literal of(boolean value) {
+ return new Literal(value, BooleanType.INSTANCE);
+ }
+
+ /**
+ * @return a {@link Literal} with data type {@link LongType}
+ */
+ public static Literal of(long value) {
+ return new Literal(value, LongType.INSTANCE);
+ }
+
+ /**
+ * @return a {@link Literal} with data type {@link StringType}
+ */
+ public static Literal of(String value) {
+ return new Literal(value, StringType.INSTANCE);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // Instance Fields / Methods
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private final Object value;
+ private final DataType dataType;
+
+ private Literal(Object value, DataType dataType) {
+ this.value = value;
+ this.dataType = dataType;
+ }
+
+ public Object value() {
+ return value;
+ }
+
+ @Override
+ public Object eval(Row record) {
+ return value;
+ }
+
+ @Override
+ public DataType dataType() {
+ return dataType;
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Literal literal = (Literal) o;
+ return Objects.equals(value, literal.value) &&
+ Objects.equals(dataType, literal.dataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, dataType);
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/expressions/Predicate.java b/kernel/src/main/java/io/delta/core/expressions/Predicate.java
new file mode 100644
index 000000000..c1664a000
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/expressions/Predicate.java
@@ -0,0 +1,14 @@
+package io.delta.core.expressions;
+
+import io.delta.core.types.BooleanType;
+import io.delta.core.types.DataType;
+
+/**
+ * An {@link Expression} that defines a relation on inputs. Evaluates to true, false, or null.
+ */
+public interface Predicate extends Expression {
+ @Override
+ default DataType dataType() {
+ return BooleanType.INSTANCE;
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java b/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java
index 5cd8c914f..f4166e139 100644
--- a/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java
+++ b/kernel/src/main/java/io/delta/core/internal/ScanBuilderImpl.java
@@ -1,31 +1,50 @@
package io.delta.core.internal;
+import java.util.Optional;
+
import io.delta.core.Scan;
import io.delta.core.ScanBuilder;
import io.delta.core.expressions.Expression;
-import io.delta.core.internal.replay.LogReplay;
+import io.delta.core.internal.actions.AddFile;
import io.delta.core.types.StructType;
+import io.delta.core.utils.CloseableIterator;
public class ScanBuilderImpl implements ScanBuilder {
- private final LogReplay logReplay;
+ private final StructType snapshotSchema;
+ private final StructType snapshotPartitionSchema;
+ private final CloseableIterator filesIter;
+
+ private StructType readSchema;
+ private Optional filter;
+
+ public ScanBuilderImpl(
+ StructType snapshotSchema,
+ StructType snapshotPartitionSchema,
+ CloseableIterator filesIter) {
+ this.snapshotSchema = snapshotSchema;
+ this.snapshotPartitionSchema = snapshotPartitionSchema;
+ this.filesIter = filesIter;
- public ScanBuilderImpl(LogReplay logReplay) {
- this.logReplay = logReplay;
+ this.readSchema = snapshotSchema;
+ this.filter = Optional.empty();
}
@Override
- public ScanBuilder withReadSchema(StructType schema) {
- return null;
+ public ScanBuilder withReadSchema(StructType readSchema) {
+ // TODO: validate
+ this.readSchema = readSchema;
+ return this;
}
@Override
- public ScanBuilder withFilter(Expression filter) {
- return null;
+ public ScanBuilder withFilter(Expression expression) {
+ this.filter = Optional.of(expression);
+ return this;
}
@Override
public Scan build() {
- return new ScanImpl(logReplay);
+ return new ScanImpl(snapshotSchema, readSchema, snapshotPartitionSchema, filesIter, filter);
}
}
diff --git a/kernel/src/main/java/io/delta/core/internal/ScanImpl.java b/kernel/src/main/java/io/delta/core/internal/ScanImpl.java
index 273387c81..a74b2d9f3 100644
--- a/kernel/src/main/java/io/delta/core/internal/ScanImpl.java
+++ b/kernel/src/main/java/io/delta/core/internal/ScanImpl.java
@@ -1,39 +1,91 @@
package io.delta.core.internal;
-import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import io.delta.core.Scan;
import io.delta.core.ScanTask;
+import io.delta.core.expressions.Expression;
import io.delta.core.internal.actions.AddFile;
-import io.delta.core.internal.replay.LogReplay;
+import io.delta.core.internal.data.PartitionRow;
+import io.delta.core.internal.lang.FilteredCloseableIterator;
+import io.delta.core.internal.lang.Tuple2;
+import io.delta.core.internal.util.PartitionUtils;
+import io.delta.core.types.StructType;
import io.delta.core.utils.CloseableIterator;
public class ScanImpl implements Scan {
- private final LogReplay logReplay;
+ /** Complete schema of the snapshot to be scanned. */
+ private final StructType snapshotSchema;
- public ScanImpl(LogReplay logReplay) {
- this.logReplay = logReplay;
+ /** Schema that we actually want to read. */
+ private final StructType readSchema;
+
+ /** Partition schema. */
+ private final StructType snapshotPartitionSchema;
+
+ private final CloseableIterator filesIter;
+
+ /** Mapping from partitionColumnName to its ordinal in the `snapshotSchema`. */
+ private final Map partitionColumnOrdinals;
+
+ private final Optional metadataFilterConjunction;
+ private final Optional dataFilterConjunction;
+
+ public ScanImpl(
+ StructType snapshotSchema,
+ StructType readSchema,
+ StructType snapshotPartitionSchema,
+ CloseableIterator filesIter,
+ Optional filter) {
+ this.snapshotSchema = snapshotSchema;
+ this.readSchema = readSchema;
+ this.snapshotPartitionSchema = snapshotPartitionSchema;
+ this.filesIter = filesIter;
+ this.partitionColumnOrdinals = PartitionUtils.getPartitionOrdinals(snapshotSchema, snapshotPartitionSchema);
+
+ if (filter.isPresent()) {
+ final List partitionColumns = snapshotPartitionSchema.fieldNames();
+ final Tuple2, Optional> metadataAndDataConjunctions =
+ PartitionUtils.splitMetadataAndDataPredicates(filter.get(), partitionColumns);
+
+ this.metadataFilterConjunction = metadataAndDataConjunctions._1;
+ this.dataFilterConjunction = metadataAndDataConjunctions._2;
+ } else {
+ this.metadataFilterConjunction = Optional.empty();
+ this.dataFilterConjunction = Optional.empty();
+ }
+
+ System.out.println("ScanImpl: snapshotSchema: " + snapshotSchema.fields());
+ System.out.println("ScanImpl: readSchema: " + readSchema.fields());
+ System.out.println("ScanImpl: snapshotPartitionSchema: " + snapshotPartitionSchema.fields());
+ System.out.println("ScanImpl: partitionColumnOrdinals: " + partitionColumnOrdinals.toString());
+
+ System.out.println("ScanImpl: snapshotPartitionSchema: " + snapshotPartitionSchema.fields());
+ System.out.println("ScanImpl: metadataFilterConjunction: " + metadataFilterConjunction.toString());
+ System.out.println("ScanImpl: dataFilterConjunction: " + dataFilterConjunction.toString());
}
@Override
public CloseableIterator getTasks() {
- return new CloseableIterator() {
- final CloseableIterator addFileIter = logReplay.getAddFiles();
-
+ return new FilteredCloseableIterator(filesIter) {
@Override
- public void close() throws IOException {
- addFileIter.close();
- }
+ protected Optional accept(AddFile addFile) {
+ if (!metadataFilterConjunction.isPresent()) {
+ return Optional.of(new ScanTaskImpl(addFile));
+ }
- @Override
- public boolean hasNext() {
- return addFileIter.hasNext();
- }
+ // Perform Partition Pruning
+ final PartitionRow row = new PartitionRow(partitionColumnOrdinals, addFile.getPartitionValues());
+ final boolean accept = (boolean) metadataFilterConjunction.get().eval(row);
- @Override
- public ScanTask next() {
- return new ScanTaskImpl(addFileIter.next());
+ if (accept) {
+ return Optional.of(new ScanTaskImpl(addFile));
+ }
+
+ return Optional.empty();
}
};
}
diff --git a/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java b/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java
index a2a7a92a9..a3694e22d 100644
--- a/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java
+++ b/kernel/src/main/java/io/delta/core/internal/ScanTaskImpl.java
@@ -7,12 +7,19 @@
public class ScanTaskImpl implements ScanTask {
+ private final AddFile addFile;
+
public ScanTaskImpl(AddFile addFile) {
- System.out.println("Created ScanTaskImpl for AddFile " + addFile.getPath());
+ this.addFile = addFile;
}
@Override
public CloseableIterator getData() {
return null;
}
+
+ /** Visible for testing */
+ public AddFile getAddFile() {
+ return this.addFile;
+ }
}
diff --git a/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java b/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java
index 513eb559c..882acc427 100644
--- a/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java
+++ b/kernel/src/main/java/io/delta/core/internal/SnapshotImpl.java
@@ -50,18 +50,26 @@ public long getVersion() {
@Override
public StructType getSchema() {
- return protocolAndMetadata.get()._2.getSchema();
+ return getMetadata().getSchema();
}
@Override
public ScanBuilder getScanBuilder() {
- return new ScanBuilderImpl(logReplay);
+ return new ScanBuilderImpl(
+ getSchema(),
+ getMetadata().getPartitionSchema(),
+ logReplay.getAddFiles()
+ );
}
////////////////////////////////////////
// Internal APIs
////////////////////////////////////////
+ public Metadata getMetadata() {
+ return protocolAndMetadata.get()._2;
+ }
+
public CloseableIterator getAddFiles() {
return logReplay.getAddFiles();
}
diff --git a/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java b/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java
index d6fe86619..b3ab259ce 100644
--- a/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java
+++ b/kernel/src/main/java/io/delta/core/internal/actions/AddFile.java
@@ -1,5 +1,6 @@
package io.delta.core.internal.actions;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
@@ -61,6 +62,18 @@ public AddFile copyWithDataChange(boolean dataChange) {
return this; // TODO
}
+ public Map getPartitionValues() {
+ return Collections.unmodifiableMap(partitionValues);
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public long getModificationTime() {
+ return modificationTime;
+ }
+
@Override
public String toString() {
return "AddFile{" +
diff --git a/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java b/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java
index 06c46b78b..00985f91b 100644
--- a/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java
+++ b/kernel/src/main/java/io/delta/core/internal/actions/Metadata.java
@@ -1,5 +1,8 @@
package io.delta.core.internal.actions;
+import java.util.List;
+import java.util.stream.Collectors;
+
import io.delta.core.data.Row;
import io.delta.core.helpers.TableHelper;
import io.delta.core.types.*;
@@ -17,10 +20,11 @@ public static Metadata fromRow(Row row, TableHelper tableHelper) {
final String description = row.getString(2);
final Format format = Format.fromRow(row.getRecord(3));
final String schemaJson = row.getString(4);
+ final List partitionColumns = row.getList(5);
Row schemaRow = tableHelper.parseJson(schemaJson, StructType.READ_SCHEMA);
StructType schema = StructType.fromRow(schemaRow);
- return new Metadata(schema);
+ return new Metadata(schema, partitionColumns);
}
public static final StructType READ_SCHEMA = new StructType()
@@ -47,12 +51,24 @@ public static Metadata fromRow(Row row, TableHelper tableHelper) {
// createdTime
private final StructType schema;
+ private final List partitionColumns;
- public Metadata(StructType schema) {
+ public Metadata(StructType schema, List partitionColumns) {
this.schema = schema;
+ this.partitionColumns = partitionColumns;
}
public StructType getSchema() {
return schema;
}
+
+ public List getPartitionColumns() {
+ return partitionColumns;
+ }
+
+ public StructType getPartitionSchema() {
+ return new StructType(
+ partitionColumns.stream().map(schema::get).collect(Collectors.toList())
+ );
+ }
}
diff --git a/kernel/src/main/java/io/delta/core/internal/data/PartitionRow.java b/kernel/src/main/java/io/delta/core/internal/data/PartitionRow.java
new file mode 100644
index 000000000..f7f44dd78
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/internal/data/PartitionRow.java
@@ -0,0 +1,69 @@
+package io.delta.core.internal.data;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.delta.core.data.Row;
+
+/**
+ * The type of Row that will be evaluated against {@link io.delta.core.expressions.Column}s.
+ *
+ * These Columns must be partition columns, and will have ordinals matching the latest snapshot
+ * schema.
+ */
+public class PartitionRow implements Row {
+
+ private final Map ordinalToValue;
+
+ public PartitionRow(Map partitionOrdinals, Map partitionValuesMap) {
+ this.ordinalToValue = new HashMap<>();
+
+ for (Map.Entry entry : partitionOrdinals.entrySet()) {
+ final String partitionColumnName = entry.getKey();
+ final int partitionColumnOrdinal = entry.getValue();
+ final String partitionColumnValue = partitionValuesMap.get(partitionColumnName);
+ ordinalToValue.put(partitionColumnOrdinal, partitionColumnValue);
+ }
+ }
+
+ @Override
+ public boolean isNullAt(int ordinal) {
+ return ordinalToValue.get(ordinal) == null;
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ return Boolean.parseBoolean(ordinalToValue.get(ordinal));
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ return Integer.parseInt(ordinalToValue.get(ordinal));
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ return Long.parseLong(ordinalToValue.get(ordinal));
+ }
+
+ @Override
+ public String getString(int ordinal) {
+ return ordinalToValue.get(ordinal);
+ }
+
+ @Override
+ public Row getRecord(int ordinal) {
+ throw new UnsupportedOperationException("Partition values can't be StructTypes");
+ }
+
+ @Override
+ public List getList(int ordinal) {
+ throw new UnsupportedOperationException("Partition values can't be Lists");
+ }
+
+ @Override
+ public Map getMap(int ordinal) {
+ throw new UnsupportedOperationException("Partition values can't be Maps");
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/internal/lang/FilteredCloseableIterator.java b/kernel/src/main/java/io/delta/core/internal/lang/FilteredCloseableIterator.java
new file mode 100644
index 000000000..533e99eb8
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/internal/lang/FilteredCloseableIterator.java
@@ -0,0 +1,62 @@
+package io.delta.core.internal.lang;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import io.delta.core.utils.CloseableIterator;
+
+public abstract class FilteredCloseableIterator implements CloseableIterator {
+
+ private final CloseableIterator iter;
+ private Optional nextValid;
+ private boolean closed;
+
+ public FilteredCloseableIterator(CloseableIterator iter) {
+ this.iter = iter;
+ this.nextValid = Optional.empty();
+ this.closed = false;
+ }
+
+ protected abstract Optional accept(ITER_TYPE element);
+
+ @Override
+ public final boolean hasNext() {
+ if (closed) {
+ throw new IllegalStateException("Can't call `hasNext` on a closed iterator.");
+ }
+ if (!nextValid.isPresent()) {
+ nextValid = findNextValid();
+ }
+ return nextValid.isPresent();
+ }
+
+ @Override
+ public final RETURN_TYPE next() {
+ if (closed) {
+ throw new IllegalStateException("Can't call `next` on a closed iterator.");
+ }
+ if (!hasNext()) throw new NoSuchElementException();
+
+ // By the definition of hasNext, we know that nextValid is non-empty
+
+ final RETURN_TYPE ret = nextValid.get();
+ nextValid = Optional.empty();
+ return ret;
+ }
+
+ @Override
+ public final void close() throws IOException {
+ iter.close();
+ this.closed = true;
+ }
+
+ private Optional findNextValid() {
+ while (iter.hasNext()) {
+ final Optional acceptedElementOpt = accept(iter.next());
+ if (acceptedElementOpt.isPresent()) return acceptedElementOpt;
+ }
+
+ return Optional.empty();
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java b/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java
index f72e5264e..5dc7870f0 100644
--- a/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java
+++ b/kernel/src/main/java/io/delta/core/internal/replay/ReverseActionsToAddFilesIterator.java
@@ -1,18 +1,18 @@
package io.delta.core.internal.replay;
-import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
-import java.util.NoSuchElementException;
import java.util.Optional;
import io.delta.core.internal.actions.Action;
import io.delta.core.internal.actions.AddFile;
import io.delta.core.internal.actions.RemoveFile;
+import io.delta.core.internal.lang.FilteredCloseableIterator;
import io.delta.core.internal.lang.Tuple2;
import io.delta.core.utils.CloseableIterator;
-public class ReverseActionsToAddFilesIterator implements CloseableIterator {
+public class ReverseActionsToAddFilesIterator
+ extends FilteredCloseableIterator> {
private final CloseableIterator> reverseActionIter;
@@ -20,74 +20,55 @@ public class ReverseActionsToAddFilesIterator implements CloseableIterator addFilesFromJson;
- private Optional nextValid;
-
public ReverseActionsToAddFilesIterator(CloseableIterator> reverseActionIter) {
+ super(reverseActionIter);
+
this.reverseActionIter = reverseActionIter;
this.tombstonesFromJson = new HashMap<>();
this.addFilesFromJson = new HashMap<>();
- this.nextValid = Optional.empty();
}
@Override
- public boolean hasNext() {
- if (!nextValid.isPresent()) {
- nextValid = findNextValid();
- }
-
- return nextValid.isPresent();
- }
-
- @Override
- public AddFile next() {
- if (!hasNext()) throw new NoSuchElementException();
-
- // By the definition of hasNext, we know that actionsIter is non-empty
+ protected Optional accept(Tuple2 element) {
+ final Action action = element._1;
+ final boolean isFromCheckpoint = element._2;
+
+ if (action instanceof AddFile) {
+ final AddFile add = ((AddFile) action).copyWithDataChange(false);
+ final UniqueFileActionTuple key =
+ new UniqueFileActionTuple(add.toURI(), add.getDeletionVectorUniqueId());
+ final boolean alreadyDeleted = tombstonesFromJson.containsKey(key);
+ final boolean alreadyReturned = addFilesFromJson.containsKey(key);
+
+ if (!alreadyReturned) {
+ // Note: No AddFile will appear twice in a checkpoint, so we only need
+ // non-checkpoint AddFiles in the set
+ if (!isFromCheckpoint) {
+ addFilesFromJson.put(key, add);
+ }
- final AddFile ret = nextValid.get();
- nextValid = Optional.empty();
- return ret;
- }
+ if (!alreadyDeleted) {
+ return Optional.of(add);
+ }
+ }
+ } else if (action instanceof RemoveFile && !isFromCheckpoint) {
+ // Note: There's no reason to put a RemoveFile from a checkpoint into tombstones map
+ // since, when we generate a checkpoint, any corresponding AddFile would have
+ // been excluded
+ final RemoveFile remove = ((RemoveFile) action).copyWithDataChange(false);
+ final UniqueFileActionTuple key =
+ new UniqueFileActionTuple(remove.toURI(), remove.getDeletionVectorUniqueId());
+
+ tombstonesFromJson.put(key, remove);
+ }
- @Override
- public void close() throws IOException {
- reverseActionIter.close();
+ return Optional.empty();
}
private Optional findNextValid() {
while (reverseActionIter.hasNext()) {
final Tuple2 tuple = reverseActionIter.next();
- final Action action = tuple._1;
- final boolean isFromCheckpoint = tuple._2;
-
- if (action instanceof AddFile) {
- final AddFile add = ((AddFile) action).copyWithDataChange(false);
- final UniqueFileActionTuple key =
- new UniqueFileActionTuple(add.toURI(), add.getDeletionVectorUniqueId());
- final boolean alreadyDeleted = tombstonesFromJson.containsKey(key);
- final boolean alreadyReturned = addFilesFromJson.containsKey(key);
-
- if (!alreadyReturned) {
- // Note: No AddFile will appear twice in a checkpoint, so we only need
- // non-checkpoint AddFiles in the set
- if (!isFromCheckpoint) {
- addFilesFromJson.put(key, add);
- }
-
- if (!alreadyDeleted) {
- return Optional.of(add);
- }
- }
- } else if (action instanceof RemoveFile && !isFromCheckpoint) {
- // Note: There's no reason to put a RemoveFile from a checkpoint into tombstones map
- // since, when we generate a checkpoint, any corresponding AddFile would have
- // been excluded
- final RemoveFile remove = ((RemoveFile) action).copyWithDataChange(false);
- final UniqueFileActionTuple key =
- new UniqueFileActionTuple(remove.toURI(), remove.getDeletionVectorUniqueId());
-
- tombstonesFromJson.put(key, remove);
- }
+
}
return Optional.empty();
diff --git a/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java b/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java
new file mode 100644
index 000000000..e89aaa6a2
--- /dev/null
+++ b/kernel/src/main/java/io/delta/core/internal/util/PartitionUtils.java
@@ -0,0 +1,83 @@
+package io.delta.core.internal.util;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import io.delta.core.expressions.And;
+import io.delta.core.expressions.Expression;
+import io.delta.core.internal.lang.ListUtils;
+import io.delta.core.internal.lang.Tuple2;
+import io.delta.core.types.StructType;
+
+public class PartitionUtils {
+
+ private PartitionUtils() { }
+
+ public static Map getPartitionOrdinals(
+ StructType snapshotSchema,
+ StructType partitionSchema) {
+ final Map output = new HashMap<>();
+ partitionSchema
+ .fieldNames()
+ .forEach(fieldName -> output.put(fieldName, snapshotSchema.indexOf(fieldName)));
+
+ return output;
+ }
+
+ /**
+ * Partition the given condition into two optional conjunctive predicates M, D such that
+ * condition = M AND D, where we define:
+ * - M: conjunction of predicates that can be evaluated using metadata only.
+ * - D: conjunction of other predicates.
+ */
+ public static Tuple2, Optional> splitMetadataAndDataPredicates(
+ Expression condition,
+ List partitionColumns) {
+ final Tuple2, List> metadataAndDataPredicates = ListUtils
+ .partition(
+ splitConjunctivePredicates(condition),
+ c -> isPredicateMetadataOnly(c, partitionColumns)
+ );
+
+ final Optional metadataConjunction;
+ if (metadataAndDataPredicates._1.isEmpty()) {
+ metadataConjunction = Optional.empty();
+ } else {
+ metadataConjunction = Optional.of(And.apply(metadataAndDataPredicates._1));
+ }
+
+ final Optional dataConjunction;
+ if (metadataAndDataPredicates._2.isEmpty()) {
+ dataConjunction = Optional.empty();
+ } else {
+ dataConjunction = Optional.of(And.apply(metadataAndDataPredicates._2));
+ }
+ return new Tuple2<>(metadataConjunction, dataConjunction);
+ }
+
+ private static List splitConjunctivePredicates(Expression condition) {
+ if (condition instanceof And) {
+ final And andExpr = (And) condition;
+ return Stream.concat(
+ splitConjunctivePredicates(andExpr.getLeft()).stream(),
+ splitConjunctivePredicates(andExpr.getRight()).stream()
+ ).collect(Collectors.toList());
+ }
+ return Collections.singletonList(condition);
+ }
+
+ private static boolean isPredicateMetadataOnly(
+ Expression condition,
+ List partitionColumns) {
+ Set lowercasePartCols = partitionColumns
+ .stream().map(s -> s.toLowerCase(Locale.ROOT))
+ .collect(Collectors.toSet());
+
+ return condition
+ .references()
+ .stream()
+ .map(s -> s.toLowerCase(Locale.ROOT))
+ .allMatch(lowercasePartCols::contains);
+ }
+}
diff --git a/kernel/src/main/java/io/delta/core/types/DataType.java b/kernel/src/main/java/io/delta/core/types/DataType.java
index 920eb92b3..ec525cc80 100644
--- a/kernel/src/main/java/io/delta/core/types/DataType.java
+++ b/kernel/src/main/java/io/delta/core/types/DataType.java
@@ -20,5 +20,21 @@ public String typeName() {
}
return name.toLowerCase(Locale.ROOT);
}
+ public boolean equivalent(DataType dt) {
+ return this.equals(dt);
+ }
+
+ @Override
+ public String toString() {
+ return typeName();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DataType that = (DataType) o;
+ return typeName().equals(that.typeName());
+ }
}
diff --git a/kernel/src/main/java/io/delta/core/types/StructField.java b/kernel/src/main/java/io/delta/core/types/StructField.java
index 9c7dd37c8..a2f147491 100644
--- a/kernel/src/main/java/io/delta/core/types/StructField.java
+++ b/kernel/src/main/java/io/delta/core/types/StructField.java
@@ -24,9 +24,9 @@ public static StructField fromRow(Row row) {
// Instance Fields / Methods
////////////////////////////////////////////////////////////////////////////////
- public final String name;
- public final DataType dataType;
- public final boolean nullable;
+ private final String name;
+ private final DataType dataType;
+ private final boolean nullable;
// private final FieldMetadata metadata;
public StructField(String name, DataType dataType, boolean nullable) {
@@ -35,8 +35,20 @@ public StructField(String name, DataType dataType, boolean nullable) {
this.nullable = nullable;
}
+ public String getName() {
+ return name;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public boolean isNullable() {
+ return nullable;
+ }
+
@Override
public String toString() {
- return String.format("StructField(%s,%s,%s)", name, dataType, nullable);
+ return String.format("StructField(name=%s,type=%s,nullable=%s)", name, dataType, nullable);
}
}
diff --git a/kernel/src/main/java/io/delta/core/types/StructType.java b/kernel/src/main/java/io/delta/core/types/StructType.java
index 0b8b2caf3..265644566 100644
--- a/kernel/src/main/java/io/delta/core/types/StructType.java
+++ b/kernel/src/main/java/io/delta/core/types/StructType.java
@@ -1,11 +1,11 @@
package io.delta.core.types;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
import io.delta.core.data.Row;
+import io.delta.core.expressions.Column;
+import io.delta.core.internal.lang.Tuple2;
public final class StructType extends DataType {
@@ -32,7 +32,9 @@ public static StructType fromRow(Row row) {
// Instance Fields / Methods
////////////////////////////////////////////////////////////////////////////////
+ private final Map> nameToFieldAndOrdinal;
private final List fields;
+ private final List fieldNames;
public StructType() {
this(new ArrayList<>());
@@ -40,6 +42,12 @@ public StructType() {
public StructType(List fields) {
this.fields = fields;
+ this.fieldNames = fields.stream().map(f -> f.getName()).collect(Collectors.toList());
+
+ this.nameToFieldAndOrdinal = new HashMap<>();
+ for (int i = 0; i < fields.size(); i++) {
+ nameToFieldAndOrdinal.put(fields.get(i).getName(), new Tuple2<>(fields.get(i), i));
+ }
}
public StructType add(StructField field) {
@@ -58,17 +66,43 @@ public List fields() {
}
public List fieldNames() {
- return fields.stream().map(f -> f.name).collect(Collectors.toList());
+ return fieldNames;
}
public int length() {
return fields.size();
}
+ public int indexOf(String fieldName) {
+ return fieldNames.indexOf(fieldName);
+ }
+
+ public StructField get(String fieldName) {
+ return nameToFieldAndOrdinal.get(fieldName)._1;
+ }
+
public StructField at(int index) {
return fields.get(index);
}
+ /**
+ * Creates a {@link io.delta.core.expressions.Column} expression for the field with the given
+ * {@code fieldName}.
+ *
+ * @param ordinal the ordinal of the {@link StructField} to create a column for
+ * @return a {@link Column} expression for the {@link StructField} with name {@code fieldName}
+ */
+ public Column column(int ordinal) {
+ final StructField field = at(ordinal);
+ return new Column(ordinal, field.getName(), field.getDataType());
+ }
+
+ public Column column(String fieldName) {
+ Tuple2 fieldAndOrdinal = nameToFieldAndOrdinal.get(fieldName);
+ System.out.println("Created column " + fieldName + " with ordinal " + fieldAndOrdinal._2);
+ return new Column(fieldAndOrdinal._2, fieldName, fieldAndOrdinal._1.getDataType());
+ }
+
@Override
public String toString() {
return String.format(