Skip to content

Commit 77d4bb1

Browse files
committed
Merge branch 'ussamoo-feature/#1126'
2 parents fd7c6d7 + 6e5471a commit 77d4bb1

File tree

7 files changed

+105
-72
lines changed

7 files changed

+105
-72
lines changed

docs/features/kubernetes.rst

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
Kubernetes
22
==============
33

4-
This feature was requested as part of `Issue 345 <https://github.com/ThreeMammals/Ocelot/issues/345>`_ . to add support for kubernetes's service discovery provider.
4+
This feature was requested as part of `Issue 345 <https://github.com/ThreeMammals/Ocelot/issues/345>`_ . to add support for kubernetes's provider.
5+
6+
Ocelot will call the k8s endpoints API in a given namespace to get all of the endpoints for a pod and then load balance across them. Ocelot used to use the services api to send requests to the k8s service but this was changed in `PR 1134 <https://github.com/ThreeMammals/Ocelot/pull/1134>`_ because the service did not load balance as expected.
57

68
The first thing you need to do is install the NuGet package that provides kubernetes support in Ocelot.
79

@@ -23,7 +25,7 @@ If you have services deployed in kubernetes you will normally use the naming ser
2325
}
2426
2527
You can replicate a Permissive. Using RBAC role bindings.
26-
`Permissive RBAC Permissions <https://kubernetes.io/docs/reference/access-authn-authz/rbac/#permissive-rbac-permissions>`_, k8s api server and token will read from pod .
28+
`Permissive RBAC Permissions <https://kubernetes.io/docs/reference/access-authn-authz/rbac/#permissive-rbac-permissions>`_, k8s api server and token will read from pod.
2729

2830
.. code-block::bash
2931
kubectl create clusterrolebinding permissive-binding --clusterrole=cluster-admin --user=admin --user=kubelet --group=system:serviceaccounts
@@ -76,7 +78,7 @@ The polling interval is in milliseconds and tells Ocelot how often to call kuber
7678
Please note there are tradeoffs here. If you poll kubernetes it is possible Ocelot will not know if a service is down depending on your polling interval and you might get more errors than if you get the latest services per request. This really depends on how volatile your services are. I doubt it will matter for most people and polling may give a tiny performance improvement over calling kubernetes per request.
7779
There is no way for Ocelot to work these out for you.
7880

79-
If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace
81+
If your downstream service resides in a different namespace you can override the global setting at the ReRoute level by specifying a ServiceNamespace.
8082

8183

8284
.. code-block:: json
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using HTTPlease;
2+
using KubeClient;
3+
using KubeClient.Models;
4+
using KubeClient.ResourceClients;
5+
using System;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Ocelot.Provider.Kubernetes.KubeApiClientExtensions
10+
{
11+
public class EndPointClientV1 : KubeResourceClient
12+
{
13+
private readonly HttpRequest _collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
14+
15+
public EndPointClientV1(IKubeApiClient client) : base(client)
16+
{
17+
}
18+
19+
public async Task<EndpointsV1> Get(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
20+
{
21+
if (string.IsNullOrEmpty(serviceName)) throw new ArgumentNullException(nameof(serviceName));
22+
23+
var response = await Http.GetAsync(
24+
_collection.WithTemplateParameters(new
25+
{
26+
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
27+
ServiceName = serviceName
28+
}),
29+
cancellationToken
30+
);
31+
32+
if (response.IsSuccessStatusCode)
33+
return await response.ReadContentAsAsync<EndpointsV1>();
34+
35+
return null;
36+
}
37+
}
38+
}

src/Ocelot.Provider.Kubernetes/KubeProvider.cs

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,57 +6,52 @@
66
using System.Collections.Generic;
77
using System.Linq;
88
using System.Threading.Tasks;
9+
using Ocelot.Provider.Kubernetes.KubeApiClientExtensions;
910

1011
namespace Ocelot.Provider.Kubernetes
1112
{
12-
public class Kube : IServiceDiscoveryProvider
13+
public class KubernetesServiceDiscoveryProvider : IServiceDiscoveryProvider
1314
{
14-
private KubeRegistryConfiguration kubeRegistryConfiguration;
15-
private IOcelotLogger logger;
16-
private IKubeApiClient kubeApi;
15+
private readonly KubeRegistryConfiguration _kubeRegistryConfiguration;
16+
private readonly IOcelotLogger _logger;
17+
private readonly IKubeApiClient _kubeApi;
1718

18-
public Kube(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi)
19+
public KubernetesServiceDiscoveryProvider(KubeRegistryConfiguration kubeRegistryConfiguration, IOcelotLoggerFactory factory, IKubeApiClient kubeApi)
1920
{
20-
this.kubeRegistryConfiguration = kubeRegistryConfiguration;
21-
this.logger = factory.CreateLogger<Kube>();
22-
this.kubeApi = kubeApi;
21+
_kubeRegistryConfiguration = kubeRegistryConfiguration;
22+
_logger = factory.CreateLogger<KubernetesServiceDiscoveryProvider>();
23+
_kubeApi = kubeApi;
2324
}
2425

25-
2626
public async Task<List<Service>> Get()
2727
{
28-
var service = await kubeApi.ServicesV1().Get(kubeRegistryConfiguration.KeyOfServiceInK8s, kubeRegistryConfiguration.KubeNamespace);
28+
var endpoint = await _kubeApi
29+
.ResourceClient(client => new EndPointClientV1(client))
30+
.Get(_kubeRegistryConfiguration.KeyOfServiceInK8s, _kubeRegistryConfiguration.KubeNamespace);
31+
2932
var services = new List<Service>();
30-
if (IsValid(service))
33+
if (endpoint != null && endpoint.Subsets.Any())
3134
{
32-
services.Add(BuildService(service));
35+
services.AddRange(BuildServices(endpoint));
3336
}
3437
else
3538
{
36-
logger.LogWarning($"namespace:{kubeRegistryConfiguration.KubeNamespace }service:{kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0");
39+
_logger.LogWarning($"namespace:{_kubeRegistryConfiguration.KubeNamespace }service:{_kubeRegistryConfiguration.KeyOfServiceInK8s} Unable to use ,it is invalid. Address must contain host only e.g. localhost and port must be greater than 0");
3740
}
3841
return services;
3942
}
4043

41-
private bool IsValid(ServiceV1 service)
44+
private List<Service> BuildServices(EndpointsV1 endpoint)
4245
{
43-
if (string.IsNullOrEmpty(service.Spec.ClusterIP) || service.Spec.Ports.Count <= 0)
46+
var services = new List<Service>();
47+
48+
foreach (var subset in endpoint.Subsets)
4449
{
45-
return false;
50+
services.AddRange(subset.Addresses.Select(address => new Service(endpoint.Metadata.Name,
51+
new ServiceHostAndPort(address.Ip, subset.Ports.First().Port),
52+
endpoint.Metadata.Uid, string.Empty, Enumerable.Empty<string>())));
4653
}
47-
48-
return true;
49-
}
50-
51-
private Service BuildService(ServiceV1 serviceEntry)
52-
{
53-
var servicePort = serviceEntry.Spec.Ports.FirstOrDefault();
54-
return new Service(
55-
serviceEntry.Metadata.Name,
56-
new ServiceHostAndPort(serviceEntry.Spec.ClusterIP, servicePort.Port),
57-
serviceEntry.Metadata.Uid,
58-
string.Empty,
59-
Enumerable.Empty<string>());
54+
return services;
6055
}
6156
}
6257
}

src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,24 @@ public static class KubernetesProviderFactory
1212
public static ServiceDiscoveryFinderDelegate Get = (provider, config, reRoute) =>
1313
{
1414
var factory = provider.GetService<IOcelotLoggerFactory>();
15-
return GetkubeProvider(provider, config, reRoute, factory);
15+
return GetKubeProvider(provider, config, reRoute, factory);
1616
};
1717

18-
private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetkubeProvider(IServiceProvider provider, Configuration.ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory)
18+
private static ServiceDiscovery.Providers.IServiceDiscoveryProvider GetKubeProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamReRoute reRoute, IOcelotLoggerFactory factory)
1919
{
2020
var kubeClient = provider.GetService<IKubeApiClient>();
21+
2122
var k8sRegistryConfiguration = new KubeRegistryConfiguration()
2223
{
2324
KeyOfServiceInK8s = reRoute.ServiceName,
2425
KubeNamespace = string.IsNullOrEmpty(reRoute.ServiceNamespace) ? config.Namespace : reRoute.ServiceNamespace
2526
};
2627

27-
var k8sServiceDiscoveryProvider = new Kube(k8sRegistryConfiguration, factory, kubeClient);
28+
var k8sServiceDiscoveryProvider = new KubernetesServiceDiscoveryProvider(k8sRegistryConfiguration, factory, kubeClient);
29+
2830
if (config.Type?.ToLower() == "pollkube")
2931
{
30-
return new PollKube(config.PollingInterval, factory, k8sServiceDiscoveryProvider);
32+
return new PollKubernetes(config.PollingInterval, factory, k8sServiceDiscoveryProvider);
3133
}
3234
return k8sServiceDiscoveryProvider;
3335
}

src/Ocelot.Provider.Kubernetes/PollKube.cs renamed to src/Ocelot.Provider.Kubernetes/PollKubernetes.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@
77

88
namespace Ocelot.Provider.Kubernetes
99
{
10-
public class PollKube : IServiceDiscoveryProvider
10+
public class PollKubernetes : IServiceDiscoveryProvider
1111
{
1212
private readonly IOcelotLogger _logger;
1313
private readonly IServiceDiscoveryProvider _kubeServiceDiscoveryProvider;
1414
private readonly Timer _timer;
1515
private bool _polling;
1616
private List<Service> _services;
1717

18-
public PollKube(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider)
18+
public PollKubernetes(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider)
1919
{
20-
_logger = factory.CreateLogger<PollKube>();
20+
_logger = factory.CreateLogger<PollKubernetes>();
2121
_kubeServiceDiscoveryProvider = kubeServiceDiscoveryProvider;
2222
_services = new List<Service>();
2323

test/Ocelot.UnitTests/Kubernetes/KubeServiceDiscoveryProviderTests.cs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ namespace Ocelot.UnitTests.Kubernetes
2121
public class KubeServiceDiscoveryProviderTests : IDisposable
2222
{
2323
private IWebHost _fakeKubeBuilder;
24-
private ServiceV1 _serviceEntries;
25-
private readonly Kube _provider;
24+
private readonly KubernetesServiceDiscoveryProvider _provider;
25+
private EndpointsV1 _endpointEntries;
2626
private readonly string _serviceName;
2727
private readonly string _namespaces;
2828
private readonly int _port;
@@ -41,59 +41,55 @@ public KubeServiceDiscoveryProviderTests()
4141
_port = 86;
4242
_kubeHost = "localhost";
4343
_fakekubeServiceDiscoveryUrl = $"http://{_kubeHost}:{_port}";
44-
_serviceEntries = new ServiceV1();
44+
_endpointEntries = new EndpointsV1();
4545
_factory = new Mock<IOcelotLoggerFactory>();
4646

4747
var option = new KubeClientOptions
4848
{
4949
ApiEndPoint = new Uri(_fakekubeServiceDiscoveryUrl),
5050
AccessToken = "txpc696iUhbVoudg164r93CxDTrKRVWG",
5151
AuthStrategy = KubeClient.KubeAuthStrategy.BearerToken,
52-
AllowInsecure = true
52+
AllowInsecure = true,
5353
};
5454

5555
_clientFactory = KubeApiClient.Create(option);
5656
_logger = new Mock<IOcelotLogger>();
57-
_factory.Setup(x => x.CreateLogger<Kube>()).Returns(_logger.Object);
57+
_factory.Setup(x => x.CreateLogger<KubernetesServiceDiscoveryProvider>()).Returns(_logger.Object);
5858
var config = new KubeRegistryConfiguration()
5959
{
6060
KeyOfServiceInK8s = _serviceName,
61-
KubeNamespace = _namespaces
61+
KubeNamespace = _namespaces,
6262
};
63-
_provider = new Kube(config, _factory.Object, _clientFactory);
63+
_provider = new KubernetesServiceDiscoveryProvider(config, _factory.Object, _clientFactory);
6464
}
6565

6666
[Fact]
6767
public void should_return_service_from_k8s()
6868
{
6969
var token = "Bearer txpc696iUhbVoudg164r93CxDTrKRVWG";
70-
var serviceEntryOne = new ServiceV1()
70+
var endPointEntryOne = new EndpointsV1
7171
{
72-
Kind = "service",
72+
Kind = "endpoint",
7373
ApiVersion = "1.0",
7474
Metadata = new ObjectMetaV1()
7575
{
76-
Namespace = "dev"
76+
Namespace = "dev",
7777
},
78-
Spec = new ServiceSpecV1()
79-
{
80-
ClusterIP = "localhost"
81-
},
82-
Status = new ServiceStatusV1()
83-
{
84-
LoadBalancer = new LoadBalancerStatusV1()
85-
}
8678
};
87-
88-
serviceEntryOne.Spec.Ports.Add(
89-
new ServicePortV1()
90-
{
91-
Port = 80
92-
}
93-
);
79+
var endpointSubsetV1 = new EndpointSubsetV1();
80+
endpointSubsetV1.Addresses.Add(new EndpointAddressV1()
81+
{
82+
Ip = "127.0.0.1",
83+
Hostname = "localhost",
84+
});
85+
endpointSubsetV1.Ports.Add(new EndpointPortV1()
86+
{
87+
Port = 80,
88+
});
89+
endPointEntryOne.Subsets.Add(endpointSubsetV1);
9490

9591
this.Given(x => GivenThereIsAFakeKubeServiceDiscoveryProvider(_fakekubeServiceDiscoveryUrl, _serviceName, _namespaces))
96-
.And(x => GivenTheServicesAreRegisteredWithKube(serviceEntryOne))
92+
.And(x => GivenTheServicesAreRegisteredWithKube(endPointEntryOne))
9793
.When(x => WhenIGetTheServices())
9894
.Then(x => ThenTheCountIs(1))
9995
.And(_ => _receivedToken.ShouldBe(token))
@@ -110,9 +106,9 @@ private void WhenIGetTheServices()
110106
_services = _provider.Get().GetAwaiter().GetResult();
111107
}
112108

113-
private void GivenTheServicesAreRegisteredWithKube(ServiceV1 serviceEntries)
109+
private void GivenTheServicesAreRegisteredWithKube(EndpointsV1 endpointEntries)
114110
{
115-
_serviceEntries = serviceEntries;
111+
_endpointEntries = endpointEntries;
116112
}
117113

118114
private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string serviceName, string namespaces)
@@ -127,14 +123,14 @@ private void GivenThereIsAFakeKubeServiceDiscoveryProvider(string url, string se
127123
{
128124
app.Run(async context =>
129125
{
130-
if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/services/{serviceName}")
126+
if (context.Request.Path.Value == $"/api/v1/namespaces/{namespaces}/endpoints/{serviceName}")
131127
{
132128
if (context.Request.Headers.TryGetValue("Authorization", out var values))
133129
{
134130
_receivedToken = values.First();
135131
}
136132

137-
var json = JsonConvert.SerializeObject(_serviceEntries);
133+
var json = JsonConvert.SerializeObject(_endpointEntries);
138134
context.Response.Headers.Add("Content-Type", "application/json");
139135
await context.Response.WriteAsync(json);
140136
}

test/Ocelot.UnitTests/Kubernetes/PollingKubeServiceDiscoveryProviderTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace Ocelot.UnitTests.Kubernetes
1515
public class PollingKubeServiceDiscoveryProviderTests
1616
{
1717
private readonly int _delay;
18-
private PollKube _provider;
18+
private PollKubernetes _provider;
1919
private readonly List<Service> _services;
2020
private readonly Mock<IOcelotLoggerFactory> _factory;
2121
private readonly Mock<IOcelotLogger> _logger;
@@ -28,7 +28,7 @@ public PollingKubeServiceDiscoveryProviderTests()
2828
_delay = 1;
2929
_factory = new Mock<IOcelotLoggerFactory>();
3030
_logger = new Mock<IOcelotLogger>();
31-
_factory.Setup(x => x.CreateLogger<PollKube>()).Returns(_logger.Object);
31+
_factory.Setup(x => x.CreateLogger<PollKubernetes>()).Returns(_logger.Object);
3232
_kubeServiceDiscoveryProvider = new Mock<IServiceDiscoveryProvider>();
3333
}
3434

@@ -56,7 +56,7 @@ private void ThenTheCountIs(int count)
5656

5757
private void WhenIGetTheServices(int expected)
5858
{
59-
_provider = new PollKube(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object);
59+
_provider = new PollKubernetes(_delay, _factory.Object, _kubeServiceDiscoveryProvider.Object);
6060

6161
var result = Wait.WaitFor(3000).Until(() =>
6262
{

0 commit comments

Comments
 (0)