@@ -16,9 +16,13 @@ import (
1616 "github.com/spf13/cobra"
1717 "go.uber.org/automaxprocs/maxprocs"
1818 "go.uber.org/zap"
19+ "google.golang.org/grpc"
20+ "google.golang.org/grpc/credentials/insecure"
21+ "google.golang.org/grpc/keepalive"
1922
2023 "github.com/PythonHacker24/linux-acl-management-backend/api/routes"
2124 "github.com/PythonHacker24/linux-acl-management-backend/config"
25+ "github.com/PythonHacker24/linux-acl-management-backend/internal/grpcpool"
2226 "github.com/PythonHacker24/linux-acl-management-backend/internal/postgresql"
2327 "github.com/PythonHacker24/linux-acl-management-backend/internal/redis"
2428 "github.com/PythonHacker24/linux-acl-management-backend/internal/scheduler"
@@ -101,6 +105,7 @@ func exec() error {
101105 )
102106 }
103107
108+ /* preparing graceful shutdown for CTRL+C and docker */
104109 ctx , cancel := context .WithCancel (context .Background ())
105110 defer cancel ()
106111
@@ -121,6 +126,9 @@ func run(ctx context.Context) error {
121126 wg sync.WaitGroup
122127 )
123128
129+
130+
131+
124132 /* RULE: complete backend system must initiate before http server starts */
125133
126134 /* DATABASE CONNECTIONS MUST BE MADE BEFORE SCHEDULER STARTS */
@@ -142,6 +150,7 @@ func run(ctx context.Context) error {
142150 config .BackendConfig .Database .ArchivalPQ .SSLMode ,
143151 )
144152
153+ /* connect to PostgreSQL database */
145154 connPQ , err := pgx .Connect (context .Background (), pqDB )
146155 if err != nil {
147156 fmt .Fprintf (os .Stderr , "Unable to connect to database: %v\n " , err )
@@ -150,13 +159,57 @@ func run(ctx context.Context) error {
150159
151160 archivalPQ := postgresql .New (connPQ )
152161
162+ /* create a error channel */
163+ errChLog := make (chan error , 1 )
164+
165+ /* create the client pool for daemons (via gRPC) */
166+ /* unsecure for now */
167+
168+ /* attempting to keep connections alive all the time even with no activity */
169+ var kacp = keepalive.ClientParameters {
170+ /* send pings every 10 seconds if there is no activity */
171+ Time : 10 * time .Second ,
172+
173+ /* wait 2 second for ping ack before considering the connection dead */
174+ Timeout : 2 * time .Second ,
175+
176+ /* send pings even without active streams */
177+ PermitWithoutStream : true ,
178+ }
179+
180+ pool := grpcpool .NewClientPool (
181+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
182+ grpc .WithKeepaliveParams (kacp ),
183+ )
184+
185+ for _ , system := range config .BackendConfig .FileSystemServers {
186+ /* check if system is remote */
187+ if system .Remote != nil {
188+ address := fmt .Sprintf ("%s:%d" , system .Remote .Host , system .Remote .Port )
189+ go func (addr string , errCh chan <- error ) {
190+ _ , err := pool .GetConn (addr , errCh )
191+ if err != nil {
192+ zap .L ().Error ("Failed to get connect with a daemon" ,
193+ zap .String ("Address" , addr ),
194+ zap .Error (err ),
195+ )
196+ }
197+
198+ /* now test for connections */
199+ zap .L ().Info ("Connected to" ,
200+ zap .String ("address" , addr ),
201+ )
202+
203+ }(address , errChLog )
204+ }
205+ }
206+
153207 /*
154208 initializing scheduler
155209 scheduler uses context to quit - part of waitgroup
156210 propagates error through error channel
157211 */
158212 errChShed := make (chan error , 1 )
159- errChLog := make (chan error , 1 )
160213
161214 /* create a session manager */
162215 sessionManager := session .NewManager (logRedisClient , archivalPQ , errChLog )
@@ -187,7 +240,8 @@ func run(ctx context.Context) error {
187240 return
188241 }
189242 }
190- }(ctx )
243+ /* update this to use different ctx called essentialctx which ends after everything is shutdown */
244+ }(ctx )
191245
192246 /* currently FCFS scheduler */
193247 transSched := fcfs .NewFCFSScheduler (sessionManager , permProcessor )
@@ -201,6 +255,7 @@ func run(ctx context.Context) error {
201255 /* routes declared in /api/routes.go */
202256 routes .RegisterRoutes (mux , sessionManager )
203257
258+ /* create a http server */
204259 server := & http.Server {
205260 Addr : fmt .Sprintf ("%s:%d" ,
206261 config .BackendConfig .Server .Host ,
@@ -276,6 +331,9 @@ func run(ctx context.Context) error {
276331
277332 wg .Wait ()
278333
334+ /* close connections with daemon */
335+ pool .CloseAll (errChLog )
336+
279337 /* flush Redis data before closing */
280338 if err := logRedisClient .FlushAll (context .Background ()); err != nil {
281339 zap .L ().Error ("Failed to flush Redis data during shutdown" ,
@@ -286,6 +344,8 @@ func run(ctx context.Context) error {
286344 /* close archival database connection */
287345 connPQ .Close (context .Background ())
288346
347+ /* essentialctx must be closed here */
348+
289349 zap .L ().Info ("All background processes closed gracefully" )
290350
291351 return err
0 commit comments