diff --git a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java index 53b68371d..d02b93cc7 100644 --- a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java +++ b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java @@ -172,6 +172,11 @@ protected void enhanceOriginExecutor(String tpName, ThreadPoolExecutor executor, putAndFinalize(tpName, executor, proxy); } + protected void enhanceOriginExecutorByOfferingProxy(String tpName, ThreadPoolExecutorProxy proxy, String fieldName, Object targetObj) { + ReflectionUtil.setFieldValue(fieldName, targetObj, proxy); + executors.put(tpName, new ExecutorWrapper(tpName, proxy)); + } + protected void putAndFinalize(String tpName, ExecutorService origin, Executor targetForWrapper) { executors.put(tpName, new ExecutorWrapper(tpName, targetForWrapper)); shutdownOriginalExecutor(origin); diff --git a/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java new file mode 100644 index 000000000..b9984ebbe --- /dev/null +++ b/adapter/adapter-grpc/src/main/java/io/grpc/internal/ChannelExecutorFetcher.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import io.grpc.ManagedChannel; +import org.dromara.dynamictp.common.util.ReflectionUtil; + +import java.util.concurrent.Executor; + +/** + * ChannelExecutorFetcher + * + * @author Assassinxc + * @since 1.1.9.1 + */ +public class ChannelExecutorFetcher { + public static Executor getManagedChannelImplExecutor(ManagedChannel channel) { + if (channel instanceof ManagedChannelImpl) { + return (Executor) ReflectionUtil.getFieldValue(ManagedChannelImpl.class, "executor", channel); + } else { + return null; + } + } +} diff --git a/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java b/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java index 5b76765f1..8eb1db667 100644 --- a/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java +++ b/adapter/adapter-grpc/src/main/java/org/dromara/dynamictp/adapter/grpc/GrpcDtpAdapter.java @@ -17,7 +17,9 @@ package org.dromara.dynamictp.adapter.grpc; +import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessSocketAddress; +import io.grpc.internal.ChannelExecutorFetcher; import io.grpc.internal.InternalServer; import io.grpc.internal.ServerImpl; import lombok.extern.slf4j.Slf4j; @@ -26,13 +28,19 @@ import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter; import org.dromara.dynamictp.common.properties.DtpProperties; import org.dromara.dynamictp.common.util.ReflectionUtil; +import org.dromara.dynamictp.core.support.proxy.ThreadPoolExecutorProxy; import org.dromara.dynamictp.jvmti.JVMTI; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Objects; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; import java.util.Optional; +import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; /** @@ -63,6 +71,7 @@ protected void initialize() { log.warn("Cannot find beans of type ServerImpl."); return; } + Map> proxiedMap = new HashMap<>(); for (val serverImpl : beans) { val internalServer = (InternalServer) ReflectionUtil.getFieldValue(ServerImpl.class, SERVER_FIELD, serverImpl); String key = Optional.ofNullable(internalServer) @@ -80,7 +89,25 @@ protected void initialize() { } val executor = (Executor) ReflectionUtil.getFieldValue(ServerImpl.class, EXECUTOR_FIELD, serverImpl); if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor) { - enhanceOriginExecutor(genTpName(key), (ThreadPoolExecutor) executor, EXECUTOR_FIELD, serverImpl); + enhanceOriginExecutorByOfferingProxy(genTpName(key), + new ThreadPoolExecutorProxy((ThreadPoolExecutor) executor), EXECUTOR_FIELD, serverImpl); + proxiedMap.computeIfAbsent(executor.hashCode(), k -> new ArrayList<>()).add(executor); + } + } + if (!proxiedMap.isEmpty()) { + val clientBeans = JVMTI.getInstances(ManagedChannel.class); + if (!clientBeans.isEmpty()) { + for (val channelImpl : clientBeans) { + val executor = ChannelExecutorFetcher.getManagedChannelImplExecutor(channelImpl); + if (Objects.nonNull(executor) && executor instanceof ThreadPoolExecutor && proxiedMap.containsKey(executor.hashCode())) { + proxiedMap.get(executor.hashCode()).removeIf(proxiedExecutor -> Objects.equals(executor, proxiedExecutor)); + } + } + } + for (List executorList : proxiedMap.values()) { + for (Executor executor : executorList) { + shutdownOriginalExecutor((ExecutorService) executor); + } } } }