Skip to content

Commit 461a3bf

Browse files
committed
Add dynamic client
1 parent 7d98f28 commit 461a3bf

File tree

5 files changed

+1140
-0
lines changed

5 files changed

+1140
-0
lines changed

dynamic/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import * # NOQA

dynamic/client.py

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
import six
2+
import json
3+
4+
from kubernetes import watch
5+
from kubernetes.client.rest import ApiException
6+
7+
from .discovery import EagerDiscoverer, LazyDiscoverer
8+
from .exceptions import api_exception, KubernetesValidateMissing
9+
from .resource import Resource, ResourceList, Subresource, ResourceInstance, ResourceField
10+
11+
try:
12+
import kubernetes_validate
13+
HAS_KUBERNETES_VALIDATE = True
14+
except ImportError:
15+
HAS_KUBERNETES_VALIDATE = False
16+
17+
try:
18+
from kubernetes_validate.utils import VersionNotSupportedError
19+
except ImportError:
20+
class VersionNotSupportedError(NotImplementedError):
21+
pass
22+
23+
__all__ = [
24+
'DynamicClient',
25+
'ResourceInstance',
26+
'Resource',
27+
'ResourceList',
28+
'Subresource',
29+
'EagerDiscoverer',
30+
'LazyDiscoverer',
31+
'ResourceField',
32+
]
33+
34+
35+
def meta_request(func):
36+
""" Handles parsing response structure and translating API Exceptions """
37+
def inner(self, *args, **kwargs):
38+
serialize_response = kwargs.pop('serialize', True)
39+
serializer = kwargs.pop('serializer', ResourceInstance)
40+
try:
41+
resp = func(self, *args, **kwargs)
42+
except ApiException as e:
43+
raise api_exception(e)
44+
if serialize_response:
45+
try:
46+
if six.PY2:
47+
return serializer(self, json.loads(resp.data))
48+
return serializer(self, json.loads(resp.data.decode('utf8')))
49+
except ValueError:
50+
if six.PY2:
51+
return resp.data
52+
return resp.data.decode('utf8')
53+
return resp
54+
55+
return inner
56+
57+
58+
class DynamicClient(object):
59+
""" A kubernetes client that dynamically discovers and interacts with
60+
the kubernetes API
61+
"""
62+
63+
def __init__(self, client, cache_file=None, discoverer=None):
64+
# Setting default here to delay evaluation of LazyDiscoverer class
65+
# until constructor is called
66+
discoverer = discoverer or LazyDiscoverer
67+
68+
self.client = client
69+
self.configuration = client.configuration
70+
self.__discoverer = discoverer(self, cache_file)
71+
72+
@property
73+
def resources(self):
74+
return self.__discoverer
75+
76+
@property
77+
def version(self):
78+
return self.__discoverer.version
79+
80+
def ensure_namespace(self, resource, namespace, body):
81+
namespace = namespace or body.get('metadata', {}).get('namespace')
82+
if not namespace:
83+
raise ValueError("Namespace is required for {}.{}".format(resource.group_version, resource.kind))
84+
return namespace
85+
86+
def serialize_body(self, body):
87+
if hasattr(body, 'to_dict'):
88+
return body.to_dict()
89+
return body or {}
90+
91+
def get(self, resource, name=None, namespace=None, **kwargs):
92+
path = resource.path(name=name, namespace=namespace)
93+
return self.request('get', path, **kwargs)
94+
95+
def create(self, resource, body=None, namespace=None, **kwargs):
96+
body = self.serialize_body(body)
97+
if resource.namespaced:
98+
namespace = self.ensure_namespace(resource, namespace, body)
99+
path = resource.path(namespace=namespace)
100+
return self.request('post', path, body=body, **kwargs)
101+
102+
def delete(self, resource, name=None, namespace=None, body=None, label_selector=None, field_selector=None, **kwargs):
103+
if not (name or label_selector or field_selector):
104+
raise ValueError("At least one of name|label_selector|field_selector is required")
105+
if resource.namespaced and not (label_selector or field_selector or namespace):
106+
raise ValueError("At least one of namespace|label_selector|field_selector is required")
107+
path = resource.path(name=name, namespace=namespace)
108+
return self.request('delete', path, body=body, label_selector=label_selector, field_selector=field_selector, **kwargs)
109+
110+
def replace(self, resource, body=None, name=None, namespace=None, **kwargs):
111+
body = self.serialize_body(body)
112+
name = name or body.get('metadata', {}).get('name')
113+
if not name:
114+
raise ValueError("name is required to replace {}.{}".format(resource.group_version, resource.kind))
115+
if resource.namespaced:
116+
namespace = self.ensure_namespace(resource, namespace, body)
117+
path = resource.path(name=name, namespace=namespace)
118+
return self.request('put', path, body=body, **kwargs)
119+
120+
def patch(self, resource, body=None, name=None, namespace=None, **kwargs):
121+
body = self.serialize_body(body)
122+
name = name or body.get('metadata', {}).get('name')
123+
if not name:
124+
raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind))
125+
if resource.namespaced:
126+
namespace = self.ensure_namespace(resource, namespace, body)
127+
128+
content_type = kwargs.pop('content_type', 'application/strategic-merge-patch+json')
129+
path = resource.path(name=name, namespace=namespace)
130+
131+
return self.request('patch', path, body=body, content_type=content_type, **kwargs)
132+
133+
def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None):
134+
"""
135+
Stream events for a resource from the Kubernetes API
136+
137+
:param resource: The API resource object that will be used to query the API
138+
:param namespace: The namespace to query
139+
:param name: The name of the resource instance to query
140+
:param label_selector: The label selector with which to filter results
141+
:param field_selector: The field selector with which to filter results
142+
:param resource_version: The version with which to filter results. Only events with
143+
a resource_version greater than this value will be returned
144+
:param timeout: The amount of time in seconds to wait before terminating the stream
145+
146+
:return: Event object with these keys:
147+
'type': The type of event such as "ADDED", "DELETED", etc.
148+
'raw_object': a dict representing the watched object.
149+
'object': A ResourceInstance wrapping raw_object.
150+
151+
Example:
152+
client = DynamicClient(k8s_client)
153+
v1_pods = client.resources.get(api_version='v1', kind='Pod')
154+
155+
for e in v1_pods.watch(resource_version=0, namespace=default, timeout=5):
156+
print(e['type'])
157+
print(e['object'].metadata)
158+
"""
159+
watcher = watch.Watch()
160+
for event in watcher.stream(
161+
resource.get,
162+
namespace=namespace,
163+
name=name,
164+
field_selector=field_selector,
165+
label_selector=label_selector,
166+
resource_version=resource_version,
167+
serialize=False,
168+
timeout_seconds=timeout
169+
):
170+
event['object'] = ResourceInstance(resource, event['object'])
171+
yield event
172+
173+
@meta_request
174+
def request(self, method, path, body=None, **params):
175+
if not path.startswith('/'):
176+
path = '/' + path
177+
178+
path_params = params.get('path_params', {})
179+
query_params = params.get('query_params', [])
180+
if params.get('pretty') is not None:
181+
query_params.append(('pretty', params['pretty']))
182+
if params.get('_continue') is not None:
183+
query_params.append(('continue', params['_continue']))
184+
if params.get('include_uninitialized') is not None:
185+
query_params.append(('includeUninitialized', params['include_uninitialized']))
186+
if params.get('field_selector') is not None:
187+
query_params.append(('fieldSelector', params['field_selector']))
188+
if params.get('label_selector') is not None:
189+
query_params.append(('labelSelector', params['label_selector']))
190+
if params.get('limit') is not None:
191+
query_params.append(('limit', params['limit']))
192+
if params.get('resource_version') is not None:
193+
query_params.append(('resourceVersion', params['resource_version']))
194+
if params.get('timeout_seconds') is not None:
195+
query_params.append(('timeoutSeconds', params['timeout_seconds']))
196+
if params.get('watch') is not None:
197+
query_params.append(('watch', params['watch']))
198+
if params.get('grace_period_seconds') is not None:
199+
query_params.append(('gracePeriodSeconds', params['grace_period_seconds']))
200+
if params.get('propagation_policy') is not None:
201+
query_params.append(('propagationPolicy', params['propagation_policy']))
202+
if params.get('orphan_dependents') is not None:
203+
query_params.append(('orphanDependents', params['orphan_dependents']))
204+
205+
header_params = params.get('header_params', {})
206+
form_params = []
207+
local_var_files = {}
208+
# HTTP header `Accept`
209+
header_params['Accept'] = self.client.select_header_accept([
210+
'application/json',
211+
'application/yaml',
212+
])
213+
214+
# HTTP header `Content-Type`
215+
if params.get('content_type'):
216+
header_params['Content-Type'] = params['content_type']
217+
else:
218+
header_params['Content-Type'] = self.client.select_header_content_type(['*/*'])
219+
220+
# Authentication setting
221+
auth_settings = ['BearerToken']
222+
223+
return self.client.call_api(
224+
path,
225+
method.upper(),
226+
path_params,
227+
query_params,
228+
header_params,
229+
body=body,
230+
post_params=form_params,
231+
async_req=params.get('async_req'),
232+
files=local_var_files,
233+
auth_settings=auth_settings,
234+
_preload_content=False,
235+
_return_http_data_only=params.get('_return_http_data_only', True)
236+
)
237+
238+
def validate(self, definition, version=None, strict=False):
239+
"""validate checks a kubernetes resource definition
240+
241+
Args:
242+
definition (dict): resource definition
243+
version (str): version of kubernetes to validate against
244+
strict (bool): whether unexpected additional properties should be considered errors
245+
246+
Returns:
247+
warnings (list), errors (list): warnings are missing validations, errors are validation failures
248+
"""
249+
if not HAS_KUBERNETES_VALIDATE:
250+
raise KubernetesValidateMissing()
251+
252+
errors = list()
253+
warnings = list()
254+
try:
255+
if version is None:
256+
try:
257+
version = self.version['kubernetes']['gitVersion']
258+
except KeyError:
259+
version = kubernetes_validate.latest_version()
260+
kubernetes_validate.validate(definition, version, strict)
261+
except kubernetes_validate.utils.ValidationError as e:
262+
errors.append("resource definition validation error at %s: %s" % ('.'.join([str(item) for item in e.path]), e.message)) # noqa: B306
263+
except VersionNotSupportedError:
264+
errors.append("Kubernetes version %s is not supported by kubernetes-validate" % version)
265+
except kubernetes_validate.utils.SchemaNotFoundError as e:
266+
warnings.append("Could not find schema for object kind %s with API version %s in Kubernetes version %s (possibly Custom Resource?)" %
267+
(e.kind, e.api_version, e.version))
268+
return warnings, errors

0 commit comments

Comments
 (0)