Skip to content

Commit ea2d61c

Browse files
loserwang1024wuchong
authored andcommitted
[server][client] Support Schema Evolution (ADD COLUMN LAST) in Fluss and connector (apache#2010)
1 parent 543c703 commit ea2d61c

File tree

187 files changed

+5374
-926
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

187 files changed

+5374
-926
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ public Admin getAdmin() {
100100
public Table getTable(TablePath tablePath) {
101101
// force to update the table info from server to avoid stale data in cache.
102102
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
103-
104103
Admin admin = getOrCreateAdmin();
105104
return new FlussTable(this, tablePath, admin.getTableInfo(tablePath).join());
106105
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import java.util.concurrent.CompletableFuture;
9191
import java.util.stream.Collectors;
9292

93+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.addPbAlterSchemas;
9394
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9495
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9596
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
@@ -249,12 +250,17 @@ public CompletableFuture<Void> alterTable(
249250
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists) {
250251
tablePath.validate();
251252
AlterTableRequest request = new AlterTableRequest();
252-
253253
List<PbAlterConfig> pbFlussTableChanges =
254254
tableChanges.stream()
255+
.filter(tableChange -> !(tableChange instanceof TableChange.SchemaChange))
255256
.map(ClientRpcMessageUtils::toPbAlterConfigs)
256257
.collect(Collectors.toList());
257258

259+
List<TableChange> schemaChanges =
260+
tableChanges.stream()
261+
.filter(tableChange -> tableChange instanceof TableChange.SchemaChange)
262+
.collect(Collectors.toList());
263+
addPbAlterSchemas(request, schemaChanges);
258264
request.addAllConfigChanges(pbFlussTableChanges)
259265
.setIgnoreIfNotExists(ignoreIfNotExists)
260266
.setTablePath()

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25+
import org.apache.fluss.metadata.Schema;
26+
import org.apache.fluss.metadata.SchemaGetter;
2527
import org.apache.fluss.metadata.TableBucket;
2628
import org.apache.fluss.metadata.TableInfo;
2729
import org.apache.fluss.row.InternalRow;
28-
import org.apache.fluss.row.decode.RowDecoder;
30+
import org.apache.fluss.row.ProjectedRow;
2931
import org.apache.fluss.row.encode.KeyEncoder;
3032
import org.apache.fluss.row.encode.ValueDecoder;
31-
import org.apache.fluss.types.DataType;
3233
import org.apache.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -70,8 +71,11 @@ class PrefixKeyLookuper implements Lookuper {
7071
/** Decode the lookup bytes to result row. */
7172
private final ValueDecoder kvValueDecoder;
7273

74+
private final SchemaGetter schemaGetter;
75+
7376
public PrefixKeyLookuper(
7477
TableInfo tableInfo,
78+
SchemaGetter schemaGetter,
7579
MetadataUpdater metadataUpdater,
7680
LookupClient lookupClient,
7781
List<String> lookupColumnNames) {
@@ -93,10 +97,8 @@ public PrefixKeyLookuper(
9397
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
9498
: null;
9599
this.kvValueDecoder =
96-
new ValueDecoder(
97-
RowDecoder.create(
98-
tableInfo.getTableConfig().getKvFormat(),
99-
tableInfo.getRowType().getChildren().toArray(new DataType[0])));
100+
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
101+
this.schemaGetter = schemaGetter;
100102
}
101103

102104
private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
@@ -167,18 +169,35 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
167169
}
168170

169171
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
170-
return lookupClient
171-
.prefixLookup(tableBucket, bucketKeyBytes)
172-
.thenApply(
173-
result -> {
174-
List<InternalRow> rowList = new ArrayList<>(result.size());
175-
for (byte[] valueBytes : result) {
176-
if (valueBytes == null) {
177-
continue;
178-
}
179-
rowList.add(kvValueDecoder.decodeValue(valueBytes).row);
172+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
173+
174+
CompletableFuture.runAsync(
175+
() -> {
176+
try {
177+
List<byte[]> result =
178+
lookupClient.prefixLookup(tableBucket, bucketKeyBytes).get();
179+
List<InternalRow> rowList = new ArrayList<>(result.size());
180+
for (byte[] valueBytes : result) {
181+
if (valueBytes == null) {
182+
continue;
183+
}
184+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
185+
InternalRow row;
186+
if (value.schemaId == tableInfo.getSchemaId()) {
187+
row = value.row;
188+
} else {
189+
Schema schema = schemaGetter.getSchema(value.schemaId);
190+
row =
191+
ProjectedRow.from(schema, tableInfo.getSchema())
192+
.replaceRow(value.row);
180193
}
181-
return new LookupResult(rowList);
182-
});
194+
rowList.add(row);
195+
}
196+
future.complete(new LookupResult(rowList));
197+
} catch (Exception e) {
198+
future.complete(new LookupResult(Collections.emptyList()));
199+
}
200+
});
201+
return future;
183202
}
184203
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25+
import org.apache.fluss.metadata.Schema;
26+
import org.apache.fluss.metadata.SchemaGetter;
2527
import org.apache.fluss.metadata.TableBucket;
2628
import org.apache.fluss.metadata.TableInfo;
2729
import org.apache.fluss.row.InternalRow;
28-
import org.apache.fluss.row.decode.RowDecoder;
30+
import org.apache.fluss.row.ProjectedRow;
2931
import org.apache.fluss.row.encode.KeyEncoder;
3032
import org.apache.fluss.row.encode.ValueDecoder;
31-
import org.apache.fluss.types.DataType;
3233
import org.apache.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -39,6 +40,7 @@
3940

4041
import static org.apache.fluss.client.utils.ClientUtils.getPartitionId;
4142
import static org.apache.fluss.utils.Preconditions.checkArgument;
43+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4244

4345
/** An implementation of {@link Lookuper} that lookups by primary key. */
4446
@NotThreadSafe
@@ -67,8 +69,13 @@ class PrimaryKeyLookuper implements Lookuper {
6769
/** Decode the lookup bytes to result row. */
6870
private final ValueDecoder kvValueDecoder;
6971

72+
private final SchemaGetter schemaGetter;
73+
7074
public PrimaryKeyLookuper(
71-
TableInfo tableInfo, MetadataUpdater metadataUpdater, LookupClient lookupClient) {
75+
TableInfo tableInfo,
76+
SchemaGetter schemaGetter,
77+
MetadataUpdater metadataUpdater,
78+
LookupClient lookupClient) {
7279
checkArgument(
7380
tableInfo.hasPrimaryKey(),
7481
"Log table %s doesn't support lookup",
@@ -96,10 +103,8 @@ public PrimaryKeyLookuper(
96103
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
97104
: null;
98105
this.kvValueDecoder =
99-
new ValueDecoder(
100-
RowDecoder.create(
101-
tableInfo.getTableConfig().getKvFormat(),
102-
tableInfo.getRowType().getChildren().toArray(new DataType[0])));
106+
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
107+
this.schemaGetter = schemaGetter;
103108
}
104109

105110
@Override
@@ -127,15 +132,30 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
127132

128133
int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets);
129134
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
130-
return lookupClient
131-
.lookup(tableBucket, pkBytes)
132-
.thenApply(
133-
valueBytes -> {
134-
InternalRow row =
135-
valueBytes == null
136-
? null
137-
: kvValueDecoder.decodeValue(valueBytes).row;
138-
return new LookupResult(row);
139-
});
135+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
136+
137+
CompletableFuture.runAsync(
138+
() -> {
139+
try {
140+
byte[] valueBytes = lookupClient.lookup(tableBucket, pkBytes).get();
141+
InternalRow row = null;
142+
if (valueBytes != null) {
143+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
144+
if (value.schemaId == tableInfo.getSchemaId()) {
145+
row = value.row;
146+
} else {
147+
Schema schema = schemaGetter.getSchema(value.schemaId);
148+
checkNotNull(schema, "schema is null");
149+
row =
150+
ProjectedRow.from(schema, tableInfo.getSchema())
151+
.replaceRow(value.row);
152+
}
153+
}
154+
future.complete(new LookupResult(row));
155+
} catch (Exception e) {
156+
future.completeExceptionally(e);
157+
}
158+
});
159+
return future;
140160
}
141161
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.lookup;
1919

2020
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.metadata.SchemaGetter;
2122
import org.apache.fluss.metadata.TableInfo;
2223

2324
import javax.annotation.Nullable;
@@ -28,39 +29,46 @@
2829
public class TableLookup implements Lookup {
2930

3031
private final TableInfo tableInfo;
32+
private final SchemaGetter schemaGetter;
3133
private final MetadataUpdater metadataUpdater;
3234
private final LookupClient lookupClient;
3335

3436
@Nullable private final List<String> lookupColumnNames;
3537

3638
public TableLookup(
37-
TableInfo tableInfo, MetadataUpdater metadataUpdater, LookupClient lookupClient) {
38-
this(tableInfo, metadataUpdater, lookupClient, null);
39+
TableInfo tableInfo,
40+
SchemaGetter schemaGetter,
41+
MetadataUpdater metadataUpdater,
42+
LookupClient lookupClient) {
43+
this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null);
3944
}
4045

4146
private TableLookup(
4247
TableInfo tableInfo,
48+
SchemaGetter schemaGetter,
4349
MetadataUpdater metadataUpdater,
4450
LookupClient lookupClient,
4551
@Nullable List<String> lookupColumnNames) {
4652
this.tableInfo = tableInfo;
53+
this.schemaGetter = schemaGetter;
4754
this.metadataUpdater = metadataUpdater;
4855
this.lookupClient = lookupClient;
4956
this.lookupColumnNames = lookupColumnNames;
5057
}
5158

5259
@Override
5360
public Lookup lookupBy(List<String> lookupColumnNames) {
54-
return new TableLookup(tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
61+
return new TableLookup(
62+
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
5563
}
5664

5765
@Override
5866
public Lookuper createLookuper() {
5967
if (lookupColumnNames == null) {
60-
return new PrimaryKeyLookuper(tableInfo, metadataUpdater, lookupClient);
68+
return new PrimaryKeyLookuper(tableInfo, schemaGetter, metadataUpdater, lookupClient);
6169
} else {
6270
return new PrefixKeyLookuper(
63-
tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
71+
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
6472
}
6573
}
6674
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.client.admin.Admin;
22+
import org.apache.fluss.metadata.Schema;
23+
import org.apache.fluss.metadata.SchemaGetter;
24+
import org.apache.fluss.metadata.SchemaInfo;
25+
import org.apache.fluss.metadata.TablePath;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.Map;
31+
import java.util.concurrent.TimeUnit;
32+
33+
import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap;
34+
35+
/** Schema getter for client. */
36+
@Internal
37+
public class ClientSchemaGetter implements SchemaGetter {
38+
private static final Logger LOG = LoggerFactory.getLogger(ClientSchemaGetter.class);
39+
40+
private final TablePath tablePath;
41+
private final Map<Integer, Schema> schemasById;
42+
private final Admin admin;
43+
private SchemaInfo latestSchemaInfo;
44+
45+
public ClientSchemaGetter(TablePath tablePath, SchemaInfo latestSchemaInfo, Admin admin) {
46+
this.tablePath = tablePath;
47+
this.latestSchemaInfo = latestSchemaInfo;
48+
this.admin = admin;
49+
this.schemasById = newConcurrentHashMap();
50+
schemasById.put(latestSchemaInfo.getSchemaId(), latestSchemaInfo.getSchema());
51+
}
52+
53+
@Override
54+
public Schema getSchema(int schemaId) {
55+
return schemasById.computeIfAbsent(
56+
schemaId,
57+
(id) -> {
58+
try {
59+
SchemaInfo schemaInfo =
60+
admin.getTableSchema(tablePath, schemaId).get(1, TimeUnit.MINUTES);
61+
if (id > latestSchemaInfo.getSchemaId()) {
62+
latestSchemaInfo = schemaInfo;
63+
}
64+
return schemaInfo.getSchema();
65+
66+
} catch (Exception e) {
67+
LOG.warn("Failed to get schema for table: " + tablePath);
68+
throw new RuntimeException(e);
69+
}
70+
});
71+
}
72+
73+
@Override
74+
public SchemaInfo getLatestSchemaInfo() {
75+
return latestSchemaInfo;
76+
}
77+
78+
@Override
79+
public void release() {}
80+
}

0 commit comments

Comments
 (0)