Execute single shard each pipeline for rate #133610
Pinging @elastic/es-analytical-engine (Team:Analytics)
Pinging @elastic/es-storage-engine (Team:StorageEngine) |
LGTM, thanks Nhat!
Thanks Martijn!
LGTM, looking forward to seeing the impact.
if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {
    assertThat(p.operators(), hasSize(2));
    TimeSeriesSourceOperator.Status status = (TimeSeriesSourceOperator.Status) p.operators().get(0).status();
    assertThat(status.processedShards(), hasSize(1));
I saw a failure in my PR:
REPRODUCE WITH: ./gradlew ":x-pack:plugin:esql:internalClusterTest" --tests "org.elasticsearch.xpack.esql.action.TimeSeriesIT.testProfile" -Dtests.seed=ADBEEDF1C962A1C8 -Dtests.locale=nso -Dtests.timezone=Europe/Samara -Druntime.java=24
2> java.lang.AssertionError:
Expected: a collection with size <1>
     but: collection size was <0>
    at __randomizedtesting.SeedInfo.seed([ADBEEDF1C962A1C8:FCE458746D2F19E1]:0)
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
    at org.elasticsearch.test.ESTestCase.assertThat(ESTestCase.java:2706)
    at org.elasticsearch.xpack.esql.action.TimeSeriesIT.testProfile(TimeSeriesIT.java:556)
Thanks Kostas! I will look into it.
For each pipeline in a rate aggregation, we use 3 drivers and execute one shard at a time; therefore, the maximum parallelism per query on a data node is 3. Unlike non-rate aggregations, rate aggregations cannot partition shards into multiple slices for concurrent execution, which is one of several limitations of rate aggregation. This change adjusts the data node executor to run multiple pipelines simultaneously, with each pipeline handling a single shard, so that parallelism increases when a data node has multiple target shards.
This change is expected to improve benchmark performance threefold.
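To illustrate the scheduling idea only, here is a minimal standalone sketch. It is not the code in this PR and uses no Elasticsearch classes: `SingleShardPipelines`, `runOnePipelinePerShard`, and `maxDrivers` are hypothetical names, and the "pipeline" is a placeholder task. It assumes the upper bound on concurrent drivers is simply 3 per pipeline times the number of pipelines running at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; none of these names come from Elasticsearch.
final class SingleShardPipelines {
    // From the description: each rate pipeline uses 3 drivers.
    static final int DRIVERS_PER_PIPELINE = 3;

    // Assumed upper bound on concurrent drivers when `pipelines` run at once.
    static int maxDrivers(int pipelines) {
        return pipelines * DRIVERS_PER_PIPELINE;
    }

    // Starts one placeholder "pipeline" per shard and runs them concurrently.
    // Each pipeline sees exactly one shard; results keep the input order.
    static List<String> runOnePipelinePerShard(List<String> shards) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, shards.size()));
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String shard : shards) {
                // Placeholder for the real per-shard work.
                futures.add(CompletableFuture.supplyAsync(() -> "processed " + shard, executor));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shard-0", "shard-1", "shard-2");
        System.out.println(runOnePipelinePerShard(shards));
        System.out.println("max drivers: " + maxDrivers(shards.size()));
    }
}
```

Under these assumptions, a data node with a single target shard stays at 3 drivers, while a node with several target shards can run one pipeline per shard in parallel.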