Skip to content

Commit 8a14f2c

Browse files
committed
AJ-894:add test case about sendEvent any
1 parent d5d5095 commit 8a14f2c

File tree

2 files changed

+197
-42
lines changed

2 files changed

+197
-42
lines changed

test/com/xxdb/streaming/client/cep/EventClientTest.java

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1536,40 +1536,68 @@ public void test_EventClient_all_dateType_vector_decimal() throws IOException,
15361536
}
15371537

15381538
//@Test//not support
1539-
public void test_EventClient_all_dateType_ANY() throws IOException, InterruptedException {
1540-
String script0 = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n"+
1541-
"colNames=\"col\"+string(1..25);\n" +
1542-
"colTypes=[BOOL[],CHAR[],SHORT[],INT[],LONG[],DOUBLE[],FLOAT[],DATE[],MONTH[],TIME[],MINUTE[],SECOND[],DATETIME[],TIMESTAMP[],NANOTIME[],NANOTIMESTAMP[], DATEHOUR[],UUID[],IPADDR[],INT128[],POINT[],COMPLEX[],DECIMAL32(2)[],DECIMAL64(7)[],DECIMAL128(10)[]];\n" +
1543-
"share table(1:0,colNames,colTypes) as outputTable;\n" ;
1544-
conn.run(script0);
1539+
public void test_EventClient_ANY() throws IOException, InterruptedException {
15451540
EventSchema scheme = new EventSchema();
1546-
scheme.setEventType("event_all_array_dateType");
1547-
scheme.setFieldNames(Arrays.asList("boolv", "charv", "shortv", "intv", "longv", "doublev", "floatv", "datev", "monthv", "timev", "minutev", "secondv", "datetimev", "timestampv", "nanotimev", "nanotimestampv", "datehourv", "uuidv", "ippaddrv", "int128v", "pointv", "complexv", "decimal32v", "decimal64v", "decimal128v"));
1548-
scheme.setFieldTypes(Arrays.asList(DT_BOOL_ARRAY, DT_BYTE_ARRAY, DT_SHORT_ARRAY, DT_INT_ARRAY, DT_LONG_ARRAY, DT_DOUBLE_ARRAY, DT_FLOAT_ARRAY, DT_DATE_ARRAY,DT_MONTH_ARRAY, DT_TIME_ARRAY, DT_MINUTE_ARRAY, DT_SECOND_ARRAY, DT_DATETIME_ARRAY, DT_TIMESTAMP_ARRAY, DT_NANOTIME_ARRAY, DT_NANOTIMESTAMP_ARRAY, DT_DATEHOUR_ARRAY, DT_UUID_ARRAY, DT_IPADDR_ARRAY, DT_INT128_ARRAY, DT_POINT_ARRAY, DT_COMPLEX_ARRAY, DT_DECIMAL32_ARRAY, DT_DECIMAL64_ARRAY, DT_DECIMAL128_ARRAY));
1549-
scheme.setFieldForms(Arrays.asList( DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR));
1550-
scheme.setFieldExtraParams(Arrays.asList(null, null, null, null, null, null, null, null, null, null, null, null,null, null, null, null, null, null, null, null, null, null, 2, 7,19));
1541+
scheme.setEventType("event_any");
1542+
scheme.setFieldNames(Arrays.asList( "any1", "any2", "any3" ));
1543+
scheme.setFieldTypes(Arrays.asList( DT_ANY, DT_ANY, DT_ANY));
1544+
scheme.setFieldForms(Arrays.asList( DF_VECTOR, DF_VECTOR, DF_VECTOR));
1545+
scheme.setFieldExtraParams(Arrays.asList( null, null, null));
15511546
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
15521547
List<String> eventTimeFields = new ArrayList<>();
15531548
List<String> commonFields = new ArrayList<>();
1554-
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n";
1549+
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n" +
1550+
"share table(1:0,`any1`any2`any3,[ANY,ANY,ANY]) as outputTable;\n" +
1551+
"n=100;\n" +
1552+
"num=3;\n" +
1553+
"intv = int(rand(rand(-100..100, 1000) join take(int(), 4), n));\n" +
1554+
"any1 = cut(take([true, false, NULL], n*num), num);\n" +
1555+
"any2 = cut(take(char(-100..100 join NULL), n*num), num);\n" +
1556+
"any3 = cut(take(short(-100..100 join NULL), n*num), num);\n" +
1557+
"share table(intv, any1, any2, any3) as data;";
15551558
conn.run(script);
1556-
EventSender sender = new EventSender(conn, "inputTable", eventSchemas, eventTimeFields, commonFields);
1559+
String script1 ="class event_any{\n" +
1560+
"\tany1 :: ANY \n" +
1561+
"\tany2 :: ANY \n" +
1562+
"\tany3 :: ANY \n" +
1563+
" def event_any(any1V,any2V,any3V){\n" +
1564+
"\tany1 = any1V\n" +
1565+
"\tany2 = any2V\n" +
1566+
"\tany3 = any3V\n" +
1567+
" \t}\n" +
1568+
"} \n" +
1569+
"schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[],0 ) as type, array(INT[], 0) as form)\n" +
1570+
"eventType = 'event_any'\n" +
1571+
"eventKeys = 'any1, any2, any3';\n" +
1572+
"typeV = [ANY, ANY, ANY];\n" +
1573+
"formV = [SCALAR, SCALAR, SCALAR];\n" +
1574+
"insert into schemaTable values([eventType], [eventKeys], [typeV], [formV]);\n" +
1575+
"share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput1;\n" +
1576+
"try{\ndropStreamEngine(`serInput)\n}catch(ex){\n}\n" +
1577+
"inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput1);";
1578+
conn.run(script1);
1579+
EventSender sender = new EventSender(conn, "inputTable",eventSchemas, eventTimeFields, commonFields);
15571580
EventClient client = new EventClient(eventSchemas, eventTimeFields, commonFields);
1558-
client.subscribe(HOST, PORT, "inputTable", "test1", handler_array, -1, true, "admin", "123456");
1559-
Preparedata_array(100,10);
1581+
//需要新写一个handler
1582+
client.subscribe(HOST, PORT, "intput1", "test1", handler_array, -1, true, "admin", "123456");
1583+
15601584
BasicTable bt = (BasicTable)conn.run("select * from data");
1561-
List<Entity> attributes = new ArrayList<>();
1562-
for(int j=0;j<bt.columns();j++){
1563-
Entity pt = (bt.getColumn(j));
1564-
System.out.println(pt.getDataType());
1565-
System.out.println( j + "列:" + pt.getString());
1566-
attributes.add(pt);
1567-
}
1568-
sender.sendEvent("event_all_array_dateType",attributes);
1569-
BasicTable bt1 = (BasicTable)conn.run("select * from inputTable;");
1570-
Assert.assertEquals(1,bt1.rows());
1571-
Assert.assertEquals(1,bt1.rows());
1585+
String script2 = "data1=select * from data;\n" +
1586+
"for(i in 0..99){\n" +
1587+
" any1v=array(ANY,0).append!(data1.row(i)[`any1])\n" +
1588+
" any2v=array(ANY,0).append!(data1.row(i)[`any2])\n" +
1589+
" any3v=array(ANY,0).append!(data1.row(i)[`any3])\n" +
1590+
" event_any1=event_any( any1v, any2v, any3v)\n" +
1591+
" appendEvent(inputSerializer, event_any1)\n" +
1592+
"\t}" ;
1593+
conn.run(script2);
15721594
sleep(2000);
1595+
BasicTable bt1 = (BasicTable)conn.run("select * from data;");
1596+
System.out.println(bt1.getString());
1597+
Assert.assertEquals(100,bt1.rows());
1598+
BasicTable bt2 = (BasicTable)conn.run("select * from outputTable;");
1599+
Assert.assertEquals(100,bt2.rows());
1600+
checkData(bt1,bt2);
15731601
client.unsubscribe(HOST, PORT, "inputTable", "test1");
15741602
}
15751603
@Test

test/com/xxdb/streaming/client/cep/EventSenderTest.java

Lines changed: 143 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static com.xxdb.Prepare.*;
1818
import static com.xxdb.data.Entity.DATA_TYPE.*;
1919
import static com.xxdb.data.Entity.DATA_FORM.*;
20+
import static java.lang.Thread.sleep;
2021

2122
public class EventSenderTest {
2223
public static DBConnection conn ;
@@ -2325,33 +2326,159 @@ public void test_EventClient_vector_symbol() throws IOException, InterruptedExc
23252326
checkData(bt1,bt2);
23262327
client.unsubscribe(HOST, PORT, "intput1", "test1");
23272328
}
2328-
//@Test//not support
2329-
public void test_EventSender_all_dateType_any() throws IOException {
2329+
@Test
2330+
public void test_EventSender_any() throws Exception {
23302331
EventSchema scheme = new EventSchema();
2331-
scheme.setEventType("event_all_array_dateType");
2332-
scheme.setFieldNames(Arrays.asList("boolv", "charv", "shortv", "intv", "longv", "doublev", "floatv", "datev", "monthv", "timev", "minutev", "secondv", "datetimev", "timestampv", "nanotimev", "nanotimestampv", "datehourv", "uuidv", "ippaddrv", "int128v", "pointv", "complexv", "decimal32v", "decimal64v", "decimal128v"));
2333-
scheme.setFieldTypes(Arrays.asList(DT_BOOL_ARRAY, DT_BYTE_ARRAY, DT_SHORT_ARRAY, DT_INT_ARRAY, DT_LONG_ARRAY, DT_DOUBLE_ARRAY, DT_FLOAT_ARRAY, DT_DATE_ARRAY,DT_MONTH_ARRAY, DT_TIME_ARRAY, DT_MINUTE_ARRAY, DT_SECOND_ARRAY, DT_DATETIME_ARRAY, DT_TIMESTAMP_ARRAY, DT_NANOTIME_ARRAY, DT_NANOTIMESTAMP_ARRAY, DT_DATEHOUR_ARRAY, DT_UUID_ARRAY, DT_IPADDR_ARRAY, DT_INT128_ARRAY, DT_POINT_ARRAY, DT_COMPLEX_ARRAY, DT_DECIMAL32_ARRAY, DT_DECIMAL64_ARRAY, DT_DECIMAL128_ARRAY));
2334-
scheme.setFieldForms(Arrays.asList( DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR, DF_VECTOR));
2335-
scheme.setFieldExtraParams(Arrays.asList(null, null, null, null, null, null, null, null, null, null, null, null,null, null, null, null, null, null, null, null, null, null, 2, 7,19));
2332+
scheme.setEventType("event_any");
2333+
scheme.setFieldNames(Arrays.asList( "any1", "any2", "any3" ));
2334+
scheme.setFieldTypes(Arrays.asList( DT_ANY, DT_ANY, DT_ANY));
2335+
scheme.setFieldForms(Arrays.asList( DF_VECTOR, DF_VECTOR, DF_VECTOR));
2336+
scheme.setFieldExtraParams(Arrays.asList( null, null, null));
23362337
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
23372338
List<String> eventTimeFields = new ArrayList<>();
23382339
List<String> commonFields = new ArrayList<>();
2339-
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n";
2340+
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n" +
2341+
"n=100;\n" +
2342+
"num=3;\n" +
2343+
"intv = int(rand(rand(-100..100, 1000) join take(int(), 4), n));\n" +
2344+
"any1 = cut(take([true, false, NULL], n*num), num);\n" +
2345+
"any2 = cut(take(char(-100..100 join NULL), n*num), num);\n" +
2346+
"any3 = cut(take(short(-100..100 join NULL), n*num), num);\n" +
2347+
"share table(intv, any1, any2, any3) as data;";
23402348
conn.run(script);
2349+
String script1 ="class event_any{\n" +
2350+
"\tany1 :: ANY \n" +
2351+
"\tany2 :: ANY \n" +
2352+
"\tany3 :: ANY \n" +
2353+
" def event_any(any1V,any2V,any3V){\n" +
2354+
"\tany1 = any1V\n" +
2355+
"\tany2 = any2V\n" +
2356+
"\tany3 = any3V\n" +
2357+
" \t}\n" +
2358+
"} \n" +
2359+
"schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[],0 ) as type, array(INT[], 0) as form)\n" +
2360+
"eventType = 'event_any'\n" +
2361+
"eventKeys = 'any1, any2, any3';\n" +
2362+
"typeV = [ANY, ANY, ANY];\n" +
2363+
"formV = [SCALAR, SCALAR, SCALAR];\n" +
2364+
"insert into schemaTable values([eventType], [eventKeys], [typeV], [formV]);\n" +
2365+
"share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput1;\n" +
2366+
"try{\ndropStreamEngine(`serInput)\n}catch(ex){\n}\n" +
2367+
"inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput1);";
2368+
conn.run(script1);
23412369
EventSender sender = new EventSender(conn, "inputTable",eventSchemas, eventTimeFields, commonFields);
2342-
Preparedata_array(100,10);
23432370
BasicTable bt = (BasicTable)conn.run("select * from data");
2344-
List<Entity> attributes = new ArrayList<>();
2345-
for(int j=0;j<bt.columns();j++){
2346-
Entity pt = (bt.getColumn(j));
2347-
System.out.println(pt.getDataType());
2348-
System.out.println( j + "列:" + pt.getString());
2349-
attributes.add(pt);
2371+
String script2 = "data1=select * from data;\n" +
2372+
"for(i in 0..99){\n" +
2373+
" any1v=array(ANY,0).append!(data1.row(i)[`any1])\n" +
2374+
" any2v=array(ANY,0).append!(data1.row(i)[`any2])\n" +
2375+
" any3v=array(ANY,0).append!(data1.row(i)[`any3])\n" +
2376+
" event_any1=event_any( any1v, any2v, any3v)\n" +
2377+
" appendEvent(inputSerializer, event_any1)\n" +
2378+
"\t}" ;
2379+
conn.run(script2);
2380+
Vector column = bt.getColumn(1);
2381+
for(int i=0;i<bt.rows();i++){
2382+
List<Entity> attributes = new ArrayList<>();
2383+
for(int j=1;j<bt.columns();j++){
2384+
BasicAnyVector bav = new BasicAnyVector(1);
2385+
bav.set(0,bt.getColumn(j).get(i));
2386+
attributes.add(bav);
2387+
}
2388+
sender.sendEvent("event_any",attributes);
23502389
}
2351-
sender.sendEvent("event_all_array_dateType",attributes);
2390+
BasicTable bt1 = (BasicTable)conn.run("select * from inputTable;");
2391+
System.out.println(bt1.getString());
2392+
Assert.assertEquals(100,bt1.rows());
2393+
BasicTable bt2 = (BasicTable)conn.run("select * from intput1;");
2394+
Assert.assertEquals(100,bt2.rows());
2395+
checkData(bt1,bt2);
2396+
}
2397+
@Test//
2398+
public void test_EventSender_any_1() throws IOException, InterruptedException {
2399+
conn1 = new DBConnection();
2400+
conn1.connect(HOST, PORT, "admin", "123456");
2401+
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n" +
2402+
"share table(1:0,[\"stringv\",\"doublev\"],[STRING, ANY]) as outputTable;\n" ;
2403+
conn.run(script);
2404+
String script1 ="class StockTick{\n" +
2405+
" name :: STRING \n" +
2406+
" price :: FLOAT \n" +
2407+
" def StockTick(name_, price_){\n" +
2408+
" name = name_\n" +
2409+
" price = price_\n" +
2410+
" }\n" +
2411+
"}\n" +
2412+
"\n" +
2413+
"class BondDepth{\n" +
2414+
" contract :: STRING \n" +
2415+
" bidYields :: ANY\n" +
2416+
"\n" +
2417+
" def BondDepth(contract_, bidYields_){\n" +
2418+
" contract = contract_\n" +
2419+
" bidYields = bidYields_\n" +
2420+
" }\n" +
2421+
"}\n" +
2422+
"\n" +
2423+
"class SimpleShareSearch:CEPMonitor {\n" +
2424+
" //保存最新的 StockTick 事件\n" +
2425+
" newTick :: StockTick \n" +
2426+
" def SimpleShareSearch(){\n" +
2427+
" newTick = StockTick(\"init\", 0.0)\n" +
2428+
" }\n" +
2429+
" def processTick(stockTickEvent)\n" +
2430+
" \n" +
2431+
" def onload() {\n" +
2432+
" addEventListener(handler=processTick, eventType=\"BondDepth\", times=\"all\")\n" +
2433+
" } \n" +
2434+
" def processTick(BondDepth) { \n" +
2435+
" str = \"+++++++++++++++++++++++ BondDepth event received\" + now(true)$STRING\n" +
2436+
" writeLog(str)\n" +
2437+
" }\n" +
2438+
"}\n" +
2439+
"\n" +
2440+
" try {unsubscribeTable(tableName=\"testInput\", actionName=\"test1235\")} catch(ex) {}\n" +
2441+
"go\n" +
2442+
" try {undef(`testInput,SHARED)} catch(ex) {}\n" +
2443+
"share(streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody), `testInput)\n" +
2444+
"go\n" +
2445+
"\n" +
2446+
"try {dropStreamEngine(`simpleMonitor)} catch(ex) {}\n" +
2447+
"createCEPEngine(name=\"simpleMonitor\", monitors=<SimpleShareSearch()>, dummyTable=testInput, eventSchema=[StockTick,BondDepth])\n" +
2448+
"\n" +
2449+
"subscribeTable(tableName=\"testInput\", actionName=\"test1235\", handler=getStreamEngine(`simpleMonitor), msgAsTable=true)\n" +
2450+
"go\n" +
2451+
"eventSchema = getCEPEngineStat(engine=\"simpleMonitor\").eventSchema\n" +
2452+
"try {dropStreamEngine(`testSerializer)} catch(ex) {}\n" +
2453+
"ses = streamEventSerializer(name=\"testSerializer\", eventSchema=eventSchema, outputTable=testInput)\n" +
2454+
"\n" +
2455+
"depth = BondDepth(\"test2\", (111,`1))\n" +
2456+
"ses.appendEvent(depth)";
2457+
conn.run(script1);
2458+
EventSchema scheme = new EventSchema();
2459+
scheme.setEventType("BondDepth");
2460+
scheme.setFieldNames(Arrays.asList("contract", "bidYields"));
2461+
scheme.setFieldTypes(Arrays.asList( DT_STRING, DT_ANY));
2462+
scheme.setFieldForms(Arrays.asList( DF_SCALAR, DF_VECTOR));
2463+
scheme.setFieldExtraParams(Arrays.asList( 0, 0));
2464+
2465+
List<EventSchema> eventSchemes = Collections.singletonList(scheme);
2466+
List<String> eventTimeFields = new ArrayList<>();
2467+
List<String> commonFields = new ArrayList<>();
2468+
EventSender sender = new EventSender(conn, "inputTable", eventSchemes, eventTimeFields, commonFields);
2469+
EventClient client = new EventClient(eventSchemes, eventTimeFields, commonFields);
2470+
List<Entity> attributes = new ArrayList<>();
2471+
attributes.add(new BasicString("test2"));
2472+
BasicAnyVector bbb = (BasicAnyVector)conn.run("(111,`1)");
2473+
attributes.add(bbb);
2474+
sender.sendEvent("BondDepth", attributes);
23522475
BasicTable bt1 = (BasicTable)conn.run("select * from inputTable;");
23532476
Assert.assertEquals(1,bt1.rows());
2477+
BasicTable bt2 = (BasicTable)conn.run("select * from testInput;");
2478+
Assert.assertEquals(1,bt2.rows());
2479+
checkData(bt1,bt2);
23542480
}
2481+
23552482
@Test
23562483
public void test_EventSender_Double_vector() throws IOException, InterruptedException {
23572484
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n"+

0 commit comments

Comments
 (0)