Skip to content

Commit d137564

Browse files
Create error channel for session manager and processor with graceful shutdown
1 parent 6b9cf46 commit d137564

File tree

1 file changed

+42
-11
lines changed

1 file changed

+42
-11
lines changed

cmd/laclm/main.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ func exec() error {
6262
}
6363
)
6464

65-
/* adding --config arguement */
65+
/* adding --config argument */
6666
rootCmd.PersistentFlags().StringVar(&configPath, "config", "", "Path to config file")
6767

6868
/* Execute the command */
6969
if err := rootCmd.Execute(); err != nil {
70-
fmt.Printf("arguements error: %s", err.Error())
70+
fmt.Printf("arguments error: %s", err.Error())
7171
os.Exit(1)
7272
}
7373

@@ -96,7 +96,7 @@ func exec() error {
9696

9797
/* calculate max procs accurately (runtime.GOMAXPROCS(0)) */
9898
if _, err := maxprocs.Set(); err != nil {
99-
zap.L().Error("automaxproces: failed to set GOMAXPROCS",
99+
zap.L().Error("automaxprocs: failed to set GOMAXPROCS",
100100
zap.Error(err),
101101
)
102102
}
@@ -151,23 +151,49 @@ func run(ctx context.Context) error {
151151
archivalPQ := postgresql.New(connPQ)
152152

153153
/*
154-
initializing schedular
154+
initializing scheduler
155155
scheduler uses context to quit - part of waitgroup
156-
propogates error through error channel
156+
propagates error through error channel
157157
*/
158-
errCh := make(chan error, 1)
158+
errChShed := make(chan error, 1)
159+
errChLog := make(chan error, 1)
159160

160161
/* create a session manager */
161-
sessionManager := session.NewManager(logRedisClient, archivalPQ)
162+
sessionManager := session.NewManager(logRedisClient, archivalPQ, errChLog)
162163

163164
/* create a permissions processor */
164-
permProcessor := transprocessor.NewPermProcessor()
165+
permProcessor := transprocessor.NewPermProcessor(errChLog)
166+
167+
/* handle session and processor errors */
168+
wg.Add(1)
169+
go func(ctx context.Context) {
170+
defer wg.Done()
171+
zap.L().Info("log error handler started")
172+
for {
173+
select {
174+
case err, ok := <-errChLog:
175+
if !ok {
176+
zap.L().Info("log error channel closed")
177+
return
178+
}
179+
if err != nil {
180+
zap.L().Error("log error occurred",
181+
zap.Error(err),
182+
zap.Time("timestamp", time.Now()),
183+
)
184+
}
185+
case <-ctx.Done():
186+
zap.L().Info("log error handler shutting down")
187+
return
188+
}
189+
}
190+
}(ctx)
165191

166192
/* currently FCFS scheduler */
167193
transSched := fcfs.NewFCFSScheduler(sessionManager, permProcessor)
168194

169195
/* initialize the scheduler */
170-
scheduler.InitSchedular(ctx, transSched, &wg, errCh)
196+
scheduler.InitScheduler(ctx, transSched, &wg, errChShed)
171197

172198
/* setting up http mux and routes */
173199
mux := http.NewServeMux()
@@ -201,14 +227,19 @@ func run(ctx context.Context) error {
201227
all the functions called must be async here and ready for graceful shutdowns
202228
*/
203229

230+
/*
231+
scheduler is a core feature of the application
232+
when an error occurs in the scheduler, the system needs to be shutdown
233+
since nothing can work without the scheduler
234+
*/
204235
select {
205236
case <-ctx.Done():
206237
zap.L().Info("Shutdown process initiated")
207-
case err = <-errCh:
238+
case err = <-errChShed:
208239

209240
/* context done can be called here (optional for now) */
210241

211-
zap.L().Error("Fatal Error from schedular",
242+
zap.L().Error("Fatal Error from scheduler",
212243
zap.Error(err),
213244
)
214245
return err

0 commit comments

Comments
 (0)