Skip to content

Commit 5e79659

Browse files
feat: add Consul discovery contrib extension, harden Consul discovery and leadership election
1 parent 2959cde commit 5e79659

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+7108
-19
lines changed

distribution/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,10 @@
453453
<argument>-c</argument>
454454
<argument>org.apache.druid.extensions.contrib:druid-deltalake-extensions</argument>
455455
<argument>-c</argument>
456+
<argument>org.apache.druid.extensions.contrib:grpc-query</argument>
457+
<argument>-c</argument>
458+
<argument>org.apache.druid.extensions.contrib:druid-consul-extensions</argument>
459+
<argument>-c</argument>
456460
<argument>org.apache.druid.extensions.contrib:druid-spectator-histogram</argument>
457461
<argument>-c</argument>
458462
<argument>org.apache.druid.extensions.contrib:druid-rabbit-indexing-service</argument>

docs/development/extensions-contrib/consul.md

Lines changed: 648 additions & 0 deletions
Large diffs are not rendered by default.

embedded-tests/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,12 @@
547547
<version>5.5</version>
548548
<scope>test</scope>
549549
</dependency>
550+
<dependency>
551+
<groupId>org.apache.druid.extensions.contrib</groupId>
552+
<artifactId>druid-consul-extensions</artifactId>
553+
<version>${project.parent.version}</version>
554+
<scope>test</scope>
555+
</dependency>
550556

551557
</dependencies>
552558

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.consul;
21+
22+
import org.apache.druid.java.util.common.StringUtils;
23+
import org.apache.druid.java.util.common.logger.Logger;
24+
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
25+
import org.apache.druid.testing.embedded.TestcontainerResource;
26+
import org.apache.druid.testing.utils.TLSCertificateBundle;
27+
import org.apache.druid.testing.utils.TLSCertificateGenerator;
28+
import org.testcontainers.containers.BindMode;
29+
import org.testcontainers.containers.GenericContainer;
30+
import org.testcontainers.containers.wait.strategy.Wait;
31+
import org.testcontainers.utility.DockerImageName;
32+
33+
import javax.annotation.Nullable;
34+
import java.net.URI;
35+
import java.time.Duration;
36+
37+
/**
38+
* Runs a single-node Consul agent for use as the discovery backend during docker tests.
39+
* Supports plain HTTP, TLS, and mutual TLS (mTLS) modes.
40+
*/
41+
public class ConsulClusterResource extends TestcontainerResource<GenericContainer<?>>
42+
{
43+
private static final Logger log = new Logger(ConsulClusterResource.class);
44+
private static final int CONSUL_HTTP_PORT = 8500;
45+
private static final int CONSUL_HTTPS_PORT = 8501;
46+
private static final DockerImageName CONSUL_IMAGE = DockerImageName.parse("hashicorp/consul:1.18");
47+
48+
private final ConsulSecurityMode securityMode;
49+
private String consulHostForDruid;
50+
private int consulPortForDruid;
51+
52+
@Nullable
53+
private TLSCertificateBundle certBundle;
54+
55+
/**
56+
* Creates a Consul cluster resource with plain HTTP (no encryption).
57+
*/
58+
public ConsulClusterResource()
59+
{
60+
this(ConsulSecurityMode.PLAIN);
61+
}
62+
63+
/**
64+
* Creates a Consul cluster resource with the specified security mode.
65+
*
66+
* @param securityMode security mode (PLAIN, TLS, or MTLS)
67+
*/
68+
public ConsulClusterResource(ConsulSecurityMode securityMode)
69+
{
70+
this.securityMode = securityMode;
71+
}
72+
73+
@Override
74+
protected GenericContainer<?> createContainer()
75+
{
76+
try {
77+
// Generate certificates for TLS/mTLS modes
78+
if (securityMode == ConsulSecurityMode.TLS || securityMode == ConsulSecurityMode.MTLS) {
79+
certBundle = TLSCertificateGenerator.generateToTempDirectory();
80+
log.info("Generated TLS certificates for Consul in: %s", certBundle.getCertificateDirectory());
81+
}
82+
83+
GenericContainer<?> container = new GenericContainer<>(CONSUL_IMAGE);
84+
85+
if (securityMode == ConsulSecurityMode.PLAIN) {
86+
// Plain HTTP mode
87+
container
88+
.withCommand(
89+
"agent",
90+
"-server",
91+
"-bootstrap-expect=1",
92+
"-client=0.0.0.0",
93+
"-bind=0.0.0.0",
94+
"-ui",
95+
"-datacenter=dc1"
96+
)
97+
.withExposedPorts(CONSUL_HTTP_PORT)
98+
.waitingFor(
99+
Wait.forHttp("/v1/status/leader")
100+
.forStatusCode(200)
101+
.withStartupTimeout(Duration.ofMinutes(2))
102+
);
103+
} else {
104+
// TLS or mTLS mode
105+
String configFile = securityMode == ConsulSecurityMode.TLS
106+
? "consul-config-tls-only.json"
107+
: "consul-config-mtls.json";
108+
109+
container
110+
.withCommand(
111+
"agent",
112+
"-dev",
113+
"-config-file=/consul/config/" + configFile
114+
)
115+
.withExposedPorts(CONSUL_HTTPS_PORT)
116+
// Mount certificate directory
117+
.withFileSystemBind(
118+
certBundle.getCertificateDirectory(),
119+
"/tls",
120+
BindMode.READ_ONLY
121+
)
122+
// Mount Consul TLS config from test resources
123+
.withClasspathResourceMapping(
124+
"tls/" + configFile,
125+
"/consul/config/" + configFile,
126+
BindMode.READ_ONLY
127+
)
128+
.waitingFor(
129+
Wait.forHttps("/v1/status/leader")
130+
.allowInsecure() // Self-signed certificate
131+
.forStatusCode(200)
132+
.withStartupTimeout(Duration.ofMinutes(2))
133+
);
134+
}
135+
136+
return container;
137+
}
138+
catch (Exception e) {
139+
throw new RuntimeException("Failed to create Consul container", e);
140+
}
141+
}
142+
143+
@Override
144+
public void onStarted(EmbeddedDruidCluster cluster)
145+
{
146+
// Store internal IP for container-to-container communication.
147+
// Tests use getConsulHostForDruid()/getConsulPortForDruid() for containers
148+
// and getMappedPort() for embedded servers running on the host.
149+
consulHostForDruid = getContainer()
150+
.getContainerInfo()
151+
.getNetworkSettings()
152+
.getIpAddress();
153+
154+
// Use appropriate port based on security mode
155+
consulPortForDruid = securityMode == ConsulSecurityMode.PLAIN
156+
? CONSUL_HTTP_PORT
157+
: CONSUL_HTTPS_PORT;
158+
159+
// Only set type and prefix - host/port must be configured by tests
160+
// because embedded servers need localhost:mappedPort while containers
161+
// need internalIP:port
162+
cluster.addCommonProperty("druid.discovery.type", "consul");
163+
cluster.addCommonProperty("druid.discovery.consul.service.servicePrefix", "druid");
164+
}
165+
166+
@Override
167+
public void stop()
168+
{
169+
super.stop();
170+
// Clean up generated certificates
171+
if (certBundle != null) {
172+
certBundle.cleanup();
173+
certBundle = null;
174+
}
175+
}
176+
177+
/**
178+
* Host value that Druid containers should use when connecting to Consul.
179+
*/
180+
public String getConsulHostForDruid()
181+
{
182+
return consulHostForDruid;
183+
}
184+
185+
/**
186+
* TCP port that Druid containers should use when connecting to Consul.
187+
* This is the internal port (8500).
188+
*/
189+
public int getConsulPortForDruid()
190+
{
191+
return consulPortForDruid;
192+
}
193+
194+
/**
195+
* TCP port mapped on host for accessing Consul HTTP/HTTPS API.
196+
* Use this for embedded servers running on the host machine.
197+
*/
198+
public int getMappedPort()
199+
{
200+
ensureRunning();
201+
int internalPort = securityMode == ConsulSecurityMode.PLAIN
202+
? CONSUL_HTTP_PORT
203+
: CONSUL_HTTPS_PORT;
204+
return getContainer().getMappedPort(internalPort);
205+
}
206+
207+
/**
208+
* Builds a host-accessible URI for the Consul HTTP/HTTPS API.
209+
*/
210+
public URI getHttpUri(String pathAndQuery)
211+
{
212+
ensureRunning();
213+
final String normalizedPath = pathAndQuery.startsWith("/") ? pathAndQuery : "/" + pathAndQuery;
214+
final String scheme = securityMode == ConsulSecurityMode.PLAIN ? "http" : "https";
215+
final int internalPort = securityMode == ConsulSecurityMode.PLAIN
216+
? CONSUL_HTTP_PORT
217+
: CONSUL_HTTPS_PORT;
218+
219+
return URI.create(
220+
StringUtils.format(
221+
"%s://%s:%d%s",
222+
scheme,
223+
getContainer().getHost(),
224+
getContainer().getMappedPort(internalPort),
225+
normalizedPath
226+
)
227+
);
228+
}
229+
230+
/**
231+
* Returns the security mode of this Consul cluster.
232+
*/
233+
public ConsulSecurityMode getSecurityMode()
234+
{
235+
return securityMode;
236+
}
237+
238+
/**
239+
* Returns the certificate bundle for TLS/mTLS modes.
240+
* Returns null for PLAIN mode.
241+
*/
242+
@Nullable
243+
public TLSCertificateBundle getCertificateBundle()
244+
{
245+
return certBundle;
246+
}
247+
248+
/**
249+
* Returns the path to the truststore for TLS/mTLS modes.
250+
* Returns null for PLAIN mode.
251+
*/
252+
@Nullable
253+
public String getTrustStorePath()
254+
{
255+
return certBundle != null ? certBundle.getTrustStorePath() : null;
256+
}
257+
258+
/**
259+
* Returns the path to the keystore for mTLS mode.
260+
* Returns null for PLAIN and TLS modes.
261+
*/
262+
@Nullable
263+
public String getKeyStorePath()
264+
{
265+
return certBundle != null ? certBundle.getKeyStorePath() : null;
266+
}
267+
268+
/**
269+
* Returns the password for keystores/truststores.
270+
* Always returns "changeit" for test certificates.
271+
*/
272+
public String getStorePassword()
273+
{
274+
return "changeit";
275+
}
276+
277+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.consul;
21+
22+
/**
23+
* Security modes for Consul connections in integration tests.
24+
*/
25+
public enum ConsulSecurityMode
26+
{
27+
/**
28+
* Plain HTTP connection with no encryption.
29+
* Uses port 8500.
30+
*/
31+
PLAIN,
32+
33+
/**
34+
* HTTPS with server-side TLS only.
35+
* Server certificate is validated using a truststore, but no client certificate is required.
36+
* Uses port 8501.
37+
*/
38+
TLS,
39+
40+
/**
41+
* HTTPS with mutual TLS (mTLS).
42+
* Both server and client certificates are validated.
43+
* Requires both truststore (for server cert) and keystore (for client cert).
44+
* Uses port 8501.
45+
*/
46+
MTLS
47+
}

0 commit comments

Comments
 (0)