Skip to content

Commit 34b7688

Browse files
[client] Change SchemaNotExistException to a retriable exception. (#2193)
1 parent 07aa237 commit 34b7688

File tree

10 files changed

+74
-55
lines changed

10 files changed

+74
-55
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,6 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) {
248248
}
249249
}
250250

251-
public static void waitAllSchemaSync(TablePath tablePath, int schemaId) {
252-
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, schemaId);
253-
}
254-
255251
protected static void verifyRows(
256252
RowType rowType,
257253
Map<Long, List<InternalRow>> actualRows,

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ void testPutAndLookup() throws Exception {
275275
TableChange.ColumnPosition.last())),
276276
false)
277277
.get();
278-
waitAllSchemaSync(tablePath, 2);
279278
Table newSchemaTable = conn.getTable(tableInfo.getTablePath());
280279
// schema change case1: read new data with new schema.
281280
verifyPutAndLookup(newSchemaTable, new Object[] {2, "b", "bb"});
@@ -363,7 +362,6 @@ void testPutAndPrefixLookup() throws Exception {
363362
TableChange.ColumnPosition.last())),
364363
false)
365364
.get();
366-
waitAllSchemaSync(tablePath, 2);
367365
try (Connection connection = ConnectionFactory.createConnection(clientConf);
368366
Table newSchemaTable = connection.getTable(tableInfo.getTablePath())) {
369367
// schema change case1: read new data with new schema.
@@ -1056,7 +1054,6 @@ void testPutAndProjectDuringAddColumn() throws Exception {
10561054
TableChange.ColumnPosition.last())),
10571055
false)
10581056
.get();
1059-
waitAllSchemaSync(tablePath, 2);
10601057
try (Connection connection = ConnectionFactory.createConnection(clientConf);
10611058
Table newSchemaTable = connection.getTable(tablePath)) {
10621059
UpsertWriter oldSchemaUpsertWriter = table.newUpsert().createWriter();

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ void testScanSnapshotDuringSchemaChange() throws Exception {
139139
TableChange.ColumnPosition.last())),
140140
false)
141141
.get();
142-
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2);
143142

144143
Schema newSchema =
145144
Schema.newBuilder()

fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,20 @@
2525
import org.apache.fluss.config.Configuration;
2626
import org.apache.fluss.config.MemorySize;
2727
import org.apache.fluss.exception.TimeoutException;
28+
import org.apache.fluss.metadata.PhysicalTablePath;
2829
import org.apache.fluss.metadata.TableBucket;
30+
import org.apache.fluss.metadata.TableInfo;
31+
import org.apache.fluss.metadata.TablePath;
2932
import org.apache.fluss.record.MemoryLogRecords;
33+
import org.apache.fluss.row.BinaryRow;
3034
import org.apache.fluss.row.GenericRow;
35+
import org.apache.fluss.row.encode.CompactedKeyEncoder;
3136
import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
37+
import org.apache.fluss.rpc.entity.PutKvResultForBucket;
3238
import org.apache.fluss.rpc.messages.ApiMessage;
3339
import org.apache.fluss.rpc.messages.ProduceLogRequest;
3440
import org.apache.fluss.rpc.messages.ProduceLogResponse;
41+
import org.apache.fluss.rpc.messages.PutKvResponse;
3542
import org.apache.fluss.rpc.protocol.Errors;
3643
import org.apache.fluss.server.tablet.TestTabletServerGateway;
3744
import org.apache.fluss.utils.clock.SystemClock;
@@ -52,11 +59,19 @@
5259

5360
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
5461
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
62+
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
63+
import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
5564
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
65+
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
5666
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
67+
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
5768
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
69+
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
70+
import static org.apache.fluss.rpc.protocol.Errors.SCHEMA_NOT_EXIST;
5871
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData;
5972
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse;
73+
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makePutKvResponse;
74+
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
6075
import static org.apache.fluss.testutils.DataTestUtils.row;
6176
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
6277
import static org.assertj.core.api.Assertions.assertThat;
@@ -93,7 +108,7 @@ void testSimple() throws Exception {
93108
appendToAccumulator(tb1, row(1, "a"), future::complete);
94109
sender.runOnce();
95110
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
96-
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
111+
finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
97112

98113
sender.runOnce();
99114
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -118,7 +133,7 @@ void testRetries() throws Exception {
118133
sender1.runOnce();
119134
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
120135
long offset = 0;
121-
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
136+
finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
122137

123138
sender1.runOnce();
124139
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -131,13 +146,13 @@ void testRetries() throws Exception {
131146
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
132147

133148
// A timeout error is retriable, so the batch is sent again.
134-
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
149+
finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
135150
sender1.runOnce();
136151
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
137152

138153
// A timeout error is retriable, but once the retry count exceeds maxRetries the batch is
139154
// no longer retried and the error is returned.
140-
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
155+
finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
141156
sender1.runOnce();
142157
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
143158
assertThat(future.get())
@@ -168,12 +183,12 @@ void testCanRetryWithoutIdempotence() throws Exception {
168183
assertThat(firstRequest).isInstanceOf(ProduceLogRequest.class);
169184
assertThat(hasIdempotentRecords(tb1, (ProduceLogRequest) firstRequest)).isFalse();
170185
// first complete with retriable error.
171-
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
186+
finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT));
172187
sender.runOnce();
173188
assertThat(future.isDone()).isFalse();
174189

175190
// second retry complete.
176-
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
191+
finishRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
177192
sender.runOnce();
178193
assertThat(future.isDone()).isTrue();
179194
assertThat(future.get()).isNull();
@@ -690,17 +705,52 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception {
690705
sender.runOnce();
691706
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
692707

693-
finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
708+
finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
694709

695710
// send again, should send nothing since no batch in queue
696711
sender.runOnce();
697712
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(0);
698713
assertThat(future.get()).isNull();
699714
}
700715

716+
@Test
717+
void testRetryPutKeyWithSchemaNotExistException() throws Exception {
718+
TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 0);
719+
720+
BinaryRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
721+
int[] pkIndex = DATA1_SCHEMA_PK.getPrimaryKeyIndexes();
722+
byte[] key = new CompactedKeyEncoder(DATA1_ROW_TYPE, pkIndex).encodeKey(row);
723+
CompletableFuture<Exception> future = new CompletableFuture<>();
724+
accumulator.append(
725+
WriteRecord.forUpsert(
726+
DATA1_TABLE_INFO_PK,
727+
PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
728+
row,
729+
key,
730+
key,
731+
WriteFormat.COMPACTED_KV,
732+
null),
733+
future::complete,
734+
metadataUpdater.getCluster(),
735+
0,
736+
false);
737+
sender.runOnce();
738+
finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, SCHEMA_NOT_EXIST));
739+
assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);
740+
741+
// the sender retries the put kv request
742+
sender.runOnce();
743+
assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(1);
744+
finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, 1));
745+
assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);
746+
assertThat(future.get()).isNull();
747+
}
748+
701749
private TestingMetadataUpdater initializeMetadataUpdater() {
702-
return new TestingMetadataUpdater(
703-
Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO));
750+
Map<TablePath, TableInfo> tableInfos = new HashMap<>();
751+
tableInfos.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
752+
tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK);
753+
return new TestingMetadataUpdater(tableInfos);
704754
}
705755

706756
private void appendToAccumulator(TableBucket tb, GenericRow row, WriteCallback writeCallback)
@@ -721,7 +771,7 @@ private ApiMessage getRequest(TableBucket tb, int index) {
721771
return gateway.getRequest(index);
722772
}
723773

724-
private void finishProduceLogRequest(TableBucket tb, int index, ProduceLogResponse response) {
774+
private void finishRequest(TableBucket tb, int index, ApiMessage response) {
725775
TestTabletServerGateway gateway =
726776
(TestTabletServerGateway)
727777
metadataUpdater.newTabletServerClientForNode(
@@ -762,6 +812,16 @@ private ProduceLogResponse createProduceLogResponse(TableBucket tb, Errors error
762812
Collections.singletonList(new ProduceLogResultForBucket(tb, error.toApiError())));
763813
}
764814

815+
private PutKvResponse createPutKvResponse(TableBucket tb, long endOffset) {
816+
return makePutKvResponse(
817+
Collections.singletonList(new PutKvResultForBucket(tb, endOffset)));
818+
}
819+
820+
private PutKvResponse createPutKvResponse(TableBucket tb, Errors error) {
821+
return makePutKvResponse(
822+
Collections.singletonList(new PutKvResultForBucket(tb, error.toApiError())));
823+
}
824+
765825
private Sender setupWithIdempotenceState() {
766826
return setupWithIdempotenceState(createIdempotenceManager(false));
767827
}

fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* @since 0.1
2626
*/
2727
@PublicEvolving
28-
public class SchemaNotExistException extends ApiException {
28+
public class SchemaNotExistException extends RetriableException {
2929

3030
public SchemaNotExistException(String message, Throwable cause) {
3131
super(message, cause);

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ void testAppendLogDuringAddColumn(boolean compressed) throws Exception {
188188
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
189189
// add new column
190190
tEnv.executeSql("alter table sink_test add add_column int").await();
191-
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, "sink_test"), 2);
192191
tEnv.executeSql(
193192
"INSERT INTO sink_test "
194193
+ "VALUES (4, 3504, 'jerry', 4), "
@@ -417,7 +416,6 @@ void testPutDuringAddColumn() throws Exception {
417416
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
418417
// add new column
419418
tEnv.executeSql("alter table sink_test add add_column int").await();
420-
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, "sink_test"), 2);
421419
tEnv.executeSql(
422420
"INSERT INTO sink_test "
423421
+ "VALUES (4, 3504, 'jerry', 4), "
@@ -505,7 +503,6 @@ void testPartialUpsertDuringAddColumn() throws Exception {
505503
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
506504
// add new column
507505
tEnv.executeSql("alter table sink_test add add_column string").await();
508-
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, "sink_test"), 2);
509506
tEnv.executeSql(
510507
"INSERT INTO sink_test(add_column, a ) VALUES ('new_value', 1), ('new_value', 2)")
511508
.await();
@@ -819,7 +816,6 @@ void testDeleteAndUpdateStmtOnPkTable() throws Exception {
819816
tBatchEnv
820817
.executeSql(String.format("alter table %s add new_added_column int", tableName))
821818
.await();
822-
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, tableName), 2);
823819
tBatchEnv
824820
.executeSql("UPDATE " + tableName + " SET new_added_column = 2 WHERE a = 4")
825821
.await();

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ void testSchemaChange() throws Exception {
173173
TableChange.ColumnPosition.last())),
174174
false)
175175
.get();
176-
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2);
177176

178177
try (Table table = conn.getTable(tablePath)) {
179178
UpsertWriter upsertWriter = table.newUpsert().createWriter();

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
326326

327327
private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) {
328328
if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
329-
// TODO: we may need to support retriable exception here
330329
throw new SchemaNotExistException(
331330
"Invalid schema id: "
332331
+ schemaIdOfNewData

fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ public CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest request) {
184184

185185
@Override
186186
public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
187-
return null;
187+
CompletableFuture<PutKvResponse> response = new CompletableFuture<>();
188+
requests.add(Tuple2.of(request, response));
189+
return response;
188190
}
189191

190192
@Override

fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.fluss.config.MemorySize;
2626
import org.apache.fluss.fs.local.LocalFileSystem;
2727
import org.apache.fluss.metadata.PhysicalTablePath;
28-
import org.apache.fluss.metadata.SchemaInfo;
2928
import org.apache.fluss.metadata.TableBucket;
3029
import org.apache.fluss.metadata.TablePath;
3130
import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -52,7 +51,6 @@
5251
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
5352
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
5453
import org.apache.fluss.server.metadata.ServerInfo;
55-
import org.apache.fluss.server.metadata.ServerSchemaCache;
5654
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
5755
import org.apache.fluss.server.replica.Replica;
5856
import org.apache.fluss.server.replica.ReplicaManager;
@@ -66,7 +64,6 @@
6664
import org.apache.fluss.server.zk.data.PartitionAssignment;
6765
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
6866
import org.apache.fluss.server.zk.data.TableAssignment;
69-
import org.apache.fluss.server.zk.data.TableRegistration;
7067
import org.apache.fluss.utils.FileUtils;
7168
import org.apache.fluss.utils.clock.Clock;
7269
import org.apache.fluss.utils.clock.SystemClock;
@@ -663,32 +660,6 @@ public void waitUntilAllReplicaReady(TableBucket tableBucket) {
663660
});
664661
}
665662

666-
public void waitAllSchemaSync(TablePath tablePath, int schemaId) {
667-
ZooKeeperClient zkClient = getZooKeeperClient();
668-
retry(
669-
Duration.ofMinutes(1),
670-
() -> {
671-
TableRegistration tableRegistration = zkClient.getTable(tablePath).get();
672-
int bucketCount = tableRegistration.bucketCount;
673-
long tableId = tableRegistration.tableId;
674-
for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
675-
TableBucket tableBucket = new TableBucket(tableId, bucketId);
676-
Optional<LeaderAndIsr> leaderAndIsrOpt =
677-
zkClient.getLeaderAndIsr(tableBucket);
678-
assertThat(leaderAndIsrOpt).isPresent();
679-
int leader = leaderAndIsrOpt.get().leader();
680-
TabletServer tabletServer = getTabletServerById(leader);
681-
ServerSchemaCache serverSchemaCache =
682-
tabletServer.getMetadataCache().getServerSchemaCache();
683-
Map<Long, SchemaInfo> latestSchemaByTablePath =
684-
serverSchemaCache.getLatestSchemaByTableId();
685-
assertThat(latestSchemaByTablePath).containsKey(tableId);
686-
assertThat(latestSchemaByTablePath.get(tableId).getSchemaId())
687-
.isEqualTo(schemaId);
688-
}
689-
});
690-
}
691-
692663
/**
693664
* Wait until some log segments are copied to remote. This method can only ensure that at
693664
* least one log segment has been copied to remote, but it does not ensure that all log segments

0 commit comments

Comments
 (0)