Skip to content

Commit 120320f

Browse files
authored
Implement worker-waiter in Go (#287)
1 parent 2b5636a commit 120320f

File tree

7 files changed

+198
-38
lines changed

7 files changed

+198
-38
lines changed

Makefile

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,10 @@ docker-build-tyger-server: login-acr
9191
docker-build-buffer-sidecar: login-acr
9292
$(MAKE) _docker-build DOCKER_BUILD_TARGET=buffer-sidecar
9393

94-
docker-build-worker-waiter: login-acr
95-
$(MAKE) _docker-build DOCKER_BUILD_TARGET=worker-waiter
96-
9794
docker-build-helm: login-acr
9895
$(MAKE) _docker-build DOCKER_BUILD_TARGET=helm
9996

100-
docker-build: docker-build-test docker-build-tyger-server docker-build-buffer-sidecar docker-build-worker-waiter
97+
docker-build: docker-build-test docker-build-tyger-server docker-build-buffer-sidecar
10198

10299
publish-official-images:
103100
container_registry_spec=$$(echo '${DEVELOPER_CONFIG_JSON}' | jq -c '.officialPushContainerRegistry')

Makefile.cloud

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ set-localsettings: set-context
137137
}
138138
EOF
139139

140-
up: install-cli ensure-environment-conditionally docker-build-tyger-server docker-build-buffer-sidecar docker-build-worker-waiter docker-build-test
140+
up: install-cli ensure-environment-conditionally docker-build-tyger-server docker-build-buffer-sidecar docker-build-test
141141
tyger api install -f <(scripts/get-config.sh)
142142
$(MAKE) cli-ready
143143

cli/Dockerfile

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ ARG TYGER_VERSION="a"
1111
RUN --mount=type=cache,target=/root/.cache/go-build go mod download
1212
COPY . .
1313
RUN --mount=type=cache,target=/root/.cache/go-build \
14-
CGO_ENABLED=0 GOOS=${BUILDOS} GOARCH=${TARGETARCH} go build -ldflags="-s -w -X main.version=${TYGER_VERSION}" -v -o /go/bin/dist/${BUILDOS}/${TARGETARCH}/ ./cmd/buffer-sidecar ./cmd/tyger ./cmd/buffer-copier
14+
CGO_ENABLED=0 GOOS=${BUILDOS} GOARCH=${TARGETARCH} go build -ldflags="-s -w -X main.version=${TYGER_VERSION}" -v -o /go/bin/dist/${BUILDOS}/${TARGETARCH}/ ./cmd/buffer-sidecar ./cmd/tyger ./cmd/buffer-copier ./cmd/worker-waiter
1515

1616
FROM mcr.microsoft.com/azurelinux/distroless/minimal:3.0 AS buffer-sidecar
1717
ARG TARGETARCH BUILDOS
@@ -35,3 +35,11 @@ WORKDIR /app
3535
COPY --from=go-build /go/bin/dist/${BUILDOS}/${TARGETARCH}/buffer-copier .
3636
USER nonroot:nonroot
3737
ENTRYPOINT ["/app/buffer-copier"]
38+
39+
FROM mcr.microsoft.com/azurelinux/distroless/minimal:3.0 AS worker-waiter
40+
ARG TARGETARCH BUILDOS
41+
42+
WORKDIR /app
43+
COPY --from=go-build /go/bin/dist/${BUILDOS}/${TARGETARCH}/worker-waiter .
44+
USER nonroot:nonroot
45+
ENTRYPOINT ["/app/worker-waiter"]

cli/cmd/worker-waiter/main.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"net"
10+
"os"
11+
"os/signal"
12+
"syscall"
13+
"time"
14+
15+
"github.com/microsoft/tyger/cli/internal/cmd"
16+
"github.com/rs/zerolog"
17+
"github.com/rs/zerolog/log"
18+
"github.com/spf13/cobra"
19+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/fields"
21+
"k8s.io/client-go/kubernetes"
22+
"k8s.io/client-go/rest"
23+
)
24+
25+
var (
26+
// set during build
27+
version = ""
28+
)
29+
30+
func main() {
31+
rootCmd := newRootCommand()
32+
if err := rootCmd.Execute(); err != nil {
33+
os.Exit(1)
34+
}
35+
}
36+
37+
func newRootCommand() *cobra.Command {
38+
rootCommand := cmd.NewCommonRootCommand(version)
39+
rootCommand.Use = "worker-waiter"
40+
rootCommand.Short = "Waits for worker pods to be ready and DNS to resolve"
41+
42+
var labelSelector string
43+
var hostnames []string
44+
var namespace string
45+
var pollInterval time.Duration
46+
47+
rootCommand.Flags().StringVar(&labelSelector, "label-selector", "", "The label selector for worker pods (e.g., tyger-worker=123)")
48+
rootCommand.Flags().StringSliceVar(&hostnames, "hostname", nil, "Hostnames to wait for DNS resolution (can be specified multiple times)")
49+
rootCommand.Flags().StringVar(&namespace, "namespace", "", "The Kubernetes namespace to watch pods in")
50+
rootCommand.Flags().DurationVar(&pollInterval, "poll-interval", 1*time.Second, "Interval between DNS resolution attempts")
51+
52+
rootCommand.MarkFlagRequired("label-selector")
53+
rootCommand.MarkFlagRequired("namespace")
54+
55+
rootCommand.RunE = func(cmd *cobra.Command, args []string) error {
56+
ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM)
57+
defer cancel()
58+
59+
return waitForWorkers(ctx, namespace, labelSelector, hostnames, pollInterval)
60+
}
61+
62+
return rootCommand
63+
}
64+
65+
func waitForWorkers(ctx context.Context, namespace, labelSelector string, hostnames []string, pollInterval time.Duration) error {
66+
log.Info().
67+
Str("namespace", namespace).
68+
Str("labelSelector", labelSelector).
69+
Strs("hostnames", hostnames).
70+
Msg("Starting worker waiter")
71+
72+
// Wait for pods to be ready
73+
if err := waitForPodsReady(ctx, namespace, labelSelector, len(hostnames), pollInterval); err != nil {
74+
return fmt.Errorf("failed waiting for pods: %w", err)
75+
}
76+
77+
// Wait for DNS resolution
78+
for _, hostname := range hostnames {
79+
if err := waitForDNS(ctx, hostname, pollInterval); err != nil {
80+
return fmt.Errorf("failed waiting for DNS resolution of %s: %w", hostname, err)
81+
}
82+
}
83+
84+
log.Info().Msg("All workers ready and DNS resolved")
85+
return nil
86+
}
87+
88+
func waitForPodsReady(ctx context.Context, namespace, labelSelector string, expectedCount int, pollInterval time.Duration) error {
89+
config, err := rest.InClusterConfig()
90+
if err != nil {
91+
return fmt.Errorf("failed to get in-cluster config: %w", err)
92+
}
93+
94+
clientset, err := kubernetes.NewForConfig(config)
95+
if err != nil {
96+
return fmt.Errorf("failed to create kubernetes client: %w", err)
97+
}
98+
99+
for {
100+
if err := ctx.Err(); err != nil {
101+
return err
102+
}
103+
104+
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
105+
LabelSelector: labelSelector,
106+
FieldSelector: fields.Everything().String(),
107+
})
108+
if err != nil {
109+
log.Warn().Err(err).Msg("Error listing pods, retrying...")
110+
time.Sleep(pollInterval)
111+
continue
112+
}
113+
114+
if len(pods.Items) < expectedCount {
115+
log.Info().
116+
Int("found", len(pods.Items)).
117+
Int("expected", expectedCount).
118+
Msg("Waiting for worker pods to be created...")
119+
time.Sleep(pollInterval)
120+
continue
121+
}
122+
123+
readyCount := 0
124+
for _, pod := range pods.Items {
125+
for _, condition := range pod.Status.Conditions {
126+
if condition.Type == "Ready" && condition.Status == "True" {
127+
readyCount++
128+
break
129+
}
130+
}
131+
}
132+
133+
if readyCount >= expectedCount {
134+
log.Info().Int("count", readyCount).Msg("All worker pods are ready")
135+
return nil
136+
}
137+
138+
log.Info().
139+
Int("ready", readyCount).
140+
Int("expected", expectedCount).
141+
Msg("Waiting for worker pods to be ready")
142+
time.Sleep(pollInterval)
143+
}
144+
}
145+
146+
func waitForDNS(ctx context.Context, hostname string, pollInterval time.Duration) error {
147+
log.Info().Str("hostname", hostname).Msg("Waiting for hostname to resolve")
148+
149+
resolver := &net.Resolver{}
150+
151+
for {
152+
if err := ctx.Err(); err != nil {
153+
return err
154+
}
155+
156+
addrs, err := resolver.LookupHost(ctx, hostname)
157+
if err == nil && len(addrs) > 0 {
158+
log.Info().
159+
Str("hostname", hostname).
160+
Strs("addresses", addrs).
161+
Msg("Hostname resolved successfully")
162+
return nil
163+
}
164+
165+
if err != nil {
166+
if log.Logger.GetLevel() <= zerolog.DebugLevel {
167+
log.Debug().Err(err).Str("hostname", hostname).Msg("DNS lookup failed, retrying...")
168+
}
169+
}
170+
171+
time.Sleep(pollInterval)
172+
}
173+
}

deploy/images/worker-waiter/Dockerfile

Lines changed: 0 additions & 14 deletions
This file was deleted.

scripts/build-images.sh

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ Options:
1616
-r, --registry The FQDN of container registry to push to.
1717
--test-connectivity Build (and optionally push) the testconnectivity image
1818
--tyger-server Build (and optionally push) the tyger-server image
19-
--worker-waiter Build (and optionally push) the worker-waiter image
2019
--buffer-sidecar Build (and optionally push) the buffer-sidecar image
2120
--helm Package and push the Tyger Helm chart
2221
--registry-directory The parent directory of the repositories. e.g. <registry>/<registry-dir>/<repo-name>
@@ -47,10 +46,6 @@ while [[ $# -gt 0 ]]; do
4746
tyger_server=1
4847
shift
4948
;;
50-
--worker-waiter)
51-
worker_waiter=1
52-
shift
53-
;;
5449
--buffer-sidecar)
5550
buffer_sidecar=1
5651
shift
@@ -213,15 +208,6 @@ if [[ -n "${tyger_server:-}" ]]; then
213208
build_and_push
214209
fi
215210

216-
if [[ -n "${worker_waiter:-}" ]]; then
217-
build_context="${repo_root_dir}/deploy/images/worker-waiter"
218-
dockerfile_path="${repo_root_dir}/deploy/images/worker-waiter/Dockerfile"
219-
target="worker-waiter"
220-
repo="worker-waiter"
221-
222-
build_and_push
223-
fi
224-
225211
if [[ -n "${buffer_sidecar:-}" ]]; then
226212
build_context="${repo_root_dir}/cli"
227213
dockerfile_path="${repo_root_dir}/cli/Dockerfile"
@@ -239,6 +225,11 @@ if [[ -n "${buffer_sidecar:-}" ]]; then
239225
repo="buffer-copier"
240226

241227
build_and_push
228+
229+
target="worker-waiter"
230+
repo="worker-waiter"
231+
232+
build_and_push
242233
fi
243234

244235
if [[ -n "${helm:-}" ]]; then

server/ControlPlane/Compute/Kubernetes/KubernetesRunCreator.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -345,19 +345,24 @@ private void AddWaitForWorkerInitContainersToJob(V1Pod jobPod, Run run)
345345
VolumeMounts = [new("/no-op/", "no-op")]
346346
});
347347

348-
var waitScript = new StringBuilder("set -euo pipefail").AppendLine();
349-
waitScript.AppendLine($"until kubectl wait --for=condition=ready pod -l {WorkerLabel}={run.Id}; do echo waiting for workers to be ready; sleep 1; done;");
348+
var workerWaiterArgs = new List<string>
349+
{
350+
"--label-selector", $"{WorkerLabel}={run.Id}",
351+
"--namespace", _k8sOptions.Namespace,
352+
};
353+
350354
foreach (var host in GetWorkerDnsNames(run))
351355
{
352-
waitScript.AppendLine($"until getent hosts {host}; do echo waiting for hostname {host} to resolve; sleep 1; done;");
356+
workerWaiterArgs.Add("--hostname");
357+
workerWaiterArgs.Add(host);
353358
}
354359

355360
initContainers.Add(
356361
new()
357362
{
358363
Name = "waitforworker",
359364
Image = _k8sOptions.WorkerWaiterImage,
360-
Command = ["bash", "-c", waitScript.ToString()],
365+
Args = workerWaiterArgs,
361366
});
362367

363368
(jobPod.Spec.Volumes ??= []).Add(new()

0 commit comments

Comments
 (0)