@@ -1767,152 +1767,6 @@ public void Test_ThreadPooledClient_subscribe_backupSites() throws IOException,
17671767 threadPooledClient .unsubscribe (HOST ,PORT ,"Trades" ,"subTread1" );
17681768 }
17691769
1770- @ Test (timeout = 180000 )
1771- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect () throws IOException , InterruptedException {
1772- DBConnection controller_conn = new DBConnection ();
1773- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1774- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1775- controller_conn .run ("sleep(1000)" );
1776- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1777- "share(st1,`Trades)\t \n "
1778- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1779- conn .run (script1 );
1780- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1781- "share(st2, `Receive)\t \n " ;
1782- conn .run (script2 );
1783- DBConnection conn1 = new DBConnection ();
1784- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1785- conn1 .run (script1 );
1786- conn1 .run (script2 );
1787-
1788- Vector filter1 = (Vector ) conn .run ("1..50000" );
1789- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +PORT ));
1790- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,10 ,true );
1791- System .out .println ("Successful subscribe" );
1792- conn .run ("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
1793- conn1 .run ("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)" );
1794- Thread .sleep (1000 );
1795- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1796- Thread .sleep (8000 );
1797- conn .run ("t=table(5001..5500 as tag,now()+5001..5500 as ts,rand(100.0,500) as data);" + "Trades.append!(t)" );
1798- Thread .sleep (1000 );
1799- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1800- Thread .sleep (5000 );
1801- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
1802- System .out .println (row_num .getColumn (0 ).get (0 ));
1803- assertEquals ("5500" ,row_num .getColumn (0 ).get (0 ).getString ());
1804- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
1805- }
1806-
1807- @ Test (timeout = 180000 )
1808- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_backupSites_disconnect_subOnce_false () throws IOException , InterruptedException {
1809- DBConnection controller_conn = new DBConnection ();
1810- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1811- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1812- controller_conn .run ("sleep(1000)" );
1813- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1814- "share(st1,`Trades)\t \n "
1815- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1816- conn .run (script1 );
1817- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1818- "share(st2, `Receive)\t \n " ;
1819- conn .run (script2 );
1820- DBConnection conn1 = new DBConnection ();
1821- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1822- conn1 .run (script1 );
1823- conn1 .run (script2 );
1824-
1825- DBConnection conn2 = new DBConnection ();
1826- conn2 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1827- conn2 .run (script1 );
1828- conn2 .run (script2 );
1829- Vector filter1 = (Vector ) conn .run ("1..100000" );
1830- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
1831- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,10 ,false );
1832- System .out .println ("Successful subscribe" );
1833- conn1 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1834- conn2 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1835- Thread .sleep (1000 );
1836- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1837- System .out .println (port_list [1 ]+"断掉啦---------------------------------------------------" );
1838- Thread .sleep (8000 );
1839- conn2 .run ("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)" );
1840- Thread .sleep (1000 );
1841- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1842- Thread .sleep (5000 );
1843- DBConnection conn3 = new DBConnection ();
1844- conn3 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1845- conn3 .run (script1 );
1846- conn3 .run (script2 );
1847- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1848- System .out .println (port_list [2 ]+"节点断掉啦---------------------------------------------------" );
1849- Thread .sleep (8000 );
1850- conn3 .run ("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1851- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1852- Thread .sleep (8000 );
1853-
1854- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
1855- System .out .println (row_num .getColumn (0 ).get (0 ));
1856- assertEquals ("3000" ,row_num .getColumn (0 ).get (0 ).getString ());
1857- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
1858- }
1859-
1860- @ Test (timeout = 180000 )
1861- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_backupSites_disconnect_subOnce_true () throws IOException , InterruptedException {
1862- DBConnection controller_conn = new DBConnection ();
1863- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1864- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1865- controller_conn .run ("sleep(1000)" );
1866- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1867- "share(st1,`Trades)\t \n "
1868- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1869- conn .run (script1 );
1870- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1871- "share(st2, `Receive)\t \n " ;
1872- conn .run (script2 );
1873- DBConnection conn1 = new DBConnection ();
1874- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1875- conn1 .run (script1 );
1876- conn1 .run (script2 );
1877-
1878- DBConnection conn2 = new DBConnection ();
1879- conn2 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1880- conn2 .run (script1 );
1881- conn2 .run (script2 );
1882- Vector filter1 = (Vector ) conn .run ("1..100000" );
1883- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
1884- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,10 ,true );
1885- System .out .println ("Successful subscribe" );
1886- conn1 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1887- conn2 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1888- Thread .sleep (1000 );
1889- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1890- System .out .println (port_list [1 ]+"断掉啦---------------------------------------------------" );
1891- Thread .sleep (8000 );
1892- conn2 .run ("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)" );
1893- Thread .sleep (1000 );
1894- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1895- Thread .sleep (5000 );
1896- DBConnection conn3 = new DBConnection ();
1897- conn3 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1898- conn3 .run (script1 );
1899- conn3 .run (script2 );
1900- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1901- System .out .println (port_list [2 ]+"节点断掉啦---------------------------------------------------" );
1902- Thread .sleep (10000 );
1903- conn3 .run ("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
1904- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
1905- Thread .sleep (5000 );
1906-
1907- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
1908- System .out .println (row_num .getColumn (0 ).get (0 ));
1909- assertEquals ("2000" ,row_num .getColumn (0 ).get (0 ).getString ());
1910- DBConnection conn4 = new DBConnection ();
1911- conn4 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1912- conn4 .run (script1 );
1913- conn4 .run (script2 );
1914- //client.unsubscribe(HOST,port_list[1],"Trades","subTread1");
1915- }
19161770
19171771 @ Test (timeout = 180000 )
19181772 public void Test_ThreadPooledClient_subscribe_backupSites_unsubscribe () throws IOException , InterruptedException {
@@ -1954,181 +1808,7 @@ public void Test_ThreadPooledClient_subscribe_resubscribeInterval_not_true() thr
19541808 assertEquals ("1000" ,row_num .getColumn (0 ).get (0 ).getString ());
19551809 threadPooledClient .unsubscribe (HOST ,PORT ,"Trades" ,"subTread1" );
19561810 }
1957- public static MessageHandler MessageHandler_handler1 = new MessageHandler () {
1958- @ Override
1959- public void doEvent (IMessage msg ) {
1960- try {
1961- String script = String .format ("insert into Receive values(%d,%s,%f,%d)" , Integer .parseInt (msg .getEntity (0 ).getString ()), msg .getEntity (1 ).getString (), Double .valueOf (msg .getEntity (2 ).toString ()),System .currentTimeMillis ());
1962- conn .run (script );
1963- System .out .println (msg .getEntity (0 ).getString ());
1964- } catch (IOException e ) {
1965- e .printStackTrace ();
1966- }
1967- }
1968- };
1969-
1970- @ Test (timeout = 180000 )
1971- public void Test_ThreadPooledClient_subscribe_backupSites_resubscribeInterval () throws Exception {
1972- DBConnection controller_conn = new DBConnection ();
1973- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
1974- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
1975- controller_conn .run ("sleep(1000)" );
1976- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
1977- "share(st1,`Trades)\t \n "
1978- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
1979- conn .run (script1 );
1980- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data`now,[INT,TIMESTAMP,DOUBLE,TIMESTAMP])\n " +
1981- "share(st2, `Receive)\t \n " ;
1982- conn .run (script2 );
1983- DBConnection conn1 = new DBConnection ();
1984- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
1985- conn1 .run (script1 );
1986- conn1 .run (script2 );
1987- DBConnection conn3 = new DBConnection ();
1988- conn3 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
1989- conn3 .run (script1 );
1990- conn3 .run (script2 );
1991- Vector filter1 = (Vector ) conn .run ("1..10000000" );
1992- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
1993- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler1 , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites ,1000 ,true );
1994- System .out .println ("Successful subscribe" );
1995- class MyThread extends Thread {
1996- @ Override
1997- public void run () {
1998- try {
1999- conn3 .run ("for(n in 1..1000){\n " +
2000- " insert into Trades values(n,now()+n,n);\n " +
2001- " sleep(100);\n " +
2002- "}" );
2003- } catch (Exception e ) {
2004- // 捕获异常并打印错误信息
2005- System .err .println ( e .getMessage ());
2006- }
2007- }
2008- }
2009- class MyThread1 extends Thread {
2010- @ Override
2011- public void run () {
2012- try {
2013- conn1 .run ("for(n in 1..1000){\n " +
2014- " insert into Trades values(n,now()+n,n);\n " +
2015- " sleep(100);\n " +
2016- "}" );
2017- } catch (Exception e ) {
2018- // 捕获异常并打印错误信息
2019- System .err .println ( e .getMessage ());
2020- }
2021- }
2022- }
2023- class MyThread2 extends Thread {
2024- @ Override
2025- public void run () {
2026- try {
2027- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2028- } catch (Exception e ) {
2029- // 捕获异常并打印错误信息
2030- System .err .println (e .getMessage ());
2031- }
2032- }
2033- }
2034- MyThread thread = new MyThread ();
2035- MyThread1 thread1 = new MyThread1 ();
2036- MyThread2 thread2 = new MyThread2 ();
2037- thread .start ();
2038- thread1 .start ();
2039- Thread .sleep (2000 );
2040- thread2 .start ();
2041- thread .join ();
2042- Thread .sleep (5000 );
2043- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2044- Thread .sleep (1000 );
2045- BasicTable re = (BasicTable )conn .run ("select tag ,now,deltas(now) from Receive order by deltas(now) desc \n " );
2046- System .out .println (re .getString ());
2047- Assert .assertEquals (1000 ,re .rows ());
2048- Assert .assertEquals (true ,Integer .valueOf (re .getColumn (2 ).get (0 ).toString ())>1000 );
2049- DBConnection conn2 = new DBConnection ();
2050- conn2 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2051- conn2 .run (script1 );
2052- conn2 .run (script2 );
2053- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
2054- }
20551811
2056- @ Test (timeout = 180000 )
2057- public void Test_ThreadPooledClient_subscribe_resubscribeInterval_subOnce_not_set () throws IOException , InterruptedException {
2058- DBConnection controller_conn = new DBConnection ();
2059- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
2060- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2061- controller_conn .run ("sleep(1000)" );
2062- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2063- "share(st1,`Trades)\t \n "
2064- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
2065- conn .run (script1 );
2066- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2067- "share(st2, `Receive)\t \n " ;
2068- conn .run (script2 );
2069- DBConnection conn1 = new DBConnection ();
2070- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2071- conn1 .run (script1 );
2072- conn1 .run (script2 );
2073-
2074- DBConnection conn2 = new DBConnection ();
2075- conn2 .connect (HOST ,port_list [2 ],"admin" ,"123456" );
2076- conn2 .run (script1 );
2077- conn2 .run (script2 );
2078- Vector filter1 = (Vector ) conn .run ("1..100000" );
2079- List <String > backupSites = new ArrayList <>(Collections .singleton (HOST +":" +port_list [2 ]));
2080- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites );
2081- System .out .println ("Successful subscribe" );
2082- conn1 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
2083- conn2 .run ("n=1000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
2084- Thread .sleep (1000 );
2085- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2086- System .out .println (port_list [1 ]+"断掉啦---------------------------------------------------" );
2087- Thread .sleep (8000 );
2088- conn2 .run ("n=2000;t=table(1001..n as tag,timestamp(1001..n) as ts,take(100.0,1000) as data);" + "Trades.append!(t)" );
2089- Thread .sleep (1000 );
2090- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2091- Thread .sleep (5000 );
2092- DBConnection conn3 = new DBConnection ();
2093- conn3 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2094- conn3 .run (script1 );
2095- conn3 .run (script2 );
2096- controller_conn .run ("try{stopDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
2097- System .out .println (port_list [2 ]+"节点断掉啦---------------------------------------------------" );
2098- Thread .sleep (8000 );
2099- conn3 .run ("n=3000;t=table(1..n as tag,timestamp(1..n) as ts,take(100.0,n) as data);" + "Trades.append!(t)" );
2100- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [2 ]+"')}catch(ex){}" );
2101- Thread .sleep (5000 );
2102-
2103- BasicTable row_num = (BasicTable )conn .run ("select count(*) from Receive" );
2104- System .out .println (row_num .getColumn (0 ).get (0 ));
2105- assertEquals ("3000" ,row_num .getColumn (0 ).get (0 ).getString ());
2106- threadPooledClient .unsubscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" );
2107- }
2108- //@Test(timeout = 180000)
2109- public void Test_ThreadPooledClient_subscribe_backupSites_server_disconnect_1 () throws IOException , InterruptedException {
2110- DBConnection controller_conn = new DBConnection ();
2111- controller_conn .connect (controller_host ,controller_port ,"admin" ,"123456" );
2112- controller_conn .run ("try{startDataNode('" +HOST +":" +port_list [1 ]+"')}catch(ex){}" );
2113- controller_conn .run ("sleep(1000)" );
2114- String script1 = "st1 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2115- "share(st1,`Trades)\t \n "
2116- + "setStreamTableFilterColumn(objByName(`Trades),`tag)" ;
2117- conn .run (script1 );
2118- String script2 = "st2 = streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE])\n " +
2119- "share(st2, `Receive)\t \n " ;
2120- conn .run (script2 );
2121- DBConnection conn1 = new DBConnection ();
2122- conn1 .connect (HOST ,port_list [1 ],"admin" ,"123456" );
2123- conn1 .run (script1 );
2124- conn1 .run (script2 );
2125-
2126- Vector filter1 = (Vector ) conn .run ("1..50000" );
2127- List <String > backupSites = Arrays .asList (new String []{"192.168.0.69:18921" , "192.168.0.69:18922" , "192.168.0.69:18923" });
2128- threadPooledClient .subscribe (HOST ,port_list [1 ],"Trades" ,"subTread1" ,MessageHandler_handler , -1 ,true ,filter1 , (StreamDeserializer ) null ,true ,"admin" ,"123456" ,false ,backupSites );
2129- System .out .println ("这里可以手工断掉这个集群下所有可用节点http://192.168.0.69:18920/?view=overview-old" );
2130- Thread .sleep (1000000 );
2131- }
21321812 public static MessageHandler MessageHandler_handler_getOffset = new MessageHandler () {
21331813 @ Override
21341814 public void doEvent (IMessage msg ) {
0 commit comments