Commit cc08ff2

Spark 4.0: Implement SupportsReportOrdering DSv2 API
1 parent 296a06d commit cc08ff2

15 files changed (+1805, -6 lines)

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 8 additions & 0 deletions
@@ -268,6 +268,14 @@ public boolean preserveDataGrouping() {
         .parse();
   }
 
+  public boolean preserveDataOrdering() {
+    return confParser
+        .booleanConf()
+        .sessionConf(SparkSQLProperties.PRESERVE_DATA_ORDERING)
+        .defaultValue(SparkSQLProperties.PRESERVE_DATA_ORDERING_DEFAULT)
+        .parse();
+  }
+
   public boolean aggregatePushDownEnabled() {
     return confParser
         .booleanConf()
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 5 additions & 0 deletions
@@ -43,6 +43,11 @@ private SparkSQLProperties() {}
       "spark.sql.iceberg.planning.preserve-data-grouping";
   public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;
 
+  // Controls whether to preserve data ordering and report it to Spark
+  public static final String PRESERVE_DATA_ORDERING =
+      "spark.sql.iceberg.planning.preserve-data-ordering";
+  public static final boolean PRESERVE_DATA_ORDERING_DEFAULT = false;
+
   // Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
   public static final String AGGREGATE_PUSH_DOWN_ENABLED =
       "spark.sql.iceberg.aggregate-push-down.enabled";
Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source;

import java.util.Comparator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

/**
 * A comparator for Spark {@link InternalRow} objects based on an Iceberg {@link SortOrder}.
 *
 * <p>This comparator adapts Spark's InternalRow to Iceberg's StructLike interface and delegates to
 * Iceberg's existing {@link SortOrderComparators} infrastructure, which provides full support for:
 *
 * <ul>
 *   <li>All Iceberg data types
 *   <li>ASC/DESC sort directions
 *   <li>NULLS_FIRST/NULLS_LAST null ordering
 *   <li>Transform functions (identity, bucket, truncate, etc.)
 * </ul>
 *
 * <p><strong>This class is NOT thread-safe.</strong>
 */
class InternalRowComparator implements Comparator<InternalRow> {
  private final Comparator<StructLike> delegate;
  private final InternalRowWrapper leftWrapper;
  private final InternalRowWrapper rightWrapper;

  /**
   * Creates a comparator for the given sort order and schemas.
   *
   * @param sortOrder the Iceberg sort order to use for comparison
   * @param sparkSchema the Spark schema of the rows to compare
   * @param icebergSchema the Iceberg schema of the rows to compare
   */
  InternalRowComparator(SortOrder sortOrder, StructType sparkSchema, Schema icebergSchema) {
    Preconditions.checkArgument(
        sortOrder.isSorted(), "Cannot create comparator for unsorted order");
    Preconditions.checkNotNull(sparkSchema, "Spark schema cannot be null");
    Preconditions.checkNotNull(icebergSchema, "Iceberg schema cannot be null");

    this.delegate = SortOrderComparators.forSchema(icebergSchema, sortOrder);
    this.leftWrapper = new InternalRowWrapper(sparkSchema, icebergSchema.asStruct());
    this.rightWrapper = new InternalRowWrapper(sparkSchema, icebergSchema.asStruct());
  }

  @Override
  public int compare(InternalRow row1, InternalRow row2) {
    return delegate.compare(leftWrapper.wrap(row1), rightWrapper.wrap(row2));
  }
}
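
A minimal usage sketch (not from this commit) showing how a caller could build this comparator from a table's active sort order. It assumes the helper is compiled in the same org.apache.iceberg.spark.source package, since the class is package-private, and that a sorted Iceberg Table handle is already available; the helper name is hypothetical.

// Hypothetical caller-side sketch; assumes the same package as InternalRowComparator.
package org.apache.iceberg.spark.source;

import java.util.Comparator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

class ComparatorUsageSketch {
  static Comparator<InternalRow> orderingFor(Table table) {
    Schema schema = table.schema();
    SortOrder sortOrder = table.sortOrder(); // the table's active sort order
    StructType sparkSchema = SparkSchemaUtil.convert(schema);

    // Per-field comparison (direction, null ordering, transforms) is delegated
    // to SortOrderComparators via the wrapper-based adapter above.
    return new InternalRowComparator(sortOrder, sparkSchema, schema);
  }
}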
Lines changed: 150 additions & 0 deletions

@@ -0,0 +1,150 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SortedMerge;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.types.StructType;

/**
 * A {@link PartitionReader} that performs a k-way merge of multiple sorted readers.
 *
 * <p>This reader takes multiple {@link PartitionReader}s (one per file), each producing sorted data
 * according to the same {@link SortOrder}, and merges them into a single sorted stream using
 * Iceberg's {@link SortedMerge} utility.
 *
 * <p>The merge is performed using a priority queue (heap) to efficiently select the next row from
 * among all readers, maintaining the sort order with O(log k) comparisons per row, where k is the
 * number of files being merged.
 *
 * @param <T> the type of InternalRow being read
 */
class MergingPartitionReader<T extends InternalRow> implements PartitionReader<T> {
  private final List<PartitionReader<T>> readers;
  private final CloseableIterator<T> mergedIterator;
  private T current = null;
  private boolean closed = false;

  MergingPartitionReader(
      List<PartitionReader<T>> readers,
      SortOrder sortOrder,
      StructType sparkSchema,
      Schema icebergSchema) {
    Preconditions.checkNotNull(readers, "Readers cannot be null");
    Preconditions.checkArgument(!readers.isEmpty(), "Readers cannot be empty");
    Preconditions.checkNotNull(sortOrder, "Sort order cannot be null");
    Preconditions.checkArgument(sortOrder.isSorted(), "Sort order must be sorted");

    this.readers = readers;

    Comparator<T> comparator =
        (Comparator<T>) new InternalRowComparator(sortOrder, sparkSchema, icebergSchema);

    List<CloseableIterable<T>> iterables =
        readers.stream().map(this::readerToIterable).collect(Collectors.toList());

    SortedMerge<T> sortedMerge = new SortedMerge<>(comparator, iterables);
    this.mergedIterator = sortedMerge.iterator();
  }

  /** Converts a PartitionReader to a CloseableIterable for use with SortedMerge. */
  private CloseableIterable<T> readerToIterable(PartitionReader<T> reader) {
    return new CloseableIterable<T>() {
      @Override
      public CloseableIterator<T> iterator() {
        return new CloseableIterator<T>() {
          private boolean advanced = false;
          private boolean hasNext = false;

          @Override
          public boolean hasNext() {
            if (!advanced) {
              try {
                hasNext = reader.next();
                advanced = true;
              } catch (IOException e) {
                throw new RuntimeException("Failed to advance reader", e);
              }
            }
            return hasNext;
          }

          @Override
          public T next() {
            if (!advanced) {
              hasNext();
            }
            advanced = false;
            // Spark readers reuse InternalRow objects for performance (see
            // SparkParquetReaders.java:547).
            // Return a copy of the row to avoid corruption.
            return (T) reader.get().copy();
          }

          @Override
          public void close() throws IOException {
            reader.close();
          }
        };
      }

      @Override
      public void close() throws IOException {
        reader.close();
      }
    };
  }

  @Override
  public boolean next() throws IOException {
    if (mergedIterator.hasNext()) {
      this.current = mergedIterator.next();
      return true;
    }
    return false;
  }

  @Override
  public T get() {
    return current;
  }

  @Override
  public void close() throws IOException {
    if (closed) {
      return;
    }

    try {
      mergedIterator.close();
    } finally {
      closed = true;
    }
  }
}
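
The javadoc above attributes the O(log k)-per-row cost to a heap-based merge performed by SortedMerge. The sketch below is not Iceberg's SortedMerge implementation; it is a self-contained illustration of the same k-way merge technique using java.util.PriorityQueue, included only to make that cost claim concrete.

// Standalone illustration of a heap-based k-way merge (not Iceberg code).
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class KWayMergeSketch {
  /** Merges k already-sorted iterators into one sorted list using a heap of size at most k. */
  static <T> List<T> merge(List<Iterator<T>> sortedInputs, Comparator<T> comparator) {
    // Each heap entry carries the current head element plus the iterator it came from.
    class Head {
      final T value;
      final Iterator<T> source;

      Head(T value, Iterator<T> source) {
        this.value = value;
        this.source = source;
      }
    }

    PriorityQueue<Head> heap =
        new PriorityQueue<>((a, b) -> comparator.compare(a.value, b.value));
    for (Iterator<T> input : sortedInputs) {
      if (input.hasNext()) {
        heap.add(new Head(input.next(), input));
      }
    }

    List<T> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head smallest = heap.poll(); // O(log k) heap operation per emitted row
      merged.add(smallest.value);
      if (smallest.source.hasNext()) {
        heap.add(new Head(smallest.source.next(), smallest.source));
      }
    }
    return merged;
  }
}

The heap never holds more than one head element per input, so emitting each row costs one poll and at most one offer, both O(log k), which matches the complexity described in the class javadoc.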
Lines changed: 139 additions & 0 deletions

@@ -0,0 +1,139 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseScanTaskGroup;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PartitionReader} that reads multiple sorted files and merges them into a single sorted
 * stream.
 *
 * <p>This reader is used when {@code preserve-data-ordering} is enabled and the task group contains
 * multiple files that all have the same sort order. It creates one {@link RowDataReader} per file
 * and uses {@link MergingPartitionReader} to perform a k-way merge.
 */
class MergingSortedRowDataReader implements PartitionReader<InternalRow> {
  private static final Logger LOG = LoggerFactory.getLogger(MergingSortedRowDataReader.class);

  private final MergingPartitionReader<InternalRow> mergingReader;
  private final List<RowDataReader> fileReaders;

  MergingSortedRowDataReader(SparkInputPartition partition, int reportableSortOrderId) {
    Table table = partition.table();
    ScanTaskGroup<FileScanTask> taskGroup = partition.taskGroup();
    Schema tableSchema = SnapshotUtil.schemaFor(table, partition.branch());
    Schema expectedSchema = partition.expectedSchema();

    Preconditions.checkArgument(
        reportableSortOrderId > 0, "Invalid sort order ID: %s", reportableSortOrderId);
    Preconditions.checkArgument(
        taskGroup.tasks().size() > 1,
        "Merging reader requires multiple files, got %s",
        taskGroup.tasks().size());

    LOG.info(
        "Creating merging reader for {} files with sort order ID {} in table {}",
        taskGroup.tasks().size(),
        reportableSortOrderId,
        table.name());

    SortOrder sortOrder = table.sortOrders().get(reportableSortOrderId);
    Preconditions.checkNotNull(
        sortOrder,
        "Cannot find sort order with ID %s in table %s",
        reportableSortOrderId,
        table.name());

    this.fileReaders =
        taskGroup.tasks().stream()
            .map(
                task -> {
                  ScanTaskGroup<FileScanTask> singleTaskGroup =
                      new BaseScanTaskGroup<>(java.util.Collections.singletonList(task));

                  return new RowDataReader(
                      table,
                      singleTaskGroup,
                      tableSchema,
                      expectedSchema,
                      partition.isCaseSensitive(),
                      partition.cacheDeleteFilesOnExecutors());
                })
            .collect(Collectors.toList());

    List<PartitionReader<InternalRow>> readers =
        fileReaders.stream()
            .map(reader -> (PartitionReader<InternalRow>) reader)
            .collect(Collectors.toList());

    StructType sparkSchema = SparkSchemaUtil.convert(expectedSchema);
    this.mergingReader =
        new MergingPartitionReader<>(readers, sortOrder, sparkSchema, expectedSchema);
  }

  @Override
  public boolean next() throws IOException {
    return mergingReader.next();
  }

  @Override
  public InternalRow get() {
    return mergingReader.get();
  }

  @Override
  public void close() throws IOException {
    mergingReader.close();
  }

  public CustomTaskMetric[] currentMetricsValues() {
    long totalSplits = fileReaders.size();

    long totalDeletes =
        fileReaders.stream()
            .flatMap(reader -> Arrays.stream(reader.currentMetricsValues()))
            .filter(
                metric -> metric instanceof org.apache.iceberg.spark.source.metrics.TaskNumDeletes)
            .mapToLong(CustomTaskMetric::value)
            .sum();

    return new CustomTaskMetric[] {
      new org.apache.iceberg.spark.source.metrics.TaskNumSplits(totalSplits),
      new org.apache.iceberg.spark.source.metrics.TaskNumDeletes(totalDeletes)
    };
  }
}
