Skip to content

Commit ce62813

Browse files
committed
fix: umeng limit
1 parent a1f2d52 commit ce62813

File tree

5 files changed

+185
-22
lines changed

5 files changed

+185
-22
lines changed

cmd/common/main.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,18 +173,21 @@ func syncNoticeTask() error {
173173
}
174174

175175
// 进行消息推送
176-
err = umeng.SendAndroidGroupcastWithUrl("教务处通知", info.Title, "", info.URL, constants.UmengJwchNoticeTag)
177-
if err != nil {
178-
logger.Errorf("notice sync task: failed to send notice to Android: %v", err)
179-
}
176+
if ok := umeng.EnqueueAsync(func() error {
177+
err = umeng.SendAndroidGroupcastWithUrl("教务处通知", info.Title, "", info.URL, constants.UmengJwchNoticeTag)
178+
if err != nil {
179+
logger.Errorf("notice sync task: failed to send notice to Android: %v", err)
180+
}
180181

181-
err = umeng.SendIOSGroupcast("教务处通知", "", info.Title, constants.UmengJwchNoticeTag)
182-
if err != nil {
183-
logger.Errorf("notice sync task: failed to send notice to IOS: %v", err)
182+
err = umeng.SendIOSGroupcast("教务处通知", "", info.Title, constants.UmengJwchNoticeTag)
183+
if err != nil {
184+
logger.Errorf("notice sync task: failed to send notice to IOS: %v", err)
185+
}
186+
logger.Infof("notice sync task: notice send success")
187+
return nil
188+
}); !ok {
189+
logger.Errorf("umeng async queue full, drop notice notification")
184190
}
185-
logger.Infof("notice sync task: notice send success")
186-
187-
time.Sleep(constants.UmengRateLimitDelay)
188191
}
189192
return nil
190193
}

internal/academic/service/get_scores.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package service
1919
import (
2020
"fmt"
2121
"strings"
22-
"time"
2322

2423
"github.com/bytedance/sonic"
2524

@@ -178,9 +177,10 @@ func (s *AcademicService) handleScoreChange(stuID string, scores []*jwch.Mark) (
178177
scores[i].Name, scores[i].Semester, scores[i].Teacher,
179178
scores[i].ElectiveType,
180179
}, "|"))
181-
err = s.sendNotifications(scores[i].Name, tag)
182-
if err != nil {
183-
return err
180+
if ok := umeng.EnqueueAsync(func() error {
181+
return s.sendNotifications(scores[i].Name, tag)
182+
}); !ok {
183+
logger.Errorf("umeng async queue full, drop score notification, tag:%v", tag)
184184
}
185185
// 写入课程信息,代表发送过通知
186186
_, err = s.db.Academic.CreateCourseOffering(s.ctx, &model.CourseOffering{
@@ -211,7 +211,5 @@ func (s *AcademicService) sendNotifications(courseName, tag string) (err error)
211211
}
212212

213213
logger.Infof("task queue: send notice to app, tag:%v", tag)
214-
// 停止 30 秒防止 umeng 限流
215-
time.Sleep(constants.UmengRateLimitDelay)
216214
return nil
217215
}

internal/course/service/get_course_list.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"slices"
2323
"sort"
2424
"strings"
25-
"time"
2625

2726
"github.com/bytedance/sonic"
2827

@@ -182,9 +181,10 @@ func (s *CourseService) handleCourseUpdate(term string, newCourses []*kitexModel
182181
hash := utils.GenerateCourseHash(c.Name, term, c.Teacher, c.ElectiveType, c.RawScheduleRules)
183182
if oldAdjust, exists := hashToAdjust[hash]; exists {
184183
if oldAdjust != c.RawAdjust {
185-
err = s.sendNotifications(c.Name, hash)
186-
if err != nil {
187-
return fmt.Errorf("service.GetCourseList: Send notifications failed: %w", err)
184+
if ok := umeng.EnqueueAsync(func() error {
185+
return s.sendNotifications(c.Name, hash)
186+
}); !ok {
187+
logger.Errorf("umeng async queue full, drop course notification, hash:%v", hash)
188188
}
189189
}
190190
}
@@ -205,7 +205,6 @@ func (s *CourseService) sendNotifications(courseName, tag string) (err error) {
205205
logger.Errorf("service.sendNotifications: Send course updated message to IOS failed: %v", err)
206206
return err
207207
}
208-
time.Sleep(constants.UmengRateLimitDelay)
209208
return nil
210209
}
211210

pkg/constants/umeng.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import "time"
2121
const (
2222
UmengURL = "https://msgapi.umeng.com/api/send" // 推送 API
2323
UmengMessageExpireTime = 3 * ONE_DAY // 推送消息过期时间
24-
UmengRateLimitDelay = 30 * time.Second // 用于在发送通知循环中等待,防止被友盟限流
24+
UmengRateLimitDelay = 1 * time.Minute // 用于在发送通知中等待,防止被友盟限流
25+
UmengAsyncQueueSize = 500 // 异步发送通知的队列大小
26+
UmengDailyLimit = 500 // 每日最大请求数
2527
)
2628

2729
// Tag

pkg/umeng/dispatcher.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
Copyright 2024 The west2-online Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package umeng
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"github.com/west2-online/fzuhelper-server/pkg/constants"
24+
"github.com/west2-online/fzuhelper-server/pkg/logger"
25+
)
26+
27+
// asyncDispatcher 负责异步消费 Umeng 发送任务并执行限流。
28+
// 设计目标:
29+
// 1) 异步:发送端仅入队,不阻塞业务线程或主任务队列。
30+
// 2) 限流:支持最小间隔控制 + 每日配额控制。
31+
// 3) 单例:进程内仅启动一个 dispatcher,确保全局限流一致性。
32+
type asyncDispatcher struct {
33+
// ch 为任务队列通道,元素是“发送函数”。
34+
// 发送函数返回 error,便于统一记录失败日志。
35+
ch chan func() error
36+
// interval 为相邻两次发送的最小间隔。
37+
interval time.Duration
38+
// dailyLimit 为每日最大允许发送次数。
39+
dailyLimit int
40+
// dailyCount 为当天已发送次数。
41+
dailyCount int
42+
// lastResetDate 记录上次重置日期,用于跨日清零计数。
43+
lastResetDate time.Time
44+
// lastRequestTime 记录上一次实际发送的时间,用于间隔限流。
45+
lastRequestTime time.Time
46+
}
47+
48+
var (
49+
// dispatcherOnce 确保 dispatcher 只被初始化一次。
50+
dispatcherOnce sync.Once
51+
// dispatcher 为全局单例实例。
52+
dispatcher *asyncDispatcher
53+
)
54+
55+
// getDispatcher 获取全局 dispatcher 单例。
56+
// 首次调用会完成初始化并启动后台消费协程。
57+
// 该方法为内部使用,外部通过 EnqueueAsync 入队即可。
58+
func getDispatcher() *asyncDispatcher {
59+
dispatcherOnce.Do(func() {
60+
dispatcher = newAsyncDispatcher(constants.UmengAsyncQueueSize, constants.UmengRateLimitDelay, constants.UmengDailyLimit)
61+
// 后台消费协程:串行处理任务,确保限流语义正确。
62+
go dispatcher.run()
63+
})
64+
return dispatcher
65+
}
66+
67+
// newAsyncDispatcher 创建一个新的 dispatcher 实例。
68+
// 参数:
69+
// - queueSize:队列缓冲长度,<=0 时使用默认值。
70+
// - interval:发送最小间隔。
71+
// - dailyLimit:每日最大发送次数,<=0 时使用默认值。
72+
// 返回值仅在 getDispatcher 中使用,避免重复创建。
73+
func newAsyncDispatcher(queueSize int, interval time.Duration, dailyLimit int) *asyncDispatcher {
74+
if queueSize <= 0 {
75+
queueSize = constants.UmengAsyncQueueSize
76+
}
77+
if dailyLimit <= 0 {
78+
dailyLimit = constants.UmengDailyLimit
79+
}
80+
return &asyncDispatcher{
81+
ch: make(chan func() error, queueSize),
82+
interval: interval,
83+
dailyLimit: dailyLimit,
84+
dailyCount: 0,
85+
lastResetDate: time.Now(),
86+
lastRequestTime: time.Now().Add(-interval),
87+
}
88+
}
89+
90+
// EnqueueAsync 将 Umeng 发送任务放入异步队列。
91+
// 特性:
92+
// - 非阻塞:队列满时立即返回 false,不阻塞业务线程。
93+
// - 安全:task 为空时直接返回 false。
94+
// - 单例:内部确保 dispatcher 只初始化一次。
95+
// 用法:将实际发送逻辑包装成闭包传入,例如:
96+
//
97+
// umeng.EnqueueAsync(func() error { return umeng.SendAndroidGroupcastWithGoApp(...) })
98+
func EnqueueAsync(task func() error) bool {
99+
if task == nil {
100+
return false
101+
}
102+
d := getDispatcher()
103+
select {
104+
case d.ch <- task:
105+
return true
106+
default:
107+
return false
108+
}
109+
}
110+
111+
// run 后台消费循环。
112+
// 该循环串行读取队列并执行任务,先限流再发送,保证顺序与配额一致。
113+
// 注意:此方法应仅在后台协程中运行。
114+
func (d *asyncDispatcher) run() {
115+
for task := range d.ch {
116+
d.wait()
117+
if err := task(); err != nil {
118+
logger.Errorf("umeng async task failed: %v", err)
119+
}
120+
}
121+
}
122+
123+
// wait 执行限流等待。
124+
// 逻辑顺序:
125+
// 1) 跨日判断:新的一天重置 dailyCount。
126+
// 2) 每日配额:超过 dailyLimit 则等待到次日零点并重置。
127+
// 3) 间隔限制:确保相邻发送间隔不小于 interval。
128+
// 该方法在后台消费协程中调用,因此可以阻塞而不影响业务线程。
129+
func (d *asyncDispatcher) wait() {
130+
now := time.Now()
131+
if !sameDay(now, d.lastResetDate) {
132+
d.dailyCount = 0
133+
d.lastResetDate = now
134+
}
135+
136+
if d.dailyCount >= d.dailyLimit {
137+
nextDay := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
138+
time.Sleep(time.Until(nextDay))
139+
d.dailyCount = 0
140+
d.lastResetDate = time.Now()
141+
d.lastRequestTime = time.Now().Add(-d.interval)
142+
}
143+
144+
now = time.Now()
145+
elapsed := now.Sub(d.lastRequestTime)
146+
if elapsed < d.interval {
147+
time.Sleep(d.interval - elapsed)
148+
now = time.Now()
149+
}
150+
151+
d.lastRequestTime = now
152+
d.dailyCount++
153+
}
154+
155+
// sameDay 判断两个时间是否在同一天(按本地时区)。
156+
// 用于每日配额的跨日判断。
157+
func sameDay(a, b time.Time) bool {
158+
ay, am, ad := a.Date()
159+
by, bm, bd := b.Date()
160+
return ay == by && am == bm && ad == bd
161+
}

0 commit comments

Comments
 (0)