Skip to content

Commit f1eb1b2

Browse files
committed
ChangePointOperator
1 parent eadffb9 commit f1eb1b2

File tree

4 files changed

+161
-12
lines changed

4 files changed

+161
-12
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.compute.data.BlockFactory;
13+
import org.elasticsearch.compute.data.BytesRefBlock;
14+
import org.elasticsearch.compute.data.DoubleBlock;
15+
import org.elasticsearch.compute.data.LongVector;
16+
import org.elasticsearch.compute.data.Page;
17+
import org.elasticsearch.logging.LogManager;
18+
import org.elasticsearch.logging.Logger;
19+
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
20+
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointDetector;
21+
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
public class ChangePointOperator implements Operator {
27+
private static final Logger logger = LogManager.getLogger(ChangePointOperator.class);
28+
29+
public record Factory(int inputChannel) implements OperatorFactory {
30+
@Override
31+
public Operator get(DriverContext driverContext) {
32+
return new ChangePointOperator(driverContext.blockFactory(), inputChannel);
33+
}
34+
35+
@Override
36+
public String describe() {
37+
return "ChangePointOperator[input=" + inputChannel + "]";
38+
}
39+
}
40+
41+
private final BlockFactory blockFactory;
42+
private final int inputChannel;
43+
private boolean finished;
44+
private List<Page> inputPages;
45+
private List<Page> outputPages;
46+
private int outputPageIndex;
47+
48+
public ChangePointOperator(BlockFactory blockFactory, int inputChannel) {
49+
this.blockFactory = blockFactory;
50+
this.inputChannel = inputChannel;
51+
52+
finished = false;
53+
inputPages = new ArrayList<>();
54+
outputPages = new ArrayList<>();
55+
}
56+
57+
@Override
58+
public boolean needsInput() {
59+
return finished == false;
60+
}
61+
62+
@Override
63+
public void addInput(Page page) {
64+
inputPages.add(page);
65+
}
66+
67+
@Override
68+
public void finish() {
69+
finished = true;
70+
createOutputPages();
71+
}
72+
73+
@Override
74+
public boolean isFinished() {
75+
return finished && outputPageIndex == inputPages.size();
76+
}
77+
78+
@Override
79+
public Page getOutput() {
80+
if (finished == false) {
81+
return null;
82+
}
83+
if (outputPageIndex == inputPages.size()) {
84+
return null;
85+
}
86+
return outputPages.get(outputPageIndex++);
87+
}
88+
89+
private void createOutputPages() {
90+
int valuesCount = 0;
91+
for (Page page : inputPages) {
92+
valuesCount += page.getPositionCount();
93+
}
94+
95+
double[] values = new double[valuesCount];
96+
int valuesIndex = 0;
97+
for (Page inputPage : inputPages) {
98+
// TODO: other data types
99+
LongVector vector = (LongVector) inputPage.getBlock(inputChannel).asVector();
100+
for (int i = 0; i < vector.getPositionCount(); i++) {
101+
values[valuesIndex++] = (double) vector.getLong(i);
102+
}
103+
}
104+
logger.warn("***ALL DATA*** (#pages={}) {}", inputPages.size(), values);
105+
106+
ChangeType changeType =
107+
ChangePointDetector.getChangeType(new MlAggsHelper.DoubleBucketValues(null, values));
108+
int changePointIndex = changeType.changePoint();
109+
110+
int pageStartIndex = 0;
111+
for (Page inputPage : inputPages) {
112+
Block changeTypeBlock;
113+
Block changePvalueBlock;
114+
115+
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
116+
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
117+
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount());
118+
for (int i = 0; i < inputPage.getPositionCount(); i++) {
119+
if (pageStartIndex + i == changePointIndex) {
120+
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
121+
pvalueBlockBuilder.appendDouble(changeType.pValue());
122+
} else {
123+
changeTypeBlockBuilder.appendNull();
124+
pvalueBlockBuilder.appendNull();
125+
}
126+
}
127+
changeTypeBlock = changeTypeBlockBuilder.build();
128+
changePvalueBlock = pvalueBlockBuilder.build();
129+
} else {
130+
changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
131+
changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
132+
}
133+
134+
// TODO: make sure it's the right channel
135+
Page outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
136+
outputPages.add(outputPage);
137+
138+
pageStartIndex += inputPage.getPositionCount();
139+
}
140+
}
141+
142+
@Override
143+
public void close() {
144+
145+
}
146+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,8 +455,8 @@ public PlanFactory visitChangePointCommand(EsqlBaseParser.ChangePointCommandCont
455455
Source src = source(ctx);
456456
NamedExpression value = visitQualifiedName(ctx.value);
457457
NamedExpression key = visitQualifiedName(ctx.key);
458-
Attribute targetType = new ReferenceAttribute(src, visitQualifiedName(ctx.targetType).name(), DataType.LONG); // TODO: text
459-
Attribute targetPvalue = new ReferenceAttribute(src, visitQualifiedName(ctx.targetPvalue).name(), DataType.LONG); // TODO: double
458+
Attribute targetType = new ReferenceAttribute(src, visitQualifiedName(ctx.targetType).name(), DataType.TEXT);
459+
Attribute targetPvalue = new ReferenceAttribute(src, visitQualifiedName(ctx.targetPvalue).name(), DataType.DOUBLE);
460460
return child -> new ChangePoint(src, child, value, key, targetType, targetPvalue);
461461
}
462462

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.compute.data.LocalCircuitBreaker;
1818
import org.elasticsearch.compute.data.Page;
1919
import org.elasticsearch.compute.lucene.LuceneOperator;
20+
import org.elasticsearch.compute.operator.ChangePointOperator;
2021
import org.elasticsearch.compute.operator.ColumnExtractOperator;
2122
import org.elasticsearch.compute.operator.ColumnLoadOperator;
2223
import org.elasticsearch.compute.operator.Driver;
@@ -702,17 +703,17 @@ private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecution
702703
}
703704

704705
private PhysicalOperation planChangePoint(ChangePointExec changePoint, LocalExecutionPlannerContext context) {
705-
// note: mostly copied from "planEval"
706706
PhysicalOperation source = plan(changePoint.child(), context);
707-
for (Attribute target : List.of(changePoint.targetType(), changePoint.targetPvalue())) {
708-
// For now: copy/paste "value" into both type and pvalue field.
709-
// TODO: compute actual changepoint
710-
var evaluatorSupplier = EvalMapper.toEvaluator(context.foldCtx(), changePoint.value(), source.layout);
711-
Layout.Builder layout = source.layout.builder();
712-
layout.append(target);
713-
source = source.with(new EvalOperatorFactory(evaluatorSupplier), layout.build());
714-
}
715-
return source;
707+
Layout layout = source.layout.builder()
708+
.append(changePoint.targetType())
709+
.append(changePoint.targetPvalue())
710+
.build();
711+
return source.with(
712+
new ChangePointOperator.Factory(
713+
layout.get(changePoint.value().id()).channel()
714+
),
715+
layout
716+
);
716717
}
717718

718719
/**

x-pack/plugin/ml/src/main/java/module-info.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@
4141
exports org.elasticsearch.xpack.ml.autoscaling;
4242
exports org.elasticsearch.xpack.ml.job.categorization;
4343
exports org.elasticsearch.xpack.ml.notifications;
44+
exports org.elasticsearch.xpack.ml.aggs.changepoint;
45+
exports org.elasticsearch.xpack.ml.aggs;
4446
}

0 commit comments

Comments
 (0)