Skip to content
This repository was archived by the owner on Jul 28, 2020. It is now read-only.

Commit b97ff18

Browse files
authored
Merge pull request #49 from zenaton/feature/graphql_gateway
Use Zenaton Gateway instead of the local agent.
2 parents c0078d3 + 242b7b3 commit b97ff18

File tree

4 files changed

+216
-127
lines changed

4 files changed

+216
-127
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@
22

33
## [Unreleased]
44

5+
### Added
6+
57
- Added `custom_id` argument for workflow schedule.
8+
- Dispatch of tasks and workflows are now done using the API instead of a local agent.
9+
- Pause, Resume and Kill workflows are now done using the API instead of a local agent.
10+
- Send event to workflow is now done using the API instead of a local agent.
11+
- Find workflow is now done using the API instead of a local agent.
612

713
## [0.4.1] - 2019-09-25
814

tests/test_client.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,8 @@
1111
@pytest.mark.usefixtures("client")
1212
def test_url_functions(client):
1313
assert validate_url(client.worker_url())
14-
assert validate_url(client.send_event_url())
14+
assert validate_url(client.gateway_url())
1515
assert validate_url(client.instance_worker_url())
16-
if os.getenv('ZENATON_API_TOKEN'):
17-
assert validate_url(client.instance_website_url())
18-
else:
19-
with pytest.raises(ValueError, match=r'.*API token.*'):
20-
client.instance_website_url()
2116

2217

2318
@pytest.mark.usefixtures("client", "sequential_workflow")

zenaton/client.py

Lines changed: 137 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import uuid
66

77
from .abstracts.workflow import Workflow
8-
from .exceptions import InvalidArgumentError
8+
from .exceptions import InvalidArgumentError, ExternalError
99
from .services.http_service import HttpService
1010
from .services.graphql_service import GraphQLService
1111
from .services.properties import Properties
@@ -27,30 +27,12 @@ class Client(metaclass=Singleton):
2727
APP_ID = 'app_id' # Parameter name for the application ID
2828
API_TOKEN = 'api_token' # Parameter name for the API token
2929

30-
ATTR_INTENT_ID = 'intent_id' # Parameter name for intent_id
31-
ATTR_ID = 'custom_id' # Parameter name for custom ids
32-
ATTR_NAME = 'name' # Parameter name for workflow names
33-
ATTR_CANONICAL = 'canonical_name' # Parameter name for version name
34-
ATTR_DATA = 'data' # Parameter name for json payload
35-
ATTR_PROG = 'programming_language' # Parameter name for the language
36-
ATTR_MODE = 'mode' # Parameter name for the worker update mode
37-
ATTR_MAX_PROCESSING_TIME = 'max_processing_time' # Pararameter name for the max processing time
38-
39-
PROG = 'Python' # The current programming language
40-
41-
EVENT_INPUT = 'event_input' # Parameter name for event input
42-
EVENT_NAME = 'event_name' # Parameter name for event name
43-
EVENT_DATA = 'event_data' # Parameter name for event data
44-
45-
WORKFLOW_KILL = 'kill' # Worker update mode to stop a worker
46-
WORKFLOW_PAUSE = 'pause' # Worker udpate mode to pause a worker
47-
WORKFLOW_RUN = 'run' # Worker update mode to resume a worker
30+
PROG = 'PYTHON' # The current programming language
4831

4932
def __init__(self, app_id='', api_token='', app_env=''):
5033
self.app_id = app_id
5134
self.api_token = api_token
5235
self.app_env = app_env
53-
self.http = HttpService()
5436
self.graphql = GraphQLService()
5537
self.serializer = Serializer()
5638
self.properties = Properties()
@@ -93,85 +75,68 @@ def website_url(self, resource='', params=''):
9375
url = '{}/{}?{}={}&'.format(api_url, resource, self.API_TOKEN, self.api_token)
9476
return self.add_app_env(url, params)
9577

96-
def send_event_url(self):
97-
return self.worker_url('events')
98-
9978
"""
10079
Start the specified workflow
10180
:params .abstracts.workflow.Workflow flow
10281
"""
10382
def start_workflow(self, flow):
104-
with self._connect_to_agent():
105-
return self.http.post(
106-
self.instance_worker_url(),
107-
data=json.dumps({
108-
self.ATTR_INTENT_ID: self.uuid(),
109-
self.ATTR_PROG: self.PROG,
110-
self.ATTR_CANONICAL: self.canonical_name(flow),
111-
self.ATTR_NAME: self.class_name(flow),
112-
self.ATTR_DATA: self.serializer.encode(self.properties.from_(flow)),
113-
self.ATTR_ID: self.parse_custom_id_from(flow)
114-
}))
83+
query = self.graphql.DISPATCH_WORKFLOW
84+
variables = {
85+
'input': {
86+
'intent_id': self.uuid(),
87+
'environment_name': self.app_env,
88+
'programming_language': self.PROG,
89+
'custom_id': self.parse_custom_id_from(flow),
90+
'name': self.class_name(flow),
91+
'canonical_name': self.canonical_name(flow),
92+
'data': self.serializer.encode(self.properties.from_(flow))
93+
}
94+
}
95+
return self.gateway_request(query, variables=variables, data_response_key="dispatchWorkflow")
11596

11697
def start_task(self, task):
117-
with self._connect_to_agent():
118-
return self.http.post(
119-
self.worker_url('tasks'),
120-
data=json.dumps({
121-
self.ATTR_INTENT_ID: self.uuid(),
122-
self.ATTR_PROG: self.PROG,
123-
self.ATTR_NAME: self.class_name(task),
124-
self.ATTR_DATA: self.serializer.encode(self.properties.from_(task)),
125-
self.ATTR_MAX_PROCESSING_TIME: task.max_processing_time() if hasattr(task, 'max_processing_time') else None
126-
}))
98+
query = self.graphql.DISPATCH_TASK
99+
variables = {
100+
'input': {
101+
'intent_id': self.uuid(),
102+
'environment_name': self.app_env,
103+
'programming_language': self.PROG,
104+
'max_processing_time': task.max_processing_time() if hasattr(task, 'max_processing_time') else None,
105+
'name': self.class_name(task),
106+
'data': self.serializer.encode(self.properties.from_(task))
107+
}
108+
}
109+
return self.gateway_request(query, variables=variables, data_response_key="dispatchTask")
127110

128111
def start_scheduled_workflow(self, flow, cron):
129-
url = self.gateway_url()
130-
headers = self.gateway_headers()
131112
query = self.graphql.CREATE_WORKFLOW_SCHEDULE
132113
variables = {
133-
'createWorkflowScheduleInput': {
134-
'intentId': self.uuid(),
135-
'environmentName': self.app_env,
114+
'input': {
115+
'intent_id': self.uuid(),
116+
'environment_name': self.app_env,
136117
'cron': cron,
137118
'customId': self.parse_custom_id_from(flow),
138119
'workflowName': self.class_name(flow),
139120
'canonicalName': self.canonical_name(flow) or self.class_name(flow),
140-
'programmingLanguage': self.PROG.upper(),
121+
'programmingLanguage': self.PROG,
141122
'properties': self.serializer.encode(self.properties.from_(flow))
142123
}
143124
}
144-
res = self.graphql.request(url, query, variables=variables, headers=headers)
145-
return res['data']['createWorkflowSchedule']
125+
return self.gateway_request(query, variables=variables, data_response_key="createWorkflowSchedule")
146126

147127
def start_scheduled_task(self, task, cron):
148-
url = self.gateway_url()
149-
headers = self.gateway_headers()
150128
query = self.graphql.CREATE_TASK_SCHEDULE
151129
variables = {
152-
'createTaskScheduleInput': {
153-
'intentId': self.uuid(),
154-
'environmentName': self.app_env,
130+
'input': {
131+
'intent_id': self.uuid(),
132+
'environment_name': self.app_env,
155133
'cron': cron,
156-
'taskName': self.class_name(task),
157-
'programmingLanguage': self.PROG.upper(),
134+
'task_name': self.class_name(task),
135+
'programming_language': self.PROG,
158136
'properties': self.serializer.encode(self.properties.from_(task))
159137
}
160138
}
161-
res = self.graphql.request(url, query, variables=variables, headers=headers)
162-
return res['data']['createTaskSchedule']
163-
164-
def update_instance(self, workflow, custom_id, mode):
165-
params = '{}={}'.format(self.ATTR_ID, custom_id)
166-
url = self.instance_worker_url(params)
167-
options = json.dumps({
168-
self.ATTR_INTENT_ID: self.uuid(),
169-
self.ATTR_PROG: self.PROG,
170-
self.ATTR_NAME: workflow.__name__,
171-
self.ATTR_MODE: mode
172-
})
173-
with self._connect_to_agent():
174-
return self.http.put(url, options)
139+
return self.gateway_request(query, variables=variables, data_response_key="createTaskSchedule")
175140

176141
"""
177142
Sends an event to a workflow
@@ -181,17 +146,20 @@ def update_instance(self, workflow, custom_id, mode):
181146
:returns None
182147
"""
183148
def send_event(self, workflow_name, custom_id, event):
184-
body = json.dumps({
185-
self.ATTR_INTENT_ID: self.uuid(),
186-
self.ATTR_PROG: self.PROG,
187-
self.ATTR_NAME: workflow_name,
188-
self.ATTR_ID: custom_id,
189-
self.EVENT_NAME: type(event).__name__,
190-
self.EVENT_INPUT: self.serializer.encode(self.properties.from_(event)),
191-
self.EVENT_DATA: self.serializer.encode(event),
192-
})
193-
with self._connect_to_agent():
194-
return self.http.post(self.send_event_url(), body)
149+
query = self.graphql.SEND_EVENT
150+
variables = {
151+
'input': {
152+
'intent_id': self.uuid(),
153+
'custom_id': custom_id,
154+
'environment_name': self.app_env,
155+
'programming_language': self.PROG,
156+
'name': type(event).__name__,
157+
'input': self.serializer.encode(self.properties.from_(event)),
158+
'workflow_name': workflow_name,
159+
'data': self.serializer.encode(event)
160+
}
161+
}
162+
return self.gateway_request(query, variables=variables, data_response_key="sendEventToWorkflowByNameAndCustomId")
195163

196164
"""
197165
Finds a workflow
@@ -201,52 +169,83 @@ def send_event(self, workflow_name, custom_id, event):
201169
"""
202170

203171
def find_workflow(self, workflow, custom_id):
204-
205-
params = '{}={}&{}={}&{}={}'.format(
206-
self.ATTR_ID,
207-
custom_id,
208-
self.ATTR_NAME,
209-
workflow.__name__,
210-
self.ATTR_PROG,
211-
self.PROG)
212-
response = self.http.get(self.instance_website_url(params))
213-
214-
if response.get('data', None) is not None:
215-
data = response['data']
216-
return self.properties.object_from(
172+
query = self.graphql.FIND_WORKFLOW
173+
variables = {
174+
'custom_id': custom_id,
175+
'environment_name': self.app_env,
176+
'programming_language': self.PROG,
177+
'name': workflow.__name__
178+
}
179+
res = self.gateway_request(query, variables=variables, data_response_key="findWorkflow", throw_on_error=False)
180+
errors = self.get_graphql_errors(res)
181+
if errors:
182+
if self.contains_not_found_error(errors):
183+
return None
184+
else:
185+
raise ExternalError(errors)
186+
187+
return self.properties.object_from(
217188
workflow,
218-
self.serializer.decode(data['properties']),
189+
self.serializer.decode(res['properties']),
219190
Workflow
220191
)
221-
else:
222-
return None
223192

224193
"""
225194
Stops a workflow
226-
:param String workflow_name the class name of the workflow
195+
:param .abstracts.workflow.Workflow workflow
227196
:param String custom_id the custom ID of the workflow, if any
228197
:returns None
229198
"""
230-
def kill_workflow(self, workflow_name, custom_id):
231-
return self.update_instance(workflow_name, custom_id, self.WORKFLOW_KILL)
199+
def kill_workflow(self, workflow, custom_id):
200+
query = self.graphql.KILL_WORKFLOW
201+
variables = {
202+
'input': {
203+
'intent_id': self.uuid(),
204+
'environment_name': self.app_env,
205+
'programming_language': self.PROG,
206+
'custom_id': custom_id,
207+
'name': workflow.__name__
208+
}
209+
}
210+
return self.gateway_request(query, variables=variables, data_response_key="killWorkflow")
232211

233212
"""
234213
Pauses a workflow
235-
:param String workflow_name the class name of the workflow
214+
:param .abstracts.workflow.Workflow flow
236215
:param String custom_id the custom ID of the workflow, if any
237216
:returns None
238217
"""
239-
def pause_workflow(self, workflow_name, custom_id):
240-
return self.update_instance(workflow_name, custom_id, self.WORKFLOW_PAUSE)
218+
def pause_workflow(self, workflow, custom_id):
219+
query = self.graphql.PAUSE_WORKFLOW
220+
variables = {
221+
'input': {
222+
'intent_id': self.uuid(),
223+
'environment_name': self.app_env,
224+
'programming_language': self.PROG,
225+
'custom_id': custom_id,
226+
'name': workflow.__name__
227+
}
228+
}
229+
return self.gateway_request(query, variables=variables, data_response_key="pauseWorkflow")
241230

242231
"""
243232
Resumes a workflow
244-
:param String workflow_name the class name of the workflow
233+
:param .abstracts.workflow.Workflow flow
245234
:param String custom_id the custom ID of the workflow, if any
246235
:returns None
247236
"""
248-
def resume_workflow(self, workflow_name, custom_id):
249-
return self.update_instance(workflow_name, custom_id, self.WORKFLOW_RUN)
237+
def resume_workflow(self, workflow, custom_id):
238+
query = self.graphql.RESUME_WORKFLOW
239+
variables = {
240+
'input': {
241+
'intent_id': self.uuid(),
242+
'environment_name': self.app_env,
243+
'programming_language': self.PROG,
244+
'custom_id': custom_id,
245+
'name': workflow.__name__
246+
}
247+
}
248+
return self.gateway_request(query, variables=variables, data_response_key="resumeWorkflow")
250249

251250
def instance_website_url(self, params=''):
252251
return self.website_url('instances', params)
@@ -283,6 +282,35 @@ def class_name(self, flow):
283282
return type(flow.current_implementation()).__name__
284283
return type(flow).__name__
285284

285+
def gateway_request(self, query, variables=None, data_response_key=None, throw_on_error=True):
286+
url = self.gateway_url()
287+
headers = self.gateway_headers()
288+
res = self.graphql.request(url, query, variables=variables, headers=headers)
289+
290+
errors = self.get_graphql_errors(res)
291+
if errors:
292+
if throw_on_error:
293+
raise ExternalError(errors)
294+
else:
295+
return res
296+
297+
if data_response_key:
298+
return res['data'][data_response_key]
299+
else:
300+
return res['data']
301+
302+
def contains_not_found_error(self, errors):
303+
not_found_errors = list(filter(lambda error: error.get('type') == 'NOT_FOUND', errors))
304+
return len(not_found_errors) > 0
305+
306+
def get_graphql_errors(self, response):
307+
errors = response.get('errors')
308+
if isinstance(errors, list) and len(errors) > 0:
309+
for error in errors:
310+
if 'locations' in error:
311+
del error['locations']
312+
return errors
313+
286314
@contextmanager
287315
def _connect_to_agent(self):
288316
"""Display nice error message if connection to agent fails."""

0 commit comments

Comments
 (0)