Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,12 @@ IMAGE_TAG ?= $(IMAGE_REPO):$(GIT_TAG)
HELM_CHART_REPO := $(STAGING_IMAGE_REGISTRY)/jobset/charts

# In-place restart agent image
# TODO (k8s 1.35): Replace IN_PLACE_RESTART_AGENT_BASE_IMAGE with BASE_IMAGE (distroless/static:nonroot)
# TODO (beta of in-place restart): Default IN_PLACE_RESTART_AGENT_IMAGE_REGISTRY to a valid registry URL to build and push the agent automatically
IN_PLACE_RESTART_AGENT_IMAGE_REGISTRY ?=
IN_PLACE_RESTART_AGENT_IMAGE_NAME := in-place-restart-agent
IN_PLACE_RESTART_AGENT_IMAGE_REPO ?= $(IN_PLACE_RESTART_AGENT_IMAGE_REGISTRY)/$(IN_PLACE_RESTART_AGENT_IMAGE_NAME)
IN_PLACE_RESTART_AGENT_IMAGE_TAG ?= $(IN_PLACE_RESTART_AGENT_IMAGE_REPO):$(GIT_TAG)
IN_PLACE_RESTART_AGENT_DOCKERFILE ?= cmd/in-place-restart-agent/Dockerfile-example
IN_PLACE_RESTART_AGENT_BASE_IMAGE ?= debian:bookworm-slim
IN_PLACE_RESTART_AGENT_BUILDER_IMAGE ?= golang:$(GO_VERSION)
IN_PLACE_RESTART_AGENT_DOCKERFILE ?= cmd/in-place-restart-agent/Dockerfile

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
Expand Down Expand Up @@ -211,15 +208,21 @@ image-build:
image-push: PUSH=--push
image-push: image-build

# Build the in-place restart agent image
# Build the in-place restart agent binary (sidecar container mode)
.PHONY: in-place-restart-agent-build
in-place-restart-agent-build: install-go-deps
$(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o bin/in-place-restart-agent cmd/in-place-restart-agent/main.go

# Build the in-place restart agent image (sidecar container mode)
.PHONY: in-place-restart-agent-image-build
in-place-restart-agent-image-build:
$(IMAGE_BUILD_CMD) \
-t $(IN_PLACE_RESTART_AGENT_IMAGE_TAG) \
-t $(IN_PLACE_RESTART_AGENT_IMAGE_REPO):$(BRANCH_NAME) \
-f $(IN_PLACE_RESTART_AGENT_DOCKERFILE) \
--platform=$(PLATFORMS) \
--build-arg BASE_IMAGE=$(IN_PLACE_RESTART_AGENT_BASE_IMAGE) \
--build-arg BUILDER_IMAGE=$(IN_PLACE_RESTART_AGENT_BUILDER_IMAGE) \
--build-arg BASE_IMAGE=$(BASE_IMAGE) \
--build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \
--build-arg CGO_ENABLED=$(CGO_ENABLED) \
$(PUSH) \
$(IMAGE_BUILD_EXTRA_OPTS) ./
Expand Down
28 changes: 28 additions & 0 deletions cmd/in-place-restart-agent/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Image for the in-place restart agent.
# This is only for the sidecar container mode of the in-place restart agent
# Both images below are defaults that can be overridden with --build-arg
ARG BUILDER_IMAGE=golang:1.25
ARG BASE_IMAGE=gcr.io/distroless/static:nonroot

# Build the in-place restart agent binary
# The builder stage runs on the build platform; the target architecture is selected through GOARCH in the build step below
FROM --platform=${BUILDPLATFORM} ${BUILDER_IMAGE} AS builder

# Build args must be re-declared inside the stage to be visible to the RUN instructions
# TARGETARCH is one of the platform args that BuildKit populates automatically
ARG CGO_ENABLED
ARG TARGETARCH

WORKDIR /workspace

# Install dependencies
# Only go.mod, go.sum and the Makefile are copied first so that this layer can be reused from cache when only the source changes
COPY go.mod go.sum Makefile ./
RUN make install-go-deps controller-gen

# Copy the go source
COPY . .

# Build
# Produces /workspace/bin/in-place-restart-agent
# NOTE(review): the single quotes keep the shell from expanding ${CGO_ENABLED} and ${TARGETARCH};
# presumably make expands them from the build args exported into the environment - confirm
RUN make in-place-restart-agent-build GO_BUILD_ENV='CGO_ENABLED=${CGO_ENABLED} GOOS=linux GOARCH=${TARGETARCH}'

# Final image: the base image plus the agent binary only
# NOTE(review): this stage is pinned to BUILDPLATFORM rather than the target platform - confirm this is intended for multi-platform builds
FROM --platform=${BUILDPLATFORM} ${BASE_IMAGE}
WORKDIR /
COPY --from=builder /workspace/bin/in-place-restart-agent .
# Run with a numeric non-root UID:GID (presumably the "nonroot" user of the default distroless base image - verify when overriding BASE_IMAGE)
USER 65532:65532

ENTRYPOINT ["/in-place-restart-agent"]
38 changes: 0 additions & 38 deletions cmd/in-place-restart-agent/Dockerfile-example

This file was deleted.

140 changes: 114 additions & 26 deletions cmd/in-place-restart-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"context"
"net/http"
"os"
"os/exec"
"os/signal"
Expand All @@ -38,12 +39,17 @@ import (
const (
// NamespaceFile is the path to the file containing the namespace of the Pod as specified in the k8s standard
// See https://kubernetes.io/docs/tasks/run-application/access-api-from-pod/#directly-accessing-the-rest-api
NamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
EnvJobSetName = "JOBSET_NAME"
EnvPodName = "POD_NAME"
EnvInPlaceRestartExitCode = "IN_PLACE_RESTART_EXIT_CODE"
EnvWorkerCommand = "WORKER_COMMAND"
ControllerName = "in-place-restart-agent"
NamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
EnvJobSetName = "JOBSET_NAME"
EnvPodName = "POD_NAME"
EnvWorkerCommand = "WORKER_COMMAND"
EnvStartupProbePath = "STARTUP_PROBE_PATH"
EnvStartupProbePort = "STARTUP_PROBE_PORT"
EnvInPlaceRestartExitCode = "IN_PLACE_RESTART_EXIT_CODE"
DefaultStartupProbePath = "/barrier-is-down"
DefaultStartupProbePort = "8081"
DefaultInPlaceRestartExitCode = "1"
ControllerName = "in-place-restart-agent"
)

var (
Expand Down Expand Up @@ -108,6 +114,8 @@ func setupInPlaceRestartAgentOrDie(mgr ctrl.Manager, env env) {
env.Namespace,
env.PodName,
env.WorkerCommand,
env.StartupProbePath,
env.StartupProbePort,
env.InPlaceRestartExitCode,
)
if err := inPlaceRestartAgent.SetupWithManager(mgr); err != nil {
Expand Down Expand Up @@ -166,43 +174,75 @@ type env struct {
// PodName is the name of the Pod
// Represents the POD_NAME environment variable
PodName string
// WorkerCommand is the command used to start the worker process when the barrier is lifted
// Represents the WORKER_COMMAND environment variable
WorkerCommand string
// StartupProbePath is the path of the startup probe used for the barrier
// Represents the STARTUP_PROBE_PATH environment variable
StartupProbePath string
// StartupProbePort is the port of the startup probe used for the barrier
// Represents the STARTUP_PROBE_PORT environment variable
StartupProbePort string
// InPlaceRestartExitCode is the exit code used to trigger an in-place restart
// The agent will exit with this exit code when it detects that the Pod needs to be restarted in-place
// The Pod spec should be configured accordingly to restart the Pod in-place when the agent exits with this exit code
// See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-all-containers
// Represents the IN_PLACE_RESTART_EXIT_CODE environment variable
InPlaceRestartExitCode int
// WorkerCommand is the command used to start the worker process when the barrier is lifted
// Represents the WORKER_COMMAND environment variable
WorkerCommand string
}

// parseEnvOrDie parses the environment variables and returns an env struct
// It reads the namespace from the mounted service account file to reduce the number of env vars
// It exits with an error if any of the variables are not set
func parseEnvOrDie() env {
rawNamespace, err := os.ReadFile(NamespaceFile)
if err != nil {
setupLog.Error(err, "unable to read namespace file. Please check if pod.spec.automountServiceAccountToken or serviceAccount.automountServiceAccountToken are set to false", "file", NamespaceFile)
os.Exit(1)
}
setupLog.Info("defaulting namespace", "namespace", string(rawNamespace))
namespace := string(rawNamespace)
jobSetName := getEnvOrDie(EnvJobSetName)
podName := getEnvOrDie(EnvPodName)
rawInPlaceRestartExitCode := getEnvOrDie(EnvInPlaceRestartExitCode)
rawInPlaceRestartExitCode := os.Getenv(EnvInPlaceRestartExitCode)
if rawInPlaceRestartExitCode == "" {
setupLog.Info("env var IN_PLACE_RESTART_EXIT_CODE not set, using default", "default", DefaultInPlaceRestartExitCode)
rawInPlaceRestartExitCode = DefaultInPlaceRestartExitCode
}
inPlaceRestartExitCode, err := strconv.Atoi(rawInPlaceRestartExitCode)
if err != nil {
setupLog.Error(err, "invalid env var value", "name", EnvInPlaceRestartExitCode, "value", rawInPlaceRestartExitCode)
os.Exit(1)
}
workerCommand := getEnvOrDie(EnvWorkerCommand)

if os.Getenv(EnvWorkerCommand) != "" && (os.Getenv(EnvStartupProbePath) != "" || os.Getenv(EnvStartupProbePort) != "") {
setupLog.Error(nil, "invalid env var configuration: WORKER_COMMAND cannot be set with STARTUP_PROBE_PATH or STARTUP_PROBE_PORT")
os.Exit(1)
}
workerCommand := os.Getenv(EnvWorkerCommand)
if workerCommand != "" {
setupLog.Info("env var WORKER_COMMAND is set. In-place restart agent will run in main container mode")
} else {
setupLog.Info("env var WORKER_COMMAND is not set. In-place restart agent will run in sidecar container mode")
}
startupProbePath := os.Getenv(EnvStartupProbePath)
if startupProbePath == "" && workerCommand == "" {
setupLog.Info("startup probe path not set, using default", "default", DefaultStartupProbePath)
startupProbePath = DefaultStartupProbePath
}
startupProbePort := os.Getenv(EnvStartupProbePort)
if startupProbePort == "" && workerCommand == "" {
setupLog.Info("startup probe port not set, using default", "default", DefaultStartupProbePort)
startupProbePort = DefaultStartupProbePort
}

return env{
Namespace: namespace,
JobSetName: jobSetName,
PodName: podName,
InPlaceRestartExitCode: inPlaceRestartExitCode,
WorkerCommand: workerCommand,
StartupProbePath: startupProbePath,
StartupProbePort: startupProbePort,
}
}

Expand All @@ -224,34 +264,47 @@ type InPlaceRestartAgent struct {
PodName string
InPlaceRestartExitCode int
WorkerCommand string
StartupProbePath string
StartupProbePort string
PodInPlaceRestartAttempt *int32
IsBarrierActive bool
Exit func(int) // Required for testing
StartWorker func(context.Context, string) error // Required for testing
StartStartupProbe func(string, http.Handler) error // Required for testing
}

// NewInPlaceRestartAgent creates a new InPlaceRestartAgent
func NewInPlaceRestartAgent(client client.Client, namespace string, podName string, workerCommand string, inPlaceRestartExitCode int) *InPlaceRestartAgent {
func NewInPlaceRestartAgent(client client.Client, namespace string, podName string, workerCommand string, startupProbePath string, startupProbePort string, inPlaceRestartExitCode int) *InPlaceRestartAgent {
return &InPlaceRestartAgent{
Client: client,
Namespace: namespace,
PodName: podName,
InPlaceRestartExitCode: inPlaceRestartExitCode,
WorkerCommand: workerCommand,
StartupProbePath: startupProbePath,
StartupProbePort: startupProbePort,
InPlaceRestartExitCode: inPlaceRestartExitCode,
PodInPlaceRestartAttempt: nil,
IsBarrierActive: true,
Exit: os.Exit,
StartWorker: func(ctx context.Context, command string) error {
shell := os.Getenv("SHELL")
if shell == "" {
shell = "/bin/sh"
var shell string
if envShell := os.Getenv("SHELL"); envShell != "" {
shell = envShell
} else if bash, err := exec.LookPath("bash"); err == nil {
shell = bash
} else {
setupLog.Error(nil, "shell not found")
os.Exit(1)
}
cmd := exec.CommandContext(ctx, shell, "-c", command)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

return cmd.Run()
},
StartStartupProbe: func(addr string, handler http.Handler) error {
server := &http.Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
},
}
}

Expand All @@ -267,7 +320,16 @@ func (r *InPlaceRestartAgent) SetupWithManager(mgr ctrl.Manager) error {
// Reconcile handles the in-place restart logic at the Pod level
func (r *InPlaceRestartAgent) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log = log.WithValues("namespace", r.Namespace, "podName", r.PodName, "inPlaceRestartExitCode", r.InPlaceRestartExitCode, "workerCommand", r.WorkerCommand, "podInPlaceRestartAttempt", r.PodInPlaceRestartAttempt, "isBarrierActive", r.IsBarrierActive)
log = log.WithValues(
"namespace", r.Namespace,
"podName", r.PodName,
"workerCommand", r.WorkerCommand,
"startupProbePath", r.StartupProbePath,
"startupProbePort", r.StartupProbePort,
"inPlaceRestartExitCode", r.InPlaceRestartExitCode,
"podInPlaceRestartAttempt", r.PodInPlaceRestartAttempt,
"isBarrierActive", r.IsBarrierActive,
)
ctx = ctrl.LoggerInto(ctx, log)
log.Info("reconciling")

Expand All @@ -289,8 +351,12 @@ func (r *InPlaceRestartAgent) Reconcile(ctx context.Context, req ctrl.Request) (
// One example is downgrading the JobSet CRD to a version that does not support in-place restart
if r.shouldBypassBarrier(&js) {
if r.IsBarrierActive {
log.Info("Bypassing sync barrier: JobSet has in-place restart disabled or uses a different restart strategy. Executing worker command")
go r.executeWorkerCommand(ctx)
log.Info("Bypassing sync barrier: JobSet has in-place restart disabled or uses a different restart strategy.")
if r.isSidecarContainerMode() {
go r.startStartupProbeServer(ctx)
} else {
go r.executeWorkerCommand(ctx)
}
r.IsBarrierActive = false
}
return ctrl.Result{}, nil
Expand Down Expand Up @@ -322,19 +388,22 @@ func (r *InPlaceRestartAgent) Reconcile(ctx context.Context, req ctrl.Request) (
// If the Pod in-place restart attempt is less than or equal to the previous in-place restart attempt, it means that the JobSet controller has marked the Pod in-place restart attempt as outdated
// Which means that the Pod should be restarted to reach the new in-place restart attempt
// So exit with the in-place restart exit code
// This will trigger container restart since Pod.spec.restartPolicy = OnFailure
// Once RestartAllContainers is released upstream (k8s 1.35), this will trigger in-place container restart since Pod.spec.initContainers[].restartPolicyRules[].action = RestartAllContainers
// For the sidecar container mode, this will trigger in-place container restart since Pod.spec.initContainers[].restartPolicyRules[].action is set to RestartAllContainers
// For the main container mode, this will trigger container restart since Pod.spec.containers[].restartPolicyRules[].action is set to Restart
if r.PodInPlaceRestartAttempt != nil && previousInPlaceRestartAttempt != nil && *r.PodInPlaceRestartAttempt <= *previousInPlaceRestartAttempt {
log.Info("exiting agent with in-place restart exit code to restart this Pod in-place", "exitCode", r.InPlaceRestartExitCode)
r.Exit(r.InPlaceRestartExitCode)
}

// Handle barrier lift
// If the barrier is active and the Pod in-place restart attempt is equal to the JobSet current in-place restart attempt, it means that the JobSet controller has marked the Pod in-place restart attempt as synced with the other Pods
// So execute the worker command
// TODO(k8s 1.35): Once RestartAllContainers is released upstream, this should succeed a start up probe instead
// So lift the barrier by starting the startup probe server (sidecar container mode) or executing the worker command (main container mode)
if r.IsBarrierActive && r.PodInPlaceRestartAttempt != nil && currentInPlaceRestartAttempt != nil && *r.PodInPlaceRestartAttempt == *currentInPlaceRestartAttempt {
go r.executeWorkerCommand(ctx)
if r.isSidecarContainerMode() {
go r.startStartupProbeServer(ctx)
} else {
go r.executeWorkerCommand(ctx)
}
r.IsBarrierActive = false
}

Expand Down Expand Up @@ -396,3 +465,22 @@ func (r *InPlaceRestartAgent) shouldBypassBarrier(js *jobset.JobSet) bool {
}
return false
}

// isSidecarContainerMode reports whether the agent is running in sidecar container mode.
// The mode is derived from the configuration: an empty WorkerCommand means sidecar
// container mode, a non-empty WorkerCommand means main container mode.
func (r *InPlaceRestartAgent) isSidecarContainerMode() bool {
	return len(r.WorkerCommand) == 0
}

// startStartupProbeServer serves the startup probe endpoint used to lift the barrier in sidecar container mode.
// It registers a handler on StartupProbePath that always answers 200 OK and hands it to
// StartStartupProbe together with the address ":<StartupProbePort>".
// If StartStartupProbe returns an error other than http.ErrServerClosed, the error is logged
// and the agent exits with code 1.
func (r *InPlaceRestartAgent) startStartupProbeServer(ctx context.Context) {
	logger := ctrl.LoggerFrom(ctx)
	logger.Info("starting startup probe server")

	// The probe carries no payload: any request on the configured path succeeds
	probeMux := http.NewServeMux()
	probeMux.HandleFunc(r.StartupProbePath, func(rw http.ResponseWriter, _ *http.Request) {
		rw.WriteHeader(http.StatusOK)
	})

	listenAddr := ":" + r.StartupProbePort
	err := r.StartStartupProbe(listenAddr, probeMux)
	if err == nil || err == http.ErrServerClosed {
		// A nil error or a regular server shutdown is not a failure
		return
	}
	logger.Error(err, "startup probe server failed")
	r.Exit(1)
}
Loading