Skip to content

Commit c0974d7

Browse files
authored
Merge pull request kubernetes#69516 from feiskyer/win-npipe
Switch windows runtime endpoints to npipe
2 parents e7e6c8a + 053b71d commit c0974d7

File tree

7 files changed

+205
-37
lines changed

7 files changed

+205
-37
lines changed

cmd/kubelet/app/options/options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func NewKubeletFlags() *KubeletFlags {
212212
if runtime.GOOS == "linux" {
213213
remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
214214
} else if runtime.GOOS == "windows" {
215-
remoteRuntimeEndpoint = "tcp://localhost:3735"
215+
remoteRuntimeEndpoint = "npipe:////./pipe/dockershim"
216216
}
217217

218218
return &KubeletFlags{
@@ -376,8 +376,8 @@ func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
376376
fs.StringVar(&f.ExperimentalMounterPath, "experimental-mounter-path", f.ExperimentalMounterPath, "[Experimental] Path of mounter binary. Leave empty to use the default mount.")
377377
fs.StringSliceVar(&f.AllowedUnsafeSysctls, "allowed-unsafe-sysctls", f.AllowedUnsafeSysctls, "Comma-separated whitelist of unsafe sysctls or unsafe sysctl patterns (ending in *). Use these at your own risk. Sysctls feature gate is enabled by default.")
378378
fs.BoolVar(&f.ExperimentalKernelMemcgNotification, "experimental-kernel-memcg-notification", f.ExperimentalKernelMemcgNotification, "If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.")
379-
fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket is supported on Linux, and tcp is supported on windows. Examples:'unix:///var/run/dockershim.sock', 'tcp://localhost:3735'")
380-
fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket is supported on Linux, and tcp is supported on windows. Examples:'unix:///var/run/dockershim.sock', 'tcp://localhost:3735'")
379+
fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket and tcp endpoints are supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
380+
fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket and tcp endpoints are supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
381381
fs.BoolVar(&f.ExperimentalCheckNodeCapabilitiesBeforeMount, "experimental-check-node-capabilities-before-mount", f.ExperimentalCheckNodeCapabilitiesBeforeMount, "[Experimental] if set true, the kubelet will check the underlying node for required components (binaries, etc.) before performing the mount")
382382
fs.BoolVar(&f.ExperimentalNodeAllocatableIgnoreEvictionThreshold, "experimental-allocatable-ignore-eviction", f.ExperimentalNodeAllocatableIgnoreEvictionThreshold, "When set to 'true', Hard Eviction Thresholds will be ignored while calculating Node Allocatable. See https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/ for more details. [default=false]")
383383
bindableNodeLabels := flag.ConfigurationMap(f.NodeLabels)

pkg/kubelet/util/BUILD

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,27 @@ load(
88

99
go_test(
1010
name = "go_default_test",
11-
srcs = ["util_test.go"],
12-
embed = [":go_default_library"],
13-
deps = [
14-
"//vendor/github.com/stretchr/testify/assert:go_default_library",
11+
srcs = [
12+
"util_unix_test.go",
13+
"util_windows_test.go",
1514
],
15+
embed = [":go_default_library"],
16+
deps = select({
17+
"@io_bazel_rules_go//go/platform:darwin": [
18+
"//vendor/github.com/stretchr/testify/assert:go_default_library",
19+
],
20+
"@io_bazel_rules_go//go/platform:freebsd": [
21+
"//vendor/github.com/stretchr/testify/assert:go_default_library",
22+
],
23+
"@io_bazel_rules_go//go/platform:linux": [
24+
"//vendor/github.com/stretchr/testify/assert:go_default_library",
25+
],
26+
"@io_bazel_rules_go//go/platform:windows": [
27+
"//vendor/github.com/stretchr/testify/assert:go_default_library",
28+
"//vendor/github.com/stretchr/testify/require:go_default_library",
29+
],
30+
"//conditions:default": [],
31+
}),
1632
)
1733

1834
go_library(
@@ -40,6 +56,9 @@ go_library(
4056
"//vendor/github.com/golang/glog:go_default_library",
4157
"//vendor/golang.org/x/sys/unix:go_default_library",
4258
],
59+
"@io_bazel_rules_go//go/platform:windows": [
60+
"//vendor/github.com/Microsoft/go-winio:go_default_library",
61+
],
4362
"//conditions:default": [],
4463
}),
4564
)

pkg/kubelet/util/util.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ limitations under the License.
1717
package util
1818

1919
import (
20-
"fmt"
21-
"net/url"
22-
2320
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2421
)
2522

@@ -28,20 +25,3 @@ import (
2825
func FromApiserverCache(opts *metav1.GetOptions) {
2926
opts.ResourceVersion = "0"
3027
}
31-
32-
func parseEndpoint(endpoint string) (string, string, error) {
33-
u, err := url.Parse(endpoint)
34-
if err != nil {
35-
return "", "", err
36-
}
37-
38-
if u.Scheme == "tcp" {
39-
return "tcp", u.Host, nil
40-
} else if u.Scheme == "unix" {
41-
return "unix", u.Path, nil
42-
} else if u.Scheme == "" {
43-
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
44-
} else {
45-
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
46-
}
47-
}

pkg/kubelet/util/util_unix.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package util
2121
import (
2222
"fmt"
2323
"net"
24+
"net/url"
2425
"os"
2526
"time"
2627

@@ -77,3 +78,24 @@ func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string)
7778
}
7879
return
7980
}
81+
82+
func parseEndpoint(endpoint string) (string, string, error) {
83+
u, err := url.Parse(endpoint)
84+
if err != nil {
85+
return "", "", err
86+
}
87+
88+
switch u.Scheme {
89+
case "tcp":
90+
return "tcp", u.Host, nil
91+
92+
case "unix":
93+
return "unix", u.Path, nil
94+
95+
case "":
96+
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
97+
98+
default:
99+
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
100+
}
101+
}

pkg/kubelet/util/util_test.go renamed to pkg/kubelet/util/util_unix_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
// +build freebsd linux darwin
2+
13
/*
2-
Copyright 2017 The Kubernetes Authors.
4+
Copyright 2018 The Kubernetes Authors.
35
46
Licensed under the Apache License, Version 2.0 (the "License");
57
you may not use this file except in compliance with the License.
@@ -39,6 +41,11 @@ func TestParseEndpoint(t *testing.T) {
3941
expectedProtocol: "tcp",
4042
expectedAddr: "localhost:15880",
4143
},
44+
{
45+
endpoint: "npipe://./pipe/mypipe",
46+
expectedProtocol: "npipe",
47+
expectError: true,
48+
},
4249
{
4350
endpoint: "tcp1://abc",
4451
expectedProtocol: "tcp1",

pkg/kubelet/util/util_windows.go

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,85 @@ package util
2121
import (
2222
"fmt"
2323
"net"
24+
"net/url"
25+
"strings"
2426
"time"
27+
28+
"github.com/Microsoft/go-winio"
2529
)
2630

2731
const (
28-
tcpProtocol = "tcp"
32+
tcpProtocol = "tcp"
33+
npipeProtocol = "npipe"
2934
)
3035

3136
func CreateListener(endpoint string) (net.Listener, error) {
3237
protocol, addr, err := parseEndpoint(endpoint)
3338
if err != nil {
3439
return nil, err
3540
}
36-
if protocol != tcpProtocol {
37-
return nil, fmt.Errorf("only support tcp endpoint")
38-
}
3941

40-
return net.Listen(protocol, addr)
42+
switch protocol {
43+
case tcpProtocol:
44+
return net.Listen(tcpProtocol, addr)
45+
46+
case npipeProtocol:
47+
return winio.ListenPipe(addr, nil)
48+
49+
default:
50+
return nil, fmt.Errorf("only support tcp and npipe endpoint")
51+
}
4152
}
4253

4354
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
4455
protocol, addr, err := parseEndpoint(endpoint)
4556
if err != nil {
4657
return "", nil, err
4758
}
48-
if protocol != tcpProtocol {
49-
return "", nil, fmt.Errorf("only support tcp endpoint")
59+
60+
if protocol == tcpProtocol {
61+
return addr, tcpDial, nil
62+
}
63+
64+
if protocol == npipeProtocol {
65+
return addr, npipeDial, nil
5066
}
5167

52-
return addr, dial, nil
68+
return "", nil, fmt.Errorf("only support tcp and npipe endpoint")
5369
}
5470

55-
func dial(addr string, timeout time.Duration) (net.Conn, error) {
71+
func tcpDial(addr string, timeout time.Duration) (net.Conn, error) {
5672
return net.DialTimeout(tcpProtocol, addr, timeout)
5773
}
74+
75+
func npipeDial(addr string, timeout time.Duration) (net.Conn, error) {
76+
return winio.DialPipe(addr, &timeout)
77+
}
78+
79+
func parseEndpoint(endpoint string) (string, string, error) {
80+
// url.Parse doesn't recognize \, so replace with / first.
81+
endpoint = strings.Replace(endpoint, "\\", "/", -1)
82+
u, err := url.Parse(endpoint)
83+
if err != nil {
84+
return "", "", err
85+
}
86+
87+
if u.Scheme == "tcp" {
88+
return "tcp", u.Host, nil
89+
} else if u.Scheme == "npipe" {
90+
if strings.HasPrefix(u.Path, "//./pipe") {
91+
return "npipe", u.Path, nil
92+
}
93+
94+
// fallback host if not provided.
95+
host := u.Host
96+
if host == "" {
97+
host = "."
98+
}
99+
return "npipe", fmt.Sprintf("//%s%s", host, u.Path), nil
100+
} else if u.Scheme == "" {
101+
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
102+
} else {
103+
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
104+
}
105+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// +build windows
2+
3+
/*
4+
Copyright 2018 The Kubernetes Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package util
20+
21+
import (
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func TestParseEndpoint(t *testing.T) {
29+
tests := []struct {
30+
endpoint string
31+
expectError bool
32+
expectedProtocol string
33+
expectedAddr string
34+
}{
35+
{
36+
endpoint: "unix:///tmp/s1.sock",
37+
expectedProtocol: "unix",
38+
expectError: true,
39+
},
40+
{
41+
endpoint: "tcp://localhost:15880",
42+
expectedProtocol: "tcp",
43+
expectedAddr: "localhost:15880",
44+
},
45+
{
46+
endpoint: "npipe://./pipe/mypipe",
47+
expectedProtocol: "npipe",
48+
expectedAddr: "//./pipe/mypipe",
49+
},
50+
{
51+
endpoint: "npipe:////./pipe/mypipe2",
52+
expectedProtocol: "npipe",
53+
expectedAddr: "//./pipe/mypipe2",
54+
},
55+
{
56+
endpoint: "npipe:/pipe/mypipe3",
57+
expectedProtocol: "npipe",
58+
expectedAddr: "//./pipe/mypipe3",
59+
},
60+
{
61+
endpoint: "npipe:\\\\.\\pipe\\mypipe4",
62+
expectedProtocol: "npipe",
63+
expectedAddr: "//./pipe/mypipe4",
64+
},
65+
{
66+
endpoint: "npipe:\\pipe\\mypipe5",
67+
expectedProtocol: "npipe",
68+
expectedAddr: "//./pipe/mypipe5",
69+
},
70+
{
71+
endpoint: "tcp1://abc",
72+
expectedProtocol: "tcp1",
73+
expectError: true,
74+
},
75+
{
76+
endpoint: "a b c",
77+
expectError: true,
78+
},
79+
}
80+
81+
for _, test := range tests {
82+
protocol, addr, err := parseEndpoint(test.endpoint)
83+
assert.Equal(t, test.expectedProtocol, protocol)
84+
if test.expectError {
85+
assert.NotNil(t, err, "Expect error during parsing %q", test.endpoint)
86+
continue
87+
}
88+
require.Nil(t, err, "Expect no error during parsing %q", test.endpoint)
89+
assert.Equal(t, test.expectedAddr, addr)
90+
}
91+
92+
}

0 commit comments

Comments
 (0)