Commit 701ffbb
[ES|QL] Return ConstNullBlock in FromAggMetricDouble (elastic#136773) (elastic#136892)

If we receive a block that contains all null values in FromAggregateMetricDouble, we want to return a new ConstantNullBlock so that functions further down the line can operate on it like a typical null block, rather than an AggregateMetricDouble block.
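
For illustration, here is a minimal self-contained sketch of that substitution, using hypothetical stand-ins for the Block and BlockFactory types; the real change lives in FromAggregateMetricDouble.java and is shown in the diff below.

    // Hypothetical stand-ins for illustration; not Elasticsearch's real interfaces.
    interface Block {
        boolean areAllValuesNull();
        int getPositionCount();
    }

    interface BlockFactory {
        Block newConstantNullBlock(int positionCount);
    }

    final class NullSubstitutionSketch {
        // If every position in the evaluated block is null, hand back a plain
        // constant-null block of the same size, so downstream functions can treat
        // it like any other null block rather than as an AggregateMetricDouble block.
        static Block substitute(BlockFactory factory, Block block) {
            if (block.areAllValuesNull()) {
                return factory.newConstantNullBlock(block.getPositionCount());
            }
            return block;
        }
    }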
1 parent e1d9a15 commit 701ffbb

File tree

3 files changed (+158 lines, -53 lines)

docs/changelog/136773.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 136773
+summary: Return `ConstNullBlock` in `FromAggMetricDouble`
+area: ES|QL
+type: bug
+issues: []

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java

Lines changed: 150 additions & 50 deletions
@@ -228,55 +228,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
         };
         bulkIndex(dataStreamName, sourceSupplier, 100);
 
-        // Rollover to ensure the index we will downsample is not the write index
-        assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
-        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
-        String sourceIndex = backingIndices.get(0);
-        String secondIndex = backingIndices.get(1);
-        String interval = "5m";
-        String targetIndex = "downsample-" + interval + "-" + sourceIndex;
-        // Set the source index to read-only state
-        assertAcked(
-            indicesAdmin().prepareUpdateSettings(sourceIndex)
-                .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
-        );
-
-        DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
-        assertAcked(
-            client().execute(
-                DownsampleAction.INSTANCE,
-                new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
-            )
-        );
-
-        // Wait for downsampling to complete
-        SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
-            final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
-            if (indexMetadata == null) {
-                return false;
-            }
-            var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
-            return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
-        });
-        safeAwait(listener);
-
-        assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
-
-        // remove the old backing index, replace it with the downsampled index, and delete the old one so it is not queried
-        assertAcked(
-            client().execute(
-                ModifyDataStreamsAction.INSTANCE,
-                new ModifyDataStreamsAction.Request(
-                    TEST_REQUEST_TIMEOUT,
-                    TEST_REQUEST_TIMEOUT,
-                    List.of(
-                        DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
-                        DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
-                    )
-                )
-            ).actionGet()
-        );
-        assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());
+        String secondBackingIndex = rolloverAndDownsample(dataStreamName, 0, "5m");
 
         // index to the next backing index; random time between 31 and 59m in the future because the default look_ahead_time is 30m and we
         // don't want to conflict with the previous backing index
@@ -323,7 +275,99 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
                 )
             );
         }
+        testEsqlMetrics(dataStreamName, secondBackingIndex);
+    }
+
+    public void testPartialNullMetricsAfterDownsampling() throws Exception {
+        String dataStreamName = "metrics-foo";
+        Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
+        putTSDBIndexTemplate("my-template", List.of("metrics-foo"), settings, """
+            {
+                "properties": {
+                    "host": {
+                        "type": "keyword",
+                        "time_series_dimension": true
+                    },
+                    "cluster" : {
+                        "type": "keyword",
+                        "time_series_dimension": true
+                    },
+                    "cpu": {
+                        "type": "double",
+                        "time_series_metric": "gauge"
+                    },
+                    "request": {
+                        "type": "double",
+                        "time_series_metric": "counter"
+                    }
+                }
+            }
+            """, null, null);
+
+        // Create the data stream by indexing documents with no values in the numeric fields
+        final Instant now = Instant.now();
+        Supplier<XContentBuilder> sourceSupplier = () -> {
+            String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.minusSeconds(60 * 15).toEpochMilli());
+            try {
+                return XContentFactory.jsonBuilder()
+                    .startObject()
+                    .field("@timestamp", ts)
+                    .field("host", randomFrom("host1", "host2", "host3"))
+                    .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
+                    .endObject();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        bulkIndex(dataStreamName, sourceSupplier, 100);
+        // And index documents with values
+        sourceSupplier = () -> {
+            String ts = randomDateForRange(now.minusSeconds(60 * 14).toEpochMilli(), now.plusSeconds(60 * 30).toEpochMilli());
+            try {
+                return XContentFactory.jsonBuilder()
+                    .startObject()
+                    .field("@timestamp", ts)
+                    .field("host", randomFrom("host1", "host2", "host3"))
+                    .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
+                    .field("cpu", randomDouble())
+                    .field("request", randomDoubleBetween(0, 100, true))
+                    .endObject();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        bulkIndex(dataStreamName, sourceSupplier, 100);
+        String secondBackingIndex = rolloverAndDownsample(dataStreamName, 0, "5m");
 
+        Supplier<XContentBuilder> nextSourceSupplier = () -> {
+            String ts = randomDateForRange(now.plusSeconds(60 * 31).toEpochMilli(), now.plusSeconds(60 * 59).toEpochMilli());
+            try {
+                return XContentFactory.jsonBuilder()
+                    .startObject()
+                    .field("@timestamp", ts)
+                    .field("host", randomFrom("host1", "host2", "host3"))
+                    .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
+                    .field("cpu", randomDouble())
+                    .field("request", randomDoubleBetween(0, 100, true))
+                    .endObject();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        bulkIndex(dataStreamName, nextSourceSupplier, 100);
+
+        // check that aggregate metric double is available
+        var response = clusterAdmin().nodesCapabilities(
+            new NodesCapabilitiesRequest().method(RestRequest.Method.POST)
+                .path("/_query")
+                .capabilities(AGGREGATE_METRIC_DOUBLE_V0.capabilityName())
+        ).actionGet();
+        assumeTrue("Require aggregate_metric_double casting", response.isSupported().orElse(Boolean.FALSE));
+
+        testEsqlMetrics(dataStreamName, secondBackingIndex);
+    }
+
+    private void testEsqlMetrics(String dataStreamName, String nonDownsampledIndex) throws Exception {
         // test _over_time commands with implicit casting of aggregate_metric_double
         for (String outerCommand : List.of("min", "max", "sum", "count")) {
             String expectedType = outerCommand.equals("count") ? "long" : "double";
@@ -350,7 +394,9 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
             // TODO: add to counter tests below when support for counters is added
             for (String innerCommand : List.of("first_over_time", "last_over_time")) {
                 String command = outerCommand + " (" + innerCommand + "(cpu))";
-                try (var resp = esqlCommand("TS " + secondIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) {
+                try (
+                    var resp = esqlCommand("TS " + nonDownsampledIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")
+                ) {
                     var columns = resp.columns();
                     assertThat(columns, hasSize(3));
                     assertThat(
@@ -393,6 +439,60 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
         }
     }
 
+    private String rolloverAndDownsample(String dataStreamName, int timesDownsampledAlready, String interval) throws Exception {
+        // returns the name of the new backing index
+        // Rollover to ensure the index we will downsample is not the write index
+        assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
+        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, timesDownsampledAlready + 2);
+        String sourceIndex = backingIndices.get(timesDownsampledAlready);
+        String secondIndex = backingIndices.get(timesDownsampledAlready + 1);
+        String targetIndex = "downsample-" + interval + "-" + sourceIndex;
+        // Set the source index to read-only state
+        assertAcked(
+            indicesAdmin().prepareUpdateSettings(sourceIndex)
+                .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
+        );
+
+        DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
+        assertAcked(
+            client().execute(
+                DownsampleAction.INSTANCE,
+                new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
+            )
+        );
+
+        // Wait for downsampling to complete
+        SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
+            final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
+            if (indexMetadata == null) {
+                return false;
+            }
+            var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
+            return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
+        });
+        safeAwait(listener);
+
+        assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
+
+        // remove the old backing index, replace it with the downsampled index, and delete the old one so it is not queried
+        assertAcked(
+            client().execute(
+                ModifyDataStreamsAction.INSTANCE,
+                new ModifyDataStreamsAction.Request(
+                    TEST_REQUEST_TIMEOUT,
+                    TEST_REQUEST_TIMEOUT,
+                    List.of(
+                        DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
+                        DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
+                    )
+                )
+            ).actionGet()
+        );
+        assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());
+
+        return secondIndex;
+    }
+
     private EsqlQueryResponse esqlCommand(String command) throws IOException {
         return client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(command)).actionGet(30, TimeUnit.SECONDS);
     }
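
A hypothetical usage note for the extracted helper (the interval and variable name below are illustrative, not part of this commit): since timesDownsampledAlready offsets the backing-index lookup, a second downsampling round would pass 1.

    // Illustrative only: a second rollover-and-downsample round, telling the
    // helper that one backing index has already been replaced by a downsample index.
    String thirdBackingIndex = rolloverAndDownsample(dataStreamName, 1, "1h");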

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java

Lines changed: 3 additions & 3 deletions
@@ -155,10 +155,10 @@ private record Evaluator(BlockFactory blockFactory, EvalOperator.ExpressionEvaluator
         @Override
         public Block eval(Page page) {
             Block block = eval.eval(page);
-            if (block.areAllValuesNull()) {
-                return block;
-            }
             try {
+                if (block.areAllValuesNull()) {
+                    return blockFactory.newConstantNullBlock(block.getPositionCount());
+                }
                 Block resultBlock = ((AggregateMetricDoubleBlock) block).getMetricBlock(subFieldIndex);
                 resultBlock.incRef();
                 return resultBlock;
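
As the commit message notes, substituting a ConstantNullBlock sized to block.getPositionCount() keeps row counts aligned while letting downstream functions handle the result as an ordinary null block instead of special-casing an all-null AggregateMetricDoubleBlock. Moving the check inside the try block presumably keeps the substitution under the same error handling as the sub-metric extraction; that reading is an inference from the diff rather than something stated in the commit message.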
