1414import org .elasticsearch .compute .data .BytesRefBlock ;
1515import org .elasticsearch .compute .data .DoubleBlock ;
1616import org .elasticsearch .compute .data .Page ;
17+ import org .elasticsearch .core .Releasables ;
1718import org .elasticsearch .xpack .ml .aggs .MlAggsHelper ;
1819import org .elasticsearch .xpack .ml .aggs .changepoint .ChangePointDetector ;
1920import org .elasticsearch .xpack .ml .aggs .changepoint .ChangeType ;
2021
21- import java .util .ArrayList ;
22- import java .util .List ;
22+ import java .util .Deque ;
23+ import java .util .LinkedList ;
2324
2425public class ChangePointOperator implements Operator {
2526
26- // TODO: close upon failure / interrupt
27-
2827 public static final int INPUT_VALUE_COUNT_LIMIT = 1000 ;
2928
30- public record Factory (int inputChannel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
29+ public record Factory (int channel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
3130 @ Override
3231 public Operator get (DriverContext driverContext ) {
33- return new ChangePointOperator (driverContext , inputChannel , sourceText , sourceLine , sourceColumn );
32+ return new ChangePointOperator (driverContext , channel , sourceText , sourceLine , sourceColumn );
3433 }
3534
3635 @ Override
3736 public String describe () {
38- return "ChangePointOperator[input =" + inputChannel + "]" ;
37+ return "ChangePointOperator[channel =" + channel + "]" ;
3938 }
4039 }
4140
4241 private final DriverContext driverContext ;
43- private final int inputChannel ;
42+ private final int channel ;
4443 private final String sourceText ;
4544 private final int sourceLine ;
4645 private final int sourceColumn ;
4746
48- private final List <Page > inputPages ;
49- private final List <Page > outputPages ;
47+ private final Deque <Page > inputPages ;
48+ private final Deque <Page > outputPages ;
5049 private boolean finished ;
51- private int outputPageIndex ;
5250 private Warnings warnings ;
5351
54- public ChangePointOperator (DriverContext driverContext , int inputChannel , String sourceText , int sourceLine , int sourceColumn ) {
52+ public ChangePointOperator (DriverContext driverContext , int channel , String sourceText , int sourceLine , int sourceColumn ) {
5553 this .driverContext = driverContext ;
56- this .inputChannel = inputChannel ;
54+ this .channel = channel ;
5755 this .sourceText = sourceText ;
5856 this .sourceLine = sourceLine ;
5957 this .sourceColumn = sourceColumn ;
6058
6159 finished = false ;
62- inputPages = new ArrayList <>();
63- outputPages = new ArrayList <>();
60+ inputPages = new LinkedList <>();
61+ outputPages = new LinkedList <>();
6462 warnings = null ;
6563 }
6664
@@ -89,14 +87,10 @@ public boolean isFinished() {
8987
9088 @ Override
9189 public Page getOutput () {
92- if (finished == false ) {
90+ if (finished == false || outputPages . isEmpty () ) {
9391 return null ;
9492 }
95- if (outputPageIndex == outputPages .size ()) {
96- outputPages .clear ();
97- return null ;
98- }
99- return outputPages .get (outputPageIndex ++);
93+ return outputPages .removeFirst ();
10094 }
10195
10296 private void createOutputPages () {
@@ -109,12 +103,11 @@ private void createOutputPages() {
109103 valuesCount = INPUT_VALUE_COUNT_LIMIT ;
110104 }
111105
112- // TODO: account for this memory?
113106 double [] values = new double [valuesCount ];
114107 int valuesIndex = 0 ;
115108 boolean hasNulls = false ;
116109 for (Page inputPage : inputPages ) {
117- Block inputBlock = inputPage .getBlock (inputChannel );
110+ Block inputBlock = inputPage .getBlock (channel );
118111 for (int i = 0 ; i < inputBlock .getPositionCount () && valuesIndex < valuesCount ; i ++) {
119112 Object value = BlockUtils .toJavaObject (inputBlock , i );
120113 if (value == null ) {
@@ -131,38 +124,48 @@ private void createOutputPages() {
131124
132125 BlockFactory blockFactory = driverContext .blockFactory ();
133126 int pageStartIndex = 0 ;
134- for (Page inputPage : inputPages ) {
135- Block changeTypeBlock ;
136- Block changePvalueBlock ;
137- if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage .getPositionCount ()) {
138- try (
139- BytesRefBlock .Builder changeTypeBlockBuilder = blockFactory .newBytesRefBlockBuilder (inputPage .getPositionCount ());
140- DoubleBlock .Builder pvalueBlockBuilder = blockFactory .newDoubleBlockBuilder (inputPage .getPositionCount ())
141- ) {
142- for (int i = 0 ; i < inputPage .getPositionCount (); i ++) {
143- if (pageStartIndex + i == changePointIndex ) {
144- changeTypeBlockBuilder .appendBytesRef (new BytesRef (changeType .getWriteableName ()));
145- pvalueBlockBuilder .appendDouble (changeType .pValue ());
146- } else {
147- changeTypeBlockBuilder .appendNull ();
148- pvalueBlockBuilder .appendNull ();
127+ while (inputPages .isEmpty () == false ) {
128+ Page inputPage = inputPages .peek ();
129+ Page outputPage ;
130+ Block changeTypeBlock = null ;
131+ Block changePvalueBlock = null ;
132+ boolean success = false ;
133+ try {
134+ if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage .getPositionCount ()) {
135+ try (
136+ BytesRefBlock .Builder changeTypeBlockBuilder = blockFactory .newBytesRefBlockBuilder (inputPage .getPositionCount ());
137+ DoubleBlock .Builder pvalueBlockBuilder = blockFactory .newDoubleBlockBuilder (inputPage .getPositionCount ())
138+ ) {
139+ for (int i = 0 ; i < inputPage .getPositionCount (); i ++) {
140+ if (pageStartIndex + i == changePointIndex ) {
141+ changeTypeBlockBuilder .appendBytesRef (new BytesRef (changeType .getWriteableName ()));
142+ pvalueBlockBuilder .appendDouble (changeType .pValue ());
143+ } else {
144+ changeTypeBlockBuilder .appendNull ();
145+ pvalueBlockBuilder .appendNull ();
146+ }
149147 }
148+ changeTypeBlock = changeTypeBlockBuilder .build ();
149+ changePvalueBlock = pvalueBlockBuilder .build ();
150150 }
151- changeTypeBlock = changeTypeBlockBuilder .build ();
152- changePvalueBlock = pvalueBlockBuilder .build ();
151+ } else {
152+ changeTypeBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
153+ changePvalueBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
154+ }
155+
156+ outputPage = inputPage .appendBlocks (new Block [] { changeTypeBlock , changePvalueBlock });
157+ success = true ;
158+ } finally {
159+ if (success == false ) {
160+ Releasables .closeExpectNoException (changeTypeBlock , changePvalueBlock );
153161 }
154- } else {
155- changeTypeBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
156- changePvalueBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
157162 }
158163
159- Page outputPage = inputPage . appendBlocks ( new Block [] { changeTypeBlock , changePvalueBlock } );
164+ inputPages . removeFirst ( );
160165 outputPages .add (outputPage );
161166 pageStartIndex += inputPage .getPositionCount ();
162167 }
163168
164- inputPages .clear ();
165-
166169 if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
167170 warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
168171 }
@@ -177,7 +180,19 @@ private void createOutputPages() {
177180 }
178181
179182 @ Override
180- public void close () {}
183+ public void close () {
184+ for (Page page : inputPages ) {
185+ page .releaseBlocks ();
186+ }
187+ for (Page page : outputPages ) {
188+ page .releaseBlocks ();
189+ }
190+ }
191+
192+ @ Override
193+ public String toString () {
194+ return "ChangePointOperator[channel=" + channel + "]" ;
195+ }
181196
182197 private Warnings warnings (boolean onlyWarnings ) {
183198 if (warnings == null ) {
0 commit comments