From ffd8a2188070df7b43e61ce1a8423b9d9e2bc5d4 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 27 Feb 2026 20:35:52 +0100 Subject: [PATCH] refactor pipeline runtime code --- pipeline/runtime/executor.go | 297 ----------------------------------- pipeline/runtime/option.go | 14 +- pipeline/runtime/runtime.go | 44 ++++-- pipeline/runtime/shutdown.go | 3 + pipeline/runtime/step.go | 262 ++++++++++++++++++++++++++++++ pipeline/runtime/workflow.go | 130 +++++++++++++++ 6 files changed, 433 insertions(+), 317 deletions(-) delete mode 100644 pipeline/runtime/executor.go create mode 100644 pipeline/runtime/step.go create mode 100644 pipeline/runtime/workflow.go diff --git a/pipeline/runtime/executor.go b/pipeline/runtime/executor.go deleted file mode 100644 index edf1b0d195c..00000000000 --- a/pipeline/runtime/executor.go +++ /dev/null @@ -1,297 +0,0 @@ -// Copyright 2023 Woodpecker Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package runtime - -import ( - "context" - "errors" - "strings" - "sync" - "time" - - "golang.org/x/sync/errgroup" - - backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" - pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors" - "go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata" - "go.woodpecker-ci.org/woodpecker/v3/pipeline/state" -) - -// Run starts the execution of a workflow and waits for it to complete. 
-func (r *Runtime) Run(runnerCtx context.Context) error { - logger := r.MakeLogger() - logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages)) - for stagePos, stage := range r.spec.Stages { - stepNames := []string{} - for _, step := range stage.Steps { - stepNames = append(stepNames, step.Name) - } - - logger.Debug(). - Int("StagePos", stagePos). - Str("Steps", strings.Join(stepNames, ",")). - Msg("stage") - } - - defer func() { - ctx := runnerCtx //nolint:contextcheck - if ctx.Err() != nil { - ctx = GetShutdownCtx() - } - if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil { - logger.Error().Err(err).Msg("could not destroy engine") - } - }() - - r.started = time.Now().Unix() - if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil { - var stepErr *pipeline_errors.ErrInvalidWorkflowSetup - if errors.As(err, &stepErr) { - state := new(state.State) - state.Pipeline.Step = stepErr.Step - state.Pipeline.Error = stepErr.Err - state.Process = backend.State{ - Error: stepErr.Err, - Exited: true, - ExitCode: 1, - } - - // Trace the error if we have a tracer - if r.tracer != nil { - if err := r.tracer.Trace(state); err != nil { - logger.Error().Err(err).Msg("failed to trace step error") - } - } - } - - return err - } - - for _, stage := range r.spec.Stages { - select { - case <-r.ctx.Done(): - return pipeline_errors.ErrCancel - case err := <-r.execAll(runnerCtx, stage.Steps): - if err != nil { - r.err = err - } - } - } - - return r.err -} - -// Updates the current status of a step. -// If processState is nil, we assume the step did not start. -// If step did not started and err exists, it's a step start issue and step is done. 
-func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error { - if r.tracer == nil { - // no tracer nothing to trace :) - return nil - } - - state := new(state.State) - state.Pipeline.Started = r.started - state.Pipeline.Step = step - state.Pipeline.Error = r.err - - // We have an error while starting the step - if processState == nil && err != nil { - state.Process = backend.State{ - Error: err, - Exited: true, - OOMKilled: false, - } - } else if processState != nil { - state.Process = *processState - } - - if traceErr := r.tracer.Trace(state); traceErr != nil { - return traceErr - } - return err -} - -// Executes a set of parallel steps. -func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error { - var g errgroup.Group - done := make(chan error) - logger := r.MakeLogger() - - for _, step := range steps { - // Required since otherwise the loop variable - // will be captured by the function. This will - // recreate the step "variable" - step := step - g.Go(func() error { - // Case the pipeline was already complete. - logger.Debug(). - Str("step", step.Name). - Msg("prepare") - - switch { - case r.err != nil && !step.OnFailure: - logger.Debug(). - Str("step", step.Name). - Err(r.err). - Msgf("skipped due to OnFailure=%t", step.OnFailure) - return nil - case r.err == nil && !step.OnSuccess: - logger.Debug(). - Str("step", step.Name). - Msgf("skipped due to OnSuccess=%t", step.OnSuccess) - return nil - } - - // Trace started. - err := r.traceStep(nil, nil, step) - if err != nil { - return err - } - - // add compatibility for drone-ci plugins - metadata.SetDroneEnviron(step.Environment) - - logger.Debug(). - Str("step", step.Name). - Msg("executing") - - // setup exec func in a way it can be detached if needed - // wg will signal once - execAndTrace := func(wg *sync.WaitGroup) error { - processState, err := r.exec(runnerCtx, step, wg) - - logger.Debug(). - Str("step", step.Name). 
- Msg("complete") - - // normalize context cancel error - if errors.Is(err, context.Canceled) { - err = pipeline_errors.ErrCancel - } - - // Return the error after tracing it. - err = r.traceStep(processState, err, step) - if err != nil && step.Failure == metadata.FailureIgnore { - return nil - } - return err - } - - // Report all errors until the setup happened. - // Afterwards errors will be dropped. - if step.Detached { - var wg sync.WaitGroup - wg.Add(1) - var setupErr error - go func() { - setupErr = execAndTrace(&wg) - }() - wg.Wait() - return setupErr - } - - // run blocking - return execAndTrace(nil) - }) - } - - go func() { - done <- g.Wait() - close(done) - }() - - return done -} - -// Executes the step and returns the state and error. -func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *sync.WaitGroup) (*backend.State, error) { - defer func() { - if setupWg != nil { - setupWg.Done() - } - }() - - if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck - return nil, err - } - startTime := time.Now().Unix() - logger := r.MakeLogger() - - var wg sync.WaitGroup - if r.logger != nil { - rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) //nolint:contextcheck - if err != nil { - return nil, err - } - - wg.Add(1) - go func() { - defer wg.Done() - - if err := r.logger(step, rc); err != nil { - logger.Error().Err(err).Msg("process logging failed") - } - _ = rc.Close() - }() - } - - // nothing else to block for detached process. - if setupWg != nil { - setupWg.Done() - // set to nil so the setupWg.Done in defer does not call it a second time - setupWg = nil - } - - // We wait until all data was logged. 
(Needed for some backends like local as WaitStep kills the log stream) - wg.Wait() - - waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck - if err != nil { - if errors.Is(err, context.Canceled) { - waitState.Error = pipeline_errors.ErrCancel - } else { - return nil, err - } - } - - // It is important to use the runnerCtx here because - // in case the workflow was canceled we still have the docker daemon to stop the container. - if err := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); err != nil { - return nil, err - } - - // we update with our start time here - waitState.Started = startTime - - // we handle cancel case - if ctxErr := r.ctx.Err(); ctxErr != nil && errors.Is(ctxErr, context.Canceled) { - waitState.Error = pipeline_errors.ErrCancel - } - - if waitState.OOMKilled { - return waitState, &pipeline_errors.OomError{ - UUID: step.UUID, - Code: waitState.ExitCode, - } - } else if waitState.ExitCode != 0 { - return waitState, &pipeline_errors.ExitError{ - UUID: step.UUID, - Code: waitState.ExitCode, - } - } - - return waitState, nil -} diff --git a/pipeline/runtime/option.go b/pipeline/runtime/option.go index f87d4a108a9..28cd3009ef9 100644 --- a/pipeline/runtime/option.go +++ b/pipeline/runtime/option.go @@ -22,43 +22,45 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing" ) -// Option configures a runtime option. +// Option configures a Runtime. type Option func(*Runtime) -// WithBackend returns an option configured with a runtime engine. +// WithBackend sets the backend engine used to run steps. func WithBackend(backend backend.Backend) Option { return func(r *Runtime) { r.engine = backend } } -// WithLogger returns an option configured with a runtime logger. +// WithLogger sets the function used to stream step logs. func WithLogger(logger logging.Logger) Option { return func(r *Runtime) { r.logger = logger } } -// WithTracer returns an option configured with a runtime tracer. 
+// WithTracer sets the tracer used to report step state changes. func WithTracer(tracer tracing.Tracer) Option { return func(r *Runtime) { r.tracer = tracer } } -// WithContext returns an option configured with a context. +// WithContext sets the workflow execution context. func WithContext(ctx context.Context) Option { return func(r *Runtime) { r.ctx = ctx } } +// WithDescription sets the descriptive key-value pairs attached to every log line. func WithDescription(desc map[string]string) Option { return func(r *Runtime) { - r.Description = desc + r.description = desc } } +// WithTaskUUID sets a specific task UUID instead of the auto-generated one. func WithTaskUUID(uuid string) Option { return func(r *Runtime) { r.taskUUID = uuid diff --git a/pipeline/runtime/runtime.go b/pipeline/runtime/runtime.go index 5fb197b4ab5..c4142b7ae43 100644 --- a/pipeline/runtime/runtime.go +++ b/pipeline/runtime/runtime.go @@ -16,6 +16,7 @@ package runtime import ( "context" + "sync" "github.com/oklog/ulid/v2" "github.com/rs/zerolog" @@ -27,44 +28,59 @@ import ( ) // Runtime represents a workflow state executed by a specific backend. -// Each workflow gets its own state configuration at runtime. +// Each workflow gets its own Runtime instance. type Runtime struct { - err error + // err holds the first error that occurred in the workflow. + // Always use getErr/setErr to access it — it is read and written from concurrent goroutines. + errMu sync.RWMutex + err error + spec *backend.Config engine backend.Backend started int64 - // The context a workflow is being executed with. - // All normal (non cleanup) operations must use this. - // Cleanup operations should use the runnerCtx passed to Run() + // ctx is the context for the current workflow execution. + // All normal (non-cleanup) step operations must use this context. + // Cleanup operations should use the runnerCtx passed to Run(). 
ctx context.Context tracer tracing.Tracer logger logging.Logger - taskUUID string - - Description map[string]string // The runtime descriptors. + taskUUID string + description map[string]string } -// New returns a new runtime using the specified runtime -// configuration and runtime engine. +// New returns a new Runtime for the given workflow spec and options. func New(spec *backend.Config, opts ...Option) *Runtime { r := new(Runtime) - r.Description = map[string]string{} + r.description = map[string]string{} r.spec = spec r.ctx = context.Background() r.taskUUID = ulid.Make().String() - for _, opts := range opts { - opts(r) + for _, opt := range opts { + opt(r) } return r } +// MakeLogger returns a logger enriched with all runtime description fields. func (r *Runtime) MakeLogger() zerolog.Logger { logCtx := log.With() - for key, val := range r.Description { + for key, val := range r.description { logCtx = logCtx.Str(key, val) } return logCtx.Logger() } + +func (r *Runtime) getErr() error { + r.errMu.RLock() + defer r.errMu.RUnlock() + return r.err +} + +func (r *Runtime) setErr(err error) { + r.errMu.Lock() + defer r.errMu.Unlock() + r.err = err +} diff --git a/pipeline/runtime/shutdown.go b/pipeline/runtime/shutdown.go index d53cf1c0d7f..166eb0d3d80 100644 --- a/pipeline/runtime/shutdown.go +++ b/pipeline/runtime/shutdown.go @@ -27,6 +27,9 @@ var ( shutdownCtxLock sync.Mutex ) +// GetShutdownCtx returns a context that is valid for shutdownTimeout after the +// first call. It is used as a fallback cleanup context when the runner context +// is already cancelled. 
func GetShutdownCtx() context.Context { shutdownCtxLock.Lock() defer shutdownCtxLock.Unlock() diff --git a/pipeline/runtime/step.go b/pipeline/runtime/step.go new file mode 100644 index 00000000000..7758d296a61 --- /dev/null +++ b/pipeline/runtime/step.go @@ -0,0 +1,262 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtime + +import ( + "context" + "errors" + "sync" + "time" + + backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" + pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors" + "go.woodpecker-ci.org/woodpecker/v3/pipeline/frontend/metadata" + "go.woodpecker-ci.org/woodpecker/v3/pipeline/state" +) + +// executeStep is the single entry point called per step from runStage. +// It checks whether the step should be skipped, emits a "started" trace, +// sets up drone-compat env vars, then hands off to blocking or detached execution. +func (r *Runtime) executeStep(runnerCtx context.Context, step *backend.Step) error { + logger := r.MakeLogger() + logger.Debug().Str("step", step.Name).Msg("prepare") + + if r.shouldSkipStep(step) { + return nil + } + + // Emit a "step started" trace before doing any real work. + if err := r.traceStep(nil, nil, step); err != nil { + return err + } + + // Add compatibility environment variables for drone-ci plugins. 
+ metadata.SetDroneEnviron(step.Environment) + + logger.Debug().Str("step", step.Name).Msg("executing") + + if step.Detached { + return r.runDetachedStep(runnerCtx, step) + } + return r.runBlockingStep(runnerCtx, step) +} + +// shouldSkipStep returns true when the step should not run based on the current +// pipeline error state and the step's OnSuccess / OnFailure flags. +// It logs the reason for skipping before returning. +func (r *Runtime) shouldSkipStep(step *backend.Step) bool { + logger := r.MakeLogger() + currentErr := r.getErr() + + if currentErr != nil && !step.OnFailure { + logger.Debug(). + Str("step", step.Name). + Err(currentErr). + Msgf("skipped due to OnFailure=%t", step.OnFailure) + return true + } + + if currentErr == nil && !step.OnSuccess { + logger.Debug(). + Str("step", step.Name). + Msgf("skipped due to OnSuccess=%t", step.OnSuccess) + return true + } + + return false +} + +// startStep starts the step container and spawns a goroutine to stream its logs. +// It returns: +// - waitForLogs: must be called before WaitStep — it blocks until the log stream +// is fully drained. Some backends (e.g. local) close the log stream when +// WaitStep is called, so draining first is required. +// - startTime: unix timestamp recorded right after the container started, used +// later to fill waitState.Started. +// +// If StartStep or TailStep fail, startStep returns a non-nil error and the caller +// must not call waitForLogs. 
+func (r *Runtime) startStep(step *backend.Step) (waitForLogs func(), startTime int64, err error) { + if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { //nolint:contextcheck + return nil, 0, err + } + startTime = time.Now().Unix() + + var wg sync.WaitGroup + + if r.logger != nil { + rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) //nolint:contextcheck + if err != nil { + return nil, 0, err + } + + wg.Add(1) + go func() { + defer wg.Done() + logger := r.MakeLogger() + if err := r.logger(step, rc); err != nil { + logger.Error().Err(err).Str("step", step.Name).Msg("step log streaming failed") + } + _ = rc.Close() + }() + } + + return wg.Wait, startTime, nil +} + +// completeStep drains the log stream, waits for the process to exit, destroys +// the container, and maps exit conditions (OOM kill, non-zero exit code, context +// cancellation) to typed errors. +// +// runnerCtx is intentionally used for DestroyStep so that container cleanup can +// still reach the backend even after the workflow context (r.ctx) is cancelled. +func (r *Runtime) completeStep(runnerCtx context.Context, step *backend.Step, waitForLogs func(), startTime int64) (*backend.State, error) { + // Drain the log stream before waiting on the process exit. + waitForLogs() + + waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) //nolint:contextcheck + if err != nil { + if errors.Is(err, context.Canceled) { + waitState.Error = pipeline_errors.ErrCancel + } else { + return nil, err + } + } + + // Use runnerCtx here: the workflow context may already be cancelled but we + // still need to reach the backend to stop/remove the container. + if err := r.engine.DestroyStep(runnerCtx, step, r.taskUUID); err != nil { + return nil, err + } + + waitState.Started = startTime + + // Re-check context cancellation: the wait may have raced with cancellation. 
+ if ctxErr := r.ctx.Err(); ctxErr != nil && errors.Is(ctxErr, context.Canceled) { + waitState.Error = pipeline_errors.ErrCancel + } + + if waitState.OOMKilled { + return waitState, &pipeline_errors.OomError{ + UUID: step.UUID, + Code: waitState.ExitCode, + } + } + if waitState.ExitCode != 0 { + return waitState, &pipeline_errors.ExitError{ + UUID: step.UUID, + Code: waitState.ExitCode, + } + } + + return waitState, nil +} + +// runBlockingStep starts the step and blocks until it fully completes. +// The error is traced and returned to runStage, which feeds it into the +// stage error group. +func (r *Runtime) runBlockingStep(runnerCtx context.Context, step *backend.Step) error { + logger := r.MakeLogger() + + waitForLogs, startTime, err := r.startStep(step) + if err != nil { + // The step never ran — trace the start failure and surface it. + return r.traceStep(nil, err, step) + } + + processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime) + logger.Debug().Str("step", step.Name).Msg("complete") + + if errors.Is(err, context.Canceled) { + err = pipeline_errors.ErrCancel + } + + err = r.traceStep(processState, err, step) + if err != nil && step.Failure == metadata.FailureIgnore { + return nil + } + return err +} + +// runDetachedStep starts the step and returns as soon as the container is running +// and log streaming is set up. The rest of the step lifecycle runs in the background. +// +// Any error that occurs after setup is logged but not propagated — it cannot +// influence the pipeline outcome at that point. +func (r *Runtime) runDetachedStep(runnerCtx context.Context, step *backend.Step) error { + waitForLogs, startTime, err := r.startStep(step) + if err != nil { + // Setup failed before the container was running — treat it like a + // blocking failure so the pipeline is aware. + return r.traceStep(nil, err, step) + } + + // Container is up and logging is streaming — hand off to background. 
+	go func() {
+		logger := r.MakeLogger()
+
+		processState, err := r.completeStep(runnerCtx, step, waitForLogs, startTime)
+		logger.Debug().Str("step", step.Name).Msg("complete")
+
+		if errors.Is(err, context.Canceled) {
+			err = pipeline_errors.ErrCancel
+		}
+		if err != nil {
+			logger.Error().Err(err).Str("step", step.Name).Msg("detached step failed after setup")
+		}
+
+		if traceErr := r.traceStep(processState, err, step); traceErr != nil {
+			logger.Error().Err(traceErr).Str("step", step.Name).Msg("failed to trace detached step result")
+		}
+	}()
+
+	return nil
+}
+
+// traceStep reports the current state of a step to the tracer.
+//
+// - processState == nil, err == nil → step is being marked as started
+// - processState == nil, err != nil → step failed to start
+// - processState != nil → step has finished (err may or may not be set)
+//
+// Returns err unchanged (or the tracer's own error if reporting fails) so callers can write: return r.traceStep(state, err, step)
+func (r *Runtime) traceStep(processState *backend.State, err error, step *backend.Step) error {
+	if r.tracer == nil {
+		return err
+	}
+
+	s := new(state.State)
+	s.Pipeline.Started = r.started
+	s.Pipeline.Step = step
+	s.Pipeline.Error = r.getErr()
+
+	switch {
+	case processState == nil && err != nil:
+		// Step failed to start — synthesise an exited process state.
+		s.Process = backend.State{
+			Error:     err,
+			Exited:    true,
+			OOMKilled: false,
+		}
+	case processState != nil:
+		s.Process = *processState
+	// processState == nil && err == nil: step just started, leave s.Process zero-valued.
+ } + + if traceErr := r.tracer.Trace(s); traceErr != nil { + return traceErr + } + return err +} diff --git a/pipeline/runtime/workflow.go b/pipeline/runtime/workflow.go new file mode 100644 index 00000000000..7e8ce46f7ac --- /dev/null +++ b/pipeline/runtime/workflow.go @@ -0,0 +1,130 @@ +// Copyright 2026 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtime + +import ( + "context" + "errors" + "strings" + "time" + + "golang.org/x/sync/errgroup" + + backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" + pipeline_errors "go.woodpecker-ci.org/woodpecker/v3/pipeline/errors" + "go.woodpecker-ci.org/woodpecker/v3/pipeline/state" +) + +// Run starts the workflow, executes all stages sequentially, and tears down the +// workflow on exit. runnerCtx must outlive workflow cancellation so that cleanup +// can still reach the backend (e.g. stopping Docker containers). +func (r *Runtime) Run(runnerCtx context.Context) error { + logger := r.MakeLogger() + r.logStages() + + defer func() { + ctx := runnerCtx //nolint:contextcheck + if ctx.Err() != nil { + // runnerCtx itself is done — fall back to a short-lived shutdown context. 
+			ctx = GetShutdownCtx()
+		}
+		if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil {
+			logger.Error().Err(err).Msg("could not destroy workflow")
+		}
+	}()
+
+	r.started = time.Now().Unix()
+
+	if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
+		r.traceWorkflowSetupError(err)
+		return err
+	}
+
+	for _, stage := range r.spec.Stages {
+		select {
+		case <-r.ctx.Done():
+			return pipeline_errors.ErrCancel
+		case err := <-r.runStage(runnerCtx, stage.Steps):
+			if err != nil {
+				r.setErr(err)
+			}
+		}
+	}
+
+	return r.getErr()
+}
+
+// logStages logs the ordered list of stages and their steps at debug level.
+func (r *Runtime) logStages() {
+	logger := r.MakeLogger()
+	logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
+	for stagePos, stage := range r.spec.Stages {
+		stepNames := make([]string, 0, len(stage.Steps))
+		for _, step := range stage.Steps {
+			stepNames = append(stepNames, step.Name)
+		}
+		logger.Debug().
+			Int("StagePos", stagePos).
+			Str("Steps", strings.Join(stepNames, ",")).
+			Msg("stage")
+	}
+}
+
+// traceWorkflowSetupError traces an ErrInvalidWorkflowSetup to the tracer, if one
+// is configured. Other error types are silently ignored here (they are still
+// returned by Run).
+func (r *Runtime) traceWorkflowSetupError(err error) {
+	var stepErr *pipeline_errors.ErrInvalidWorkflowSetup
+	if !errors.As(err, &stepErr) {
+		return
+	}
+
+	s := new(state.State)
+	s.Pipeline.Step = stepErr.Step
+	s.Pipeline.Error = stepErr.Err
+	s.Process = backend.State{
+		Error:    stepErr.Err,
+		Exited:   true,
+		ExitCode: 1,
+	}
+
+	if r.tracer != nil {
+		if traceErr := r.tracer.Trace(s); traceErr != nil {
+			logger := r.MakeLogger()
+			logger.Error().Err(traceErr).Msg("failed to trace workflow setup error")
+		}
+	}
+}
+
+// runStage executes all steps of a stage in parallel.
+// It returns a channel that emits the first step error (if any) once all steps finish.
+func (r *Runtime) runStage(runnerCtx context.Context, steps []*backend.Step) <-chan error { + var g errgroup.Group + done := make(chan error) + + for _, step := range steps { + step := step // capture loop variable + g.Go(func() error { + return r.executeStep(runnerCtx, step) + }) + } + + go func() { + done <- g.Wait() + close(done) + }() + + return done +}