Skip to content

Commit 68e5cbe

Browse files
committed
branch-4.0: [Fix](catalog)Resources should be closed when dropping a Catalog.
apache#59512 apache#59063
1 parent bcb9b08 commit 68e5cbe

File tree

13 files changed

+950
-14
lines changed

13 files changed

+950
-14
lines changed

.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ header:
4343
- "**/*.parquet"
4444
- "docs/.markdownlintignore"
4545
- "fe/fe-core/src/test/resources/data/net_snmp_normal"
46+
- "fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java"
4647
- "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4"
4748
- "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaParser.g4"
4849
- "be/dict/ik/*"

LICENSE.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,13 @@
202202

203203
--------------------------------------------------------------------------------
204204

205+
The following components are provided under the Apache License. See project link for details.
206+
The text of each license is the standard Apache 2.0 license.
207+
208+
software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder from AWS SDK v2 (sdk-core 2.29.52)
209+
210+
--------------------------------------------------------------------------------
211+
205212
be/src/common/status.* : BSD-style license
206213

207214
Copyright (c) 2011 The LevelDB Authors. All rights reserved.

NOTICE.txt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,33 @@ its NOTICE file:
5252
This product includes cryptographic software written by Eric Young
5353
(eay@cryptsoft.com). This product includes software written by Tim
5454
Hudson (tjh@cryptsoft.com).
55+
56+
--------------------------------------------------------------------------------
57+
This product includes code from AWS SDK, which includes the following in
58+
its NOTICE file:
59+
60+
AWS SDK for Java 2.0
61+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
62+
63+
This product includes software developed by
64+
Amazon Technologies, Inc (http://www.amazon.com/).
65+
66+
**********************
67+
THIRD PARTY COMPONENTS
68+
**********************
69+
This software includes third party software subject to the following copyrights:
70+
- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
71+
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
72+
- Apache Commons Lang - https://github.com/apache/commons-lang
73+
- Netty Reactive Streams - https://github.com/playframework/netty-reactive-streams
74+
- Jackson-core - https://github.com/FasterXML/jackson-core
75+
- Jackson-dataformat-cbor - https://github.com/FasterXML/jackson-dataformats-binary
76+
77+
The licenses for these third party components are included in LICENSE.txt
78+
79+
- For Apache Commons Lang see also this required NOTICE:
80+
Apache Commons Lang
81+
Copyright 2001-2020 The Apache Software Foundation
82+
83+
This product includes software developed at
84+
The Apache Software Foundation (https://www.apache.org/).

fe/check/checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,5 @@ under the License.
7474

7575
<!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
7676
<suppress files=".*thrift/schema/external/.*" checks=".*"/>
77+
<suppress files="software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java" checks=".*"/>
7778
</suppressions>

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3729,6 +3729,13 @@ public static int metaServiceRpcRetryTimes() {
37293729
@ConfField(mutable = true)
37303730
public static String aws_credentials_provider_version = "v2";
37313731

3732+
@ConfField(description = {
3733+
"AWS SDK 用于调度异步重试、超时任务以及其他后台操作的线程池大小,全局共享",
3734+
"The thread pool size used by the AWS SDK to schedule asynchronous retries, timeout tasks, "
3735+
+ "and other background operations, shared globally"
3736+
})
3737+
public static int aws_sdk_async_scheduler_thread_pool_size = 20;
3738+
37323739
@ConfField(description = {
37333740
"agent tasks 健康检查的时间间隔,默认五分钟,小于等于0时不做健康检查",
37343741
"agent tasks health check interval, default is five minutes, no health check when less than or equal to 0"

fe/fe-core/pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -636,14 +636,6 @@ under the License.
636636
</exclusions>
637637
</dependency>
638638

639-
<!-- for fe recognize files stored on gcs -->
640-
<dependency>
641-
<groupId>com.google.cloud.bigdataoss</groupId>
642-
<artifactId>gcs-connector</artifactId>
643-
<version>${gcs.version}</version>
644-
<scope>${gcs.dependency.scope}</scope>
645-
</dependency>
646-
647639
<!-- https://mvnrepository.com/artifact/org.apache.ranger/ranger-plugins-common -->
648640
<dependency>
649641
<groupId>org.apache.ranger</groupId>
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.common.util;
19+
20+
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.Objects;
24+
import java.util.concurrent.Callable;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
32+
public final class UncloseableScheduledExecutorService
33+
implements ScheduledExecutorService {
34+
35+
private final ScheduledExecutorService delegate;
36+
37+
public UncloseableScheduledExecutorService(
38+
ScheduledExecutorService delegate) {
39+
this.delegate = Objects.requireNonNull(delegate);
40+
}
41+
42+
// ================= Lifecycle methods (NO-OP) =================
43+
@Override
44+
public void shutdown() {
45+
// NO-OP
46+
}
47+
48+
@Override
49+
public List<Runnable> shutdownNow() {
50+
return Collections.emptyList();
51+
}
52+
53+
@Override
54+
public boolean isShutdown() {
55+
return false;
56+
}
57+
58+
@Override
59+
public boolean isTerminated() {
60+
return false;
61+
}
62+
63+
@Override
64+
public boolean awaitTermination(long timeout, TimeUnit unit) {
65+
return false;
66+
}
67+
68+
// ================= Scheduled methods =================
69+
@Override
70+
public ScheduledFuture<?> schedule(
71+
Runnable command, long delay, TimeUnit unit) {
72+
return delegate.schedule(command, delay, unit);
73+
}
74+
75+
@Override
76+
public <V> ScheduledFuture<V> schedule(
77+
Callable<V> callable, long delay, TimeUnit unit) {
78+
return delegate.schedule(callable, delay, unit);
79+
}
80+
81+
@Override
82+
public ScheduledFuture<?> scheduleAtFixedRate(
83+
Runnable command,
84+
long initialDelay,
85+
long period,
86+
TimeUnit unit) {
87+
return delegate.scheduleAtFixedRate(
88+
command, initialDelay, period, unit);
89+
}
90+
91+
@Override
92+
public ScheduledFuture<?> scheduleWithFixedDelay(
93+
Runnable command,
94+
long initialDelay,
95+
long delay,
96+
TimeUnit unit) {
97+
return delegate.scheduleWithFixedDelay(
98+
command, initialDelay, delay, unit);
99+
}
100+
101+
// ================= Executor methods =================
102+
@Override
103+
public void execute(Runnable command) {
104+
delegate.execute(command);
105+
}
106+
107+
@Override
108+
public <T> Future<T> submit(Callable<T> task) {
109+
return delegate.submit(task);
110+
}
111+
112+
@Override
113+
public <T> Future<T> submit(Runnable task, T result) {
114+
return delegate.submit(task, result);
115+
}
116+
117+
@Override
118+
public Future<?> submit(Runnable task) {
119+
return delegate.submit(task);
120+
}
121+
122+
@Override
123+
public <T> List<Future<T>> invokeAll(
124+
Collection<? extends Callable<T>> tasks)
125+
throws InterruptedException {
126+
return delegate.invokeAll(tasks);
127+
}
128+
129+
@Override
130+
public <T> List<Future<T>> invokeAll(
131+
Collection<? extends Callable<T>> tasks,
132+
long timeout,
133+
TimeUnit unit)
134+
throws InterruptedException {
135+
return delegate.invokeAll(tasks, timeout, unit);
136+
}
137+
138+
@Override
139+
public <T> T invokeAny(
140+
Collection<? extends Callable<T>> tasks)
141+
throws InterruptedException, ExecutionException {
142+
return delegate.invokeAny(tasks);
143+
}
144+
145+
@Override
146+
public <T> T invokeAny(
147+
Collection<? extends Callable<T>> tasks,
148+
long timeout,
149+
TimeUnit unit)
150+
throws InterruptedException, ExecutionException, TimeoutException {
151+
return delegate.invokeAny(tasks, timeout, unit);
152+
}
153+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,16 @@
3434

3535
import org.apache.commons.lang3.math.NumberUtils;
3636
import org.apache.iceberg.catalog.Catalog;
37+
import org.apache.logging.log4j.LogManager;
38+
import org.apache.logging.log4j.Logger;
3739

3840
import java.util.List;
3941
import java.util.Map;
4042
import java.util.Objects;
4143

4244
public abstract class IcebergExternalCatalog extends ExternalCatalog {
4345

46+
private static final Logger LOG = LogManager.getLogger(IcebergExternalCatalog.class);
4447
public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
4548
public static final String ICEBERG_REST = "rest";
4649
public static final String ICEBERG_HMS = "hms";
@@ -209,7 +212,14 @@ public List<String> listTableNames(SessionContext ctx, String dbName) {
209212
public void onClose() {
210213
super.onClose();
211214
if (null != catalog) {
212-
catalog = null;
215+
try {
216+
if (catalog instanceof AutoCloseable) {
217+
((AutoCloseable) catalog).close();
218+
}
219+
catalog = null;
220+
} catch (Exception e) {
221+
LOG.warn("Failed to close iceberg catalog: {}", getName(), e);
222+
}
213223
}
214224
}
215225

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,16 @@ public void checkProperties() throws DdlException {
190190
super.checkProperties();
191191
catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class);
192192
}
193+
194+
@Override
195+
public void onClose() {
196+
super.onClose();
197+
if (null != catalog) {
198+
try {
199+
catalog.close();
200+
} catch (Exception e) {
201+
LOG.warn("Failed to close paimon catalog: {}", getName(), e);
202+
}
203+
}
204+
}
193205
}

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ protected AbstractIcebergProperties(Map<String, String> props) {
111111
* This field is used to perform metadata operations like creating, querying,
112112
* and deleting Iceberg tables.
113113
*/
114-
public final Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
114+
public final Catalog initializeCatalog(String catalogName,
115+
List<StorageProperties> storagePropertiesList) {
115116
Map<String, String> catalogProps = new HashMap<>(getOrigProps());
116117
if (StringUtils.isNotBlank(warehouse)) {
117118
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);

0 commit comments

Comments
 (0)