Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;

import java.util.HashMap;

Expand All @@ -22,7 +21,7 @@
* The new score we assign to each row is equal to {@code 1 / (rank_constant + row_number)}.
* We use the fork discriminator column to determine the {@code row_number} for each row.
*/
public class RrfScoreEvalOperator implements Operator {
public class RrfScoreEvalOperator extends AbstractPageMappingOperator {

public record Factory(int forkPosition, int scorePosition) implements OperatorFactory {
@Override
Expand All @@ -40,9 +39,6 @@ public String describe() {
private final int scorePosition;
private final int forkPosition;

private boolean finished = false;
private Page prev = null;

private HashMap<String, Integer> counters = new HashMap<>();

public RrfScoreEvalOperator(int forkPosition, int scorePosition) {
Expand All @@ -51,30 +47,7 @@ public RrfScoreEvalOperator(int forkPosition, int scorePosition) {
}

@Override
public boolean needsInput() {
return prev == null && finished == false;
}

@Override
public void addInput(Page page) {
assert prev == null : "has pending input page";
prev = page;
}

@Override
public void finish() {
finished = true;
}

@Override
public boolean isFinished() {
return finished && prev == null;
}

@Override
public Page getOutput() {
Page page = prev;

protected Page process(Page page) {
BytesRefBlock forkBlock = (BytesRefBlock) page.getBlock(forkPosition);

DoubleVector.Builder scores = forkBlock.blockFactory().newDoubleVectorBuilder(forkBlock.getPositionCount());
Expand All @@ -96,18 +69,11 @@ public Page getOutput() {
projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
}

page = page.projectBlocks(projections);

prev = null;
return page;
return page.projectBlocks(projections);
}

@Override
public void close() {
Releasables.closeExpectNoException(() -> {
if (prev != null) {
prev.releaseBlocks();
}
});
public String toString() {
return "RrfScoreEvalOperator";
}
}
93 changes: 82 additions & 11 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ FROM employees METADATA _id, _index, _score
| FORK ( WHERE emp_no:10001 )
( WHERE emp_no:10002 )
| RRF
| EVAL _score = round(_score, 4)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice trick! ❤️

| KEEP _score, _fork, emp_no
| SORT _score, _fork, emp_no
;

_score:double | _fork:keyword | emp_no:integer
0.01639344262295082 | fork1 | 10001
0.01639344262295082 | fork2 | 10002
_score:double | _fork:keyword | emp_no:integer
0.0164 | fork1 | 10001
0.0164 | fork2 | 10002
;

rrfWithMatchAndScore
Expand All @@ -25,16 +27,85 @@ required_capability: rrf
required_capability: match_operator_colon

FROM books METADATA _id, _index, _score
| FORK ( WHERE title:"Tolkien" | SORT _score DESC | LIMIT 3 )
( WHERE author:"Tolkien" | SORT _score DESC | LIMIT 3 )
| FORK ( WHERE title:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
| RRF
| EVAL _fork = mv_sort(_fork)
| EVAL _score = round(_score, 5)
| KEEP _score, _fork, _id
;

_score:double | _fork:keyword | _id:keyword
0.03225806451612903 | [fork1, fork2] | 26
0.01639344262295082 | fork2 | 18
0.01639344262295082 | fork1 | 36
0.015873015873015872 | fork1 | 56
0.015873015873015872 | fork2 | 59
_score:double | _fork:keyword | _id:keyword
0.03279 | [fork1, fork2] | 4
0.01613 | fork1 | 56
0.01613 | fork2 | 60
0.01587 | fork2 | 1
0.01587 | fork1 | 26
;

rrfWithDisjunctionAndPostFilter
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon

FROM books METADATA _id, _index, _score
| FORK ( WHERE title:"Tolkien" OR author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
| RRF
| EVAL _fork = mv_sort(_fork)
| EVAL _score = round(_score, 5)
| KEEP _score, _fork, _id
| WHERE _score > 0.014
;

_score:double | _fork:keyword | _id:keyword
0.03252 | [fork1, fork2] | 60
0.032 | [fork1, fork2] | 1
0.01639 | fork2 | 4
0.01587 | fork1 | 40
;

rrfWithStats
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon

FROM books METADATA _id, _index, _score
| FORK ( WHERE title:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Ursula K. Le Guin" AND title:"short stories" | SORT _score, _id DESC | LIMIT 3)
| RRF
| STATS count_fork=COUNT(*) BY _fork
;

count_fork:long | _fork:keyword
3 | fork1
3 | fork2
1 | fork3
;

rrfWithMultipleForkBranches
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon

FROM books METADATA _id, _index, _score
| FORK (WHERE author:"Keith Faulkner" AND qstr("author:Rory or author:Beverlie") | SORT _score, _id DESC | LIMIT 3)
(WHERE author:"Ursula K. Le Guin" | SORT _score, _id DESC | LIMIT 3)
(WHERE title:"Tolkien" AND author:"Tolkien" AND year > 2000 AND mv_count(author) == 1 | SORT _score, _id DESC | LIMIT 3)
(WHERE match(author, "Keith Faulkner") AND match(author, "Rory Tyger") | SORT _score, _id DESC | LIMIT 3)
| RRF
| EVAL _fork = mv_sort(_fork)
| EVAL _score = round(_score, 4)
| EVAL title = trim(substring(title, 1, 20))
| KEEP _score, author, title, _fork
;

_score:double | author:keyword | title:keyword | _fork:keyword
0.0328 | [Keith Faulkner, Rory Tyger] | Pop! Went Another Ba | [fork1, fork4]
0.0164 | J.R.R. Tolkien | Letters of J R R Tol | fork3
0.0164 | Ursula K. Le Guin | The wind's twelve qu | fork2
0.0161 | [Beverlie Manson, Keith Faulkner] | Rainbow's End: A Mag | fork1
0.0161 | Ursula K. Le Guin | The Word For World i | fork2
0.0159 | Ursula K. Le Guin | The Dispossessed | fork2
;
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.dissect.DissectException;
import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
Expand Down Expand Up @@ -658,10 +659,10 @@ public List<PlanFactory> visitForkSubQueries(EsqlBaseParser.ForkSubQueriesContex
// align _fork id across all fork branches
Alias alias = null;
if (firstForkNameId == null) {
alias = new Alias(source(ctx), "_fork", literal);
alias = new Alias(source(ctx), Fork.FORK_FIELD, literal);
firstForkNameId = alias.id();
} else {
alias = new Alias(source(ctx), "_fork", literal, firstForkNameId);
alias = new Alias(source(ctx), Fork.FORK_FIELD, literal, firstForkNameId);
}

var finalAlias = alias;
Expand Down Expand Up @@ -694,12 +695,12 @@ public PlanFactory visitCompositeForkSubQuery(EsqlBaseParser.CompositeForkSubQue
public PlanFactory visitRrfCommand(EsqlBaseParser.RrfCommandContext ctx) {
return input -> {
Source source = source(ctx);
Attribute scoreAttr = new UnresolvedAttribute(source, "_score");
Attribute forkAttr = new UnresolvedAttribute(source, "_fork");
Attribute idAttr = new UnresolvedAttribute(source, "_id");
Attribute indexAttr = new UnresolvedAttribute(source, "_index");
Attribute scoreAttr = new UnresolvedAttribute(source, MetadataAttribute.SCORE);
Attribute forkAttr = new UnresolvedAttribute(source, Fork.FORK_FIELD);
Attribute idAttr = new UnresolvedAttribute(source, IdFieldMapper.NAME);
Attribute indexAttr = new UnresolvedAttribute(source, MetadataAttribute.INDEX);
List<NamedExpression> aggregates = List.of(
new Alias(source, "_score", new Sum(source, scoreAttr, new Literal(source, true, DataType.BOOLEAN)))
new Alias(source, MetadataAttribute.SCORE, new Sum(source, scoreAttr, new Literal(source, true, DataType.BOOLEAN)))
);
List<Attribute> groupings = List.of(idAttr, indexAttr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
*/
public class Fork extends UnaryPlan implements SurrogateLogicalPlan {
public static final String FORK_FIELD = "_fork";

private final List<LogicalPlan> subPlans;
List<Attribute> lazyOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;

public class RrfScoreEval extends UnaryPlan {
import static org.elasticsearch.xpack.esql.common.Failure.fail;

public class RrfScoreEval extends UnaryPlan implements PostAnalysisVerificationAware {
private final Attribute forkAttr;
private final Attribute scoreAttr;

Expand Down Expand Up @@ -56,4 +62,35 @@ public Attribute scoreAttribute() {
public Attribute forkAttribute() {
return forkAttr;
}

@Override
public void postAnalysisVerification(Failures failures) {
    // RRF rescoring is only meaningful directly on top of a FORK: reject any other child plan.
    if (this.child() instanceof Fork == false) {
        failures.add(
            fail(
                this,
                "Invalid use of RRF. RRF can only be used after FORK, but found {}",
                // Report only the leading keyword of the offending child command
                // (first whitespace-delimited token of its source text), upper-cased
                // with a fixed locale so the message is stable across JVM locales.
                child().sourceText().split(" ")[0].toUpperCase(Locale.ROOT)
            )
        );
    }
}

@Override
public int hashCode() {
    // Fold the parent's hash (covers the child plan) together with the two
    // attributes this node adds, keeping hashCode consistent with equals.
    return Objects.hash(super.hashCode(), scoreAttr, forkAttr);
}

@Override
public boolean equals(Object obj) {
    // Standard equality: identity fast-path, exact-class check, then field-wise
    // comparison of the child plan and the two attributes this node adds.
    if (this == obj) {
        return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
        return false;
    }

    RrfScoreEval rrf = (RrfScoreEval) obj;
    // Bug fix: the fork attribute was previously compared against THIS object's
    // own forkAttribute() (trivially true), not the other instance's. Compare
    // against rrf.forkAttribute() so equals actually distinguishes nodes that
    // differ only in their fork attribute, consistent with hashCode.
    return child().equals(rrf.child()) && scoreAttr.equals(rrf.scoreAttribute()) && forkAttr.equals(rrf.forkAttribute());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.TypedAttribute;
Expand All @@ -76,6 +77,7 @@
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
import org.elasticsearch.xpack.esql.evaluator.command.GrokEvaluatorExtracter;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
Expand Down Expand Up @@ -282,10 +284,10 @@ private PhysicalOperation planRrfScoreEvalExec(RrfScoreEvalExec rrf, LocalExecut
int forkPosition = -1;
int pos = 0;
for (Attribute attr : rrf.child().output()) {
if (attr.name().equals("_fork")) {
if (attr.name().equals(Fork.FORK_FIELD)) {
forkPosition = pos;
}
if (attr.name().equals("_score")) {
if (attr.name().equals(MetadataAttribute.SCORE)) {
scorePosition = pos;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2938,6 +2938,22 @@ public void testRrfError() {
assertThat(e.getMessage(), containsString("Unknown column [_score]"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should provide a better error message for missing metadata attrs- something like "_score is needed for using RRF. Please add METADATA _score to your FROM command".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this - looks simple enough at a first glance. We can just modify this to have a custom error message when MetadataAttribute.isSupported(name) is true:

public static String errorMessage(String name, List<String> potentialMatches) {
String msg = "Unknown column [" + name + "]";
if (CollectionUtils.isEmpty(potentialMatches) == false) {
msg += ", did you mean "
+ (potentialMatches.size() == 1 ? "[" + potentialMatches.get(0) + "]" : "any of " + potentialMatches.toString())
+ "?";
}
return msg;
}

However we would return an error message like "Please add METADATA _score to your FROM command" even if you use ROW:

ROW a = 1, b = "two", c = null
| WHERE _score > 1

I know this is a very narrow corner case, but it would be an unintended behaviour.
It's not straightforward to get the context when we call UnresolvedAttribute.errorMessage whether the source command supports metadata attributes or not. So I think at most, we can look into this separately and not make the change here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be OK to error with "_score is needed for using RRF. Use FROM ... METADATA _score".

We can assume that full text search needs FROM, as FTFs need an index attribute to operate on?

We can refine this in a follow up, but it will be very confusing for users to receive "unknown column _score" - being a metadata attribute means users won't understand where's that coming from without referring to docs

Copy link
Contributor Author

@ioanatia ioanatia Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed - added as a follow up in #123391

assertThat(e.getMessage(), containsString("Unknown column [_fork]"));

e = expectThrows(VerificationException.class, () -> analyze("""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the sequence between FORK and RRF matters? For example if the sequence of fork and RRF is reversed, do we recognized it as a valid query?

| RRF
| FORK (WHERE a:"x")
       (WHERE a:"y")

Do we allow multiple fork or RRF, like below? Do they make sense? ES|QL does not prevent multiple occurrence of the same processing commands, commands like where, eval etc. can be used multiple times in the same query, is this also true for RRF and fork?

| FORK (WHERE a:"x")
       (WHERE a:"y")
| RRF
| RRF
or
| FORK (WHERE a:"x")
       (WHERE a:"y")
| RRF
| FORK (WHERE b:"x")
       (WHERE b:"y")
| RRF

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have put a validation for RrfScoreEval such that we only allow RRF after a FORK command.
It might seem a bit extreme, but it makes sense in practice because while we might be able to execute the following queries, they don't make a lot of sense:

| RRF
| FORK (WHERE a:"x")
       (WHERE a:"y")

or

| FORK (WHERE a:"x")
       (WHERE a:"y")
| RRF
| RRF

Another thing to note is that we currently have a restriction for FORK where it's possible to only have a single FORK command in a query, so the following is not something we can do atm:

| FORK (WHERE a:"x")
       (WHERE a:"y")
| RRF
| FORK (WHERE b:"x")
       (WHERE b:"y")
| RRF

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did see one thing that was concerning when I tried to do:

| FORK (WHERE a:"x")
       (WHERE a:"y")
| RRF
| RRF

this would lead to an unexecutable query because when do the RRF planning this expands to:

| FORK (WHERE a:"x")
       (WHERE a:"y")
| RrfScoreEval
| Dedup
| Sort
| RrfScoreEval
| Dedup
| Sort

The first SORT does not have a LIMIT so it cannot be translated to a TOP N.
I need to think more about this, not about supporting the case where we do RRF after RRF, but how to avoid this case of having unexecutable queries - I added it as a follow in #123391

from test metadata _score, _index, _id
| eval _fork = 1
| rrf
"""));
assertThat(e.getMessage(), containsString("RRF can only be used after FORK, but found EVAL"));

e = expectThrows(VerificationException.class, () -> analyze("""
from test metadata _id, _index, _score
| fork ( where first_name:"foo" )
( where first_name:"bar" )
| rrf
| rrf
"""));
assertThat(e.getMessage(), containsString("RRF can only be used after FORK, but found RRF"));

e = expectThrows(VerificationException.class, () -> analyze("""
from test
| FORK ( WHERE emp_no == 1 )
Expand All @@ -2953,6 +2969,14 @@ public void testRrfError() {
| RRF
"""));
assertThat(e.getMessage(), containsString("Unknown column [_index]"));

e = expectThrows(VerificationException.class, () -> analyze("""
from test metadata _score, _index
| FORK ( WHERE emp_no == 1 )
( WHERE emp_no > 1 )
| RRF
"""));
assertThat(e.getMessage(), containsString("Unknown column [_id]"));
}

// TODO There's too much boilerplate involved here! We need a better way of creating FieldCapabilitiesResponses from a mapping or index.
Expand Down