Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dotflow/core/decorators/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ def time(func):
def inside(*args, **kwargs):
start = datetime.now()
task = func(*args, **kwargs)
task._set_duration((datetime.now() - start).total_seconds())
task.duration = (datetime.now() - start).total_seconds()
return task
return inside
22 changes: 18 additions & 4 deletions dotflow/core/exception.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Exception module"""

MESSAGE_UNKNOWN_ERROR = "Unknown error, please check logs for more information."
MESSAGE_MISSING_STEP_DECORATOR = "A step function necessarily needs an 'action' decorator to circulate in the workflow. For more implementation details, access the documentation: https://dotflow-io.github.io/dotflow/nav/getting-started/#3-task-function."
MESSAGE_MISSING_STEP_DECORATOR = "A step function necessarily needs an '@action' decorator to circulate in the workflow. For more implementation details, access the documentation: https://dotflow-io.github.io/dotflow/nav/getting-started/#3-task-function."
MESSAGE_NOT_CALLABLE_OBJECT = "Problem validating the '{name}' object type; this is not a callable object"
MESSAGE_EXECUTION_NOT_EXIST = "The execution mode does not exist. Allowed parameter is 'sequential' and 'background'."
MESSAGE_MODULE_NOT_FOUND = "Problem importing the python module, it probably doesn't exist or is wrong."
MESSAGE_MODULE_NOT_FOUND = "Problem importing the python module '{module}', it probably doesn't exist or is wrong."


class MissingActionDecorator(Exception):
Expand All @@ -24,7 +25,20 @@ def __init__(self):

class ModuleNotFound(Exception):

def __init__(self):
def __init__(self, module: str):
super(ModuleNotFound, self).__init__(
MESSAGE_MODULE_NOT_FOUND
MESSAGE_MODULE_NOT_FOUND.format(
module=module
)
)


class NotCallableObject(Exception):

def __init__(self, name: str):
super(NotCallableObject, self).__init__(
MESSAGE_NOT_CALLABLE_OBJECT.format(
name=name
)
)

2 changes: 1 addition & 1 deletion dotflow/core/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def __init__(
) -> None:
self.task = task
self.task.status = TaskStatus.IN_PROGRESS
self.task._set_workflow_id(workflow_id)
self.task.previous_context = previous_context
self.task.workflow_id = workflow_id

self._excution()

Expand Down
4 changes: 3 additions & 1 deletion dotflow/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def import_module(cls, value: str):
if hasattr(module, cls._get_name(value)):
return getattr(module, cls._get_name(value))

raise ModuleNotFound()
raise ModuleNotFound(
module=value
)

@classmethod
def _get_name(cls, value: str):
Expand Down
154 changes: 100 additions & 54 deletions dotflow/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dotflow.core.action import Action
from dotflow.core.context import Context
from dotflow.core.module import Module
from dotflow.core.exception import MissingActionDecorator
from dotflow.core.exception import MissingActionDecorator, NotCallableObject
from dotflow.core.types.status import TaskStatus
from dotflow.settings import Settings as settings
from dotflow.utils import basic_callback, traceback_error, message_error, copy_file
Expand All @@ -19,11 +19,12 @@ class TaskInstance:
def __init__(self, *args, **kwargs) -> None:
self.task_id = None
self.workflow_id = None
self.step = None
self.callback = None
self._step = None
self._callback = None
self._previous_context = None
self._initial_context = None
self._current_context = None
self._previous_context = None
self._duration = None
self._error = None
self._status = None
self._config = None
Expand All @@ -41,44 +42,45 @@ def __init__(
config: Config = None,
) -> None:
super().__init__(task_id, step, callback, initial_context, workflow_id)
self.config = config
self.task_id = task_id
self.workflow_id = workflow_id
self.step = step
self.callback = callback
self.initial_context = initial_context
self.status = TaskStatus.NOT_STARTED
self.config = config

@property
def status(self):
if not self._status:
return TaskStatus.NOT_STARTED
return self._status
def step(self):
return self._step

@status.setter
def status(self, value: TaskStatus) -> None:
self._status = value
@step.setter
def step(self, value: Callable):
new_step = value

if isinstance(value, str):
new_step = Module(value=value)

if new_step.__module__ != Action.__module__:
raise MissingActionDecorator()

logger.info("ID %s - %s - %s", self.workflow_id, self.task_id, value)
self._step = new_step

@property
def error(self):
if not self._error:
return TaskError()
return self._error
def callback(self):
return self._callback

@error.setter
def error(self, value: Exception) -> None:
task_error = TaskError(value)
self._error = task_error
@callback.setter
def callback(self, value: Callable):
new_callback = value

logger.error(
"ID %s - %s - %s \n %s",
self.workflow_id,
self.task_id,
self.status,
task_error.traceback,
)
if isinstance(value, str):
new_callback = Module(value=value)

if not isinstance(new_callback, Callable):
raise NotCallableObject(name=str(new_callback))

self._callback = new_callback

@property
def previous_context(self):
Expand All @@ -90,6 +92,26 @@ def previous_context(self):
def previous_context(self, value: Context):
self._previous_context = Context(value)

@property
def initial_context(self):
if not self._initial_context:
return Context()
return self._initial_context

@initial_context.setter
def initial_context(self, value: Context):
self._initial_context = Context(value)

if self.config.output:
logger.info(
"ID %s - %s - Initial Context -> %s",
self.workflow_id,
self.task_id,
str(value),
)

copy_file(source=settings.LOG_PATH, destination=self.config.log_path)

@property
def current_context(self):
if not self._current_context:
Expand All @@ -111,24 +133,47 @@ def current_context(self, value: Context):
copy_file(source=settings.LOG_PATH, destination=self.config.log_path)

@property
def initial_context(self):
if not self._initial_context:
return Context()
return self._initial_context
def duration(self):
return self._duration

@initial_context.setter
def initial_context(self, value: Context):
self._initial_context = Context(value)
@duration.setter
def duration(self, value: float):
self._duration = value

if self.config.output:
logger.info(
"ID %s - %s - Initial Context -> %s",
self.workflow_id,
self.task_id,
str(value),
)
@property
def error(self):
if not self._error:
return TaskError()
return self._error

copy_file(source=settings.LOG_PATH, destination=self.config.log_path)
@error.setter
def error(self, value: Exception) -> None:
task_error = TaskError(value)
self._error = task_error

logger.error(
"ID %s - %s - %s \n %s",
self.workflow_id,
self.task_id,
self.status,
task_error.traceback,
)

@property
def status(self):
if not self._status:
return TaskStatus.NOT_STARTED
return self._status

@status.setter
def status(self, value: TaskStatus) -> None:
self._status = value
logger.info(
"ID %s - %s - %s",
self.workflow_id,
self.task_id,
value
)

@property
def config(self):
Expand All @@ -140,12 +185,6 @@ def config(self):
def config(self, value: Config):
self._config = value

def _set_duration(self, value: float) -> None:
self.duration = value

def _set_workflow_id(self, value: UUID) -> None:
self.workflow_id = value


class TaskError:

Expand All @@ -158,7 +197,7 @@ def __init__(self, error: Exception = None) -> None:
class TaskBuilder:

def __init__(self, config: Config, workflow_id: UUID = None) -> None:
self.queu: List[Task] = []
self.queu: List[Callable] = []
self.workflow_id = workflow_id
self.config = config

Expand All @@ -168,10 +207,14 @@ def add(
callback: Callable = basic_callback,
initial_context: Any = None,
) -> None:
step = Module(value=step)

if step.__module__ != Action.__module__:
raise MissingActionDecorator()
if isinstance(step, list):
for inside_step in step:
self.add(
step=inside_step,
callback=callback,
initial_context=initial_context
)
return self

self.queu.append(
Task(
Expand All @@ -191,3 +234,6 @@ def count(self) -> int:

def clear(self) -> None:
self.queu.clear()

def reverse(self) -> None:
self.queu.reverse()
2 changes: 1 addition & 1 deletion examples/simple_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def main():
0000-00-00 00:00:00,000 - INFO [dotflow]: ID 56a908c5-c9f2-4ebf-a00a-895e49bd189b - 0 - In progress
0000-00-00 00:00:00,000 - INFO [dotflow]: ID 56a908c5-c9f2-4ebf-a00a-895e49bd189b - 0 - Completed
"""
system("dotflow start --step examples.cli.simple_step")
system("dotflow start --step examples.simple_cli.simple_step")


if __name__ == "__main__":
Expand Down
Loading