Skip to content
This repository was archived by the owner on Jul 28, 2020. It is now read-only.

Commit 6e3e4c4

Browse files
authored
Merge pull request #7 from zenaton/single-task-dispatch
Single task dispatch
2 parents bac2fbe + 4faeebe commit 6e3e4c4

File tree

5 files changed

+57
-11
lines changed

5 files changed

+57
-11
lines changed

CHANGELOG.md

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
1-
## Change Log
1+
# Changelog
22

3-
# Unreleased
3+
## Unreleased
44

5-
## Fixed
5+
### Added
6+
Calling `dispatch` on tasks now allows to process tasks asynchronously
7+
8+
### Fixed
69
Fixed Wait task behavior in some edge cases
710
Encodes HTTP params before sending request
811

9-
# [0.2.5] - 2018/10/17
12+
## [0.2.5] - 2018/10/17
1013
Object Serialization (including circular structures)
1114

12-
# [0.2.4] - 2018/09/26
15+
## [0.2.4] - 2018/09/26
1316
Enhanced WithDuration & WithTimestamp classes
1417

15-
# [0.2.3] - 2018/09/21
18+
## [0.2.3] - 2018/09/21
1619
Minor enhancements (including the workflow find() method)
1720

18-
# [0.2.2] - 2018/09/19
21+
## [0.2.2] - 2018/09/19
1922
New version scheme management
2023

21-
# [0.2.1] - 2018/09/17
24+
## [0.2.1] - 2018/09/17
2225
Reorganized modules
2326

24-
# [0.2.0] - 2018/09/14
27+
## [0.2.0] - 2018/09/14
2528
Full rewriting of the package

tests/fixtures/fixture_engine.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,29 @@
44
from zenaton.client import Client
55

66

7+
import os
8+
9+
import pytest
10+
from dotenv import load_dotenv
11+
12+
from zenaton.client import Client
13+
14+
# LOADING CONFIG FROM .env file
15+
load_dotenv()
16+
app_id = os.getenv('ZENATON_APP_ID')
17+
api_token = os.getenv('ZENATON_API_TOKEN')
18+
app_env = os.getenv('ZENATON_APP_ENV')
19+
20+
if not app_id:
21+
raise Exception('Please include your ZENATON_APP_ID in the .env file')
22+
23+
if not api_token:
24+
raise Exception('Please include your ZENATON_API_TOKEN in the .env file')
25+
26+
if not app_env:
27+
raise Exception('Please include your ZENATON_APP_ENV in the .env file')
28+
729
@pytest.fixture
830
def engine():
9-
Client()
31+
Client(app_id, api_token, app_env)
1032
return Engine()

tests/test_engine.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pytest
2+
import pytest_mock
23

34
from zenaton.exceptions import InvalidArgumentError
45

@@ -18,3 +19,13 @@ def test_valid_job(engine, sequential_workflow, task0):
1819
assert not engine.valid_job(FakeTask)
1920
assert engine.valid_job(sequential_workflow)
2021
assert engine.valid_job(task0)
22+
23+
24+
@pytest.mark.usefixtures("client", "engine", "task0")
25+
def test_dispatch_task(client, engine, task0, mocker):
26+
mocker.spy(client, "start_task")
27+
task0.dispatch()
28+
assert client.start_task.call_count == 1
29+
30+
31+

zenaton/client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class Client(metaclass=Singleton):
2929
ATTR_DATA = 'data' # Parameter name for json payload
3030
ATTR_PROG = 'programming_language' # Parameter name for the language
3131
ATTR_MODE = 'mode' # Parameter name for the worker update mode
32+
ATTR_MAX_PROCESSING_TIME = 'max_processing_time' # Pararameter name for the max processing time
3233

3334
PROG = 'Python' # The current programming language
3435

@@ -87,6 +88,15 @@ def start_workflow(self, flow):
8788
self.ATTR_ID: self.parse_custom_id_from(flow)
8889
}))
8990

91+
def start_task(self, task):
92+
return self.http.post(self.worker_url('tasks'),
93+
data=json.dumps({
94+
self.ATTR_PROG: self.PROG,
95+
self.ATTR_NAME: self.class_name(task),
96+
self.ATTR_DATA: self.serializer.encode(self.properties.from_(task)),
97+
self.ATTR_MAX_PROCESSING_TIME: task.max_processing_time() if hasattr(task, 'max_processing_time') else None
98+
}))
99+
90100
def update_instance(self, workflow, custom_id, mode):
91101
params = '{}={}'.format(self.ATTR_ID, custom_id)
92102
url = self.instance_worker_url(params)

zenaton/engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def local_dispatch(self, job):
4444
if issubclass(job.__class__, Workflow):
4545
self.client.start_workflow(job)
4646
else:
47-
job.handle()
47+
self.client.start_task(job)
4848

4949
def check_argument(self, job):
5050
if not self.valid_job(job):

0 commit comments

Comments
 (0)