Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Commit b992d23

Browse files
committed
use a locking mechanism for the scheduler instead of the singleton model in the cron lib
1 parent c216161 commit b992d23

File tree

3 files changed

+19
-9
lines changed

3 files changed

+19
-9
lines changed

internal/cmd/scheduler.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@ import (
2323
"github.com/christianselig/apollo-backend/internal/repository"
2424
)
2525

26-
const batchSize = 250
27-
const accountEnqueueSeconds = 60
26+
const (
27+
batchSize = 250
28+
accountEnqueueSeconds = 60
29+
)
30+
31+
var (
32+
enqueueAccountsMutex sync.Mutex
33+
)
2834

2935
func SchedulerCmd(ctx context.Context) *cobra.Command {
3036
cmd := &cobra.Command{
@@ -103,9 +109,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
103109
s := gocron.NewScheduler(time.UTC)
104110
s.SetMaxConcurrentJobs(8, gocron.WaitMode)
105111

106-
eaj, _ := s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
107-
eaj.SingletonMode()
108-
112+
_, _ = s.Every(5).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
109113
_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
110114
_, _ = s.Every(5).Seconds().Do(func() { enqueueUsers(ctx, logger, statsd, db, userQueue) })
111115
_, _ = s.Every(5).Seconds().Do(func() { enqueueLiveActivities(ctx, logger, db, redis, luaSha, liveActivitiesQueue) })
@@ -456,6 +460,12 @@ func enqueueStuckAccounts(ctx context.Context, logger *zap.Logger, statsd *stats
456460
}
457461

458462
func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Client, pool *pgxpool.Pool, redisConn *redis.Client, luaSha string, queue rmq.Queue) {
463+
if enqueueAccountsMutex.TryLock() {
464+
defer enqueueAccountsMutex.Unlock()
465+
} else {
466+
return
467+
}
468+
459469
ctx, cancel := context.WithCancel(ctx)
460470
defer cancel()
461471

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
2-
"access_token": "***REMOVED***",
2+
"access_token": "xxx",
33
"token_type": "bearer",
44
"expires_in": 3600,
5-
"refresh_token": "***REMOVED***",
5+
"refresh_token": "yyy",
66
"scope": "account creddits edit flair history identity livemanage modconfig modcontributors modflair modlog modmail modothers modposts modself modtraffic modwiki mysubreddits privatemessages read report save structuredstyles submit subscribe vote wikiedit wikiread"
77
}

internal/reddit/types_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ func TestRefreshTokenResponseParsing(t *testing.T) {
5757
rtr := ret.(*reddit.RefreshTokenResponse)
5858
assert.NotNil(t, rtr)
5959

60-
assert.Equal(t, "***REMOVED***", rtr.AccessToken)
61-
assert.Equal(t, "***REMOVED***", rtr.RefreshToken)
60+
assert.Equal(t, "xxx", rtr.AccessToken)
61+
assert.Equal(t, "yyy", rtr.RefreshToken)
6262
assert.Equal(t, 1*time.Hour, rtr.Expiry)
6363
}
6464

0 commit comments

Comments
 (0)