Skip to content

Commit 9cfd293

Browse files
authored
Fix NPE when date_buckets aggregation is missing in the response (#128974)
1 parent 930c5d4 commit 9cfd293

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

docs/changelog/128974.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128974
2+
summary: Fix NPE when `date_buckets` aggregation is missing in the response
3+
area: Machine Learning
4+
type: bug
5+
issues: []

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
*/
77
package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;
88

9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
911
import org.elasticsearch.action.search.SearchRequest;
1012
import org.elasticsearch.action.search.SearchResponse;
1113
import org.elasticsearch.action.search.TransportSearchAction;
@@ -39,6 +41,8 @@
3941
*/
4042
public class DatafeedDelayedDataDetector implements DelayedDataDetector {
4143

44+
private static final Logger logger = LogManager.getLogger(DatafeedDelayedDataDetector.class);
45+
4246
private static final String DATE_BUCKETS = "date_buckets";
4347

4448
private final long bucketSpan;
@@ -134,9 +138,16 @@ private Map<Long, Long> checkCurrentBucketEventCount(long start, long end) {
134138

135139
SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder).indicesOptions(indicesOptions);
136140
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
137-
SearchResponse response = client.execute(TransportSearchAction.TYPE, searchRequest).actionGet();
141+
SearchResponse searchResponse = client.execute(TransportSearchAction.TYPE, searchRequest).actionGet();
138142
try {
139-
List<? extends Histogram.Bucket> buckets = ((Histogram) response.getAggregations().get(DATE_BUCKETS)).getBuckets();
143+
Histogram histogram = searchResponse.getAggregations().get(DATE_BUCKETS);
144+
if (histogram == null) {
145+
// We log search response here to get information about shards and hits which may be helpful while debugging.
146+
// The size of the search response is small as we only log if the "date_buckets" aggregation is missing.
147+
logger.warn("[{}] Delayed data check failed with missing aggregation in search response [{}]", jobId, searchResponse);
148+
return Collections.emptyMap();
149+
}
150+
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
140151
Map<Long, Long> hashMap = Maps.newMapWithExpectedSize(buckets.size());
141152
for (Histogram.Bucket bucket : buckets) {
142153
long bucketTime = toHistogramKeyToEpoch(bucket.getKey());
@@ -147,7 +158,7 @@ private Map<Long, Long> checkCurrentBucketEventCount(long start, long end) {
147158
}
148159
return hashMap;
149160
} finally {
150-
response.decRef();
161+
searchResponse.decRef();
151162
}
152163
}
153164
}

0 commit comments

Comments
 (0)