Skip to content

Commit 3c5b029

Browse files
puremilkfansaintping
authored andcommitted
fix issue #358 : detect rule cyclic in DAG (#359)
1 parent 9105813 commit 3c5b029

File tree

12 files changed

+357
-46
lines changed

12 files changed

+357
-46
lines changed
Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,45 @@
11
package com.webank.weevent.governance.controller;
22

3+
import javax.servlet.http.HttpServletRequest;
4+
import javax.servlet.http.HttpServletResponse;
5+
6+
import com.webank.weevent.governance.exception.GovernanceException;
7+
import com.webank.weevent.governance.service.CommonService;
8+
39
import lombok.extern.slf4j.Slf4j;
10+
import org.apache.http.client.methods.CloseableHttpResponse;
11+
import org.apache.http.util.EntityUtils;
412
import org.springframework.beans.factory.annotation.Autowired;
513
import org.springframework.beans.factory.annotation.Value;
614
import org.springframework.web.bind.annotation.CrossOrigin;
715
import org.springframework.web.bind.annotation.PathVariable;
816
import org.springframework.web.bind.annotation.RequestMapping;
917
import org.springframework.web.bind.annotation.RequestMethod;
10-
import org.springframework.web.bind.annotation.RequestParam;
11-
import org.springframework.web.client.RestTemplate;
18+
import org.springframework.web.bind.annotation.RestController;
1219

1320
@CrossOrigin
1421
@Slf4j
22+
@RestController
1523
public class ForwardController {
1624

25+
1726
@Autowired
18-
private RestTemplate restTemplate;
27+
private CommonService commonService;
1928

2029
@Value("${weevent.url}")
2130
private String url;
2231

2332
@RequestMapping(value = "/weevent/{path1}/{path2}", method = RequestMethod.GET)
24-
public Object forward(@PathVariable(name = "path1") String path1, @PathVariable(name = "path2") String path2) {
33+
public Object forward(HttpServletRequest request, HttpServletResponse response, @PathVariable(name = "path1") String path1, @PathVariable(name = "path2") String path2)throws GovernanceException {
2534
log.info("weevent url: /wevent/ {} \"/\" {}", path1, path2);
2635
String forwarUrl = new StringBuffer(this.url).append("/").append(path1).append("/").append(path2).toString();
27-
Object result = restTemplate.getForEntity(forwarUrl, Object.class).getBody();
28-
return result;
36+
try {
37+
CloseableHttpResponse closeResponse = commonService.getCloseResponse(request, forwarUrl);
38+
return EntityUtils.toString(closeResponse.getEntity());
39+
} catch (Exception e) {
40+
throw new GovernanceException(e.getMessage());
41+
}
2942
}
3043

31-
@RequestMapping(value = "/weevent/admin/deploy_topic_control", method = RequestMethod.GET)
32-
public Object forward() {
33-
log.info("wevent url: /weevent/admin/deploy_topic_control");
34-
String forwarUrl = new StringBuffer(this.url).append("/admin/deploy_topic_control").toString();
35-
String result = restTemplate.getForEntity(forwarUrl, String.class).getBody();
36-
return result;
37-
}
3844

39-
@RequestMapping(value = "/weevent/{path1}/{path2}")
40-
public Object forward(@PathVariable(name = "path1") String path1, @PathVariable(name = "path2") String path2,
41-
@RequestParam String topic) {
42-
log.info("weevent url: /wevent/ {} \"/\" {}", path1, path2);
43-
44-
String forwarUrl = new StringBuffer(this.url).append("/").append(path1).append("/").append(path2).append("?topic=").append(topic).toString();
45-
Object result = restTemplate.getForEntity(forwarUrl, Object.class).getBody();
46-
return result;
47-
}
4845
}

weevent-governance/src/main/java/com/webank/weevent/governance/mapper/RuleEngineMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@ public interface RuleEngineMapper {
2424
// update RuleEngineEntity
2525
Boolean updateRuleEngine(RuleEngineEntity ruleEngineEntity);
2626

27+
List<RuleEngineEntity> getRuleTopicList(RuleEngineEntity ruleEngineEntity);
28+
2729
Boolean updateRuleEngineStatus(RuleEngineEntity ruleEngineEntity);
2830

2931
int countRuleEngine(RuleEngineEntity ruleEngineEntity);
3032

3133
RuleEngineEntity getRuleById(Integer id);
3234

35+
List<RuleEngineEntity> checkRuleNameRepeat(RuleEngineEntity ruleEngineEntity);
36+
3337
}

weevent-governance/src/main/java/com/webank/weevent/governance/service/CommonService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import java.util.ArrayList;
1313
import java.util.Enumeration;
1414
import java.util.HashMap;
15+
import java.util.HashSet;
1516
import java.util.List;
1617
import java.util.Map;
18+
import java.util.Set;
1719

1820
import javax.servlet.ServletException;
1921
import javax.servlet.ServletOutputStream;
@@ -85,7 +87,6 @@ public CloseableHttpResponse getCloseResponse(HttpServletRequest req, String new
8587
public CloseableHttpResponse getCloseResponse(HttpServletRequest req, String newUrl, String jsonString) throws ServletException {
8688
CloseableHttpResponse closeResponse;
8789
try {
88-
8990
log.info("url {}", newUrl);
9091
CloseableHttpClient client = this.generateHttpClient(newUrl);
9192
if (req.getMethod().equals(METHOD_TYPE)) {
@@ -276,6 +277,13 @@ private static String truncateUrlPage(String strURL) {
276277
return strAllParam;
277278
}
278279

280+
public Set<String> mergeSet(Set<String> list1, Set<String> list2) {
281+
Set<String> set = new HashSet<>();
282+
set.addAll(list1);
283+
set.addAll(list2);
284+
return set;
285+
}
286+
279287

280288
@Override
281289
public void close() throws Exception {

weevent-governance/src/main/java/com/webank/weevent/governance/service/RuleEngineService.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.net.URLEncoder;
44
import java.text.SimpleDateFormat;
55
import java.util.ArrayList;
6+
import java.util.Collections;
67
import java.util.Date;
78
import java.util.HashMap;
89
import java.util.HashSet;
@@ -29,6 +30,7 @@
2930
import com.webank.weevent.governance.mapper.RuleEngineMapper;
3031
import com.webank.weevent.governance.properties.ConstantProperties;
3132
import com.webank.weevent.governance.utils.CookiesTools;
33+
import com.webank.weevent.governance.utils.DAGDetectUtil;
3234
import com.webank.weevent.governance.utils.NumberValidationUtils;
3335

3436
import com.alibaba.fastjson.JSONObject;
@@ -67,6 +69,10 @@ public class RuleEngineService {
6769
@Autowired
6870
private BrokerMapper brokerMapper;
6971

72+
@Autowired
73+
private DAGDetectUtil dagDetectUtil;
74+
75+
7076
@Value("${weevent.processor.url:http://127.0.0.1:7008}")
7177
private String processorUrl;
7278

@@ -271,6 +277,10 @@ public boolean updateRuleEngine(RuleEngineEntity ruleEngineEntity, HttpServletRe
271277
if (!flag) {
272278
throw new GovernanceException("conditional is illegal");
273279
}
280+
flag = verifyInfiniteLoop(ruleEngineEntity);
281+
if (!flag) {
282+
throw new GovernanceException("update rule failed, detected DAG loop at topic [" + ruleEngineEntity.getFromDestination() + "]");
283+
}
274284
RuleDatabaseEntity ruleDataBase = getRuleDataBase(ruleEngineEntity.getRuleDataBaseId());
275285
if (ruleDataBase != null) {
276286
ruleEngineEntity.setDatabaseUrl(ruleDataBase.getDatabaseUrl() + "&tableName=" + ruleDataBase.getTableName());
@@ -661,8 +671,8 @@ private void checkRule(RuleEngineEntity ruleEngineEntity) throws GovernanceExcep
661671
if (ruleEngineEntity.getPayloadMap().isEmpty()) {
662672
throw new GovernanceException("rule description is empty");
663673
}
664-
if (ruleEngineEntity.getPayload() != null && ruleEngineEntity.getPayload().length() > 100) {
665-
throw new GovernanceException("rule description length cannot exceed 100");
674+
if (ruleEngineEntity.getPayload() != null && ruleEngineEntity.getPayload().length() > 4096) {
675+
throw new GovernanceException("rule description length cannot exceed 4096");
666676
}
667677

668678
}
@@ -677,12 +687,9 @@ private boolean checkRuleName(String ruleName, String regex) {
677687
//check name repeat
678688
private boolean checkRuleNameRepeat(RuleEngineEntity ruleEngineEntity) {
679689
RuleEngineEntity rule = new RuleEngineEntity();
680-
rule.setGroupId(ruleEngineEntity.getGroupId());
681-
rule.setUserId(ruleEngineEntity.getUserId());
682-
rule.setBrokerId(ruleEngineEntity.getBrokerId());
683690
rule.setRuleName(ruleEngineEntity.getRuleName());
684691
rule.setSystemTag("2");
685-
List<RuleEngineEntity> ruleEngines = ruleEngineMapper.getRuleEngines(rule);
692+
List<RuleEngineEntity> ruleEngines = ruleEngineMapper.checkRuleNameRepeat(rule);
686693
if (CollectionUtils.isEmpty(ruleEngines)) {
687694
return true;
688695
}
@@ -789,4 +796,28 @@ private RuleDatabaseEntity getRuleDataBase(Integer id) {
789796
return ruleDatabaseMapper.getRuleDataBaseById(id);
790797
}
791798

799+
private boolean verifyInfiniteLoop(RuleEngineEntity ruleEngineEntity) {
800+
if (!ConditionTypeEnum.TOPIC.getCode().equals(ruleEngineEntity.getConditionType())) {
801+
return true;
802+
}
803+
RuleEngineEntity rule = new RuleEngineEntity();
804+
rule.setGroupId(ruleEngineEntity.getGroupId());
805+
rule.setBrokerId(ruleEngineEntity.getBrokerId());
806+
807+
//query all historical rules according to brokerId groupId
808+
List<RuleEngineEntity> ruleTopicList = ruleEngineMapper.getRuleTopicList(rule);
809+
if (CollectionUtils.isEmpty(ruleTopicList)) {
810+
return true;
811+
}
812+
ruleTopicList.add(ruleEngineEntity);
813+
Map<String, Set<String>> map = new HashMap<>();
814+
815+
for (RuleEngineEntity engineEntity : ruleTopicList) {
816+
map.merge(engineEntity.getFromDestination(), new HashSet<>(Collections.singleton(engineEntity.getToDestination())), (a, b) -> commonService.mergeSet(a, b));
817+
}
818+
Set<String> set = map.keySet();
819+
return dagDetectUtil.checkLoop(map, set);
820+
}
821+
822+
792823
}

weevent-governance/src/main/java/com/webank/weevent/governance/service/TopicHistoricalService.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import java.io.FileInputStream;
44
import java.net.URL;
55
import java.util.ArrayList;
6-
import java.util.Arrays;
76
import java.util.Calendar;
87
import java.util.Collections;
98
import java.util.Date;
@@ -249,8 +248,8 @@ private RuleEngineEntity initializationRule(String ruleName, BrokerEntity broker
249248
return ruleEngineEntity;
250249
}
251250

252-
private List<String> getGroupList(HttpServletRequest request, BrokerEntity brokerEntity) throws GovernanceException {
253-
List<String> groupList;
251+
private List getGroupList(HttpServletRequest request, BrokerEntity brokerEntity) throws GovernanceException {
252+
List groupList;
254253
String url = new StringBuffer(brokerEntity.getBrokerUrl()).append(ConstantProperties.REST_LIST_SUBSCRIPTION).toString();
255254
try {
256255
log.info("url:{}", url);
@@ -259,8 +258,14 @@ private List<String> getGroupList(HttpServletRequest request, BrokerEntity broke
259258
if (StringUtil.isBlank(mes)) {
260259
throw new GovernanceException("group is empty");
261260
}
262-
String[] split = mes.replace("[", "").replace("]", "").split(",");
263-
groupList = Arrays.asList(split);
261+
JSONObject jsonObject = JSONObject.parseObject(mes);
262+
Object data = jsonObject.get("data");
263+
if ("0".equals(jsonObject.get("code").toString()) && data instanceof List) {
264+
groupList = (List) data;
265+
} else {
266+
throw new GovernanceException(jsonObject.get("message").toString());
267+
}
268+
264269
return groupList;
265270
} catch (Exception e) {
266271
log.error("get group list fail", e);
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.webank.weevent.governance.utils;
2+
3+
import java.util.ArrayList;
4+
import java.util.HashMap;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Set;
8+
import java.util.Stack;
9+
10+
import org.springframework.stereotype.Component;
11+
12+
@SuppressWarnings("unchecked")
13+
@Component
14+
public class DAGDetectUtil {
15+
private Stack<String> stack = new Stack<>();
16+
17+
public boolean checkLoop(Map<String, Set<String>> map, Set<String> topicSet) {
18+
Map start = createNode("start");
19+
Map<String, Map> nodeMap = new HashMap<>();
20+
topicSet.forEach(it -> {
21+
Map node = createNode(it);
22+
((List) start.get("child")).add(node);
23+
nodeMap.put(it, node);
24+
});
25+
map.forEach((k, v) -> {
26+
v.forEach(it -> {
27+
if (nodeMap.containsKey(it)) {
28+
((List) nodeMap.get(k).get("child")).add(nodeMap.get(it));
29+
}
30+
});
31+
});
32+
return checkChild(start);
33+
}
34+
35+
private Map createNode(String name) {
36+
HashMap node = new HashMap();
37+
node.put("topic", name);
38+
node.put("child", new ArrayList());
39+
return node;
40+
}
41+
42+
43+
private boolean checkChild(Map cursor) {
44+
if (stack.contains(cursor.get("topic"))) {
45+
stack = new Stack<>();
46+
return false;
47+
}
48+
stack.push((String) cursor.get("topic"));
49+
List childs = (List) cursor.get("child");
50+
if (childs != null) {
51+
for (Object child : childs) {
52+
if (!checkChild((Map) child)) {
53+
return false;
54+
}
55+
}
56+
}
57+
stack.pop();
58+
return true;
59+
}
60+
61+
62+
}

weevent-governance/src/main/resources/application-dev.properties

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ logging.config=classpath:log4j2.xml
2626
spring.pid.file=./logs/governance.pid
2727
spring.pid.fail-on-write-error=true
2828

29-
weevent.processor.url=http://localhost:7008
29+
weevent.processor.url=http://127.0.0.1:8080
30+
31+
weevent.url=http://127.0.0.1:8080/weevent

weevent-governance/src/main/resources/application-prod.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@ logging.config=classpath:log4j2.xml
2626
spring.pid.file=./logs/governance.pid
2727
spring.pid.fail-on-write-error=true
2828
weevent.processor.url=http://127.0.0.1:7008
29+
30+
weevent.url=http://127.0.0.1:8080/weevent

weevent-governance/src/main/resources/mappers/RuleEngineMapper.xml

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
<result column="payload" property="payload" jdbcType="VARCHAR"/>
1212
<result column="user_id" property="userId" jdbcType="INTEGER"/>
1313
<result column="broker_id" property="brokerId" jdbcType="INTEGER"/>
14-
<result column="cep_id" property="cepId" jdbcType="VARCHAR"/>
1514
<result column="broker_url" property="brokerUrl" jdbcType="VARCHAR"/>
1615
<result column="from_destination" property="fromDestination" jdbcType="VARCHAR"/>
1716
<result column="to_destination" property="toDestination" jdbcType="VARCHAR"/>
@@ -24,23 +23,24 @@
2423
<result column="error_message" property="errorMessage" jdbcType="VARCHAR"/>
2524
<result column="status" property="status" jdbcType="INTEGER"/>
2625
<result column="group_id" property="groupId" jdbcType="VARCHAR"/>
26+
<result column="cep_id" property="cepId" jdbcType="VARCHAR"/>
2727
<result column="system_tag" property="systemTag" jdbcType="VARCHAR"/>
2828
</resultMap>
2929

3030
<sql id="Base_Column_List">
31-
id,create_date,last_update, rule_name,payload_type,payload,broker_id,cep_id,broker_url,user_id,
31+
id,create_date,last_update, rule_name,payload_type,payload,broker_id,broker_url,user_id,cep_id,
3232
from_destination,to_destination,select_field,condition_field,condition_type,
3333
rule_database_id,error_destination,error_message,status,group_id,system_tag
3434
</sql>
3535

3636
<insert id="addRuleEngine" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity"
3737
useGeneratedKeys="true" keyProperty="id">
38-
insert into t_rule_engine(rule_name, payload_type, payload, broker_id, broker_url, user_id, cep_id,
38+
insert into t_rule_engine(rule_name, payload_type, payload, broker_id, broker_url, user_id,
3939
from_destination, to_destination, select_field,
40-
condition_field, condition_type, status, group_id, error_message,rule_database_id,system_tag)
41-
values (#{ruleName}, #{payloadType}, #{payload}, #{brokerId}, #{brokerUrl}, #{userId}, #{cepId},
40+
condition_field, condition_type, status, group_id, error_message,rule_database_id,system_tag,cep_id)
41+
values (#{ruleName}, #{payloadType}, #{payload}, #{brokerId}, #{brokerUrl}, #{userId},
4242
#{fromDestination}, #{toDestination}, #{selectField},
43-
#{conditionField}, #{conditionType}, #{status}, #{groupId}, #{errorMessage},#{ruleDataBaseId},#{systemTag})
43+
#{conditionField}, #{conditionType}, #{status}, #{groupId}, #{errorMessage},#{ruleDataBaseId},#{systemTag},#{cepId})
4444
</insert>
4545

4646
<select id="getRuleEngines" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity"
@@ -100,6 +100,20 @@
100100
</if>
101101
</sql>
102102

103+
<select id="getRuleTopicList" resultMap="BaseResultMap">
104+
select distinct from_destination,to_destination
105+
from t_rule_engine
106+
where status != 2 and condition_type=1
107+
<include refid="ruleEngineWhere"/>
108+
</select>
109+
110+
<select id="checkRuleNameRepeat" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity"
111+
resultMap="BaseResultMap">
112+
select
113+
<include refid="Base_Column_List"/>
114+
from t_rule_engine where rule_name=#{ruleName}
115+
</select>
116+
103117
<update id="updateRuleEngine" parameterType="com.webank.weevent.governance.entity.RuleEngineEntity">
104118
update t_rule_engine
105119
set rule_name=#{ruleName},
@@ -111,7 +125,8 @@
111125
to_destination=#{toDestination},
112126
rule_database_id=#{ruleDataBaseId},
113127
error_destination=#{errorDestination},
114-
error_message=#{errorMessage}
128+
error_message=#{errorMessage},
129+
cep_id=#{cepId}
115130
where id = #{id,jdbcType=INTEGER}
116131
and status != 2
117132
</update>

0 commit comments

Comments
 (0)