Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Exesh/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
file_storage
file_storage
.idea
1 change: 1 addition & 0 deletions Exesh/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ RUN --mount=type=cache,target=/gomod-cache --mount=type=cache,target=/go-cache \

EXPOSE 5253
EXPOSE 5254
EXPOSE 5255
4 changes: 2 additions & 2 deletions Exesh/config/coordinator/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ job_factory:
execution_scheduler:
executions_interval: 5s
max_concurrency: 10
execution_retry_after: 10m
execution_retry_after: 15s
worker_pool:
worker_die_after: 30s
worker_die_after: 10s
artifact_registry:
artifact_ttl: 3m
sender:
Expand Down
4 changes: 2 additions & 2 deletions Exesh/config/coordinator/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ job_factory:
execution_scheduler:
executions_interval: 3s
max_concurrency: 10
execution_retry_after: 10m
execution_retry_after: 15s
worker_pool:
worker_die_after: 30s
worker_die_after: 10s
artifact_registry:
artifact_ttl: 3m
sender:
Expand Down
4 changes: 3 additions & 1 deletion Exesh/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ services:
build:
context: .
dockerfile: Dockerfile
command:
- /app/bin/worker
container_name: worker-2
networks:
- coduels
ports:
- "5255:5255"
environment:
CONFIG_PATH: config/worker-2/docker.yml
CONFIG_PATH: /config/worker-2/docker.yml
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./config:/config:ro
28 changes: 14 additions & 14 deletions Exesh/internal/domain/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type Context struct {
stepByJobID map[JobID]Step
jobByStepName map[StepName]Job

mu *sync.Mutex
failed bool
mu *sync.Mutex
forceDone bool
}

func newContext(executionID ID, graph *graph) (ctx Context, err error) {
Expand All @@ -31,8 +31,8 @@ func newContext(executionID ID, graph *graph) (ctx Context, err error) {
stepByJobID: make(map[JobID]Step),
jobByStepName: make(map[StepName]Job),

mu: &sync.Mutex{},
failed: false,
mu: &sync.Mutex{},
forceDone: false,
}

hash := sha1.New()
Expand All @@ -46,7 +46,7 @@ func newContext(executionID ID, graph *graph) (ctx Context, err error) {
}

func (c *Context) PickSteps() []Step {
if c.isFailed() {
if c.IsForceDone() {
return []Step{}
}
return c.graph.pickSteps()
Expand All @@ -57,18 +57,18 @@ func (c *Context) ScheduledStep(step Step, job Job) {
c.jobByStepName[step.GetName()] = job
}

func (c *Context) FailStep(stepName StepName) {
c.mu.Lock()
defer c.mu.Unlock()
c.failed = true
}

func (c *Context) DoneStep(stepName StepName) {
c.graph.doneStep(stepName)
}

func (c *Context) ForceDone() {
c.mu.Lock()
defer c.mu.Unlock()
c.forceDone = true
}

func (c *Context) IsDone() bool {
if c.isFailed() {
if c.IsForceDone() {
return true
}
return c.graph.isGraphDone()
Expand All @@ -79,8 +79,8 @@ func (c *Context) GetJobForStep(stepName StepName) (Job, bool) {
return job, ok
}

func (c *Context) isFailed() bool {
func (c *Context) IsForceDone() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.failed
return c.forceDone
}
13 changes: 13 additions & 0 deletions Exesh/internal/domain/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,23 @@ func NewExecution(steps []Step) Execution {
}

func (e *Execution) SetScheduled(scheduledAt time.Time) {
if e.Status == StatusFinishedExecution {
return
}

e.Status = StatusScheduledExecution
e.ScheduledAt = &scheduledAt
}

func (e *Execution) SetFinished(finishedAt time.Time) {
if e.Status == StatusFinishedExecution {
return
}

e.Status = StatusFinishedExecution
e.ScheduledAt = &finishedAt
}

func (e *Execution) BuildContext() (Context, error) {
return newContext(e.ID, newGraph(e.Steps))
}
5 changes: 5 additions & 0 deletions Exesh/internal/domain/execution/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type (
GetType() ResultType
GetDoneAt() time.Time
GetError() error
ShouldFinishExecution() bool
}

ResultDetails struct {
Expand Down Expand Up @@ -47,3 +48,7 @@ func (r ResultDetails) GetError() error {
}
return errors.New(r.Error)
}

func (r ResultDetails) ShouldFinishExecution() bool {
panic("this panic would never happen!")
}
4 changes: 4 additions & 0 deletions Exesh/internal/domain/execution/results/check_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ const (
CheckStatusOK CheckStatus = "OK"
CheckStatusWA CheckStatus = "WA"
)

func (r CheckResult) ShouldFinishExecution() bool {
return r.Status != CheckStatusOK
}
4 changes: 4 additions & 0 deletions Exesh/internal/domain/execution/results/compile_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ const (
CompileStatusOK CompileStatus = "OK"
CompileStatusCE CompileStatus = "CE"
)

func (r CompileResult) ShouldFinishExecution() bool {
return r.Status != CompileStatusOK
}
4 changes: 4 additions & 0 deletions Exesh/internal/domain/execution/results/run_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ const (
RunStatusTL RunStatus = "TL"
RunStatusML RunStatus = "ML"
)

func (r RunResult) ShouldFinishExecution() bool {
return r.Status != RunStatusOK
}
8 changes: 8 additions & 0 deletions Exesh/internal/registry/artifact_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"math/rand/v2"
"sync"
"time"
)

Expand All @@ -16,6 +17,7 @@ type (

workerPool workerPool

mu sync.Mutex
workerArtifacts map[string]map[execution.JobID]time.Time
}

Expand All @@ -36,6 +38,9 @@ func NewArtifactRegistry(log *slog.Logger, cfg config.ArtifactRegistryConfig, wo
}

func (r *ArtifactRegistry) GetWorker(jobID execution.JobID) (workerID string, err error) {
r.mu.Lock()
defer r.mu.Unlock()

workers := make([]string, 0)
for worker, artifacts := range r.workerArtifacts {
if trashTime, ok := artifacts[jobID]; ok && r.workerPool.IsAlive(worker) {
Expand All @@ -57,6 +62,9 @@ func (r *ArtifactRegistry) GetWorker(jobID execution.JobID) (workerID string, er
}

func (r *ArtifactRegistry) PutArtifact(workerID string, jobID execution.JobID) {
r.mu.Lock()
defer r.mu.Unlock()

artifacts := r.workerArtifacts[workerID]
if artifacts == nil {
artifacts = make(map[execution.JobID]time.Time)
Expand Down
Loading
Loading