Skip to content

Commit e14a811

Browse files
authored
[spark] Introduce global file index builder on spark (#6684)
1 parent 8fa8b0d commit e14a811

File tree

21 files changed

+1384
-72
lines changed

21 files changed

+1384
-72
lines changed

paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,20 @@ public static RowType rowTypeWithRowTracking(RowType rowType, boolean sequenceNu
168168
: SpecialFields.SEQUENCE_NUMBER);
169169
return new RowType(fieldsWithRowTracking);
170170
}
171+
172+
public static RowType rowTypeWithRowId(RowType rowType) {
173+
List<DataField> fieldsWithRowTracking = new ArrayList<>(rowType.getFields());
174+
175+
fieldsWithRowTracking.forEach(
176+
f -> {
177+
if (ROW_ID.name().equals(f.name())) {
178+
throw new IllegalArgumentException(
179+
"Row tracking field name '"
180+
+ f.name()
181+
+ "' conflicts with existing field names.");
182+
}
183+
});
184+
fieldsWithRowTracking.add(SpecialFields.ROW_ID);
185+
return new RowType(fieldsWithRowTracking);
186+
}
171187
}

paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,22 @@
2323

2424
import java.io.Closeable;
2525
import java.util.List;
26-
import java.util.Optional;
2726

2827
/** Index reader for global index, return {@link GlobalIndexResult}. */
29-
public interface GlobalIndexReader extends FunctionVisitor<Optional<GlobalIndexResult>>, Closeable {
28+
public interface GlobalIndexReader extends FunctionVisitor<GlobalIndexResult>, Closeable {
3029

3130
@Override
32-
default Optional<GlobalIndexResult> visitAnd(List<Optional<GlobalIndexResult>> children) {
31+
default GlobalIndexResult visitAnd(List<GlobalIndexResult> children) {
3332
throw new UnsupportedOperationException("Should not invoke this");
3433
}
3534

3635
@Override
37-
default Optional<GlobalIndexResult> visitOr(List<Optional<GlobalIndexResult>> children) {
36+
default GlobalIndexResult visitOr(List<GlobalIndexResult> children) {
3837
throw new UnsupportedOperationException("Should not invoke this");
3938
}
4039

4140
@Override
42-
default Optional<GlobalIndexResult> visit(TransformPredicate predicate) {
41+
default GlobalIndexResult visit(TransformPredicate predicate) {
4342
throw new UnsupportedOperationException("Should not invoke this");
4443
}
4544
}

paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
/** Index writer for global index. */
2828
public interface GlobalIndexWriter {
2929

30-
void write(Object key);
30+
void write(@Nullable Object key);
3131

3232
List<ResultEntry> finish();
3333

paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.io.Closeable;
2828
import java.io.IOException;
2929
import java.util.List;
30-
import java.util.Optional;
3130
import java.util.function.Function;
3231

3332
/** A {@link GlobalIndexReader} wrapper for {@link FileIndexReader}. */
@@ -47,68 +46,68 @@ public FileIndexReaderWrapper(
4746
}
4847

4948
@Override
50-
public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
51-
return Optional.ofNullable(transform.apply(reader.visitIsNotNull(fieldRef)));
49+
public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
50+
return transform.apply(reader.visitIsNotNull(fieldRef));
5251
}
5352

5453
@Override
55-
public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
56-
return Optional.ofNullable(transform.apply(reader.visitIsNull(fieldRef)));
54+
public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
55+
return transform.apply(reader.visitIsNull(fieldRef));
5756
}
5857

5958
@Override
60-
public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, Object literal) {
61-
return Optional.ofNullable(transform.apply(reader.visitStartsWith(fieldRef, literal)));
59+
public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object literal) {
60+
return transform.apply(reader.visitStartsWith(fieldRef, literal));
6261
}
6362

6463
@Override
65-
public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object literal) {
66-
return Optional.ofNullable(transform.apply(reader.visitEndsWith(fieldRef, literal)));
64+
public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
65+
return transform.apply(reader.visitEndsWith(fieldRef, literal));
6766
}
6867

6968
@Override
70-
public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object literal) {
71-
return Optional.ofNullable(transform.apply(reader.visitContains(fieldRef, literal)));
69+
public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
70+
return transform.apply(reader.visitContains(fieldRef, literal));
7271
}
7372

7473
@Override
75-
public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object literal) {
76-
return Optional.ofNullable(transform.apply(reader.visitLessThan(fieldRef, literal)));
74+
public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
75+
return transform.apply(reader.visitLessThan(fieldRef, literal));
7776
}
7877

7978
@Override
80-
public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
81-
return Optional.ofNullable(transform.apply(reader.visitGreaterOrEqual(fieldRef, literal)));
79+
public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
80+
return transform.apply(reader.visitGreaterOrEqual(fieldRef, literal));
8281
}
8382

8483
@Override
85-
public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object literal) {
86-
return Optional.ofNullable(transform.apply(reader.visitNotEqual(fieldRef, literal)));
84+
public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
85+
return transform.apply(reader.visitNotEqual(fieldRef, literal));
8786
}
8887

8988
@Override
90-
public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, Object literal) {
91-
return Optional.ofNullable(transform.apply(reader.visitLessOrEqual(fieldRef, literal)));
89+
public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
90+
return transform.apply(reader.visitLessOrEqual(fieldRef, literal));
9291
}
9392

9493
@Override
95-
public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object literal) {
96-
return Optional.ofNullable(transform.apply(reader.visitEqual(fieldRef, literal)));
94+
public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
95+
return transform.apply(reader.visitEqual(fieldRef, literal));
9796
}
9897

9998
@Override
100-
public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, Object literal) {
101-
return Optional.ofNullable(transform.apply(reader.visitGreaterThan(fieldRef, literal)));
99+
public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
100+
return transform.apply(reader.visitGreaterThan(fieldRef, literal));
102101
}
103102

104103
@Override
105-
public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> literals) {
106-
return Optional.ofNullable(transform.apply(reader.visitIn(fieldRef, literals)));
104+
public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
105+
return transform.apply(reader.visitIn(fieldRef, literals));
107106
}
108107

109108
@Override
110-
public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> literals) {
111-
return Optional.ofNullable(transform.apply(reader.visitNotIn(fieldRef, literals)));
109+
public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
110+
return transform.apply(reader.visitNotIn(fieldRef, literals));
112111
}
113112

114113
@Override

paimon-common/src/main/java/org/apache/paimon/utils/Range.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
package org.apache.paimon.utils;
2020

21+
import java.io.Serializable;
22+
import java.util.ArrayList;
23+
import java.util.List;
2124
import java.util.Objects;
2225

2326
/** Range represents from (inclusive) and to (inclusive). */
24-
public class Range {
27+
public class Range implements Serializable {
2528

2629
public final long from;
2730
public final long to;
@@ -49,6 +52,14 @@ public boolean isAfter(Range other) {
4952
return from > other.to;
5053
}
5154

55+
public List<Long> toListLong() {
56+
List<Long> longs = new ArrayList<>();
57+
for (long i = from; i <= to; i++) {
58+
longs.add(i);
59+
}
60+
return longs;
61+
}
62+
5263
@Override
5364
public boolean equals(Object o) {
5465
if (o == null || getClass() != o.getClass()) {

paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -93,26 +93,23 @@ private void testStringType(int version) throws Exception {
9393
writer.write(o);
9494
}
9595
});
96-
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a).get())
96+
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
9797
.getBitmapIndexResult()
9898
.get()
9999
.equals(RoaringBitmap32.bitmapOf(0, 4));
100-
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b).get())
100+
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
101101
.getBitmapIndexResult()
102102
.get()
103103
.equals(RoaringBitmap32.bitmapOf(2));
104-
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
104+
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
105105
.getBitmapIndexResult()
106106
.get()
107107
.equals(RoaringBitmap32.bitmapOf(1, 3));
108-
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)).get())
108+
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)))
109109
.getBitmapIndexResult()
110110
.get()
111111
.equals(RoaringBitmap32.bitmapOf(0, 2, 4));
112-
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
113-
.get()
114-
.iterator()
115-
.hasNext();
112+
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c")).iterator().hasNext();
116113
}
117114

118115
private void testIntType(int version) throws Exception {
@@ -128,24 +125,24 @@ private void testIntType(int version) throws Exception {
128125
writer.write(o);
129126
}
130127
});
131-
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0).get())
128+
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0))
132129
.getBitmapIndexResult()
133130
.get()
134131
.equals(RoaringBitmap32.bitmapOf(0));
135-
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1).get())
132+
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1))
136133
.getBitmapIndexResult()
137134
.get()
138135
.equals(RoaringBitmap32.bitmapOf(1));
139-
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
136+
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
140137
.getBitmapIndexResult()
141138
.get()
142139
.equals(RoaringBitmap32.bitmapOf(2));
143-
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(0, 1, 2)).get())
140+
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(0, 1, 2)))
144141
.getBitmapIndexResult()
145142
.get()
146143
.equals(RoaringBitmap32.bitmapOf(0, 1));
147144

148-
assert !reader.visitEqual(fieldRef, 2).get().iterator().hasNext();
145+
assert !reader.visitEqual(fieldRef, 2).iterator().hasNext();
149146
}
150147

151148
private void testBooleanType(int version) throws Exception {
@@ -161,11 +158,11 @@ private void testBooleanType(int version) throws Exception {
161158
writer.write(o);
162159
}
163160
});
164-
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, Boolean.TRUE).get())
161+
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, Boolean.TRUE))
165162
.getBitmapIndexResult()
166163
.get()
167164
.equals(RoaringBitmap32.bitmapOf(0, 2));
168-
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
165+
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
169166
.getBitmapIndexResult()
170167
.get()
171168
.equals(RoaringBitmap32.bitmapOf(4));
@@ -202,13 +199,12 @@ private void testHighCardinality(
202199
long time2 = System.currentTimeMillis();
203200
GlobalIndexResult result =
204201
reader.visitEqual(
205-
fieldRef, BinaryString.fromString(prefix + (approxCardinality / 2)))
206-
.get();
202+
fieldRef, BinaryString.fromString(prefix + (approxCardinality / 2)));
207203
RoaringBitmap32 resultBm = ((BitmapIndexResultWrapper) result).getBitmapIndexResult().get();
208204
System.out.println("read time: " + (System.currentTimeMillis() - time2));
209205
assert resultBm.equals(middleBm);
210206
long time3 = System.currentTimeMillis();
211-
GlobalIndexResult resultNull = reader.visitIsNull(fieldRef).get();
207+
GlobalIndexResult resultNull = reader.visitIsNull(fieldRef);
212208
RoaringBitmap32 resultNullBm =
213209
((BitmapIndexResultWrapper) resultNull).getBitmapIndexResult().get();
214210
System.out.println("read null bitmap time: " + (System.currentTimeMillis() - time3));
@@ -277,26 +273,23 @@ private void testStringTypeWithReusing(int version) throws Exception {
277273
a.pointTo(c.getSegments(), c.getOffset(), c.getSizeInBytes());
278274
writer.write(null);
279275
});
280-
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a).get())
276+
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
281277
.getBitmapIndexResult()
282278
.get()
283279
.equals(RoaringBitmap32.bitmapOf(0));
284-
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b).get())
280+
assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
285281
.getBitmapIndexResult()
286282
.get()
287283
.equals(RoaringBitmap32.bitmapOf(3));
288-
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
284+
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
289285
.getBitmapIndexResult()
290286
.get()
291287
.equals(RoaringBitmap32.bitmapOf(1, 2, 4, 5));
292-
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)).get())
288+
assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)))
293289
.getBitmapIndexResult()
294290
.get()
295291
.equals(RoaringBitmap32.bitmapOf(0, 3));
296-
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
297-
.get()
298-
.iterator()
299-
.hasNext();
292+
assert !reader.visitEqual(fieldRef, BinaryString.fromString("c")).iterator().hasNext();
300293
}
301294

302295
private void testAllNull(int version) throws Exception {
@@ -312,10 +305,10 @@ private void testAllNull(int version) throws Exception {
312305
writer.write(o);
313306
}
314307
});
315-
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
308+
assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
316309
.getBitmapIndexResult()
317310
.get()
318311
.equals(RoaringBitmap32.bitmapOf(0, 1, 2));
319-
assert !reader.visitIsNotNull(fieldRef).get().iterator().hasNext();
312+
assert !reader.visitIsNotNull(fieldRef).iterator().hasNext();
320313
}
321314
}

paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,18 @@ public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
7575
Collection<GlobalIndexReader> readers =
7676
indexReadersCache.computeIfAbsent(fieldId, readersFunction::apply);
7777
for (GlobalIndexReader fileIndexReader : readers) {
78-
Optional<GlobalIndexResult> childResult =
78+
GlobalIndexResult childResult =
7979
predicate.function().visit(fileIndexReader, fieldRef, predicate.literals());
8080

8181
// AND Operation
82-
if (childResult.isPresent()) {
83-
if (compoundResult.isPresent()) {
84-
GlobalIndexResult r1 = compoundResult.get();
85-
GlobalIndexResult r2 = childResult.get();
86-
compoundResult = Optional.of(r1.and(r2));
87-
} else {
88-
compoundResult = childResult;
89-
}
82+
if (compoundResult.isPresent()) {
83+
GlobalIndexResult r1 = compoundResult.get();
84+
compoundResult = Optional.of(r1.and(childResult));
85+
} else {
86+
compoundResult = Optional.of(childResult);
9087
}
9188

92-
if (compoundResult.isPresent() && !compoundResult.get().iterator().hasNext()) {
89+
if (!compoundResult.get().iterator().hasNext()) {
9390
return compoundResult;
9491
}
9592
}

paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public Path filePath(String fileName) {
4848
return indexPathFactory.toPath(fileName);
4949
}
5050

51+
public long fileSize(String fileName) throws IOException {
52+
return fileIO.getFileSize(filePath(fileName));
53+
}
54+
5155
public OutputStream newOutputStream(String fileName) throws IOException {
5256
return fileIO.newOutputStream(indexPathFactory.toPath(fileName), true);
5357
}

0 commit comments

Comments
 (0)