Skip to content

Commit 917ec03

Browse files
authored
[core] Support consumer_id predicate pushdown for ConsumersTable (apache#7329)
1 parent 610ab4a commit 917ec03

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
import org.apache.paimon.disk.IOManager;
2727
import org.apache.paimon.fs.FileIO;
2828
import org.apache.paimon.fs.Path;
29+
import org.apache.paimon.predicate.And;
30+
import org.apache.paimon.predicate.CompoundPredicate;
31+
import org.apache.paimon.predicate.Equal;
32+
import org.apache.paimon.predicate.InPredicateVisitor;
33+
import org.apache.paimon.predicate.LeafPredicate;
34+
import org.apache.paimon.predicate.LeafPredicateExtractor;
35+
import org.apache.paimon.predicate.Or;
2936
import org.apache.paimon.predicate.Predicate;
3037
import org.apache.paimon.reader.RecordReader;
3138
import org.apache.paimon.table.FileStoreTable;
@@ -47,8 +54,10 @@
4754
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
4855

4956
import java.io.IOException;
57+
import java.util.ArrayList;
5058
import java.util.Arrays;
5159
import java.util.Collections;
60+
import java.util.HashMap;
5261
import java.util.Iterator;
5362
import java.util.List;
5463
import java.util.Map;
@@ -171,16 +180,49 @@ private class ConsumersRead implements InnerTableRead {
171180

172181
private final FileIO fileIO;
173182
private RowType readType;
183+
private final List<String> consumerIds = new ArrayList<>();
174184

175185
public ConsumersRead(FileIO fileIO) {
176186
this.fileIO = fileIO;
177187
}
178188

179189
@Override
180190
public InnerTableRead withFilter(Predicate predicate) {
191+
if (predicate == null) {
192+
return this;
193+
}
194+
195+
String leafName = "consumer_id";
196+
if (predicate instanceof CompoundPredicate) {
197+
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
198+
if ((compoundPredicate.function()) instanceof Or) {
199+
// optimize for IN filter
200+
InPredicateVisitor.extractInElements(predicate, leafName)
201+
.ifPresent(
202+
leafs ->
203+
leafs.forEach(
204+
leaf -> consumerIds.add(leaf.toString())));
205+
} else if ((compoundPredicate.function()) instanceof And) {
206+
List<Predicate> children = compoundPredicate.children();
207+
for (Predicate leaf : children) {
208+
handleLeafPredicate(leaf, leafName);
209+
}
210+
}
211+
} else {
212+
handleLeafPredicate(predicate, leafName);
213+
}
214+
181215
return this;
182216
}
183217

218+
public void handleLeafPredicate(Predicate predicate, String leafName) {
219+
LeafPredicate consumerPred =
220+
predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
221+
if (consumerPred != null && consumerPred.function() instanceof Equal) {
222+
consumerIds.add(consumerPred.literals().get(0).toString());
223+
}
224+
}
225+
184226
@Override
185227
public InnerTableRead withReadType(RowType readType) {
186228
this.readType = readType;
@@ -198,7 +240,19 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
198240
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
199241
}
200242
Path location = ((ConsumersTable.ConsumersSplit) split).location;
201-
Map<String, Long> consumers = new ConsumerManager(fileIO, location, branch).consumers();
243+
Map<String, Long> consumers;
244+
if (!consumerIds.isEmpty()) {
245+
consumers = new HashMap<>();
246+
ConsumerManager consumerManager = new ConsumerManager(fileIO, location, branch);
247+
for (String consumerId : consumerIds) {
248+
consumerManager
249+
.consumer(consumerId)
250+
.ifPresent(
251+
consumer -> consumers.put(consumerId, consumer.nextSnapshot()));
252+
}
253+
} else {
254+
consumers = new ConsumerManager(fileIO, location, branch).consumers();
255+
}
202256
Iterator<InternalRow> rows =
203257
Iterators.transform(consumers.entrySet().iterator(), this::toRow);
204258
if (readType != null) {

paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,23 @@
2525
import org.apache.paimon.data.BinaryString;
2626
import org.apache.paimon.data.GenericRow;
2727
import org.apache.paimon.data.InternalRow;
28+
import org.apache.paimon.data.serializer.InternalRowSerializer;
29+
import org.apache.paimon.predicate.Predicate;
30+
import org.apache.paimon.predicate.PredicateBuilder;
31+
import org.apache.paimon.reader.RecordReader;
2832
import org.apache.paimon.schema.Schema;
2933
import org.apache.paimon.table.FileStoreTable;
34+
import org.apache.paimon.table.Table;
3035
import org.apache.paimon.table.TableTestBase;
36+
import org.apache.paimon.table.source.ReadBuilder;
3137
import org.apache.paimon.types.DataTypes;
3238

3339
import org.junit.jupiter.api.BeforeEach;
3440
import org.junit.jupiter.api.Test;
3541

3642
import java.io.IOException;
43+
import java.util.ArrayList;
44+
import java.util.Arrays;
3745
import java.util.List;
3846
import java.util.Map;
3947
import java.util.stream.Collectors;
@@ -75,6 +83,40 @@ public void testPartitionRecordCount() throws Exception {
7583
assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
7684
}
7785

86+
@Test
87+
public void testFilterByConsumerIdEqual() throws Exception {
88+
Predicate predicate = consumerIdEqual("id1");
89+
List<InternalRow> expectedRow =
90+
Arrays.asList(GenericRow.of(BinaryString.fromString("id1"), 5L));
91+
List<InternalRow> result = readWithFilter(consumersTable, predicate);
92+
assertThat(result).containsExactlyElementsOf(expectedRow);
93+
}
94+
95+
@Test
96+
public void testFilterByConsumerIdEqualNoMatch() throws Exception {
97+
Predicate predicate = consumerIdEqual("id999");
98+
List<InternalRow> result = readWithFilter(consumersTable, predicate);
99+
assertThat(result).isEmpty();
100+
}
101+
102+
@Test
103+
public void testFilterByConsumerIdIn() throws Exception {
104+
PredicateBuilder builder = new PredicateBuilder(ConsumersTable.TABLE_TYPE);
105+
Predicate predicate = builder.in(0, Arrays.asList("id1", "id999"));
106+
List<InternalRow> expectedRow =
107+
Arrays.asList(GenericRow.of(BinaryString.fromString("id1"), 5L));
108+
List<InternalRow> result = readWithFilter(consumersTable, predicate);
109+
assertThat(result).containsExactlyElementsOf(expectedRow);
110+
}
111+
112+
@Test
113+
public void testFilterByConsumerIdInNoMatch() throws Exception {
114+
PredicateBuilder builder = new PredicateBuilder(ConsumersTable.TABLE_TYPE);
115+
Predicate predicate = builder.in(0, Arrays.asList("id998", "id999"));
116+
List<InternalRow> result = readWithFilter(consumersTable, predicate);
117+
assertThat(result).isEmpty();
118+
}
119+
78120
private List<InternalRow> getExpectedResult() throws IOException {
79121
Map<String, Long> consumers = manager.consumers();
80122
return consumers.entrySet().stream()
@@ -84,4 +126,19 @@ private List<InternalRow> getExpectedResult() throws IOException {
84126
BinaryString.fromString(entry.getKey()), entry.getValue()))
85127
.collect(Collectors.toList());
86128
}
129+
130+
private Predicate consumerIdEqual(String consumerId) {
131+
PredicateBuilder builder = new PredicateBuilder(ConsumersTable.TABLE_TYPE);
132+
return builder.equal(0, BinaryString.fromString(consumerId));
133+
}
134+
135+
private List<InternalRow> readWithFilter(Table table, Predicate predicate) throws Exception {
136+
ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
137+
RecordReader<InternalRow> reader =
138+
readBuilder.newRead().createReader(readBuilder.newScan().plan());
139+
InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
140+
List<InternalRow> rows = new ArrayList<>();
141+
reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
142+
return rows;
143+
}
87144
}

0 commit comments

Comments
 (0)