Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 1637021

Browse files
niliayuhamersaw
andauthored
Adding primaryContainerName implementation to podBuilder (#280)
* Adding primaryContainerName implementation to podBuilder Signed-off-by: Ailin Yu <ailin@pachama.com> * Debugging: Mergo needs a pointer, and an excessive amount of debug printouts Signed-off-by: Ailin Yu <ailin@pachama.com> * Starting to do something, lots of debug outputs Signed-off-by: Ailin Yu <ailin@pachama.com> * Sidecar uses task exec ID Signed-off-by: Ailin Yu <ailin@pachama.com> * Cleaning up debugging Signed-off-by: Ailin Yu <ailin@pachama.com> * Modified container merging loop, and some dev/testing changes in sidecarbuilder Signed-off-by: Ailin Yu <ailin@pachama.com> * Sidecar uses primary container name from config Signed-off-by: Ailin Yu <ailin@pachama.com> * Cleanups Signed-off-by: Ailin Yu <ailin@pachama.com> * added support for default and primary container templates Signed-off-by: Daniel Rammer <daniel@union.ai> * fixed container template reference issues Signed-off-by: Daniel Rammer <daniel@union.ai> * removed unnecessary DeepCopy call Signed-off-by: Daniel Rammer <daniel@union.ai> * added unit tests Signed-off-by: Daniel Rammer <daniel@union.ai> * fixed lint issues Signed-off-by: Daniel Rammer <daniel@union.ai> Signed-off-by: Ailin Yu <ailin@pachama.com> Signed-off-by: Daniel Rammer <daniel@union.ai> Co-authored-by: Daniel Rammer <daniel@union.ai>
1 parent 9ccfa70 commit 1637021

File tree

5 files changed

+129
-37
lines changed

5 files changed

+129
-37
lines changed

go/tasks/pluginmachinery/flytek8s/pod_helper.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ const PodKind = "pod"
2323
const OOMKilled = "OOMKilled"
2424
const Interrupted = "Interrupted"
2525
const SIGKILL = 137
26+
const defaultContainerTemplateName = "default"
27+
const primaryContainerTemplateName = "primary"
2628

2729
// ApplyInterruptibleNodeAffinity configures the node-affinity for the pod using the configuration specified.
2830
func ApplyInterruptibleNodeAffinity(interruptible bool, podSpec *v1.PodSpec) {
@@ -135,7 +137,7 @@ func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*
135137
return pod, nil
136138
}
137139

138-
func BuildPodWithSpec(podTemplate *v1.PodTemplate, podSpec *v1.PodSpec) (*v1.Pod, error) {
140+
func BuildPodWithSpec(podTemplate *v1.PodTemplate, podSpec *v1.PodSpec, primaryContainerName string) (*v1.Pod, error) {
139141
pod := v1.Pod{
140142
TypeMeta: v12.TypeMeta{
141143
Kind: PodKind,
@@ -144,14 +146,59 @@ func BuildPodWithSpec(podTemplate *v1.PodTemplate, podSpec *v1.PodSpec) (*v1.Pod
144146
}
145147

146148
if podTemplate != nil {
149+
// merge template PodSpec
147150
basePodSpec := podTemplate.Template.Spec.DeepCopy()
148151
err := mergo.Merge(basePodSpec, podSpec, mergo.WithOverride, mergo.WithAppendSlice)
149152
if err != nil {
150153
return nil, err
151154
}
152155

153-
basePodSpec.Containers = podSpec.Containers
156+
// merge template Containers
157+
var mergedContainers []v1.Container
158+
var defaultContainerTemplate, primaryContainerTemplate *v1.Container
159+
for i := 0; i < len(podTemplate.Template.Spec.Containers); i++ {
160+
if podTemplate.Template.Spec.Containers[i].Name == defaultContainerTemplateName {
161+
defaultContainerTemplate = &podTemplate.Template.Spec.Containers[i]
162+
} else if podTemplate.Template.Spec.Containers[i].Name == primaryContainerTemplateName {
163+
primaryContainerTemplate = &podTemplate.Template.Spec.Containers[i]
164+
}
165+
}
166+
167+
for _, container := range podSpec.Containers {
168+
// if applicable start with defaultContainerTemplate
169+
var mergedContainer *v1.Container
170+
if defaultContainerTemplate != nil {
171+
mergedContainer = defaultContainerTemplate.DeepCopy()
172+
}
173+
174+
// if applicable merge with primaryContainerTemplate
175+
if container.Name == primaryContainerName && primaryContainerTemplate != nil {
176+
if mergedContainer == nil {
177+
mergedContainer = primaryContainerTemplate.DeepCopy()
178+
} else {
179+
err := mergo.Merge(mergedContainer, primaryContainerTemplate, mergo.WithOverride, mergo.WithAppendSlice)
180+
if err != nil {
181+
return nil, err
182+
}
183+
}
184+
}
185+
186+
// if applicable merge with existing container
187+
if mergedContainer == nil {
188+
mergedContainers = append(mergedContainers, container)
189+
} else {
190+
err := mergo.Merge(mergedContainer, container, mergo.WithOverride, mergo.WithAppendSlice)
191+
if err != nil {
192+
return nil, err
193+
}
194+
195+
mergedContainers = append(mergedContainers, *mergedContainer)
196+
}
197+
198+
}
154199

200+
// update Pod fields
201+
basePodSpec.Containers = mergedContainers
155202
pod.ObjectMeta = podTemplate.Template.ObjectMeta
156203
pod.Spec = *basePodSpec
157204
} else {

go/tasks/pluginmachinery/flytek8s/pod_helper_test.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,6 +1016,14 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) {
10161016
func TestBuildPodWithSpec(t *testing.T) {
10171017
var priority int32 = 1
10181018
podSpec := v1.PodSpec{
1019+
Containers: []v1.Container{
1020+
v1.Container{
1021+
Name: "foo",
1022+
},
1023+
v1.Container{
1024+
Name: "bar",
1025+
},
1026+
},
10191027
NodeSelector: map[string]string{
10201028
"baz": "bar",
10211029
},
@@ -1031,13 +1039,27 @@ func TestBuildPodWithSpec(t *testing.T) {
10311039
},
10321040
}
10331041

1034-
pod, err := BuildPodWithSpec(nil, &podSpec)
1042+
pod, err := BuildPodWithSpec(nil, &podSpec, "foo")
10351043
assert.Nil(t, err)
10361044
assert.True(t, reflect.DeepEqual(pod.Spec, podSpec))
10371045

1046+
defaultContainerTemplate := v1.Container{
1047+
Name: defaultContainerTemplateName,
1048+
TerminationMessagePath: "/dev/default-termination-log",
1049+
}
1050+
1051+
primaryContainerTemplate := v1.Container{
1052+
Name: primaryContainerTemplateName,
1053+
TerminationMessagePath: "/dev/primary-termination-log",
1054+
}
1055+
10381056
podTemplate := v1.PodTemplate{
10391057
Template: v1.PodTemplateSpec{
10401058
Spec: v1.PodSpec{
1059+
Containers: []v1.Container{
1060+
defaultContainerTemplate,
1061+
primaryContainerTemplate,
1062+
},
10411063
HostNetwork: true,
10421064
NodeSelector: map[string]string{
10431065
"foo": "bar",
@@ -1052,7 +1074,7 @@ func TestBuildPodWithSpec(t *testing.T) {
10521074
},
10531075
}
10541076

1055-
pod, err = BuildPodWithSpec(&podTemplate, &podSpec)
1077+
pod, err = BuildPodWithSpec(&podTemplate, &podSpec, "foo")
10561078
assert.Nil(t, err)
10571079

10581080
// validate a PodTemplate-only field
@@ -1065,4 +1087,14 @@ func TestBuildPodWithSpec(t *testing.T) {
10651087
assert.Equal(t, len(podTemplate.Template.Spec.NodeSelector)+len(podSpec.NodeSelector), len(pod.Spec.NodeSelector))
10661088
// validate an appended array
10671089
assert.Equal(t, len(podTemplate.Template.Spec.Tolerations)+len(podSpec.Tolerations), len(pod.Spec.Tolerations))
1090+
1091+
// validate primary container
1092+
primaryContainer := pod.Spec.Containers[0]
1093+
assert.Equal(t, podSpec.Containers[0].Name, primaryContainer.Name)
1094+
assert.Equal(t, primaryContainerTemplate.TerminationMessagePath, primaryContainer.TerminationMessagePath)
1095+
1096+
// validate default container
1097+
defaultContainer := pod.Spec.Containers[1]
1098+
assert.Equal(t, podSpec.Containers[1].Name, defaultContainer.Name)
1099+
assert.Equal(t, defaultContainerTemplate.TerminationMessagePath, defaultContainer.TerminationMessagePath)
10681100
}

go/tasks/plugins/k8s/pod/container.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
77

8+
"github.com/flyteorg/flyteplugins/go/tasks/errors"
89
pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
910
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
1011

@@ -27,6 +28,14 @@ func (containerPodBuilder) buildPodSpec(ctx context.Context, task *core.TaskTemp
2728
return podSpec, nil
2829
}
2930

31+
func (containerPodBuilder) getPrimaryContainerName(task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) (string, error) {
32+
primaryContainerName := taskCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
33+
if primaryContainerName == "" {
34+
return "", errors.Errorf(errors.BadTaskSpecification, "invalid TaskSpecification, missing generated name")
35+
}
36+
return primaryContainerName, nil
37+
}
38+
3039
func (containerPodBuilder) updatePodMetadata(ctx context.Context, pod *v1.Pod, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) error {
3140
return nil
3241
}

go/tasks/plugins/k8s/pod/plugin.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ var (
3434

3535
type podBuilder interface {
3636
buildPodSpec(ctx context.Context, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, error)
37+
getPrimaryContainerName(task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) (string, error)
3738
updatePodMetadata(ctx context.Context, pod *v1.Pod, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) error
3839
}
3940

@@ -61,7 +62,6 @@ func (p plugin) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecu
6162
builder = p.defaultPodBuilder
6263
}
6364

64-
// build pod
6565
podSpec, err := builder.buildPodSpec(ctx, task, taskCtx)
6666
if err != nil {
6767
return nil, err
@@ -70,7 +70,12 @@ func (p plugin) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecu
7070
podSpec.ServiceAccountName = flytek8s.GetServiceAccountNameFromTaskExecutionMetadata(taskCtx.TaskExecutionMetadata())
7171

7272
podTemplate := flytek8s.DefaultPodTemplateStore.LoadOrDefault(taskCtx.TaskExecutionMetadata().GetNamespace())
73-
pod, err := flytek8s.BuildPodWithSpec(podTemplate, podSpec)
73+
primaryContainerName, err := builder.getPrimaryContainerName(task, taskCtx)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
pod, err := flytek8s.BuildPodWithSpec(podTemplate, podSpec, primaryContainerName)
7479
if err != nil {
7580
return nil, err
7681
}

go/tasks/plugins/k8s/pod/sidecar.go

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,31 @@ func (sidecarPodBuilder) buildPodSpec(ctx context.Context, task *core.TaskTempla
7777
return &podSpec, nil
7878
}
7979

80-
func getPrimaryContainerNameFromConfig(task *core.TaskTemplate) (string, error) {
81-
if len(task.GetConfig()) == 0 {
82-
return "", errors.Errorf(errors.BadTaskSpecification,
83-
"invalid TaskSpecification, config needs to be non-empty and include missing [%s] key", PrimaryContainerKey)
84-
}
80+
func (sidecarPodBuilder) getPrimaryContainerName(task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) (string, error) {
81+
switch task.TaskTypeVersion {
82+
case 0:
83+
// Handles pod tasks when they are defined as Sidecar tasks and marshal the podspec using k8s proto.
84+
sidecarJob := sidecarJob{}
85+
err := utils.UnmarshalStructToObj(task.GetCustom(), &sidecarJob)
86+
if err != nil {
87+
return "", errors.Errorf(errors.BadTaskSpecification, "invalid TaskSpecification [%v], Err: [%v]", task.GetCustom(), err.Error())
88+
}
8589

86-
primaryContainerName, ok := task.GetConfig()[PrimaryContainerKey]
87-
if !ok {
88-
return "", errors.Errorf(errors.BadTaskSpecification,
89-
"invalid TaskSpecification, config missing [%s] key in [%v]", PrimaryContainerKey, task.GetConfig())
90-
}
90+
return sidecarJob.PrimaryContainerName, nil
91+
default:
92+
if len(task.GetConfig()) == 0 {
93+
return "", errors.Errorf(errors.BadTaskSpecification,
94+
"invalid TaskSpecification, config needs to be non-empty and include missing [%s] key", PrimaryContainerKey)
95+
}
9196

92-
return primaryContainerName, nil
97+
primaryContainerName, ok := task.GetConfig()[PrimaryContainerKey]
98+
if !ok {
99+
return "", errors.Errorf(errors.BadTaskSpecification,
100+
"invalid TaskSpecification, config missing [%s] key in [%v]", PrimaryContainerKey, task.GetConfig())
101+
}
102+
103+
return primaryContainerName, nil
104+
}
93105
}
94106

95107
func mergeMapInto(src map[string]string, dst map[string]string) {
@@ -98,11 +110,10 @@ func mergeMapInto(src map[string]string, dst map[string]string) {
98110
}
99111
}
100112

101-
func (sidecarPodBuilder) updatePodMetadata(ctx context.Context, pod *v1.Pod, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) error {
113+
func (s sidecarPodBuilder) updatePodMetadata(ctx context.Context, pod *v1.Pod, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) error {
102114
pod.Annotations = make(map[string]string)
103115
pod.Labels = make(map[string]string)
104116

105-
var primaryContainerName string
106117
switch task.TaskTypeVersion {
107118
case 0:
108119
// Handles pod tasks when they are defined as Sidecar tasks and marshal the podspec using k8s proto.
@@ -114,32 +125,20 @@ func (sidecarPodBuilder) updatePodMetadata(ctx context.Context, pod *v1.Pod, tas
114125

115126
mergeMapInto(sidecarJob.Annotations, pod.Annotations)
116127
mergeMapInto(sidecarJob.Labels, pod.Labels)
117-
118-
primaryContainerName = sidecarJob.PrimaryContainerName
119-
case 1:
120-
// Handles pod tasks that marshal the pod spec to the task custom.
121-
containerName, err := getPrimaryContainerNameFromConfig(task)
122-
if err != nil {
123-
return err
124-
}
125-
126-
primaryContainerName = containerName
127128
default:
128129
// Handles pod tasks that marshal the pod spec to the k8s_pod task target.
129-
if task.GetK8SPod() == nil || task.GetK8SPod().Metadata != nil {
130+
if task.GetK8SPod() != nil && task.GetK8SPod().Metadata != nil {
130131
mergeMapInto(task.GetK8SPod().Metadata.Annotations, pod.Annotations)
131132
mergeMapInto(task.GetK8SPod().Metadata.Labels, pod.Labels)
132133
}
133-
134-
containerName, err := getPrimaryContainerNameFromConfig(task)
135-
if err != nil {
136-
return err
137-
}
138-
139-
primaryContainerName = containerName
140134
}
141135

142136
// validate pod and update resource requirements
137+
primaryContainerName, err := s.getPrimaryContainerName(task, taskCtx)
138+
if err != nil {
139+
return err
140+
}
141+
143142
if err := validateAndFinalizePodSpec(ctx, taskCtx, primaryContainerName, &pod.Spec); err != nil {
144143
return err
145144
}

0 commit comments

Comments
 (0)