2828import org .graylog .events .fields .FieldValueType ;
2929import org .graylog .events .fields .providers .AbstractFieldValueProvider ;
3030import org .graylog .events .fields .providers .FieldValueProvider ;
31+ import org .graylog .events .search .EventsSearchFilter ;
32+ import org .graylog .events .search .EventsSearchParameters ;
33+ import org .graylog .events .search .EventsSearchResult ;
34+ import org .graylog .events .search .EventsSearchService ;
35+ import org .graylog2 .plugin .Message ;
36+ import org .graylog2 .plugin .indexer .searches .timeranges .RelativeRange ;
3137import org .slf4j .Logger ;
3238import org .slf4j .LoggerFactory ;
3339
40+ import java .util .Collections ;
41+ import java .util .Optional ;
3442import java .util .UUID ;
3543
3644public class AggregationFieldValueProvider extends AbstractFieldValueProvider {
@@ -42,18 +50,64 @@ public interface Factory extends AbstractFieldValueProvider.Factory<AggregationF
4250 private static final Logger LOG = LoggerFactory .getLogger (AggregationFieldValueProvider .class );
4351
4452 private final AggregationFieldValueProvider .Config config ;
53+ private final EventsSearchService searchService ;
4554
4655 @ Inject
47- public AggregationFieldValueProvider (@ Assisted FieldValueProvider .Config config ) {
56+ public AggregationFieldValueProvider (@ Assisted FieldValueProvider .Config config , EventsSearchService searchService ) {
4857 super (config );
4958 this .config = (AggregationFieldValueProvider .Config ) config ;
59+ this .searchService = searchService ;
5060 }
5161
5262 @ Override
5363 protected FieldValue doGet (String fieldName , EventWithContext eventWithContext ) {
54- return FieldValue .create (FieldValueType .STRING , UUID .randomUUID ().toString ());
64+ LOG .debug ("Start Compute field {}" , fieldName );
65+
66+ // Check if alert already exist in time range
67+ if (this .config .aggregationTimeRange () > 0 ) {
68+ LOG .debug ("Aggregation Time Range is defined" );
69+
70+ String eventDefinitionId = eventWithContext .event ().getEventDefinitionId ();
71+ int timeRange = this .config .aggregationTimeRange () * 60 ;
72+
73+ EventsSearchFilter searchFilter = EventsSearchFilter .builder ()
74+ .alerts (EventsSearchFilter .Alerts .ONLY )
75+ .eventDefinitions (Collections .singleton (eventDefinitionId ))
76+ .build ();
77+
78+ EventsSearchParameters request = EventsSearchParameters .builder ()
79+ .filter (searchFilter )
80+ .timerange (RelativeRange .create (timeRange ))
81+ .page (1 )
82+ // Do not use higher perPage value cause request failed
83+ .perPage (5000 )
84+ .query ("" )
85+ .sortBy (Message .FIELD_TIMESTAMP )
86+ .sortDirection (EventsSearchParameters .SortDirection .DESC )
87+ .build ();
88+
89+ EventsSearchResult result = this .searchService .search (request , new EmptySubject ());
90+
91+ if (result .totalEvents () > 0 ) {
92+ LOG .debug ("Found {} events for aggregation" , result .totalEvents ());
93+
94+ Optional <EventsSearchResult .Event > existingEvent = result .events ().stream ().filter (x -> x .event ().groupByFields ().equals (eventWithContext .event ().getGroupByFields ())).findFirst ();
95+
96+ if (existingEvent .isPresent ()) {
97+ String existingId = existingEvent .get ().event ().fields ().getOrDefault (fieldName , UUID .randomUUID ().toString ());
98+ LOG .debug ("Find existing Event with aggregation Id {}" , existingId );
99+
100+ return FieldValue .create (FieldValueType .STRING , existingId );
101+ }
102+ }
103+ }
104+
105+ return getRandomFieldValue ();
55106 }
56107
108+ public FieldValue getRandomFieldValue () {
109+ return FieldValue .create (FieldValueType .STRING , UUID .randomUUID ().toString ());
110+ }
57111
58112 @ AutoValue
59113 @ JsonTypeName (AggregationFieldValueProvider .Config .TYPE_NAME )
@@ -64,7 +118,7 @@ public static abstract class Config implements AbstractFieldValueProvider.Config
64118 private static final String FIELD_AGGREGATION_TIME_RANGE = "aggregation_time_range" ;
65119
66120 @ JsonProperty (FIELD_AGGREGATION_TIME_RANGE )
67- public abstract Long aggregationTimeRange ();
121+ public abstract Integer aggregationTimeRange ();
68122
69123 public static AggregationFieldValueProvider .Config .Builder builder () {
70124 return AggregationFieldValueProvider .Config .Builder .create ();
@@ -80,7 +134,7 @@ public static AggregationFieldValueProvider.Config.Builder create() {
80134 }
81135
82136 @ JsonProperty (FIELD_AGGREGATION_TIME_RANGE )
83- public abstract AggregationFieldValueProvider .Config .Builder aggregationTimeRange (Long aggregationTimeRange );
137+ public abstract AggregationFieldValueProvider .Config .Builder aggregationTimeRange (Integer aggregationTimeRange );
84138
85139 public abstract AggregationFieldValueProvider .Config build ();
86140 }
0 commit comments