Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pipeline/runtime/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

// Run starts the execution of a workflow and waits for it to complete.
func (r *Runtime) Run(runnerCtx context.Context) error {
logger := r.MakeLogger()
logger := r.makeLogger()
logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
for stagePos, stage := range r.spec.Stages {
stepNames := []string{}
Expand Down Expand Up @@ -85,12 +85,12 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
return pipeline_errors.ErrCancel
case err := <-r.execAll(runnerCtx, stage.Steps):
if err != nil {
r.err = err
r.err.Set(err)
}
}
}

return r.err
return r.err.Get()
}

// Updates the current status of a step.
Expand All @@ -105,7 +105,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
state := new(state.State)
state.Pipeline.Started = r.started
state.Pipeline.Step = step
state.Pipeline.Error = r.err
state.Pipeline.Error = r.err.Get()

// We have an error while starting the step
if processState == nil && err != nil {
Expand All @@ -128,7 +128,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-chan error {
var g errgroup.Group
done := make(chan error)
logger := r.MakeLogger()
logger := r.makeLogger()

for _, step := range steps {
// Required since otherwise the loop variable
Expand All @@ -141,14 +141,14 @@ func (r *Runtime) execAll(runnerCtx context.Context, steps []*backend.Step) <-ch
Str("step", step.Name).
Msg("prepare")

switch {
case r.err != nil && !step.OnFailure:
switch rErr := r.err.Get(); {
case rErr != nil && !step.OnFailure:
logger.Debug().
Str("step", step.Name).
Err(r.err).
Err(rErr).
Msgf("skipped due to OnFailure=%t", step.OnFailure)
return nil
case r.err == nil && !step.OnSuccess:
case rErr == nil && !step.OnSuccess:
logger.Debug().
Str("step", step.Name).
Msgf("skipped due to OnSuccess=%t", step.OnSuccess)
Expand Down Expand Up @@ -228,7 +228,7 @@ func (r *Runtime) exec(runnerCtx context.Context, step *backend.Step, setupWg *s
return nil, err
}
startTime := time.Now().Unix()
logger := r.MakeLogger()
logger := r.makeLogger()

var wg sync.WaitGroup
if r.logger != nil {
Expand Down
14 changes: 8 additions & 6 deletions pipeline/runtime/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,45 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing"
)

// Option configures a runtime option.
// Option configures a Runtime.
type Option func(*Runtime)

// WithBackend returns an option configured with a runtime engine.
// WithBackend sets the backend engine used to run steps.
func WithBackend(backend backend.Backend) Option {
return func(r *Runtime) {
r.engine = backend
}
}

// WithLogger returns an option configured with a runtime logger.
// WithLogger sets the function used to stream step logs.
func WithLogger(logger logging.Logger) Option {
return func(r *Runtime) {
r.logger = logger
}
}

// WithTracer returns an option configured with a runtime tracer.
// WithTracer sets the tracer used to report step state changes.
func WithTracer(tracer tracing.Tracer) Option {
return func(r *Runtime) {
r.tracer = tracer
}
}

// WithContext returns an option configured with a context.
// WithContext sets the workflow execution context.
func WithContext(ctx context.Context) Option {
return func(r *Runtime) {
r.ctx = ctx
}
}

// WithDescription sets the descriptive key-value pairs attached to every log line.
func WithDescription(desc map[string]string) Option {
return func(r *Runtime) {
r.Description = desc
r.description = desc
}
}

// WithTaskUUID sets a specific task UUID instead of the auto-generated one.
func WithTaskUUID(uuid string) Option {
return func(r *Runtime) {
r.taskUUID = uuid
Expand Down
33 changes: 18 additions & 15 deletions pipeline/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,46 +24,49 @@ import (
backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/logging"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/tracing"
"go.woodpecker-ci.org/woodpecker/v3/shared/utils"
)

// Runtime represents a workflow state executed by a specific backend.
// Each workflow gets its own state configuration at runtime.
// Each workflow gets its own Runtime instance.
type Runtime struct {
err error
// err holds the first error that occurred in the workflow.
err utils.Protected[error]

spec *backend.Config
engine backend.Backend
started int64

// The context a workflow is being executed with.
// All normal (non cleanup) operations must use this.
// Cleanup operations should use the runnerCtx passed to Run()
// ctx is the context for the current workflow execution.
// All normal (non-cleanup) step operations must use this context.
// Cleanup operations should use the runnerCtx passed to Run().
ctx context.Context

tracer tracing.Tracer
logger logging.Logger

taskUUID string

Description map[string]string // The runtime descriptors.
taskUUID string
description map[string]string
}

// New returns a new runtime using the specified runtime
// configuration and runtime engine.
// New returns a new Runtime for the given workflow spec and options.
func New(spec *backend.Config, opts ...Option) *Runtime {
r := new(Runtime)
r.Description = map[string]string{}
r.err = utils.NewProtected[error](nil)
r.description = map[string]string{}
r.spec = spec
r.ctx = context.Background()
r.taskUUID = ulid.Make().String()
for _, opts := range opts {
opts(r)
for _, opt := range opts {
opt(r)
}
return r
}

func (r *Runtime) MakeLogger() zerolog.Logger {
// makeLogger returns a logger enriched with all runtime description fields.
func (r *Runtime) makeLogger() zerolog.Logger {
logCtx := log.With()
for key, val := range r.Description {
for key, val := range r.description {
logCtx = logCtx.Str(key, val)
}
return logCtx.Logger()
Expand Down
3 changes: 3 additions & 0 deletions pipeline/runtime/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var (
shutdownCtxLock sync.Mutex
)

// GetShutdownCtx returns a context that is valid for shutdownTimeout after the
// first call. It is used as a fallback cleanup context when the runner context
// is already canceled.
func GetShutdownCtx() context.Context {
shutdownCtxLock.Lock()
defer shutdownCtxLock.Unlock()
Expand Down
64 changes: 64 additions & 0 deletions shared/utils/protected.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2026 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"sync"
)

// Protected provides thread-safe read and write access to a value of type T.
type Protected[T any] interface {
// Get returns the current value using a read lock, allowing multiple concurrent
// readers. Safe to call from multiple goroutines simultaneously.
Get() T

// Set replaces the current value using an exclusive write lock.
// Blocks until all ongoing reads/writes complete.
Set(v T)

// Update performs an atomic read-modify-write operation under a single exclusive
// lock. The provided function receives the current value and returns the new value,
// eliminating the race condition that would occur with a separate Get + Set.
Update(fn func(T) T)
}

type protected[T any] struct {
mu sync.RWMutex
value T
}

// NewProtected creates and returns a new Protected wrapper initialized with the
// given value. Use this as the constructor instead of creating a protected struct directly.
func NewProtected[T any](initial T) Protected[T] {
return &protected[T]{value: initial}
}

func (p *protected[T]) Get() T {
p.mu.RLock()
defer p.mu.RUnlock()
return p.value
}

func (p *protected[T]) Set(v T) {
p.mu.Lock()
defer p.mu.Unlock()
p.value = v
}

func (p *protected[T]) Update(fn func(T) T) {
p.mu.Lock()
defer p.mu.Unlock()
p.value = fn(p.value)
}