Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
grpcproto "google.golang.org/protobuf/proto"

backend "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/types"
"go.woodpecker-ci.org/woodpecker/v3/rpc"
"go.woodpecker-ci.org/woodpecker/v3/rpc/proto"
)
Expand Down Expand Up @@ -482,7 +483,7 @@ func (c *client) sendLogs(ctx context.Context, entries []*proto.LogEntry) error
return nil
}

func (c *client) RegisterAgent(ctx context.Context, info rpc.AgentInfo) (int64, error) {
func (c *client) RegisterAgent(ctx context.Context, info rpc.AgentInfo) (rpc.AgentConfig, error) {
req := new(proto.RegisterAgentRequest)
req.Info = &proto.AgentInfo{
Platform: info.Platform,
Expand All @@ -493,7 +494,14 @@ func (c *client) RegisterAgent(ctx context.Context, info rpc.AgentInfo) (int64,
}

res, err := c.client.RegisterAgent(ctx, req)
return res.GetAgentId(), err
if err != nil {
return rpc.AgentConfig{}, err
}
protoConfig := res.GetConfig()
return rpc.AgentConfig{
AgentID: protoConfig.GetAgentId(),
RecoveryEnabled: protoConfig.GetRecoveryEnabled(),
}, nil
}

func (c *client) UnregisterAgent(ctx context.Context) error {
Expand Down Expand Up @@ -542,3 +550,98 @@ func (c *client) ReportHealth(ctx context.Context) (err error) {
}
}
}

// InitWorkflowRecovery initializes recovery state for all steps in a workflow and returns current states.
// The RPC is retried with exponential backoff on transient gRPC errors
// (Aborted, DataLoss, DeadlineExceeded, Internal, Unavailable); any other
// error aborts immediately. timeoutSeconds is forwarded to the server as the
// recovery-state lifetime for this workflow.
func (c *client) InitWorkflowRecovery(ctx context.Context, workflowID string, stepUUIDs []string, timeoutSeconds int64) (map[string]*types.RecoveryState, error) {
	retry := c.newBackOff()
	req := &proto.InitWorkflowRecoveryRequest{
		WorkflowId:     workflowID,
		StepUuids:      stepUUIDs,
		TimeoutSeconds: timeoutSeconds,
	}

	var res *proto.InitWorkflowRecoveryResponse
	var err error

	for {
		res, err = c.client.InitWorkflowRecovery(ctx, req)
		if err == nil {
			break
		}
		log.Error().Err(err).Msgf("grpc error: InitWorkflowRecovery(): code: %v", status.Code(err))

		switch status.Code(err) {
		case codes.Canceled:
			if ctx.Err() != nil {
				// Our own context was canceled (agent shutdown): treat as a
				// graceful no-op. NOTE(review): callers receive a nil map AND
				// a nil error here — confirm they tolerate that combination.
				return nil, nil
			}
			return nil, err
		case
			codes.Aborted,
			codes.DataLoss,
			codes.DeadlineExceeded,
			codes.Internal,
			codes.Unavailable:
			// non-fatal errors
		default:
			return nil, err
		}

		// Sleep for the next backoff interval, bailing out early if the
		// context is canceled while waiting.
		select {
		case <-time.After(retry.NextBackOff()):
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	// Map the proto step states into the pipeline's recovery-state type,
	// keyed by step UUID.
	result := make(map[string]*types.RecoveryState, len(res.GetStates()))
	for _, state := range res.GetStates() {
		result[state.GetStepUuid()] = &types.RecoveryState{
			Status:   types.RecoveryStatus(state.GetStatus()),
			ExitCode: int(state.GetExitCode()),
		}
	}
	return result, nil
}

// UpdateStepRecoveryState updates the recovery state for a specific step.
// Transient gRPC failures (Aborted, DataLoss, DeadlineExceeded, Internal,
// Unavailable) are retried with exponential backoff; a Canceled status caused
// by our own context is treated as a graceful no-op, and every other error is
// returned to the caller unchanged.
func (c *client) UpdateStepRecoveryState(ctx context.Context, workflowID, stepUUID string, recoveryStatus types.RecoveryStatus, exitCode int) (err error) {
	backoff := c.newBackOff()
	req := &proto.UpdateStepRecoveryStateRequest{
		WorkflowId: workflowID,
		StepUuid:   stepUUID,
		Status:     proto.RecoveryStatus(recoveryStatus),
		ExitCode:   int32(exitCode),
	}

	for {
		if _, err = c.client.UpdateStepRecoveryState(ctx, req); err == nil {
			return nil
		}

		code := status.Code(err)
		log.Error().Err(err).Msgf("grpc error: UpdateStepRecoveryState(): code: %v", code)

		switch code {
		case codes.Canceled:
			if ctx.Err() != nil {
				// Cancellation originated locally (agent shutdown) — nothing to report.
				return nil
			}
			return err
		case
			codes.Aborted,
			codes.DataLoss,
			codes.DeadlineExceeded,
			codes.Internal,
			codes.Unavailable:
			// non-fatal errors
		default:
			return err
		}

		// Back off before the next attempt, aborting if the context ends first.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff.NextBackOff()):
		}
	}
}
66 changes: 39 additions & 27 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@ import (
)

// Runner pulls workflows from the server via the rpc peer and executes them
// on the configured backend engine.
type Runner struct {
	client          rpc.Peer         // peer used to fetch work and report state back to the server
	filter          rpc.Filter       // restricts which workflows this runner accepts
	hostname        string           // agent hostname reported alongside workflow state
	counter         *State           // shared state tracking currently running workflows
	backend         *backend.Backend // backend engine that executes the workflow steps
	recoveryEnabled bool             // whether workflow recovery state tracking is active
}

func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backend *backend.Backend) Runner {
func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backend *backend.Backend, recoveryEnabled bool) Runner {
return Runner{
client: workEngine,
filter: f,
hostname: h,
counter: state,
backend: backend,
client: workEngine,
filter: f,
hostname: h,
counter: state,
backend: backend,
recoveryEnabled: recoveryEnabled,
}
}

Expand Down Expand Up @@ -95,14 +97,28 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
workflowCtx, cancelWorkflowCtx := context.WithCancelCause(workflowCtx)
defer cancelWorkflowCtx(nil)

// Add sigterm support for internal context.
// Required to be able to terminate the running workflow by external signals.
// Handle SIGTERM (k8s, docker, system shutdown)
workflowCtx = utils.WithContextSigtermCallback(workflowCtx, func() {
logger.Error().Msg("received sigterm termination signal")
// WithContextSigtermCallback would cancel the context too, but we want our own custom error
cancelWorkflowCtx(pipeline_errors.ErrCancel)
})

state := rpc.WorkflowState{
Started: time.Now().Unix(),
}
if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("workflow initialization failed")
return err
}

// Initialize recovery manager before launching goroutines that reference it
recoveryManager := pipeline_runtime.NewRecoveryManager(r.client, workflow.ID, r.recoveryEnabled)
if err := recoveryManager.InitRecoveryState(runnerCtx, workflow.Config, int64(timeout.Seconds())); err != nil {
logger.Warn().Err(err).Msg("failed to initialize recovery state, continuing without recovery")
recoveryManager = pipeline_runtime.NewRecoveryManager(r.client, workflow.ID, false)
}

// Listen for remote cancel events (UI / API).
// When canceled, we MUST cancel the workflow context
// so that workflow execution stop immediately.
Expand All @@ -114,10 +130,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
cancelWorkflowCtx(err)
} else {
if canceled {
logger.Debug().Err(err).Msg("server side cancel signal received")
logger.Debug().Msg("server side cancel signal received")
recoveryManager.SetCanceled()
cancelWorkflowCtx(pipeline_errors.ErrCancel)
}
// Wait returned without error, meaning the workflow finished normally
logger.Debug().Msg("cancel listener exited normally")
}
}()
Expand All @@ -139,18 +155,6 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
}
}()

state := rpc.WorkflowState{
Started: time.Now().Unix(),
}

if err := r.client.Init(runnerCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("signaling workflow initialization to server failed")
// We have an error, maybe the server is currently unreachable or other server-side errors occurred.
// So let's clean up and end this not yet started workflow run.
cancelWorkflowCtx(err)
return err
}

var uploads sync.WaitGroup

// Run pipeline
Expand All @@ -161,6 +165,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
pipeline_runtime.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline_runtime.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)),
pipeline_runtime.WithBackend(*r.backend),
pipeline_runtime.WithRecoveryManager(recoveryManager),
pipeline_runtime.WithDescription(map[string]string{
"workflow_id": workflow.ID,
"repo": repoName,
Expand Down Expand Up @@ -189,6 +194,13 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error {
uploads.Wait()
logger.Debug().Msg("logs and traces uploaded")

// If workflow is recoverable (context canceled, recovery enabled, not user cancel),
// skip marking as done. The workflow will be picked up by a new agent after restart.
if recoveryManager.IsRecoverable(runnerCtx) {
logger.Info().Msg("workflow is recoverable, not marking as done")
return nil
}

// Update workflow state
doneCtx := runnerCtx
if doneCtx.Err() != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/agent/core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
log.Debug().Msgf("custom labels detected: %#v", customLabels)
}

agentConfig.AgentID, err = client.RegisterAgent(grpcCtx, rpc.AgentInfo{ //nolint:contextcheck
registeredAgent, err := client.RegisterAgent(grpcCtx, rpc.AgentInfo{ //nolint:contextcheck
Version: version.String(),
Backend: backendEngine.Name(),
Platform: engInfo.Platform,
Expand All @@ -221,6 +221,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
if err != nil {
return err
}
agentConfig.AgentID = registeredAgent.AgentID

serviceWaitingGroup.Go(func() error {
// we close grpc client context once unregister was handled
Expand Down Expand Up @@ -288,7 +289,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
// https://go.dev/blog/go1.22 fixed scope for goroutines in loops
for i := range maxWorkflows {
serviceWaitingGroup.Go(func() error {
runner := agent.NewRunner(client, filter, hostname, counter, &backendEngine)
runner := agent.NewRunner(client, filter, hostname, counter, &backendEngine, registeredAgent.RecoveryEnabled)
log.Debug().Msgf("created new runner %d", i)

for {
Expand Down
9 changes: 9 additions & 0 deletions cmd/server/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,15 @@ var flags = append([]cli.Flag{
Name: "encryption-disable-flag",
Usage: "Flag to decrypt all encrypted data and disable encryption on server",
},
//
// recovery options
//
&cli.BoolFlag{
Sources: cli.EnvVars("WOODPECKER_RECOVERY_ENABLED"),
Name: "recovery-enabled",
Usage: "Enable pipeline recovery state tracking, allowing agents to resume workflows after restart",
Value: false,
},
}, logger.GlobalLoggerFlags...)

// If woodpecker is running inside a container the default value for
Expand Down
1 change: 1 addition & 0 deletions cmd/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func runGrpcServer(ctx context.Context, c *cli.Command, _store store.Store) erro
server.Config.Services.Logs,
server.Config.Services.Pubsub,
_store,
c.Bool("recovery-enabled"),
)
proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer)

Expand Down
22 changes: 22 additions & 0 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,28 @@ func run(ctx context.Context, c *cli.Command) error {
return nil
})

// Start recovery state cleanup task
if c.Bool("recovery-enabled") {
serviceWaitingGroup.Go(func() error {
log.Info().Msg("starting recovery state cleanup service ...")
ticker := time.NewTicker(time.Minute * 5)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info().Msg("recovery state cleanup service stopped")
return nil
case <-ticker.C:
if err := _store.RecoveryStateCleanExpired(); err != nil {
log.Error().Err(err).Msg("failed to clean expired recovery states")
} else {
log.Trace().Msg("cleaned expired recovery states")
}
}
}
})
}

// start the grpc server
serviceWaitingGroup.Go(func() error {
log.Info().Msg("starting grpc server ...")
Expand Down
1 change: 0 additions & 1 deletion cmd/server/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err e
server.Config.Pipeline.DefaultCancelPreviousPipelineEvents = events
server.Config.Pipeline.DefaultTimeout = c.Int64("default-pipeline-timeout")
server.Config.Pipeline.MaxTimeout = c.Int64("max-pipeline-timeout")

_labels := c.StringSlice("default-workflow-labels")
labels := make(map[string]string, len(_labels))
for _, v := range _labels {
Expand Down
9 changes: 9 additions & 0 deletions docs/docs/30-administration/10-configuration/10-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,15 @@ Fully qualified public forge URL, used if forge url is not a public URL. Format:

---

### RECOVERY_ENABLED

- Name: `WOODPECKER_RECOVERY_ENABLED`
- Default: `false`

Enables pipeline recovery state tracking. When enabled, agents can resume workflows after a restart (e.g. during rolling deployments or agent crashes). Steps that already completed successfully are skipped, and running steps are reconnected. Currently, Docker and Kubernetes backends support recovery.

---

### GITHUB\_\*

See [GitHub configuration](./12-forges/20-github.md#configuration)
Expand Down
Loading