Skip to content

Commit 749e0c4

Browse files
ventifusclaude
andcommitted
feat: add admin runjob endpoint for streaming Kubernetes Job output
Adds POST /admin/.../runjob which accepts a Job manifest (JSON), creates the Job on the cluster, streams the pod's logs back to the caller as they arrive, then deletes the Job on completion or cancellation. Namespace defaults to openshift-azure-operator if absent; a random 5-character suffix is appended to the job name to prevent collisions. Also adds KubeFollowPodLogs to KubeActions for log streaming with Follow=true, composable by higher-level actions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5e987bd commit 749e0c4

File tree

5 files changed

+527
-0
lines changed

5 files changed

+527
-0
lines changed
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package frontend
2+
3+
// Copyright (c) Microsoft Corporation.
4+
// Licensed under the Apache License 2.0.
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"io"
11+
"net/http"
12+
"path/filepath"
13+
"strings"
14+
"time"
15+
16+
"github.com/go-chi/chi/v5"
17+
"github.com/sirupsen/logrus"
18+
19+
batchv1 "k8s.io/api/batch/v1"
20+
corev1 "k8s.io/api/core/v1"
21+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
23+
kruntime "k8s.io/apimachinery/pkg/runtime"
24+
"k8s.io/apimachinery/pkg/runtime/schema"
25+
utilrand "k8s.io/apimachinery/pkg/util/rand"
26+
"k8s.io/apimachinery/pkg/watch"
27+
28+
"github.com/Azure/ARO-RP/pkg/api"
29+
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
30+
"github.com/Azure/ARO-RP/pkg/frontend/adminactions"
31+
"github.com/Azure/ARO-RP/pkg/frontend/middleware"
32+
)
33+
34+
// Defaults used by the admin runjob endpoint.
const (
	// runJobDefaultNamespace is assigned to a submitted Job manifest that
	// does not name a namespace of its own.
	runJobDefaultNamespace = "openshift-azure-operator"

	// runJobCleanupTimeout bounds the Job deletion issued by cleanupJob.
	runJobCleanupTimeout = 30 * time.Second
)
38+
39+
func (f *frontend) postAdminOpenShiftClusterRunJob(w http.ResponseWriter, r *http.Request) {
40+
ctx := r.Context()
41+
log := ctx.Value(middleware.ContextKeyLog).(*logrus.Entry)
42+
r.URL.Path = filepath.Dir(r.URL.Path)
43+
44+
reader, writer := io.Pipe()
45+
err := f._postAdminOpenShiftClusterRunJob(ctx, r, log, writer)
46+
var header http.Header
47+
if err == nil {
48+
header = http.Header{"Content-Type": []string{"text/plain"}}
49+
}
50+
f.streamResponder.AdminReplyStream(log, w, header, reader, err)
51+
}
52+
53+
func (f *frontend) _postAdminOpenShiftClusterRunJob(ctx context.Context, r *http.Request, log *logrus.Entry, writer io.WriteCloser) error {
54+
resType, resName, resGroupName := chi.URLParam(r, "resourceType"), chi.URLParam(r, "resourceName"), chi.URLParam(r, "resourceGroupName")
55+
56+
body := r.Context().Value(middleware.ContextKeyBody).([]byte)
57+
job, err := parseAndValidateJob(body)
58+
if err != nil {
59+
return err
60+
}
61+
62+
resourceID := strings.TrimPrefix(r.URL.Path, "/admin")
63+
64+
dbOpenShiftClusters, err := f.dbGroup.OpenShiftClusters()
65+
if err != nil {
66+
return api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", err.Error())
67+
}
68+
69+
doc, err := dbOpenShiftClusters.Get(ctx, resourceID)
70+
switch {
71+
case cosmosdb.IsErrorStatusCode(err, http.StatusNotFound):
72+
return api.NewCloudError(http.StatusNotFound, api.CloudErrorCodeResourceNotFound, "",
73+
fmt.Sprintf("The Resource '%s/%s' under resource group '%s' was not found.", resType, resName, resGroupName))
74+
case err != nil:
75+
return err
76+
}
77+
78+
k, err := f.kubeActionsFactory(log, f.env, doc.OpenShiftCluster)
79+
if err != nil {
80+
return err
81+
}
82+
83+
go runJobStream(ctx, k, job, writer)
84+
return nil
85+
}
86+
87+
func parseAndValidateJob(body []byte) (*batchv1.Job, error) {
88+
if len(body) == 0 {
89+
return nil, api.NewCloudError(http.StatusBadRequest, api.CloudErrorCodeInvalidRequestContent, "",
90+
"The request body must not be empty.")
91+
}
92+
93+
var raw map[string]interface{}
94+
if err := json.Unmarshal(body, &raw); err != nil {
95+
return nil, api.NewCloudError(http.StatusBadRequest, api.CloudErrorCodeInvalidRequestContent, "",
96+
fmt.Sprintf("Failed to parse request body: %v", err))
97+
}
98+
99+
kind, _ := raw["kind"].(string)
100+
if kind != "Job" {
101+
return nil, api.NewCloudError(http.StatusBadRequest, api.CloudErrorCodeInvalidParameter, "",
102+
fmt.Sprintf("Expected kind 'Job', got '%s'.", kind))
103+
}
104+
105+
var job batchv1.Job
106+
if err := kruntime.DefaultUnstructuredConverter.FromUnstructured(raw, &job); err != nil {
107+
return nil, api.NewCloudError(http.StatusBadRequest, api.CloudErrorCodeInvalidRequestContent, "",
108+
fmt.Sprintf("Failed to convert manifest to Job: %v", err))
109+
}
110+
111+
if job.Name == "" {
112+
return nil, api.NewCloudError(http.StatusBadRequest, api.CloudErrorCodeInvalidParameter, "",
113+
"The provided Job manifest must have a non-empty metadata.name.")
114+
}
115+
116+
if job.Namespace == "" {
117+
job.Namespace = runJobDefaultNamespace
118+
}
119+
120+
job.Name = job.Name + "-" + utilrand.String(5)
121+
122+
return &job, nil
123+
}
124+
125+
// runJobStream creates a Kubernetes Job on the cluster, streams the pod's logs back to w
126+
// as they arrive, then deletes the Job regardless of outcome. It is called in a goroutine
127+
// by the HTTP handler and may also be called directly by higher-level composed actions.
128+
func runJobStream(ctx context.Context, k adminactions.KubeActions, job *batchv1.Job, w io.WriteCloser) {
129+
defer w.Close()
130+
131+
namespace := job.Namespace
132+
jobName := job.Name
133+
134+
fmt.Fprintf(w, "Creating job %s in %s...\n", jobName, namespace)
135+
136+
unstrMap, err := kruntime.DefaultUnstructuredConverter.ToUnstructured(job)
137+
if err != nil {
138+
fmt.Fprintf(w, "Failed to prepare job manifest: %v\n", err)
139+
return
140+
}
141+
un := &unstructured.Unstructured{Object: unstrMap}
142+
un.SetGroupVersionKind(schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"})
143+
144+
if err := k.KubeCreateOrUpdate(ctx, un); err != nil {
145+
fmt.Fprintf(w, "Failed to create job: %v\n", err)
146+
return
147+
}
148+
149+
fmt.Fprintf(w, "Waiting for pod...\n")
150+
podName, err := waitForJobPod(ctx, k, namespace, jobName)
151+
if err != nil {
152+
fmt.Fprintf(w, "Error waiting for pod: %v\n", err)
153+
cleanupJob(k, namespace, jobName)
154+
return
155+
}
156+
157+
fmt.Fprintf(w, "Pod %s assigned, streaming logs...\n", podName)
158+
159+
if err := k.KubeFollowPodLogs(ctx, namespace, podName, "", newLimitedWriter(w, "pod logs")); err != nil && ctx.Err() == nil {
160+
fmt.Fprintf(w, "Log streaming error: %v\n", err)
161+
}
162+
163+
if ctx.Err() != nil {
164+
fmt.Fprintf(w, "Request cancelled.\n")
165+
cleanupJob(k, namespace, jobName)
166+
return
167+
}
168+
169+
if ok, err := jobSucceeded(ctx, k, namespace, jobName); err != nil {
170+
fmt.Fprintf(w, "Could not determine job result: %v\n", err)
171+
} else if ok {
172+
fmt.Fprintf(w, "Job succeeded.\n")
173+
} else {
174+
fmt.Fprintf(w, "Job failed.\n")
175+
}
176+
177+
cleanupJob(k, namespace, jobName)
178+
fmt.Fprintf(w, "Cleanup complete.\n")
179+
}
180+
181+
// waitForJobPod watches pods with the job-name label until one reaches Running,
182+
// Succeeded, or Failed phase, then returns the pod name.
183+
func waitForJobPod(ctx context.Context, k adminactions.KubeActions, namespace, jobName string) (string, error) {
184+
podTemplate := &unstructured.Unstructured{}
185+
podTemplate.SetGroupVersionKind(schema.GroupVersionKind{Version: "v1", Kind: "Pod"})
186+
podTemplate.SetNamespace(namespace)
187+
podTemplate.SetLabels(map[string]string{"batch.kubernetes.io/job-name": jobName})
188+
189+
watcher, err := k.KubeWatch(ctx, podTemplate, "batch.kubernetes.io/job-name")
190+
if err != nil {
191+
return "", fmt.Errorf("watching pods: %w", err)
192+
}
193+
defer watcher.Stop()
194+
195+
for {
196+
select {
197+
case <-ctx.Done():
198+
return "", ctx.Err()
199+
case event, ok := <-watcher.ResultChan():
200+
if !ok {
201+
return "", fmt.Errorf("pod watch channel closed unexpectedly")
202+
}
203+
if event.Type != watch.Added && event.Type != watch.Modified {
204+
continue
205+
}
206+
pod, ok := event.Object.(*unstructured.Unstructured)
207+
if !ok {
208+
continue
209+
}
210+
name := pod.GetName()
211+
phase, _, _ := unstructured.NestedString(pod.Object, "status", "phase")
212+
switch corev1.PodPhase(phase) {
213+
case corev1.PodRunning, corev1.PodSucceeded, corev1.PodFailed:
214+
return name, nil
215+
}
216+
}
217+
}
218+
}
219+
220+
// jobSucceeded returns true if the Job has a Complete=True condition.
221+
func jobSucceeded(ctx context.Context, k adminactions.KubeActions, namespace, jobName string) (bool, error) {
222+
data, err := k.KubeGet(ctx, "Job.batch", namespace, jobName)
223+
if err != nil {
224+
return false, err
225+
}
226+
var obj map[string]interface{}
227+
if err := json.Unmarshal(data, &obj); err != nil {
228+
return false, err
229+
}
230+
conditions, _, _ := unstructured.NestedSlice(obj, "status", "conditions")
231+
for _, c := range conditions {
232+
m, ok := c.(map[string]interface{})
233+
if !ok {
234+
continue
235+
}
236+
if m["type"] == "Complete" && m["status"] == "True" {
237+
return true, nil
238+
}
239+
}
240+
return false, nil
241+
}
242+
243+
// cleanupJob deletes the Job with a fresh context so that cancellation of the
244+
// caller's request context does not prevent cleanup.
245+
func cleanupJob(k adminactions.KubeActions, namespace, jobName string) {
246+
ctx, cancel := context.WithTimeout(context.Background(), runJobCleanupTimeout)
247+
defer cancel()
248+
foreground := metav1.DeletePropagationForeground
249+
_ = k.KubeDelete(ctx, "Job.batch", namespace, jobName, false, &foreground)
250+
}

0 commit comments

Comments
 (0)