-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ES|QL: Add initial grammar and planning for RRF (snapshot) #123396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
33eb3fe
9b00645
406b3ea
f08abb7
f8c2654
c6cbfd0
587ef71
ee50a80
281da1d
0a45f00
e401c23
40168dc
4b3a8d7
d56d280
b6b1a84
d1de323
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 123396 | ||
| summary: Add initial grammar and planning for RRF (snapshot) | ||
| area: ES|QL | ||
| type: feature | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.compute.operator; | ||
|
|
||
| import org.apache.lucene.util.BytesRef; | ||
| import org.elasticsearch.compute.data.Block; | ||
| import org.elasticsearch.compute.data.BytesRefBlock; | ||
| import org.elasticsearch.compute.data.DoubleVector; | ||
| import org.elasticsearch.compute.data.Page; | ||
| import org.elasticsearch.core.Releasables; | ||
|
|
||
| import java.util.HashMap; | ||
|
|
||
| /** | ||
| * Updates the score column with new scores using the RRF formula. | ||
| * Receives the position of the score and fork columns. | ||
| * The new score we assign to each row is equal to {@code 1 / (rank_constant + row_number)}. | ||
| * We use the fork discriminator column to determine the {@code row_number} for each row. | ||
| */ | ||
| public class RrfScoreEvalOperator implements Operator { | ||
ioanatia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| public record Factory(int forkPosition, int scorePosition) implements OperatorFactory { | ||
| @Override | ||
| public Operator get(DriverContext driverContext) { | ||
| return new RrfScoreEvalOperator(forkPosition, scorePosition); | ||
| } | ||
|
|
||
| @Override | ||
| public String describe() { | ||
| return "RrfScoreEvalOperator"; | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private final int scorePosition; | ||
| private final int forkPosition; | ||
|
|
||
| private boolean finished = false; | ||
| private Page prev = null; | ||
|
|
||
| private HashMap<String, Integer> counters = new HashMap<>(); | ||
|
|
||
| public RrfScoreEvalOperator(int forkPosition, int scorePosition) { | ||
| this.scorePosition = scorePosition; | ||
| this.forkPosition = forkPosition; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean needsInput() { | ||
| return prev == null && finished == false; | ||
| } | ||
|
|
||
| @Override | ||
| public void addInput(Page page) { | ||
| assert prev == null : "has pending input page"; | ||
| prev = page; | ||
| } | ||
|
|
||
| @Override | ||
| public void finish() { | ||
| finished = true; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isFinished() { | ||
| return finished && prev == null; | ||
| } | ||
|
|
||
| @Override | ||
| public Page getOutput() { | ||
| Page page = prev; | ||
|
|
||
| BytesRefBlock forkBlock = (BytesRefBlock) page.getBlock(forkPosition); | ||
|
|
||
| DoubleVector.Builder scores = forkBlock.blockFactory().newDoubleVectorBuilder(forkBlock.getPositionCount()); | ||
|
|
||
| for (int i = 0; i < page.getPositionCount(); i++) { | ||
| String fork = forkBlock.getBytesRef(i, new BytesRef()).utf8ToString(); | ||
|
|
||
| int rank = counters.getOrDefault(fork, 1); | ||
| counters.put(fork, rank + 1); | ||
| scores.appendDouble(1.0 / (60 + rank)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: this is currently configurable in _search, so we probably need to expose it as an option in the future here too
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, we need to make the rank constant configurable. |
||
| } | ||
|
|
||
| Block scoreBlock = scores.build().asBlock(); | ||
| page = page.appendBlock(scoreBlock); | ||
|
|
||
| int[] projections = new int[page.getBlockCount() - 1]; | ||
|
|
||
| for (int i = 0; i < page.getBlockCount() - 1; i++) { | ||
| projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i; | ||
| } | ||
|
|
||
| page = page.projectBlocks(projections); | ||
|
|
||
| prev = null; | ||
| return page; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| Releasables.closeExpectNoException(() -> { | ||
| if (prev != null) { | ||
| prev.releaseBlocks(); | ||
| } | ||
| }); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| // | ||
| // CSV spec for RRF command | ||
| // | ||
|
|
||
| simpleRrf | ||
| required_capability: fork | ||
| required_capability: rrf | ||
| required_capability: match_operator_colon | ||
|
|
||
| FROM employees METADATA _id, _index, _score | ||
| | FORK ( WHERE emp_no:10001 ) | ||
| ( WHERE emp_no:10002 ) | ||
| | RRF | ||
| | KEEP _score, _fork, emp_no | ||
ioanatia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ; | ||
|
|
||
| _score:double | _fork:keyword | emp_no:integer | ||
| 0.01639344262295082 | fork1 | 10001 | ||
| 0.01639344262295082 | fork2 | 10002 | ||
| ; | ||
|
|
||
| rrfWithMatchAndScore | ||
| required_capability: fork | ||
| required_capability: rrf | ||
| required_capability: match_operator_colon | ||
|
|
||
| FROM books METADATA _id, _index, _score | ||
| | FORK ( WHERE title:"Tolkien" | SORT _score DESC | LIMIT 3 ) | ||
|
||
| ( WHERE author:"Tolkien" | SORT _score DESC | LIMIT 3 ) | ||
| | RRF | ||
| | KEEP _score, _fork, _id | ||
| ; | ||
|
|
||
| _score:double | _fork:keyword | _id:keyword | ||
| 0.03225806451612903 | [fork1, fork2] | 26 | ||
ioanatia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| 0.01639344262295082 | fork2 | 18 | ||
| 0.01639344262295082 | fork1 | 36 | ||
| 0.015873015873015872 | fork1 | 56 | ||
| 0.015873015873015872 | fork2 | 59 | ||
| ; | ||
ioanatia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,7 @@ import ChangePoint, | |
| Metrics, | ||
| MvExpand, | ||
| Project, | ||
| Rrf, | ||
| Rename, | ||
| Show, | ||
| UnknownCommand; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.