44import java .sql .Connection ;
55import java .sql .PreparedStatement ;
66import java .sql .SQLException ;
7- import java .util .HashMap ;
87import java .util .List ;
98import java .util .Map ;
109import java .util .concurrent .BlockingDeque ;
@@ -139,7 +138,9 @@ public void onEvent(WeEvent event) {
139138 String content = new String (event .getContent ());
140139 log .info ("on event:{},content:{}" , event .toString (), content );
141140
142- if ("json" .equals (event .getExtensions ().get ("weevent-format" )) && CommonUtil .checkValidJson (content )) {
141+
142+ // check the content
143+ if (JSONObject .isValid (content )) {
143144 handleOnEvent (client , event , ruleMap );
144145 } else {
145146 handleOnEventOtherPattern (client , event , ruleMap );
@@ -162,8 +163,8 @@ public void onEvent(WeEvent event) {
162163
163164 String content = new String (event .getContent ());
164165 log .info ("on event:{},content:{}" , event .toString (), content );
165-
166- if ("json" . equals ( event . getExtensions (). get ( "weevent-format" )) && CommonUtil . checkValidJson (content )) {
166+ // check the content
167+ if (JSONObject . isValid (content )) {
167168 handleOnEvent (client , event , ruleMap );
168169 } else {
169170 handleOnEventOtherPattern (client , event , ruleMap );
@@ -246,7 +247,7 @@ private static void sendMessageToDB(String groupId, WeEvent eventContent, CEPRul
246247 // execute the preparedstatement
247248 int res = preparedStmt .executeUpdate ();
248249 if (res > 0 ) {
249- log .info ("insert db success!!! " );
250+ log .info ("insert db success... " );
250251 }
251252 preparedStmt .close ();
252253
@@ -335,8 +336,7 @@ private static void handleOnEvent(IWeEventClient client, WeEvent event, Map<Stri
335336 log .info ("publish select: {},eventContent:{}" , entry .getValue ().getSelectField (), eventContent );
336337
337338 // publish the message
338- Map <String , String > extensions = new HashMap <>();
339- WeEvent weEvent = new WeEvent (entry .getValue ().getToDestination (), eventContent .getBytes (StandardCharsets .UTF_8 ), extensions );
339+ WeEvent weEvent = new WeEvent (entry .getValue ().getToDestination (), eventContent .getBytes (StandardCharsets .UTF_8 ), event .getExtensions ());
340340 log .info ("after hitRuleEngine weEvent groupId: {}, event:{}" , groupId , weEvent .toString ());
341341 IWeEventClient toDestinationClient = getClient (entry .getValue ());
342342 toDestinationClient .publish (weEvent );
0 commit comments