Skip to content

Commit 7e7af4a

Browse files
MartinForRealcvvz
authored andcommitted
refactor:remove grpc wrapper
1 parent c9929b8 commit 7e7af4a

File tree

9 files changed

+136
-199
lines changed

9 files changed

+136
-199
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ require (
138138
golang.org/x/crypto v0.17.0 // indirect
139139
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
140140
golang.org/x/oauth2 v0.11.0 // indirect
141-
golang.org/x/sync v0.5.0 // indirect
141+
golang.org/x/sync v0.5.0
142142
golang.org/x/sys v0.15.0 // indirect
143143
golang.org/x/term v0.15.0 // indirect
144144
golang.org/x/text v0.14.0 // indirect

pkg/blob/blob.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package blob
1818

1919
import (
2020
"context"
21+
"errors"
2122
"flag"
2223
"fmt"
2324
"os"
@@ -31,8 +32,9 @@ import (
3132
az "github.com/Azure/go-autorest/autorest/azure"
3233
"github.com/container-storage-interface/spec/lib/go/csi"
3334
"github.com/pborman/uuid"
35+
"google.golang.org/grpc"
3436
v1 "k8s.io/api/core/v1"
35-
"k8s.io/apimachinery/pkg/api/errors"
37+
apierror "k8s.io/apimachinery/pkg/api/errors"
3638
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3739
"k8s.io/client-go/kubernetes"
3840
"k8s.io/klog/v2"
@@ -306,17 +308,37 @@ func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *p
306308
}
307309

308310
// Run driver initialization
309-
func (d *Driver) Run(endpoint string, testBool bool) {
311+
func (d *Driver) Run(ctx context.Context, endpoint string) error {
310312
versionMeta, err := GetVersionYAML(d.Name)
311313
if err != nil {
312314
klog.Fatalf("%v", err)
313315
}
314316
klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)
315-
316-
s := csicommon.NewNonBlockingGRPCServer()
317+
grpcInterceptor := grpc.UnaryInterceptor(csicommon.LogGRPC)
318+
opts := []grpc.ServerOption{
319+
grpcInterceptor,
320+
}
321+
s := grpc.NewServer(opts...)
322+
csi.RegisterIdentityServer(s, d)
323+
csi.RegisterControllerServer(s, d)
324+
csi.RegisterNodeServer(s, d)
325+
326+
go func() {
327+
//graceful shutdown
328+
<-ctx.Done()
329+
s.GracefulStop()
330+
}()
317331
// Driver d act as IdentityServer, ControllerServer and NodeServer
318-
s.Start(endpoint, d, d, d, testBool)
319-
s.Wait()
332+
listener, err := csicommon.Listen(ctx, endpoint)
333+
if err != nil {
334+
klog.Fatalf("failed to listen to endpoint, error: %v", err)
335+
}
336+
err = s.Serve(listener)
337+
if errors.Is(err, grpc.ErrServerStopped) {
338+
klog.Infof("gRPC server stopped serving")
339+
return nil
340+
}
341+
return err
320342
}
321343

322344
// GetContainerInfo get container info according to volume id
@@ -802,7 +824,7 @@ func setAzureCredentials(ctx context.Context, kubeClient kubernetes.Interface, a
802824
Type: "Opaque",
803825
}
804826
_, err := kubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
805-
if errors.IsAlreadyExists(err) {
827+
if apierror.IsAlreadyExists(err) {
806828
err = nil
807829
}
808830
if err != nil {

pkg/blob/blob_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ import (
2626
"sort"
2727
"strings"
2828
"testing"
29+
"time"
2930

3031
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
3132
"github.com/golang/mock/gomock"
3233
"github.com/stretchr/testify/assert"
34+
"golang.org/x/sync/errgroup"
3335
v1api "k8s.io/api/core/v1"
3436
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3537
"k8s.io/client-go/kubernetes"
@@ -132,7 +134,15 @@ func TestRun(t *testing.T) {
132134
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
133135

134136
d := NewFakeDriver()
135-
d.Run("tcp://127.0.0.1:0", true)
137+
138+
ctx, cancelFn := context.WithCancel(context.Background())
139+
var routines errgroup.Group
140+
routines.Go(func() error { return d.Run(ctx, "tcp://127.0.0.1:0") })
141+
time.Sleep(time.Millisecond * 500)
142+
cancelFn()
143+
time.Sleep(time.Millisecond * 500)
144+
err := routines.Wait()
145+
assert.Nil(t, err)
136146
},
137147
},
138148
{
@@ -159,7 +169,14 @@ func TestRun(t *testing.T) {
159169
d := NewFakeDriver()
160170
d.cloud = &azure.Cloud{}
161171
d.NodeID = ""
162-
d.Run("tcp://127.0.0.1:0", true)
172+
ctx, cancelFn := context.WithCancel(context.Background())
173+
var routines errgroup.Group
174+
routines.Go(func() error { return d.Run(ctx, "tcp://127.0.0.1:0") })
175+
time.Sleep(time.Millisecond * 500)
176+
cancelFn()
177+
time.Sleep(time.Millisecond * 500)
178+
err := routines.Wait()
179+
assert.Nil(t, err)
163180
},
164181
},
165182
}

pkg/blobplugin/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ func handle() {
8888
if driver == nil {
8989
klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver")
9090
}
91-
driver.Run(*endpoint, false)
91+
if err := driver.Run(context.Background(), *endpoint); err != nil {
92+
klog.Fatalf("Failed to run Azure Blob Storage CSI driver: %v", err)
93+
}
9294
}
9395

9496
func exportMetrics() {

pkg/csi-common/server.go

Lines changed: 0 additions & 118 deletions
This file was deleted.

pkg/csi-common/server_test.go

Lines changed: 0 additions & 67 deletions
This file was deleted.

pkg/csi-common/utils.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ package csicommon
1818

1919
import (
2020
"fmt"
21+
"net"
22+
"os"
23+
"runtime"
2124
"strings"
2225

2326
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -37,6 +40,31 @@ func ParseEndpoint(ep string) (string, string, error) {
3740
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
3841
}
3942

43+
func Listen(ctx context.Context, endpoint string) (net.Listener, error) {
44+
proto, addr, err := ParseEndpoint(endpoint)
45+
if err != nil {
46+
klog.Errorf(err.Error())
47+
return nil, err
48+
}
49+
50+
if proto == "unix" {
51+
if runtime.GOOS != "windows" {
52+
addr = "/" + addr
53+
}
54+
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
55+
klog.Errorf("Failed to remove %s, error: %s", addr, err.Error())
56+
return nil, err
57+
}
58+
}
59+
listenConfig := net.ListenConfig{}
60+
listener, err := listenConfig.Listen(ctx, proto, addr)
61+
if err != nil {
62+
klog.Errorf("Failed to listen: %v", err)
63+
return nil, err
64+
}
65+
return listener, nil
66+
}
67+
4068
func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode {
4169
return &csi.VolumeCapability_AccessMode{Mode: mode}
4270
}
@@ -70,7 +98,7 @@ func getLogLevel(method string) int32 {
7098
return 2
7199
}
72100

73-
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
101+
func LogGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
74102
level := klog.Level(getLogLevel(info.FullMethod))
75103
klog.V(level).Infof("GRPC call: %s", info.FullMethod)
76104
klog.V(level).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))

0 commit comments

Comments
 (0)