@@ -229,7 +229,7 @@ public void test_subscribe_ofst0() throws Exception {
229229 int ofst = 0 ;
230230 client .subscribe (HOST , PORT , "Trades" , MessageHandler_handler , ofst );
231231 conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
232- Thread .sleep (20000 );
232+ Thread .sleep (30000 );
233233 BasicTable re = (BasicTable ) conn .run ("select * from Receive order by tag" );
234234 BasicTable tra = (BasicTable ) conn .run ("select * from Trades order by tag" );
235235 client .unsubscribe (HOST , PORT , "Trades" , "javaStreamingApi" );
@@ -478,7 +478,7 @@ public void test_subscribe_other_some_user() throws IOException, InterruptedExce
478478
479479 @ Test
480480 public void test_subscribe_one_user_some_table () throws IOException , InterruptedException {
481- conn .run ("def create_user(){try{deleteUser(`test1)}catch(ex){};createUser(`test1, '123456');};" +
481+ conn .run ("login('admin','123456'); def create_user(){try{deleteUser(`test1)}catch(ex){};createUser(`test1, '123456');};" +
482482 "rpc(getControllerAlias(),create_user);" +
483483 "share streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE]) as tmp_st1;" +
484484 "share streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE]) as tmp_st2;" +
@@ -489,7 +489,7 @@ public void test_subscribe_one_user_some_table() throws IOException, Interrupted
489489 client .subscribe (HOST , PORT , "tmp_st3" , "subTread1" , MessageHandler_handler , -1 , true , null , true , "test1" , "123456_error" );
490490 fail ("no exception thrown" );
491491 }catch (Exception e ){
492- System .out .println (e .getMessage ());
492+ System .out .println (e .getMessage ()+ "12345666" );
493493 }
494494 conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "tmp_st1.append!(t)" );
495495 conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "tmp_st2.append!(t)" );
@@ -577,11 +577,11 @@ public void test_ThreadPooledClient_null() throws Exception {
577577 client1 .subscribe (HOST , PORT , "Trades" , "subTrades" ,MessageHandler_handler ,true );
578578 conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
579579 conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
580- Thread .sleep (10000 );
580+ Thread .sleep (15000 );
581581 BasicTable re = (BasicTable ) conn .run ("select * from Receive order by tag" );
582582 BasicTable tra = (BasicTable ) conn .run ("select * from Trades order by tag" );
583583 client1 .unsubscribe (HOST , PORT , "Trades" , "subTrades" );
584- // assertEquals(20000, re.rows());
584+ assertEquals (20000 , re .rows ());
585585 for (int i = 0 ; i < 1000 ; i ++) {
586586 assertEquals (re .getColumn (0 ).get (i ), tra .getColumn (0 ).get (i ));
587587 assertEquals (re .getColumn (1 ).get (i ), tra .getColumn (1 ).get (i ));
@@ -684,4 +684,25 @@ public void test_StreamDeserializer_dataType_filters_subscribe_haStreamTable() t
684684 Assert .assertEquals (table2 .rows (), msg2 .size ());
685685 client .unsubscribe (StreamLeaderHost , StreamLeaderPort , "outTables" , "mutiSchema" );
686686 }
687+ @ Test
688+ public void test_ThreadPooledClient_threadCount () throws Exception {
689+ client = new ThreadPooledClient (10 );
690+ Vector filter1 = (Vector ) conn .run ("1..1000" );
691+ client .subscribe (HOST , PORT , "Trades" , "subTrades" , MessageHandler_handler , -1 , true , filter1 , true );
692+ conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
693+ conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
694+ Thread .sleep (10000 );
695+ BasicTable re = (BasicTable ) conn .run ("select * from Receive order by tag" );
696+ BasicTable tra = (BasicTable ) conn .run ("select * from Trades order by tag" );
697+ client .unsubscribe (HOST , PORT , "Trades" , "subTrades" );
698+ assertEquals (2000 , re .rows ());
699+ for (int i = 0 ; i < 1000 ; i ++) {
700+ assertEquals (re .getColumn (0 ).get (i ), tra .getColumn (0 ).get (i ));
701+ assertEquals (re .getColumn (1 ).get (i ), tra .getColumn (1 ).get (i ));
702+ assertEquals (((Scalar )re .getColumn (2 ).get (i )).getNumber ().doubleValue (), ((Scalar )tra .getColumn (2 ).get (i )).getNumber ().doubleValue (), 4 );
703+ assertEquals (re .getColumn (0 ).get (i + 1000 ), tra .getColumn (0 ).get (i + 1000 ));
704+ assertEquals (re .getColumn (1 ).get (i + 1000 ), tra .getColumn (1 ).get (i + 1000 ));
705+ assertEquals (((Scalar )re .getColumn (2 ).get (i + 1000 )).getNumber ().doubleValue (), ((Scalar )tra .getColumn (2 ).get (i + 1000 )).getNumber ().doubleValue (), 4 );
706+ }
707+ }
687708}
0 commit comments