Skip to content

Commit a0c6642

Browse files
committed
AJ-931: add filter parameter support to EventClient subscribe method
1 parent c24e64c commit a0c6642

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

src/com/xxdb/streaming/client/cep/EventClient.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.xxdb.streaming.client.IMessage;
1111
import com.xxdb.streaming.client.MessageHandler;
1212
import com.xxdb.streaming.client.Site;
13+
import com.xxdb.data.Vector;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516
import java.io.IOException;
@@ -34,13 +35,17 @@ public EventClient(List<EventSchema> eventSchemas, List<String> eventTimeFields,
3435
}
3536

3637
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password) throws IOException {
38+
subscribe(host, port, tableName, actionName, handler, offset, reconnect, null, userName, password);
39+
}
40+
41+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, String userName, String password) throws IOException {
3742
if (Utils.isEmpty(tableName))
3843
throw new IllegalArgumentException("EventClient subscribe 'tableName' param cannot be null or empty.");
3944

4045
if (Utils.isEmpty(actionName))
4146
actionName = DEFAULT_ACTION_NAME;
4247

43-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, null, null, false, userName, password, false);
48+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, null, false, userName, password, false);
4449
if (queue == null) {
4550
System.err.println("Subscription already made, handler loop not created.");
4651
return;
@@ -68,7 +73,7 @@ public void subscribe(String host, int port, String tableName, String actionName
6873
eventTypes.clear();
6974
attributes.clear();
7075

71-
// todo noticehere MessageParser handled col to row;
76+
// todo notice, here MessageParser handled col to row;
7277
if (!eventHandler.deserializeEvent(msgs, eventTypes, attributes, errorInfo)) {
7378
System.out.println("deserialize fail " + errorInfo.getErrorInfo());
7479
continue;
@@ -141,7 +146,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
141146
protected boolean doReconnect(Site site) {
142147
try {
143148
site.getHost();
144-
subscribe(site.getHost(), site.getPort(), site.getTableName(), site.getActionName(), site.getHandler(), site.getMsgId() + 1, true, site.getUserName(), site.getPassWord());
149+
subscribe(site.getHost(), site.getPort(), site.getTableName(), site.getActionName(), site.getHandler(), site.getMsgId() + 1, true, site.getFilter(), site.getUserName(), site.getPassWord());
145150
Date d = new Date();
146151
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
147152
log.info(df.format(d) + " Successfully reconnected and subscribed " + site.getHost() + ":" + site.getPort() + "/" + site.getTableName() + "/" + site.getActionName());

0 commit comments

Comments
 (0)