Skip to content

Commit 42f9a76

Browse files
haohao0103imbajinCopilot
authored
refactor: adjust the related filters of sofa-bolt (#2735)
* 优化调整sofa-bolt相关filter * 优化调整sofa-bolt相关filter * 优化调整sofa-bolt相关filter * Update hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java Co-authored-by: Copilot <[email protected]> * Update hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java Co-authored-by: Copilot <[email protected]> * tiny improve * fix wrong usage in log --------- Co-authored-by: imbajin <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent c99cfcd commit 42f9a76

File tree

5 files changed

+282
-10
lines changed

5 files changed

+282
-10
lines changed

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
import lombok.Data;
3131

32+
import org.apache.hugegraph.pd.raft.serializer.HugegraphHessianSerializerFactory;
33+
3234
@Data
3335
public class KVOperation {
3436

@@ -84,6 +86,7 @@ public static KVOperation fromByteArray(byte[] value) throws IOException {
8486

8587
try (ByteArrayInputStream bis = new ByteArrayInputStream(value, 1, value.length - 1)) {
8688
Hessian2Input input = new Hessian2Input(bis);
89+
input.setSerializerFactory(HugegraphHessianSerializerFactory.getInstance());
8790
KVOperation op = new KVOperation();
8891
op.op = value[0];
8992
op.key = input.readBytes();

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,24 @@
1919

2020
import java.io.File;
2121
import java.util.ArrayList;
22+
import java.util.Collections;
2223
import java.util.HashSet;
2324
import java.util.List;
2425
import java.util.Objects;
2526
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.stream.Collectors;
2931

3032
import org.apache.hugegraph.pd.common.PDException;
3133
import org.apache.hugegraph.pd.config.PDConfig;
3234
import org.apache.hugegraph.pd.grpc.Metapb;
3335
import org.apache.hugegraph.pd.grpc.Pdpb;
36+
import org.apache.hugegraph.pd.raft.auth.IpAuthHandler;
3437

38+
import com.alipay.remoting.ExtendedNettyChannelHandler;
39+
import com.alipay.remoting.config.BoltServerOption;
3540
import com.alipay.sofa.jraft.JRaftUtils;
3641
import com.alipay.sofa.jraft.Node;
3742
import com.alipay.sofa.jraft.RaftGroupService;
@@ -47,10 +52,12 @@
4752
import com.alipay.sofa.jraft.option.RpcOptions;
4853
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
4954
import com.alipay.sofa.jraft.rpc.RpcServer;
55+
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
5056
import com.alipay.sofa.jraft.util.Endpoint;
5157
import com.alipay.sofa.jraft.util.ThreadId;
5258
import com.alipay.sofa.jraft.util.internal.ThrowUtil;
5359

60+
import io.netty.channel.ChannelHandler;
5461
import lombok.extern.slf4j.Slf4j;
5562

5663
@Slf4j
@@ -117,7 +124,7 @@ public boolean init(PDConfig.Raft config) {
117124

118125
final PeerId serverId = JRaftUtils.getPeerId(config.getAddress());
119126

120-
rpcServer = createRaftRpcServer(config.getAddress());
127+
rpcServer = createRaftRpcServer(config.getAddress(), initConf.getPeers());
121128
// construct raft group and start raft
122129
this.raftGroupService =
123130
new RaftGroupService(groupId, serverId, nodeOptions, rpcServer, true);
@@ -130,14 +137,40 @@ public boolean init(PDConfig.Raft config) {
130137
/**
131138
* Create a Raft RPC Server for communication between PDs
132139
*/
133-
private RpcServer createRaftRpcServer(String raftAddr) {
140+
private RpcServer createRaftRpcServer(String raftAddr, List<PeerId> peers) {
134141
Endpoint endpoint = JRaftUtils.getEndPoint(raftAddr);
135142
RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(endpoint);
143+
configureRaftServerIpWhitelist(peers, rpcServer);
136144
RaftRpcProcessor.registerProcessor(rpcServer, this);
137145
rpcServer.init(null);
138146
return rpcServer;
139147
}
140148

149+
private static void configureRaftServerIpWhitelist(List<PeerId> peers, RpcServer rpcServer) {
150+
if (rpcServer instanceof BoltRpcServer) {
151+
((BoltRpcServer) rpcServer).getServer().option(
152+
BoltServerOption.EXTENDED_NETTY_CHANNEL_HANDLER,
153+
new ExtendedNettyChannelHandler() {
154+
@Override
155+
public List<ChannelHandler> frontChannelHandlers() {
156+
return Collections.singletonList(
157+
IpAuthHandler.getInstance(
158+
peers.stream()
159+
.map(PeerId::getIp)
160+
.collect(Collectors.toSet())
161+
)
162+
);
163+
}
164+
165+
@Override
166+
public List<ChannelHandler> backChannelHandlers() {
167+
return Collections.emptyList();
168+
}
169+
}
170+
);
171+
}
172+
}
173+
141174
public void shutDown() {
142175
if (this.raftGroupService != null) {
143176
this.raftGroupService.shutdown();

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void onApply(Iterator iter) {
9090
done.run(Status.OK());
9191
}
9292
} catch (Throwable t) {
93-
log.error("StateMachine meet critical error: {}.", t);
93+
log.error("StateMachine encountered critical error", t);
9494
if (done != null) {
9595
done.run(new Status(RaftError.EINTERNAL, t.getMessage()));
9696
}
@@ -101,7 +101,7 @@ public void onApply(Iterator iter) {
101101

102102
@Override
103103
public void onError(final RaftException e) {
104-
log.error("Raft StateMachine on error {}", e);
104+
log.error("Raft StateMachine encountered an error", e);
105105
}
106106

107107
@Override
@@ -117,9 +117,7 @@ public void onLeaderStart(final long term) {
117117
log.info("Raft becomes leader");
118118
Utils.runInThread(() -> {
119119
if (!CollectionUtils.isEmpty(stateListeners)) {
120-
stateListeners.forEach(listener -> {
121-
listener.onRaftLeaderChanged();
122-
});
120+
stateListeners.forEach(RaftStateListener::onRaftLeaderChanged);
123121
}
124122
});
125123
}
@@ -136,9 +134,7 @@ public void onStartFollowing(final LeaderChangeContext ctx) {
136134
super.onStartFollowing(ctx);
137135
Utils.runInThread(() -> {
138136
if (!CollectionUtils.isEmpty(stateListeners)) {
139-
stateListeners.forEach(listener -> {
140-
listener.onRaftLeaderChanged();
141-
});
137+
stateListeners.forEach(RaftStateListener::onRaftLeaderChanged);
142138
}
143139
});
144140
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hugegraph.pd.raft.auth;
19+
20+
import java.net.InetSocketAddress;
21+
import java.util.Collections;
22+
import java.util.Set;
23+
24+
import io.netty.channel.ChannelDuplexHandler;
25+
import io.netty.channel.ChannelHandler;
26+
import io.netty.channel.ChannelHandlerContext;
27+
import lombok.extern.slf4j.Slf4j;
28+
29+
@Slf4j
30+
@ChannelHandler.Sharable
31+
public class IpAuthHandler extends ChannelDuplexHandler {
32+
33+
private final Set<String> allowedIps;
34+
private static volatile IpAuthHandler instance;
35+
36+
private IpAuthHandler(Set<String> allowedIps) {
37+
this.allowedIps = Collections.unmodifiableSet(allowedIps);
38+
}
39+
40+
public static IpAuthHandler getInstance(Set<String> allowedIps) {
41+
if (instance == null) {
42+
synchronized (IpAuthHandler.class) {
43+
if (instance == null) {
44+
instance = new IpAuthHandler(allowedIps);
45+
}
46+
}
47+
}
48+
return instance;
49+
}
50+
51+
@Override
52+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
53+
String clientIp = getClientIp(ctx);
54+
if (!isIpAllowed(clientIp)) {
55+
log.warn("Blocked connection from {}", clientIp);
56+
ctx.close();
57+
return;
58+
}
59+
super.channelActive(ctx);
60+
}
61+
62+
private static String getClientIp(ChannelHandlerContext ctx) {
63+
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
64+
return remoteAddress.getAddress().getHostAddress();
65+
}
66+
67+
private boolean isIpAllowed(String ip) {
68+
return allowedIps.isEmpty() || allowedIps.contains(ip);
69+
}
70+
71+
@Override
72+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
73+
String clientIp = getClientIp(ctx);
74+
log.warn("Client : {} connection exception : {}", clientIp, cause);
75+
if (ctx.channel().isActive()) {
76+
ctx.close().addListener(future -> {
77+
if (!future.isSuccess()) {
78+
log.warn("Client: {} connection closed failed: {}",
79+
clientIp, future.cause().getMessage());
80+
}
81+
});
82+
}
83+
}
84+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package org.apache.hugegraph.pd.raft.serializer;
20+
21+
import com.caucho.hessian.io.Deserializer;
22+
import com.caucho.hessian.io.HessianProtocolException;
23+
import com.caucho.hessian.io.Serializer;
24+
import com.caucho.hessian.io.SerializerFactory;
25+
26+
27+
import lombok.extern.slf4j.Slf4j;
28+
29+
import java.text.SimpleDateFormat;
30+
import java.time.format.DateTimeFormatter;
31+
32+
import java.util.ArrayList;
33+
import java.util.Calendar;
34+
import java.util.Date;
35+
import java.util.HashMap;
36+
import java.util.HashSet;
37+
import java.util.LinkedHashMap;
38+
import java.util.LinkedHashSet;
39+
import java.util.LinkedList;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Set;
43+
import java.util.TreeMap;
44+
import java.util.TreeSet;
45+
import java.util.concurrent.ConcurrentHashMap;
46+
import java.util.concurrent.ConcurrentMap;
47+
import java.util.concurrent.ConcurrentSkipListMap;
48+
import java.util.concurrent.CopyOnWriteArrayList;
49+
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicBoolean;
51+
import java.util.concurrent.atomic.AtomicInteger;
52+
import java.util.concurrent.atomic.AtomicLong;
53+
import java.util.concurrent.atomic.AtomicReference;
54+
55+
@Slf4j
56+
public class HugegraphHessianSerializerFactory extends SerializerFactory {
57+
58+
private static final HugegraphHessianSerializerFactory INSTANCE = new HugegraphHessianSerializerFactory();
59+
60+
private HugegraphHessianSerializerFactory() {
61+
super();
62+
initWhitelist();
63+
}
64+
65+
public static HugegraphHessianSerializerFactory getInstance() {
66+
return INSTANCE;
67+
}
68+
69+
private final Set<String> whitelist = new HashSet<>();
70+
71+
private void initWhitelist() {
72+
allowBasicType();
73+
allowCollections();
74+
allowConcurrent();
75+
allowTime();
76+
allowBusinessClasses();
77+
}
78+
79+
private void allowBasicType() {
80+
addToWhitelist(
81+
boolean.class, byte.class, char.class, double.class,
82+
float.class, int.class, long.class, short.class,
83+
Boolean.class, Byte.class, Character.class, Double.class,
84+
Float.class, Integer.class, Long.class, Short.class,
85+
String.class, Class.class, Number.class
86+
);
87+
}
88+
89+
private void allowCollections() {
90+
addToWhitelist(
91+
List.class, ArrayList.class, LinkedList.class,
92+
Set.class, HashSet.class, LinkedHashSet.class, TreeSet.class,
93+
Map.class, HashMap.class, LinkedHashMap.class, TreeMap.class
94+
);
95+
}
96+
97+
private void allowConcurrent() {
98+
addToWhitelist(
99+
AtomicBoolean.class, AtomicInteger.class, AtomicLong.class, AtomicReference.class,
100+
ConcurrentMap.class, ConcurrentHashMap.class, ConcurrentSkipListMap.class, CopyOnWriteArrayList.class
101+
);
102+
}
103+
104+
private void allowTime() {
105+
addToWhitelist(
106+
Date.class, Calendar.class, TimeUnit.class,
107+
SimpleDateFormat.class, DateTimeFormatter.class
108+
);
109+
tryAddClass("java.time.LocalDate");
110+
tryAddClass("java.time.LocalDateTime");
111+
tryAddClass("java.time.Instant");
112+
}
113+
114+
private void allowBusinessClasses() {
115+
addToWhitelist(
116+
org.apache.hugegraph.pd.raft.KVOperation.class,
117+
byte[].class
118+
);
119+
}
120+
121+
private void addToWhitelist(Class<?>... classes) {
122+
for (Class<?> clazz : classes) {
123+
whitelist.add(clazz.getName());
124+
}
125+
}
126+
127+
private void tryAddClass(String className) {
128+
try {
129+
Class.forName(className);
130+
whitelist.add(className);
131+
} catch (ClassNotFoundException e) {
132+
log.warn("Failed to load class {}", className);
133+
}
134+
}
135+
136+
@Override
137+
public Serializer getSerializer(Class cl) throws HessianProtocolException {
138+
checkWhitelist(cl);
139+
return super.getSerializer(cl);
140+
}
141+
142+
@Override
143+
public Deserializer getDeserializer(Class cl) throws HessianProtocolException {
144+
checkWhitelist(cl);
145+
return super.getDeserializer(cl);
146+
}
147+
148+
private void checkWhitelist(Class cl) {
149+
String className = cl.getName();
150+
if (!whitelist.contains(className)) {
151+
log.warn("Security alert: Blocked unauthorized class [{}] at {}",
152+
className, new Date());
153+
throw new SecurityException("hessian serialize unauthorized class: " + className);
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)