22
33import com .baidu .brpc .client .BrpcProxy ;
44import com .baidu .brpc .client .RpcClient ;
5+ import com .baidu .brpc .client .RpcClientOptions ;
6+ import com .baidu .brpc .client .instance .Endpoint ;
7+ import com .github .wenweihu86 .raft .Peer ;
58import com .github .wenweihu86 .raft .example .server .ExampleStateMachine ;
69import com .github .wenweihu86 .raft .example .server .service .ExampleProto ;
710import com .github .wenweihu86 .raft .example .server .service .ExampleService ;
1114import org .slf4j .Logger ;
1215import org .slf4j .LoggerFactory ;
1316
17+ import java .util .concurrent .locks .Lock ;
18+ import java .util .concurrent .locks .ReentrantLock ;
19+
1420/**
1521 * Created by wenweihu86 on 2017/5/9.
1622 */
@@ -21,10 +27,35 @@ public class ExampleServiceImpl implements ExampleService {
2127
2228 private RaftNode raftNode ;
2329 private ExampleStateMachine stateMachine ;
30+ private int leaderId = -1 ;
31+ private RpcClient leaderRpcClient = null ;
32+ private Lock leaderLock = new ReentrantLock ();
2433
2534 public ExampleServiceImpl (RaftNode raftNode , ExampleStateMachine stateMachine ) {
2635 this .raftNode = raftNode ;
2736 this .stateMachine = stateMachine ;
37+ onLeaderChangeEvent ();
38+ }
39+
40+ private void onLeaderChangeEvent () {
41+ if (raftNode .getLeaderId () != -1
42+ && raftNode .getLeaderId () != raftNode .getLocalServer ().getServerId ()
43+ && leaderId != raftNode .getLeaderId ()) {
44+ leaderLock .lock ();
45+ if (leaderId != -1 && leaderRpcClient != null ) {
46+ leaderRpcClient .stop ();
47+ leaderRpcClient = null ;
48+ leaderId = -1 ;
49+ }
50+ leaderId = raftNode .getLeaderId ();
51+ Peer peer = raftNode .getPeerMap ().get (leaderId );
52+ Endpoint endpoint = new Endpoint (peer .getServer ().getEndpoint ().getHost (),
53+ peer .getServer ().getEndpoint ().getPort ());
54+ RpcClientOptions rpcClientOptions = new RpcClientOptions ();
55+ rpcClientOptions .setGlobalThreadPoolSharing (true );
56+ leaderRpcClient = new RpcClient (endpoint , rpcClientOptions );
57+ leaderLock .unlock ();
58+ }
2859 }
2960
3061 @ Override
@@ -34,8 +65,8 @@ public ExampleProto.SetResponse set(ExampleProto.SetRequest request) {
3465 if (raftNode .getLeaderId () <= 0 ) {
3566 responseBuilder .setSuccess (false );
3667 } else if (raftNode .getLeaderId () != raftNode .getLocalServer ().getServerId ()) {
37- RpcClient rpcClient = raftNode . getPeerMap (). get ( raftNode . getLeaderId ()). getRpcClient ();
38- ExampleService exampleService = BrpcProxy .getProxy (rpcClient , ExampleService .class );
68+ onLeaderChangeEvent ();
69+ ExampleService exampleService = BrpcProxy .getProxy (leaderRpcClient , ExampleService .class );
3970 ExampleProto .SetResponse responseFromLeader = exampleService .set (request );
4071 responseBuilder .mergeFrom (responseFromLeader );
4172 } else {
0 commit comments