Skip to content

Commit ef27034

Browse files
committed
properly handle TableNotExistException in metadata operations
1 parent c011f69 commit ef27034

File tree

4 files changed

+39
-1
lines changed

4 files changed

+39
-1
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.fluss.exception.PartitionNotExistException;
3131
import org.apache.fluss.exception.RetriableException;
3232
import org.apache.fluss.exception.StaleMetadataException;
33+
import org.apache.fluss.exception.TableNotExistException;
3334
import org.apache.fluss.metadata.PhysicalTablePath;
3435
import org.apache.fluss.metadata.TableBucket;
3536
import org.apache.fluss.metadata.TablePartition;
@@ -287,6 +288,9 @@ public void updateMetadata(
287288
} else if (t instanceof PartitionNotExistException) {
288289
LOG.warn("Failed to update metadata because the partition does not exist", t);
289290
throw (PartitionNotExistException) t;
291+
} else if (t instanceof TableNotExistException) {
292+
LOG.warn("Failed to update metadata because the table does not exist", t);
293+
throw (TableNotExistException) t;
290294
} else {
291295
throw new FlussRuntimeException("Failed to update metadata", t);
292296
}

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.cluster.ServerType;
2323
import org.apache.fluss.config.Configuration;
2424
import org.apache.fluss.exception.StaleMetadataException;
25+
import org.apache.fluss.exception.TableNotExistException;
2526
import org.apache.fluss.rpc.RpcClient;
2627
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
2728
import org.apache.fluss.rpc.messages.MetadataRequest;
@@ -70,6 +71,24 @@ void testInitializeClusterWithRetries() throws Exception {
7071
.hasMessageContaining("The metadata is stale.");
7172
}
7273

74+
@Test
75+
void testTableNotExistExceptionIsNotWrapped() throws Exception {
76+
Configuration configuration = new Configuration();
77+
RpcClient rpcClient =
78+
RpcClient.create(configuration, TestingClientMetricGroup.newInstance(), false);
79+
80+
// Gateway that throws TableNotExistException on metadata request
81+
AdminReadOnlyGateway gateway = new TestingTableNotExistGateway();
82+
83+
// TableNotExistException should propagate directly without being wrapped
84+
assertThatThrownBy(
85+
() ->
86+
MetadataUpdater.tryToInitializeClusterWithRetries(
87+
rpcClient, CS_NODE, gateway, 3))
88+
.isInstanceOf(TableNotExistException.class)
89+
.hasMessageContaining("test_table");
90+
}
91+
7392
private static final class TestingAdminReadOnlyGateway extends TestCoordinatorGateway {
7493

7594
private final int maxRetryCount;
@@ -95,4 +114,16 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
95114
}
96115
}
97116
}
117+
118+
/**
119+
* A testing gateway that throws TableNotExistException when metadata is requested. Used to
120+
* verify that TableNotExistException is propagated without being wrapped.
121+
*/
122+
private static final class TestingTableNotExistGateway extends TestCoordinatorGateway {
123+
124+
@Override
125+
public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
126+
throw new TableNotExistException("Table 'test_table' does not exist.");
127+
}
128+
}
98129
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.table.scanner.ScanRecord;
2424
import org.apache.fluss.client.table.scanner.log.LogScanner;
2525
import org.apache.fluss.client.table.scanner.log.ScanRecords;
26+
import org.apache.fluss.exception.TableNotExistException;
2627
import org.apache.fluss.flink.source.reader.BoundedSplitReader;
2728
import org.apache.fluss.flink.source.reader.RecordAndPos;
2829
import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
@@ -176,7 +177,7 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
176177
ScanRecords scanRecords;
177178
try {
178179
scanRecords = currentLogScanner.poll(pollTimeout);
179-
} catch (Exception e) {
180+
} catch (TableNotExistException e) {
180181
// When a table is actually dropped, the log scanner's poll may fail
181182
// because metadata update discovers the table no longer exists.
182183
if (droppedTables.contains(currentTableId)) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,8 @@ public Map<TablePath, TableInfo> getTables(Collection<TablePath> tablePaths)
701701
.getLakeCatalogContainer()
702702
.getDefaultTableLakeOptions()));
703703
}
704+
} catch (TableNotExistException e) {
705+
throw e;
704706
} catch (Exception e) {
705707
throw new FlussRuntimeException(
706708
String.format("Failed to get tables '%s'.", tablePaths), e);

0 commit comments

Comments
 (0)