From b4a0bc7abd14585c413f586749d57806e37f42cc Mon Sep 17 00:00:00 2001 From: Wojtek Bednarzak Date: Thu, 12 Apr 2018 16:10:11 +0100 Subject: [PATCH 1/3] Make the manager responsible for saving the tasks. This change adds a empty state store which does nothing. It is not yet fully decided what component should be storing the state, so the default is changed for the worker do not save any data. --- manager/src/state/file.rs | 23 +++++++++++++++++++---- manager/src/state/mod.rs | 6 ++++-- worker/settings/settings.go | 2 +- worker/state/empty.go | 21 +++++++++++++++++++++ worker/state/state.go | 2 ++ 5 files changed, 47 insertions(+), 7 deletions(-) create mode 100644 worker/state/empty.go diff --git a/manager/src/state/file.rs b/manager/src/state/file.rs index c24e12e..7920e72 100644 --- a/manager/src/state/file.rs +++ b/manager/src/state/file.rs @@ -131,10 +131,25 @@ impl State for FileStore { } pending_file_path.push(task_id); - File::create(pending_file_path) - .context(StateErrorKind::PendingTaskWriteFailed)? - .write_all(&task_id.as_bytes()) - .context(StateErrorKind::PendingTaskWriteFailed)?; + match task.get_status() { + TaskStatus::TASK_UNKNOWN => Ok(()) + TaskStatus::TASK_IN_PROGRESS => Ok(()) + TaskStatus::TASK_PENDING => { + File::create(pending_file_path) + .context(StateErrorKind::PendingTaskWriteFailed)? + .write_all(&task_id.as_bytes()) + .context(StateErrorKind::PendingTaskWriteFailed)?; + } + TaskStatus::TASK_DONE => { + fs::remove_file(pending_file_path) + .context(StateErrorKind::PendingTaskRemoveFailed)?; + } + TaskStatus::TASK_FAILED => { + // TODO: Do we want to remove the pending file if the task + // fails or do we keep it? + } + } + Ok(()) } diff --git a/manager/src/state/mod.rs b/manager/src/state/mod.rs index 39d40bc..023ba4c 100644 --- a/manager/src/state/mod.rs +++ b/manager/src/state/mod.rs @@ -8,14 +8,16 @@ use std::fmt::Display; use failure::*; use futures::Future; -use heracles_proto::datatypes::{Job, Task, TaskKind}; +use heracles_proto::datatypes::{Job, Task, TaskKind, TaskStatus}; #[allow(doc_markdown)] /// Interface for creating connections to state stores, such as etcd or TiKV etc. pub trait State { /// Serialize the job and save it in the state store so it can be loaded later. fn save_job(&self, job: &Job) -> Result<(), StateError>; - /// Adds a task to the list of tasks and add it to pending + /// Adds a task to the list of tasks. + /// If the task has status of PENDING, it is added to pending tasks + /// If the task has status of DONE, the pending task is removed. fn save_task(&self, task: &Task) -> Result<(), StateError>; /// List of pending map tasks for a specific job. fn pending_map_tasks(&self, job: &Job) -> Result, StateError>; diff --git a/worker/settings/settings.go b/worker/settings/settings.go index 4978f5c..703d28b 100644 --- a/worker/settings/settings.go +++ b/worker/settings/settings.go @@ -33,7 +33,7 @@ func Init() error { func setDefaults() { settings.SetDefault("broker.queue_name", "heracles_tasks") settings.SetDefault("broker.address", "") - settings.SetDefault("state.backend", "file") + settings.SetDefault("state.backend", "empty") settings.SetDefault("state.location", "") } diff --git a/worker/state/empty.go b/worker/state/empty.go new file mode 100644 index 0000000..19c8fed --- /dev/null +++ b/worker/state/empty.go @@ -0,0 +1,21 @@ +package state + +import ( + "github.com/cpssd/heracles/proto/datatypes" + log "github.com/golang/glog" +) + +// EmptyStore implements State +type EmptyStore struct { +} + +// NewEmptyStore returns a state store which does nothing. +func NewEmptyStore() (*EmptyStore, error) { + return &EmptyStore{}, nil +} + +// SaveProgress implementation +func (f EmptyStore) SaveProgress(task *datatypes.Task) error { + log.V(1).Info("empty store saving task %s", task.GetId()) + return nil +} diff --git a/worker/state/state.go b/worker/state/state.go index a98b432..2445c94 100644 --- a/worker/state/state.go +++ b/worker/state/state.go @@ -20,6 +20,8 @@ func New() (State, error) { case "file": location := settings.Get("state.location").(string) return NewFileStore(location) + case "empty": + return NewEmptyStore() } return nil, errors.New("unknown state kind") From 2fe1b6e16ff677a2db2ddf1b120ad42644ff9f3e Mon Sep 17 00:00:00 2001 From: Wojtek Bednarzak Date: Thu, 12 Apr 2018 16:17:02 +0100 Subject: [PATCH 2/3] Fix bad code --- manager/src/state/file.rs | 4 ++-- manager/src/state/mod.rs | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/manager/src/state/file.rs b/manager/src/state/file.rs index 7920e72..7156691 100644 --- a/manager/src/state/file.rs +++ b/manager/src/state/file.rs @@ -132,8 +132,8 @@ impl State for FileStore { pending_file_path.push(task_id); match task.get_status() { - TaskStatus::TASK_UNKNOWN => Ok(()) - TaskStatus::TASK_IN_PROGRESS => Ok(()) + TaskStatus::TASK_UNKNOWN => return Ok(()), + TaskStatus::TASK_IN_PROGRESS => return Ok(()), TaskStatus::TASK_PENDING => { File::create(pending_file_path) .context(StateErrorKind::PendingTaskWriteFailed)? diff --git a/manager/src/state/mod.rs b/manager/src/state/mod.rs index 023ba4c..111b9ff 100644 --- a/manager/src/state/mod.rs +++ b/manager/src/state/mod.rs @@ -63,6 +63,8 @@ pub enum StateErrorKind { TaskWriteFailed, #[fail(display = "Failed to create pending task")] PendingTaskWriteFailed, + #[fail(display = "Failed to remove pending task")] + PendingTaskRemoveFailed, #[fail(display = "Failed operation.")] OperationFailed, } From 9364b59055608cb690c87bbe7011c22ad6767b6a Mon Sep 17 00:00:00 2001 From: Wojtek Bednarzak Date: Thu, 12 Apr 2018 17:04:09 +0100 Subject: [PATCH 3/3] Code review changes. --- manager/src/state/file.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/manager/src/state/file.rs b/manager/src/state/file.rs index 7156691..b855e20 100644 --- a/manager/src/state/file.rs +++ b/manager/src/state/file.rs @@ -132,25 +132,26 @@ impl State for FileStore { pending_file_path.push(task_id); match task.get_status() { - TaskStatus::TASK_UNKNOWN => return Ok(()), - TaskStatus::TASK_IN_PROGRESS => return Ok(()), + TaskStatus::TASK_UNKNOWN => Ok(()), + TaskStatus::TASK_IN_PROGRESS => Ok(()), TaskStatus::TASK_PENDING => { File::create(pending_file_path) .context(StateErrorKind::PendingTaskWriteFailed)? .write_all(&task_id.as_bytes()) .context(StateErrorKind::PendingTaskWriteFailed)?; + Ok(()) } TaskStatus::TASK_DONE => { fs::remove_file(pending_file_path) .context(StateErrorKind::PendingTaskRemoveFailed)?; + Ok(()) } TaskStatus::TASK_FAILED => { // TODO: Do we want to remove the pending file if the task // fails or do we keep it? + Ok(()) } } - - Ok(()) } fn pending_map_tasks(&self, job: &Job) -> Result, StateError> {