11package pluginhelper
22
33import (
4+ "context"
5+ "errors"
46 "net"
7+ "os"
8+ "os/signal"
59 "path"
10+ "syscall"
611
712 "github.com/cloudnative-pg/cnpg-i/pkg/identity"
13+ "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
814 "github.com/spf13/cobra"
915 "github.com/spf13/viper"
1016 "google.golang.org/grpc"
@@ -31,53 +37,7 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri
3137 },
3238 Args : cobra .NoArgs ,
3339 RunE : func (cmd * cobra.Command , _ []string ) error {
34- logger := logging .FromContext (cmd .Context ())
35-
36- identityResponse , err := identityImpl .GetPluginMetadata (
37- cmd .Context (),
38- & identity.GetPluginMetadataRequest {})
39- if err != nil {
40- logger .Error (err , "Error while querying the identity service" )
41- return err
42- }
43-
44- pluginPath := viper .GetString ("plugin-path" )
45- pluginName := identityResponse .Name
46- pluginDisplayName := identityResponse .DisplayName
47- pluginVersion := identityResponse .Version
48- socketName := path .Join (pluginPath , identityResponse .Name )
49-
50- grpcServer := grpc .NewServer ()
51- identity .RegisterIdentityServer (
52- grpcServer ,
53- identityImpl )
54- for _ , enrich := range enrichers {
55- enrich (grpcServer )
56- }
57-
58- listener , err := net .Listen (
59- unixNetwork ,
60- socketName ,
61- )
62- if err != nil {
63- logger .Error (err , "While starting server" )
64- return err
65- }
66-
67- logger .Info (
68- "Starting plugin" ,
69- "path" , pluginPath ,
70- "name" , pluginName ,
71- "displayName" , pluginDisplayName ,
72- "version" , pluginVersion ,
73- "socketName" , socketName ,
74- )
75- err = grpcServer .Serve (listener )
76- if err != nil {
77- logger .Error (err , "While terminatind server" )
78- }
79-
80- return err
40+ return run (cmd .Context (), identityImpl , enrichers ... )
8141 },
8242 }
8343
@@ -97,3 +57,125 @@ func CreateMainCmd(identityImpl identity.IdentityServer, enrichers ...ServerEnri
9757
9858 return cmd
9959}
60+
61+ // run starts listining for GRPC requests
62+ func run (ctx context.Context , identityImpl identity.IdentityServer , enrichers ... ServerEnricher ) error {
63+ logger := logging .FromContext (ctx )
64+
65+ identityResponse , err := identityImpl .GetPluginMetadata (
66+ ctx ,
67+ & identity.GetPluginMetadataRequest {})
68+ if err != nil {
69+ logger .Error (err , "Error while querying the identity service" )
70+ return err
71+ }
72+
73+ pluginPath := viper .GetString ("plugin-path" )
74+ pluginName := identityResponse .Name
75+ pluginDisplayName := identityResponse .DisplayName
76+ pluginVersion := identityResponse .Version
77+ socketName := path .Join (pluginPath , identityResponse .Name )
78+
79+ // Remove stale unix socket it still existent
80+ if err := removeStaleSocket (ctx , socketName ); err != nil {
81+ logger .Error (err , "While removing old unix socket" )
82+ return err
83+ }
84+
85+ // Start accepting connections on the socket
86+ listener , err := net .Listen (
87+ unixNetwork ,
88+ socketName ,
89+ )
90+ if err != nil {
91+ logger .Error (err , "While starting server" )
92+ return err
93+ }
94+
95+ // Handle quit-like signal
96+ handleSignals (ctx , listener )
97+
98+ // Create GRPC server
99+ grpcServer := grpc .NewServer (
100+ grpc .ChainUnaryInterceptor (
101+ recovery .UnaryServerInterceptor (recovery .WithRecoveryHandlerContext (panicRecoveryHandler (listener ))),
102+ ),
103+ grpc .ChainStreamInterceptor (
104+ recovery .StreamServerInterceptor (recovery .WithRecoveryHandlerContext (panicRecoveryHandler (listener ))),
105+ ),
106+ )
107+ identity .RegisterIdentityServer (
108+ grpcServer ,
109+ identityImpl )
110+ for _ , enrich := range enrichers {
111+ enrich (grpcServer )
112+ }
113+
114+ logger .Info (
115+ "Starting plugin" ,
116+ "path" , pluginPath ,
117+ "name" , pluginName ,
118+ "displayName" , pluginDisplayName ,
119+ "version" , pluginVersion ,
120+ "socketName" , socketName ,
121+ )
122+
123+ if err = grpcServer .Serve (listener ); ! errors .Is (err , net .ErrClosed ) {
124+ logger .Error (err , "While terminating server" )
125+ }
126+
127+ return nil
128+ }
129+
130+ // removeStaleSocket removes a stale unix domain socket
131+ func removeStaleSocket (ctx context.Context , pluginPath string ) error {
132+ logger := logging .FromContext (ctx )
133+ _ , err := os .Stat (pluginPath )
134+
135+ switch {
136+ case err == nil :
137+ logger .Info ("Removing stale socket" , "pluginPath" , pluginPath )
138+ return os .Remove (pluginPath )
139+
140+ case errors .Is (err , os .ErrNotExist ):
141+ return nil
142+
143+ default :
144+ return err
145+ }
146+ }
147+
148+ // handleSignals makes sure that we close the listening socket
149+ // when we receive a quit-like signal
150+ func handleSignals (ctx context.Context , listener net.Listener ) {
151+ logger := logging .FromContext (ctx )
152+
153+ sigc := make (chan os.Signal , 1 )
154+ signal .Notify (sigc , syscall .SIGTERM , syscall .SIGABRT , syscall .SIGINT )
155+ go func (c chan os.Signal ) {
156+ sig := <- c
157+ logger .Info (
158+ "Caught signal, shutting down." ,
159+ "signal" , sig .String ())
160+
161+ if err := listener .Close (); err != nil {
162+ logger .Error (err , "While stopping server" )
163+ }
164+
165+ os .Exit (1 )
166+ }(sigc )
167+ }
168+
169+ func panicRecoveryHandler (listener net.Listener ) recovery.RecoveryHandlerFuncContext {
170+ return func (ctx context.Context , err any ) error {
171+ logger := logging .FromContext (ctx )
172+ logger .Info ("Panic occurred" , "error" , err )
173+
174+ if closeError := listener .Close (); closeError != nil {
175+ logger .Error (closeError , "While stopping server" )
176+ }
177+
178+ os .Exit (1 )
179+ return nil
180+ }
181+ }
0 commit comments