@@ -583,9 +583,7 @@ public void test_EventClient_commonKeys_two_column() throws IOException, Interr
583583 eventSchemas .add (scheme1 );
584584 List <String > eventTimeKeys = new ArrayList <>();
585585 List <String > commonKeys = Arrays .asList (new String []{"time" ,"market" });
586- EventSender sender = EventSender .createEventSender (eventSchemas , eventTimeKeys , commonKeys );
587-
588- sender .connect (conn , "inputTable" );
586+ EventSender sender = new EventSender (conn , "inputTable" ,eventSchemas , eventTimeKeys , commonKeys );
589587 List <Entity > attributes = new ArrayList <>();
590588 attributes .add (new BasicString ("123456" ));
591589 attributes .add (new BasicTime (LocalTime .from (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 ))));
@@ -614,7 +612,7 @@ public static void subscribePrepare() throws IOException {
614612 eventSchemas .add (scheme );
615613 List <String > eventTimeKeys = Arrays .asList (new String []{"timestamp" });
616614 List <String > commonKeys = Arrays .asList (new String []{"comment1" });
617- sender = EventSender . createEventSender ( eventSchemas , eventTimeKeys , commonKeys );
615+ sender = new EventSender ( conn , "inputTable" , eventSchemas , eventTimeKeys , commonKeys );
618616 client = new EventClient (eventSchemas , eventTimeKeys , commonKeys );
619617 }
620618 @ Test
@@ -721,7 +719,6 @@ public void test_EventClient_subscribe_handler_null() throws IOException, Inter
721719 public void test_EventClient_subscribe_offset_negative_1 () throws IOException , InterruptedException {
722720 subscribePrepare ();
723721 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
724- sender .connect (conn ,"inputTable" );
725722 List <Entity > attributes = new ArrayList <>();
726723 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
727724 attributes .add (new BasicString ("123456" ));
@@ -736,7 +733,6 @@ public void test_EventClient_subscribe_offset_negative_1() throws IOException,
736733 public void test_EventClient_subscribe_offset_negative_2 () throws IOException , InterruptedException {
737734 subscribePrepare ();
738735 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
739- sender .connect (conn ,"inputTable" );
740736 List <Entity > attributes = new ArrayList <>();
741737 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
742738 attributes .add (new BasicString ("123456" ));
@@ -751,7 +747,6 @@ public void test_EventClient_subscribe_offset_negative_2() throws IOException,
751747 public void test_EventClient_subscribe_offset_0 () throws IOException , InterruptedException {
752748 subscribePrepare ();
753749 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
754- sender .connect (conn ,"inputTable" );
755750 List <Entity > attributes = new ArrayList <>();
756751 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
757752 attributes .add (new BasicString ("123456" ));
@@ -767,7 +762,6 @@ public void test_EventClient_subscribe_offset_0() throws IOException, Interrupt
767762 public void test_EventClient_subscribe_offset_1 () throws IOException , InterruptedException {
768763 subscribePrepare ();
769764 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
770- sender .connect (conn ,"inputTable" );
771765 List <Entity > attributes = new ArrayList <>();
772766 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
773767 attributes .add (new BasicString ("123456" ));
@@ -782,7 +776,6 @@ public void test_EventClient_subscribe_offset_1() throws IOException, Interrupt
782776 public void test_EventClient_subscribe_offset_not_match () throws IOException , InterruptedException {
783777 subscribePrepare ();
784778 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
785- sender .connect (conn ,"inputTable" );
786779 List <Entity > attributes = new ArrayList <>();
787780 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
788781 attributes .add (new BasicString ("123456" ));
@@ -799,7 +792,6 @@ public void test_EventClient_subscribe_offset_not_match() throws IOException, I
799792 public void test_EventClient_subscribe_reconnect_true () throws IOException , InterruptedException {
800793 subscribePrepare ();
801794 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
802- sender .connect (conn ,"inputTable" );
803795 List <Entity > attributes = new ArrayList <>();
804796 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
805797 attributes .add (new BasicString ("123456" ));
@@ -813,7 +805,6 @@ public void test_EventClient_subscribe_reconnect_true() throws IOException, Int
813805 public void test_EventClient_subscribe_reconnect_false () throws IOException , InterruptedException {
814806 subscribePrepare ();
815807 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
816- sender .connect (conn ,"inputTable" );
817808 List <Entity > attributes = new ArrayList <>();
818809 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
819810 attributes .add (new BasicString ("123456" ));
@@ -853,15 +844,15 @@ public void test_EventClient_subscribe_admin() throws IOException, InterruptedE
853844 PrepareUser ("user1" ,"123456" );
854845 DBConnection conn = new DBConnection ();
855846 conn .connect (HOST , PORT ,"user1" ,"123456" );
856- subscribePrepare ();
857- conn .run ("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;" );
858- conn .run ("addAccessControl(`inputTable1)" );
847+ conn .run ("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;" );
848+ conn .run ("addAccessControl(`inputTable)" );
859849 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
860- sender .connect (conn ,"inputTable1" );
850+ subscribePrepare ();
851+
861852 List <Entity > attributes = new ArrayList <>();
862853 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
863854 attributes .add (new BasicString ("123456" ));
864- client .subscribe (HOST , PORT , "inputTable1 " , "test1" , handler , -1 , true , "admin" , "123456" );
855+ client .subscribe (HOST , PORT , "inputTable " , "test1" , handler , -1 , true , "admin" , "123456" );
865856 sender .sendEvent ("MarketData" , attributes );
866857 Thread .sleep (1000 );
867858 BasicTable re = (BasicTable )conn .run ("select * from outputTable" );
@@ -872,15 +863,15 @@ public void test_EventClient_subscribe_other_user() throws IOException, Interru
872863 PrepareUser ("user1" ,"123456" );
873864 DBConnection conn = new DBConnection ();
874865 conn .connect (HOST , PORT ,"user1" ,"123456" );
875- subscribePrepare ();
876- conn .run ("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;" );
877- conn .run ("addAccessControl(`inputTable1)" );
866+ conn .run ("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;" );
867+ conn .run ("addAccessControl(`inputTable)" );
878868 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
879- sender .connect (conn ,"inputTable1" );
869+ subscribePrepare ();
870+
880871 List <Entity > attributes = new ArrayList <>();
881872 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
882873 attributes .add (new BasicString ("123456" ));
883- client .subscribe (HOST , PORT , "inputTable1 " , "test1" , handler , -1 , true , "user1" , "123456" );
874+ client .subscribe (HOST , PORT , "inputTable " , "test1" , handler , -1 , true , "user1" , "123456" );
884875 sender .sendEvent ("MarketData" , attributes );
885876 Thread .sleep (1000 );
886877 BasicTable re = (BasicTable )conn .run ("select * from outputTable" );
@@ -892,22 +883,31 @@ public void test_EventClient_other_user_unallow() throws IOException, Interrupt
892883 PrepareUser ("user2" ,"123456" );
893884 DBConnection conn = new DBConnection ();
894885 conn .connect (HOST , PORT ,"user1" ,"123456" );
895- subscribePrepare ();
896- conn .run ("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;" );
897- conn .run ("addAccessControl(`inputTable1)" );
886+ conn .run ("share streamTable(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;" );
887+ conn .run ("addAccessControl(`inputTable)" );
888+ EventSchema scheme = new EventSchema ();
889+ scheme .setEventType ("MarketData" );
890+ scheme .setFieldNames (Arrays .asList ("timestamp" , "comment1" ));
891+ scheme .setFieldTypes (Arrays .asList ( DT_TIMESTAMP ,DT_STRING ));
892+ scheme .setFieldForms (Arrays .asList (DF_SCALAR , DF_SCALAR ));
893+ List <EventSchema > eventSchemas = new ArrayList <>();
894+ eventSchemas .add (scheme );
895+ List <String > eventTimeKeys = Arrays .asList (new String []{"timestamp" });
896+ List <String > commonKeys = Arrays .asList (new String []{"comment1" });
897+ sender = new EventSender (conn , "inputTable" , eventSchemas , eventTimeKeys , commonKeys );
898+ client = new EventClient (eventSchemas , eventTimeKeys , commonKeys );
898899 String re = null ;
899900 try {
900- client .subscribe (HOST , PORT , "inputTable1 " , "test1" , handler1 , -1 , true , "user2" , "123456" );
901+ client .subscribe (HOST , PORT , "inputTable " , "test1" , handler1 , -1 , true , "user2" , "123456" );
901902 }catch (Exception ex ){
902903 re = ex .getMessage ();
903904 }
904- Assert .assertEquals (true , re .contains ("No access to shared table [inputTable1 ]" ));
905+ Assert .assertEquals (true , re .contains ("No access to shared table [inputTable ]" ));
905906 }
906907 @ Test
907908 public void test_EventClient_subscribe_unsubscribe_resubscribe () throws IOException , InterruptedException {
908909 subscribePrepare ();
909910 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
910- sender .connect (conn ,"inputTable" );
911911 List <Entity > attributes = new ArrayList <>();
912912 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
913913 attributes .add (new BasicString ("123456" ));
@@ -955,23 +955,22 @@ public void test_EventClient_unsubscribe_duplicated() throws IOException, Inter
955955
956956 @ Test
957957 public void test_EventClient_subscribe_haStreamTable () throws IOException , InterruptedException {
958- subscribePrepare ();
959- conn .run ("table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable1;" );
960- conn .run ("haStreamTable(11, table, `inputTable1, 100000)" );
958+ conn .run ("table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]) as inputTable;" );
959+ conn .run ("haStreamTable(11, table, `inputTable, 100000)" );
961960 conn .run ("share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;" );
962- sender .connect (conn ,"inputTable1" );
961+ subscribePrepare ();
962+
963963 List <Entity > attributes = new ArrayList <>();
964964 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
965965 attributes .add (new BasicString ("123456" ));
966- client .subscribe (HOST , PORT , "inputTable1 " , "test1" , handler , -1 , true , "user1" , "123456" );
966+ client .subscribe (HOST , PORT , "inputTable " , "test1" , handler , -1 , true , "user1" , "123456" );
967967 sender .sendEvent ("MarketData" , attributes );
968968 Thread .sleep (1000 );
969969 BasicTable re = (BasicTable )conn .run ("select * from outputTable" );
970970 Assert .assertEquals (1 ,re .rows ());
971971 }
972972 @ Test
973973 public void test_EventClient_subscribe_haStreamTable_leader () throws IOException , InterruptedException {
974- subscribePrepare ();
975974 BasicString StreamLeaderTmp = (BasicString )conn .run (String .format ("getStreamingLeader(%d)" , GROUP_ID ));
976975 String StreamLeader = StreamLeaderTmp .getString ();
977976 BasicString StreamLeaderHostTmp = (BasicString )conn .run (String .format ("(exec host from rpc(getControllerAlias(), getClusterPerf) where name=\" %s\" )[0]" , StreamLeader ));
@@ -982,28 +981,28 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
982981 System .out .println (StreamLeaderPort );
983982 DBConnection conn1 = new DBConnection ();
984983 conn1 .connect (StreamLeaderHost , StreamLeaderPort , "admin" , "123456" );
985- String script = "try{\n dropStreamTable(`inputTable1 )\n }catch(ex){\n }\n " +
984+ String script = "try{\n dropStreamTable(`inputTable )\n }catch(ex){\n }\n " +
986985 "table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n " +
987- "haStreamTable(" +GROUP_ID +", table, `inputTable1 , 100000);\n " +
986+ "haStreamTable(" +GROUP_ID +", table, `inputTable , 100000);\n " +
988987 "share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;;\n " ;
989988 conn1 .run (script );
990- sender .connect (conn ,"inputTable1" );
989+ subscribePrepare ();
990+
991991 List <Entity > attributes = new ArrayList <>();
992992 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
993993 attributes .add (new BasicString ("123456" ));
994- client .subscribe (StreamLeaderHost , StreamLeaderPort , "inputTable1 " , "test1" , handler , -1 , true , "user1" , "123456" );
994+ client .subscribe (StreamLeaderHost , StreamLeaderPort , "inputTable " , "test1" , handler , -1 , true , "user1" , "123456" );
995995 sender .sendEvent ("MarketData" , attributes );
996996 Thread .sleep (1000 );
997997 BasicTable re = (BasicTable )conn1 .run ("select * from outputTable" );
998998 Assert .assertEquals (1 ,re .rows ());
999999 Assert .assertEquals ("2024.03.22T10:45:03.100" ,re .getColumn (0 ).get (0 ).getString ());
10001000 Assert .assertEquals ("123456" ,re .getColumn (1 ).get (0 ).getString ());
1001- client .unsubscribe (StreamLeaderHost , StreamLeaderPort , "inputTable1 " , "test1" );
1001+ client .unsubscribe (StreamLeaderHost , StreamLeaderPort , "inputTable " , "test1" );
10021002 }
10031003
10041004 @ Test //not support
10051005 public void test_EventClient_subscribe_haStreamTable_follower () throws IOException , InterruptedException {
1006- subscribePrepare ();
10071006 String script0 ="leader = getStreamingLeader(" +GROUP_ID +");\n " +
10081007 "groupSitesStr = (exec sites from getStreamingRaftGroups() where id ==" +GROUP_ID +")[0];\n " +
10091008 "groupSites = split(groupSitesStr, \" ,\" );\n " +
@@ -1017,23 +1016,24 @@ public void test_EventClient_subscribe_haStreamTable_follower() throws IOExcept
10171016 System .out .println (StreamFollowerPort );
10181017 DBConnection conn1 = new DBConnection ();
10191018 conn1 .connect (StreamFollowerHost , StreamFollowerPort , "admin" , "123456" );
1020- String script = "try{\n dropStreamTable(`inputTable1 )\n }catch(ex){\n }\n " +
1019+ String script = "try{\n dropStreamTable(`inputTable )\n }catch(ex){\n }\n " +
10211020 "table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n " +
1022- "haStreamTable(" +GROUP_ID +", table, `inputTable1 , 100000);\n " +
1021+ "haStreamTable(" +GROUP_ID +", table, `inputTable , 100000);\n " +
10231022 "share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n " ;
10241023 conn1 .run (script );
1025- sender .connect (conn ,"inputTable1" );
1024+
1025+ subscribePrepare ();
10261026 List <Entity > attributes = new ArrayList <>();
10271027 attributes .add (new BasicTimestamp (LocalDateTime .of (2024 ,3 ,22 ,10 ,45 ,3 ,100000000 )));
10281028 attributes .add (new BasicString ("123456" ));
1029- client .subscribe (StreamFollowerHost , StreamFollowerPort , "inputTable1 " , "test1" , handler , -1 , true , "user1" , "123456" );
1029+ client .subscribe (StreamFollowerHost , StreamFollowerPort , "inputTable " , "test1" , handler , -1 , true , "user1" , "123456" );
10301030 sender .sendEvent ("MarketData" , attributes );
10311031 Thread .sleep (1000 );
10321032 BasicTable re = (BasicTable )conn1 .run ("select * from outputTable" );
10331033 Assert .assertEquals (1 ,re .rows ());
10341034 Assert .assertEquals ("2024.03.22T10:45:03.100" ,re .getColumn (0 ).get (0 ).getString ());
10351035 Assert .assertEquals ("123456" ,re .getColumn (1 ).get (0 ).getString ());
1036- client .unsubscribe (StreamFollowerHost , StreamFollowerPort , "inputTable1 " , "test1" );
1036+ client .unsubscribe (StreamFollowerHost , StreamFollowerPort , "inputTable " , "test1" );
10371037 }
10381038
10391039 @ Test
@@ -1441,8 +1441,7 @@ public void test_EventClient_all_dateType_vector_no_decimal() throws IOExceptio
14411441 List <EventSchema > eventSchemes = Collections .singletonList (scheme );
14421442 List <String > eventTimeKeys = new ArrayList <>();
14431443 List <String > commonKeys = new ArrayList <>();
1444- EventSender sender = EventSender .createEventSender (eventSchemes , eventTimeKeys , commonKeys );
1445- sender .connect (conn ,"inputTable" );
1444+ EventSender sender = new EventSender (conn , "inputTable" , eventSchemes , eventTimeKeys , commonKeys );
14461445
14471446 EventClient client = new EventClient (eventSchemes , eventTimeKeys , commonKeys );
14481447 client .subscribe (HOST , PORT , "intput1" , "test1" , handler_array_no_decimal , -1 , true , "admin" , "123456" );
@@ -1515,8 +1514,7 @@ public void test_EventClient_all_dateType_vector_decimal() throws IOException,
15151514 List <EventSchema > eventSchemes = Collections .singletonList (scheme );
15161515 List <String > eventTimeKeys = new ArrayList <>();
15171516 List <String > commonKeys = new ArrayList <>();
1518- EventSender sender = EventSender .createEventSender (eventSchemes , eventTimeKeys , commonKeys );
1519- sender .connect (conn ,"inputTable" );
1517+ EventSender sender = new EventSender (conn , "inputTable" , eventSchemes , eventTimeKeys , commonKeys );
15201518
15211519 EventClient client = new EventClient (eventSchemes , eventTimeKeys , commonKeys );
15221520 client .subscribe (HOST , PORT , "intput1" , "test1" , handler_array_decimal , -1 , true , "admin" , "123456" );
@@ -1568,10 +1566,9 @@ public void test_EventClient_all_dateType_array() throws IOException, Interrupt
15681566 List <EventSchema > eventSchemas = Collections .singletonList (scheme );
15691567 List <String > eventTimeKeys = new ArrayList <>();
15701568 List <String > commonKeys = new ArrayList <>();
1571- EventSender sender = EventSender .createEventSender (eventSchemas , eventTimeKeys , commonKeys );
15721569 String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n " ;
15731570 conn .run (script );
1574- sender . connect (conn ,"inputTable" );
1571+ EventSender sender = new EventSender (conn , "inputTable" , eventSchemas , eventTimeKeys , commonKeys );
15751572 EventClient client = new EventClient (eventSchemas , eventTimeKeys , commonKeys );
15761573 client .subscribe (HOST , PORT , "inputTable" , "test1" , handler_array , -1 , true , "admin" , "123456" );
15771574 Preparedata_array (100 ,10 );
0 commit comments