@@ -2008,4 +2008,194 @@ public void test_threadPooledClient_subscribe_orca_table_not_orca_node() throws
20082008 }
20092009 threadPooledClient .unsubscribe (HOST , port1 , "orca.orca_table.output" );
20102010 }
2011+
2012+ BatchMessageHandler BatchMessageHandler_msgAsTable = new BatchMessageHandler () {
2013+ @ Override
2014+ public void batchHandler (List <IMessage > msgs ) {
2015+ try {
2016+ save_batch_size .add (msgs .size ());
2017+ for (int x = 0 ; x <msgs .size (); x ++){
2018+ doEvent (msgs .get (x ));
2019+ }
2020+ } catch (Exception e ) {
2021+ e .printStackTrace ();
2022+ }
2023+ }
2024+ @ Override
2025+ public void doEvent (IMessage msg ) {
2026+ try {
2027+
2028+ System .out .println (msg .getEntity (0 ).getString ());
2029+ List <Entity > args = new ArrayList <>();
2030+ args .add (msg .getEntity (0 ));
2031+ args .add (msg .getEntity (1 ));
2032+ args .add (msg .getEntity (2 ));
2033+ conn .run ("tableInsert{Receive}" , args );
2034+ } catch (Exception e ) {
2035+ e .printStackTrace ();
2036+ }
2037+ }
2038+ };
2039+ @ Test (timeout = 120000 )
2040+ public void test_threadPooledClient_subscribe_msgAsTable_false () throws Exception {
2041+ String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2042+ "share(st1,`Trades)\t \n "
2043+ + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
2044+ conn .run (script1 );
2045+ String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2046+ "share(st2, `Receive)\t \n " ;
2047+ conn .run (script2 );
2048+ Vector filter1 = (Vector ) conn .run ("1..1000" );
2049+ threadPooledClient .subscribe (HOST , PORT , "Trades" , "subTread1" , BatchMessageHandler_msgAsTable , 0 ,false , filter1 ,null ,false ,"admin" ,"123456" ,false );
2050+ conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
2051+ conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
2052+ wait_data ("Receive" ,2000 );
2053+ BasicTable re = (BasicTable ) conn .run ("select * from Receive order by tag,ts,data" );
2054+ BasicTable tra = (BasicTable ) conn .run ("select * from Trades where tag<1001 order by tag,ts,data" );
2055+ threadPooledClient .unsubscribe (HOST , PORT , "Trades" , "subTread1" );
2056+ assertEquals (2000 , re .rows ());
2057+ for (int i = 0 ; i < 2000 ; i ++) {
2058+ assertEquals (re .getColumn (0 ).get (i ), tra .getColumn (0 ).get (i ));
2059+ assertEquals (re .getColumn (1 ).get (i ), tra .getColumn (1 ).get (i ));
2060+ assertEquals (((Scalar )re .getColumn (2 ).get (i )).getNumber ().doubleValue (), ((Scalar )tra .getColumn (2 ).get (i )).getNumber ().doubleValue (), 4 );
2061+ }
2062+ }
2063+ @ Test (timeout = 120000 )
2064+ public void test_threadPooledClient_subscribe_msgAsTable_true () throws Exception {
2065+ String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2066+ "share(st1,`Trades)\t \n "
2067+ + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
2068+ conn .run (script1 );
2069+ String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2070+ "share(st2, `Receive)\t \n " ;
2071+ conn .run (script2 );
2072+ Vector filter1 = (Vector ) conn .run ("1..1000" );
2073+ threadPooledClient .subscribe (HOST , PORT , "Trades" , "subTread1" , BatchMessageHandler_msgAsTable , 0 ,false , filter1 ,null ,false ,"admin" ,"123456" ,true );
2074+ conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
2075+ conn .run ("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
2076+ wait_data ("Receive" ,2000 );
2077+ BasicTable re = (BasicTable ) conn .run ("select * from Receive order by tag,ts,data" );
2078+ BasicTable tra = (BasicTable ) conn .run ("select * from Trades where tag<1001 order by tag,ts,data" );
2079+ threadPooledClient .unsubscribe (HOST , PORT , "Trades" , "subTread1" );
2080+ assertEquals (2000 , re .rows ());
2081+ for (int i = 0 ; i < 2000 ; i ++) {
2082+ assertEquals (re .getColumn (0 ).get (i ), tra .getColumn (0 ).get (i ));
2083+ assertEquals (re .getColumn (1 ).get (i ), tra .getColumn (1 ).get (i ));
2084+ assertEquals (((Scalar )re .getColumn (2 ).get (i )).getNumber ().doubleValue (), ((Scalar )tra .getColumn (2 ).get (i )).getNumber ().doubleValue (), 4 );
2085+ }
2086+ }
2087+
2088+ BatchMessageHandler BatchMessageHandler_msgAsTable_all = new BatchMessageHandler () {
2089+ @ Override
2090+ public void batchHandler (List <IMessage > msgs ) {
2091+ try {
2092+ save_batch_size .add (msgs .size ());
2093+ for (int x = 0 ; x <msgs .size (); x ++){
2094+ doEvent (msgs .get (x ));
2095+ }
2096+ } catch (Exception e ) {
2097+ e .printStackTrace ();
2098+ }
2099+ }
2100+ @ Override
2101+ public void doEvent (IMessage msg ) {
2102+ try {
2103+ System .out .println (msg .getEntity (0 ).getString ());
2104+ List <Entity > args = new ArrayList <>();
2105+ for (int i =0 ; i <28 ;i ++){
2106+ System .out .println (msg .getEntity (0 ).getString ());
2107+ args .add (msg .getEntity (i ));
2108+ }
2109+ conn .run ("tableInsert{Receive}" , args );
2110+ } catch (Exception e ) {
2111+ e .printStackTrace ();
2112+ }
2113+ }
2114+ };
2115+
2116+ @ Test (timeout = 120000 )
2117+ public void test_threadedClient_subscribe_msgAsTable_true_all_dateType_1 () throws Exception {
2118+ Prepare_streamTable ("Trades" );
2119+ conn .run ("setStreamTableFilterColumn(Trades, `intv)" );
2120+ Prepare_streamTable ("Receive" );
2121+ threadPooledClient .subscribe (HOST , PORT , "Trades" , "subTread1" , BatchMessageHandler_msgAsTable_all , 0 ,false , null ,null ,false ,"admin" ,"123456" ,true );
2122+ Preparedata1 (1 );
2123+ conn .run ("Trades.append!(data)" );
2124+ wait_data ("Receive" ,1 );
2125+ BasicTable re = (BasicTable ) conn .run ("select * from Receive order by intv,boolv,charv,shortv" );
2126+ BasicTable tra = (BasicTable ) conn .run ("select * from Trades order by intv,boolv,charv,shortv" );
2127+ threadPooledClient .unsubscribe (HOST , PORT , "Trades" , "subTread1" );
2128+ assertEquals (1 , re .rows ());
2129+ checkData (tra , re );
2130+ }
2131+ @ Test (timeout = 120000 )
2132+ public void test_threadedClient_subscribe_msgAsTable_true_all_dateType_1000 () throws Exception {
2133+ Prepare_streamTable ("Trades" );
2134+ conn .run ("setStreamTableFilterColumn(Trades, `intv)" );
2135+ Prepare_streamTable ("Receive" );
2136+ threadPooledClient .subscribe (HOST , PORT , "Trades" , "subTread1" , BatchMessageHandler_msgAsTable_all , 0 ,false , null ,null ,false ,"admin" ,"123456" ,true );
2137+ Preparedata1 (1000 );
2138+ conn .run ("Trades.append!(data)" );
2139+ wait_data ("Receive" ,1000 );
2140+ BasicTable re = (BasicTable ) conn .run ("select * from Receive order by intv,boolv,charv,shortv" );
2141+ BasicTable tra = (BasicTable ) conn .run ("select * from Trades order by intv,boolv,charv,shortv" );
2142+ threadPooledClient .unsubscribe (HOST , PORT , "Trades" , "subTread1" );
2143+ assertEquals (1000 , re .rows ());
2144+ checkData (tra , re );
2145+ }
2146+
2147+ @ Test (timeout = 120000 )
2148+ public void test_threadedClient_subscribe_msgAsTable_true_all_dateType_10000 () throws Exception {
2149+ Prepare_streamTable ("Trades" );
2150+ conn .run ("setStreamTableFilterColumn(Trades, `intv)" );
2151+ Prepare_streamTable ("Receive" );
2152+ threadPooledClient .subscribe (HOST , PORT , "Trades" , "subTread1" , BatchMessageHandler_msgAsTable_all , 0 ,false , null ,null ,false ,"admin" ,"123456" ,true );
2153+ Preparedata1 (10000 );
2154+ conn .run ("Trades.append!(data)" );
2155+ wait_data ("Receive" ,10000 );
2156+ BasicTable re = (BasicTable ) conn .run ("select * from Receive order by intv,boolv,charv,shortv" );
2157+ BasicTable tra = (BasicTable ) conn .run ("select * from Trades order by intv,boolv,charv,shortv" );
2158+ threadPooledClient .unsubscribe (HOST , PORT , "Trades" , "subTread1" );
2159+ assertEquals (10000 , re .rows ());
2160+ checkData (tra , re );
2161+ }
2162+
2163+ BatchMessageHandler BatchMessageHandler_msgAsTable_all_array = new BatchMessageHandler () {
2164+ @ Override
2165+ public void batchHandler (List <IMessage > msgs ) {
2166+ try {
2167+ save_batch_size .add (msgs .size ());
2168+ for (int x = 0 ; x <msgs .size (); x ++){
2169+ doEvent (msgs .get (x ));
2170+ }
2171+ } catch (Exception e ) {
2172+ e .printStackTrace ();
2173+ }
2174+ }
2175+ @ Override
2176+ public void doEvent (IMessage msg ) {
2177+ try {
2178+ System .out .println (msg .getEntity (0 ).getString ());
2179+ List <Entity > args = new ArrayList <>();
2180+ for (int i =0 ; i <26 ;i ++){
2181+ System .out .println (msg .getEntity (0 ).getString ());
2182+ args .add (msg .getEntity (i ));
2183+ }
2184+ conn .run ("tableInsert{Receive}" , args );
2185+ } catch (Exception e ) {
2186+ e .printStackTrace ();
2187+ }
2188+ }
2189+ };
2190+ @ Test (timeout = 120000 )
2191+ public void test_threadedClient_subscribe_msgAsTable_true_all_dateType_array () throws Exception {
2192+ Preparedata_streamTable_array (100 ,3 );
2193+ threadPooledClient .subscribe (HOST , PORT , "Trades" , "subTread1" , BatchMessageHandler_msgAsTable_all_array , 0 ,false , null ,null ,false ,"admin" ,"123456" ,true );
2194+ wait_data ("Receive" ,34 );
2195+ BasicTable re = (BasicTable ) conn .run ("select * from Receive order by id" );
2196+ BasicTable tra = (BasicTable ) conn .run ("select * from Trades order by id" );
2197+ threadPooledClient .unsubscribe (HOST , PORT , "Trades" , "subTread1" );
2198+ assertEquals (34 , re .rows ());
2199+ checkData (tra , re );
2200+ }
20112201}
0 commit comments