Skip to content

Commit c403cfa

Browse files
committed
feat(k8s): add batch sandbox expire & endpoints status
1 parent ff0b13f commit c403cfa

File tree

9 files changed

+189
-44
lines changed

9 files changed

+189
-44
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
apiVersion: sandbox.opensandbox.io/v1alpha1
2+
kind: BatchSandbox
3+
metadata:
4+
labels:
5+
app.kubernetes.io/name: sandbox-k8s
6+
app.kubernetes.io/managed-by: kustomize
7+
name: batchsandbox-sample
8+
namespace: sandbox-k8s
9+
spec:
10+
replicas: 2
11+
template:
12+
metadata:
13+
labels:
14+
app: example
15+
spec:
16+
containers:
17+
- name: main
18+
image: registry.k8s.io/e2e-test-images/httpd:2.4.38-4
19+
command:
20+
- tail
21+
- -f
22+
- /dev/null
23+
expireTime: "2025-12-03T12:55:41Z"
24+
taskTemplate:
25+
metadata:
26+
labels:
27+
app.task: task
28+
spec:
29+
container: # container mode
30+
image: example.com/agent-task:latest
31+
command:
32+
- /usr/bin/sleep
33+
args:
34+
- "1"
35+
shardTaskPatches: # patch list for container tasks
36+
- spec:
37+
container:
38+
command: # patch command and args, the final command is `python -m http.server 8080`
39+
- python
40+
args:
41+
- -m
42+
- http.server
43+
- "8080"
44+
- spec:
45+
container:
46+
args: # patch args only, the final command is `/usr/bin/sleep 456`
47+
- "456"

kubernetes/config/samples/sandbox_v1alpha1_batchsandbox.yaml

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,3 @@ spec:
2020
- tail
2121
- -f
2222
- /dev/null
23-
expireTime: "2025-12-03T12:55:41Z"
24-
taskTemplate:
25-
metadata:
26-
labels:
27-
app.task: task
28-
spec:
29-
container: # container mode
30-
image: example.com/agent-task:latest
31-
command:
32-
- /usr/bin/sleep
33-
args:
34-
- "1"
35-
shardTaskPatches: # patch list for container tasks
36-
- spec:
37-
container:
38-
command: # patch command and args, the final command is `python -m http.server 8080`
39-
- python
40-
args:
41-
- -m
42-
- http.server
43-
- "8080"
44-
- spec:
45-
container:
46-
args: # patch args only, the final command is `/usr/bin/sleep 456`
47-
- "456"

kubernetes/config/samples/sandbox_v1alpha1_pool.yaml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ spec:
1313
app: example
1414
spec:
1515
containers:
16-
- name: executor
17-
image: reg.docker.alibaba-inc.com/sandbox-k8s/task-executor:1.0.6
16+
- name: main
17+
image: registry.k8s.io/e2e-test-images/httpd:2.4.38-4
1818
command:
19-
- /server
19+
- tail
20+
- -f
21+
- /dev/null
2022
tolerations:
2123
- operator: "Exists"
2224
capacitySpec:

kubernetes/config/samples/sandbox_v1alpha1_pooled_batchsandbox.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ metadata:
99
spec:
1010
poolRef: pool-sample
1111
replicas: 2
12-
expireTime: "2025-12-03T12:55:41Z"
12+
expireTime: "2026-12-03T12:55:41Z"

kubernetes/internal/controller/apis.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
AnnoPoolAllocGenerationKey = "pool.opensandbox.io/alloc-generation"
3131

3232
FinalizerTaskCleanup = "batch-sandbox.sandbox.opensandbox.io/task-cleanup"
33+
34+
AnnotationSandboxEndpoints = "sandbox.opensandbox.io/endpoints"
3335
)
3436

3537
type SandboxAllocation struct {

kubernetes/internal/controller/batchsandbox_controller.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,23 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
112112
}
113113
return ctrl.Result{}, err
114114
}
115+
// handle expire
116+
if expireAt := batchSbx.Spec.ExpireTime; expireAt != nil {
117+
now := time.Now()
118+
if expireAt.Time.Before(now) {
119+
if batchSbx.DeletionTimestamp == nil {
120+
klog.Infof("batch sandbox %s expired, expire at %v, delete", klog.KObj(batchSbx), expireAt)
121+
if err := r.Delete(ctx, batchSbx); err != nil {
122+
if errors.IsNotFound(err) {
123+
return ctrl.Result{}, nil
124+
}
125+
return ctrl.Result{}, err
126+
}
127+
}
128+
} else {
129+
DurationStore.Push(types.NamespacedName{Namespace: batchSbx.Namespace, Name: batchSbx.Name}.String(), expireAt.Time.Sub(now))
130+
}
131+
}
115132

116133
// handle finalizers
117134
if batchSbx.DeletionTimestamp == nil {
@@ -153,15 +170,32 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
153170
newStatus.Replicas = 0
154171
newStatus.Allocated = 0
155172
newStatus.Ready = 0
173+
ipList := []string{}
156174
for _, pod := range pods {
157175
newStatus.Replicas++
158176
if utils.IsAssigned(pod) {
159177
newStatus.Allocated++
178+
ipList = append(ipList, pod.Status.PodIP)
160179
}
161180
if pod.Status.Phase == corev1.PodRunning && utils.IsPodReady(pod) {
162181
newStatus.Ready++
163182
}
164183
}
184+
raw, _ := json.Marshal(ipList)
185+
if batchSbx.Annotations[AnnotationSandboxEndpoints] != string(raw) {
186+
patchData, _ := json.Marshal(map[string]interface{}{
187+
"metadata": map[string]interface{}{
188+
"annotations": map[string]string{
189+
AnnotationSandboxEndpoints: string(raw),
190+
},
191+
},
192+
})
193+
obj := &sandboxv1alpha1.BatchSandbox{ObjectMeta: metav1.ObjectMeta{Namespace: batchSbx.Namespace, Name: batchSbx.Name}}
194+
if err := r.Patch(ctx, obj, client.RawPatch(types.MergePatchType, []byte(patchData))); err != nil {
195+
klog.Errorf("failed to patch annotation %s, %s, body %s", AnnotationSandboxEndpoints, klog.KObj(batchSbx), patchData)
196+
errors = append(errors, err)
197+
}
198+
}
165199
if !reflect.DeepEqual(newStatus, batchSbx.Status) {
166200
klog.Infof("To update BatchSandbox status for %s, replicas=%d allocated=%d ready=%d", klog.KObj(batchSbx), newStatus.Replicas, newStatus.Allocated, newStatus.Ready)
167201
if err := r.updateStatus(batchSbx, newStatus); err != nil {
@@ -417,7 +451,7 @@ func (r *BatchSandboxReconciler) releasePods(ctx context.Context, batchSbx *sand
417451
Name: batchSbx.Name,
418452
},
419453
}
420-
return r.Client.Patch(ctx, b, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
454+
return r.Client.Patch(ctx, b, client.RawPatch(types.MergePatchType, []byte(body)))
421455
}
422456

423457
// Normal Mode

kubernetes/internal/controller/batchsandbox_controller_test.go

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ package controller
3232

3333
import (
3434
"context"
35+
"encoding/json"
3536
gerrors "errors"
3637
"fmt"
38+
"net"
3739
"reflect"
3840
"sync"
3941
"testing"
@@ -57,7 +59,9 @@ import (
5759
"k8s.io/apimachinery/pkg/util/rand"
5860
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
5961
"k8s.io/client-go/tools/record"
62+
"k8s.io/client-go/util/retry"
6063
"k8s.io/utils/ptr"
64+
"k8s.io/utils/set"
6165
"sigs.k8s.io/controller-runtime/pkg/client"
6266
"sigs.k8s.io/controller-runtime/pkg/client/fake"
6367
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -122,18 +126,49 @@ var _ = Describe("BatchSandbox Controller", func() {
122126
err := k8sClient.Get(ctx, typeNamespacedName, resource)
123127
if !errors.IsNotFound(err) {
124128
Expect(err).NotTo(HaveOccurred())
129+
} else {
130+
return
125131
}
126132
By(fmt.Sprintf("Cleanup the specific resource instance BatchSandbox %s", typeNamespacedName))
127133
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
128134
})
129-
It("should successfully update batch sandbox status", func() {
135+
It("should successfully create pod, update batch sandbox status, endpoints info", func() {
136+
wantIPSet := make(set.Set[string])
130137
Eventually(func(g Gomega) {
131138
bs := &sandboxv1alpha1.BatchSandbox{}
132139
if err := k8sClient.Get(ctx, typeNamespacedName, bs); err != nil {
133140
return
134141
}
142+
allPods := &corev1.PodList{}
143+
g.Expect(k8sClient.List(ctx, allPods, &client.ListOptions{Namespace: bs.Namespace})).Should(Succeed())
144+
pods := []*corev1.Pod{}
145+
for i := range allPods.Items {
146+
po := &allPods.Items[i]
147+
if metav1.IsControlledBy(po, bs) {
148+
pods = append(pods, po)
149+
if po.Status.PodIP != "" {
150+
continue
151+
}
152+
// patch status to make pod Scheduled
153+
mockIP := randomIPv4().String()
154+
wantIPSet.Insert(mockIP)
155+
po.Status.PodIP = mockIP
156+
po.Status.Phase = corev1.PodRunning
157+
po.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}
158+
Expect(k8sClient.Status().Update(context.Background(), po)).To(Succeed())
159+
}
160+
}
161+
g.Expect(len(pods)).To(Equal(int(*bs.Spec.Replicas)))
135162
g.Expect(bs.Status.ObservedGeneration).To(Equal(bs.Generation))
136163
g.Expect(bs.Status.Replicas).To(Equal(*bs.Spec.Replicas))
164+
g.Expect(bs.Status.Allocated).To(Equal(*bs.Spec.Replicas))
165+
g.Expect(bs.Status.Ready).To(Equal(*bs.Spec.Replicas))
166+
167+
gotIPs := []string{}
168+
if raw := bs.Annotations[AnnotationSandboxEndpoints]; raw != "" {
169+
json.Unmarshal([]byte(raw), &gotIPs)
170+
}
171+
g.Expect(wantIPSet.Equal(set.New(gotIPs...))).To(BeTrue(), fmt.Sprintf("wantIPSet %v, gotIPs %v", wantIPSet.SortedList(), gotIPs))
137172
}, timeout, interval).Should(Succeed())
138173
})
139174
It("should successfully correctly create new Pod and update batch sandbox status when user scale out", func() {
@@ -190,6 +225,33 @@ var _ = Describe("BatchSandbox Controller", func() {
190225
g.Expect(newPod.CreationTimestamp).NotTo(Equal(oldPod.CreationTimestamp))
191226
}, timeout, interval).Should(Succeed())
192227
})
228+
It("should delete batch sandbox and related Pods for expired batch sandbox", func() {
229+
Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
230+
bs := &sandboxv1alpha1.BatchSandbox{}
231+
if err := k8sClient.Get(ctx, typeNamespacedName, bs); err != nil {
232+
return err
233+
}
234+
bs.Spec.ExpireTime = &metav1.Time{Time: time.Now().Add(3 * time.Second)}
235+
return k8sClient.Update(ctx, bs)
236+
})).Should(Succeed())
237+
238+
Eventually(
239+
func(g Gomega) {
240+
bs := &sandboxv1alpha1.BatchSandbox{}
241+
g.Expect(errors.IsNotFound(k8sClient.Get(ctx, typeNamespacedName, bs))).To(BeTrue())
242+
allPods := &corev1.PodList{}
243+
g.Expect(k8sClient.List(ctx, allPods, &client.ListOptions{Namespace: bs.Namespace})).Should(Succeed())
244+
pods := []*corev1.Pod{}
245+
for i := range allPods.Items {
246+
po := &allPods.Items[i]
247+
if metav1.IsControlledBy(po, bs) {
248+
pods = append(pods, po)
249+
}
250+
}
251+
g.Expect(len(pods)).To(BeZero())
252+
},
253+
timeout, interval).Should(Succeed())
254+
})
193255
})
194256

195257
// Pooling Mode
@@ -233,7 +295,7 @@ var _ = Describe("BatchSandbox Controller", func() {
233295
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
234296
})
235297

236-
It("should successfully update batch sandbox status when get pod from pool alloc", func() {
298+
It("should successfully update batch sandbox status, sbx endpoints info when get pod from pool alloc", func() {
237299
// mock pool allocation
238300
mockPods := []string{}
239301
for i := range replicas {
@@ -256,10 +318,14 @@ var _ = Describe("BatchSandbox Controller", func() {
256318
po.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}
257319
Expect(k8sClient.Status().Update(context.Background(), po)).To(Succeed())
258320
}
259-
bs := &sandboxv1alpha1.BatchSandbox{}
260-
Expect(k8sClient.Get(ctx, typeNamespacedName, bs)).To(Succeed())
261-
setSandboxAllocation(bs, SandboxAllocation{Pods: mockPods})
262-
Expect(k8sClient.Update(ctx, bs)).To(Succeed())
321+
Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
322+
bs := &sandboxv1alpha1.BatchSandbox{}
323+
if err := k8sClient.Get(ctx, typeNamespacedName, bs); err != nil {
324+
return err
325+
}
326+
setSandboxAllocation(bs, SandboxAllocation{Pods: mockPods})
327+
return k8sClient.Update(ctx, bs)
328+
})).Should(Succeed())
263329
By(fmt.Sprintf("Mock pool allocate Pod %v for BatchSandbox %s", mockPods, typeNamespacedName))
264330

265331
Eventually(func(g Gomega) {
@@ -271,11 +337,22 @@ var _ = Describe("BatchSandbox Controller", func() {
271337
g.Expect(bs.Status.Replicas).To(Equal(*bs.Spec.Replicas))
272338
g.Expect(bs.Status.Allocated).To(Equal(*bs.Spec.Replicas))
273339
g.Expect(bs.Status.Ready).To(Equal(*bs.Spec.Replicas))
340+
341+
g.Expect(bs.Annotations[AnnotationSandboxEndpoints]).To(Equal("[\"1.2.3.4\"]"))
274342
}, timeout, interval).Should(Succeed())
275343
})
276344
})
277345
})
278346

347+
func randomIPv4() net.IP {
348+
rand.Seed(time.Now().UnixNano())
349+
ip := make(net.IP, 4)
350+
for i := range ip {
351+
ip[i] = byte(rand.Intn(256))
352+
}
353+
return ip
354+
}
355+
279356
var _ = Describe("BatchSandbox Task Scheduler", func() {
280357
var (
281358
timeout = 30 * time.Second

kubernetes/internal/controller/pool_controller_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"k8s.io/apimachinery/pkg/fields"
4545
"k8s.io/apimachinery/pkg/types"
4646
"k8s.io/apimachinery/pkg/util/rand"
47+
"k8s.io/client-go/util/retry"
4748
"k8s.io/utils/ptr"
4849
kclient "sigs.k8s.io/controller-runtime/pkg/client"
4950

@@ -275,13 +276,20 @@ var _ = Describe("Pool update", func() {
275276
Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
276277
})
277278
It("should successfully update pool revision", func() {
278-
pool := &sandboxv1alpha1.Pool{}
279-
Expect(k8sClient.Get(ctx, typeNamespacedName, pool)).To(Succeed())
280-
oldRevision := pool.Status.Revision
281-
pool.Spec.Template.Labels = map[string]string{
282-
"test.pool.update": "v1",
283-
}
284-
Expect(k8sClient.Update(ctx, pool)).To(Succeed())
279+
var oldRevision string
280+
Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
281+
pool := &sandboxv1alpha1.Pool{}
282+
if err := k8sClient.Get(ctx, typeNamespacedName, pool); err != nil {
283+
return err
284+
}
285+
if oldRevision == "" {
286+
oldRevision = pool.Status.Revision
287+
}
288+
pool.Spec.Template.Labels = map[string]string{
289+
"test.pool.update": "v1",
290+
}
291+
return k8sClient.Update(ctx, pool)
292+
})).Should(Succeed())
285293
Eventually(func(g Gomega) {
286294
pool := &sandboxv1alpha1.Pool{}
287295
Expect(k8sClient.Get(ctx, typeNamespacedName, pool)).To(Succeed())

kubernetes/internal/utils/pod.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func getPodsPrefix(controllerName string) string {
169169
}
170170

171171
func IsAssigned(pod *v1.Pod) bool {
172-
return pod != nil && pod.Spec.NodeName != "" && pod.Status.PodIP != ""
172+
return pod != nil && (pod.Spec.NodeName != "" || pod.Status.PodIP != "")
173173
}
174174

175175
func PodNameSorter(a, b *v1.Pod) int {

0 commit comments

Comments
 (0)