Skip to content

Commit 61c4ece

Browse files
authored
[client] Add unavailable tabletServer set for MetadataUpdater to avoid serverNode info of all tabletServers are invalid (#2098)
1 parent 7dd9cc1 commit 61c4ece

File tree

6 files changed

+84
-28
lines changed

6 files changed

+84
-28
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.fluss.client.token.SecurityTokenManager;
3030
import org.apache.fluss.client.token.SecurityTokenProvider;
3131
import org.apache.fluss.client.write.WriterClient;
32+
import org.apache.fluss.cluster.ServerNode;
3233
import org.apache.fluss.config.ConfigOptions;
3334
import org.apache.fluss.config.Configuration;
3435
import org.apache.fluss.exception.FlussRuntimeException;
@@ -42,6 +43,7 @@
4243

4344
import java.time.Duration;
4445
import java.util.HashMap;
46+
import java.util.HashSet;
4547
import java.util.List;
4648

4749
import static org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
@@ -162,9 +164,17 @@ public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
162164
// todo: may add retry logic when no any available tablet server?
163165
AdminReadOnlyGateway gateway =
164166
GatewayClientProxy.createGatewayProxy(
165-
() ->
166-
getOneAvailableTabletServerNode(
167-
metadataUpdater.getCluster()),
167+
() -> {
168+
ServerNode serverNode =
169+
getOneAvailableTabletServerNode(
170+
metadataUpdater.getCluster(),
171+
new HashSet<>());
172+
if (serverNode == null) {
173+
throw new FlussRuntimeException(
174+
"no available tablet server");
175+
}
176+
return serverNode;
177+
},
168178
rpcClient,
169179
AdminReadOnlyGateway.class);
170180
SecurityTokenProvider securityTokenProvider =

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,14 @@
5050
import java.util.Collections;
5151
import java.util.HashSet;
5252
import java.util.List;
53+
import java.util.Map;
5354
import java.util.Optional;
5455
import java.util.Set;
56+
import java.util.concurrent.CopyOnWriteArraySet;
5557
import java.util.concurrent.TimeoutException;
5658
import java.util.stream.Collectors;
5759

60+
import static org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
5861
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
5962
import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
6063

@@ -65,16 +68,19 @@ public class MetadataUpdater {
6568
private static final int MAX_RETRY_TIMES = 3;
6669
private static final int RETRY_INTERVAL_MS = 100;
6770

71+
private final Configuration conf;
6872
private final RpcClient rpcClient;
73+
private final Set<Integer> unavailableTabletServerIds = new CopyOnWriteArraySet<>();
6974
protected volatile Cluster cluster;
7075

71-
public MetadataUpdater(Configuration configuration, RpcClient rpcClient) {
72-
this(rpcClient, initializeCluster(configuration, rpcClient));
76+
public MetadataUpdater(Configuration conf, RpcClient rpcClient) {
77+
this(rpcClient, conf, initializeCluster(conf, rpcClient));
7378
}
7479

7580
@VisibleForTesting
76-
public MetadataUpdater(RpcClient rpcClient, Cluster cluster) {
81+
public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster cluster) {
7782
this.rpcClient = rpcClient;
83+
this.conf = conf;
7884
this.cluster = cluster;
7985
}
8086

@@ -240,19 +246,43 @@ public void updateMetadata(
240246
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
241247
@Nullable Collection<Long> tablePartitionIds)
242248
throws PartitionNotExistException {
249+
ServerNode serverNode =
250+
getOneAvailableTabletServerNode(cluster, unavailableTabletServerIds);
243251
try {
244252
synchronized (this) {
245-
cluster =
246-
sendMetadataRequestAndRebuildCluster(
247-
cluster,
248-
rpcClient,
249-
tablePaths,
250-
tablePartitionNames,
251-
tablePartitionIds);
253+
if (serverNode == null) {
254+
LOG.info(
255+
"No available tablet server to update metadata, try to re-initialize cluster using bootstrap server.");
256+
cluster = initializeCluster(conf, rpcClient);
257+
} else {
258+
cluster =
259+
sendMetadataRequestAndRebuildCluster(
260+
cluster,
261+
rpcClient,
262+
tablePaths,
263+
tablePartitionNames,
264+
tablePartitionIds,
265+
serverNode);
266+
}
267+
}
268+
269+
Map<Integer, ServerNode> aliveTabletServers = cluster.getAliveTabletServers();
270+
unavailableTabletServerIds.removeIf(aliveTabletServers::containsKey);
271+
if (!unavailableTabletServerIds.isEmpty()) {
272+
LOG.info(
273+
"After update metadata, unavailable tabletServer set: {}",
274+
unavailableTabletServerIds);
252275
}
253276
} catch (Exception e) {
254277
Throwable t = stripExecutionException(e);
255278
if (t instanceof RetriableException || t instanceof TimeoutException) {
279+
if (serverNode != null) {
280+
unavailableTabletServerIds.add(serverNode.id());
281+
LOG.warn(
282+
"tabletServer {} is unavailable for updating metadata for retriable exception. unavailable tabletServer set {}",
283+
serverNode,
284+
unavailableTabletServerIds);
285+
}
256286
LOG.warn("Failed to update metadata, but the exception is re-triable.", t);
257287
} else if (t instanceof PartitionNotExistException) {
258288
LOG.warn("Failed to update metadata because the partition does not exist", t);

fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.cluster.Cluster;
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
24-
import org.apache.fluss.exception.FlussRuntimeException;
2524
import org.apache.fluss.exception.StaleMetadataException;
2625
import org.apache.fluss.metadata.PhysicalTablePath;
2726
import org.apache.fluss.metadata.TableBucket;
@@ -39,6 +38,9 @@
3938
import org.apache.fluss.rpc.messages.PbTableMetadata;
4039
import org.apache.fluss.rpc.messages.PbTablePath;
4140

41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
4244
import javax.annotation.Nullable;
4345

4446
import java.util.ArrayList;
@@ -55,6 +57,8 @@
5557
/** Utils for metadata for client. */
5658
public class MetadataUtils {
5759

60+
private static final Logger LOG = LoggerFactory.getLogger(MetadataUtils.class);
61+
5862
private static final Random randOffset = new Random();
5963

6064
/**
@@ -79,13 +83,12 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
7983
RpcClient client,
8084
@Nullable Set<TablePath> tablePaths,
8185
@Nullable Collection<PhysicalTablePath> tablePartitionNames,
82-
@Nullable Collection<Long> tablePartitionIds)
86+
@Nullable Collection<Long> tablePartitionIds,
87+
ServerNode serverNode)
8388
throws ExecutionException, InterruptedException, TimeoutException {
8489
AdminReadOnlyGateway gateway =
8590
GatewayClientProxy.createGatewayProxy(
86-
() -> getOneAvailableTabletServerNode(cluster),
87-
client,
88-
AdminReadOnlyGateway.class);
91+
() -> serverNode, client, AdminReadOnlyGateway.class);
8992
return sendMetadataRequestAndRebuildCluster(
9093
gateway, true, cluster, tablePaths, tablePartitionNames, tablePartitionIds);
9194
}
@@ -253,10 +256,16 @@ public NewTableMetadata(
253256
}
254257
}
255258

256-
public static ServerNode getOneAvailableTabletServerNode(Cluster cluster) {
257-
List<ServerNode> aliveTabletServers = cluster.getAliveTabletServerList();
259+
public static @Nullable ServerNode getOneAvailableTabletServerNode(
260+
Cluster cluster, Set<Integer> unavailableTabletServerIds) {
261+
List<ServerNode> aliveTabletServers = new ArrayList<>(cluster.getAliveTabletServerList());
262+
if (!unavailableTabletServerIds.isEmpty()) {
263+
aliveTabletServers.removeIf(
264+
serverNode -> unavailableTabletServerIds.contains(serverNode.id()));
265+
}
266+
258267
if (aliveTabletServers.isEmpty()) {
259-
throw new FlussRuntimeException("no alive tablet server in cluster");
268+
return null;
260269
}
261270
// just pick one random server node
262271
int offset = randOffset.nextInt(aliveTabletServers.size());

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ void testUpdateWithEmptyMetadataResponse() throws Exception {
111111
Collections.emptyMap(),
112112
Collections.emptyMap());
113113

114-
metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
114+
metadataUpdater = new MetadataUpdater(rpcClient, new Configuration(), newCluster);
115115
// shouldn't update metadata to empty since the empty metadata will be ignored
116116
metadataUpdater.updateMetadata(null, null, null);
117117
assertThat(metadataUpdater.getCluster().getAliveTabletServers())

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,23 @@ public class TestingMetadataUpdater extends MetadataUpdater {
5555
private final Map<Integer, TestTabletServerGateway> tabletServerGatewayMap;
5656

5757
public TestingMetadataUpdater(Map<TablePath, TableInfo> tableInfos) {
58-
this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos, null);
58+
this(
59+
COORDINATOR,
60+
Arrays.asList(NODE1, NODE2, NODE3),
61+
tableInfos,
62+
null,
63+
new Configuration());
5964
}
6065

6166
private TestingMetadataUpdater(
6267
ServerNode coordinatorServer,
6368
List<ServerNode> tabletServers,
6469
Map<TablePath, TableInfo> tableInfos,
65-
Map<Integer, TestTabletServerGateway> customGateways) {
70+
Map<Integer, TestTabletServerGateway> customGateways,
71+
Configuration conf) {
6672
super(
67-
RpcClient.create(
68-
new Configuration(), TestingClientMetricGroup.newInstance(), false),
73+
RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false),
74+
conf,
6975
Cluster.empty());
7076
initializeCluster(coordinatorServer, tabletServers, tableInfos);
7177
coordinatorGateway = new TestCoordinatorGateway();
@@ -122,7 +128,8 @@ public TestingMetadataUpdater build() {
122128
COORDINATOR,
123129
Arrays.asList(NODE1, NODE2, NODE3),
124130
tableInfos,
125-
customGateways.isEmpty() ? null : customGateways);
131+
customGateways.isEmpty() ? null : customGateways,
132+
new Configuration());
126133
}
127134
}
128135

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
262262
oldCluster.getTableIdByPath(),
263263
oldCluster.getPartitionIdByPath(),
264264
oldCluster.getTableInfoByPath());
265-
metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
265+
metadataUpdater = new MetadataUpdater(rpcClient, clientConf, newCluster);
266266

267267
LogScannerStatus logScannerStatus = new LogScannerStatus();
268268
logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));

0 commit comments

Comments
 (0)