Skip to content

Commit 56245fa

Browse files
authored
Merge pull request #570 from yanhom1314/master
Add Thrift adapter module with support for multiple server types
2 parents 2f182d8 + 03ec5ea commit 56245fa

File tree

25 files changed

+1946
-7
lines changed

25 files changed

+1946
-7
lines changed

adapter/adapter-thrift/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<parent>
6+
<groupId>org.dromara.dynamictp</groupId>
7+
<artifactId>dynamic-tp-adapter</artifactId>
8+
<version>${revision}</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<artifactId>dynamic-tp-adapter-thrift</artifactId>
12+
13+
<dependencies>
14+
<dependency>
15+
<groupId>org.dromara.dynamictp</groupId>
16+
<artifactId>dynamic-tp-adapter-common</artifactId>
17+
</dependency>
18+
19+
<dependency>
20+
<groupId>org.dromara.dynamictp</groupId>
21+
<artifactId>dynamic-tp-jvmti-runtime</artifactId>
22+
</dependency>
23+
24+
<dependency>
25+
<groupId>org.apache.thrift</groupId>
26+
<artifactId>libthrift</artifactId>
27+
<scope>provided</scope>
28+
</dependency>
29+
</dependencies>
30+
</project>
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.dromara.dynamictp.adapter.thrift;
19+
20+
import lombok.extern.slf4j.Slf4j;
21+
import org.apache.commons.collections4.CollectionUtils;
22+
import org.apache.thrift.server.TThreadPoolServer;
23+
import org.apache.thrift.server.THsHaServer;
24+
import org.apache.thrift.server.TThreadedSelectorServer;
25+
import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter;
26+
import org.dromara.dynamictp.common.properties.DtpProperties;
27+
import org.dromara.dynamictp.common.util.MethodUtil;
28+
import org.dromara.dynamictp.common.util.ReflectionUtil;
29+
import org.dromara.dynamictp.core.support.proxy.ThreadPoolExecutorProxy;
30+
import org.dromara.dynamictp.jvmti.JVMTI;
31+
32+
import java.lang.reflect.Method;
33+
import java.util.List;
34+
import java.util.Objects;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.ThreadPoolExecutor;
37+
38+
/**
39+
* ThriftDtpAdapter for managing Thrift server thread pools
40+
*
41+
* @author devin
42+
* @since 1.2.2
43+
*/
44+
@Slf4j
45+
@SuppressWarnings("all")
46+
public class ThriftDtpAdapter extends AbstractDtpAdapter {
47+
48+
private static final String TP_PREFIX = "thriftTp";
49+
50+
private static final String THREAD_POOL_SERVER_EXECUTOR_FIELD = "executorService_";
51+
private static final String HSHASERVER_EXECUTOR_FIELD = "invoker";
52+
private static final String THREADED_SELECTOR_WORKER_FIELD = "invoker";
53+
54+
@Override
55+
public void refresh(DtpProperties dtpProperties) {
56+
refresh(dtpProperties.getThriftTp(), dtpProperties.getPlatforms());
57+
}
58+
59+
@Override
60+
protected String getTpPrefix() {
61+
return TP_PREFIX;
62+
}
63+
64+
@Override
65+
protected void initialize() {
66+
super.initialize();
67+
List<TThreadPoolServer> tThreadPoolServers = JVMTI.getInstances(TThreadPoolServer.class);
68+
if (CollectionUtils.isEmpty(tThreadPoolServers)) {
69+
if (log.isDebugEnabled()) {
70+
log.debug("Cannot find instances of TThreadPoolServer.");
71+
}
72+
}
73+
tThreadPoolServers.forEach(this::initializeTThreadPoolServer);
74+
75+
List<THsHaServer> tHsHaServers = JVMTI.getInstances(THsHaServer.class);
76+
if (CollectionUtils.isEmpty(tHsHaServers)) {
77+
if (log.isDebugEnabled()) {
78+
log.debug("Cannot find instances of THsHaServer.");
79+
}
80+
}
81+
tHsHaServers.forEach(this::initializeTHsHaServer);
82+
83+
List<TThreadedSelectorServer> tThreadedSelectorServers = JVMTI.getInstances(TThreadedSelectorServer.class);
84+
if (CollectionUtils.isEmpty(tThreadedSelectorServers)) {
85+
if (log.isDebugEnabled()) {
86+
log.debug("Cannot find instances of TThreadedSelectorServer.");
87+
}
88+
}
89+
tThreadedSelectorServers.forEach(this::initializeTThreadedSelectorServer);
90+
}
91+
92+
public void initializeTThreadPoolServer(TThreadPoolServer server) {
93+
ThreadPoolExecutor executor = (ThreadPoolExecutor) ReflectionUtil.getFieldValue(
94+
TThreadPoolServer.class, THREAD_POOL_SERVER_EXECUTOR_FIELD, server);
95+
if (Objects.nonNull(executor)) {
96+
int port = getServerPort(server);
97+
String tpName = genTpName("TThreadPoolServer", port);
98+
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(executor);
99+
enhanceOriginExecutor(tpName, proxy, THREAD_POOL_SERVER_EXECUTOR_FIELD, server);
100+
log.info("DynamicTp adapter, thrift TThreadPoolServer executorService_ enhanced, tpName: {}", tpName);
101+
}
102+
}
103+
104+
public void initializeTHsHaServer(THsHaServer server) {
105+
ExecutorService executor = (ExecutorService) ReflectionUtil.getFieldValue(
106+
THsHaServer.class, HSHASERVER_EXECUTOR_FIELD, server);
107+
if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor) {
108+
int port = getServerPort(server);
109+
String tpName = genTpName("THsHaServer", port);
110+
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) executor);
111+
enhanceOriginExecutor(tpName, proxy, HSHASERVER_EXECUTOR_FIELD, server);
112+
log.info("DynamicTp adapter, thrift THsHaServer invoker enhanced, tpName: {}", tpName);
113+
}
114+
}
115+
116+
public void initializeTThreadedSelectorServer(TThreadedSelectorServer server) {
117+
ExecutorService executor = (ExecutorService) ReflectionUtil.getFieldValue(
118+
TThreadedSelectorServer.class, THREADED_SELECTOR_WORKER_FIELD, server);
119+
if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor) {
120+
int port = getServerPort(server);
121+
String tpName = genTpName("TThreadedSelectorServer", port);
122+
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) executor);
123+
enhanceOriginExecutor(tpName, proxy, THREADED_SELECTOR_WORKER_FIELD, server);
124+
log.info("DynamicTp adapter, thrift TThreadedSelectorServer invoker enhanced, tpName: {}", tpName);
125+
}
126+
}
127+
128+
private String genTpName(String serverType, int port) {
129+
return TP_PREFIX + "#" + serverType + "#" + (port > 0 ? port : "");
130+
}
131+
132+
/**
133+
* Try to get the server port for better naming
134+
*
135+
* @param server Thrift server instance
136+
* @return port number or -1 if not available
137+
*/
138+
private int getServerPort(Object server) {
139+
try {
140+
Object serverTransport = ReflectionUtil.getFieldValue("serverTransport_", server);
141+
if (Objects.isNull(serverTransport)) {
142+
return -1;
143+
}
144+
Object serverSocket = ReflectionUtil.getFieldValue("serverSocket_", serverTransport);
145+
if (Objects.isNull(serverSocket)) {
146+
return -1;
147+
}
148+
Method getLocalPortMethod = ReflectionUtil.findMethod(serverSocket.getClass(), "getLocalPort");
149+
if (getLocalPortMethod != null) {
150+
int port = MethodUtil.invokeAndReturnInt(getLocalPortMethod, serverSocket);
151+
if (port > 0) {
152+
return port;
153+
}
154+
}
155+
log.warn("Could not extract port from Thrift server: {}", server.getClass().getSimpleName());
156+
} catch (Exception e) {
157+
log.warn("Error extracting port from Thrift server: {}", e.getMessage());
158+
}
159+
return -1;
160+
}
161+
}

adapter/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
<module>adapter-sofa</module>
2525
<module>adapter-rabbitmq</module>
2626
<module>adapter-liteflow</module>
27+
<module>adapter-thrift</module>
2728
</modules>
2829

2930
<dependencies>

common/src/main/java/org/dromara/dynamictp/common/manager/NullContextManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.dromara.dynamictp.common.manager;
1919

20+
import java.util.Collections;
2021
import java.util.Map;
2122

2223
/**
@@ -29,36 +30,36 @@ public class NullContextManager implements ContextManager {
2930

3031
@Override
3132
public <T> T getBean(Class<T> clazz) {
32-
throw new UnsupportedOperationException();
33+
return null;
3334
}
3435

3536
@Override
3637
public <T> T getBean(String name, Class<T> clazz) {
37-
throw new UnsupportedOperationException();
38+
return null;
3839
}
3940

4041
@Override
4142
public <T> Map<String, T> getBeansOfType(Class<T> clazz) {
42-
throw new UnsupportedOperationException();
43+
return Collections.emptyMap();
4344
}
4445

4546
@Override
4647
public Object getEnvironment() {
47-
throw new UnsupportedOperationException();
48+
return null;
4849
}
4950

5051
@Override
5152
public String getEnvironmentProperty(String key) {
52-
throw new UnsupportedOperationException();
53+
return null;
5354
}
5455

5556
@Override
5657
public String getEnvironmentProperty(String key, Object environment) {
57-
throw new UnsupportedOperationException();
58+
return null;
5859
}
5960

6061
@Override
6162
public String getEnvironmentProperty(String key, String defaultValue) {
62-
throw new UnsupportedOperationException();
63+
return null;
6364
}
6465
}

common/src/main/java/org/dromara/dynamictp/common/properties/DtpProperties.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ private DtpProperties() { }
172172
*/
173173
private List<TpExecutorProps> liteflowTp;
174174

175+
/**
176+
* Thrift thread pools.
177+
*/
178+
private List<TpExecutorProps> thriftTp;
179+
175180
public static DtpProperties getInstance() {
176181
return Holder.INSTANCE;
177182
}

common/src/main/java/org/dromara/dynamictp/common/util/MethodUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,19 @@ public static long invokeAndReturnLong(Method method, Object targetObj) {
5858
return -1;
5959
}
6060
}
61+
62+
/**
63+
* Invoke method and return int value.
64+
*
65+
* @param method target method
66+
* @param targetObj the object the underlying method is invoked from
67+
* @return result
68+
*/
69+
public static int invokeAndReturnInt(Method method, Object targetObj) {
70+
try {
71+
return method != null ? (int) method.invoke(targetObj) : -1;
72+
} catch (Exception e) {
73+
return -1;
74+
}
75+
}
6176
}

dependencies/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
<tars.version>1.7.3</tars.version>
4747
<skywalking.version>9.1.0</skywalking.version>
4848
<opentelemetry.version>1.25.0</opentelemetry.version>
49+
<thrift.version>0.21.0</thrift.version>
4950

5051
<apollo.version>2.0.0</apollo.version>
5152
<nacos-api.version>2.0.4</nacos-api.version>
@@ -338,6 +339,12 @@
338339
<version>${bytebuddy.version}</version>
339340
</dependency>
340341

342+
<dependency>
343+
<groupId>org.apache.thrift</groupId>
344+
<artifactId>libthrift</artifactId>
345+
<version>${thrift.version}</version>
346+
</dependency>
347+
341348
<dependency>
342349
<groupId>org.dromara.dynamictp</groupId>
343350
<artifactId>dynamic-tp-jvmti-runtime</artifactId>
@@ -446,6 +453,12 @@
446453
<version>${revision}</version>
447454
</dependency>
448455

456+
<dependency>
457+
<groupId>org.dromara.dynamictp</groupId>
458+
<artifactId>dynamic-tp-adapter-thrift</artifactId>
459+
<version>${revision}</version>
460+
</dependency>
461+
449462
<dependency>
450463
<groupId>org.dromara.dynamictp</groupId>
451464
<artifactId>dynamic-tp-extension-limiter-redis</artifactId>
@@ -610,6 +623,11 @@
610623
<artifactId>dynamic-tp-spring-boot-starter-adapter-liteflow</artifactId>
611624
<version>${revision}</version>
612625
</dependency>
626+
<dependency>
627+
<groupId>org.dromara.dynamictp</groupId>
628+
<artifactId>dynamic-tp-spring-boot-starter-adapter-thrift</artifactId>
629+
<version>${revision}</version>
630+
</dependency>
613631
</dependencies>
614632
</dependencyManagement>
615633

0 commit comments

Comments
 (0)