Skip to content

Commit 1e7a2be

Browse files
committed
添加监控
1 parent f44308e commit 1e7a2be

File tree

9 files changed

+49
-33
lines changed

9 files changed

+49
-33
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# BootNettyRpc
22

33
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg?label=license)](https://github.com/forezp/BootNettyRpc/blob/master/LICENSE)
4-
[![Maven Central](https://img.shields.io/maven-central/v/com.nepxion/thunder.svg?label=maven%20central)](http://mvnrepository.com/artifact/io.github.forezp/boot-netty-rpc-core)
4+
[![Maven Central](https://img.shields.io/maven-central/v/io.github.forezp/boot-netty-rpc-core.svg?label=maven%20central)](http://mvnrepository.com/artifact/io.github.forezp/boot-netty-rpc-core)
55

66

77
README: [English](https://github.com/forezp/BootNettyRpc/blob/master/README-en.md) | [中文](https://github.com/forezp/BootNettyRpc/blob/master/README.md)

boot-netty-rpc-core/src/main/java/io/github/forezp/netty/rpc/core/common/constant/ConfigConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class ConfigConstants {
2727
public static final String SERVER_POOL_KEEPALIVE_TIME = "netty.server.pool.keepalive.time";
2828
public static final String SERVER_POOL_KEEPALIVE_TIME_DEFAULT = "1800000";
2929
public static final String SERVER_POOL_REJECT_TYPE = "netty.server.pool.reject.type";
30-
public static final String SERVER_POOL_REJECT_TYPE_DEFAULT = "DiscardedPolicyWithReport";
30+
public static final String SERVER_POOL_REJECT_TYPE_DEFAULT = "BlockingPolicyWithReport";
3131

3232
public static final String CLIENT_POOL_CORE_SIZE = "netty.client.pool.coresize";
3333
public static final String CLIENT_POOL_CORE_SIZE_DEFAULT = "4";
@@ -40,7 +40,7 @@ public class ConfigConstants {
4040
public static final String CLIENT_POOL_KEEPALIVE_TIME = "netty.client.pool.keepalive.time";
4141
public static final String CLIENT_POOL_KEEPALIVE_TIME_DEFAULT = "1800000";
4242
public static final String CLIENT_POOL_REJECT_TYPE = "netty.client.pool.reject.type";
43-
public static final String CLIENT_POOL_REJECT_TYPE_DEFAULT = "DiscardedPolicyWithReport";
43+
public static final String CLIENT_POOL_REJECT_TYPE_DEFAULT = "BlockingPolicyWithReport";
4444

4545
public static final String NETTY_CLIENT_RENEW_INTERVAL = "netty.client.renew.interval";
4646
public static final String NETTY_CLIENT_RENEW_INTERVAL_DEFAULT = "30";//second

boot-netty-rpc-core/src/main/java/io/github/forezp/netty/rpc/core/common/thread/ThreadPoolFactory.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,22 @@ public static ThreadPoolExecutor createServerPoolExecutor(String interfaceName)
3636

3737

3838
public static ThreadPoolExecutor createThreadPoolDefaultExecutor() {
39-
return createThreadPoolExecutor(CommonProperties.CPUS * 1,
39+
return createThreadPoolExecutor( CommonProperties.CPUS * 1,
4040
CommonProperties.CPUS * 2,
4141
15 * 60 * 1000,
42-
false);
42+
false );
4343
}
4444

4545

46-
public static ThreadPoolExecutor createThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, boolean allowCoreThreadTimeOut) {
46+
public static ThreadPoolExecutor createThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, boolean allowCoreThreadTimeOut) {
4747

48-
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
48+
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( corePoolSize,
4949
maximumPoolSize,
5050
keepAliveTime,
5151
TimeUnit.MILLISECONDS,
52-
new LinkedBlockingQueue( ) ,
53-
new BlockingPolicyWithReport());
54-
threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
52+
new LinkedBlockingQueue(),
53+
new BlockingPolicyWithReport() );
54+
threadPoolExecutor.allowCoreThreadTimeOut( allowCoreThreadTimeOut );
5555

5656
return threadPoolExecutor;
5757
}
@@ -68,18 +68,18 @@ public static ThreadPoolExecutor createThreadPoolExecutor(Map<String, ThreadPool
6868
int coreSize, int maxSize, Long keepAliveTime, String queneType,
6969
int queueSize, String rejectType) {
7070

71-
if (executorMap.get( interfaceName ) != null) {
72-
return executorMap.get( interfaceName );
73-
} else {
71+
ThreadPoolExecutor threadPoolExecutor = executorMap.get( interfaceName );
72+
if (threadPoolExecutor == null) {
7473
ThreadPoolExecutor newThreadPool = new ThreadPoolExecutor( CommonProperties.CPUS * coreSize
7574
, CommonProperties.CPUS * maxSize, keepAliveTime, TimeUnit.MILLISECONDS
7675
, createQuene( queneType, queueSize ), createThreadFactory( interfaceName )
7776
, createRejectedExecutionHandler( rejectType ) );
78-
if (newThreadPool != null) {
79-
executorMap.putIfAbsent( interfaceName, newThreadPool );
77+
threadPoolExecutor = executorMap.putIfAbsent( interfaceName, newThreadPool );
78+
if (threadPoolExecutor == null) {
79+
threadPoolExecutor = newThreadPool;
8080
}
81-
return newThreadPool;
8281
}
82+
return threadPoolExecutor;
8383
}
8484

8585
private static RejectedExecutionHandler createRejectedExecutionHandler(String rejectedPolicy) {

boot-netty-rpc-core/src/main/java/io/github/forezp/netty/rpc/core/protocol/client/RequestInterceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.netty.channel.ChannelFuture;
77

88
import java.util.concurrent.CyclicBarrier;
9+
import java.util.concurrent.TimeUnit;
910

1011
/**
1112
* ${DESCRIPTION}
@@ -30,7 +31,7 @@ public Object invokeSync(ChannelFuture channelFuture, NettyRpcRequest request) t
3031
cacheContainer.getSyncEntityMap().put( messageId, responseSyncEntity );
3132
try {
3233
invokeAsync( channelFuture, request );
33-
barrier.await();//TODO 超时没有处理
34+
barrier.await( 5000, TimeUnit.MILLISECONDS );//TODO 5s 超时没有处理
3435
return responseSyncEntity.getResult();
3536
} catch (Exception e) {
3637
e.printStackTrace();

boot-netty-rpc-core/src/main/java/io/github/forezp/netty/rpc/core/protocol/client/RequstInvocationHandler.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
/**
2424
* 参考了 https://www.cnblogs.com/clonen/p/6735011.html
25+
*
2526
* @author fangzhipeng
2627
* create 2018-05-21
2728
**/
@@ -66,7 +67,7 @@ public Object run(Method method, Object[] args) throws Exception {
6667
NettyRpcRequest request = buildRequest( entity, method, args );
6768
ConnectionEntity connectionEntity = findCandidateConnection( entity );
6869
if (connectionEntity != null) {
69-
LOG.info( "run connectionEntity:" + connectionEntity.toString() );
70+
// LOG.info( "run connectionEntity:" + connectionEntity.toString() );
7071
RequestInterceptor interceptor = (RequestInterceptor) NettyRpcApplication.getBean( "requestInterceptor" );
7172
if (request.isSyn()) {
7273
Object object = interceptor.invokeSync( connectionEntity.getChannelFuture(), request );
@@ -84,14 +85,19 @@ public Object run(Method method, Object[] args) throws Exception {
8485
BeanUtils.copyProperties( entity, appEntity );
8586
ExcutorContainer excutorContainer = (ExcutorContainer) NettyRpcApplication.getBean( "excutorContainer" );
8687
NettyClientExcutor excutor = (NettyClientExcutor) excutorContainer.getClientExcutor();
87-
try {
88-
excutor.start( appEntity );
89-
while (!excutor.started( appEntity )) {
88+
synchronized (RequstInvocationHandler.class) {
89+
try {
90+
if (excutor.started( appEntity )) {
91+
return run( method, args );
92+
}
93+
excutor.start( appEntity );
94+
while (!excutor.started( appEntity )) {
95+
}
96+
return run( method, args );
97+
} catch (Exception e) {
98+
e.printStackTrace();
99+
return null;
90100
}
91-
return run( method, args );
92-
} catch (Exception e) {
93-
e.printStackTrace();
94-
return null;
95101
}
96102
}
97103
}

boot-netty-rpc-core/src/main/java/io/github/forezp/netty/rpc/core/protocol/client/ResponseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class ResponseHandler extends AbstractResponseHandler {
1717
@Override
1818
public void handle(NettyRpcResponse response) {
1919
response.setEndTime(System.currentTimeMillis());
20-
LOG.info("Client received: " + response.toString());
20+
// LOG.info("Client received: " + response.toString());
2121
LOG.info("耗时: " + (response.getEndTime() - response.getStartTime()) + "ms");
2222
//处理response
2323
if (response.isSyn()) {

boot-netty-rpc-core/src/main/java/io/github/forezp/netty/rpc/core/protocol/server/NettyServerHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void channelRead0(final ChannelHandlerContext ctx, final NettyRpcRequest
3737
@Override
3838
public Object call() throws Exception {
3939

40-
LOG.info( "Server received: " + request.toString() );
40+
// LOG.info( "Server received: " + request.toString() );
4141
NettyRpcResponse response = new NettyRpcResponse( 1, "sucess", null );
4242
excutorContainer.getServerRequestHandler().handle( request, response );
4343
ctx.writeAndFlush( response );

examples/example-rpc-lib/src/main/java/com/forezp/examplerpclib/lib/IGreeting.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* @author fangzhipeng
99
* @create 2018-05-18
1010
**/
11-
@RpcClient(name = "server", rpcClz = "com.forezp.examplerpcserver.api.Greeting")
11+
@RpcClient(name = "server", rpcClz = "com.forezp.examplerpcserver.api.Greeting", isSyn = false)
1212
public interface IGreeting {
1313

1414
String sayHello(String name);

examples/local-rpc-lient/src/main/java/com/forezp/localrpclient/LocalRpcLientApplication.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.forezp.examplerpclib.lib.IGreeting;
55
import io.github.forezp.netty.rpc.core.annotation.EnableNettyRpc;
66
import io.github.forezp.netty.rpc.core.common.dto.RespDTO;
7+
import io.github.forezp.netty.rpc.core.common.thread.ThreadPoolFactory;
78
import io.github.forezp.netty.rpc.core.protocol.client.Invoker;
89
import io.github.forezp.netty.rpc.core.monitor.MonitorMessage;
910
import org.springframework.boot.SpringApplication;
@@ -13,26 +14,34 @@
1314
import org.springframework.web.bind.annotation.RequestBody;
1415
import org.springframework.web.bind.annotation.RestController;
1516

17+
import java.util.Random;
18+
import java.util.concurrent.BrokenBarrierException;
19+
import java.util.concurrent.CyclicBarrier;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.ThreadFactory;
22+
1623
@SpringBootApplication
1724
@RestController
1825
@EnableNettyRpc(basePackages = "com.forezp")
1926
public class LocalRpcLientApplication {
2027

2128
public static void main(String[] args) {
22-
SpringApplication.run(LocalRpcLientApplication.class, args);
29+
SpringApplication.run( LocalRpcLientApplication.class, args );
2330
}
2431

2532
@GetMapping("/test")
2633
public RespDTO test() throws Exception {
2734

28-
IGreeting invoker = (IGreeting) Invoker.invoke(IGreeting.class);
29-
Object result = invoker.sayHello("sww");
30-
return RespDTO.success(result);
35+
IGreeting invoker = (IGreeting) Invoker.invoke( IGreeting.class );
36+
Object result = invoker.sayHello( "sww" );
37+
38+
39+
return RespDTO.success( result );
3140
}
3241

3342
@PostMapping("/postmsg")
3443
public void post(@RequestBody MonitorMessage message) {
35-
System.out.println(JSON.toJSONString(message));
44+
//System.out.println(JSON.toJSONString(message));
3645
}
3746

3847
}

0 commit comments

Comments
 (0)