Skip to content

Commit a15844d

Browse files
committed
Merge branch 'main' of github.com:elastic/elasticsearch into esql_namedqueries
2 parents 2edaef8 + 7eed096 commit a15844d

File tree

14 files changed

+387
-21
lines changed

14 files changed

+387
-21
lines changed

docs/changelog/127731.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127731
2+
summary: ESQL - Enable telemetry for COMPLETION command
3+
area: Search
4+
type: feature
5+
issues: []

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import io.netty.channel.Channel;
1313

1414
import org.elasticsearch.action.ActionListener;
15-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
15+
import org.elasticsearch.action.support.SubscribableListener;
1616
import org.elasticsearch.http.HttpChannel;
1717
import org.elasticsearch.http.HttpResponse;
1818

@@ -25,7 +25,7 @@
2525
public class Netty4HttpChannel implements HttpChannel {
2626

2727
private final Channel channel;
28-
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
28+
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
2929

3030
Netty4HttpChannel(Channel channel) {
3131
this.channel = channel;

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import io.netty.channel.Channel;
1313

1414
import org.elasticsearch.action.ActionListener;
15-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
15+
import org.elasticsearch.action.support.SubscribableListener;
1616
import org.elasticsearch.http.HttpServerChannel;
1717

1818
import java.net.InetSocketAddress;
@@ -22,7 +22,7 @@
2222
public class Netty4HttpServerChannel implements HttpServerChannel {
2323

2424
private final Channel channel;
25-
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
25+
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
2626

2727
Netty4HttpServerChannel(Channel channel) {
2828
this.channel = channel;

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import io.netty.channel.ChannelOption;
1515

1616
import org.elasticsearch.action.ActionListener;
17+
import org.elasticsearch.action.support.SubscribableListener;
1718
import org.elasticsearch.common.bytes.BytesReference;
18-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
1919
import org.elasticsearch.core.IOUtils;
2020
import org.elasticsearch.core.Releasables;
2121
import org.elasticsearch.transport.TcpChannel;
@@ -30,8 +30,8 @@ public class Netty4TcpChannel implements TcpChannel {
3030
private final Channel channel;
3131
private final boolean isServer;
3232
private final String profile;
33-
private final ListenableFuture<Void> connectContext;
34-
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
33+
private final SubscribableListener<Void> connectContext = new SubscribableListener<>();
34+
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
3535
private final ChannelStats stats = new ChannelStats();
3636
private final boolean rstOnClose;
3737
/**
@@ -43,7 +43,6 @@ public class Netty4TcpChannel implements TcpChannel {
4343
this.channel = channel;
4444
this.isServer = isServer;
4545
this.profile = profile;
46-
this.connectContext = new ListenableFuture<>();
4746
this.rstOnClose = rstOnClose;
4847
addListener(connectFuture, connectContext);
4948
addListener(this.channel.closeFuture(), new ActionListener<>() {

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import io.netty.channel.Channel;
1313

1414
import org.elasticsearch.action.ActionListener;
15-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
15+
import org.elasticsearch.action.support.SubscribableListener;
1616
import org.elasticsearch.transport.TcpServerChannel;
1717

1818
import java.net.InetSocketAddress;
@@ -22,7 +22,7 @@
2222
public class Netty4TcpServerChannel implements TcpServerChannel {
2323

2424
private final Channel channel;
25-
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
25+
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
2626

2727
Netty4TcpServerChannel(Channel channel) {
2828
this.channel = channel;

server/src/main/java/org/elasticsearch/transport/CloseableConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
package org.elasticsearch.transport;
1111

1212
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
13+
import org.elasticsearch.action.support.SubscribableListener;
1414
import org.elasticsearch.core.AbstractRefCounted;
1515

1616
import java.util.concurrent.atomic.AtomicBoolean;
@@ -20,8 +20,8 @@
2020
*/
2121
public abstract class CloseableConnection extends AbstractRefCounted implements Transport.Connection {
2222

23-
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
24-
private final ListenableFuture<Void> removeContext = new ListenableFuture<>();
23+
private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
24+
private final SubscribableListener<Void> removeContext = new SubscribableListener<>();
2525

2626
private final AtomicBoolean closed = new AtomicBoolean(false);
2727
private final AtomicBoolean removed = new AtomicBoolean(false);

server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.support.ContextPreservingActionListener;
15+
import org.elasticsearch.action.support.SubscribableListener;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.common.ReferenceDocs;
1718
import org.elasticsearch.common.settings.Settings;
1819
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
19-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
2020
import org.elasticsearch.common.util.concurrent.RunOnce;
2121
import org.elasticsearch.common.util.concurrent.ThreadContext;
2222
import org.elasticsearch.core.AbstractRefCounted;
@@ -43,7 +43,7 @@ public class ClusterConnectionManager implements ConnectionManager {
4343
private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class);
4444

4545
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
46-
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Transport.Connection>> pendingConnections = ConcurrentCollections
46+
private final ConcurrentMap<DiscoveryNode, SubscribableListener<Transport.Connection>> pendingConnections = ConcurrentCollections
4747
.newConcurrentMap();
4848
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);
4949

@@ -184,8 +184,8 @@ private void connectToNodeOrRetry(
184184
return;
185185
}
186186

187-
final ListenableFuture<Transport.Connection> currentListener = new ListenableFuture<>();
188-
final ListenableFuture<Transport.Connection> existingListener = pendingConnections.putIfAbsent(node, currentListener);
187+
final SubscribableListener<Transport.Connection> currentListener = new SubscribableListener<>();
188+
final SubscribableListener<Transport.Connection> existingListener = pendingConnections.putIfAbsent(node, currentListener);
189189
if (existingListener != null) {
190190
try {
191191
// wait on previous entry to complete connection attempt
@@ -203,7 +203,7 @@ private void connectToNodeOrRetry(
203203
// extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending.
204204
final Transport.Connection existingConnectionRecheck = connectedNodes.get(node);
205205
if (existingConnectionRecheck != null) {
206-
ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
206+
var future = pendingConnections.remove(node);
207207
assert future == currentListener : "Listener in pending map is different than the expected listener";
208208
connectingRefCounter.decRef();
209209
future.onResponse(existingConnectionRecheck);
@@ -257,7 +257,7 @@ private void connectToNodeOrRetry(
257257
}
258258
}
259259
} finally {
260-
ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
260+
var future = pendingConnections.remove(node);
261261
assert future == currentListener : "Listener in pending map is different than the expected listener";
262262
managerRefs.decRef();
263263
releaseOnce.run();
@@ -387,9 +387,9 @@ private void failConnectionListener(
387387
DiscoveryNode node,
388388
RunOnce releaseOnce,
389389
Exception e,
390-
ListenableFuture<Transport.Connection> expectedListener
390+
SubscribableListener<Transport.Connection> expectedListener
391391
) {
392-
ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
392+
final var future = pendingConnections.remove(node);
393393
releaseOnce.run();
394394
if (future != null) {
395395
assert future == expectedListener : "Listener in pending map is different than the expected listener";
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.bootstrap;
11+
12+
import java.util.List;
13+
14+
record TestBuildInfo(String component, List<TestBuildInfoLocation> locations) {}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.bootstrap;
11+
12+
record TestBuildInfoLocation(String representativeClass, String module) {}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.bootstrap;
11+
12+
import org.elasticsearch.core.SuppressForbidden;
13+
import org.elasticsearch.xcontent.ObjectParser;
14+
import org.elasticsearch.xcontent.ParseField;
15+
import org.elasticsearch.xcontent.XContentFactory;
16+
import org.elasticsearch.xcontent.XContentParser;
17+
import org.elasticsearch.xcontent.XContentParserConfiguration;
18+
import org.elasticsearch.xcontent.XContentType;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.net.URL;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
class TestBuildInfoParser {
27+
28+
private static final String PLUGIN_TEST_BUILD_INFO_RESOURCES = "META-INF/plugin-test-build-info.json";
29+
private static final String SERVER_TEST_BUILD_INFO_RESOURCE = "META-INF/server-test-build-info.json";
30+
31+
private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("test_build_info", Builder::new);
32+
private static final ObjectParser<Location, Void> LOCATION_PARSER = new ObjectParser<>("location", Location::new);
33+
static {
34+
LOCATION_PARSER.declareString(Location::representativeClass, new ParseField("representativeClass"));
35+
LOCATION_PARSER.declareString(Location::module, new ParseField("module"));
36+
37+
PARSER.declareString(Builder::component, new ParseField("component"));
38+
PARSER.declareObjectArray(Builder::locations, LOCATION_PARSER, new ParseField("locations"));
39+
}
40+
41+
private static class Location {
42+
private String representativeClass;
43+
private String module;
44+
45+
public void module(final String module) {
46+
this.module = module;
47+
}
48+
49+
public void representativeClass(final String representativeClass) {
50+
this.representativeClass = representativeClass;
51+
}
52+
}
53+
54+
private static final class Builder {
55+
private String component;
56+
private List<Location> locations;
57+
58+
public void component(final String component) {
59+
this.component = component;
60+
}
61+
62+
public void locations(final List<Location> locations) {
63+
this.locations = locations;
64+
}
65+
66+
TestBuildInfo build() {
67+
return new TestBuildInfo(
68+
component,
69+
locations.stream().map(l -> new TestBuildInfoLocation(l.representativeClass, l.module)).toList()
70+
);
71+
}
72+
}
73+
74+
static TestBuildInfo fromXContent(final XContentParser parser) throws IOException {
75+
return PARSER.parse(parser, null).build();
76+
}
77+
78+
static List<TestBuildInfo> parseAllPluginTestBuildInfo() throws IOException {
79+
var xContent = XContentFactory.xContent(XContentType.JSON);
80+
List<TestBuildInfo> pluginsTestBuildInfos = new ArrayList<>();
81+
var resources = TestBuildInfoParser.class.getClassLoader().getResources(PLUGIN_TEST_BUILD_INFO_RESOURCES);
82+
URL resource;
83+
while ((resource = resources.nextElement()) != null) {
84+
try (var stream = getStream(resource); var parser = xContent.createParser(XContentParserConfiguration.EMPTY, stream)) {
85+
pluginsTestBuildInfos.add(fromXContent(parser));
86+
}
87+
}
88+
return pluginsTestBuildInfos;
89+
}
90+
91+
static TestBuildInfo parseServerTestBuildInfo() throws IOException {
92+
var xContent = XContentFactory.xContent(XContentType.JSON);
93+
var resource = TestBuildInfoParser.class.getClassLoader().getResource(SERVER_TEST_BUILD_INFO_RESOURCE);
94+
// No test-build-info for server: this might be a non-gradle build. Proceed without TestBuildInfo
95+
if (resource == null) {
96+
return null;
97+
}
98+
try (var stream = getStream(resource); var parser = xContent.createParser(XContentParserConfiguration.EMPTY, stream)) {
99+
return fromXContent(parser);
100+
}
101+
}
102+
103+
@SuppressForbidden(reason = "URLs from class loader")
104+
private static InputStream getStream(URL resource) throws IOException {
105+
return resource.openStream();
106+
}
107+
}

0 commit comments

Comments
 (0)