Skip to content

Commit ad03a42

Browse files
committed
AJ-894:update test case about sendEvent any dict
1 parent 11c65b9 commit ad03a42

File tree

1 file changed

+203
-73
lines changed

1 file changed

+203
-73
lines changed

test/com/xxdb/streaming/client/cep/EventSenderTest.java

Lines changed: 203 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2394,89 +2394,219 @@ public void test_EventSender_any() throws Exception {
23942394
Assert.assertEquals(100,bt2.rows());
23952395
checkData(bt1,bt2);
23962396
}
2397-
@Test//
2398-
public void test_EventSender_any_1() throws IOException, InterruptedException {
2399-
conn1 = new DBConnection();
2400-
conn1.connect(HOST, PORT, "admin", "123456");
2401-
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n" +
2402-
"share table(1:0,[\"stringv\",\"doublev\"],[STRING, ANY]) as outputTable;\n" ;
2397+
//@Test//目前会比对不通过,不过通过server订阅 反序列化可以查询出来 后续该case 补充api订阅反序列化的场景
2398+
public void test_EventSender_any_dict_1() throws Exception {
2399+
EventSchema scheme = new EventSchema();
2400+
scheme.setEventType("event_any");
2401+
scheme.setFieldNames(Arrays.asList( "any1", "any2", "any3" ));
2402+
scheme.setFieldTypes(Arrays.asList( DT_ANY, DT_ANY, DT_ANY));
2403+
scheme.setFieldForms(Arrays.asList( DF_VECTOR, DF_VECTOR, DF_VECTOR));
2404+
scheme.setFieldExtraParams(Arrays.asList( null, null, null));
2405+
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
2406+
List<String> eventTimeFields = new ArrayList<>();
2407+
List<String> commonFields = new ArrayList<>();
2408+
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n" ;
2409+
24032410
conn.run(script);
2404-
String script1 ="class StockTick{\n" +
2405-
" name :: STRING \n" +
2406-
" price :: FLOAT \n" +
2407-
" def StockTick(name_, price_){\n" +
2408-
" name = name_\n" +
2409-
" price = price_\n" +
2410-
" }\n" +
2411-
"}\n" +
2412-
"\n" +
2413-
"class BondDepth{\n" +
2414-
" contract :: STRING \n" +
2415-
" bidYields :: ANY\n" +
2416-
"\n" +
2417-
" def BondDepth(contract_, bidYields_){\n" +
2418-
" contract = contract_\n" +
2419-
" bidYields = bidYields_\n" +
2420-
" }\n" +
2411+
String script1 ="class event_any { \n" +
2412+
" any1 :: ANY \n" +
2413+
" any2 :: ANY \n" +
2414+
" any3 :: ANY\n" +
2415+
" \n" +
2416+
" def event_any() { \n" +
2417+
" any1=dict(`aaa`bbb, `10`99,true)\n" +
2418+
" any2=dict(`aaa1`bbb2, [1..10, 2..5],true)\n" +
2419+
" any3=dict(`aaa11`bbb22, [dict(`p1`p2, `1`2, true), dict(`p11`p22, `100`200, true)])\n" +
2420+
" } \n" +
24212421
"}\n" +
2422-
"\n" +
2423-
"class SimpleShareSearch:CEPMonitor {\n" +
2424-
" //保存最新的 StockTick 事件\n" +
2425-
" newTick :: StockTick \n" +
2426-
" def SimpleShareSearch(){\n" +
2427-
" newTick = StockTick(\"init\", 0.0)\n" +
2428-
" }\n" +
2429-
" def processTick(stockTickEvent)\n" +
2430-
" \n" +
2431-
" def onload() {\n" +
2432-
" addEventListener(handler=processTick, eventType=\"BondDepth\", times=\"all\")\n" +
2433-
" } \n" +
2434-
" def processTick(BondDepth) { \n" +
2435-
" str = \"+++++++++++++++++++++++ BondDepth event received\" + now(true)$STRING\n" +
2436-
" writeLog(str)\n" +
2437-
" }\n" +
2422+
2423+
"schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[],0 ) as type, array(INT[], 0) as form)\n" +
2424+
"eventType = 'event_any'\n" +
2425+
"eventKeys = 'any1, any2, any3';\n" +
2426+
"typeV = [ANY, ANY, ANY];\n" +
2427+
"formV = [SCALAR, SCALAR, SCALAR];\n" +
2428+
"insert into schemaTable values([eventType], [eventKeys], [typeV], [formV]);\n" +
2429+
"share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput1;\n" +
2430+
"try{\ndropStreamEngine(`serInput)\n}catch(ex){\n}\n" +
2431+
"inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput1);";
2432+
conn.run(script1);
2433+
EventSender sender = new EventSender(conn, "inputTable",eventSchemas, eventTimeFields, commonFields);
2434+
String script2 = " event_any1=event_any()\n" +
2435+
"appendEvent(inputSerializer, event_any1)\n";
2436+
conn.run(script2);
2437+
BasicAnyVector any1 = new BasicAnyVector(1);
2438+
BasicDictionary dictionary = new BasicDictionary(DT_STRING, DT_ANY);
2439+
dictionary.put(new BasicString("aaa"), new BasicString("10"));
2440+
dictionary.put(new BasicString("bbb"), new BasicString("99"));
2441+
any1.set(0, dictionary);
2442+
2443+
BasicAnyVector any2 = new BasicAnyVector(1);
2444+
BasicDictionary dictionary2 = new BasicDictionary(DT_STRING, DT_ANY);
2445+
dictionary2.put(new BasicString("aaa1"), new BasicIntVector(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
2446+
dictionary2.put(new BasicString("bbb2"), new BasicIntVector(Arrays.asList(2, 3, 4, 5)));
2447+
any2.set(0, dictionary2);
2448+
2449+
BasicAnyVector any3 = new BasicAnyVector(1);
2450+
BasicDictionary dictionary3 = new BasicDictionary(DT_STRING, DT_ANY);
2451+
BasicDictionary dictionaryAAA = new BasicDictionary(DT_STRING, DT_STRING);
2452+
dictionaryAAA.put(new BasicString("p1"), new BasicString("1"));
2453+
dictionaryAAA.put(new BasicString("p2"), new BasicString("2"));
2454+
2455+
BasicDictionary dictionaryBBB = new BasicDictionary(DT_STRING, DT_STRING);
2456+
dictionaryBBB.put(new BasicString("p11"), new BasicString("100"));
2457+
dictionaryBBB.put(new BasicString("p22"), new BasicString("200"));
2458+
2459+
dictionary3.put(new BasicString("aaa11"), dictionaryAAA);
2460+
dictionary3.put(new BasicString("bbb22"), dictionaryBBB);
2461+
any3.set(0, dictionary3);
2462+
2463+
BasicAnyVector any11 = (BasicAnyVector)conn.run("any1=array(ANY,0).append!(dict(`aaa`bbb, `10`99));\n any1");
2464+
BasicAnyVector any22 = (BasicAnyVector)conn.run("any2=array(ANY,0).append!(dict(`aaa1`bbb2, [1..10, 2..5]));\n any2");
2465+
BasicAnyVector any33 = (BasicAnyVector)conn.run("any3=array(ANY,0).append!(dict(`aaa11`bbb22, [dict(`p1`p2, `1`2), dict(`p11`p22, `100`200,true)]));\n any3");
2466+
2467+
List<Entity> attributes = new ArrayList<>();
2468+
attributes.add(any1);
2469+
attributes.add(any2);
2470+
attributes.add(any3);
2471+
sender.sendEvent("event_any",attributes);
2472+
2473+
List<Entity> attributes1 = new ArrayList<>();
2474+
attributes1.add(any11);
2475+
attributes1.add(any22);
2476+
attributes1.add(any33);
2477+
sender.sendEvent("event_any",attributes1);
2478+
2479+
BasicTable bt1 = (BasicTable)conn.run("select * from inputTable ;");
2480+
System.out.println(bt1.getString());
2481+
Assert.assertEquals(2,bt1.rows());
2482+
// Assert.assertEquals(bt1.getColumn(1).get(0), bt1.getColumn(1).get(1));
2483+
BasicTable bt2 = (BasicTable)conn.run("select * from intput1;");
2484+
Assert.assertEquals(1,bt2.rows());
2485+
Assert.assertEquals(bt1.getColumn(1).get(1), bt2.getColumn(1).get(0));
2486+
checkData(bt1,bt2);
2487+
}
2488+
2489+
@Test//使用server订阅反序列化判断数据的正确性
2490+
public void test_EventSender_any_dict_2() throws Exception {
2491+
EventSchema scheme = new EventSchema();
2492+
scheme.setEventType("event_any");
2493+
scheme.setFieldNames(Arrays.asList( "any1", "any2", "any3" ));
2494+
scheme.setFieldTypes(Arrays.asList( DT_ANY, DT_ANY, DT_ANY));
2495+
scheme.setFieldForms(Arrays.asList( DF_VECTOR, DF_VECTOR, DF_VECTOR));
2496+
scheme.setFieldExtraParams(Arrays.asList( null, null, null));
2497+
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
2498+
List<String> eventTimeFields = new ArrayList<>();
2499+
List<String> commonFields = new ArrayList<>();
2500+
String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n" ;
2501+
DBConnection conn = new DBConnection();
2502+
conn.connect(HOST,PORT,"admin","123456");
2503+
conn.run(script);
2504+
String script1 ="try {unsubscribeTable(,`intput, `subopt)} catch(ex) {}\n" +
2505+
"try {dropStreamEngine(`serInput)} catch(ex) {}\n" +
2506+
"try {dropStreamEngine(`cep1)} catch(ex) {}\n" +
2507+
"try{dropStreamTable(`intput)}catch(ex){}\n" +
2508+
"try{undef(`intput,SHARED)}catch(ex){}\n" +
2509+
"go\n" +
2510+
"class event_any { \n" +
2511+
" any1 :: ANY \n" +
2512+
" any2 :: ANY \n" +
2513+
" any3 :: ANY\n" +
2514+
" \n" +
2515+
" def event_any() { \n" +
2516+
" any1=dict(`aaa`bbb, `10`99, true)\n" +
2517+
" any2=dict(`aaa1`bbb2, [1..10, 2..5], true)\n" +
2518+
" any3=dict(`aaa11`bbb22, [dict(`p1`p2, `1`2,true), dict(`p11`p22, `100`200,true)],true)\n" +
2519+
" } \n" +
24382520
"}\n" +
2521+
"class mainMonitor:CEPMonitor{\n" +
2522+
" datas :: ANY VECTOR\n" +
2523+
" any1 :: ANY\n" +
2524+
" any2 :: ANY\n" +
2525+
" any3 :: ANY\n" +
2526+
" def mainMonitor(){\n" +
2527+
" datas = array(ANY, 0, 10)\n" +
2528+
" }\n" +
24392529
"\n" +
2440-
" try {unsubscribeTable(tableName=\"testInput\", actionName=\"test1235\")} catch(ex) {}\n" +
2441-
"go\n" +
2442-
" try {undef(`testInput,SHARED)} catch(ex) {}\n" +
2443-
"share(streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as eventBody), `testInput)\n" +
2444-
"go\n" +
2530+
" def updateData(event)\n" +
24452531
"\n" +
2446-
"try {dropStreamEngine(`simpleMonitor)} catch(ex) {}\n" +
2447-
"createCEPEngine(name=\"simpleMonitor\", monitors=<SimpleShareSearch()>, dummyTable=testInput, eventSchema=[StockTick,BondDepth])\n" +
2532+
" def onunload(){\n" +
2533+
" }\n" +
24482534
"\n" +
2449-
"subscribeTable(tableName=\"testInput\", actionName=\"test1235\", handler=getStreamEngine(`simpleMonitor), msgAsTable=true)\n" +
2450-
"go\n" +
2451-
"eventSchema = getCEPEngineStat(engine=\"simpleMonitor\").eventSchema\n" +
2452-
"try {dropStreamEngine(`testSerializer)} catch(ex) {}\n" +
2453-
"ses = streamEventSerializer(name=\"testSerializer\", eventSchema=eventSchema, outputTable=testInput)\n" +
2535+
" def onload(){\n" +
2536+
" addEventListener(updateData, 'event_any', , 'all')\n" +
2537+
" }\n" +
24542538
"\n" +
2455-
"depth = BondDepth(\"test2\", (111,`1))\n" +
2456-
"ses.appendEvent(depth)";
2539+
" def updateData(event) {\n" +
2540+
" any1 = event.any1\n" +
2541+
" any2 = event.any2\n" +
2542+
" any3 = event.any3\n" +
2543+
" writeLog(\"------------------------\")\n" +
2544+
" // writeLog(event.vobject.trader)\n" +
2545+
" // emitEvent(event)\n" +
2546+
" }\n" +
2547+
"}\n" +
2548+
2549+
"schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[],0 ) as type, array(INT[], 0) as form)\n" +
2550+
"eventType = 'event_any'\n" +
2551+
"eventKeys = 'any1, any2, any3';\n" +
2552+
"typeV = [ANY, ANY, ANY];\n" +
2553+
"formV = [SCALAR, SCALAR, SCALAR];\n" +
2554+
"insert into schemaTable values([eventType], [eventKeys], [typeV], [formV]);\n" +
2555+
"share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput;\n" +
2556+
"try{\ndropStreamEngine(`serInput)\n}catch(ex){\n}\n" +
2557+
"inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput);\n" +
2558+
2559+
"dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)\n" +
2560+
"engineCep = createCEPEngine('cep1', <mainMonitor()>, dummy, [event_any], 1, , 10000, getStreamEngine(`serInput))\n" +
2561+
"subscribeTable(,`intput, `subopt, 0, getStreamEngine('cep1'), true);";
24572562
conn.run(script1);
2458-
EventSchema scheme = new EventSchema();
2459-
scheme.setEventType("BondDepth");
2460-
scheme.setFieldNames(Arrays.asList("contract", "bidYields"));
2461-
scheme.setFieldTypes(Arrays.asList( DT_STRING, DT_ANY));
2462-
scheme.setFieldForms(Arrays.asList( DF_SCALAR, DF_VECTOR));
2463-
scheme.setFieldExtraParams(Arrays.asList( 0, 0));
24642563

2465-
List<EventSchema> eventSchemes = Collections.singletonList(scheme);
2466-
List<String> eventTimeFields = new ArrayList<>();
2467-
List<String> commonFields = new ArrayList<>();
2468-
EventSender sender = new EventSender(conn, "inputTable", eventSchemes, eventTimeFields, commonFields);
2469-
EventClient client = new EventClient(eventSchemes, eventTimeFields, commonFields);
2564+
EventSender sender = new EventSender(conn, "intput",eventSchemas, eventTimeFields, commonFields);
2565+
BasicAnyVector any1 = new BasicAnyVector(1);
2566+
BasicDictionary dictionary = new BasicDictionary(DT_STRING, DT_ANY);
2567+
dictionary.put(new BasicString("aaa"), new BasicString("10"));
2568+
dictionary.put(new BasicString("bbb"), new BasicString("99"));
2569+
any1.set(0, dictionary);
2570+
2571+
BasicAnyVector any2 = new BasicAnyVector(1);
2572+
BasicDictionary dictionary2 = new BasicDictionary(DT_STRING, DT_ANY);
2573+
dictionary2.put(new BasicString("aaa1"), new BasicStringVector(new String[]{"1","2","3","4","5","6","7","8","9","10"}));
2574+
dictionary2.put(new BasicString("bbb2"), new BasicStringVector(new String[]{"2","3","4","5"}));
2575+
any2.set(0, dictionary2);
2576+
2577+
BasicAnyVector any3 = new BasicAnyVector(1);
2578+
BasicDictionary dictionary3 = new BasicDictionary(DT_STRING, DT_ANY);
2579+
BasicDictionary dictionaryAAA = new BasicDictionary(DT_STRING, DT_STRING);
2580+
dictionaryAAA.put(new BasicString("p1"), new BasicString("1"));
2581+
dictionaryAAA.put(new BasicString("p2"), new BasicString("2"));
2582+
2583+
BasicDictionary dictionaryBBB = new BasicDictionary(DT_STRING, DT_STRING);
2584+
dictionaryBBB.put(new BasicString("p11"), new BasicString("100"));
2585+
dictionaryBBB.put(new BasicString("p22"), new BasicString("200"));
2586+
2587+
dictionary3.put(new BasicString("aaa11"), dictionaryAAA);
2588+
dictionary3.put(new BasicString("bbb22"), dictionaryBBB);
2589+
any3.set(0, dictionary3);
24702590
List<Entity> attributes = new ArrayList<>();
2471-
attributes.add(new BasicString("test2"));
2472-
BasicAnyVector bbb = (BasicAnyVector)conn.run("(111,`1)");
2473-
attributes.add(bbb);
2474-
sender.sendEvent("BondDepth", attributes);
2475-
BasicTable bt1 = (BasicTable)conn.run("select * from inputTable;");
2476-
Assert.assertEquals(1,bt1.rows());
2477-
BasicTable bt2 = (BasicTable)conn.run("select * from testInput;");
2478-
Assert.assertEquals(1,bt2.rows());
2479-
checkData(bt1,bt2);
2591+
attributes.add(any1);
2592+
attributes.add(any2);
2593+
attributes.add(any3);
2594+
sender.sendEvent("event_any",attributes);
2595+
sleep(1000);
2596+
Entity re1 = conn.run("m = getCEPEngineMonitor(`cep1, `cep1, `mainMonitor);\n m.any1;");
2597+
Entity re2 = conn.run("m = getCEPEngineMonitor(`cep1, `cep1, `mainMonitor);\n m.any2;");
2598+
Entity re3 = conn.run("m = getCEPEngineMonitor(`cep1, `cep1, `mainMonitor);\n m.any3;");
2599+
Assert.assertEquals("(aaa->10\n" +
2600+
"bbb->99\n" +
2601+
")",re1.getString());
2602+
Assert.assertEquals("(aaa1->[1,2,3,4,5,6,7,8,9,10]\n" +
2603+
"bbb2->[2,3,4,5]\n" +
2604+
")",re2.getString());
2605+
Assert.assertEquals("(bbb22->{\n" +
2606+
"{p11,p22}->{100,200}}\n" +
2607+
"aaa11->{\n" +
2608+
"{p1,p2}->{1,2}}\n" +
2609+
")",re3.getString());
24802610
}
24812611

24822612
@Test

0 commit comments

Comments
 (0)