Skip to content

Commit 9c5dfff

Browse files
authored
Add support for SSL authentication (certificate) (#1191)
* Add SchemaRegistry with SSL and certificate * Fix conditional to add certificate and option for application manually provides the client certificates to the *Handler * Renamed method from certificate to keystore to keep ubiquitous language * Add a option to disable ssl verify for development purposes allowing to use self signed CA authorities * Test for SSL Authentication * keep naming convention consistent with librdkafka * Fix bad merge * Documentation improved * Reverted unnecessary whitespace
1 parent b4e0681 commit 9c5dfff

File tree

12 files changed

+290
-7
lines changed

12 files changed

+290
-7
lines changed

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using System.Linq;
2424
using System;
2525
using System.Threading;
26+
using System.Security.Cryptography.X509Certificates;
2627
using Confluent.Kafka;
2728

2829

@@ -74,6 +75,11 @@ public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable
7475
/// </summary>
7576
public const int DefaultMaxCachedSchemas = 1000;
7677

78+
/// <summary>
79+
/// The default SSL server certificate verification for Schema Registry REST API calls.
80+
/// </summary>
81+
public const bool DefaultEnableSslCertificateVerification = true;
82+
7783
/// <summary>
7884
/// The default key subject name strategy.
7985
/// </summary>
@@ -201,13 +207,22 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
201207
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource &&
202208
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo &&
203209
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy &&
204-
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy)
210+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy &&
211+
property.Key != SchemaRegistryConfig.PropertyNames.SslCaLocation &&
212+
                    property.Key != SchemaRegistryConfig.PropertyNames.SslKeystoreLocation &&
213+
                    property.Key != SchemaRegistryConfig.PropertyNames.SslKeystorePassword &&
214+
property.Key != SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification)
205215
{
206216
throw new ArgumentException($"Unknown configuration parameter {property.Key}");
207217
}
208218
}
209219

210-
this.restService = new RestService(schemaRegistryUris, timeoutMs, username, password);
220+
var sslVerificationMaybe = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification);
221+
bool sslVerify;
222+
try { sslVerify = sslVerificationMaybe.Value == null ? DefaultEnableSslCertificateVerification : bool.Parse(sslVerificationMaybe.Value); }
223+
catch (FormatException) { throw new ArgumentException($"Configured value for {SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification} must be a bool."); }
224+
225+
this.restService = new RestService(schemaRegistryUris, timeoutMs, username, password, SetSslConfig(config), sslVerify);
211226
}
212227

213228

@@ -233,6 +248,31 @@ private bool CleanCacheIfFull()
233248
return false;
234249
}
235250

251+
/// <summary>
252+
/// Add certificates for SSL handshake.
253+
/// </summary>
254+
/// <param name="config">
255+
/// Configuration properties.
256+
/// </param>
257+
        private List<X509Certificate2> SetSslConfig(IEnumerable<KeyValuePair<string, string>> config)
258+
        {
259+
            var certificates = new List<X509Certificate2>();
260+
261+
            var certificateLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslKeystoreLocation).Value ?? "";
262+
            var certificatePassword = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslKeystorePassword).Value ?? "";
263+
            if (!String.IsNullOrEmpty(certificateLocation))
264+
            {
265+
                certificates.Add(new X509Certificate2(certificateLocation, certificatePassword));
266+
            }
267+
268+
            var caLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value ?? "";
269+
            if (!String.IsNullOrEmpty(caLocation))
270+
            {
271+
                certificates.Add(new X509Certificate2(caLocation));
272+
            }
273+
274+
            return certificates;
275+
        }
236276

237277
/// <inheritdoc/>
238278
public Task<int> GetSchemaIdAsync(string subject, string avroSchema)

src/Confluent.SchemaRegistry/Rest/RestService.cs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
using System.Net.Http;
2424
using System.Text;
2525
using System.Threading.Tasks;
26-
26+
using System.Security.Cryptography.X509Certificates;
2727

2828
namespace Confluent.SchemaRegistry
2929
{
3030
/// <remarks>
3131
/// It may be useful to expose this publicly, but this is not
32-
/// required by the Avro serializers, so we will keep this internal
32+
/// required by the Avro serializers, so we will keep this internal
3333
/// for now to minimize documentation / risk of API change etc.
3434
/// </remarks>
3535
internal class RestService : IRestService
@@ -53,7 +53,7 @@ internal class RestService : IRestService
5353
/// <summary>
5454
/// Initializes a new instance of the RestService class.
5555
/// </summary>
56-
public RestService(string schemaRegistryUrl, int timeoutMs, string username, string password)
56+
public RestService(string schemaRegistryUrl, int timeoutMs, string username, string password, List<X509Certificate2> certificates, bool enableSslCertificateVerification)
5757
{
5858
var authorizationHeader = username != null && password != null
5959
? new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")))
@@ -64,7 +64,16 @@ public RestService(string schemaRegistryUrl, int timeoutMs, string username, str
6464
.Select(SanitizeUri)// need http or https - use http if not present.
6565
.Select(uri =>
6666
{
67-
var client = new HttpClient { BaseAddress = new Uri(uri, UriKind.Absolute), Timeout = TimeSpan.FromMilliseconds(timeoutMs) };
67+
HttpClient client;
68+
                    if (certificates.Count > 0)
69+
                    {
70+
                        client = new HttpClient(CreateHandler(certificates, enableSslCertificateVerification)) { BaseAddress = new Uri(uri, UriKind.Absolute), Timeout = TimeSpan.FromMilliseconds(timeoutMs) };
71+
                    }
72+
                    else
73+
                    {
74+
                        client = new HttpClient() { BaseAddress = new Uri(uri, UriKind.Absolute), Timeout = TimeSpan.FromMilliseconds(timeoutMs) };
75+
                    }
76+
6877
if (authorizationHeader != null) { client.DefaultRequestHeaders.Authorization = authorizationHeader; }
6978
return client;
7079
})
@@ -77,6 +86,20 @@ private static string SanitizeUri(string uri)
7786
return $"{sanitized.TrimEnd('/')}/";
7887
}
7988

89+
private static HttpClientHandler CreateHandler(List<X509Certificate2> certificates, bool enableSslCertificateVerification)
90+
{
91+
    var handler = new HttpClientHandler();
92+
handler.ClientCertificateOptions = ClientCertificateOption.Manual;
93+
94+
if (!enableSslCertificateVerification)
95+
{
96+
handler.ServerCertificateCustomValidationCallback = (httpRequestMessage, cert, certChain, policyErrors) => { return true; };
97+
}
98+
99+
    certificates.ForEach(=> handler.ClientCertificates.Add(c));
100+
    return handler;
101+
}
102+
80103
private RegisteredSchema SanitizeRegisteredSchema(RegisteredSchema schema)
81104
{
82105
if (schema.References == null)

src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,27 @@ public static class PropertyNames
8080
/// </summary>
8181
[Obsolete("Subject name strategies should now be configured using the serializer's configuration. In the future, this configuration property will be removed from SchemaRegistryConfig")]
8282
public const string SchemaRegistryValueSubjectNameStrategy = "schema.registry.value.subject.name.strategy";
83+
84+
            /// <summary>
85+
            ///     File path to CA certificate(s) for verifying the schema registry's key. it will use system CA certs if not provided
86+
            /// </summary>
87+
            public const string SslCaLocation = "schema.registry.ssl.ca.location";
88+
89+
            /// <summary>
90+
            ///     SSL keystore (PKCS#12) location.
91+
            /// </summary>
92+
            public const string SslKeystoreLocation = "schema.registry.ssl.keystore.location";
93+
94+
            /// <summary>
95+
            ///     SSL keystore (PKCS#12) password.
96+
            /// </summary>
97+
            public const string SslKeystorePassword = "schema.registry.ssl.keystore.password";
98+
99+
            /// <summary>
100+
            ///     In scenarios of using a private and untrusted CA or in case of impossibility to add a private CA as trusted in system CA certs,
101+
            ///     it is possible to disable SSL verification but it only could be done in test/dev environments.
102+
            /// </summary>
103+
            public const string EnableSslCertificateVerification = "schema.registry.enable.ssl.certificate.verification";
83104
}
84105

85106
/// <summary>
@@ -127,6 +148,53 @@ public int? RequestTimeoutMs
127148
set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs, value.ToString()); }
128149
}
129150

151+
/// <summary>
152+
        ///     File or directory path to CA certificate(s) for verifying the schema registry's key.
153+
        ///
154+
        ///     default: ''
155+
        ///     importance: low
156+
        /// </summary>
157+
        public string SslCaLocation
158+
        {
159+
            get { return Get(SchemaRegistryConfig.PropertyNames.SslCaLocation); }
160+
            set { SetObject(SchemaRegistryConfig.PropertyNames.SslCaLocation, value.ToString()); }
161+
        }
162+
163+
        /// <summary>
164+
        ///     Path to client's keystore (PKCS#12) used for authentication.
165+
        ///
166+
        ///     default: ''
167+
        ///     importance: low
168+
        /// </summary>
169+
        public string SslKeystoreLocation
170+
        {
171+
            get { return Get(SchemaRegistryConfig.PropertyNames.SslKeystoreLocation); }
172+
            set { SetObject(SchemaRegistryConfig.PropertyNames.SslKeystoreLocation, value.ToString()); }
173+
        }
174+
175+
        /// <summary>
176+
        ///     Client's keystore (PKCS#12) password.
177+
        ///
178+
        ///     default: ''
179+
        ///     importance: low
180+
        /// </summary>
181+
        public string SslKeystorePassword
182+
        {
183+
            get { return Get(SchemaRegistryConfig.PropertyNames.SslKeystorePassword); }
184+
            set { SetObject(SchemaRegistryConfig.PropertyNames.SslKeystorePassword, value.ToString()); }
185+
        }
186+
187+
        /// <summary>
188+
        ///     Enable/Disable SSL server certificate verification. Only use in contained test/dev environments.
189+
        ///
190+
        ///     default: ''
191+
        ///     importance: low
192+
        /// </summary>
193+
        public bool? EnableSslCertificateVerification
194+
        {
195+
            get { return GetBool(SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification); }
196+
            set { SetObject(SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification, value); }
197+
        }
130198

131199
/// <summary>
132200
/// Specifies the maximum number of schemas CachedSchemaRegistryClient

test/Confluent.SchemaRegistry.IntegrationTests/Confluent.SchemaRegistry.IntegrationTests.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
<None Update="schema.registry.parameters.json">
1313
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
1414
</None>
15+
<None Update="secrets\**">
16+
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
17+
</None>
1518
</ItemGroup>
1619

1720
<ItemGroup>

test/Confluent.SchemaRegistry.IntegrationTests/Tests/Config.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@ public class Config
44
{
55
public string Server { get; set; }
66
public string ServerWithAuth { get; set; }
7+
public string ServerWithSsl { get; set; }
78
public string Username { get; set; }
89
public string Password { get; set; }
10+
public string KeystoreLocation { get; set; }
11+
public string KeystorePassword { get; set; }
12+
public string CaLocation { get; set; }
13+
public string EnableSslCertificateVerification { get; set; }
914
}
1015
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 20 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Net.Http;
20+
using Xunit;
21+
using Confluent.Kafka;
22+
23+
24+
namespace Confluent.SchemaRegistry.IntegrationTests
25+
{
26+
public static partial class Tests
27+
{
28+
[Theory, MemberData(nameof(SchemaRegistryParameters))]
29+
public static void SslAuth(Config config)
30+
{
31+
var testSchema1 =
32+
"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"Confluent.Kafka.Examples.AvroSpecific" +
33+
"\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"i" +
34+
"nt\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}";
35+
36+
// 1. valid configuration cases
37+
38+
// 1.1. using SSL valid certificate.
39+
var conf = new SchemaRegistryConfig
40+
{
41+
Url = config.ServerWithSsl,
42+
SslKeystoreLocation = config.KeystoreLocation,
43+
SslKeystorePassword = config.KeystorePassword,
44+
SslCaLocation = config.CaLocation,
45+
EnableSslCertificateVerification = bool.Parse(config.EnableSslCertificateVerification),
46+
};
47+
48+
// some sanity checking of strongly typed config property name mappings.
49+
Assert.Equal(config.ServerWithSsl, conf.Get("schema.registry.url"));
50+
51+
using (var sr = new CachedSchemaRegistryClient(conf))
52+
{
53+
var topicName = Guid.NewGuid().ToString();
54+
var subject = SubjectNameStrategy.Topic.ToDelegate()(new SerializationContext(MessageComponentType.Value, topicName), null);
55+
var id = sr.RegisterSchemaAsync(subject, testSchema1).Result;
56+
var schema = sr.GetLatestSchemaAsync(subject).Result;
57+
Assert.Equal(schema.Id, id);
58+
}
59+
60+
// try to connect with invalid SSL config. shouldn't work.
61+
Assert.Throws<HttpRequestException>(() =>
62+
{
63+
var sr = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = config.ServerWithSsl });
64+
var topicName = Guid.NewGuid().ToString();
65+
var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topicName, null);
66+
try
67+
{
68+
var id = sr.RegisterSchemaAsync(subject, testSchema1).Result;
69+
}
70+
catch (Exception e)
71+
{
72+
throw e.InnerException;
73+
}
74+
});
75+
76+
}
77+
}
78+
}

test/Confluent.SchemaRegistry.IntegrationTests/Tests/Tests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,13 @@ public static IEnumerable<object[]> SchemaRegistryParameters()
4747
var config = new Config();
4848
config.Server = json["server"].ToString();
4949
config.ServerWithAuth = json["server_with_auth"].ToString();
50+
config.ServerWithSsl = json["server_with_ssl"].ToString();
5051
config.Username = json["username"].ToString();
5152
config.Password = json["password"].ToString();
53+
config.KeystoreLocation = json["keystore_location"].ToString();
54+
config.KeystorePassword = json["keystore_password"].ToString();
55+
config.CaLocation = json["ca_location"].ToString();
56+
config.EnableSslCertificateVerification = json["enable_ssl_certificate_verification"].ToString();
5257
schemaRegistryParameters = new List<object[]> { new object[] { config } };
5358
}
5459
return schemaRegistryParameters;

test/Confluent.SchemaRegistry.IntegrationTests/docker-compose.yaml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ services:
88
image: confluentinc/cp-zookeeper
99
environment:
1010
ZOOKEEPER_CLIENT_PORT: 2181
11+
zookeeper_3:
12+
image: confluentinc/cp-zookeeper
13+
environment:
14+
ZOOKEEPER_CLIENT_PORT: 2181
1115
kafka:
1216
image: confluentinc/cp-kafka
1317
depends_on:
@@ -28,6 +32,16 @@ services:
2832
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
2933
KAFKA_ZOOKEEPER_CONNECT: zookeeper_2:2181
3034
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
35+
kafka_3:
36+
image: confluentinc/cp-kafka
37+
depends_on:
38+
- zookeeper_3
39+
environment:
40+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT, PLAINTEXT_HOST:PLAINTEXT
41+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka_3:9092, PLAINTEXT_HOST://localhost:29092
42+
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
43+
KAFKA_ZOOKEEPER_CONNECT: zookeeper_3:2181
44+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3145
schema-registry:
3246
image: confluentinc/cp-schema-registry
3347
depends_on:
@@ -58,3 +72,24 @@ services:
5872
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka_2:9092
5973
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
6074
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper_2:2181
75+
schema-registry_3:
76+
image: confluentinc/cp-schema-registry
77+
depends_on:
78+
- zookeeper_3
79+
- kafka_3
80+
ports:
81+
- 8083:8083
82+
environment:
83+
SCHEMA_REGISTRY_HOST_NAME: schema-registry
84+
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka_3:9092
85+
SCHEMA_REGISTRY_LISTENERS: https://0.0.0.0:8083
86+
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper_3:2181
87+
SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/schema-registry.keystore.jks
88+
SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: cnf123
89+
SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/schema-registry.truststore.jks
90+
SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: cnf123
91+
SCHEMA_REGISTRY_SSL_KEY_PASSWORD: cnf123
92+
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: https
93+
SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'true'
94+
volumes:
95+
- ./secrets:/etc/schema-registry/secrets

0 commit comments

Comments
 (0)