Skip to content

Commit b2650ec

Browse files
authored
Update goroutines to use errgroup (#802)
1 parent ce87199 commit b2650ec

File tree

1 file changed

+47
-39
lines changed

1 file changed

+47
-39
lines changed

cmd/volumeprovider/app/app.go

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99
"fmt"
1010
"net"
1111
"os"
12-
"sync"
1312
"time"
1413

14+
"github.com/go-logr/logr"
1515
providerapi "github.com/ironcore-dev/ceph-provider/api"
1616
"github.com/ironcore-dev/ceph-provider/internal/ceph"
1717
"github.com/ironcore-dev/ceph-provider/internal/controllers"
@@ -27,6 +27,7 @@ import (
2727
eventrecorder "github.com/ironcore-dev/provider-utils/eventutils/recorder"
2828
"github.com/spf13/cobra"
2929
"github.com/spf13/pflag"
30+
"golang.org/x/sync/errgroup"
3031
"google.golang.org/grpc"
3132
ctrl "sigs.k8s.io/controller-runtime"
3233
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -174,8 +175,6 @@ func Run(ctx context.Context, opts Options) error {
174175
return err
175176
}
176177

177-
var wg sync.WaitGroup
178-
179178
cleanup, err := configureCephAuth(&opts.Ceph)
180179
if err != nil {
181180
return fmt.Errorf("failed to configure ceph auth: %w", err)
@@ -274,14 +273,16 @@ func Run(ctx context.Context, opts Options) error {
274273
return fmt.Errorf("failed to initialize image reconciler: %w", err)
275274
}
276275

277-
wg.Add(1)
278-
go func() {
279-
defer wg.Done()
276+
g, ctx := errgroup.WithContext(ctx)
277+
278+
g.Go(func() error {
280279
setupLog.Info("Starting image reconciler")
281280
if err := imageReconciler.Start(ctx); err != nil {
282-
log.Error(err, "failed to start image reconciler")
281+
setupLog.Error(err, "failed to start image reconciler")
282+
return err
283283
}
284-
}()
284+
return nil
285+
})
285286

286287
snapshotReconciler, err := controllers.NewSnapshotReconciler(
287288
log.WithName("snapshot-reconciler"),
@@ -300,40 +301,38 @@ func Run(ctx context.Context, opts Options) error {
300301
return fmt.Errorf("failed to initialize snapshot reconciler: %w", err)
301302
}
302303

303-
wg.Add(1)
304-
go func() {
305-
defer wg.Done()
304+
g.Go(func() error {
306305
setupLog.Info("Starting snapshot reconciler")
307306
if err := snapshotReconciler.Start(ctx); err != nil {
308-
log.Error(err, "failed to start snapshot reconciler")
307+
setupLog.Error(err, "failed to start snapshot reconciler")
308+
return err
309309
}
310+
return nil
311+
})
310312

311-
}()
312-
313-
wg.Add(1)
314-
go func() {
315-
defer wg.Done()
313+
g.Go(func() error {
316314
setupLog.Info("Starting image events")
317315
if err := imageEvents.Start(ctx); err != nil {
318-
log.Error(err, "failed to start image events")
316+
setupLog.Error(err, "failed to start image events")
317+
return err
319318
}
320-
}()
319+
return nil
320+
})
321321

322-
wg.Add(1)
323-
go func() {
324-
defer wg.Done()
322+
g.Go(func() error {
325323
setupLog.Info("Starting snapshot events")
326324
if err := snapshotEvents.Start(ctx); err != nil {
327-
log.Error(err, "failed to start snapshot events")
325+
setupLog.Error(err, "failed to start snapshot events")
326+
return err
328327
}
329-
}()
328+
return nil
329+
})
330330

331-
wg.Add(1)
332-
go func() {
333-
defer wg.Done()
331+
g.Go(func() error {
334332
setupLog.Info("Starting volume events garbage collector")
335333
volumeEventStore.Start(ctx)
336-
}()
334+
return nil
335+
})
337336

338337
supportedClasses, err := vcr.LoadVolumeClassesFile(opts.PathSupportedVolumeClasses)
339338
if err != nil {
@@ -366,19 +365,31 @@ func Run(ctx context.Context, opts Options) error {
366365
return fmt.Errorf("error creating server: %w", err)
367366
}
368367

369-
log.V(1).Info("Cleaning up any previous socket")
368+
g.Go(func() error {
369+
setupLog.Info("Starting grpc server")
370+
if err := runGRPCServer(ctx, setupLog, log, srv, opts); err != nil {
371+
setupLog.Error(err, "failed to start grpc server")
372+
return err
373+
}
374+
return nil
375+
})
376+
return g.Wait()
377+
}
378+
379+
func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *volumeserver.Server, opts Options) error {
380+
setupLog.V(1).Info("Cleaning up any previous socket")
370381
if err := common.CleanupSocketIfExists(opts.Address); err != nil {
371382
return fmt.Errorf("error cleaning up socket: %w", err)
372383
}
373384

374-
log.V(1).Info("Start listening on unix socket", "Address", opts.Address)
385+
setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Address)
375386
l, err := net.Listen("unix", opts.Address)
376387
if err != nil {
377388
return fmt.Errorf("failed to listen: %w", err)
378389
}
379390
defer func() {
380391
if err := l.Close(); err != nil {
381-
log.Error(err, "Error closing socket")
392+
setupLog.Error(err, "failed to close listener")
382393
}
383394
}()
384395

@@ -396,18 +407,15 @@ func Run(ctx context.Context, opts Options) error {
396407
)
397408
iriv1alpha1.RegisterVolumeRuntimeServer(grpcSrv, srv)
398409

399-
setupLog.Info("Starting server", "Address", l.Addr().String())
410+
setupLog.Info("Starting grpc server", "Address", l.Addr().String())
400411
go func() {
401-
defer func() {
402-
setupLog.Info("Shutting down server")
403-
grpcSrv.Stop()
404-
setupLog.Info("Shut down server")
405-
}()
406412
<-ctx.Done()
413+
setupLog.Info("Shutting down grpc server")
414+
grpcSrv.GracefulStop()
415+
setupLog.Info("Shut down grpc server")
407416
}()
408417
if err := grpcSrv.Serve(l); err != nil {
409-
return fmt.Errorf("error serving: %w", err)
418+
return fmt.Errorf("error serving grpc: %w", err)
410419
}
411-
wg.Wait()
412420
return nil
413421
}

0 commit comments

Comments
 (0)