@@ -17,16 +17,13 @@ insert into trades values(timev, symv, take(-1, 1), pricev, exchv,x)
1717 */
1818package com .xxdb .streaming .reverse ;
1919
20- import com . xxdb . BasicDBTask ;
20+
2121import com .xxdb .DBConnection ;
22- import com .xxdb .DBTask ;
23- import com .xxdb .ExclusiveDBConnectionPool ;
2422import com .xxdb .data .*;
2523import com .xxdb .data .Vector ;
2624import com .xxdb .streaming .client .*;
2725import org .javatuples .Pair ;
2826import org .junit .*;
29-
3027import java .io .IOException ;
3128import java .net .SocketException ;
3229import java .util .*;
@@ -99,35 +96,22 @@ public void after() throws IOException, InterruptedException {
9996 try {clear_env ();}catch (Exception e ){}
10097 //client.close();
10198 conn .close ();
102- Thread .sleep (2000 );
10399 }
104100
105101 @ AfterClass
106102 public static void clear_conn () {
107103 try {clear_env_1 ();}catch (Exception e ){}
108104 }
109105
110- public void wait_data (String table_name ,int data_row ) throws IOException , InterruptedException {
111- BasicInt row_num ;
112- while (true ){
113- row_num = (BasicInt )conn .run ("(exec count(*) from " +table_name +")[0]" );
114- // System.out.println(row_num.getInt());
115- if (row_num .getInt () == data_row ){
116- break ;
117- }
118- Thread .sleep (100 );
119- }
120- }
121-
122106 public static void checkResult () throws IOException , InterruptedException {
123- for (int i = 0 ; i < 10 ; i ++)
107+ for (int i = 0 ; i < 20 ; i ++)
124108 {
125109 BasicInt tmpNum = (BasicInt )conn .run ("exec count(*) from sub1" );
126110 if (tmpNum .getInt ()==(1000 ))
127111 {
128112 break ;
129113 }
130- Thread .sleep (1000 );
114+ Thread .sleep (200 );
131115 }
132116 BasicTable except = (BasicTable )conn .run ("select * from Trades order by permno" );
133117 BasicTable res = (BasicTable )conn .run ("select * from sub1 order by permno" );
@@ -138,15 +122,15 @@ public static void checkResult() throws IOException, InterruptedException {
138122 }
139123 }
140124 public static void checkResult1 () throws IOException , InterruptedException {
141- for (int i = 0 ; i < 10 ; i ++)
125+ for (int i = 0 ; i < 20 ; i ++)
142126 {
143127 BasicInt tmpNum = (BasicInt )conn .run ("exec count(*) from sub1 " );
144128 BasicInt tmpNum1 = (BasicInt )conn .run ("exec count(*) from sub2 " );
145129 if (tmpNum .getInt ()==(1000 )&& tmpNum1 .getInt ()==(1000 ))
146130 {
147131 break ;
148132 }
149- Thread .sleep (3000 );
133+ Thread .sleep (100 );
150134 }
151135 BasicTable except = (BasicTable )conn .run ("select * from pub_t1 order by timestampv" );
152136 BasicTable res = (BasicTable )conn .run ("select * from sub1 order by timestampv" );
@@ -179,7 +163,7 @@ public void test_size() throws IOException {
179163 try {
180164 for (int i =0 ;i <10 ;i ++){//data<size
181165 conn .run ("n=50;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)" );
182- msgs = poller1 .poll (100 ,1000 );
166+ msgs = poller1 .poll (1000 ,1000 );
183167 if (msgs ==null ){
184168 continue ;
185169 }
@@ -189,7 +173,7 @@ else if (msgs.size() > 0) {
189173 }
190174 for (int i =0 ;i <10 ;i ++){//data>size
191175 conn .run ("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)" );
192- msgs = poller1 .poll (100000 ,1000 );
176+ msgs = poller1 .poll (1000 ,1000 );
193177 if (msgs ==null ){
194178 continue ;
195179 }
@@ -200,7 +184,7 @@ else if (msgs.size() > 0) {
200184 }
201185 for (int i =0 ;i <10 ;i ++){//data=size
202186 conn .run ("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)" );
203- msgs = poller1 .poll (1000000 ,5000 );
187+ msgs = poller1 .poll (5000 ,5000 );
204188 if (msgs ==null ){
205189 continue ;
206190 }
@@ -344,7 +328,7 @@ public void test_subscribe_other_user() throws IOException, InterruptedException
344328 PrepareUser ("test1" ,"123456" );
345329 TopicPoller poller1 = client .subscribe (HOST ,PORT ,"Trades1" ,"subTread1" ,-1 ,true ,null ,"test1" ,"123456" );
346330 conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades1.append!(t)" );
347- Thread .sleep (5000 );
331+ // Thread.sleep(5000);
348332 ArrayList <IMessage > msgs1 = poller1 .poll (500 , 10000 );
349333 assertEquals (10000 , msgs1 .size ());
350334 client .unsubscribe (HOST ,PORT ,"Trades1" ,"subTread1" );
@@ -2198,15 +2182,15 @@ public void run() {
21982182 MessageHandler_handler1 (messages );
21992183 Thread .sleep (1000 );
22002184 thread .join ();
2201- Thread .sleep (10000 );
2185+ Thread .sleep (8000 );
22022186 controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
22032187 //Thread.sleep(1000);
22042188 List <IMessage > messages1 = poller .poll (1000 ,1000 );
2205- Thread .sleep (1000 );
2189+ // Thread.sleep(1000);
22062190 System .out .println (messages1 .size ());
22072191 //Assert.assertEquals(1000,messages1.size());
22082192 MessageHandler_handler1 (messages1 );
2209- Thread .sleep (1000 );
2193+ // Thread.sleep(1000);
22102194 BasicTable re = (BasicTable )conn .run ("select tag ,now,deltas(now) from Receive order by deltas(now) desc \n " );
22112195 System .out .println (re .getString ());
22122196 Assert .assertEquals (1000 ,re .rows ());
@@ -2246,31 +2230,31 @@ public void test_PollingClient_subscribe_resubTimeout_subOnce_not_set() throws I
22462230 System .out .println ("Successful subscribe" );
22472231 conn1 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
22482232 conn2 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
2249- Thread .sleep (1000 );
2233+ // Thread.sleep(1000);
22502234 //List<IMessage> messages = poller.poll(1000,1000);
22512235 //MessageHandler_handler(messages);
22522236 controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
22532237 System .out .println (port_list [1 ]+"断掉啦---------------------------------------------------" );
2254- Thread .sleep (10000 );
2238+ Thread .sleep (8000 );
22552239 conn2 .run ("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)" );
22562240 Thread .sleep (2000 );
22572241 //List<IMessage> messages1 = poller.poll(2000,2000);
22582242 // MessageHandler_handler(messages1);
22592243 controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2260- Thread .sleep (10000 );
2244+ Thread .sleep (8000 );
22612245 DBConnection conn3 = new DBConnection ();
22622246 conn3 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
22632247 conn3 .run (script1 );
22642248 conn3 .run (script2 );
22652249 controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
22662250 System .out .println (port_list [2 ]+"节点断掉啦---------------------------------------------------" );
2267- Thread .sleep (20000 );
2251+ Thread .sleep (8000 );
22682252 conn3 .run ("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
22692253 Thread .sleep (5000 );
22702254 List <IMessage > messages2 = poller .poll (3000 ,3000 );
22712255 MessageHandler_handler (messages2 );
22722256 controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
2273- Thread .sleep (10000 );
2257+ Thread .sleep (8000 );
22742258
22752259 BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
22762260 System .out .println (row_num .getColumn (0 ).get (0 ));
0 commit comments