Skip to content

Commit d5d5095

Browse files
committed
AJ-931:add test case about filter
1 parent a0c6642 commit d5d5095

File tree

1 file changed

+331
-0
lines changed

1 file changed

+331
-0
lines changed

test/com/xxdb/streaming/client/cep/EventClientTest.java

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,4 +1757,335 @@ public void test_EventClient_Double_vector() throws IOException, InterruptedExc
17571757
"------- ------- --------\n" +
17581758
"test2 [1,1.1] [1,1.1] \n", bt3.getString());
17591759
}
1760+
1761+
@Test//客户真实场景
1762+
public void test_EventClient_Double_vector_1() throws IOException, InterruptedException {
1763+
conn1 = new DBConnection();
1764+
conn1.connect(HOST, PORT, "admin", "123456");
1765+
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n" +
1766+
"share table(1:0,[\"stringv\",\"doublev\",\"double1v\"],[STRING, DOUBLE[],DOUBLE[]]) as outputTable;\n" ;
1767+
conn.run(script);
1768+
String script1 ="class StockTick{\n" +
1769+
" name :: STRING \n" +
1770+
" price :: FLOAT \n" +
1771+
" def StockTick(name_, price_){\n" +
1772+
" name = name_\n" +
1773+
" price = price_\n" +
1774+
" }\n" +
1775+
"}\n" +
1776+
"\n" +
1777+
"class BondDepth{\n" +
1778+
" contract :: STRING \n" +
1779+
" bidYields :: DOUBLE VECTOR \n" +
1780+
" ofrYields :: DOUBLE VECTOR \n" +
1781+
" bidouantities :: INT VECTOR \n" +
1782+
" ofrouantities :: INT VECTOR \n" +
1783+
"\n" +
1784+
" def BondDepth(contract_, bidYields_, ofrYields_,bidouantities_,ofrouantities_){\n" +
1785+
" contract = contract_\n" +
1786+
" bidYields = bidYields_\n" +
1787+
" ofrYields = ofrYields_\n" +
1788+
" bidouantities = bidouantities_\n" +
1789+
" ofrouantities = ofrouantities_\n" +
1790+
" }\n" +
1791+
"}\n" +
1792+
"\n" +
1793+
"class SimpleShareSearch:CEPMonitor {\n" +
1794+
" //保存最新的 StockTick 事件\n" +
1795+
" newTick :: StockTick \n" +
1796+
" def SimpleShareSearch(){\n" +
1797+
" newTick = StockTick(\"init\", 0.0)\n" +
1798+
" }\n" +
1799+
" def processTick(stockTickEvent)\n" +
1800+
" \n" +
1801+
" def onload() {\n" +
1802+
" addEventListener(handler=processTick, eventType=\"BondDepth\", times=\"all\")\n" +
1803+
" } \n" +
1804+
" def processTick(BondDepth) { \n" +
1805+
" str = \"+++++++++++++++++++++++ BondDepth event received\" + now(true)$STRING\n" +
1806+
" writeLog(str)\n" +
1807+
" }\n" +
1808+
"}\n" +
1809+
"\n" +
1810+
" try {unsubscribeTable(tableName=\"testInput\", actionName=\"test1235\")} catch(ex) {}\n" +
1811+
"go\n" +
1812+
" try {undef(`testInput,SHARED)} catch(ex) {}\n" +
1813+
"share(streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody), `testInput)\n" +
1814+
"go\n" +
1815+
"\n" +
1816+
"try {dropStreamEngine(`simpleMonitor)} catch(ex) {}\n" +
1817+
"createCEPEngine(name=\"simpleMonitor\", monitors=<SimpleShareSearch()>, dummyTable=testInput, eventSchema=[StockTick,BondDepth])\n" +
1818+
"\n" +
1819+
"subscribeTable(tableName=\"testInput\", actionName=\"test1235\", handler=getStreamEngine(`simpleMonitor), msgAsTable=true)\n" +
1820+
"go\n" +
1821+
"eventSchema = getCEPEngineStat(engine=\"simpleMonitor\").eventSchema\n" +
1822+
"try {dropStreamEngine(`testSerializer)} catch(ex) {}\n" +
1823+
"ses = streamEventSerializer(name=\"testSerializer\", eventSchema=eventSchema, outputTable=testInput)\n" +
1824+
"\n" +
1825+
"depth = BondDepth(\"test2.11TB\", [3.01], [3.0], [10000000], [10000000])\n" +
1826+
"ses.appendEvent(depth)";
1827+
conn.run(script1);
1828+
EventSchema scheme = new EventSchema();
1829+
scheme.setEventType("BondDepth");
1830+
scheme.setFieldNames(Arrays.asList("contract", "bidYields", "ofrYields", "bidouantities", "ofrouantities"));
1831+
scheme.setFieldTypes(Arrays.asList( DT_STRING, DT_DOUBLE, DT_DOUBLE, DT_INT, DT_INT));
1832+
scheme.setFieldForms(Arrays.asList( DF_SCALAR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR));
1833+
//scheme.setFieldExtraParams(Arrays.asList( 0, 0, 0));
1834+
1835+
List<EventSchema> eventSchemes = Collections.singletonList(scheme);
1836+
List<String> eventTimeFields = new ArrayList<>();
1837+
List<String> commonFields = new ArrayList<>();
1838+
EventSender sender = new EventSender(conn, "testInput", eventSchemes, eventTimeFields, commonFields);
1839+
List<Entity> attributes = new ArrayList<>();
1840+
attributes.add(new BasicString("test2.11TB"));
1841+
attributes.add(new BasicDoubleVector(new double[]{3.01}));
1842+
attributes.add(new BasicDoubleVector(new double[]{3.0}));
1843+
attributes.add(new BasicIntVector((Collections.singletonList(10000000))));
1844+
attributes.add(new BasicIntVector((Collections.singletonList(10000000))));
1845+
sender.sendEvent("BondDepth", attributes);
1846+
1847+
BasicTable bt1 = (BasicTable)conn.run("select * from testInput;");
1848+
Assert.assertEquals(2,bt1.rows());
1849+
System.out.println(bt1.getString());
1850+
Assert.assertEquals(bt1.getColumn(0).get(0),bt1.getColumn(0).get(1));
1851+
Assert.assertEquals(bt1.getColumn(1).get(0),bt1.getColumn(1).get(1));
1852+
}
1853+
public static EventMessageHandler handler_schema_two = new EventMessageHandler() {
1854+
@Override
1855+
public void doEvent(String eventType, List<Entity> attribute) {
1856+
// System.out.println("eventType: " + eventType);
1857+
// System.out.println(attribute.toString());
1858+
try {
1859+
if(eventType.contains("event_bool")){
1860+
conn.run("tableInsert{outputTable1}", attribute);
1861+
}else{
1862+
conn.run("tableInsert{outputTable2}", attribute);
1863+
}
1864+
1865+
} catch (IOException e) {
1866+
throw new RuntimeException(e);
1867+
}
1868+
}
1869+
};
1870+
@Test
1871+
public void test_EventClient_subscribe_filter_null() throws IOException, InterruptedException {
1872+
String script = "share streamTable(1:0, `eventType`blobs, [STRING,BLOB]) as inputTable;\n" +
1873+
"share table(100:0, `boolv`intv, [BOOL, INT]) as outputTable1;\n" +
1874+
"share table(100:0, `boolv`intv, [BOOL, INT]) as outputTable2;\n";
1875+
conn.run(script);
1876+
String script1 =
1877+
" class event_bool{\n" +
1878+
"boolv :: BOOL\n" +
1879+
"intv :: INT\n" +
1880+
"\n" +
1881+
" def event_bool(bool,int){\n" +
1882+
"boolv = bool\n" +
1883+
"intv = int\n" +
1884+
" }\n" +
1885+
"} " +
1886+
"class event_int{\n" +
1887+
"boolv :: BOOL\n" +
1888+
"intv :: INT\n" +
1889+
"\n" +
1890+
" def event_int(bool,int){\n" +
1891+
"boolv = bool\n" +
1892+
"intv = int\n" +
1893+
" }\n" +
1894+
"} " +
1895+
"schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[], ) as type, array(INT[], 0) as form)\n" +
1896+
"eventType = ['event_bool', 'event_int']\n" +
1897+
"eventKeys = ['boolv,intv','boolv,intv'];\n" +
1898+
"typeV = [[BOOL, INT],[BOOL, INT]];\n" +
1899+
"formV = [[SCALAR, SCALAR], [SCALAR, SCALAR]];\n" +
1900+
"insert into schemaTable values(eventType, eventKeys, typeV, formV);\n" +
1901+
"share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput;\n" +
1902+
"setStreamTableFilterColumn(objByName(`intput),`eventType);\n" +
1903+
"try{\ndropStreamEngine(`serInput)\n}catch(ex){\n}\n" +
1904+
"inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput);" ;
1905+
conn.run(script1);
1906+
EventSchema scheme = new EventSchema();
1907+
scheme.setEventType("event_bool");
1908+
scheme.setFieldNames(Arrays.asList("boolv", "intv"));
1909+
scheme.setFieldTypes(Arrays.asList(DT_BOOL, DT_INT));
1910+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
1911+
scheme.setFieldExtraParams(Arrays.asList(null, null));
1912+
1913+
EventSchema scheme1 = new EventSchema();
1914+
scheme1.setEventType("event_int");
1915+
scheme1.setFieldNames(Arrays.asList("boolv", "intv"));
1916+
scheme1.setFieldTypes(Arrays.asList(DT_BOOL, DT_INT));
1917+
scheme1.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
1918+
scheme1.setFieldExtraParams(Arrays.asList(null, null));
1919+
List<EventSchema> eventSchemas = new ArrayList<>();
1920+
eventSchemas.add(scheme);
1921+
eventSchemas.add(scheme1);
1922+
List<String> eventTimeFields = new ArrayList<>();
1923+
List<String> commonFields = new ArrayList<>();
1924+
1925+
EventClient client = new EventClient(eventSchemas, eventTimeFields, commonFields);
1926+
1927+
client.subscribe(HOST, PORT, "intput", "test1", handler_schema_two, -1, true,null, "admin", "123456");
1928+
String script2 = "event1 = event_bool(false,1);\n" +
1929+
"event2 = event_bool(false,2);\n" +
1930+
"event3 = event_int(true,3);\n" +
1931+
"event4 = event_int(true,4);\n" +
1932+
"appendEvent(inputSerializer, [event1, event2, event3, event4]);";
1933+
conn.run(script2);
1934+
sleep(5000);
1935+
BasicTable bt2 = (BasicTable)conn.run("select * from outputTable1;");
1936+
Assert.assertEquals(2,bt2.rows());
1937+
Assert.assertEquals("boolv intv\n" +
1938+
"----- ----\n" +
1939+
"false 1 \n" +
1940+
"false 2 \n", bt2.getString());
1941+
BasicTable bt3 = (BasicTable)conn.run("select * from outputTable2;");
1942+
Assert.assertEquals(2,bt3.rows());
1943+
Assert.assertEquals("boolv intv\n" +
1944+
"----- ----\n" +
1945+
"true 3 \n" +
1946+
"true 4 \n", bt3.getString());
1947+
}
1948+
1949+
@Test
1950+
public void test_EventClient_subscribe_filter() throws IOException, InterruptedException {
1951+
String script = "share streamTable(1:0, `eventType`blobs, [STRING,BLOB]) as inputTable;\n" +
1952+
"share table(100:0, `boolv`intv, [BOOL, INT]) as outputTable;\n";
1953+
conn.run(script);
1954+
String script1 =
1955+
" class event_bool{\n" +
1956+
"boolv :: BOOL\n" +
1957+
"intv :: INT\n" +
1958+
"\n" +
1959+
" def event_bool(bool,int){\n" +
1960+
"boolv = bool\n" +
1961+
"intv = int\n" +
1962+
" }\n" +
1963+
"} " +
1964+
"class event_int{\n" +
1965+
"boolv :: BOOL\n" +
1966+
"intv :: INT\n" +
1967+
"\n" +
1968+
" def event_int(bool,int){\n" +
1969+
"boolv = bool\n" +
1970+
"intv = int\n" +
1971+
" }\n" +
1972+
"} " +
1973+
"schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[], ) as type, array(INT[], 0) as form)\n" +
1974+
"eventType = ['event_bool', 'event_int']\n" +
1975+
"eventKeys = ['boolv,intv','boolv,intv'];\n" +
1976+
"typeV = [[BOOL, INT],[BOOL, INT]];\n" +
1977+
"formV = [[SCALAR, SCALAR], [SCALAR, SCALAR]];\n" +
1978+
"insert into schemaTable values(eventType, eventKeys, typeV, formV);\n" +
1979+
"share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput;\n" +
1980+
"setStreamTableFilterColumn(objByName(`intput),`eventType);\n" +
1981+
"try{\ndropStreamEngine(`serInput)\n}catch(ex){\n}\n" +
1982+
"inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput);" ;
1983+
conn.run(script1);
1984+
EventSchema scheme = new EventSchema();
1985+
scheme.setEventType("event_bool");
1986+
scheme.setFieldNames(Arrays.asList("boolv", "intv"));
1987+
scheme.setFieldTypes(Arrays.asList(DT_BOOL, DT_INT));
1988+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
1989+
scheme.setFieldExtraParams(Arrays.asList(null, null));
1990+
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
1991+
1992+
List<String> eventTimeFields = new ArrayList<>();
1993+
List<String> commonFields = new ArrayList<>();
1994+
1995+
EventClient client = new EventClient(eventSchemas, eventTimeFields, commonFields);
1996+
Vector filter1 =new BasicStringVector(new String[]{"event_bool"});
1997+
client.subscribe(HOST, PORT, "intput", "test1", handler, -1, true, filter1, "admin", "123456");
1998+
1999+
String script2 = "event1 = event_bool(false,1);\n" +
2000+
"event2 = event_bool(false,2);\n" +
2001+
"event3 = event_int(true,3);\n" +
2002+
"event4 = event_int(true,4);\n" +
2003+
"appendEvent(inputSerializer, [event1, event2, event3, event4]);";
2004+
conn.run(script2);
2005+
sleep(5000);
2006+
BasicTable bt2 = (BasicTable)conn.run("select * from outputTable;");
2007+
Assert.assertEquals(2,bt2.rows());
2008+
Assert.assertEquals("boolv intv\n" +
2009+
"----- ----\n" +
2010+
"false 1 \n" +
2011+
"false 2 \n", bt2.getString());
2012+
}
2013+
2014+
@Test
2015+
public void test_EventClient_subscribe_filter_1() throws IOException, InterruptedException {
2016+
String script = "share streamTable(1:0, `eventType`blobs, [STRING,BLOB]) as inputTable;\n" +
2017+
"share table(100:0, `boolv`intv, [BOOL, INT]) as outputTable;\n";
2018+
conn.run(script);
2019+
String script1 =
2020+
" class event_bool{\n" +
2021+
"boolv :: BOOL\n" +
2022+
"intv :: INT\n" +
2023+
"\n" +
2024+
" def event_bool(bool,int){\n" +
2025+
"boolv = bool\n" +
2026+
"intv = int\n" +
2027+
" }\n" +
2028+
"} " +
2029+
"class event_int{\n" +
2030+
"boolv :: BOOL\n" +
2031+
"intv :: INT\n" +
2032+
"\n" +
2033+
" def event_int(bool,int){\n" +
2034+
"boolv = bool\n" +
2035+
"intv = int\n" +
2036+
" }\n" +
2037+
"} " +
2038+
"schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[], ) as type, array(INT[], 0) as form)\n" +
2039+
"eventType = ['event_bool', 'event_int']\n" +
2040+
"eventKeys = ['boolv,intv','boolv,intv'];\n" +
2041+
"typeV = [[BOOL, INT],[BOOL, INT]];\n" +
2042+
"formV = [[SCALAR, SCALAR], [SCALAR, SCALAR]];\n" +
2043+
"insert into schemaTable values(eventType, eventKeys, typeV, formV);\n" +
2044+
"share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput;\n" +
2045+
"setStreamTableFilterColumn(objByName(`intput),`eventType);\n" +
2046+
"try{\ndropStreamEngine(`serInput)\n}catch(ex){\n}\n" +
2047+
"inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput);" ;
2048+
conn.run(script1);
2049+
EventSchema scheme = new EventSchema();
2050+
scheme.setEventType("event_bool");
2051+
scheme.setFieldNames(Arrays.asList("boolv", "intv"));
2052+
scheme.setFieldTypes(Arrays.asList(DT_BOOL, DT_INT));
2053+
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
2054+
scheme.setFieldExtraParams(Arrays.asList(null, null));
2055+
2056+
EventSchema scheme1 = new EventSchema();
2057+
scheme1.setEventType("event_int");
2058+
scheme1.setFieldNames(Arrays.asList("boolv", "intv"));
2059+
scheme1.setFieldTypes(Arrays.asList(DT_BOOL, DT_INT));
2060+
scheme1.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR));
2061+
scheme1.setFieldExtraParams(Arrays.asList(null, null));
2062+
List<EventSchema> eventSchemas = new ArrayList<>();
2063+
eventSchemas.add(scheme);
2064+
eventSchemas.add(scheme1);
2065+
List<String> eventTimeFields = new ArrayList<>();
2066+
List<String> commonFields = new ArrayList<>();
2067+
2068+
EventClient client = new EventClient(eventSchemas, eventTimeFields, commonFields);
2069+
Vector filter1 =new BasicStringVector(new String[]{"event_bool"});
2070+
client.subscribe(HOST, PORT, "intput", "test1", handler, -1, true, filter1, "admin", "123456");
2071+
2072+
Vector filter2 =new BasicStringVector(new String[]{"event_int"});
2073+
client.subscribe(HOST, PORT, "intput", "test2", handler, -1, true, filter2, "admin", "123456");
2074+
2075+
String script2 = "event1 = event_bool(false,1);\n" +
2076+
"event2 = event_bool(false,2);\n" +
2077+
"event3 = event_int(true,3);\n" +
2078+
"event4 = event_int(true,4);\n" +
2079+
"appendEvent(inputSerializer, [event1, event2, event3, event4]);";
2080+
conn.run(script2);
2081+
sleep(2000);
2082+
BasicTable bt2 = (BasicTable)conn.run("select * from outputTable;");
2083+
Assert.assertEquals(4,bt2.rows());
2084+
Assert.assertEquals("boolv intv\n" +
2085+
"----- ----\n" +
2086+
"true 3 \n" +
2087+
"false 1 \n" +
2088+
"true 4 \n" +
2089+
"false 2 \n", bt2.getString());
2090+
}
17602091
}

0 commit comments

Comments
 (0)