Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions kubernetes_asyncio/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ def unmarshal_event(self, data: str, response_type):
reason = "{}: {}".format(obj['reason'], obj['message'])
raise client.exceptions.ApiException(status=obj['code'], reason=reason)

# If possible, compile the JSON response into a Python native response
# type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ...
if response_type:
js['object'] = self._api_client.deserialize(
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
response_type=response_type
)

if js['type'].lower() != 'bookmark':
# If possible, compile the JSON response into a Python native response
# type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ...
if response_type:
js['object'] = self._api_client.deserialize(
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
response_type=response_type
)

# decode and save resource_version to continue watching
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
Expand All @@ -123,7 +123,14 @@ def unmarshal_event(self, data: str, response_type):
self.resource_version = js['object']['metadata']['resourceVersion']

elif js['type'].lower() == 'bookmark':
self.resource_version = js['object']['metadata']['resourceVersion']
if (isinstance(js['raw_object'], dict)
and 'metadata' in js['raw_object']
and 'resourceVersion' in js['raw_object']['metadata']):
self.resource_version = js['raw_object']['metadata']['resourceVersion']
else:
raise Exception(("Malformed JSON response for bookmark event, "
"'metadata' or 'resourceVersion' field is missing. "
"JSON: {}").format(js))

return js

Expand Down
35 changes: 33 additions & 2 deletions kubernetes_asyncio/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,37 @@ async def test_unmarshal_bookmark_succeeds_and_preserves_resource_version(self):

# make sure the resource version is preserved,
# and the watcher's resource_version is updated
self.assertTrue(isinstance(event['object'], dict))
self.assertEqual("1", event['object']['metadata']['resourceVersion'])
self.assertTrue(isinstance(event['raw_object'], dict))
self.assertEqual("1", event['raw_object']['metadata']['resourceVersion'])
self.assertEqual("1", w.resource_version)

async def test_unmarshal_job_bookmark_succeeds_and_preserves_resource_version(self):
w = Watch()
event = w.unmarshal_event('{"type": "BOOKMARK", "object": {"apiVersion":'
'"batch/v1","kind":"Job","metadata":'
'{"name": "bar", "resourceVersion": "1"},'
'"spec": {"template": {"metadata": '
'{"creationTimestamp":null}, "spec": '
'{"containers":null}}}}}',
'object')
self.assertEqual("BOOKMARK", event['type'])

# make sure the resource version is preserved,
# and the watcher's resource_version is updated
self.assertTrue(isinstance(event['raw_object'], dict))
self.assertEqual("1", event['raw_object']['metadata']['resourceVersion'])
self.assertEqual("1", w.resource_version)

async def test_unmarshall_job_bookmark_malformed_object_fails(self):
# An actual error response sent by K8s during testing.
k8s_err = {
'type': 'BOOKMARK',
'object': {
'kind': 'Job',
'apiVersion': 'batch/v1',
'metadata': {},
}
}

with self.assertRaises(Exception):
Watch().unmarshal_event(json.dumps(k8s_err), None)