diff --git a/Exesh/.gitignore b/Exesh/.gitignore index f3aa553..49b3753 100644 --- a/Exesh/.gitignore +++ b/Exesh/.gitignore @@ -1 +1,2 @@ -file_storage \ No newline at end of file +file_storage +.idea \ No newline at end of file diff --git a/Exesh/Dockerfile b/Exesh/Dockerfile index b0cb29e..0d9bb40 100644 --- a/Exesh/Dockerfile +++ b/Exesh/Dockerfile @@ -15,3 +15,4 @@ RUN --mount=type=cache,target=/gomod-cache --mount=type=cache,target=/go-cache \ EXPOSE 5253 EXPOSE 5254 +EXPOSE 5255 diff --git a/Exesh/config/coordinator/dev.yml b/Exesh/config/coordinator/dev.yml index 00cb7f2..3b6a2e0 100644 --- a/Exesh/config/coordinator/dev.yml +++ b/Exesh/config/coordinator/dev.yml @@ -22,9 +22,9 @@ job_factory: execution_scheduler: executions_interval: 5s max_concurrency: 10 - execution_retry_after: 10m + execution_retry_after: 15s worker_pool: - worker_die_after: 30s + worker_die_after: 10s artifact_registry: artifact_ttl: 3m sender: diff --git a/Exesh/config/coordinator/docker.yml b/Exesh/config/coordinator/docker.yml index b3b2ad1..f1131ff 100644 --- a/Exesh/config/coordinator/docker.yml +++ b/Exesh/config/coordinator/docker.yml @@ -22,9 +22,9 @@ job_factory: execution_scheduler: executions_interval: 3s max_concurrency: 10 - execution_retry_after: 10m + execution_retry_after: 15s worker_pool: - worker_die_after: 30s + worker_die_after: 10s artifact_registry: artifact_ttl: 3m sender: diff --git a/Exesh/docker-compose.yml b/Exesh/docker-compose.yml index a505c8f..3b44ef3 100644 --- a/Exesh/docker-compose.yml +++ b/Exesh/docker-compose.yml @@ -67,13 +67,15 @@ services: build: context: . 
dockerfile: Dockerfile + command: + - /app/bin/worker container_name: worker-2 networks: - coduels ports: - "5255:5255" environment: - CONFIG_PATH: config/worker-2/docker.yml + CONFIG_PATH: /config/worker-2/docker.yml volumes: - /var/run/docker.sock:/var/run/docker.sock - ./config:/config:ro diff --git a/Exesh/internal/domain/execution/context.go b/Exesh/internal/domain/execution/context.go index da1c3a0..376256b 100644 --- a/Exesh/internal/domain/execution/context.go +++ b/Exesh/internal/domain/execution/context.go @@ -18,8 +18,8 @@ type Context struct { stepByJobID map[JobID]Step jobByStepName map[StepName]Job - mu *sync.Mutex - failed bool + mu *sync.Mutex + forceDone bool } func newContext(executionID ID, graph *graph) (ctx Context, err error) { @@ -31,8 +31,8 @@ func newContext(executionID ID, graph *graph) (ctx Context, err error) { stepByJobID: make(map[JobID]Step), jobByStepName: make(map[StepName]Job), - mu: &sync.Mutex{}, - failed: false, + mu: &sync.Mutex{}, + forceDone: false, } hash := sha1.New() @@ -46,7 +46,7 @@ func newContext(executionID ID, graph *graph) (ctx Context, err error) { } func (c *Context) PickSteps() []Step { - if c.isFailed() { + if c.IsForceDone() { return []Step{} } return c.graph.pickSteps() @@ -57,18 +57,18 @@ func (c *Context) ScheduledStep(step Step, job Job) { c.jobByStepName[step.GetName()] = job } -func (c *Context) FailStep(stepName StepName) { - c.mu.Lock() - defer c.mu.Unlock() - c.failed = true -} - func (c *Context) DoneStep(stepName StepName) { c.graph.doneStep(stepName) } +func (c *Context) ForceDone() { + c.mu.Lock() + defer c.mu.Unlock() + c.forceDone = true +} + func (c *Context) IsDone() bool { - if c.isFailed() { + if c.IsForceDone() { return true } return c.graph.isGraphDone() @@ -79,8 +79,8 @@ func (c *Context) GetJobForStep(stepName StepName) (Job, bool) { return job, ok } -func (c *Context) isFailed() bool { +func (c *Context) IsForceDone() bool { c.mu.Lock() defer c.mu.Unlock() - return c.failed + return 
c.forceDone } diff --git a/Exesh/internal/domain/execution/execution.go b/Exesh/internal/domain/execution/execution.go index 9c4feba..6e9709e 100644 --- a/Exesh/internal/domain/execution/execution.go +++ b/Exesh/internal/domain/execution/execution.go @@ -35,10 +35,23 @@ func NewExecution(steps []Step) Execution { } func (e *Execution) SetScheduled(scheduledAt time.Time) { + if e.Status == StatusFinishedExecution { + return + } + e.Status = StatusScheduledExecution e.ScheduledAt = &scheduledAt } +func (e *Execution) SetFinished(finishedAt time.Time) { + if e.Status == StatusFinishedExecution { + return + } + + e.Status = StatusFinishedExecution + e.FinishedAt = &finishedAt +} + func (e *Execution) BuildContext() (Context, error) { return newContext(e.ID, newGraph(e.Steps)) } diff --git a/Exesh/internal/domain/execution/result.go b/Exesh/internal/domain/execution/result.go index 396f531..bd0427b 100644 --- a/Exesh/internal/domain/execution/result.go +++ b/Exesh/internal/domain/execution/result.go @@ -11,6 +11,7 @@ type ( GetType() ResultType GetDoneAt() time.Time GetError() error + ShouldFinishExecution() bool } ResultDetails struct { @@ -47,3 +48,7 @@ func (r ResultDetails) GetError() error { } return errors.New(r.Error) } + +func (r ResultDetails) ShouldFinishExecution() bool { + panic("this panic would never happen!") +} diff --git a/Exesh/internal/domain/execution/results/check_result.go b/Exesh/internal/domain/execution/results/check_result.go index adbc201..fe5c00d 100644 --- a/Exesh/internal/domain/execution/results/check_result.go +++ b/Exesh/internal/domain/execution/results/check_result.go @@ -15,3 +15,7 @@ const ( CheckStatusOK CheckStatus = "OK" CheckStatusWA CheckStatus = "WA" ) + +func (r CheckResult) ShouldFinishExecution() bool { + return r.Status != CheckStatusOK +} diff --git a/Exesh/internal/domain/execution/results/compile_result.go b/Exesh/internal/domain/execution/results/compile_result.go index 0c3f556..64de487 100644 --- 
a/Exesh/internal/domain/execution/results/compile_result.go +++ b/Exesh/internal/domain/execution/results/compile_result.go @@ -16,3 +16,7 @@ const ( CompileStatusOK CompileStatus = "OK" CompileStatusCE CompileStatus = "CE" ) + +func (r CompileResult) ShouldFinishExecution() bool { + return r.Status != CompileStatusOK +} diff --git a/Exesh/internal/domain/execution/results/run_result.go b/Exesh/internal/domain/execution/results/run_result.go index 6fd1cf4..08e37ab 100644 --- a/Exesh/internal/domain/execution/results/run_result.go +++ b/Exesh/internal/domain/execution/results/run_result.go @@ -21,3 +21,7 @@ const ( RunStatusTL RunStatus = "TL" RunStatusML RunStatus = "ML" ) + +func (r RunResult) ShouldFinishExecution() bool { + return r.Status != RunStatusOK +} diff --git a/Exesh/internal/registry/artifact_registry.go b/Exesh/internal/registry/artifact_registry.go index 9e8705f..928181c 100644 --- a/Exesh/internal/registry/artifact_registry.go +++ b/Exesh/internal/registry/artifact_registry.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "math/rand/v2" + "sync" "time" ) @@ -16,6 +17,7 @@ type ( workerPool workerPool + mu sync.Mutex workerArtifacts map[string]map[execution.JobID]time.Time } @@ -36,6 +38,9 @@ func NewArtifactRegistry(log *slog.Logger, cfg config.ArtifactRegistryConfig, wo } func (r *ArtifactRegistry) GetWorker(jobID execution.JobID) (workerID string, err error) { + r.mu.Lock() + defer r.mu.Unlock() + workers := make([]string, 0) for worker, artifacts := range r.workerArtifacts { if trashTime, ok := artifacts[jobID]; ok && r.workerPool.IsAlive(worker) { @@ -57,6 +62,9 @@ func (r *ArtifactRegistry) GetWorker(jobID execution.JobID) (workerID string, er } func (r *ArtifactRegistry) PutArtifact(workerID string, jobID execution.JobID) { + r.mu.Lock() + defer r.mu.Unlock() + artifacts := r.workerArtifacts[workerID] if artifacts == nil { artifacts = make(map[execution.JobID]time.Time) diff --git a/Exesh/internal/scheduler/execution_scheduler.go 
b/Exesh/internal/scheduler/execution_scheduler.go index 2ebd0c7..cd60f29 100644 --- a/Exesh/internal/scheduler/execution_scheduler.go +++ b/Exesh/internal/scheduler/execution_scheduler.go @@ -2,6 +2,7 @@ package scheduler import ( "context" + "errors" "exesh/internal/config" "exesh/internal/domain/execution" "fmt" @@ -33,9 +34,9 @@ type ( } executionStorage interface { + GetForUpdate(context.Context, execution.ID) (*execution.Execution, error) GetForSchedule(context.Context, time.Time) (*execution.Execution, error) - Update(context.Context, execution.Execution) error - Finish(context.Context, execution.ID) error + Save(context.Context, execution.Execution) error } jobFactory interface { @@ -87,7 +88,15 @@ func NewExecutionScheduler( } func (s *ExecutionScheduler) Start(ctx context.Context) { - go s.runExecutionScheduler(ctx) + go func() { + err := s.runExecutionScheduler(ctx) + if errors.Is(err, context.Canceled) { + err = nil + } + if err != nil { + s.log.Error("execution scheduler exited with error", slog.Any("error", err)) + } + }() } func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error { @@ -107,14 +116,12 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error { continue } - s.log.Info("begin execution scheduler loop") + s.log.Info("begin execution scheduler loop", slog.Int("now_executions", s.getNowExecutions())) + s.changeNowExecutions(+1) if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { - s.changeNowExecutions(+1) - e, err := s.executionStorage.GetForSchedule(ctx, time.Now().Add(-s.cfg.ExecutionRetryAfter)) if err != nil { - s.changeNowExecutions(-1) return fmt.Errorf("failed to get execution for schedule from storage: %w", err) } if e == nil { @@ -125,24 +132,21 @@ func (s *ExecutionScheduler) runExecutionScheduler(ctx context.Context) error { e.SetScheduled(time.Now()) - if err = s.executionStorage.Update(ctx, *e); err != nil { - s.changeNowExecutions(-1) - return fmt.Errorf("failed to 
update execution in storage %s: %w", e.ID.String(), err) - } - execCtx, err := e.BuildContext() if err != nil { - s.changeNowExecutions(-1) return fmt.Errorf("failed to build execution context: %w", err) } if err = s.scheduleExecution(ctx, &execCtx); err != nil { - s.changeNowExecutions(-1) return fmt.Errorf("failed to schedule execution: %w", err) + } + if err = s.executionStorage.Save(ctx, *e); err != nil { + return fmt.Errorf("failed to update execution in storage %s: %w", e.ID.String(), err) } return nil }); err != nil { + s.changeNowExecutions(-1) s.log.Error("failed to schedule execution", slog.Any("error", err)) } } @@ -160,110 +164,125 @@ func (s *ExecutionScheduler) scheduleExecution(ctx context.Context, execCtx *exe } for _, step := range execCtx.PickSteps() { - s.scheduleStep(ctx, execCtx, step) + if err = s.scheduleStep(ctx, execCtx, step); err != nil { + return err + } } return nil } -func (s *ExecutionScheduler) scheduleStep(ctx context.Context, execCtx *execution.Context, step execution.Step) { - s.log.Info("schedule step", slog.Any("step_name", step.GetName())) +func (s *ExecutionScheduler) scheduleStep( + ctx context.Context, + execCtx *execution.Context, + step execution.Step, +) error { + if execCtx.IsDone() { + return nil + } + + s.log.Info("schedule step", slog.Any("step", step.GetName())) job, err := s.jobFactory.Create(ctx, execCtx, step) if err != nil { s.log.Error("failed to create job for step", slog.Any("step_name", step.GetName()), slog.Any("error", err)) - return + return fmt.Errorf("failed to create job for step %s: %w", step.GetName(), err) } s.jobScheduler.Schedule(ctx, job, func(ctx context.Context, result execution.Result) { if result.GetError() != nil { - if err = s.failStep(ctx, execCtx, step, result); err != nil { - s.log.Error("failed to fail step", slog.Any("step_name", step.GetName()), slog.Any("error", err)) - } + s.failStep(ctx, execCtx, step, result) } else { - if err = s.doneStep(ctx, execCtx, step, result); err != nil { 
- s.log.Error("failed to done step", slog.Any("step_name", step.GetName()), slog.Any("error", err)) - } + s.doneStep(ctx, execCtx, step, result) } }) execCtx.ScheduledStep(step, job) + + return nil } -func (s *ExecutionScheduler) failStep(ctx context.Context, execCtx *execution.Context, step execution.Step, result execution.Result) error { +func (s *ExecutionScheduler) failStep( + ctx context.Context, + execCtx *execution.Context, + step execution.Step, + result execution.Result, +) { if execCtx.IsDone() { - return nil - } - - defer s.changeNowExecutions(-1) - - execCtx.FailStep(step.GetName()) - - msg, err := s.messageFactory.CreateExecutionFinishedError(execCtx, result.GetError().Error()) - if err != nil { - return fmt.Errorf("failed to create message for result: %w", err) - } - if err = s.messageSender.Send(ctx, msg); err != nil { - return fmt.Errorf("failed to send message: %w", err) + return } - if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { - if err = s.executionStorage.Finish(ctx, execCtx.ExecutionID); err != nil { - return err - } - return nil - }); err != nil { - s.log.Error("failed to finish execution in storage", slog.Any("error", err)) - return fmt.Errorf("failed to finish execution in storage: %w", err) - } + s.log.Info("fail step", + slog.Any("step", step.GetName()), + slog.Any("execution", execCtx.ExecutionID.String()), + slog.Any("error", result.GetError()), + ) - return nil + s.finishExecution(ctx, execCtx, result.GetError()) } -func (s *ExecutionScheduler) doneStep(ctx context.Context, execCtx *execution.Context, step execution.Step, result execution.Result) error { +func (s *ExecutionScheduler) doneStep( + ctx context.Context, + execCtx *execution.Context, + step execution.Step, + result execution.Result, +) { if execCtx.IsDone() { - return nil - } - - execCtx.DoneStep(step.GetName()) - - msg, err := s.messageFactory.CreateForStep(execCtx, step, result) - if err != nil { - return fmt.Errorf("failed to create message for result: 
%w", err) - } - if err = s.messageSender.Send(ctx, msg); err != nil { - return fmt.Errorf("failed to send message: %w", err) + return } - if execCtx.IsDone() { - defer s.changeNowExecutions(-1) + s.log.Info("done step", + slog.Any("step", step.GetName()), + slog.Any("execution", execCtx.ExecutionID.String()), + ) - if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { - if err = s.executionStorage.Finish(ctx, execCtx.ExecutionID); err != nil { - return err - } - return nil - }); err != nil { - s.log.Error("failed to finish execution in storage", slog.Any("error", err)) - return fmt.Errorf("failed to finish execution in storage: %w", err) + if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { + e, err := s.executionStorage.GetForUpdate(ctx, execCtx.ExecutionID) + if err != nil { + return fmt.Errorf("failed to get execution for update from storage: %w", err) + } + if e == nil { + return fmt.Errorf("failed to get execution for update from storage: not found") } - msg, err := s.messageFactory.CreateExecutionFinished(execCtx) + msg, err := s.messageFactory.CreateForStep(execCtx, step, result) if err != nil { - return fmt.Errorf("failed to create execution finished message: %w", err) + return fmt.Errorf("failed to create message for step: %w", err) } if err = s.messageSender.Send(ctx, msg); err != nil { - return fmt.Errorf("failed to send %s message: %w", msg.GetType(), err) + return fmt.Errorf("failed to send message for step: %w", err) } + execCtx.DoneStep(step.GetName()) + + e.SetScheduled(time.Now()) + + if err = s.executionStorage.Save(ctx, *e); err != nil { + return err + } return nil + }); err != nil { + s.log.Error("failed to update execution in storage for done step", slog.Any("error", err)) + s.finishExecution( + ctx, + execCtx, + fmt.Errorf("failed to update execution in storage for done step %s: %w", step.GetName(), err)) + return } - for _, step := range execCtx.PickSteps() { - s.scheduleStep(ctx, execCtx, step) + if execCtx.IsDone() 
|| result.ShouldFinishExecution() { + s.finishExecution(ctx, execCtx, nil) + return } - return nil + for _, step = range execCtx.PickSteps() { + if err := s.scheduleStep(ctx, execCtx, step); err != nil { + s.log.Error("failed to schedule step", + slog.Any("step", step.GetName()), + slog.Any("error", err)) + s.finishExecution(ctx, execCtx, fmt.Errorf("failed to schedule step %s: %w", step.GetName(), err)) + } + } } func (s *ExecutionScheduler) getNowExecutions() int { @@ -279,3 +298,58 @@ func (s *ExecutionScheduler) changeNowExecutions(delta int) { s.nowExecutions += delta } + +func (s *ExecutionScheduler) finishExecution( + ctx context.Context, + execCtx *execution.Context, + execError error, +) { + if execCtx.IsForceDone() { + return + } + + if execError == nil { + s.log.Info("finish execution", slog.String("execution", execCtx.ExecutionID.String())) + } else { + s.log.Warn("finish execution with error", + slog.String("execution", execCtx.ExecutionID.String()), + slog.Any("error", execError)) + } + + defer s.changeNowExecutions(-1) + + execCtx.ForceDone() + + if err := s.unitOfWork.Do(ctx, func(ctx context.Context) error { + e, err := s.executionStorage.GetForUpdate(ctx, execCtx.ExecutionID) + if err != nil { + return fmt.Errorf("failed to get execution for update from storage: %w", err) + } + if e == nil { + return fmt.Errorf("failed to get execution for update from storage: not found") + } + + var msg execution.Message + if execError == nil { + msg, err = s.messageFactory.CreateExecutionFinished(execCtx) + } else { + msg, err = s.messageFactory.CreateExecutionFinishedError(execCtx, execError.Error()) + } + if err != nil { + return fmt.Errorf("failed to create execution finished message: %w", err) + } + if err = s.messageSender.Send(ctx, msg); err != nil { + return fmt.Errorf("failed to send execution finished message: %w", err) + } + + e.SetFinished(time.Now()) + + if err = s.executionStorage.Save(ctx, *e); err != nil { + return err + } + return nil + }); err 
!= nil { + s.log.Error("failed to finish execution in storage", slog.Any("error", err)) + return + } +} diff --git a/Exesh/internal/storage/postgres/execution_storage.go b/Exesh/internal/storage/postgres/execution_storage.go index fa700e9..df6cc2f 100644 --- a/Exesh/internal/storage/postgres/execution_storage.go +++ b/Exesh/internal/storage/postgres/execution_storage.go @@ -33,6 +33,12 @@ const ( VALUES ($1, $2, $3, $4, $5, $6); ` + selectForUpdateQuery = ` + SELECT id, steps, status, created_at, scheduled_at, finished_at FROM Executions + WHERE id = $1 + FOR UPDATE + ` + selectForScheduleQuery = ` SELECT id, steps, status, created_at, scheduled_at, finished_at FROM Executions WHERE status = $1 OR (status = $2 AND scheduled_at < $3) @@ -45,11 +51,6 @@ const ( UPDATE Executions SET steps=$2, status=$3, created_at=$4, scheduled_at=$5, finished_at=$6 WHERE id=$1; ` - - finishQuery = ` - UPDATE Executions SET status=$2, finished_at=$3 - WHERE id=$1; - ` ) func NewExecutionStorage(ctx context.Context, log *slog.Logger) (*ExecutionStorage, error) { @@ -73,14 +74,13 @@ func (s *ExecutionStorage) Create(ctx context.Context, e execution.Execution) er return nil } -func (s *ExecutionStorage) GetForSchedule(ctx context.Context, retryBefore time.Time) (e *execution.Execution, err error) { +func (s *ExecutionStorage) GetForUpdate(ctx context.Context, id execution.ID) (e *execution.Execution, err error) { tx := extractTx(ctx) e = &execution.Execution{} var eid string var stepsRaw json.RawMessage - if err = tx.QueryRowContext(ctx, selectForScheduleQuery, - execution.StatusNewExecution, execution.StatusScheduledExecution, retryBefore). + if err = tx.QueryRowContext(ctx, selectForUpdateQuery, id). Scan(&eid, &stepsRaw, &e.Status, &e.CreatedAt, &e.ScheduledAt, &e.FinishedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { e = nil @@ -104,22 +104,43 @@ func (s *ExecutionStorage) GetForSchedule(ctx context.Context, retryBefore time. 
return } -func (s *ExecutionStorage) Update(ctx context.Context, e execution.Execution) error { +func (s *ExecutionStorage) GetForSchedule(ctx context.Context, retryBefore time.Time) (e *execution.Execution, err error) { tx := extractTx(ctx) - if _, err := tx.ExecContext(ctx, updateQuery, - e.ID, e.Steps, e.Status, e.CreatedAt, e.ScheduledAt, e.FinishedAt); err != nil { - return fmt.Errorf("failed to do update query: %w", err) + e = &execution.Execution{} + var eid string + var stepsRaw json.RawMessage + if err = tx.QueryRowContext(ctx, selectForScheduleQuery, + execution.StatusNewExecution, execution.StatusScheduledExecution, retryBefore). + Scan(&eid, &stepsRaw, &e.Status, &e.CreatedAt, &e.ScheduledAt, &e.FinishedAt); err != nil { + if errors.Is(err, sql.ErrNoRows) { + e = nil + err = nil + return + } + err = fmt.Errorf("failed to do select query: %w", err) + return } - return nil + if err = e.ID.FromString(eid); err != nil { + err = fmt.Errorf("failed to unmarshal id: %w", err) + return + } + + if e.Steps, err = steps.UnmarshalStepsJSON(stepsRaw); err != nil { + err = fmt.Errorf("failed to unmarshal steps json: %w", err) + return + } + + return } -func (s *ExecutionStorage) Finish(ctx context.Context, executionID execution.ID) error { +func (s *ExecutionStorage) Save(ctx context.Context, e execution.Execution) error { tx := extractTx(ctx) - if _, err := tx.ExecContext(ctx, finishQuery, executionID, execution.StatusFinishedExecution, time.Now()); err != nil { - return fmt.Errorf("failed to do finish query: %w", err) + if _, err := tx.ExecContext(ctx, updateQuery, + e.ID, e.Steps, e.Status, e.CreatedAt, e.ScheduledAt, e.FinishedAt); err != nil { + return fmt.Errorf("failed to do update query: %w", err) } return nil