Skip to content

Commit 6eb907c

Browse files
mnaserstrigazi
authored andcommitted
Drop Kubernetes Python client dependency
We depend on the Kubernetes Python client for several things such as health checks & metrics polling. Those are both run inside periodic jobs which spawn in greenthreads. The Kubernetes API uses it's own thread pools which seem to use native pools and cause several different deadlocks when it comes to logging. Since we don't make extensive use of the Kubernetes API and we want something that doesn't use any threadpools, we can simply use a simple wrapper using Requests. This patch takes care of dropping the dependency and refactoring all the code to use this simple mechansim instead, which should reduce the overall dependency list as well as avoid any deadlock issues which are present in the upstream client. Change-Id: If0b7c96cb77bba0c79a678c9885622f1fe0f7ebc
1 parent a9b9ba2 commit 6eb907c

File tree

8 files changed

+327
-233
lines changed

8 files changed

+327
-233
lines changed

lower-constraints.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ jsonschema==2.6.0
4949
keystoneauth1==3.14.0
5050
keystonemiddleware==9.0.0
5151
kombu==5.0.1
52-
kubernetes==12.0.0
5352
linecache2==1.0.0
5453
logutils==0.3.5
5554
Mako==1.0.7
@@ -124,6 +123,7 @@ repoze.lru==0.7
124123
requests-oauthlib==0.8.0
125124
requests-toolbelt==0.8.0
126125
requests==2.20.1
126+
requests-mock==1.2.0
127127
requestsexceptions==1.4.0
128128
restructuredtext-lint==1.1.3
129129
rfc3986==1.2.0

magnum/conductor/k8s_api.py

Lines changed: 70 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -12,134 +12,91 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import tempfile
16-
17-
from kubernetes import client as k8s_config
18-
from kubernetes.client import api_client
19-
from kubernetes.client.apis import core_v1_api
20-
from kubernetes.client import configuration as k8s_configuration
21-
from kubernetes.client import rest
22-
from oslo_log import log as logging
15+
import requests
2316

2417
from magnum.conductor.handlers.common.cert_manager import create_client_files
2518

26-
LOG = logging.getLogger(__name__)
27-
2819

29-
class ApiClient(api_client.ApiClient):
30-
31-
def __init__(self, configuration=None, header_name=None,
32-
header_value=None, cookie=None):
33-
if configuration is None:
34-
configuration = k8s_configuration.Configuration()
35-
self.configuration = configuration
20+
class KubernetesAPI:
21+
"""
22+
Simple Kubernetes API client using requests.
3623
37-
self.rest_client = rest.RESTClientObject(configuration)
38-
self.default_headers = {}
39-
if header_name is not None:
40-
self.default_headers[header_name] = header_value
41-
self.cookie = cookie
24+
This API wrapper allows for a set of very simple operations to be
25+
performed on a Kubernetes cluster using the `requests` library. The
26+
reason behind it is that the native `kubernetes` library does not
27+
seem to be quite thread-safe at the moment.
4228
43-
def __del__(self):
44-
pass
45-
46-
def call_api(self, resource_path, method,
47-
path_params=None, query_params=None, header_params=None,
48-
body=None, post_params=None, files=None,
49-
response_type=None, auth_settings=None,
50-
_return_http_data_only=None, collection_formats=None,
51-
_preload_content=True, _request_timeout=None, **kwargs):
52-
"""Makes http request (synchronous) and return the deserialized data
53-
54-
:param resource_path: Path to method endpoint.
55-
:param method: Method to call.
56-
:param path_params: Path parameters in the url.
57-
:param query_params: Query parameters in the url.
58-
:param header_params: Header parameters to be
59-
placed in the request header.
60-
:param body: Request body.
61-
:param post_params dict: Request post form parameters,
62-
for `application/x-www-form-urlencoded`, `multipart/form-data`.
63-
:param auth_settings list: Auth Settings names for the request.
64-
:param response: Response data type.
65-
:param files dict: key -> filename, value -> filepath,
66-
for `multipart/form-data`.
67-
:param _return_http_data_only: response data without head status code
68-
and headers
69-
:param collection_formats: dict of collection formats for path, query,
70-
header, and post parameters.
71-
:param _preload_content: if False, the urllib3.HTTPResponse object will
72-
be returned without reading/decoding response
73-
data. Default is True.
74-
:param _request_timeout: timeout setting for this request. If one
75-
number provided, it will be total request
76-
timeout. It can also be a pair (tuple) of
77-
(connection, read) timeouts.
78-
79-
:return: The method will return the response directly
29+
Also, our interactions with the Kubernetes API are happening inside
30+
Greenthreads so we don't need to use connection pooling on top of it,
31+
in addition to pools not being something that you can disable with
32+
the native Kubernetes API.
33+
"""
8034

35+
def __init__(self, context, cluster):
36+
self.context = context
37+
self.cluster = cluster
38+
39+
# Load certificates for cluster
40+
(self.ca_file, self.key_file, self.cert_file) = create_client_files(
41+
self.cluster, self.context
42+
)
43+
44+
def _request(self, method, url, json=True):
45+
response = requests.request(
46+
method,
47+
url,
48+
verify=self.ca_file.name,
49+
cert=(self.cert_file.name, self.key_file.name)
50+
)
51+
response.raise_for_status()
52+
if json:
53+
return response.json()
54+
else:
55+
return response.text
56+
57+
def get_healthz(self):
8158
"""
82-
return self.__call_api(resource_path, method,
83-
path_params, query_params, header_params,
84-
body, post_params, files,
85-
response_type, auth_settings,
86-
_return_http_data_only, collection_formats,
87-
_preload_content, _request_timeout)
88-
89-
90-
class K8sAPI(core_v1_api.CoreV1Api):
91-
92-
def _create_temp_file_with_content(self, content):
93-
"""Creates temp file and write content to the file.
94-
95-
:param content: file content
96-
:returns: temp file
59+
Get the health of the cluster from API
9760
"""
98-
try:
99-
tmp = tempfile.NamedTemporaryFile(delete=True)
100-
tmp.write(content)
101-
tmp.flush()
102-
except Exception as err:
103-
LOG.error("Error while creating temp file: %s", err)
104-
raise
105-
return tmp
106-
107-
def __init__(self, context, cluster):
108-
self.ca_file = None
109-
self.cert_file = None
110-
self.key_file = None
61+
return self._request(
62+
'GET',
63+
f"{self.cluster.api_address}/healthz",
64+
json=False
65+
)
11166

112-
if cluster.magnum_cert_ref:
113-
(self.ca_file, self.key_file,
114-
self.cert_file) = create_client_files(cluster, context)
67+
def list_node(self):
68+
"""
69+
List all nodes in the cluster.
11570
116-
config = k8s_config.Configuration()
117-
config.host = cluster.api_address
118-
config.ssl_ca_cert = self.ca_file.name
119-
config.cert_file = self.cert_file.name
120-
config.key_file = self.key_file.name
71+
:return: List of nodes.
72+
"""
73+
return self._request(
74+
'GET',
75+
f"{self.cluster.api_address}/api/v1/nodes"
76+
)
12177

122-
# build a connection with Kubernetes master
123-
client = ApiClient(configuration=config)
78+
def list_namespaced_pod(self, namespace):
79+
"""
80+
List all pods in the given namespace.
12481
125-
super(K8sAPI, self).__init__(client)
82+
:param namespace: Namespace to list pods from.
83+
:return: List of pods.
84+
"""
85+
return self._request(
86+
'GET',
87+
f"{self.cluster.api_address}/api/v1/namespaces/{namespace}/pods"
88+
)
12689

12790
def __del__(self):
128-
if self.ca_file:
91+
"""
92+
Close all of the file descriptions for the certificates, since they
93+
are left open by `create_client_files`.
94+
95+
TODO(mnaser): Use a context manager and avoid having these here.
96+
"""
97+
if hasattr(self, 'ca_file'):
12998
self.ca_file.close()
130-
if self.cert_file:
99+
if hasattr(self, 'cert_file'):
131100
self.cert_file.close()
132-
if self.key_file:
101+
if hasattr(self, 'key_file'):
133102
self.key_file.close()
134-
135-
136-
def create_k8s_api(context, cluster):
137-
"""Create a kubernetes API client
138-
139-
Creates connection with Kubernetes master and creates ApivApi instance
140-
to call Kubernetes APIs.
141-
142-
:param context: The security context
143-
:param cluster: Cluster object
144-
"""
145-
return K8sAPI(context, cluster)

magnum/drivers/common/k8s_monitor.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def metrics_spec(self):
4242
}
4343

4444
def pull_data(self):
45-
k8s_api = k8s.create_k8s_api(self.context, self.cluster)
45+
k8s_api = k8s.KubernetesAPI(self.context, self.cluster)
4646
nodes = k8s_api.list_node()
4747
self.data['nodes'] = self._parse_node_info(nodes)
4848
pods = k8s_api.list_namespaced_pod('default')
@@ -52,7 +52,7 @@ def poll_health_status(self):
5252
if self._is_magnum_auto_healer_running():
5353
return
5454

55-
k8s_api = k8s.create_k8s_api(self.context, self.cluster)
55+
k8s_api = k8s.KubernetesAPI(self.context, self.cluster)
5656
if self._is_cluster_accessible():
5757
status, reason = self._poll_health_status(k8s_api)
5858
else:
@@ -132,20 +132,16 @@ def _parse_pod_info(self, pods):
132132
[{'Memory': 1280000.0, cpu: 0.5},
133133
{'Memory': 1280000.0, cpu: 0.5}]
134134
"""
135-
pods = pods.items
135+
pods = pods['items']
136136
parsed_containers = []
137137
for pod in pods:
138-
containers = pod.spec.containers
138+
containers = pod['spec']['containers']
139139
for container in containers:
140140
memory = 0
141141
cpu = 0
142-
resources = container.resources
143-
limits = resources.limits
142+
resources = container['resources']
143+
limits = resources['limits']
144144
if limits is not None:
145-
# Output of resources.limits is string
146-
# for example:
147-
# limits = "{cpu': '500m': 'memory': '1000Ki'}"
148-
limits = ast.literal_eval(limits)
149145
if limits.get('memory', ''):
150146
memory = utils.get_k8s_quantity(limits['memory'])
151147
if limits.get('cpu', ''):
@@ -184,13 +180,13 @@ def _parse_node_info(self, nodes):
184180
{'cpu': 1, 'Memory': 1024.0}]
185181
186182
"""
187-
nodes = nodes.items
183+
nodes = nodes['items']
188184
parsed_nodes = []
189185
for node in nodes:
190186
# Output of node.status.capacity is strong
191187
# for example:
192188
# capacity = "{'cpu': '1', 'memory': '1000Ki'}"
193-
capacity = node.status.capacity
189+
capacity = node['status']['capacity']
194190
memory = utils.get_k8s_quantity(capacity['memory'])
195191
cpu = int(capacity['cpu'])
196192
parsed_nodes.append({'Memory': memory, 'Cpu': cpu})
@@ -234,15 +230,14 @@ def _poll_health_status(self, k8s_api):
234230
api_status = None
235231

236232
try:
237-
api_status, _, _ = k8s_api.api_client.call_api(
238-
'/healthz', 'GET', response_type=object)
233+
api_status = k8s_api.get_healthz()
239234

240-
for node in k8s_api.list_node().items:
241-
node_key = node.metadata.name + ".Ready"
235+
for node in k8s_api.list_node()['items']:
236+
node_key = node['metadata']['name'] + ".Ready"
242237
ready = False
243-
for condition in node.status.conditions:
244-
if condition.type == 'Ready':
245-
ready = strutils.bool_from_string(condition.status)
238+
for condition in node['status']['conditions']:
239+
if condition['type'] == 'Ready':
240+
ready = strutils.bool_from_string(condition['status'])
246241
break
247242

248243
health_status_reason[node_key] = ready

magnum/drivers/common/k8s_scale_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ def __init__(self, context, osclient, cluster):
2020
super(K8sScaleManager, self).__init__(context, osclient, cluster)
2121

2222
def _get_hosts_with_container(self, context, cluster):
23-
k8s_api = k8s.create_k8s_api(self.context, cluster)
23+
k8s_api = k8s.KubernetesAPI(context, cluster)
2424
hosts = set()
25-
for pod in k8s_api.list_namespaced_pod(namespace='default').items:
26-
hosts.add(pod.spec.node_name)
25+
for pod in k8s_api.list_namespaced_pod(namespace='default')['items']:
26+
hosts.add(pod['spec']['node_name'])
2727

2828
return hosts

0 commit comments

Comments
 (0)