Skip to content
This repository was archived by the owner on Dec 5, 2020. It is now read-only.

Commit 1ac307b

Browse files
authored
Add wo client tests (#122)
* Add wo client tests Signed-off-by: wslulciuc <[email protected]> * Fix Utils.mk_fields_from() dict access Signed-off-by: wslulciuc <[email protected]> * Add run state transitions tests for wo client Signed-off-by: wslulciuc <[email protected]> * continued: Add run state transitions tests for wo client Signed-off-by: wslulciuc <[email protected]> * Apply formatting Signed-off-by: wslulciuc <[email protected]>
1 parent 41ac7ae commit 1ac307b

File tree

9 files changed

+262
-114
lines changed

9 files changed

+262
-114
lines changed

marquez_client/backend.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212

1313

1414
class Backend:
15-
def put(self, path, headers, json):
15+
def put(self, path, headers, payload):
1616
pass
1717

18-
def post(self, path, headers, json=None):
18+
def post(self, path, headers, payload=None):
1919
pass

marquez_client/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def create_source(self, source_name, source_type, connection_url,
7878
Utils.is_valid_connection_url(connection_url)
7979

8080
payload = {
81-
'type': source_type,
81+
'type': source_type.upper(),
8282
'connectionUrl': connection_url
8383
}
8484

@@ -131,7 +131,7 @@ def create_dataset(self, namespace_name, dataset_name, dataset_type,
131131
payload['runId'] = run_id
132132

133133
if fields:
134-
payload['fields'] = fields
134+
payload['fields'] = Utils.mk_fields_from(fields)
135135

136136
if tags:
137137
payload['tags'] = tags

marquez_client/client_wo.py

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828

2929
# Marquez Write Only Client
30-
class MarquezWriteOnlyClient(object):
30+
class MarquezWriteOnlyClient:
3131
def __init__(self, backend):
3232
self._backend = backend
3333

@@ -44,7 +44,7 @@ def create_namespace(self, namespace_name, owner_name, description=None):
4444
payload['description'] = description
4545

4646
self._backend.put(
47-
self._path('/namespaces/{0}', namespace_name),
47+
path=self._path('/namespaces/{0}', namespace_name),
4848
headers=_HEADERS,
4949
payload=payload
5050
)
@@ -57,15 +57,15 @@ def create_source(self, source_name, source_type, connection_url,
5757
Utils.is_valid_connection_url(connection_url)
5858

5959
payload = {
60-
'type': source_type,
60+
'type': source_type.upper(),
6161
'connectionUrl': connection_url
6262
}
6363

6464
if description:
6565
payload['description'] = description
6666

6767
self._backend.put(
68-
self._path('/sources/{0}', source_name),
68+
path=self._path('/sources/{0}', source_name),
6969
headers=_HEADERS,
7070
payload=payload)
7171

@@ -97,7 +97,7 @@ def create_dataset(self, namespace_name, dataset_name, dataset_type,
9797
payload['runId'] = run_id
9898

9999
if fields:
100-
payload['fields'] = fields
100+
payload['fields'] = Utils.mk_fields_from(fields)
101101

102102
if tags:
103103
payload['tags'] = tags
@@ -106,8 +106,8 @@ def create_dataset(self, namespace_name, dataset_name, dataset_type,
106106
payload['schemaLocation'] = schema_location
107107

108108
self._backend.put(
109-
self._path('/namespaces/{0}/datasets/{1}', namespace_name,
110-
dataset_name),
109+
path=self._path('/namespaces/{0}/datasets/{1}', namespace_name,
110+
dataset_name),
111111
headers=_HEADERS,
112112
payload=payload
113113
)
@@ -140,7 +140,9 @@ def create_job(self, namespace_name, job_name, job_type,
140140
payload['description'] = description
141141

142142
self._backend.put(
143-
self._path('/namespaces/{0}/jobs/{1}', namespace_name, job_name),
143+
path=self._path(
144+
'/namespaces/{0}/jobs/{1}', namespace_name, job_name
145+
),
144146
headers=_HEADERS,
145147
payload=payload
146148
)
@@ -165,42 +167,38 @@ def create_job_run(self, namespace_name, job_name, run_id,
165167
if run_args:
166168
payload['args'] = run_args
167169

168-
response = self._backend.post(
169-
self._path('/namespaces/{0}/jobs/{1}/runs',
170-
namespace_name, job_name),
170+
self._backend.post(
171+
path=self._path('/namespaces/{0}/jobs/{1}/runs',
172+
namespace_name, job_name),
171173
headers=_HEADERS,
172174
payload=payload)
173175

174176
if mark_as_running:
175-
response = self.mark_job_run_as_started(
177+
self.mark_job_run_as_started(
176178
run_id, str(datetime.datetime.utcnow()))
177179

178-
response
179-
180-
def mark_job_run_as_started(self, run_id, action_at=None):
181-
self.__mark_job_run_as(run_id, 'start', action_at)
180+
def mark_job_run_as_started(self, run_id, at=None):
181+
self.__mark_job_run_as(run_id, 'start', at)
182182

183-
def mark_job_run_as_completed(self, run_id, action_at=None):
184-
self.__mark_job_run_as(run_id, 'complete', action_at)
183+
def mark_job_run_as_completed(self, run_id, at=None):
184+
self.__mark_job_run_as(run_id, 'complete', at)
185185

186-
def mark_job_run_as_failed(self, run_id, action_at=None):
187-
self.__mark_job_run_as(run_id, 'fail', action_at)
186+
def mark_job_run_as_failed(self, run_id, at=None):
187+
self.__mark_job_run_as(run_id, 'fail', at)
188188

189-
def mark_job_run_as_aborted(self, run_id, action_at=None):
190-
self.__mark_job_run_as(run_id, 'abort', action_at)
189+
def mark_job_run_as_aborted(self, run_id, at=None):
190+
self.__mark_job_run_as(run_id, 'abort', at)
191191

192-
def __mark_job_run_as(self, run_id, action, action_at=None):
192+
def __mark_job_run_as(self, run_id, transition, at=None):
193193
Utils.is_valid_uuid(run_id, 'run_id')
194-
195194
self._backend.post(
196-
self._path('/jobs/runs/{0}/{1}?at={2}', run_id, action,
197-
action_at if action_at else Utils.utc_now()),
198-
headers=_HEADERS,
199-
payload={}
195+
path=self._path('/jobs/runs/{0}/{1}?at={2}', run_id, transition,
196+
at if at else Utils.utc_now()),
197+
headers=_HEADERS
200198
)
201199

202200
# Common
203201
@staticmethod
204202
def _path(path_template, *args):
205203
encoded_args = [quote(arg.encode('utf-8'), safe='') for arg in args]
206-
return f'{_API_PATH}{path_template.format(*encoded_args)}'
204+
return f'{path_template.format(*encoded_args)}'

marquez_client/models.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
class DatasetType(Enum):
1717
DB_TABLE = "DB_TABLE"
1818
STREAM = "STREAM"
19-
HTTP_ENDPOINT = "HTTP_ENDPOINT"
2019

2120

2221
class JobType(Enum):
@@ -31,36 +30,3 @@ class RunState(Enum):
3130
COMPLETED = 'COMPLETED'
3231
FAILED = 'FAILED'
3332
ABORTED = 'ABORTED'
34-
35-
36-
class DatasetFieldType(Enum):
37-
NUMBER = 1
38-
DECIMAL = 2
39-
NUMERIC = 3
40-
INT = 4
41-
INTEGER = 5
42-
BIGINT = 6
43-
SMALLINT = 7
44-
FLOAT = 8
45-
FLOAT4 = 9
46-
FLOAT8 = 10
47-
DOUBLE = 11
48-
REAL = 12
49-
VARCHAR = 13
50-
CHAR = 14
51-
CHARACTER = 15
52-
STRING = 16
53-
TEXT = 17
54-
BINARY = 18
55-
VARBINARY = 19
56-
BOOLEAN = 20
57-
DATE = 21
58-
DATETIME = 22
59-
TIME = 23
60-
TIMESTAMP = 24
61-
TIMESTAMP_LTZ = 25
62-
TIMESTAMP_NTZ = 26
63-
TIMESTAMP_TZ = 27
64-
VARIANT = 28
65-
OBJECT = 29
66-
ARRAY = 30

marquez_client/utils.py

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,33 @@
1111
# limitations under the License.
1212

1313
import json
14-
import logging
1514
import uuid
1615
from datetime import datetime
1716

1817
import pytz
1918
import time
2019
from pyrfc3339 import generate
2120

22-
from marquez_client.models import (DatasetFieldType, DatasetType)
2321

24-
log = logging.getLogger(__name__)
25-
26-
27-
class Utils(object):
28-
def make_field(name, data_type, description=None):
29-
if isinstance(data_type, str):
30-
if not DatasetFieldType.__members__.__contains__(data_type):
31-
raise ValueError(f'Invalid field type: {data_type}')
32-
elif isinstance(data_type, DatasetFieldType):
33-
data_type = data_type.name
34-
else:
35-
raise ValueError('data_type must be a str or a DatasetFieldType')
36-
37-
DatasetType.__members__.get(data_type)
38-
field = {
39-
'name': name,
40-
'type': data_type
41-
}
42-
if description:
43-
field['description'] = description
44-
return field
22+
class Utils:
23+
@staticmethod
24+
def mk_fields_from(fields):
25+
new_fields = []
26+
for field in fields:
27+
if 'name' not in field:
28+
raise ValueError('field name must not be None')
29+
if 'type' not in field:
30+
raise ValueError('field type must not be None')
31+
new_field = {
32+
'name': field['name'],
33+
'type': field['type'].upper(),
34+
}
35+
if 'tags' in field:
36+
new_field['tags'] = field['tags']
37+
if 'description' in field:
38+
new_field['description'] = field['description']
39+
new_fields.append(new_field)
40+
return new_fields
4541

4642
@staticmethod
4743
def to_seconds(timeout_ms):

tests/test_marquez_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
12-
import json
12+
1313
import logging.config
1414
import unittest
1515
import uuid

0 commit comments

Comments
 (0)