Skip to content

Commit 60bae8e

Browse files
committed
update api and add tests
1 parent 4155cf7 commit 60bae8e

File tree

7 files changed

+542
-0
lines changed

7 files changed

+542
-0
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.apache.fluss.client.lookup.TableLookup;
2424
import org.apache.fluss.client.metadata.ClientSchemaGetter;
2525
import org.apache.fluss.client.table.scanner.Scan;
26+
import org.apache.fluss.client.table.scanner.SnapshotQuery;
2627
import org.apache.fluss.client.table.scanner.TableScan;
28+
import org.apache.fluss.client.table.scanner.TableSnapshotQuery;
2729
import org.apache.fluss.client.table.writer.Append;
2830
import org.apache.fluss.client.table.writer.TableAppend;
2931
import org.apache.fluss.client.table.writer.TableUpsert;
@@ -67,6 +69,15 @@ public Scan newScan() {
6769
return new TableScan(conn, tableInfo, schemaGetter);
6870
}
6971

72+
@Override
73+
public SnapshotQuery newSnapshotQuery() {
74+
checkState(
75+
hasPrimaryKey,
76+
"Table %s is not a Primary Key Table and doesn't support SnapshotQuery.",
77+
tablePath);
78+
return new TableSnapshotQuery(conn, tableInfo, schemaGetter);
79+
}
80+
7081
@Override
7182
public Lookup newLookup() {
7283
return new TableLookup(

fluss-client/src/main/java/org/apache/fluss/client/table/Table.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.client.lookup.Lookup;
2323
import org.apache.fluss.client.lookup.Lookuper;
2424
import org.apache.fluss.client.table.scanner.Scan;
25+
import org.apache.fluss.client.table.scanner.SnapshotQuery;
2526
import org.apache.fluss.client.table.writer.Append;
2627
import org.apache.fluss.client.table.writer.AppendWriter;
2728
import org.apache.fluss.client.table.writer.Upsert;
@@ -55,6 +56,12 @@ public interface Table extends AutoCloseable {
5556
*/
5657
Scan newScan();
5758

59+
/**
60+
* Creates a new {@link SnapshotQuery} for this table to configure and execute a snapshot query
61+
* to read all current data in a table bucket (requires to be a Primary Key Table).
62+
*/
63+
SnapshotQuery newSnapshotQuery();
64+
5865
/**
5966
* Creates a new {@link Lookup} for this table to configure and create a {@link Lookuper} to
6067
* lookup data for this table by primary key or a prefix of primary key.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.row.InternalRow;
23+
import org.apache.fluss.utils.CloseableIterator;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.util.List;
28+
29+
/**
30+
* Used to configure and execute a snapshot query to read all current data in a table bucket for a
31+
* primary key table.
32+
*
33+
* @since 0.6
34+
*/
35+
@PublicEvolving
36+
public interface SnapshotQuery {
37+
38+
/**
39+
* Returns a new snapshot query from this that will read the given data columns.
40+
*
41+
* @param projectedColumns the selected column indexes
42+
*/
43+
SnapshotQuery project(@Nullable int[] projectedColumns);
44+
45+
/**
46+
* Returns a new snapshot query from this that will read the given data columns.
47+
*
48+
* @param projectedColumnNames the selected column names
49+
*/
50+
SnapshotQuery project(List<String> projectedColumnNames);
51+
52+
/**
53+
* Executes the snapshot query to read all current data in the given table bucket.
54+
*
55+
* @param tableBucket the table bucket to read
56+
* @return a closeable iterator of the rows in the table bucket
57+
*/
58+
CloseableIterator<InternalRow> execute(TableBucket tableBucket);
59+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner;
19+
20+
import org.apache.fluss.client.FlussConnection;
21+
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
22+
import org.apache.fluss.metadata.SchemaGetter;
23+
import org.apache.fluss.metadata.TableBucket;
24+
import org.apache.fluss.metadata.TableInfo;
25+
import org.apache.fluss.row.InternalRow;
26+
import org.apache.fluss.types.RowType;
27+
import org.apache.fluss.utils.CloseableIterator;
28+
29+
import javax.annotation.Nullable;
30+
31+
import java.io.IOException;
32+
import java.time.Duration;
33+
import java.util.Iterator;
34+
import java.util.List;
35+
import java.util.NoSuchElementException;
36+
37+
/** Implementation of {@link SnapshotQuery}. */
38+
public class TableSnapshotQuery implements SnapshotQuery {
39+
40+
private final FlussConnection conn;
41+
private final TableInfo tableInfo;
42+
private final SchemaGetter schemaGetter;
43+
44+
/** The projected fields to do projection. No projection if is null. */
45+
@Nullable private final int[] projectedColumns;
46+
47+
public TableSnapshotQuery(
48+
FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
49+
this(conn, tableInfo, schemaGetter, null);
50+
}
51+
52+
private TableSnapshotQuery(
53+
FlussConnection conn,
54+
TableInfo tableInfo,
55+
SchemaGetter schemaGetter,
56+
@Nullable int[] projectedColumns) {
57+
this.conn = conn;
58+
this.tableInfo = tableInfo;
59+
this.schemaGetter = schemaGetter;
60+
this.projectedColumns = projectedColumns;
61+
}
62+
63+
@Override
64+
public SnapshotQuery project(@Nullable int[] projectedColumns) {
65+
return new TableSnapshotQuery(conn, tableInfo, schemaGetter, projectedColumns);
66+
}
67+
68+
@Override
69+
public SnapshotQuery project(List<String> projectedColumnNames) {
70+
int[] columnIndexes = new int[projectedColumnNames.size()];
71+
RowType rowType = tableInfo.getRowType();
72+
for (int i = 0; i < projectedColumnNames.size(); i++) {
73+
int index = rowType.getFieldIndex(projectedColumnNames.get(i));
74+
if (index < 0) {
75+
throw new IllegalArgumentException(
76+
String.format(
77+
"Field '%s' not found in table schema. Available fields: %s, Table: %s",
78+
projectedColumnNames.get(i),
79+
rowType.getFieldNames(),
80+
tableInfo.getTablePath()));
81+
}
82+
columnIndexes[i] = index;
83+
}
84+
return new TableSnapshotQuery(conn, tableInfo, schemaGetter, columnIndexes);
85+
}
86+
87+
@Override
88+
public CloseableIterator<InternalRow> execute(TableBucket tableBucket) {
89+
Scan scan = new TableScan(conn, tableInfo, schemaGetter);
90+
if (projectedColumns != null) {
91+
scan = scan.project(projectedColumns);
92+
}
93+
BatchScanner batchScanner = scan.createBatchScanner(tableBucket);
94+
return new BatchScannerIterator(batchScanner);
95+
}
96+
97+
private static class BatchScannerIterator implements CloseableIterator<InternalRow> {
98+
private final BatchScanner scanner;
99+
private Iterator<InternalRow> currentBatch;
100+
private boolean isClosed = false;
101+
102+
private BatchScannerIterator(BatchScanner scanner) {
103+
this.scanner = scanner;
104+
}
105+
106+
@Override
107+
public boolean hasNext() {
108+
ensureBatch();
109+
return currentBatch != null && currentBatch.hasNext();
110+
}
111+
112+
@Override
113+
public InternalRow next() {
114+
if (!hasNext()) {
115+
throw new NoSuchElementException();
116+
}
117+
return currentBatch.next();
118+
}
119+
120+
private void ensureBatch() {
121+
try {
122+
while ((currentBatch == null || !currentBatch.hasNext()) && !isClosed) {
123+
CloseableIterator<InternalRow> it =
124+
scanner.pollBatch(Duration.ofMinutes(1)); // Use a large timeout
125+
if (it == null) {
126+
isClosed = true;
127+
break;
128+
}
129+
if (it.hasNext()) {
130+
currentBatch = it;
131+
} else {
132+
it.close();
133+
}
134+
}
135+
} catch (IOException e) {
136+
throw new RuntimeException("Error polling batch from scanner", e);
137+
}
138+
}
139+
140+
@Override
141+
public void close() {
142+
if (!isClosed) {
143+
try {
144+
scanner.close();
145+
} catch (IOException e) {
146+
throw new RuntimeException("Error closing scanner", e);
147+
}
148+
isClosed = true;
149+
}
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)