Skip to content

Commit edb663d

Browse files
authored
Merge pull request kroxylicious#2185 from robobario/sni-clustername-replacement-support
Enable users to inject virtual cluster name into SNI bootstrap address and advertised broker address pattern
2 parents ef582ff + 0ef47ec commit edb663d

File tree

19 files changed

+366
-111
lines changed

19 files changed

+366
-111
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Format `<github issue/pr number>: <short description>`.
77

88
## SNAPSHOT
99

10-
10+
* [#2185](https://github.com/kroxylicious/kroxylicious/pull/2185) Add $(virtualClusterName) placeholders to SNI bootstrap address and advertised broker address pattern
1111
* [#2198](https://github.com/kroxylicious/kroxylicious/pull/2198) Require VirtualCluster name to be a valid DNS label
1212
* [#2188](https://github.com/kroxylicious/kroxylicious/pull/2188) Delete deprecated bootstrapAddressPattern SNI gateway property
1313
* [#2186](https://github.com/kroxylicious/kroxylicious/pull/2186) Remove deprecated FilterFactory implementations
@@ -29,6 +29,10 @@ Format `<github issue/pr number>: <short description>`.
2929
* Remove the deprecated configuration property `brokerAddressPattern` from `sniHostIdentifiesNode` gateway configuration. Use
3030
`advertisedBrokerAddressPattern` instead.
3131
* VirtualCluster names are now restricted to a maximum length of 63, and must match pattern `^[a-z0-9]([-a-z0-9]*[a-z0-9])?$` (case insensitive).
32+
* `virtualClusters[].gateways[].sniHostIdentifiesNode.bootstrapAddress` can now contain an optional replacement token `$(virtualClusterName)`.
33+
When this is present, it will be replaced with the name of that gateway's VirtualCluster.
34+
* `virtualClusters[].gateways[].sniHostIdentifiesNode.advertisedBrokerAddressPattern` can now contain an optional replacement token `$(virtualClusterName)`.
35+
When this is present, it will be replaced with the name of that gateway's VirtualCluster.
3236

3337
## 0.12.0
3438

docs/modules/configuring/con-configuring-vc-gateways.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ The gateway exposes the following broker addresses:
225225

226226
|===
227227

228+
Both the `advertisedBrokerAddressPattern` and `bootstrapAddress` configuration parameters accept the `$(virtualClusterName)` replacement token,
229+
which is optional. If included, `$(virtualClusterName)` is replaced with the name of the gateway's virtual cluster.
228230

229231
.Example SNI Host Identifies Node configuration with customized advertised port
230232
[source,yaml]

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/tester/KroxyliciousConfigUtils.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ static String bootstrapServersFor(String virtualCluster, Configuration config, S
116116
return c.getBootstrapAddress().toString();
117117
}
118118
else if (provider.config() instanceof SniRoutingClusterNetworkAddressConfigProviderConfig c) {
119-
return new HostPort(c.getBootstrapAddress().host(), c.getAdvertisedPort()).toString();
119+
String bootstrapAddressPattern = c.getBootstrapAddressPattern();
120+
String replaced = bootstrapAddressPattern.replace("$(virtualClusterName)", virtualCluster);
121+
return new HostPort(replaced, c.getAdvertisedPort()).toString();
120122
}
121123
else {
122124
throw new IllegalStateException("I don't know how to handle ClusterEndpointConfigProvider type:" + provider.type());
@@ -141,7 +143,7 @@ public static VirtualClusterGatewayBuilder defaultPortIdentifiesNodeGatewayBuild
141143
public static VirtualClusterGatewayBuilder defaultSniHostIdentifiesNodeGatewayBuilder(HostPort bootstrapAddress, String advertisedBrokerAddressPattern) {
142144
return defaultGatewayBuilder()
143145
.withNewSniHostIdentifiesNode()
144-
.withBootstrapAddress(bootstrapAddress)
146+
.withBootstrapAddress(bootstrapAddress.toString())
145147
.withAdvertisedBrokerAddressPattern(advertisedBrokerAddressPattern)
146148
.endSniHostIdentifiesNode();
147149
}
@@ -183,7 +185,7 @@ else if (providerDefinition.config() instanceof RangeAwarePortPerNodeClusterNetw
183185
strategy, null, cluster.tls()));
184186
}
185187
else if (providerDefinition.config() instanceof SniRoutingClusterNetworkAddressConfigProviderConfig sc) {
186-
var strategy = new SniHostIdentifiesNodeIdentificationStrategy(sc.getBootstrapAddress(),
188+
var strategy = new SniHostIdentifiesNodeIdentificationStrategy(new HostPort(sc.getBootstrapAddressPattern(), sc.getAdvertisedPort()).toString(),
187189
sc.getAdvertisedBrokerAddressPattern());
188190

189191
return Stream.of(new VirtualClusterGateway(DEFAULT_GATEWAY_NAME,

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ExpositionIT.java

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.kroxylicious.net.PassthroughProxy;
5151
import io.kroxylicious.proxy.config.ConfigurationBuilder;
5252
import io.kroxylicious.proxy.config.NamedRange;
53+
import io.kroxylicious.proxy.config.SniHostIdentifiesNodeIdentificationStrategy;
5354
import io.kroxylicious.proxy.config.VirtualClusterBuilder;
5455
import io.kroxylicious.proxy.config.VirtualClusterGateway;
5556
import io.kroxylicious.proxy.config.VirtualClusterGatewayBuilder;
@@ -66,6 +67,7 @@
6667

6768
import edu.umd.cs.findbugs.annotations.NonNull;
6869

70+
import static io.kroxylicious.net.IntegrationTestInetAddressResolverProvider.TEST_DOMAIN;
6971
import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.DEFAULT_GATEWAY_NAME;
7072
import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.defaultGatewayBuilder;
7173
import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.defaultPortIdentifiesNodeGatewayBuilder;
@@ -94,6 +96,7 @@ class ExpositionIT extends BaseIT {
9496

9597
public static final HostPort SNI_BOOTSTRAP = new HostPort("bootstrap." + SNI_BASE_ADDRESS, 9192);
9698
public static final String SNI_BROKER_ADDRESS_PATTERN = "broker-$(nodeId)." + SNI_BASE_ADDRESS;
99+
public static final String SNI_BROKER_ADDRESS_PATTERN_WITH_CLUSTER_NAME = "$(virtualClusterName)-broker-$(nodeId)." + SNI_BASE_ADDRESS;
97100
public static final String SASL_USER = "user";
98101
public static final String SASL_PASSWORD = "password";
99102

@@ -299,6 +302,58 @@ void exposesTwoSeparateUpstreamClustersUsingSniRouting(KafkaCluster cluster) thr
299302
}
300303
}
301304

305+
@Test
306+
void exposeSingleUpstreamClustersWithTwoVirtualClustersUsingIdenticalSNIGatewayConfigurationWithClusterNameReplacement(KafkaCluster cluster) throws Exception {
307+
var keystoreTrustStoreList = new ArrayList<KeystoreTrustStorePair>();
308+
var virtualClusterCommonNamePattern = IntegrationTestInetAddressResolverProvider.generateFullyQualifiedDomainName(".$(virtualClusterName)");
309+
var virtualClusterBootstrapPattern = "bootstrap" + virtualClusterCommonNamePattern;
310+
var virtualClusterBrokerAddressPattern = "broker-$(nodeId)" + virtualClusterCommonNamePattern;
311+
VirtualClusterGatewayBuilder sniBuilder = defaultSniHostIdentifiesNodeGatewayBuilder(virtualClusterBootstrapPattern + ":9192",
312+
virtualClusterBrokerAddressPattern);
313+
314+
var builder = new ConfigurationBuilder();
315+
316+
int numberOfVirtualClusters = 2;
317+
for (int i = 0; i < numberOfVirtualClusters; i++) {
318+
String clusterName = "cluster" + i;
319+
String withClusterNameReplaced = virtualClusterCommonNamePattern.replace("$(virtualClusterName)", clusterName);
320+
String domain = "*" + withClusterNameReplaced;
321+
var keystoreTrustStorePair = buildKeystoreTrustStorePair(domain);
322+
keystoreTrustStoreList.add(keystoreTrustStorePair);
323+
var virtualCluster = KroxyliciousConfigUtils.baseVirtualClusterBuilder(cluster, clusterName)
324+
.addToGateways(sniBuilder
325+
.withNewTls()
326+
.withNewKeyStoreKey()
327+
.withStoreFile(keystoreTrustStorePair.brokerKeyStore())
328+
.withNewInlinePasswordStoreProvider(keystoreTrustStorePair.password())
329+
.endKeyStoreKey()
330+
.endTls()
331+
.build())
332+
.withLogNetwork(true)
333+
.withLogFrames(true)
334+
.build();
335+
builder.addToVirtualClusters(virtualCluster);
336+
}
337+
try (var tester = kroxyliciousTester(builder)) {
338+
for (int i = 0; i < numberOfVirtualClusters; i++) {
339+
var trust = keystoreTrustStoreList.get(i);
340+
String clusterName = "cluster" + i;
341+
try (var admin = tester.admin(clusterName, Map.of(
342+
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name,
343+
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trust.clientTrustStore(),
344+
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trust.password()))) {
345+
// do some work to ensure virtual cluster is operational
346+
createTopic(admin, TOPIC + i, 1);
347+
Set<HostPort> nodeAdvertisedAddresses = admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).stream()
348+
.map(node -> new HostPort(node.host(), node.port()))
349+
.collect(Collectors.toSet());
350+
String expectedAdvertisedBrokerHost = "broker-0.%s%s".formatted(clusterName, TEST_DOMAIN);
351+
assertThat(nodeAdvertisedAddresses).singleElement().isEqualTo(new HostPort(expectedAdvertisedBrokerHost, 9192));
352+
}
353+
}
354+
}
355+
}
356+
302357
@Test
303358
void exposesSingleUpstreamClustersUsingMultipleSniGateways(KafkaCluster cluster) throws Exception {
304359
var keystoreTrustStoreList = new ArrayList<KeystoreTrustStorePair>();
@@ -318,7 +373,7 @@ void exposesSingleUpstreamClustersUsingMultipleSniGateways(KafkaCluster cluster)
318373
.addToGateways(new VirtualClusterGatewayBuilder()
319374
.withName("gateway-" + i)
320375
.withNewSniHostIdentifiesNode()
321-
.withBootstrapAddress(new HostPort(virtualClusterFQDN, 9192))
376+
.withBootstrapAddress(new HostPort(virtualClusterFQDN, 9192).toString())
322377
.withAdvertisedBrokerAddressPattern(virtualClusterBrokerAddressPattern.formatted(i))
323378
.endSniHostIdentifiesNode()
324379
.withNewTls()
@@ -457,6 +512,19 @@ private static Stream<Arguments> virtualClusterConfigurations() throws Exception
457512
.endKeyStoreKey()
458513
.endTls()
459514
.build()),
515+
Map.of(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name,
516+
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sniKeystoreTrustStorePair.clientTrustStore(),
517+
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sniKeystoreTrustStorePair.password())),
518+
argumentSet("SniHostIdentifiesNodeWithClusterNameReplacement",
519+
new VirtualClusterBuilder()
520+
.addToGateways(defaultSniHostIdentifiesNodeGatewayBuilder(SNI_BOOTSTRAP.toString(), SNI_BROKER_ADDRESS_PATTERN_WITH_CLUSTER_NAME)
521+
.withNewTls()
522+
.withNewKeyStoreKey()
523+
.withStoreFile(sniKeystoreTrustStorePair.brokerKeyStore())
524+
.withNewInlinePasswordStoreProvider(sniKeystoreTrustStorePair.password())
525+
.endKeyStoreKey()
526+
.endTls()
527+
.build()),
460528
Map.of(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name,
461529
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sniKeystoreTrustStorePair.clientTrustStore(),
462530
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sniKeystoreTrustStorePair.password())));
@@ -567,8 +635,12 @@ void connectToDiscoveryAddress(VirtualClusterBuilder virtualClusterBuilder,
567635
.addToVirtualClusters(virtualClusterBuilder.build());
568636

569637
final HostPort discoveryBrokerAddressToProbe;
570-
if (virtualClusterBuilder.buildFirstGateway().sniHostIdentifiesNode() != null) {
571-
discoveryBrokerAddressToProbe = new HostPort(SNI_BROKER_ADDRESS_PATTERN.replace("$(nodeId)", Integer.toString(cluster.getNumOfBrokers())),
638+
SniHostIdentifiesNodeIdentificationStrategy sniStrategy = virtualClusterBuilder.buildFirstGateway().sniHostIdentifiesNode();
639+
if (sniStrategy != null) {
640+
String pattern = sniStrategy.advertisedBrokerAddressPattern();
641+
String replacedNodeId = pattern.replace("$(nodeId)", Integer.toString(cluster.getNumOfBrokers()));
642+
String replacedVirtualClusterName = replacedNodeId.replace("$(virtualClusterName)", "demo");
643+
discoveryBrokerAddressToProbe = new HostPort(replacedVirtualClusterName,
572644
SNI_BOOTSTRAP.port());
573645
}
574646
else {

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/config/Configuration.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -172,21 +172,21 @@ private static VirtualClusterModel toVirtualClusterModel(@NonNull VirtualCluster
172172
private static void addGateways(@NonNull PluginFactoryRegistry pfr, List<VirtualClusterGateway> gateways, VirtualClusterModel virtualClusterModel) {
173173
gateways.forEach(gateway -> {
174174
var config = gateway.clusterNetworkAddressConfigProvider().config();
175-
var networkAddress = createDeprecatedProvider(config);
175+
var networkAddress = createDeprecatedProvider(config, virtualClusterModel);
176176
var tls = gateway.tls();
177177
virtualClusterModel.addGateway(gateway.name(), networkAddress, tls);
178178
});
179179
}
180180

181181
@NonNull
182182
@SuppressWarnings("removal")
183-
private static ClusterNetworkAddressConfigProvider createDeprecatedProvider(Object config) {
183+
private static ClusterNetworkAddressConfigProvider createDeprecatedProvider(Object config, VirtualClusterModel virtualClusterModel) {
184184
// We avoid using the buildNetworkAddressProviderService in order to avoid the deprecation notice it will produce.
185185
if (config instanceof SniRoutingClusterNetworkAddressConfigProviderConfig sniConfig) {
186-
return new SniRoutingClusterNetworkAddressConfigProvider().build(sniConfig);
186+
return new SniRoutingClusterNetworkAddressConfigProvider().build(sniConfig, virtualClusterModel);
187187
}
188188
else if (config instanceof RangeAwarePortPerNodeClusterNetworkAddressConfigProviderConfig rangeConfig) {
189-
return new RangeAwarePortPerNodeClusterNetworkAddressConfigProvider().build(rangeConfig);
189+
return new RangeAwarePortPerNodeClusterNetworkAddressConfigProvider().build(rangeConfig, virtualClusterModel);
190190
}
191191
else {
192192
throw new IllegalStateException("unexpected provider config type : " + config.getClass().getName());
@@ -198,15 +198,16 @@ private static void addGatewayFromDeprecatedConfig(@NonNull VirtualCluster virtu
198198
VirtualClusterModel virtualClusterModel) {
199199
// VirtualCluster config validation should mean this we always have a provider if we reach this point.
200200
Objects.requireNonNull(virtualCluster.clusterNetworkAddressConfigProvider(), "provider unexpectedly null");
201-
var networkAddress = buildNetworkAddressProviderService(virtualCluster.clusterNetworkAddressConfigProvider(), pfr);
201+
var networkAddress = buildNetworkAddressProviderService(virtualCluster.clusterNetworkAddressConfigProvider(), pfr, virtualClusterModel);
202202
virtualClusterModel.addGateway(VirtualCluster.DEFAULT_GATEWAY_NAME, networkAddress, virtualCluster.tls());
203203
}
204204

205205
private static ClusterNetworkAddressConfigProvider buildNetworkAddressProviderService(@NonNull ClusterNetworkAddressConfigProviderDefinition definition,
206-
@NonNull PluginFactoryRegistry registry) {
206+
@NonNull PluginFactoryRegistry registry,
207+
VirtualClusterModel virtualClusterModel) {
207208
var provider = registry.pluginFactory(ClusterNetworkAddressConfigProviderService.class)
208209
.pluginInstance(definition.type());
209-
return provider.build(definition.config());
210+
return provider.build(definition.config(), virtualClusterModel);
210211
}
211212

212213
public List<MicrometerDefinition> getMicrometer() {

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/config/SniHostIdentifiesNodeIdentificationStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* must be resolvable and routable from the client's network). A port number can be included. If the port number is not included, the port number assigned
2525
* to the bootstrapAddress is used. One pattern is supported: {@code $(nodeId)} which interpolates the node id into the address (required).
2626
*/
27-
public record SniHostIdentifiesNodeIdentificationStrategy(@NonNull @JsonProperty(required = true) HostPort bootstrapAddress,
27+
public record SniHostIdentifiesNodeIdentificationStrategy(@NonNull @JsonProperty(required = true) String bootstrapAddress,
2828
@NonNull @JsonProperty(required = true) String advertisedBrokerAddressPattern)
2929
implements NodeIdentificationStrategy {
3030
public SniHostIdentifiesNodeIdentificationStrategy {

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/clusternetworkaddressconfigprovider/BrokerAddressPatternUtils.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,18 @@
1212
import java.util.Set;
1313
import java.util.TreeSet;
1414
import java.util.function.Consumer;
15+
import java.util.regex.Matcher;
1516
import java.util.regex.Pattern;
1617
import java.util.stream.Collectors;
1718

1819
import edu.umd.cs.findbugs.annotations.NonNull;
1920

2021
final class BrokerAddressPatternUtils {
21-
private static final String LITERAL_NODE_ID = "$(nodeId)";
22+
public static final String LITERAL_NODE_ID = "$(nodeId)";
23+
public static final String LITERAL_VIRTUAL_CLUSTER_NAME = "$(virtualClusterName)";
2224
private static final Pattern LITERAL_NODE_ID_PATTERN = Pattern.compile(Pattern.quote(LITERAL_NODE_ID));
2325
private static final Pattern PORT_SPECIFIER_RE = Pattern.compile("^(.*):([1-9]\\d*)$");
2426
private static final Pattern TOKEN_RE = Pattern.compile("(\\$\\([^)]+\\))");
25-
public static Set<String> EXPECTED_TOKEN_SET = Set.of(LITERAL_NODE_ID);
2627

2728
private BrokerAddressPatternUtils() {
2829
}
@@ -91,6 +92,10 @@ static PatternAndPort parse(@NonNull String address) {
9192
}
9293
}
9394

95+
public static String replaceVirtualClusterName(String toReplace, String clusterName) {
96+
return toReplace.replaceAll(Pattern.quote(LITERAL_VIRTUAL_CLUSTER_NAME), Matcher.quoteReplacement(clusterName));
97+
}
98+
9499
/**
95100
* Represents a pattern string and optional port.
96101
* @param addressPattern address pattern, must not be null
@@ -110,8 +115,11 @@ static String replaceLiteralNodeId(String brokerAddressPattern, int nodeId) {
110115
return LITERAL_NODE_ID_PATTERN.matcher(brokerAddressPattern).replaceAll(Integer.toString(nodeId));
111116
}
112117

113-
static Pattern createNodeIdCapturingRegex(String brokerAddressPattern) {
114-
var partsQuoted = Arrays.stream(brokerAddressPattern.split(Pattern.quote(LITERAL_NODE_ID))).map(Pattern::quote);
115-
return Pattern.compile(partsQuoted.collect(Collectors.joining("(\\d+)")), Pattern.CASE_INSENSITIVE);
118+
static String createNodeIdCapturingRegexPattern(String brokerAddressPattern, String virtualClusterName) {
119+
String clusterReplaced = replaceVirtualClusterName(brokerAddressPattern, virtualClusterName);
120+
// the -1 split limit produces an empty final item in the array if the node id replacement is at the end of the string
121+
var partsQuoted = Arrays.stream(clusterReplaced.split(Pattern.quote(LITERAL_NODE_ID), -1)).map(Pattern::quote);
122+
return partsQuoted.collect(Collectors.joining("(\\d+)"));
116123
}
124+
117125
}

0 commit comments

Comments
 (0)