|
25 | 25 | import org.joda.time.format.DateTimeFormat; |
26 | 26 | import org.joda.time.format.DateTimeFormatter; |
27 | 27 |
|
| 28 | +import java.lang.reflect.Method; |
28 | 29 | import java.util.ArrayList; |
29 | 30 | import java.util.List; |
30 | 31 | import java.util.Map; |
@@ -60,33 +61,85 @@ private String buildSearchQuery(Optional<EventDefinitionDto> eventDefinitionOpt, |
60 | 61 | if (eventDefinitionOpt.isPresent()) { |
61 | 62 | EventDefinitionDto eventDefinition = eventDefinitionOpt.get(); |
62 | 63 | EventProcessorConfig config = eventDefinition.config(); |
| 64 | + String configType = getEventProcessorConfigType(config); |
63 | 65 |
|
64 | | - if (config instanceof AggregationEventProcessorConfig) { |
65 | | - AggregationEventProcessorConfig aggregationConfig = (AggregationEventProcessorConfig) config; |
66 | | - List<String> filters = new ArrayList<>(); |
| 66 | + List<String> filters = new ArrayList<>(); |
67 | 67 |
|
68 | | - String searchQuery = aggregationConfig.query(); |
69 | | - if (searchQuery != null && !searchQuery.isEmpty() && !searchQuery.equals("*")) { |
70 | | - filters.add(searchQuery); |
71 | | - } |
| 68 | + if (configType.equals(AggregationEventProcessorConfig.TYPE_NAME)) { |
| 69 | + filters.addAll(getFiltersFromAggregation((AggregationEventProcessorConfig) config)); |
| 70 | + } else if (configType.equals("correlation-count")) { |
| 71 | + filters.addAll(getFiltersFromCorrelationCount(config)); |
| 72 | + } |
72 | 73 |
|
73 | | - // Add groupByFields in filters (separate empty value) |
74 | | - groupByFields.entrySet().stream().filter(MessagesURLBuilder::emptyValue) |
75 | | - .map(entry -> "NOT _exists_: " + entry.getKey()).forEach(filters::add); |
76 | | - groupByFields.entrySet().stream().filter(MessagesURLBuilder::notEmptyValue) |
77 | | - .map( entry -> entry.getKey() + ": " + entry.getValue()).forEach(filters::add); |
| 74 | + // Add groupByFields in filters (separate empty value) |
| 75 | + groupByFields.entrySet().stream().filter(MessagesURLBuilder::emptyValue) |
| 76 | + .map(entry -> "NOT _exists_: " + entry.getKey()).forEach(filters::add); |
| 77 | + groupByFields.entrySet().stream().filter(MessagesURLBuilder::notEmptyValue) |
| 78 | + .map( entry -> entry.getKey() + ": " + entry.getValue()).forEach(filters::add); |
78 | 79 |
|
79 | | - Optional<String> filterResult = filters.stream().reduce((x, y) -> "(" + x + ") AND (" + y + ")"); |
| 80 | + // Build query |
| 81 | + Optional<String> filterResult = filters.stream().reduce((x, y) -> "(" + x + ") AND (" + y + ")"); |
80 | 82 |
|
81 | | - if (filterResult.isPresent()) { |
82 | | - return MSGS_URL_QUERY + filterResult.get(); |
83 | | - } |
| 83 | + if (filterResult.isPresent()) { |
| 84 | + return MSGS_URL_QUERY + filterResult.get(); |
84 | 85 | } |
85 | 86 | } |
86 | 87 |
|
87 | 88 | return ""; |
88 | 89 | } |
89 | 90 |
|
| 91 | + /** |
| 92 | + * Get type and avoid Exception for FallbackConfig |
| 93 | + */ |
| 94 | + private String getEventProcessorConfigType(EventProcessorConfig config) { |
| 95 | + try { |
| 96 | + return config.type(); |
| 97 | + } catch (UnsupportedOperationException e) { |
| 98 | + return ""; |
| 99 | + } |
| 100 | + } |
| 101 | + |
| 102 | + private List<String> getFiltersFromAggregation(AggregationEventProcessorConfig aggregationConfig) { |
| 103 | + List<String> filters = new ArrayList<>(); |
| 104 | + |
| 105 | + String searchQuery = aggregationConfig.query(); |
| 106 | + if (isValidSearchQuery(searchQuery)) { |
| 107 | + filters.add(searchQuery); |
| 108 | + } |
| 109 | + |
| 110 | + return filters; |
| 111 | + } |
| 112 | + |
| 113 | + /** |
| 114 | + * Use Reflexion for CorrelationCountProcessorConfig to avoid dependency with graylog-plugin-correlation-count |
| 115 | + */ |
| 116 | + private List<String> getFiltersFromCorrelationCount(EventProcessorConfig config) { |
| 117 | + try { |
| 118 | + List<String> filters = new ArrayList<>(); |
| 119 | + Class<?> correlationCountClass = config.getClass().getSuperclass(); |
| 120 | + Method methodSearchQuery = correlationCountClass.getMethod("searchQuery"); |
| 121 | + String searchQuery = (String) methodSearchQuery.invoke(config); |
| 122 | + if (isValidSearchQuery(searchQuery)) { |
| 123 | + filters.add(searchQuery); |
| 124 | + } |
| 125 | + |
| 126 | + Method additionalSearchQueryMethod = correlationCountClass.getMethod("additionalSearchQuery"); |
| 127 | + String additionalSearchQuery = (String) additionalSearchQueryMethod.invoke(config); |
| 128 | + if (isValidSearchQuery(additionalSearchQuery)) { |
| 129 | + filters.add(additionalSearchQuery); |
| 130 | + } |
| 131 | + |
| 132 | + return filters; |
| 133 | + } catch (Exception e) { |
| 134 | + // Keep Exception to be noticed if class signature changed |
| 135 | + throw new RuntimeException(e); |
| 136 | + } |
| 137 | + } |
| 138 | + |
| 139 | + private boolean isValidSearchQuery(String searchQuery) { |
| 140 | + return searchQuery != null && !searchQuery.isEmpty() && !searchQuery.equals("*"); |
| 141 | + } |
| 142 | + |
90 | 143 | private static boolean emptyValue(Map.Entry<String, String> entry) { |
91 | 144 | return EMPTY_VALUE.equals(entry.getValue()); |
92 | 145 | } |
|
0 commit comments