Skip to content

Commit 995baad

Browse files
committed
handle null values
1 parent e905a79 commit 995baad

File tree

3 files changed

+87
-9
lines changed

3 files changed

+87
-9
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ChangePointOperator.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public String describe() {
4848
private final List<Page> outputPages;
4949
private boolean finished;
5050
private int outputPageIndex;
51+
private Warnings warnings;
5152

5253
public ChangePointOperator(DriverContext driverContext, int inputChannel, String sourceText, int sourceLine, int sourceColumn) {
5354
this.driverContext = driverContext;
@@ -59,6 +60,7 @@ public ChangePointOperator(DriverContext driverContext, int inputChannel, String
5960
finished = false;
6061
inputPages = new ArrayList<>();
6162
outputPages = new ArrayList<>();
63+
warnings = null;
6264
}
6365

6466
@Override
@@ -105,23 +107,30 @@ private void createOutputPages() {
105107
// TODO: account for this memory?
106108
double[] values = new double[valuesCount];
107109
int valuesIndex = 0;
110+
boolean hasNulls = false;
108111
for (Page inputPage : inputPages) {
109112
Block inputBlock = inputPage.getBlock(inputChannel);
110113
for (int i = 0; i < inputBlock.getPositionCount(); i++) {
111-
// TODO: nulls
112-
values[valuesIndex++] = ((Number) BlockUtils.toJavaObject(inputBlock, i)).doubleValue();
114+
Object value = BlockUtils.toJavaObject(inputBlock, i);
115+
if (value == null) {
116+
hasNulls = true;
117+
values[valuesIndex++] = 0;
118+
} else {
119+
values[valuesIndex++] = ((Number) value).doubleValue();
120+
}
113121
}
114122
}
115123

116124
ChangeType changeType = ChangePointDetector.getChangeType(new MlAggsHelper.DoubleBucketValues(null, values));
117125
int changePointIndex = changeType.changePoint();
118126

119127
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
120-
Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText)
121-
.registerException(new IllegalArgumentException(indeterminable.getReason()));
128+
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
129+
}
130+
if (hasNulls) {
131+
warnings(true).registerException(new IllegalArgumentException("values contain nulls; treating them as zeroes"));
122132
}
123133

124-
// TODO: throw error when indeterminable, due to not enough data
125134
BlockFactory blockFactory = driverContext.blockFactory();
126135
int pageStartIndex = 0;
127136
for (Page inputPage : inputPages) {
@@ -159,4 +168,15 @@ private void createOutputPages() {
159168

160169
@Override
161170
public void close() {}
171+
172+
private Warnings warnings(boolean onlyWarnings) {
173+
if (warnings == null) {
174+
if (onlyWarnings) {
175+
this.warnings = Warnings.createOnlyWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
176+
} else {
177+
this.warnings = Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
178+
}
179+
}
180+
return warnings;
181+
}
162182
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Warnings.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void registerException(Exception exception) {
3232
* @return A warnings collector object
3333
*/
3434
public static Warnings createWarnings(DriverContext.WarningsMode warningsMode, int lineNumber, int columnNumber, String sourceText) {
35-
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as null");
35+
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as null");
3636
}
3737

3838
/**
@@ -50,7 +50,25 @@ public static Warnings createWarningsTreatedAsFalse(
5050
int columnNumber,
5151
String sourceText
5252
) {
53-
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as false");
53+
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as false");
54+
}
55+
56+
/**
57+
* Create a new warnings object based on the given mode which warns that
58+
* evaluation resulted in warnings.
59+
* @param warningsMode The warnings collection strategy to use
60+
* @param lineNumber The line number of the source text. Same as `source.getLineNumber()`
61+
* @param columnNumber The column number of the source text. Same as `source.getColumnNumber()`
62+
* @param sourceText The source text that caused the warning. Same as `source.text()`
63+
* @return A warnings collector object
64+
*/
65+
public static Warnings createOnlyWarnings(
66+
DriverContext.WarningsMode warningsMode,
67+
int lineNumber,
68+
int columnNumber,
69+
String sourceText
70+
) {
71+
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "warnings during evaluation of [{}]");
5472
}
5573

5674
private static Warnings createWarnings(
@@ -80,10 +98,9 @@ private Warnings(int lineNumber, int columnNumber, String sourceText, String fir
8098
this.location = format("Line {}:{}: ", lineNumber, columnNumber);
8199
this.first = format(
82100
null,
83-
"{}evaluation of [{}] failed, {}. Only first {} failures recorded.",
101+
"{}" + first + ". Only first {} failures recorded.",
84102
location,
85103
sourceText,
86-
first,
87104
MAX_ADDED_WARNINGS
88105
);
89106
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/change_point.csv-spec

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,3 +224,44 @@ count:long | @timestamp:datetime | type:text | pvalue:double
224224
93 | 2024-05-10T00:10:00.000Z | null | null
225225
23 | 2024-05-10T00:20:00.000Z | null | null
226226
;
227+
228+
229+
null values
230+
required_capability: change_point
231+
232+
FROM k8s
233+
| STATS count=COUNT() BY @timestamp=BUCKET(@timestamp, 1 MINUTE)
234+
| EVAL count=count+CASE(@timestamp>="2024-05-10T00:11:00.000Z", 100, 0)
235+
| EVAL count=CASE(@timestamp=="2024-05-10T00:04:00.000Z", NULL, count)
236+
| EVAL count=CASE(@timestamp=="2024-05-10T00:08:00.000Z", NULL, count)
237+
| CHANGE_POINT count ON @timestamp AS type, pvalue
238+
;
239+
240+
warning:Line 6:3: warnings during evaluation of [CHANGE_POINT count ON @timestamp AS type, pvalue]. Only first 20 failures recorded.
241+
warning:Line 6:3: java.lang.IllegalArgumentException: values contain nulls; treating them as zeroes
242+
243+
@timestamp:datetime | count:long | type:text | pvalue:double
244+
2024-05-10T00:00:00.000Z | 4 | null | null
245+
2024-05-10T00:01:00.000Z | 4 | null | null
246+
2024-05-10T00:02:00.000Z | 8 | null | null
247+
2024-05-10T00:03:00.000Z | 8 | null | null
248+
2024-05-10T00:04:00.000Z | null | null | null
249+
2024-05-10T00:05:00.000Z | 8 | null | null
250+
2024-05-10T00:06:00.000Z | 10 | null | null
251+
2024-05-10T00:07:00.000Z | 5 | null | null
252+
2024-05-10T00:08:00.000Z | null | null | null
253+
2024-05-10T00:09:00.000Z | 20 | null | null
254+
2024-05-10T00:10:00.000Z | 5 | null | null
255+
2024-05-10T00:11:00.000Z | 107 | step_change | 2.1947350086494926E-22
256+
2024-05-10T00:12:00.000Z | 108 | null | null
257+
2024-05-10T00:13:00.000Z | 109 | null | null
258+
2024-05-10T00:14:00.000Z | 109 | null | null
259+
2024-05-10T00:15:00.000Z | 111 | null | null
260+
2024-05-10T00:16:00.000Z | 107 | null | null
261+
2024-05-10T00:17:00.000Z | 115 | null | null
262+
2024-05-10T00:18:00.000Z | 117 | null | null
263+
2024-05-10T00:19:00.000Z | 105 | null | null
264+
2024-05-10T00:20:00.000Z | 110 | null | null
265+
2024-05-10T00:21:00.000Z | 104 | null | null
266+
2024-05-10T00:22:00.000Z | 109 | null | null
267+
;

0 commit comments

Comments
 (0)