11package org .graylog .plugins .slookup ;
22
3- //import org.graylog2.indexer.results.ScrollResult;
4- import com .google .common .collect .Lists ;
53import org .elasticsearch .action .search .SearchPhaseExecutionException ;
64import org .graylog2 .indexer .results .ResultMessage ;
75import org .graylog2 .indexer .results .SearchResult ;
86import org .graylog2 .indexer .searches .SearchesConfig ;
9- import org .graylog2 .indexer .searches .Searches ;
107import org .graylog2 .indexer .searches .Sorting ;
8+ import org .graylog2 .indexer .searches .Searches ;
119import org .graylog2 .plugin .Message ;
12- import org .graylog2 .plugin .MessageSummary ;
1310import org .graylog2 .plugin .indexer .searches .timeranges .TimeRange ;
1411import org .graylog2 .plugin .indexer .searches .timeranges .RelativeRange ;
1512import org .graylog .plugins .pipelineprocessor .EvaluationContext ;
@@ -32,6 +29,7 @@ public class StreamLookupFunction extends AbstractFunction<String> {
3229 private static final String DST_FIELD_ARG = "dstField" ;
3330 private static final String RTN_FIELD_ARG = "rtnField" ;
3431 private static final String TIMERANGE_ARG = "timeRange" ;
32+ private static final String SORTORDER_ARG = "sortOrder" ;
3533
3634 private String query ;
3735 private String filter ;
@@ -65,6 +63,10 @@ public StreamLookupFunction(Searches searches) {
6563 .string (TIMERANGE_ARG )
6664 .description ("Relative Time Range" )
6765 .build ();
66+ private final ParameterDescriptor <String , String > sortOrderParam = ParameterDescriptor
67+ .string (SORTORDER_ARG )
68+ .description ("Sorting Order - asc or desc" )
69+ .build ();
6870
6971 @ Override
7072 public Object preComputeConstantArgument (FunctionArgs functionArgs , String s , Expression expression ) {
@@ -78,31 +80,43 @@ public String evaluate(FunctionArgs functionArgs, EvaluationContext evaluationCo
7880 String dstField = dstFieldParam .required (functionArgs , evaluationContext );
7981 String rtnField = rtnFieldParam .required (functionArgs , evaluationContext );
8082 Integer timeRange = Integer .parseInt (timeRangeParam .required (functionArgs , evaluationContext ));
83+ String sortField = sortOrderParam .required (functionArgs , evaluationContext );
8184
8285 List <String > fields = new ArrayList <>();
8386
8487 fields .add (rtnField );
8588
8689 this .timeRange = RelativeRange .builder ().type ("relative" ).range (timeRange ).build ();
8790
88- // Trying to build a query string here.
8991 this .query = dstField + ":" + evaluationContext .currentMessage ().getField (srcField ).toString ();
9092
9193 this .filter = "streams:" + stream ;
9294
95+ if (sortField .equals ("asc" )) {
96+ this .sortType = new Sorting ("timestamp" , Sorting .Direction .ASC );
97+ LOG .debug ("This sortType - field: {}, order: {}" , this .sortType .getField ().toString (), this .sortType .asElastic ().toString ());
98+ }
99+ else
100+ {
101+ this .sortType = new Sorting ("timestamp" , Sorting .Direction .DESC );
102+ LOG .debug ("This sortType - field: {}, order: {}" , this .sortType .getField ().toString (), this .sortType .asElastic ().toString ());
103+ }
104+
93105 final SearchesConfig searchesConfig = SearchesConfig .builder ()
94106 .query (this .query )
95107 .filter (this .filter )
96108 .fields (fields )
97109 .range (this .timeRange )
110+ .sorting (this .sortType )
98111 .limit (1 )
99112 .offset (0 )
100113 .build ();
101114
102115 try {
103116 SearchResult response = this .searches .search (searchesConfig );
117+ LOG .debug ("Search config - field: {}, order: {}" , searchesConfig .sorting ().getField ().toString (), searchesConfig .sorting ().asElastic ().toString ());
104118 if (response .getResults ().size () == 0 ) {
105- LOG .debug ("No Search Results observed." );
119+ LOG .info ("No Search Results observed." );
106120 return "" ;
107121 }
108122 else
@@ -130,7 +144,7 @@ public FunctionDescriptor<String> descriptor() {
130144 return FunctionDescriptor .<String >builder ()
131145 .name (NAME )
132146 .description ("Conduct a lookup in a remote stream and return a field value based on a matching source field. Similar to VLOOKUP in Excel" )
133- .params (of (streamParam , srcFieldParam , dstFieldParam , rtnFieldParam , timeRangeParam ))
147+ .params (of (streamParam , srcFieldParam , dstFieldParam , rtnFieldParam , timeRangeParam , sortOrderParam ))
134148 .returnType (String .class )
135149 .build ();
136150 }
0 commit comments