Skip to content

Commit 0d1a23d

Browse files
committed
fix: resolve goroutine leak and simplify missed-run detection in scheduler
- Store cron.Cron instance so it can be stopped during FileCache shutdown - Initialize stopAsyncUpload channel (was nil, making select cases dead code) - Call cron.Stop() in FileCache.Stop() and wait for graceful shutdown - Replace hand-rolled Schedule.Next() loop with ScheduleWithPrev.Prev() and WithRunImmediately() for cleaner active-window detection
1 parent 1c9fb97 commit 0d1a23d

File tree

2 files changed

+43
-13
lines changed

2 files changed

+43
-13
lines changed

component/file_cache/file_cache.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/Seagate/cloudfuse/internal"
4646
"github.com/Seagate/cloudfuse/internal/handlemap"
4747
"github.com/Seagate/cloudfuse/internal/stats_manager"
48+
cron "github.com/netresearch/go-cron"
4849
)
4950

5051
// Common structure for Component
@@ -84,6 +85,7 @@ type FileCache struct {
8485
activeWindows int
8586
activeWindowsMutex *sync.Mutex
8687
closeWindowCh chan struct{}
88+
cronScheduler *cron.Cron
8789
}
8890

8991
// Structure defining your config parameters
@@ -181,6 +183,7 @@ func (fc *FileCache) Start(ctx context.Context) error {
181183
log.Debug("Starting file cache stats collector")
182184

183185
fc.uploadNotifyCh = make(chan struct{}, 1)
186+
fc.stopAsyncUpload = make(chan struct{})
184187
err = fc.SetupScheduler()
185188
if err != nil {
186189
log.Warn("FileCache::Start : Failed to setup scheduler [%s]", err.Error())
@@ -193,6 +196,18 @@ func (fc *FileCache) Start(ctx context.Context) error {
193196
func (fc *FileCache) Stop() error {
194197
log.Trace("Stopping component : %s", fc.Name())
195198

199+
// Signal active upload windows to stop
200+
if fc.stopAsyncUpload != nil {
201+
close(fc.stopAsyncUpload)
202+
}
203+
204+
// Stop the cron scheduler and wait for running jobs to complete
205+
if fc.cronScheduler != nil {
206+
log.Info("FileCache::Stop : Stopping cron scheduler")
207+
<-fc.cronScheduler.Stop().Done()
208+
log.Info("FileCache::Stop : Cron scheduler stopped")
209+
}
210+
196211
// Wait for all async upload to complete if any
197212
if fc.lazyWrite {
198213
log.Info("FileCache::Stop : Waiting for async close to complete")

component/file_cache/scheduler.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func (fc *FileCache) SetupScheduler() error {
6363
cronScheduler := cron.New(cron.WithSeconds())
6464
fc.scheduleUploads(cronScheduler, fc.schedule)
6565
cronScheduler.Start()
66+
fc.cronScheduler = cronScheduler
6667

6768
log.Info("FileCache::SetupScheduler : Scheduler started successfully")
6869
return nil
@@ -86,6 +87,11 @@ func (fc *FileCache) scheduleUploads(c *cron.Cron, sched WeeklySchedule) {
8687
log.Info("FileCache::SetupScheduler : Upload window ended")
8788
close(fc.closeWindowCh)
8889
}
90+
91+
parser := cron.MustNewParser(
92+
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
93+
)
94+
8995
// start up the schedules
9096
for _, config := range sched {
9197
windowName := config.Name
@@ -94,9 +100,29 @@ func (fc *FileCache) scheduleUploads(c *cron.Cron, sched WeeklySchedule) {
94100
log.Info("[%s] Invalid duration '%s': %v\n", windowName, config.Duration, err)
95101
continue
96102
}
103+
104+
// Determine if we're joining a window that's already active by
105+
// finding the most recent scheduled start via Prev().
106+
now := time.Now()
97107
var initialWindowEndTime time.Time
108+
var jobOpts []cron.JobOption
109+
110+
schedule, _ := parser.Parse(config.CronExpr)
111+
if sp, ok := schedule.(cron.ScheduleWithPrev); ok {
112+
prevStart := sp.Prev(now)
113+
if !prevStart.IsZero() && prevStart.Add(duration).After(now) {
114+
// We're inside an active window that started at prevStart.
115+
initialWindowEndTime = prevStart.Add(duration)
116+
// Run immediately to join the in-progress window with shortened duration.
117+
jobOpts = append(jobOpts, cron.WithRunImmediately())
118+
log.Info(
119+
"FileCache::scheduleUploads : [%s] joining active window (started %s, ends %s)",
120+
windowName, prevStart.Format(time.Kitchen), initialWindowEndTime.Format(time.Kitchen),
121+
)
122+
}
123+
}
98124

99-
cronEntryId, err := c.AddFunc(config.CronExpr, func() {
125+
_, err = c.AddFunc(config.CronExpr, func() {
100126
// Start a new window and track it
101127
fc.activeWindowsMutex.Lock()
102128
isFirstWindow := fc.activeWindows == 0
@@ -155,23 +181,12 @@ func (fc *FileCache) scheduleUploads(c *cron.Cron, sched WeeklySchedule) {
155181
fc.servicePendingOps()
156182
}
157183
}
158-
})
184+
}, jobOpts...)
159185
if err != nil {
160186
log.Err("[%s] Failed to schedule cron job with expression '%s': %v\n",
161187
windowName, config.CronExpr, err)
162188
continue
163189
}
164-
165-
// check if this schedule should already be active
166-
// did this schedule have a start time within the last duration?
167-
schedule := c.Entry(cronEntryId)
168-
now := time.Now()
169-
for t := schedule.Schedule.Next(now.Add(-duration)); now.After(t); t = schedule.Schedule.Next(t) {
170-
initialWindowEndTime = t.Add(duration)
171-
}
172-
if !initialWindowEndTime.IsZero() {
173-
go schedule.Job.Run()
174-
}
175190
}
176191
}
177192

0 commit comments

Comments
 (0)