Skip to content

Commit bc1a05d

Browse files
authored
[GSOC][RIP-78][ISSUES#308] Add part of refactored backend files (#313)
1 parent 3cbff60 commit bc1a05d

File tree

86 files changed

+1578
-956
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+1578
-956
lines changed

pom.xml

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -82,29 +82,28 @@
8282

8383
<properties>
8484
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
85-
<maven.compiler.source>1.8</maven.compiler.source>
86-
<maven.compiler.target>1.8</maven.compiler.target>
87-
85+
<maven.compiler.source>17</maven.compiler.source>
86+
<maven.compiler.target>17</maven.compiler.target>
8887
<guava.version>29.0-jre</guava.version>
8988
<commons-digester.version>2.1</commons-digester.version>
9089
<commons-lang.version>2.6</commons-lang.version>
9190
<commons-io.version>2.4</commons-io.version>
9291
<commons-cli.version>1.2</commons-cli.version>
9392
<commons-collections.version>3.2.2</commons-collections.version>
94-
<rocketmq.version>5.1.0</rocketmq.version>
93+
<rocketmq.version>5.3.3</rocketmq.version>
9594
<surefire.version>2.19.1</surefire.version>
9695
<aspectj.version>1.9.6</aspectj.version>
9796
<lombok.version>1.18.22</lombok.version>
9897
<main.basedir>${basedir}/../..</main.basedir>
9998
<docker.image.prefix>apacherocketmq</docker.image.prefix>
100-
<spring.boot.version>2.6.0</spring.boot.version>
99+
<spring.boot.version>3.4.5</spring.boot.version>
101100
<mockito-inline.version>3.3.3</mockito-inline.version>
102-
<jaxb-api.version>2.3.1</jaxb-api.version>
101+
<jakarta.xml.bind-api.version>4.0.0</jakarta.xml.bind-api.version>
103102
<commons-pool2.version>2.4.3</commons-pool2.version>
104103
<easyexcel.version>2.2.10</easyexcel.version>
105104
<asm.version>4.2</asm.version>
106105
<junit.version>4.12</junit.version>
107-
<snakeyaml.version>1.32</snakeyaml.version>
106+
<snakeyaml.version>2.0</snakeyaml.version>
108107
<cglib.version>2.2.2</cglib.version>
109108
<joor.version>0.9.6</joor.version>
110109
<bcpkix-jdk15on.version>1.68</bcpkix-jdk15on.version>
@@ -115,6 +114,12 @@
115114
<groupId>org.springframework.boot</groupId>
116115
<artifactId>spring-boot-starter-web</artifactId>
117116
<version>${spring.boot.version}</version>
117+
<exclusions>
118+
<exclusion>
119+
<groupId>javax.annotation</groupId>
120+
<artifactId>javax.annotation-api</artifactId>
121+
</exclusion>
122+
</exclusions>
118123
</dependency>
119124
<dependency>
120125
<groupId>org.springframework.boot</groupId>
@@ -235,9 +240,9 @@
235240
<version>${bcpkix-jdk15on.version}</version>
236241
</dependency>
237242
<dependency>
238-
<groupId>javax.xml.bind</groupId>
239-
<artifactId>jaxb-api</artifactId>
240-
<version>${jaxb-api.version}</version>
243+
<groupId>jakarta.xml.bind</groupId>
244+
<artifactId>jakarta.xml.bind-api</artifactId>
245+
<version>${jakarta.xml.bind-api.version}</version>
241246
</dependency>
242247
<dependency>
243248
<groupId>org.projectlombok</groupId>
@@ -282,7 +287,7 @@
282287
<plugins>
283288
<plugin>
284289
<artifactId>maven-compiler-plugin</artifactId>
285-
<version>3.5.1</version>
290+
<version>3.11.0</version>
286291
<configuration>
287292
<source>${maven.compiler.source}</source>
288293
<target>${maven.compiler.target}</target>
@@ -296,19 +301,22 @@
296301
<version>${lombok.version}</version>
297302
</path>
298303
</annotationProcessorPaths>
304+
<compilerArgs>
305+
<arg>-parameters</arg>
306+
</compilerArgs>
299307
</configuration>
300308
</plugin>
301309
<plugin>
302310
<groupId>org.springframework.boot</groupId>
303311
<artifactId>spring-boot-maven-plugin</artifactId>
304312
<version>${spring.boot.version}</version>
305-
<executions>
306-
<execution>
307-
<goals>
308-
<goal>repackage</goal>
309-
</goals>
310-
</execution>
311-
</executions>
313+
<!-- <executions>-->
314+
<!-- <execution>-->
315+
<!-- <goals>-->
316+
<!-- <goal>repackage</goal>-->
317+
<!-- </goals>-->
318+
<!-- </execution>-->
319+
<!-- </executions>-->
312320
</plugin>
313321
<!-- Use dockerfile-maven instead, https://github.com/spotify/dockerfile-maven -->
314322
<plugin>
@@ -384,9 +392,9 @@
384392
<version>4.3.0</version>
385393
<dependencies>
386394
<dependency>
387-
<groupId>javax.xml.bind</groupId>
388-
<artifactId>jaxb-api</artifactId>
389-
<version>${jaxb-api.version}</version>
395+
<groupId>jakarta.xml.bind</groupId>
396+
<artifactId>jakarta.xml.bind-api</artifactId>
397+
<version>${jakarta.xml.bind-api.version}</version>
390398
</dependency>
391399
</dependencies>
392400
</plugin>
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.dashboard.admin;
19+
20+
import org.apache.rocketmq.tools.admin.MQAdminExt;
21+
22+
@FunctionalInterface
23+
public interface MQAdminExtCallback<T> {
24+
T doInMQAdminExt(MQAdminExt mqAdminExt) throws Exception;
25+
}

src/main/java/org/apache/rocketmq/dashboard/admin/MQAdminFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.rocketmq.dashboard.admin;
1818

19-
import java.util.concurrent.atomic.AtomicLong;
2019
import lombok.extern.slf4j.Slf4j;
2120
import org.apache.commons.lang3.StringUtils;
2221
import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -26,6 +25,8 @@
2625
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
2726
import org.apache.rocketmq.tools.admin.MQAdminExt;
2827

28+
import java.util.concurrent.atomic.AtomicLong;
29+
2930
@Slf4j
3031
public class MQAdminFactory {
3132
private RMQConfigure rmqConfigure;
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.dashboard.admin;
18+
19+
import lombok.extern.slf4j.Slf4j;
20+
import org.apache.commons.pool2.impl.GenericObjectPool;
21+
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
22+
import org.apache.rocketmq.client.ClientConfig;
23+
import org.apache.rocketmq.dashboard.config.RMQConfigure;
24+
import org.apache.rocketmq.tools.admin.MQAdminExt;
25+
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.stereotype.Component;
27+
28+
import javax.annotation.PreDestroy;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ConcurrentMap;
31+
32+
@Component
33+
@Slf4j
34+
public class UserMQAdminPoolManager {
35+
36+
37+
private final ConcurrentMap<String/* userAk */, GenericObjectPool<MQAdminExt>> userPools = new ConcurrentHashMap<>();
38+
39+
private final ClientConfig baseClientConfig;
40+
41+
@Autowired
42+
public UserMQAdminPoolManager(RMQConfigure rmqConfigure) {
43+
this.baseClientConfig = new ClientConfig();
44+
this.baseClientConfig.setNamesrvAddr(rmqConfigure.getNamesrvAddr());
45+
this.baseClientConfig.setClientCallbackExecutorThreads(rmqConfigure.getClientCallbackExecutorThreads());
46+
this.baseClientConfig.setVipChannelEnabled(Boolean.parseBoolean(rmqConfigure.getIsVIPChannel()));
47+
this.baseClientConfig.setUseTLS(rmqConfigure.isUseTLS());
48+
log.info("UserMQAdminPoolManager initialized with baseClientConfig for NameServer: {}", rmqConfigure.getNamesrvAddr());
49+
}
50+
51+
52+
public MQAdminExt borrowMQAdminExt(String userAk, String userSk) throws Exception {
53+
GenericObjectPool<MQAdminExt> userPool = userPools.get(userAk);
54+
55+
if (userPool == null) {
56+
log.info("Creating new MQAdminExt pool for user: {}", userAk);
57+
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
58+
poolConfig.setMaxTotal(1);
59+
poolConfig.setMaxIdle(1);
60+
poolConfig.setMinIdle(0);
61+
poolConfig.setTestWhileIdle(true);
62+
poolConfig.setTimeBetweenEvictionRunsMillis(20000);
63+
poolConfig.setMaxWaitMillis(10000);
64+
65+
UserSpecificMQAdminPooledObjectFactory factory =
66+
new UserSpecificMQAdminPooledObjectFactory(baseClientConfig, userAk, userSk);
67+
68+
GenericObjectPool<MQAdminExt> newUserPool = new GenericObjectPool<>(factory, poolConfig);
69+
70+
GenericObjectPool<MQAdminExt> existingPool = userPools.putIfAbsent(userAk, newUserPool);
71+
if (existingPool != null) {
72+
log.warn("Another thread concurrently created MQAdminExt pool for user {}. Shutting down redundant pool.", userAk);
73+
newUserPool.close();
74+
userPool = existingPool;
75+
} else {
76+
userPool = newUserPool;
77+
log.info("Successfully created and registered MQAdminExt pool for user: {}", userAk);
78+
}
79+
}
80+
81+
return userPool.borrowObject();
82+
}
83+
84+
public void returnMQAdminExt(String userAk, MQAdminExt mqAdminExt) {
85+
GenericObjectPool<MQAdminExt> userPool = userPools.get(userAk);
86+
if (userPool != null) {
87+
try {
88+
userPool.returnObject(mqAdminExt);
89+
log.debug("Returned MQAdminExt object ({}) to pool for user: {}", mqAdminExt, userAk);
90+
} catch (Exception e) {
91+
log.error("Failed to return MQAdminExt object ({}) for user {}: {}", mqAdminExt, userAk, e.getMessage(), e);
92+
if (mqAdminExt != null) {
93+
try {
94+
mqAdminExt.shutdown();
95+
} catch (Exception se) {
96+
log.warn("Error shutting down MQAdminExt after failed return: {}", se.getMessage());
97+
}
98+
}
99+
}
100+
} else {
101+
log.warn("Attempted to return MQAdminExt for non-existent user pool: {}. Shutting down the object directly.", userAk);
102+
if (mqAdminExt != null) {
103+
try {
104+
mqAdminExt.shutdown();
105+
} catch (Exception se) {
106+
log.warn("Error shutting down MQAdminExt for non-existent pool: {}", se.getMessage());
107+
}
108+
}
109+
}
110+
}
111+
112+
public void shutdownUserPool(String userAk) {
113+
GenericObjectPool<MQAdminExt> userPool = userPools.remove(userAk);
114+
if (userPool != null) {
115+
userPool.close();
116+
log.info("Shutdown and removed MQAdminExt pool for user: {}", userAk);
117+
} else {
118+
log.warn("Attempted to shut down non-existent user pool: {}", userAk);
119+
}
120+
}
121+
122+
@PreDestroy
123+
public void shutdownAllPools() {
124+
log.info("Shutting down all MQAdminExt user pools...");
125+
userPools.forEach((userAk, pool) -> {
126+
pool.close();
127+
log.info("Shutdown MQAdminExt pool for user: {}", userAk);
128+
});
129+
userPools.clear();
130+
log.info("All MQAdminExt user pools have been shut down.");
131+
}
132+
}

0 commit comments

Comments
 (0)