Skip to content

Commit f13d52e

Browse files
authored
Merge pull request #1187 from MartinForReal/shafan/removegrpc
Refactor: remove grpc wrapper
2 parents 00d4d08 + 2afb58a commit f13d52e

File tree

9 files changed

+136
-199
lines changed

9 files changed

+136
-199
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ require (
132132
golang.org/x/crypto v0.17.0 // indirect
133133
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
134134
golang.org/x/oauth2 v0.11.0 // indirect
135-
golang.org/x/sync v0.5.0 // indirect
135+
golang.org/x/sync v0.5.0
136136
golang.org/x/sys v0.15.0 // indirect
137137
golang.org/x/term v0.15.0 // indirect
138138
golang.org/x/text v0.14.0 // indirect

pkg/blob/blob.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package blob
1818

1919
import (
2020
"context"
21+
"errors"
2122
"flag"
2223
"fmt"
2324
"os"
@@ -31,8 +32,9 @@ import (
3132
az "github.com/Azure/go-autorest/autorest/azure"
3233
"github.com/container-storage-interface/spec/lib/go/csi"
3334
"github.com/pborman/uuid"
35+
"google.golang.org/grpc"
3436
v1 "k8s.io/api/core/v1"
35-
"k8s.io/apimachinery/pkg/api/errors"
37+
apierror "k8s.io/apimachinery/pkg/api/errors"
3638
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3739
"k8s.io/client-go/kubernetes"
3840
"k8s.io/klog/v2"
@@ -301,17 +303,37 @@ func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *p
301303
}
302304

303305
// Run driver initialization
304-
func (d *Driver) Run(endpoint string, testBool bool) {
306+
func (d *Driver) Run(ctx context.Context, endpoint string) error {
305307
versionMeta, err := GetVersionYAML(d.Name)
306308
if err != nil {
307309
klog.Fatalf("%v", err)
308310
}
309311
klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)
310-
311-
s := csicommon.NewNonBlockingGRPCServer()
312+
grpcInterceptor := grpc.UnaryInterceptor(csicommon.LogGRPC)
313+
opts := []grpc.ServerOption{
314+
grpcInterceptor,
315+
}
316+
s := grpc.NewServer(opts...)
317+
csi.RegisterIdentityServer(s, d)
318+
csi.RegisterControllerServer(s, d)
319+
csi.RegisterNodeServer(s, d)
320+
321+
go func() {
322+
//graceful shutdown
323+
<-ctx.Done()
324+
s.GracefulStop()
325+
}()
312326
// Driver d act as IdentityServer, ControllerServer and NodeServer
313-
s.Start(endpoint, d, d, d, testBool)
314-
s.Wait()
327+
listener, err := csicommon.Listen(ctx, endpoint)
328+
if err != nil {
329+
klog.Fatalf("failed to listen to endpoint, error: %v", err)
330+
}
331+
err = s.Serve(listener)
332+
if errors.Is(err, grpc.ErrServerStopped) {
333+
klog.Infof("gRPC server stopped serving")
334+
return nil
335+
}
336+
return err
315337
}
316338

317339
// GetContainerInfo get container info according to volume id
@@ -797,7 +819,7 @@ func setAzureCredentials(ctx context.Context, kubeClient kubernetes.Interface, a
797819
Type: "Opaque",
798820
}
799821
_, err := kubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
800-
if errors.IsAlreadyExists(err) {
822+
if apierror.IsAlreadyExists(err) {
801823
err = nil
802824
}
803825
if err != nil {

pkg/blob/blob_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ import (
2626
"sort"
2727
"strings"
2828
"testing"
29+
"time"
2930

3031
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
3132
"github.com/golang/mock/gomock"
3233
"github.com/stretchr/testify/assert"
34+
"golang.org/x/sync/errgroup"
3335
v1api "k8s.io/api/core/v1"
3436
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3537
"k8s.io/client-go/kubernetes"
@@ -132,7 +134,15 @@ func TestRun(t *testing.T) {
132134
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
133135

134136
d := NewFakeDriver()
135-
d.Run("tcp://127.0.0.1:0", true)
137+
138+
ctx, cancelFn := context.WithCancel(context.Background())
139+
var routines errgroup.Group
140+
routines.Go(func() error { return d.Run(ctx, "tcp://127.0.0.1:0") })
141+
time.Sleep(time.Millisecond * 500)
142+
cancelFn()
143+
time.Sleep(time.Millisecond * 500)
144+
err := routines.Wait()
145+
assert.Nil(t, err)
136146
},
137147
},
138148
{
@@ -159,7 +169,14 @@ func TestRun(t *testing.T) {
159169
d := NewFakeDriver()
160170
d.cloud = &azure.Cloud{}
161171
d.NodeID = ""
162-
d.Run("tcp://127.0.0.1:0", true)
172+
ctx, cancelFn := context.WithCancel(context.Background())
173+
var routines errgroup.Group
174+
routines.Go(func() error { return d.Run(ctx, "tcp://127.0.0.1:0") })
175+
time.Sleep(time.Millisecond * 500)
176+
cancelFn()
177+
time.Sleep(time.Millisecond * 500)
178+
err := routines.Wait()
179+
assert.Nil(t, err)
163180
},
164181
},
165182
}

pkg/blobplugin/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ func handle() {
8888
if driver == nil {
8989
klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver")
9090
}
91-
driver.Run(*endpoint, false)
91+
if err := driver.Run(context.Background(), *endpoint); err != nil {
92+
klog.Fatalf("Failed to run Azure Blob Storage CSI driver: %v", err)
93+
}
9294
}
9395

9496
func exportMetrics() {

pkg/csi-common/server.go

Lines changed: 0 additions & 118 deletions
This file was deleted.

pkg/csi-common/server_test.go

Lines changed: 0 additions & 67 deletions
This file was deleted.

pkg/csi-common/utils.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ package csicommon
1818

1919
import (
2020
"fmt"
21+
"net"
22+
"os"
23+
"runtime"
2124
"strings"
2225

2326
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -37,6 +40,31 @@ func ParseEndpoint(ep string) (string, string, error) {
3740
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
3841
}
3942

43+
func Listen(ctx context.Context, endpoint string) (net.Listener, error) {
44+
proto, addr, err := ParseEndpoint(endpoint)
45+
if err != nil {
46+
klog.Errorf(err.Error())
47+
return nil, err
48+
}
49+
50+
if proto == "unix" {
51+
if runtime.GOOS != "windows" {
52+
addr = "/" + addr
53+
}
54+
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
55+
klog.Errorf("Failed to remove %s, error: %s", addr, err.Error())
56+
return nil, err
57+
}
58+
}
59+
listenConfig := net.ListenConfig{}
60+
listener, err := listenConfig.Listen(ctx, proto, addr)
61+
if err != nil {
62+
klog.Errorf("Failed to listen: %v", err)
63+
return nil, err
64+
}
65+
return listener, nil
66+
}
67+
4068
func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode {
4169
return &csi.VolumeCapability_AccessMode{Mode: mode}
4270
}
@@ -70,7 +98,7 @@ func getLogLevel(method string) int32 {
7098
return 2
7199
}
72100

73-
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
101+
func LogGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
74102
level := klog.Level(getLogLevel(info.FullMethod))
75103
klog.V(level).Infof("GRPC call: %s", info.FullMethod)
76104
klog.V(level).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))

0 commit comments

Comments
 (0)