Commit 40191ed

[Fix](catalog)Resources should be closed when dropping a Catalog. (#59512)
### What problem does this PR solve?

When creating an AWS SDK V2 async client (e.g. S3AsyncClient), the SDK requires a thread pool to manage asynchronous task scheduling, timeouts, and retries (e.g. a ScheduledExecutorService or async executor). If no executor is explicitly provided, the AWS SDK creates its own internal thread pool, which is expected to be shut down when client.close() is invoked.

### Issue

Starting with Hadoop 3.4, the AWS SDK was upgraded to v2. Since #57226 upgraded HDFS to 3.4.2, catalogs now run into this issue. In Doris, when using Paimon Catalog, some catalog implementations provide an empty close() method. As a result, when a user executes DROP CATALOG:

- The Catalog instance is discarded
- AWS SDK client.close() is never called
- The internally created thread pool cannot be shut down

This leads to a thread leak. FYI: aws/aws-sdk-java-v2#3746

### Problem Analysis

- The lifecycle of Catalog creation and destruction is complex and managed internally by Paimon
- Doris cannot reliably intervene in the call chain to enforce AWS SDK client.close()
- If each Catalog creates its own AWS SDK client with an internally managed thread pool, threads will continue to leak as Catalogs are repeatedly created and dropped

### Solution

This PR resolves the issue by introducing a shared executor strategy:

- A Doris-managed shared thread pool is explicitly passed when creating AWS SDK V2 clients
- This prevents the AWS SDK from implicitly creating per-client internal thread pools
- The lifecycle of the executor is fully controlled by Doris and no longer depends on Paimon Catalog's close() implementation

With this approach, even if a Paimon Catalog's close() method is a no-op, the system will no longer leak threads.
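
The difference between per-client pools and a shared pool can be illustrated with a small, self-contained sketch. It does not use the AWS SDK: `ThreadLeakSketch`, `Client`, and `threadsCreated` are hypothetical names introduced only for illustration, and `Client` merely mimics "create my own scheduler unless one is supplied, and never close it".

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadLeakSketch {
    /** Counts every thread created by schedulers built in this sketch. */
    static final AtomicInteger THREADS_CREATED = new AtomicInteger();

    static ScheduledExecutorService newScheduler() {
        return Executors.newScheduledThreadPool(1, r -> {
            THREADS_CREATED.incrementAndGet();
            Thread t = new Thread(r, "sketch-scheduler");
            t.setDaemon(true); // daemon, so leaked pools do not block JVM exit here
            return t;
        });
    }

    /** Hypothetical client (not an AWS SDK type): builds its own pool unless one is passed in. */
    static final class Client {
        final ScheduledExecutorService scheduler;

        Client(ScheduledExecutorService shared) {
            this.scheduler = (shared != null) ? shared : newScheduler();
            // Schedule a no-op so the pool starts its worker thread.
            Runnable noop = () -> { };
            this.scheduler.schedule(noop, 0, TimeUnit.MILLISECONDS);
        }
        // Deliberately no close(): mimics a catalog whose close() is a no-op.
    }

    /** Creates and discards `catalogs` clients; returns how many threads were created. */
    static int threadsCreated(int catalogs, boolean useShared) {
        THREADS_CREATED.set(0);
        ScheduledExecutorService shared = useShared ? newScheduler() : null;
        for (int i = 0; i < catalogs; i++) {
            new Client(shared); // dropped immediately, never closed
        }
        return THREADS_CREATED.get();
    }

    public static void main(String[] args) {
        System.out.println("per-client pools: " + threadsCreated(5, false) + " threads");
        System.out.println("shared pool:      " + threadsCreated(5, true) + " threads");
    }
}
```

In this sketch the per-client variant creates one thread per discarded client, while the shared variant stays at a single thread no matter how many clients come and go. The real fix applies the same idea to the scheduler handed to AWS SDK V2 clients.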
1 parent 9a58d8e commit 40191ed

11 files changed: +951 -4 lines changed

.licenserc.yaml

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ header:
     - "**/*.parquet"
     - "docs/.markdownlintignore"
     - "fe/fe-core/src/test/resources/data/net_snmp_normal"
+    - "fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java"
     - "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4"
     - "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaParser.g4"
     - "be/dict/ik/*"

LICENSE.txt

Lines changed: 7 additions & 0 deletions
@@ -202,6 +202,13 @@

 --------------------------------------------------------------------------------

+The following components are provided under the Apache License. See project link for details.
+The text of each license is the standard Apache 2.0 license.
+
+software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder from AWS SDK v2 (sdk-core 2.29.52)
+
+--------------------------------------------------------------------------------
+
 be/src/common/status.* : BSD-style license

 Copyright (c) 2011 The LevelDB Authors. All rights reserved.

NOTICE.txt

Lines changed: 30 additions & 0 deletions
@@ -52,3 +52,33 @@ its NOTICE file:
 This product includes cryptographic software written by Eric Young
 (eay@cryptsoft.com). This product includes software written by Tim
 Hudson (tjh@cryptsoft.com).
+
+--------------------------------------------------------------------------------
+This product includes code from AWS SDK, which includes the following in
+its NOTICE file:
+
+AWS SDK for Java 2.0
+Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+This product includes software developed by
+Amazon Technologies, Inc (http://www.amazon.com/).
+
+**********************
+THIRD PARTY COMPONENTS
+**********************
+This software includes third party software subject to the following copyrights:
+- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+- Apache Commons Lang - https://github.com/apache/commons-lang
+- Netty Reactive Streams - https://github.com/playframework/netty-reactive-streams
+- Jackson-core - https://github.com/FasterXML/jackson-core
+- Jackson-dataformat-cbor - https://github.com/FasterXML/jackson-dataformats-binary
+
+The licenses for these third party components are included in LICENSE.txt
+
+- For Apache Commons Lang see also this required NOTICE:
+Apache Commons Lang
+Copyright 2001-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (https://www.apache.org/).

fe/check/checkstyle/suppressions.xml

Lines changed: 2 additions & 0 deletions
@@ -74,4 +74,6 @@ under the License.

     <!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
     <suppress files=".*thrift/schema/external/.*" checks=".*"/>
+    <suppress files="software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java" checks=".*"/>
+    <suppress files="software/amazon/awssdk/core/client/builder/UncloseableScheduledExecutorService.java" checks=".*"/>
 </suppressions>

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 7 additions & 0 deletions
@@ -3771,6 +3771,13 @@ public static int metaServiceRpcRetryTimes() {
     @ConfField(mutable = true)
     public static String aws_credentials_provider_version = "v2";

+    @ConfField(description = {
+            "AWS SDK 用于调度异步重试、超时任务以及其他后台操作的线程池大小,全局共享",
+            "The thread pool size used by the AWS SDK to schedule asynchronous retries, timeout tasks, "
+                    + "and other background operations, shared globally"
+    })
+    public static int aws_sdk_async_scheduler_thread_pool_size = 20;
+
     @ConfField(description = {
             "agent tasks 健康检查的时间间隔,默认五分钟,小于等于 0 时不做健康检查",
             "agent tasks health check interval, default is five minutes, no health check when less than or equal to 0"
Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public final class UncloseableScheduledExecutorService
+        implements ScheduledExecutorService {
+
+    private final ScheduledExecutorService delegate;
+
+    public UncloseableScheduledExecutorService(
+            ScheduledExecutorService delegate) {
+        this.delegate = Objects.requireNonNull(delegate);
+    }
+
+    // ================= Lifecycle methods (NO-OP) =================
+    @Override
+    public void shutdown() {
+        // NO-OP
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) {
+        return false;
+    }
+
+    // ================= Scheduled methods =================
+    @Override
+    public ScheduledFuture<?> schedule(
+            Runnable command, long delay, TimeUnit unit) {
+        return delegate.schedule(command, delay, unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(
+            Callable<V> callable, long delay, TimeUnit unit) {
+        return delegate.schedule(callable, delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command,
+            long initialDelay,
+            long period,
+            TimeUnit unit) {
+        return delegate.scheduleAtFixedRate(
+                command, initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+            Runnable command,
+            long initialDelay,
+            long delay,
+            TimeUnit unit) {
+        return delegate.scheduleWithFixedDelay(
+                command, initialDelay, delay, unit);
+    }
+
+    // ================= Executor methods =================
+    @Override
+    public void execute(Runnable command) {
+        delegate.execute(command);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return delegate.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return delegate.submit(task, result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return delegate.submit(task);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        return delegate.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks,
+            long timeout,
+            TimeUnit unit)
+            throws InterruptedException {
+        return delegate.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(
+            Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException {
+        return delegate.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(
+            Collection<? extends Callable<T>> tasks,
+            long timeout,
+            TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return delegate.invokeAny(tasks, timeout, unit);
+    }
+}
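
To see what the no-op lifecycle methods achieve, here is a minimal sketch of a shared pool sized by a config value and exposed only through an uncloseable view. Everything named here is an assumption for illustration: `SharedAwsSchedulerSketch`, `POOL_SIZE` (standing in for `Config.aws_sdk_async_scheduler_thread_pool_size`), and `uncloseableView` do not appear in the diff, and the view is built with `java.lang.reflect.Proxy` to keep the example short rather than with the hand-written wrapper class above. The portion of the commit shown here does not include the code that wires the pool into the SDK builder, so that part is not reproduced.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SharedAwsSchedulerSketch {
    // Stand-in for Config.aws_sdk_async_scheduler_thread_pool_size (default 20 in the diff).
    static final int POOL_SIZE = 20;

    // The real pool; only its owner would ever shut it down.
    static final ScheduledThreadPoolExecutor REAL = new ScheduledThreadPoolExecutor(POOL_SIZE, r -> {
        Thread t = new Thread(r, "aws-sdk-scheduler-sketch");
        t.setDaemon(true); // daemon threads so this sketch never blocks JVM exit
        return t;
    });

    /** Proxy-based stand-in for the wrapper: lifecycle calls are swallowed, the rest delegates. */
    static ScheduledExecutorService uncloseableView() {
        return (ScheduledExecutorService) Proxy.newProxyInstance(
                ScheduledExecutorService.class.getClassLoader(),
                new Class<?>[] {ScheduledExecutorService.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "shutdown":
                        case "close": // default method on newer JDKs; also swallowed here
                            return null;
                        case "shutdownNow":
                            return Collections.emptyList();
                        case "isShutdown":
                        case "isTerminated":
                        case "awaitTermination":
                            return false;
                        default:
                            try {
                                return method.invoke(REAL, args);
                            } catch (InvocationTargetException e) {
                                throw e.getCause(); // surface the delegate's own exception
                            }
                    }
                });
    }

    /** Simulates a client shutting its executor down on close, then checks the pool still works. */
    static boolean survivesClientClose() {
        ScheduledExecutorService view = uncloseableView();
        view.shutdown();
        view.shutdownNow();
        Callable<String> task = () -> "ok";
        try {
            String result = view.schedule(task, 10, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
            return "ok".equals(result) && !REAL.isShutdown();
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("pool survives client close: " + survivesClientClose());
    }
}
```

The design point is that a client which calls `shutdown()` on the executor it was given cannot take the shared pool down with it. One thing worth checking in the real wrapper: on JDKs where `ExecutorService` has a default `close()` method, that default waits for `awaitTermination` to return true, which the wrapper in the diff never does.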

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java

Lines changed: 11 additions & 1 deletion
@@ -34,13 +34,16 @@

 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 import java.util.List;
 import java.util.Map;
 import java.util.Objects;

 public abstract class IcebergExternalCatalog extends ExternalCatalog {

+    private static final Logger LOG = LogManager.getLogger(IcebergExternalCatalog.class);
     public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
     public static final String ICEBERG_REST = "rest";
     public static final String ICEBERG_HMS = "hms";
@@ -210,7 +213,14 @@ public List<String> listTableNames(SessionContext ctx, String dbName) {
     public void onClose() {
         super.onClose();
         if (null != catalog) {
-            catalog = null;
+            try {
+                if (catalog instanceof AutoCloseable) {
+                    ((AutoCloseable) catalog).close();
+                }
+                catalog = null;
+            } catch (Exception e) {
+                LOG.warn("Failed to close iceberg catalog: {}", getName(), e);
+            }
         }
     }
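
The close-if-closeable pattern in `onClose()` can be exercised in isolation with the sketch below. `CatalogCloseSketch` is a hypothetical stand-in, not a Doris class, and it deliberately differs from the diff in one respect: the reference is cleared in a `finally` block, whereas in the diff `catalog = null` sits inside the `try`, so the field keeps its value if `close()` throws. Whether that difference matters in Doris depends on how the field is used afterwards, which the diff does not show.

```java
final class CatalogCloseSketch {
    /** Stand-in for the catalog field; typed Object because not every catalog is closeable. */
    Object catalog;

    CatalogCloseSketch(Object catalog) {
        this.catalog = catalog;
    }

    void onClose() {
        if (catalog == null) {
            return;
        }
        try {
            // Only close when the concrete catalog actually supports it.
            if (catalog instanceof AutoCloseable) {
                ((AutoCloseable) catalog).close();
            }
        } catch (Exception e) {
            // Log and continue: a failed close should not abort the drop.
            System.err.println("Failed to close catalog: " + e);
        } finally {
            catalog = null; // variation from the diff: drop the reference even when close() throws
        }
    }

    public static void main(String[] args) {
        CatalogCloseSketch failing = new CatalogCloseSketch(
                (AutoCloseable) () -> { throw new IllegalStateException("boom"); });
        failing.onClose();
        System.out.println("reference cleared: " + (failing.catalog == null));
    }
}
```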

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java

Lines changed: 12 additions & 0 deletions
@@ -190,4 +190,16 @@ public void checkProperties() throws DdlException {
         super.checkProperties();
         catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class);
     }
+
+    @Override
+    public void onClose() {
+        super.onClose();
+        if (null != catalog) {
+            try {
+                catalog.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close paimon catalog: {}", getName(), e);
+            }
+        }
+    }
 }

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java

Lines changed: 2 additions & 1 deletion
@@ -111,7 +111,8 @@ protected AbstractIcebergProperties(Map<String, String> props) {
      * This field is used to perform metadata operations like creating, querying,
      * and deleting Iceberg tables.
      */
-    public final Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
+    public final Catalog initializeCatalog(String catalogName,
+                                           List<StorageProperties> storagePropertiesList) {
         Map<String, String> catalogProps = new HashMap<>(getOrigProps());
         if (StringUtils.isNotBlank(warehouse)) {
             catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
