Skip to content

Commit dd820a1

Browse files
authored
Merge pull request kubernetes#127421 from liggitt/kubelet-merge
Merge kubelet configuration files using versioned serialized data
2 parents d6e5fb4 + 3153a73 commit dd820a1

File tree

4 files changed

+109
-19
lines changed

4 files changed

+109
-19
lines changed

cmd/kubelet/app/server.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"google.golang.org/grpc/codes"
4343
"google.golang.org/grpc/status"
4444
jsonpatch "gopkg.in/evanphx/json-patch.v4"
45+
4546
"k8s.io/klog/v2"
4647
"k8s.io/mount-utils"
4748

@@ -51,9 +52,11 @@ import (
5152
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
5253
oteltrace "go.opentelemetry.io/otel/trace"
5354
noopoteltrace "go.opentelemetry.io/otel/trace/noop"
55+
5456
v1 "k8s.io/api/core/v1"
5557
"k8s.io/apimachinery/pkg/api/resource"
5658
"k8s.io/apimachinery/pkg/runtime"
59+
"k8s.io/apimachinery/pkg/runtime/schema"
5760
"k8s.io/apimachinery/pkg/types"
5861
utilnet "k8s.io/apimachinery/pkg/util/net"
5962
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -94,6 +97,7 @@ import (
9497
"k8s.io/kubernetes/pkg/kubelet"
9598
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
9699
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
100+
kubeletconfigv1beta1conversion "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
97101
kubeletconfigvalidation "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
98102
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
99103
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
@@ -316,7 +320,15 @@ is checked every 20 seconds (also configurable with a flag).`,
316320
// potentially overriding the previous values.
317321
func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConfiguration, kubeletDropInConfigDir string) error {
318322
const dropinFileExtension = ".conf"
319-
baseKubeletConfigJSON, err := json.Marshal(kubeletConfig)
323+
324+
// TODO: move the "internal --> versioned --> merge --> default --> internal" operation inside the loop,
325+
// and use the version of each drop-in file as the versioned target once config files can be in versions other than just v1beta1
326+
versionedConfig := &kubeletconfigv1beta1.KubeletConfiguration{}
327+
if err := kubeletconfigv1beta1conversion.Convert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(kubeletConfig, versionedConfig, nil); err != nil {
328+
return err
329+
}
330+
331+
baseKubeletConfigJSON, err := json.Marshal(versionedConfig)
320332
if err != nil {
321333
return fmt.Errorf("failed to marshal base config: %w", err)
322334
}
@@ -326,10 +338,17 @@ func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConf
326338
return err
327339
}
328340
if !info.IsDir() && filepath.Ext(info.Name()) == dropinFileExtension {
329-
dropinConfigJSON, err := loadDropinConfigFileIntoJSON(path)
341+
dropinConfigJSON, gvk, err := loadDropinConfigFileIntoJSON(path)
330342
if err != nil {
331343
return fmt.Errorf("failed to load kubelet dropin file, path: %s, error: %w", path, err)
332344
}
345+
if gvk == nil || gvk.Empty() {
346+
return fmt.Errorf("failed to load kubelet dropin file, path: %s, no apiVersion/kind", path)
347+
}
348+
// TODO: expand this once more than a single kubelet config version exists
349+
if *gvk != kubeletconfigv1beta1.SchemeGroupVersion.WithKind("KubeletConfiguration") {
350+
return fmt.Errorf("failed to load kubelet dropin file, path: %s, unknown apiVersion/kind: %v", path, gvk.String())
351+
}
333352
mergedConfigJSON, err := jsonpatch.MergePatch(baseKubeletConfigJSON, dropinConfigJSON)
334353
if err != nil {
335354
return fmt.Errorf("failed to merge drop-in and current config: %w", err)
@@ -341,9 +360,18 @@ func mergeKubeletConfigurations(kubeletConfig *kubeletconfiginternal.KubeletConf
341360
return fmt.Errorf("failed to walk through kubelet dropin directory %q: %w", kubeletDropInConfigDir, err)
342361
}
343362

344-
if err := json.Unmarshal(baseKubeletConfigJSON, kubeletConfig); err != nil {
363+
// reset versioned config and decode
364+
versionedConfig = &kubeletconfigv1beta1.KubeletConfiguration{}
365+
if err := json.Unmarshal(baseKubeletConfigJSON, versionedConfig); err != nil {
345366
return fmt.Errorf("failed to unmarshal merged JSON into kubelet configuration: %w", err)
346367
}
368+
// apply defaulting after decoding
369+
kubeletconfigv1beta1conversion.SetDefaults_KubeletConfiguration(versionedConfig)
370+
// convert back to internal config
371+
if err := kubeletconfigv1beta1conversion.Convert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(versionedConfig, kubeletConfig, nil); err != nil {
372+
return fmt.Errorf("failed to convert merged config to internal kubelet configuration: %w", err)
373+
}
374+
347375
return nil
348376
}
349377

@@ -423,16 +451,16 @@ func loadConfigFile(name string) (*kubeletconfiginternal.KubeletConfiguration, e
423451
return kc, err
424452
}
425453

426-
func loadDropinConfigFileIntoJSON(name string) ([]byte, error) {
454+
func loadDropinConfigFileIntoJSON(name string) ([]byte, *schema.GroupVersionKind, error) {
427455
const errFmt = "failed to load drop-in kubelet config file %s, error %v"
428456
// compute absolute path based on current working dir
429457
kubeletConfigFile, err := filepath.Abs(name)
430458
if err != nil {
431-
return nil, fmt.Errorf(errFmt, name, err)
459+
return nil, nil, fmt.Errorf(errFmt, name, err)
432460
}
433461
loader, err := configfiles.NewFsLoader(&utilfs.DefaultFs{}, kubeletConfigFile)
434462
if err != nil {
435-
return nil, fmt.Errorf(errFmt, name, err)
463+
return nil, nil, fmt.Errorf(errFmt, name, err)
436464
}
437465
return loader.LoadIntoJSON()
438466
}

cmd/kubelet/app/server_test.go

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/stretchr/testify/require"
2727
"gopkg.in/yaml.v2"
28+
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/kubernetes/cmd/kubelet/app/options"
3031
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -80,6 +81,7 @@ func TestMergeKubeletConfigurations(t *testing.T) {
8081
overwrittenConfigFields map[string]interface{}
8182
cliArgs []string
8283
name string
84+
expectMergeError string
8385
}{
8486
{
8587
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
@@ -222,6 +224,55 @@ readOnlyPort: 10255
222224
},
223225
name: "cli args override kubelet.conf",
224226
},
227+
{
228+
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{
229+
ResolverConfig: "original",
230+
Port: 123,
231+
ReadOnlyPort: 234,
232+
},
233+
dropin1: `
234+
apiVersion: kubelet.config.k8s.io/v1beta1
235+
kind: KubeletConfiguration
236+
resolvConf: dropin1
237+
`,
238+
dropin2: `
239+
apiVersion: kubelet.config.k8s.io/v1beta1
240+
kind: KubeletConfiguration
241+
port: 0
242+
`,
243+
overwrittenConfigFields: map[string]interface{}{
244+
"ResolverConfig": "dropin1", // overridden by dropin1
245+
"Port": int32(10250), // reset to 0 by dropin2, then re-defaulted
246+
"ReadOnlyPort": int32(234), // preserved from original config
247+
},
248+
name: "json conversion is correct",
249+
},
250+
{
251+
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{},
252+
dropin1: `
253+
apiVersion: kubelet.config.k8s.io/v1beta2
254+
kind: KubeletConfiguration
255+
`,
256+
name: "invalid drop-in apiVersion",
257+
expectMergeError: `unknown apiVersion/kind: kubelet.config.k8s.io/v1beta2, Kind=KubeletConfiguration`,
258+
},
259+
{
260+
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{},
261+
dropin1: `
262+
apiVersion: kubelet.config.k8s.io/v1beta1
263+
kind: KubeletConfiguration2
264+
`,
265+
name: "invalid drop-in kind",
266+
expectMergeError: `unknown apiVersion/kind: kubelet.config.k8s.io/v1beta1, Kind=KubeletConfiguration2`,
267+
},
268+
{
269+
kubeletConfig: &kubeletconfiginternal.KubeletConfiguration{},
270+
dropin1: `
271+
port: 123
272+
`,
273+
name: "empty drop-in apiVersion/kind",
274+
expectMergeError: `'Kind' is missing`,
275+
},
225276
}
226277

227278
for _, test := range testCases {
@@ -248,15 +299,24 @@ readOnlyPort: 10255
248299
err := os.Mkdir(kubeletConfDir, 0755)
249300
require.NoError(t, err, "Failed to create kubelet.conf.d directory")
250301

251-
err = os.WriteFile(filepath.Join(kubeletConfDir, "10-kubelet.conf"), []byte(test.dropin1), 0644)
252-
require.NoError(t, err, "failed to create config from a yaml file")
302+
if len(test.dropin1) > 0 {
303+
err = os.WriteFile(filepath.Join(kubeletConfDir, "10-kubelet.conf"), []byte(test.dropin1), 0644)
304+
require.NoError(t, err, "failed to create config from a yaml file")
305+
}
253306

254-
err = os.WriteFile(filepath.Join(kubeletConfDir, "20-kubelet.conf"), []byte(test.dropin2), 0644)
255-
require.NoError(t, err, "failed to create config from a yaml file")
307+
if len(test.dropin2) > 0 {
308+
err = os.WriteFile(filepath.Join(kubeletConfDir, "20-kubelet.conf"), []byte(test.dropin2), 0644)
309+
require.NoError(t, err, "failed to create config from a yaml file")
310+
}
256311

257312
// Merge the kubelet configurations
258313
err = mergeKubeletConfigurations(kubeletConfig, kubeletConfDir)
259-
require.NoError(t, err, "failed to merge kubelet drop-in configs")
314+
if test.expectMergeError == "" {
315+
require.NoError(t, err, "failed to merge kubelet drop-in configs")
316+
} else {
317+
require.Error(t, err)
318+
require.Contains(t, err.Error(), test.expectMergeError)
319+
}
260320
}
261321

262322
// Use kubelet config flag precedence

pkg/kubelet/kubeletconfig/configfiles/configfiles.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"path/filepath"
2222

23+
"k8s.io/apimachinery/pkg/runtime/schema"
2324
"k8s.io/apimachinery/pkg/runtime/serializer"
2425
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
2526
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
@@ -33,7 +34,7 @@ type Loader interface {
3334
Load() (*kubeletconfig.KubeletConfiguration, error)
3435
// LoadIntoJSON loads and returns the KubeletConfiguration from the storage layer, or an error if a configuration could not be
3536
// loaded. It returns the configuration as a JSON byte slice
36-
LoadIntoJSON() ([]byte, error)
37+
LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error)
3738
}
3839

3940
// fsLoader loads configuration from `configDir`
@@ -81,15 +82,15 @@ func (loader *fsLoader) Load() (*kubeletconfig.KubeletConfiguration, error) {
8182
return kc, nil
8283
}
8384

84-
func (loader *fsLoader) LoadIntoJSON() ([]byte, error) {
85+
func (loader *fsLoader) LoadIntoJSON() ([]byte, *schema.GroupVersionKind, error) {
8586
data, err := loader.fs.ReadFile(loader.kubeletFile)
8687
if err != nil {
87-
return nil, fmt.Errorf("failed to read drop-in kubelet config file %q, error: %v", loader.kubeletFile, err)
88+
return nil, nil, fmt.Errorf("failed to read drop-in kubelet config file %q, error: %w", loader.kubeletFile, err)
8889
}
8990

9091
// no configuration is an error, some parameters are required
9192
if len(data) == 0 {
92-
return nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
93+
return nil, nil, fmt.Errorf("kubelet config file %q was empty", loader.kubeletFile)
9394
}
9495

9596
return utilcodec.DecodeKubeletConfigurationIntoJSON(loader.kubeletCodecs, data)

pkg/kubelet/kubeletconfig/util/codec/codec.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,15 @@ func DecodeKubeletConfiguration(kubeletCodecs *serializer.CodecFactory, data []b
109109
}
110110

111111
// DecodeKubeletConfigurationIntoJSON decodes a serialized KubeletConfiguration to the internal type.
112-
func DecodeKubeletConfigurationIntoJSON(kubeletCodecs *serializer.CodecFactory, data []byte) ([]byte, error) {
112+
func DecodeKubeletConfigurationIntoJSON(kubeletCodecs *serializer.CodecFactory, data []byte) ([]byte, *schema.GroupVersionKind, error) {
113113
// The UniversalDecoder runs defaulting and returns the internal type by default.
114-
obj, _, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, &unstructured.Unstructured{})
114+
obj, gvk, err := kubeletCodecs.UniversalDecoder().Decode(data, nil, &unstructured.Unstructured{})
115115
if err != nil {
116-
return nil, err
116+
return nil, nil, err
117117
}
118118

119119
objT := obj.(*unstructured.Unstructured)
120120

121-
return json.Marshal(objT.Object)
121+
jsonData, err := json.Marshal(objT.Object)
122+
return jsonData, gvk, err
122123
}

0 commit comments

Comments
 (0)