
Conversation

julian-elastic
Contributor

@julian-elastic julian-elastic commented Aug 19, 2025

Improve Expanding Lookup Join performance by pushing a filter to the right side of the lookup join.

As this is a performance optimization, we don't want to break behavior for old nodes during CCS (cross-cluster search). The filter that we push down is optional and is always reapplied after the lookup join. As a result, if all nodes involved are new, we get the performance benefit; otherwise there may be partial or no benefit, but the query still executes successfully and returns correct results if it worked before this optimization.

With this PR, we only handle Lucene-pushable filters. Further optimization is needed to benefit from non-Lucene-pushable filters; that could be addressed separately in a later PR.

Changes to Rally tracks were already added in elastic/rally-tracks#835, and we should see improvements there once this is merged. Local results indicate around a 90x improvement for Lucene-pushable filters on a test case specifically designed to demonstrate the benefit; customers are likely to see more limited gains. The test case is an expanding lookup join of a 100,000-row table with a 10,000-row lookup table, using a filter with 0.1% selectivity (it keeps 10 of the 10,000 lookup rows). Without the optimization the filter is not pushed to the right side, so we get an explosion of records: 100,000 x 10,000 = 1,000,000,000 rows after the join, which are then filtered down to 1,000,000. With the optimization the filter is applied early, so the expanding join produces only 1,000,000 rows. This reduces the maximum number of intermediate rows by a factor of 1,000 and makes the query 90 times faster.
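The intermediate-row arithmetic above can be checked with a quick shell sketch (row counts only, not the actual query):

```shell
# Back-of-envelope check of the row counts quoted in the description.
left=100000     # rows in test_left
right=10000     # rows in test_right (lookup index)
match=10        # lookup rows kept by the 0.1% selectivity filter

# Without pushdown: every left row joins every lookup row, then we filter.
echo "without pushdown: $((left * right)) joined rows, filtered to $((left * match))"
# With pushdown: the lookup side is pre-filtered before the expanding join.
echo "with pushdown:    $((left * match)) joined rows"
```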

Right Pushable filters with optimization
Running filtered join query...
test pushable join with filter on keyword: {"took":125,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":124,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":124,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":121,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":134,"documents_found":100000,"values":[[1000000]]}

Right Pushable filters without optimization
Running filtered join query...
test pushable join with filter on keyword: {"took":11315,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11348,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11330,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11271,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11258,"documents_found":100000,"values":[[1000000]]}

Script

#!/bin/bash

passwd="redacted"

# Cleanup and create test_left index
curl -sk -uelastic:$passwd -HContent-Type:application/json -XDELETE http://localhost:9200/test_left
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPUT http://localhost:9200/test_left -d'{
    "settings": {
        "index.refresh_interval": -1
    },
    "mappings": {
        "properties": {
            "join_key": { "type": "keyword" },
            "value_left": { "type": "keyword" }
        }
    }
}'

# Cleanup and create test_right index
curl -sk -uelastic:$passwd -HContent-Type:application/json -XDELETE http://localhost:9200/test_right
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPUT http://localhost:9200/test_right -d'{
    "settings": {
        "index.refresh_interval": -1,
        "index.mode": "lookup"
    },
    "mappings": {
        "properties": {
            "join_key": { "type": "keyword" },
            "filter_field_kw": { "type": "keyword" },
            "filter_field_int": { "type": "integer" }
        }
    }
}'

# Populate test_left with 100,000 documents
echo "Populating test_left..."
counter=1
for a in {1..100}; do
    rm -f /tmp/bulk_left
    for b in {1..1000}; do
        echo '{"index":{"_index":"test_left"}}' >> /tmp/bulk_left
        printf '{"join_key":"A", "value_left":"text %d"}\n' "$counter" >> /tmp/bulk_left
        counter=$((counter + 1))
    done
    printf "test_left: batch %02d/100 " $a
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/_bulk?pretty --data-binary @/tmp/bulk_left)
    error=$(echo "$response" | jq -c '.errors')
    if [ "$error" != "false" ]; then
        echo "$response" | jq -c '.items[] | select(.index.error != null) | .index.error'
    else
        echo "OK"
    fi
done

# Populate test_right with 10,000 documents
echo "Populating test_right..."
# Batch 1: 10 'match' and 990 'no_match'
rm -f /tmp/bulk_right
for i in {1..10}; do
    echo '{"index":{"_index":"test_right"}}' >> /tmp/bulk_right
    printf '{"join_key":"A", "filter_field_kw":"match", "filter_field_int":1}\n' >> /tmp/bulk_right
done
for i in {1..990}; do
    echo '{"index":{"_index":"test_right"}}' >> /tmp/bulk_right
    printf '{"join_key":"A", "filter_field_kw":"no_match", "filter_field_int":2}\n' >> /tmp/bulk_right
done
printf "test_right: batch 01/10 "
response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/_bulk?pretty --data-binary @/tmp/bulk_right)
error=$(echo "$response" | jq -c '.errors')
if [ "$error" != "false" ]; then
    echo "$response" | jq -c '.items[] | select(.index.error != null) | .index.error'
else
    echo "OK"
fi

# Batches 2-10: 1000 'no_match' each
for a in {2..10}; do
    rm -f /tmp/bulk_right
    for b in {1..1000}; do
        echo '{"index":{"_index":"test_right"}}' >> /tmp/bulk_right
        printf '{"join_key":"A", "filter_field_kw":"no_match", "filter_field_int":2}\n' >> /tmp/bulk_right
    done
    printf "test_right: batch %02d/10 " $a
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/_bulk?pretty --data-binary @/tmp/bulk_right)
    error=$(echo "$response" | jq -c '.errors')
    if [ "$error" != "false" ]; then
        echo "$response" | jq -c '.items[] | select(.index.error != null) | .index.error'
    else
        echo "OK"
    fi
done
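The response check after each bulk request appears verbatim three times in the script; a small helper like this (hypothetical, not part of the original script) would keep it in one place:

```shell
# Hypothetical helper: print per-item indexing errors from an Elasticsearch
# _bulk response, or "OK" when the response reports "errors": false.
check_bulk_response() {
    local response=$1
    local error
    error=$(echo "$response" | jq -c '.errors')
    if [ "$error" != "false" ]; then
        echo "$response" | jq -c '.items[] | select(.index.error != null) | .index.error'
    else
        echo "OK"
    fi
}

# Usage (after any bulk request in the script):
#   check_bulk_response "$response"
```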



# Force merge and refresh both indices
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/test_left,test_right/_forcemerge?max_num_segments=1
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/test_left,test_right/_refresh
echo
curl -sk -uelastic:$passwd http://localhost:9200/_cat/indices?v

test_join_with_filter_pushable_keyword() {
    echo -n "test pushable join with filter on keyword: "
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'http://localhost:9200/_query?pretty' -d'{ "query": "FROM test_left | LOOKUP JOIN test_right ON join_key | WHERE filter_field_kw == \"match\" | STATS count=COUNT(*)", "pragma": { "data_partitioning": "shard" } }')
    error=$(echo "$response" | jq -c '.error')
    if [ "$error" != "null" ]; then
        echo "$error"
    else
        echo "$response" | jq -c '{took, documents_found, values}'
    fi
}

test_join_with_filter_pushable_int() {
    echo -n "test pushable join with filter on int: "
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'http://localhost:9200/_query?pretty' -d'{ "query": "FROM test_left | LOOKUP JOIN test_right ON join_key | WHERE filter_field_int == 1 | STATS count=COUNT(*)", "pragma": { "data_partitioning": "shard" } }')
    error=$(echo "$response" | jq -c '.error')
    if [ "$error" != "null" ]; then
        echo "$error"
    else
        echo "$response" | jq -c '{took, documents_found, values}'
    fi
}

test_join_with_filter_non_pushable_int() {
    echo -n "test non-pushable join with filter on int: "
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'http://localhost:9200/_query?pretty' -d'{ "query": "FROM test_left | LOOKUP JOIN test_right ON join_key | WHERE ABS(filter_field_int) == 1 | STATS count=COUNT(*)", "pragma": { "data_partitioning": "shard" } }')
    error=$(echo "$response" | jq -c '.error')
    if [ "$error" != "null" ]; then
        echo "$error"
    else
        echo "$response" | jq -c '{took, documents_found, values}'
    fi
}

echo
echo "Running filtered join query..."
for a in {1..5}; do
    test_join_with_filter_pushable_keyword
done
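Each benchmark variant uses the same five-iteration loop; a generic repeat helper (hypothetical, not part of the original script) would let the int and non-pushable variants be benchmarked the same way:

```shell
# Hypothetical helper: invoke a test function N times, matching the shape
# of the loop used above for the keyword variant.
run_n() {
    local fn=$1 n=$2
    for _ in $(seq 1 "$n"); do
        "$fn"
    done
}

# e.g.:
# run_n test_join_with_filter_pushable_keyword 5
# run_n test_join_with_filter_pushable_int 5
# run_n test_join_with_filter_non_pushable_int 5
```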

@julian-elastic julian-elastic self-assigned this Aug 19, 2025
@julian-elastic julian-elastic added :Analytics/ES|QL AKA ESQL >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) labels Aug 19, 2025
@elasticsearchmachine
Collaborator

Hi @julian-elastic, I've created a changelog YAML for you.

@julian-elastic julian-elastic force-pushed the lookupPrefilterPushable branch 2 times, most recently from eb7c5c5 to 7eae250 Compare August 20, 2025 19:27
@julian-elastic julian-elastic force-pushed the lookupPrefilterPushable branch from 7eae250 to 2aa2b49 Compare August 20, 2025 19:33
@julian-elastic julian-elastic marked this pull request as ready for review August 20, 2025 19:34
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Collaborator

Hi @julian-elastic, I've created a changelog YAML for you.

@julian-elastic julian-elastic force-pushed the lookupPrefilterPushable branch from d13da18 to 5405235 Compare August 27, 2025 16:00
Contributor

@bpintea bpintea left a comment


Just a partial first pass, leaving mostly cosmetic notes and a couple of questions.

Contributor

@bpintea bpintea left a comment


Looks good.
We should maybe add a COALESCE test in the csv suite. There is an IS NOT NULL test currently; maybe there are some other lookup indices that can take an IS NULL too? Either on the join field or an "enriching" field.
Otherwise, I've left some other comments, but no blockers. We should consider pushing down conditions on the common field in LOOKUP JOIN too. Doesn't have to be now.

Also, let's wait on Alex's review too.

Contributor

@alex-spies alex-spies left a comment


Heya! This looks very good, thanks for the iterations @julian-elastic .

I realized that the current version swallows some exceptions which I don't think is how we generally process them. (See my remarks below.) I'd like to discuss this before merging. Otherwise, I have only non-blocking comments.

@julian-elastic julian-elastic force-pushed the lookupPrefilterPushable branch from ab90559 to 3116545 Compare September 2, 2025 19:28
Contributor

@alex-spies alex-spies left a comment


LGTM, thanks @julian-elastic , this is cool!

Contributor

@alex-spies alex-spies left a comment


Sorry, I thought the error swallowing was addressed in both cases; I only saw now that we keep the error swallowing in the LookupFromIndexService. Not re-blocking, as I think we're really on the last stretch here, but I think this should be addressed before merging.

I am also not convinced that any queries actually trigger the serialization code for LookupJoinExec and, unless we can find one, I think the serialization of BinaryExec/LookupJoinExec should be simplified again; otherwise it will look like necessary complexity to our future selves, complicating future work and refactors there. If there are indeed such queries, we should have them as ITs (csv tests, ideally).

@julian-elastic , please proceed at your own discretion once these two points are addressed; this doesn't require re-review IMHO, so don't wait on me to get this merged.

@julian-elastic julian-elastic merged commit b5120c5 into elastic:main Sep 3, 2025
33 checks passed

Labels

:Analytics/ES|QL AKA ESQL >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.2.0
