@@ -2325,6 +2325,115 @@ public void test_EventClient_vector_decimal_1() throws IOException, Interrupted
23252325 Assert .assertEquals (1 ,bt2 .rows ());
23262326 checkData (bt1 ,bt2 );
23272327 }
2328+ public static EventMessageHandler handler_string = new EventMessageHandler () {
2329+ @ Override
2330+ public void doEvent (String eventType , List <Entity > attribute ) {
2331+ System .out .println ("eventType: " + eventType );
2332+ System .out .println (attribute .toString ());
2333+ System .out .println (eventType .equals ("event_string" ));
2334+ try {
2335+ conn .run ("tableInsert{outputTable}" , attribute );
2336+ } catch (IOException e ) {
2337+ throw new RuntimeException (e );
2338+ }
2339+ }
2340+ };
2341+ @ Test
2342+ public void test_EventClient_vector_string () throws IOException , InterruptedException {
2343+ String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n " +
2344+ "share table(1:0,[\" col1\" ],[STRING]) as outputTable;\n " ;
2345+ conn .run (script );
2346+ String script1 ="class event_string{\n " +
2347+ "\t stringv :: STRING VECTOR\n " +
2348+ " def event_string(string){\n " +
2349+ "\t stringv = string\n " +
2350+ " \t }\n " +
2351+ "} \n " +
2352+ "schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[], ) as type, array(INT[], 0) as form)\n " +
2353+ "eventType = 'event_string'\n " +
2354+ "eventKeys = 'stringv';\n " +
2355+ "typeV = [ STRING];\n " +
2356+ "formV = [ VECTOR];\n " +
2357+ "insert into schemaTable values([eventType], [eventKeys], [typeV],[formV]);\n " +
2358+ "share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput1;\n " +
2359+ "try{\n dropStreamEngine(`serInput)\n }catch(ex){\n }\n " +
2360+ "inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput1);" ;
2361+ conn .run (script1 );
2362+ EventSchema scheme = new EventSchema ();
2363+ scheme .setEventType ("event_string" );
2364+ scheme .setFieldNames (Arrays .asList ("stringv" ));
2365+ scheme .setFieldTypes (Arrays .asList ( DT_STRING ));
2366+ scheme .setFieldForms (Arrays .asList ( DF_VECTOR ));
2367+ scheme .setFieldExtraParams (Arrays .asList ( 2 ));
2368+
2369+ List <EventSchema > eventSchemes = Collections .singletonList (scheme );
2370+ List <String > eventTimeKeys = new ArrayList <>();
2371+ List <String > commonKeys = new ArrayList <>();
2372+ EventSender sender = new EventSender (conn , "inputTable" ,eventSchemes , eventTimeKeys , commonKeys );
2373+ EventClient client = new EventClient (eventSchemes , eventTimeKeys , commonKeys );
2374+ client .subscribe (HOST , PORT , "intput1" , "test1" , handler_string , -1 , true , "admin" , "123456" );
2375+
2376+ String script2 = "\t event_string1=event_string( [\" 111\" ,\" 222\" ,\" \" ,NULL])\n " +
2377+ "\t appendEvent(inputSerializer, event_string1)\n " ;
2378+ conn .run (script2 );
2379+ List <Entity > attributes = new ArrayList <>();
2380+ attributes .add (new BasicStringVector (new String []{"111" ,"222" ,"" ,"" }));
2381+ sender .sendEvent ("event_string" ,attributes );
2382+ BasicTable bt1 = (BasicTable )conn .run ("select * from inputTable;" );
2383+ Assert .assertEquals (1 ,bt1 .rows ());
2384+ Thread .sleep (2000 );
2385+ BasicTable bt2 = (BasicTable )conn .run ("select * from intput1;" );
2386+ Assert .assertEquals (1 ,bt2 .rows ());
2387+ checkData (bt1 ,bt2 );
2388+ }
2389+ @ Test
2390+ public void test_EventClient_vector_symbol () throws IOException , InterruptedException {
2391+ String script = "share streamTable(1000000:0, `eventType`event, [STRING,BLOB]) as inputTable;\n " +
2392+ "share table(1:0,[\" col1\" ],[STRING]) as outputTable;\n " ;
2393+ conn .run (script );
2394+ String script1 ="class event_symbol{\n " +
2395+ "\t symbolv :: SYMBOL VECTOR\n " +
2396+ " def event_symbol(symbol){\n " +
2397+ "\t symbolv = symbol\n " +
2398+ " \t }\n " +
2399+ "} \n " +
2400+ "schemaTable = table(array(STRING, 0) as eventType, array(STRING, 0) as eventKeys, array(INT[], ) as type, array(INT[], 0) as form)\n " +
2401+ "eventType = 'event_symbol'\n " +
2402+ "eventKeys = 'symbolv';\n " +
2403+ "typeV = [ SYMBOL];\n " +
2404+ "formV = [ VECTOR];\n " +
2405+ "insert into schemaTable values([eventType], [eventKeys], [typeV],[formV]);\n " +
2406+ "share streamTable( array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput1;\n " +
2407+ "try{\n dropStreamEngine(`serInput)\n }catch(ex){\n }\n " +
2408+ "inputSerializer = streamEventSerializer(name=`serInput, eventSchema=schemaTable, outputTable=intput1);" ;
2409+ conn .run (script1 );
2410+ EventSchema scheme = new EventSchema ();
2411+ scheme .setEventType ("event_symbol" );
2412+ scheme .setFieldNames (Arrays .asList ("symbolv" ));
2413+ scheme .setFieldTypes (Arrays .asList ( DT_SYMBOL ));
2414+ scheme .setFieldForms (Arrays .asList ( DF_VECTOR ));
2415+ scheme .setFieldExtraParams (Arrays .asList ( 2 ));
2416+
2417+ List <EventSchema > eventSchemes = Collections .singletonList (scheme );
2418+ List <String > eventTimeKeys = new ArrayList <>();
2419+ List <String > commonKeys = new ArrayList <>();
2420+ EventSender sender = new EventSender (conn , "inputTable" ,eventSchemes , eventTimeKeys , commonKeys );
2421+ EventClient client = new EventClient (eventSchemes , eventTimeKeys , commonKeys );
2422+ client .subscribe (HOST , PORT , "intput1" , "test1" , handler_string , -1 , true , "admin" , "123456" );
2423+
2424+ String script2 = "\t event_symbol1=event_symbol( symbol([\" 111\" ,\" 222\" ,\" \" ,NULL]))\n " +
2425+ "\t appendEvent(inputSerializer, event_symbol1)\n " ;
2426+ conn .run (script2 );
2427+ List <Entity > attributes = new ArrayList <>();
2428+ attributes .add (new BasicSymbolVector (Arrays .asList (new String []{"111" , "222" , "" , "" })));
2429+ sender .sendEvent ("event_symbol" ,attributes );
2430+ BasicTable bt1 = (BasicTable )conn .run ("select * from inputTable;" );
2431+ Assert .assertEquals (1 ,bt1 .rows ());
2432+ Thread .sleep (2000 );
2433+ BasicTable bt2 = (BasicTable )conn .run ("select * from intput1;" );
2434+ Assert .assertEquals (1 ,bt2 .rows ());
2435+ checkData (bt1 ,bt2 );
2436+ }
23282437 @ Test
23292438 public void test_EventSender_all_dateType_array () throws IOException {
23302439 EventSchema scheme = new EventSchema ();
0 commit comments