Skip to content
This repository was archived by the owner on Dec 5, 2020. It is now read-only.

Commit cbe5106

Browse files
authored
add log backend; update clients; add pytest.ini for log config; add t… (#108)
* add log backend; update clients; add pytest.ini for log config Signed-off-by: SreeV <[email protected]>
1 parent 3f8ac28 commit cbe5106

File tree

10 files changed

+239
-20
lines changed

10 files changed

+239
-20
lines changed

marquez_client/client_wo.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def create_namespace(self, namespace_name, owner_name, description=None):
4646
return self._backend.put(
4747
self._path('/namespaces/{0}', namespace_name),
4848
headers=_HEADERS,
49-
json=payload
49+
json_payload=payload
5050
)
5151

5252
# Source API
@@ -67,7 +67,7 @@ def create_source(self, source_name, source_type, connection_url,
6767
return self._backend.put(
6868
self._path('/sources/{0}', source_name),
6969
headers=_HEADERS,
70-
json=payload)
70+
json_payload=payload)
7171

7272
# Datasets API
7373
def create_dataset(self, namespace_name, dataset_name, dataset_type,
@@ -109,7 +109,7 @@ def create_dataset(self, namespace_name, dataset_name, dataset_type,
109109
self._path('/namespaces/{0}/datasets/{1}', namespace_name,
110110
dataset_name),
111111
headers=_HEADERS,
112-
json=payload
112+
json_payload=payload
113113
)
114114

115115
# Job API
@@ -138,7 +138,7 @@ def create_job(self, namespace_name, job_name, job_type,
138138
return self._backend.put(
139139
self._path('/namespaces/{0}/jobs/{1}', namespace_name, job_name),
140140
headers=_HEADERS,
141-
json=payload
141+
json_payload=payload
142142
)
143143

144144
def create_job_run(self, namespace_name, job_name, run_id,
@@ -165,7 +165,7 @@ def create_job_run(self, namespace_name, job_name, run_id,
165165
self._path('/namespaces/{0}/jobs/{1}/runs',
166166
namespace_name, job_name),
167167
headers=_HEADERS,
168-
json=payload)
168+
json_payload=payload)
169169

170170
if mark_as_running:
171171
response = self.mark_job_run_as_started(
@@ -192,7 +192,7 @@ def __mark_job_run_as(self, run_id, action, action_at=None):
192192
self._path('/jobs/runs/{0}/{1}?at={2}', run_id, action,
193193
action_at if action_at else Utils.utc_now()),
194194
headers=_HEADERS,
195-
json={}
195+
json_payload={}
196196
)
197197

198198
# Common

marquez_client/clients.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@
1212

1313
import logging
1414
import os
15-
import marquez_client
1615

17-
from marquez_client.file_backend import FileBackend
18-
from marquez_client.http_backend import HttpBackend
16+
import marquez_client
1917
from marquez_client.constants import (DEFAULT_MARQUEZ_BACKEND,
2018
DEFAULT_MARQUEZ_URL,
2119
DEFAULT_MARQUEZ_FILE,
2220
DEFAULT_TIMEOUT_MS)
21+
from marquez_client.file_backend import FileBackend
22+
from marquez_client.http_backend import HttpBackend
23+
from marquez_client.log_backend import LogBackend
2324
from marquez_client.utils import Utils
2425

2526
log = logging.getLogger(__name__)
@@ -52,3 +53,5 @@ def from_env():
5253
elif backend_env == 'file':
5354
file = os.environ.get('MARQUEZ_FILE', DEFAULT_MARQUEZ_FILE)
5455
return FileBackend(file)
56+
elif backend_env == 'log':
57+
return LogBackend()

marquez_client/file_backend.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,36 +29,39 @@ def _mkdir_if_not_exists(file):
2929
os.makedirs(path, exist_ok=True)
3030
return file
3131

32-
def put(self, path, headers, json):
32+
def put(self, path, headers, json_payload):
3333
log.debug("_put()")
3434

3535
put_details = {}
3636

3737
put_details['method'] = 'PUT'
3838
put_details['path'] = path
3939
put_details['headers'] = headers
40-
put_details['payload'] = json
40+
put_details['payload'] = json_payload
4141

42-
log.info(put_details)
42+
put_json = json.dumps(put_details)
43+
log.info(put_json)
4344

44-
self._sync_file(put_details)
45+
self._sync_file(put_json)
4546

46-
def post(self, path, headers, json=None):
47+
def post(self, path, headers, json_payload=None):
4748
log.debug("_post()")
4849

4950
post_details = {}
5051

5152
post_details['method'] = 'POST'
5253
post_details['path'] = path
5354
post_details['headers'] = headers
54-
post_details['payload'] = json
55+
if json_payload:
56+
post_details['payload'] = json_payload
5557

56-
log.info(post_details)
58+
post_json = json.dumps(post_details)
59+
log.info(post_json)
5760

58-
self._sync_file(post_details)
61+
self._sync_file(post_json)
5962

6063
def _sync_file(self, json_data):
61-
self._file.write(json.dumps(json_data))
64+
self._file.write(json_data)
6265
self._file.write(os.linesep)
6366
self._file.flush()
6467
os.fsync(self._file.fileno())

marquez_client/log_backend.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import json
14+
import logging
15+
16+
from marquez_client.backend import Backend
17+
18+
log = logging.getLogger(__name__)
19+
20+
21+
class LogBackend(Backend):
22+
def __init__(self):
23+
log.debug("LogBackend.init")
24+
25+
def put(self, path, headers, json_payload):
26+
log.debug("_put()")
27+
28+
put_details = {}
29+
30+
put_details['method'] = 'PUT'
31+
put_details['path'] = path
32+
put_details['headers'] = headers
33+
put_details['payload'] = json_payload
34+
35+
log_details = json.dumps(put_details)
36+
log.info(log_details)
37+
38+
def post(self, path, headers, json_payload=None):
39+
log.debug("_post()")
40+
41+
post_details = {}
42+
43+
post_details['method'] = 'POST'
44+
post_details['path'] = path
45+
post_details['headers'] = headers
46+
if json_payload:
47+
post_details['payload'] = json_payload
48+
49+
log_details = json.dumps(post_details)
50+
log.info(log_details)

marquez_client/utils.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13+
import json
1314
import logging
1415
import uuid
16+
from datetime import datetime
17+
1518
import pytz
1619
import time
17-
18-
from datetime import datetime
1920
from pyrfc3339 import generate
21+
2022
from marquez_client.models import (DatasetFieldType, DatasetType)
2123

2224
log = logging.getLogger(__name__)
@@ -92,3 +94,8 @@ def now_ms():
9294
def utc_now():
9395
return str(generate(datetime.utcnow().replace(tzinfo=pytz.utc),
9496
microseconds=True))
97+
98+
@staticmethod
99+
def get_json(file):
100+
with open(file) as json_file:
101+
return json.load(json_file)

tests/pytest.ini

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
[pytest]
14+
log_cli_level = DEBUG
15+
log_cli = True
16+
log_cli_format = %(asctime)s %(levelname)s %(message)s
17+
log_cli_date_format = %Y-%m-%d %H:%M:%S

tests/resources/post.json

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"method": "POST",
3+
"path": "/api/v1/namespaces/my-namespace/jobs/my-job/runs",
4+
"headers": {
5+
"User-Agent": "marquez-python/0.7.6"
6+
},
7+
"payload": {
8+
"id": "8c78b02b-86b8-40c7-91bf-e5dd7899ed3b",
9+
"args": {
10+
"email": "[email protected]",
11+
"emailOnFailure": "true",
12+
"emailOnRetry": "true",
13+
"retries": "1"
14+
}
15+
}
16+
}

tests/resources/put.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"method": "PUT",
3+
"path": "/api/v1/namespaces/my-namespace/jobs/my-job",
4+
"headers": {
5+
"User-Agent": "marquez-python/0.7.6"
6+
},
7+
"payload": {
8+
"inputs": [
9+
{
10+
"namespace": "my-namespace",
11+
"name": "public.mytable"
12+
}
13+
],
14+
"outputs": {
15+
"namespace": "my-namespace",
16+
"name": "public.mytable"
17+
},
18+
"type": "BATCH",
19+
"context": {
20+
"sql": "SELECT * FROM public.mytable;"
21+
},
22+
"location": "https://github.com/my-jobs/blob/07f3d2dfc8186cadae9146719e70294a4c7a8ee8"
23+
}
24+
}

tests/test_log_backend.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import json
14+
import logging
15+
import unittest
16+
17+
import mock
18+
19+
from marquez_client.log_backend import LogBackend
20+
from marquez_client.utils import Utils
21+
22+
log = logging.getLogger(__name__)
23+
24+
25+
class TestLogBackend(unittest.TestCase):
26+
def setUp(self):
27+
log.debug("TestLogBackend.setup(): ")
28+
self.log_backend = LogBackend()
29+
self.put_json = Utils.get_json("tests/resources/put.json")
30+
self.post_json = Utils.get_json("tests/resources/post.json")
31+
32+
@mock.patch("json.dumps")
33+
def test_put(self, mock_put):
34+
log.debug("TestLogBackend.test_put(): ")
35+
36+
path = self.put_json["path"]
37+
headers = self.put_json["headers"]
38+
json_payload = self.put_json["payload"]
39+
40+
self.log_backend.put(path, headers, json)
41+
mock_put.return_value = self.put_json
42+
43+
assert path == mock_put.return_value["path"]
44+
assert headers == mock_put.return_value["headers"]
45+
assert json_payload == mock_put.return_value["payload"]
46+
47+
@mock.patch("json.dumps")
48+
def test_post(self, mock_post):
49+
log.debug("TestLogBackend.test_post(): ")
50+
51+
path = self.post_json["path"]
52+
headers = self.post_json["headers"]
53+
json_payload = self.post_json["payload"]
54+
55+
self.log_backend.post(path, headers, json)
56+
mock_post.return_value = self.post_json
57+
58+
assert path == mock_post.return_value["path"]
59+
assert headers == mock_post.return_value["headers"]
60+
assert json_payload == mock_post.return_value["payload"]
61+
62+
63+
if __name__ == '__main__':
64+
unittest.main()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import logging.config
14+
import os
15+
import unittest
16+
17+
from marquez_client import Clients
18+
from marquez_client.log_backend import LogBackend
19+
20+
_NAMESPACE = "my-namespace"
21+
log = logging.getLogger(__name__)
22+
23+
24+
class TestMarquezWriteOnlyClientLog(unittest.TestCase):
25+
def test_log_backend(self):
26+
log.debug("MarquezWriteOnlyClient.setup(): ")
27+
os.environ['MARQUEZ_BACKEND'] = 'log'
28+
self.client_wo_log = Clients.new_write_only_client()
29+
log.info("created marquez_client_wo_log.")
30+
if not isinstance(self.client_wo_log._backend, LogBackend):
31+
raise Exception("Not a LogBackend.")
32+
33+
34+
if __name__ == '__main__':
35+
unittest.main()

0 commit comments

Comments
 (0)