88package org .elasticsearch .compute .operator .fuse ;
99
1010import org .apache .lucene .util .BytesRef ;
11+ import org .elasticsearch .TransportVersion ;
12+ import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
13+ import org .elasticsearch .common .io .stream .StreamInput ;
14+ import org .elasticsearch .common .io .stream .StreamOutput ;
1115import org .elasticsearch .compute .data .Block ;
1216import org .elasticsearch .compute .data .BytesRefBlock ;
1317import org .elasticsearch .compute .data .DoubleVector ;
1418import org .elasticsearch .compute .data .DoubleVectorBlock ;
1519import org .elasticsearch .compute .data .Page ;
1620import org .elasticsearch .compute .operator .DriverContext ;
1721import org .elasticsearch .compute .operator .Operator ;
22+ import org .elasticsearch .core .Releasables ;
23+ import org .elasticsearch .core .TimeValue ;
24+ import org .elasticsearch .xcontent .XContentBuilder ;
1825
26+ import java .io .IOException ;
1927import java .util .ArrayDeque ;
2028import java .util .Collection ;
2129import java .util .Deque ;
@@ -60,6 +68,12 @@ public String describe() {
6068 private final Deque <Page > outputPages ;
6169 private boolean finished ;
6270
71+ private long emitNanos ;
72+ private int pagesReceived = 0 ;
73+ private int pagesProcessed = 0 ;
74+ private long rowsReceived = 0 ;
75+ private long rowsEmitted = 0 ;
76+
6377 public LinearScoreEvalOperator (int discriminatorPosition , int scorePosition , LinearConfig config ) {
6478 this .scorePosition = scorePosition ;
6579 this .discriminatorPosition = discriminatorPosition ;
@@ -79,6 +93,8 @@ public boolean needsInput() {
7993 @ Override
8094 public void addInput (Page page ) {
8195 inputPages .add (page );
96+ pagesReceived ++;
97+ rowsReceived += page .getPositionCount ();
8298 }
8399
84100 @ Override
@@ -90,35 +106,58 @@ public void finish() {
90106 }
91107
92108 private void createOutputPages () {
109+ final var emitStart = System .nanoTime ();
93110 normalizer .preprocess (inputPages , scorePosition , discriminatorPosition );
111+ try {
112+ while (inputPages .isEmpty () == false ) {
113+ Page inputPage = inputPages .peek ();
114+ processInputPage (inputPage );
115+ inputPages .removeFirst ();
116+ pagesProcessed += 1 ;
117+ }
118+ } finally {
119+ emitNanos = System .nanoTime () - emitStart ;
120+ Releasables .close (inputPages );
121+ }
122+ }
94123
95- while (inputPages .isEmpty () == false ) {
96- Page inputPage = inputPages .peek ();
124+ private void processInputPage (Page inputPage ) {
125+ BytesRefBlock discriminatorBlock = inputPage .getBlock (discriminatorPosition );
126+ DoubleVectorBlock initialScoreBlock = inputPage .getBlock (scorePosition );
97127
98- BytesRefBlock discriminatorBlock = inputPage .getBlock (discriminatorPosition );
99- DoubleVectorBlock initialScoreBlock = inputPage .getBlock (scorePosition );
128+ Page newPage = null ;
129+ Block scoreBlock = null ;
130+ DoubleVector .Builder scores = null ;
100131
101- DoubleVector .Builder scores = discriminatorBlock .blockFactory ().newDoubleVectorBuilder (discriminatorBlock .getPositionCount ());
132+ try {
133+ scores = discriminatorBlock .blockFactory ().newDoubleVectorBuilder (discriminatorBlock .getPositionCount ());
102134
103135 for (int i = 0 ; i < inputPage .getPositionCount (); i ++) {
104136 String discriminator = discriminatorBlock .getBytesRef (i , new BytesRef ()).utf8ToString ();
105137
106138 var weight = config .weights ().get (discriminator ) == null ? 1.0 : config .weights ().get (discriminator );
107139
108- Double score = initialScoreBlock .getDouble (i );
140+ double score = initialScoreBlock .getDouble (i );
109141 scores .appendDouble (weight * normalizer .normalize (score , discriminator ));
110142 }
111- Block scoreBlock = scores .build ().asBlock ();
112- inputPage = inputPage .appendBlock (scoreBlock );
113143
114- int [] projections = new int [inputPage .getBlockCount () - 1 ];
144+ scoreBlock = scores .build ().asBlock ();
145+ newPage = inputPage .appendBlock (scoreBlock );
146+
147+ int [] projections = new int [newPage .getBlockCount () - 1 ];
115148
116- for (int i = 0 ; i < inputPage .getBlockCount () - 1 ; i ++) {
117- projections [i ] = i == scorePosition ? inputPage .getBlockCount () - 1 : i ;
149+ for (int i = 0 ; i < newPage .getBlockCount () - 1 ; i ++) {
150+ projections [i ] = i == scorePosition ? newPage .getBlockCount () - 1 : i ;
151+ }
152+
153+ outputPages .add (newPage .projectBlocks (projections ));
154+ } finally {
155+ if (newPage != null ) {
156+ newPage .releaseBlocks ();
157+ }
158+ if (scoreBlock == null && scores != null ) {
159+ Releasables .close (scores );
118160 }
119- inputPages .removeFirst ();
120- outputPages .add (inputPage .projectBlocks (projections ));
121- inputPage .releaseBlocks ();
122161 }
123162 }
124163
@@ -132,7 +171,11 @@ public Page getOutput() {
132171 if (finished == false || outputPages .isEmpty ()) {
133172 return null ;
134173 }
135- return outputPages .removeFirst ();
174+
175+ Page page = outputPages .removeFirst ();
176+ rowsEmitted += page .getPositionCount ();
177+
178+ return page ;
136179 }
137180
138181 @ Override
@@ -156,6 +199,69 @@ public String toString() {
156199 + "]" ;
157200 }
158201
202+ @ Override
203+ public Operator .Status status () {
204+ return new Status (emitNanos , pagesReceived , pagesProcessed , rowsReceived , rowsEmitted );
205+ }
206+
207+ public record Status (long emitNanos , int pagesReceived , int pagesProcessed , long rowsReceived , long rowsEmitted )
208+ implements
209+ Operator .Status {
210+
211+ public static final TransportVersion ESQL_FUSE_LINEAR_OPERATOR_STATUS = TransportVersion .fromName (
212+ "esql_fuse_linear_operator_status"
213+ );
214+
215+ public static final NamedWriteableRegistry .Entry ENTRY = new NamedWriteableRegistry .Entry (
216+ Operator .Status .class ,
217+ "linearScoreEval" ,
218+ Status ::new
219+ );
220+
221+ Status (StreamInput streamInput ) throws IOException {
222+ this (streamInput .readLong (), streamInput .readInt (), streamInput .readInt (), streamInput .readLong (), streamInput .readLong ());
223+ }
224+
225+ @ Override
226+ public String getWriteableName () {
227+ return ENTRY .name ;
228+ }
229+
230+ @ Override
231+ public boolean supportsVersion (TransportVersion version ) {
232+ return version .supports (ESQL_FUSE_LINEAR_OPERATOR_STATUS );
233+ }
234+
235+ @ Override
236+ public TransportVersion getMinimalSupportedVersion () {
237+ assert false : "must not be called when overriding supportsVersion" ;
238+ throw new UnsupportedOperationException ("must not be called when overriding supportsVersion" );
239+ }
240+
241+ @ Override
242+ public void writeTo (StreamOutput out ) throws IOException {
243+ out .writeLong (emitNanos );
244+ out .writeInt (pagesReceived );
245+ out .writeInt (pagesProcessed );
246+ out .writeLong (rowsReceived );
247+ out .writeLong (rowsEmitted );
248+ }
249+
250+ @ Override
251+ public XContentBuilder toXContent (XContentBuilder builder , Params params ) throws IOException {
252+ builder .startObject ();
253+ builder .field ("emit_nanos" , emitNanos );
254+ if (builder .humanReadable ()) {
255+ builder .field ("emit_time" , TimeValue .timeValueNanos (emitNanos ));
256+ }
257+ builder .field ("pages_received" , pagesReceived );
258+ builder .field ("pages_processed" , pagesProcessed );
259+ builder .field ("rows_received" , rowsReceived );
260+ builder .field ("rows_emitted" , rowsEmitted );
261+ return builder .endObject ();
262+ }
263+ }
264+
159265 private Normalizer createNormalizer (LinearConfig .Normalizer normalizer ) {
160266 return switch (normalizer ) {
161267 case NONE -> new NoneNormalizer ();
0 commit comments