From 0908d3d570bee8090219b3913ef1d117a986c167 Mon Sep 17 00:00:00 2001 From: Assassinxc <330109311@qq.com> Date: Fri, 24 Jan 2025 18:18:04 +0800 Subject: [PATCH 1/3] fix the grpc client channel executor is shutdown --- .../adapter/common/AbstractDtpAdapter.java | 5 ++++ .../grpc/internal/ChannelExecutorFetcher.java | 22 +++++++++++++++ .../adapter/grpc/GrpcDtpAdapter.java | 28 +++++++++++++++++-- 3 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java diff --git a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java index 53b68371d..23f3d5e5e 100644 --- a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java +++ b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java @@ -172,6 +172,11 @@ protected void enhanceOriginExecutor(String tpName, ThreadPoolExecutor executor, putAndFinalize(tpName, executor, proxy); } + protected void enhanceOriginExecutorByOfferingProxy(String tpName,ThreadPoolExecutorProxy proxy, String fieldName, Object targetObj){ + ReflectionUtil.setFieldValue(fieldName, targetObj, proxy); + executors.put(tpName, new ExecutorWrapper(tpName, proxy)); + } + protected void putAndFinalize(String tpName, ExecutorService origin, Executor targetForWrapper) { executors.put(tpName, new ExecutorWrapper(tpName, targetForWrapper)); shutdownOriginalExecutor(origin); diff --git a/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java new file mode 100644 index 000000000..cc3816e17 --- /dev/null +++ b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java @@ -0,0 +1,22 @@ +package io.grpc.internal; +import io.grpc.ManagedChannel; +import org.dromara.dynamictp.common.util.ReflectionUtil; + +import java.util.concurrent.Executor; + +/** + * @author Assassinxc + * @title: ChannelExecutorFetcher + * @projectName dynamic-tp + * @description: + * @date 2025/1/24 17:31 + */ +public class ChannelExecutorFetcher { + public static Executor getManagedChannelImplExecutor(ManagedChannel channel) { + if(channel instanceof ManagedChannelImpl) { + return (Executor) ReflectionUtil.getFieldValue(ManagedChannelImpl.class, "executor", channel); + }else{ + return null; + } + } +} diff --git a/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java b/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java index 5b76765f1..04f700e69 100644 --- a/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java +++ b/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java @@ -17,7 +17,9 @@ package org.dromara.dynamictp.adapter.grpc; +import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessSocketAddress; +import io.grpc.internal.ChannelExecutorFetcher; import io.grpc.internal.InternalServer; import io.grpc.internal.ServerImpl; import lombok.extern.slf4j.Slf4j; @@ -26,13 +28,14 @@ import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter; import org.dromara.dynamictp.common.properties.DtpProperties; import org.dromara.dynamictp.common.util.ReflectionUtil; +import org.dromara.dynamictp.core.support.proxy.ThreadPoolExecutorProxy; import org.dromara.dynamictp.jvmti.JVMTI; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; /** @@ -63,6 +66,7 @@ protected void initialize() { log.warn("Cannot find beans of type ServerImpl."); return; } + Map> proxiedMap = new HashMap<>(); for (val serverImpl : beans) { val internalServer = (InternalServer) ReflectionUtil.getFieldValue(ServerImpl.class, SERVER_FIELD, serverImpl); String key = Optional.ofNullable(internalServer) @@ -80,7 +84,25 @@ protected void initialize() { } val executor = (Executor) ReflectionUtil.getFieldValue(ServerImpl.class, EXECUTOR_FIELD, serverImpl); if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor) { - enhanceOriginExecutor(genTpName(key), (ThreadPoolExecutor) executor, EXECUTOR_FIELD, serverImpl); + enhanceOriginExecutorByOfferingProxy(genTpName(key), + new ThreadPoolExecutorProxy((ThreadPoolExecutor) executor), EXECUTOR_FIELD, serverImpl); + proxiedMap.computeIfAbsent(executor.hashCode(), k -> new ArrayList<>()).add(executor); + } + } + if (!proxiedMap.isEmpty()) { + val clientBeans = JVMTI.getInstances(ManagedChannel.class); + if (!clientBeans.isEmpty()) { + for (val channelImpl : clientBeans) { + val executor = ChannelExecutorFetcher.getManagedChannelImplExecutor(channelImpl); + if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor && proxiedMap.containsKey(executor.hashCode())) { + proxiedMap.get(executor.hashCode()).removeIf(proxiedExecutor -> Objects.equals(executor, proxiedExecutor)); + } + } + } + for (List executorList : proxiedMap.values()) { + for (Executor executor : executorList) { + shutdownOriginalExecutor((ExecutorService) executor); + } } } } From 306168559cd5cf1e46803adcc3c514ce22b4db56 Mon Sep 17 00:00:00 2001 From: Assassinxc <330109311@qq.com> Date: Fri, 24 Jan 2025 19:00:38 +0800 Subject: [PATCH 2/3] fix comment --- .../main/java/io/grpc/internal/ChannelExecutorFetcher.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java index cc3816e17..5956feb6d 100644 --- a/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java +++ b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java @@ -5,11 +5,10 @@ import java.util.concurrent.Executor; /** + * ChannelExecutorFetcher + * * @author Assassinxc - * @title: ChannelExecutorFetcher - * @projectName dynamic-tp - * @description: - * @date 2025/1/24 17:31 + * @since 1.1.9.1 */ public class ChannelExecutorFetcher { public static Executor getManagedChannelImplExecutor(ManagedChannel channel) { From 6a725deb0ee08df90b40f864df4d0cb4498086ba Mon Sep 17 00:00:00 2001 From: Assassinxc <330109311@qq.com> Date: Fri, 24 Jan 2025 19:14:44 +0800 Subject: [PATCH 3/3] fix code to follow coding standard --- .../adapter/common/AbstractDtpAdapter.java | 2 +- .../grpc/internal/ChannelExecutorFetcher.java | 22 +++++++++++++++++-- .../adapter/grpc/GrpcDtpAdapter.java | 7 +++++- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java index 23f3d5e5e..d02b93cc7 100644 --- a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java +++ b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java @@ -172,7 +172,7 @@ protected void enhanceOriginExecutor(String tpName, ThreadPoolExecutor executor, putAndFinalize(tpName, executor, proxy); } - protected void enhanceOriginExecutorByOfferingProxy(String tpName,ThreadPoolExecutorProxy proxy, String fieldName, Object targetObj){ + protected void enhanceOriginExecutorByOfferingProxy(String tpName, ThreadPoolExecutorProxy proxy, String fieldName, Object targetObj) { ReflectionUtil.setFieldValue(fieldName, targetObj, proxy); executors.put(tpName, new ExecutorWrapper(tpName, proxy)); } diff --git a/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java index 5956feb6d..b9984ebbe 100644 --- a/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java +++ b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java @@ -1,4 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.grpc.internal; + import io.grpc.ManagedChannel; import org.dromara.dynamictp.common.util.ReflectionUtil; @@ -12,9 +30,9 @@ */ public class ChannelExecutorFetcher { public static Executor getManagedChannelImplExecutor(ManagedChannel channel) { - if(channel instanceof ManagedChannelImpl) { + if (channel instanceof ManagedChannelImpl) { return (Executor) ReflectionUtil.getFieldValue(ManagedChannelImpl.class, "executor", channel); - }else{ + } else { return null; } } diff --git a/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java b/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java index 04f700e69..8eb1db667 100644 --- a/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java +++ b/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java @@ -33,7 +33,12 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.Optional; +import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor;