Skip to content

Commit b22e241

Browse files
authored
Merge branch 'main' into query_visit_percentage
2 parents 674466c + 5a4c3ab commit b22e241

File tree

16 files changed

+262
-15
lines changed

16 files changed

+262
-15
lines changed

docs/changelog/131940.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131940
2+
summary: Allow remote enrich after LOOKUP JOIN
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
package org.elasticsearch.transport.netty4;
1111

1212
import org.apache.logging.log4j.Level;
13+
import org.apache.logging.log4j.core.LogEvent;
1314
import org.elasticsearch.ESNetty4IntegTestCase;
1415
import org.elasticsearch.core.TimeValue;
1516
import org.elasticsearch.test.ESIntegTestCase;
1617
import org.elasticsearch.test.MockLog;
1718
import org.elasticsearch.test.junit.annotations.TestLogging;
19+
import org.elasticsearch.transport.NodeDisconnectedException;
1820
import org.elasticsearch.transport.TcpTransport;
1921
import org.elasticsearch.transport.TransportLogger;
2022

@@ -117,7 +119,15 @@ public void testExceptionalDisconnectLogging() throws Exception {
117119
TcpTransport.class.getCanonicalName(),
118120
Level.DEBUG,
119121
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\], exception:.*"
120-
)
122+
) {
123+
@Override
124+
public void match(LogEvent event) {
125+
if (event.getThrown() instanceof NodeDisconnectedException nodeDisconnectedException
126+
&& nodeDisconnectedException.getMessage().contains("closed exceptionally: Netty4TcpChannel{")) {
127+
super.match(event);
128+
}
129+
}
130+
}
121131
);
122132

123133
final String nodeName = internalCluster().startNode();

muted-tests.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,12 @@ tests:
540540
- class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT
541541
method: testPartialResults
542542
issue: https://github.com/elastic/elasticsearch/issues/131481
543+
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
544+
method: testWithCustomFeatureProcessors
545+
issue: https://github.com/elastic/elasticsearch/issues/134001
546+
- class: org.elasticsearch.xpack.esql.action.RandomizedTimeSeriesIT
547+
method: testRateGroupBySubset
548+
issue: https://github.com/elastic/elasticsearch/issues/134019
543549

544550
# Examples:
545551
#

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,15 +1134,15 @@ public void onResponse(Void v) {
11341134
nodeChannels.channels.forEach(ch -> {
11351135
// Mark the channel init time
11361136
ch.getChannelStats().markAccessed(relativeMillisTime);
1137-
ch.addCloseListener(new ActionListener<Void>() {
1137+
ch.addCloseListener(new ActionListener<>() {
11381138
@Override
11391139
public void onResponse(Void ignored) {
11401140
nodeChannels.close();
11411141
}
11421142

11431143
@Override
11441144
public void onFailure(Exception e) {
1145-
nodeChannels.closeAndFail(e);
1145+
nodeChannels.closeAndFail(new NodeDisconnectedException(node, "closed exceptionally: " + ch, null, e));
11461146
}
11471147
});
11481148
});

x-pack/plugin/core/template-resources/src/main/resources/[email protected]

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
"template": {
1111
"lifecycle": {},
1212
"mappings": {
13+
"_meta": {
14+
"template_version": ${xpack.stack.template.version}
15+
},
1316
"properties": {
1417
"meta": {
1518
"properties": {

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1875,7 +1875,6 @@ type:keyword | language_code:integer | language_name:keyword
18751875
Production | 3 | Spanish
18761876
;
18771877

1878-
18791878
###############################################
18801879
# LOOKUP JOIN on mixed numerical fields
18811880
###############################################
@@ -4872,6 +4871,143 @@ Connected to 10.1.0.1 | English | English | n
48724871
Connected to 10.1.0.1 | English | null | United Kingdom
48734872
;
48744873

4874+
enrichAfterLookupJoin
4875+
required_capability: join_lookup_v12
4876+
4877+
FROM sample_data
4878+
| KEEP message
4879+
| WHERE message == "Connected to 10.1.0.1"
4880+
| EVAL language_code = "1"
4881+
| LOOKUP JOIN message_types_lookup ON message
4882+
| ENRICH languages_policy ON language_code
4883+
;
4884+
4885+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4886+
Connected to 10.1.0.1 | 1 | Success | English
4887+
;
4888+
4889+
###############################################
4890+
# LOOKUP JOIN and remote ENRICH
4891+
###############################################
4892+
4893+
remoteEnrichAfterLookupJoin
4894+
required_capability: join_lookup_v12
4895+
required_capability: remote_enrich_after_lookup_join
4896+
4897+
FROM sample_data
4898+
| KEEP message
4899+
| WHERE message == "Connected to 10.1.0.1"
4900+
| EVAL language_code = "1"
4901+
| LOOKUP JOIN message_types_lookup ON message
4902+
| ENRICH _remote:languages_policy ON language_code
4903+
;
4904+
4905+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4906+
Connected to 10.1.0.1 | 1 | Success | English
4907+
;
4908+
4909+
remoteEnrichSortAfterLookupJoin
4910+
required_capability: join_lookup_v12
4911+
required_capability: remote_enrich_after_lookup_join
4912+
4913+
FROM sample_data
4914+
| KEEP message
4915+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4916+
| EVAL language_code = "1"
4917+
| LOOKUP JOIN message_types_lookup ON message
4918+
| ENRICH _remote:languages_policy ON language_code
4919+
| SORT message ASC
4920+
;
4921+
4922+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4923+
Connected to 10.1.0.1 | 1 | Success | English
4924+
Connected to 10.1.0.2 | 1 | Success | English
4925+
;
4926+
4927+
sortRemoteEnrichAfterLookupJoin
4928+
required_capability: join_lookup_v12
4929+
required_capability: remote_enrich_after_lookup_join
4930+
4931+
FROM sample_data
4932+
| KEEP message
4933+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4934+
| EVAL language_code = "1"
4935+
| LOOKUP JOIN message_types_lookup ON message
4936+
| SORT message ASC
4937+
| ENRICH _remote:languages_policy ON language_code
4938+
| LIMIT 2
4939+
;
4940+
4941+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4942+
Connected to 10.1.0.1 | 1 | Success | English
4943+
Connected to 10.1.0.2 | 1 | Success | English
4944+
;
4945+
4946+
remoteEnrichSortAfterLookupJoinWithLimit
4947+
required_capability: join_lookup_v12
4948+
required_capability: remote_enrich_after_lookup_join
4949+
4950+
FROM sample_data
4951+
| KEEP message
4952+
| WHERE message == "Connection error"
4953+
| EVAL language_code = "1"
4954+
| LOOKUP JOIN message_types_lookup ON message
4955+
| LIMIT 2
4956+
| ENRICH _remote:languages_policy ON language_code
4957+
| SORT message ASC
4958+
;
4959+
4960+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4961+
Connection error | 1 | Error | English
4962+
Connection error | 1 | Error | English
4963+
;
4964+
4965+
remoteEnrichBetweenLookupJoins
4966+
required_capability: join_lookup_v12
4967+
required_capability: remote_enrich_after_lookup_join
4968+
4969+
FROM sample_data
4970+
| KEEP message, client_ip
4971+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4972+
| EVAL language_code = "1", client_ip=to_string(client_ip)
4973+
| LOOKUP JOIN message_types_lookup ON message
4974+
| ENRICH _remote:languages_policy ON language_code
4975+
| LOOKUP JOIN clientips_lookup ON client_ip
4976+
| DROP language_code
4977+
| SORT message ASC
4978+
;
4979+
4980+
message:keyword | client_ip:keyword | type:keyword | language_name:keyword | env:keyword
4981+
Connected to 10.1.0.1 | 172.21.3.15 | Success | English | Production
4982+
Connected to 10.1.0.2 | 172.21.2.113 | Success | English | QA
4983+
;
4984+
4985+
remoteEnrichesAndLookupJoins
4986+
required_capability: join_lookup_v12
4987+
required_capability: remote_enrich_after_lookup_join
4988+
4989+
FROM sample_data
4990+
| EVAL language_code = "1", client_ip=to_string(client_ip)
4991+
| ENRICH _remote:languages_policy ON language_code
4992+
| LOOKUP JOIN clientips_lookup ON client_ip
4993+
| EVAL env1 = env
4994+
| ENRICH _remote:clientip_policy ON client_ip
4995+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4996+
| LOOKUP JOIN message_types_lookup ON message
4997+
| KEEP message, client_ip, env, env1, type, language_name
4998+
| SORT message ASC
4999+
| LIMIT 10
5000+
;
5001+
5002+
message:keyword | client_ip:keyword | env:keyword | env1: keyword | type:keyword | language_name:keyword
5003+
Connected to 10.1.0.1 | 172.21.3.15 | Production | Production | Success | English
5004+
Connected to 10.1.0.2 | 172.21.2.113 | QA | QA | Success | English
5005+
;
5006+
5007+
###############################################
5008+
# Multi-field LOOKUP JOIN
5009+
###############################################
5010+
48755011
lookupJoinOnTwoFields
48765012
required_capability: join_lookup_v12
48775013
required_capability: lookup_join_on_multiple_fields

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception {
117117
SimplePauseFieldPlugin.allowEmitting.countDown();
118118

119119
try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
120-
// Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
120+
// Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
121121
// because we stopped it before processing the data
122122
assertThat(
123123
getValuesList(resp),

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3131

3232
/**
33-
* This IT test is the dual of CrossClustersEnrichIT, which tests "happy path"
33+
* This IT test is the dual of CrossClusterEnrichIT, which tests "happy path"
3434
* and this one tests unavailable cluster scenarios using (most of) the same tests.
3535
*/
3636
public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBasedCrossClusterTestCase {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,12 @@ public enum Cap {
12771277
*/
12781278
ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),
12791279

1280+
/**
1281+
* Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
1282+
* see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
1283+
*/
1284+
REMOTE_ENRICH_AFTER_LOOKUP_JOIN,
1285+
12801286
/**
12811287
* MATCH PHRASE function
12821288
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) {
465465

466466
@Override
467467
public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
468-
return p -> {
468+
return child -> {
469469
var source = source(ctx);
470470
Tuple<Mode, String> tuple = parsePolicyName(ctx.policyName);
471471
Mode mode = tuple.v1();
@@ -484,9 +484,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
484484
}
485485

486486
List<NamedExpression> keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class);
487+
488+
// If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too.
489+
if (mode == Mode.REMOTE) {
490+
child = child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true));
491+
}
492+
487493
return new Enrich(
488494
source,
489-
p,
495+
child,
490496
mode,
491497
Literal.keyword(source(ctx.policyName), policyNameString),
492498
matchField,

0 commit comments

Comments
 (0)