Skip to content
30 changes: 13 additions & 17 deletions cmd/common/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,26 +172,22 @@ func syncNoticeTask() error {
return fmt.Errorf("notice sync task: failed to create notice: %w", err)
}

channelProperties := map[string]string{
"channel_activity": "com.west2online.umeng.MfrMessageActivity",
"huawei_channel_importance": "NORMAL",
"xiaomi_channel_id": config.Vendors.Xiaomi.JwchNotice,
}
// 进行消息推送
err = umeng.SendAndroidGroupcastWithUrl(config.Umeng.Android.AppKey, config.Umeng.Android.AppMasterSecret,
"", "教务处通知", info.Title, constants.UmengJwchNoticeTag, info.URL, channelProperties)
if err != nil {
logger.Errorf("notice sync task: failed to send notice to Android: %v", err)
}
if ok := umeng.EnqueueAsync(func() error {
err = umeng.SendAndroidGroupcastWithUrl("教务处通知", info.Title, "", info.URL, constants.UmengJwchNoticeTag, "教务处")
if err != nil {
logger.Errorf("notice sync task: failed to send notice to Android: %v", err)
}

err = umeng.SendIOSGroupcast(config.Umeng.IOS.AppKey, config.Umeng.IOS.AppMasterSecret,
"教务处通知", "", info.Title, constants.UmengJwchNoticeTag)
if err != nil {
logger.Errorf("notice sync task: failed to send notice to IOS: %v", err)
err = umeng.SendIOSGroupcast("教务处通知", "", info.Title, constants.UmengJwchNoticeTag, "教务处")
if err != nil {
logger.Errorf("notice sync task: failed to send notice to IOS: %v", err)
}
logger.Infof("notice sync task: notice send success")
return nil
}); !ok {
logger.Errorf("umeng async queue full, drop notice notification")
}
logger.Infof("notice sync task: notice send success")

time.Sleep(constants.UmengRateLimitDelay)
}
return nil
}
Expand Down
29 changes: 23 additions & 6 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,32 @@ type umeng struct {
IOS IOSUmeng `mapstructure:"ios"`
}

type vendor struct {
ExamNotifications string `mapstructure:"ExamNotifications"`
ExamResultsNotifications string `mapstructure:"ExamResultsNotifications"`
JwchNotice string `mapstructure:"JwchNotice"`
type oppo struct {
ChannelID string `mapstructure:"channel_id"`
Category string `mapstructure:"category"`
NotifyLevel string `mapstructure:"notify_level"`
PrivateMsgTemplate struct {
PrivateMsgTemplateID string `mapstructure:"private_msg_template_id"`
} `mapstructure:"private_msg_template"`
}

type huawei struct {
ChannelImportance string `mapstructure:"channel_importance"`
ChannelCategory string `mapstructure:"channel_category"`
}

type localProperties struct {
ChannelID string `json:"channel_id"`
ChannelName string `json:"channel_name"`
}

type vendors struct {
Xiaomi vendor `mapstructure:"xiaomi"`
Huawei vendor `mapstructure:"huawei"`
ChannelActivity string `mapstructure:"channel_activity"`
XiaoMiChannelID string `mapstructure:"xiaomi_channel_id"`
VivoCategory string `mapstructure:"vivo_category"`
Oppo oppo `mapstructure:"oppo"`
Huawei huawei `mapstructure:"huawei"`
LocalProperties localProperties `mapstructure:"local_properties"`
}

type mcp struct {
Expand Down
19 changes: 6 additions & 13 deletions internal/academic/service/get_scores.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package service
import (
"fmt"
"strings"
"time"

"github.com/bytedance/sonic"

"github.com/west2-online/fzuhelper-server/config"
loginmodel "github.com/west2-online/fzuhelper-server/kitex_gen/model"
"github.com/west2-online/fzuhelper-server/pkg/base"
"github.com/west2-online/fzuhelper-server/pkg/base/context"
Expand Down Expand Up @@ -179,9 +177,10 @@ func (s *AcademicService) handleScoreChange(stuID string, scores []*jwch.Mark) (
scores[i].Name, scores[i].Semester, scores[i].Teacher,
scores[i].ElectiveType,
}, "|"))
err = s.sendNotifications(scores[i].Name, tag)
if err != nil {
return err
if ok := umeng.EnqueueAsync(func() error {
return s.sendNotifications(scores[i].Name, tag)
}); !ok {
logger.Errorf("umeng async queue full, drop score notification, tag:%v", tag)
}
// 写入课程信息,代表发送过通知
_, err = s.db.Academic.CreateCourseOffering(s.ctx, &model.CourseOffering{
Expand All @@ -202,21 +201,15 @@ func (s *AcademicService) handleScoreChange(stuID string, scores []*jwch.Mark) (
}

func (s *AcademicService) sendNotifications(courseName, tag string) (err error) {
err = umeng.SendAndroidGroupcastWithGoApp(config.Umeng.Android.AppKey, config.Umeng.Android.AppMasterSecret,
"", fmt.Sprintf("%v成绩更新啦", courseName), "",
tag)
err = umeng.SendAndroidGroupcastWithGoApp(fmt.Sprintf("%v成绩更新啦", courseName), "", "", tag, fmt.Sprintf("成绩更新%v", tag[:12]))
if err != nil {
logger.Errorf("task queue: failed to send notice to Android: %v", err)
}
err = umeng.SendIOSGroupcast(config.Umeng.IOS.AppKey, config.Umeng.IOS.AppMasterSecret,
fmt.Sprintf("%v成绩更新啦", courseName), "", "",
tag)
err = umeng.SendIOSGroupcast(fmt.Sprintf("%v成绩更新啦", courseName), "", "", tag, fmt.Sprintf("成绩更新%v", tag[:12]))
if err != nil {
logger.Errorf("task queue: failed to send notice to IOS: %v", err)
}

logger.Infof("task queue: send notice to app, tag:%v", tag)
// 停止 30 秒防止 umeng 限流
time.Sleep(constants.UmengRateLimitDelay)
return nil
}
4 changes: 2 additions & 2 deletions internal/academic/service/get_scores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func TestAcademicService_sendNotifications(t *testing.T) {
Convey("should send notifications to both Android and iOS", func() {
// Given: 准备发送推送的课程信息
courseName := "数据结构"
tag := "test_tag"
tag := "abcdefghijklmnopqrstuvwxyz123456"

// Mock umeng 推送成功
umengAndroidPatch := mockey.Mock(umeng.SendAndroidGroupcastWithGoApp).Return(nil).Build()
Expand All @@ -636,7 +636,7 @@ func TestAcademicService_sendNotifications(t *testing.T) {
Convey("should handle notification errors gracefully", func() {
// Given: 准备发送推送但可能出错
courseName := "数据结构"
tag := "test_tag"
tag := "abcdefghijklmnopqrstuvwxyz123456"

// Mock umeng 推送失败
umengAndroidPatch := mockey.Mock(umeng.SendAndroidGroupcastWithGoApp).Return(fmt.Errorf("android push failed")).Build()
Expand Down
20 changes: 7 additions & 13 deletions internal/course/service/get_course_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"slices"
"sort"
"strings"
"time"

"github.com/bytedance/sonic"

"github.com/west2-online/fzuhelper-server/config"
"github.com/west2-online/fzuhelper-server/internal/course/pack"
"github.com/west2-online/fzuhelper-server/kitex_gen/course"
kitexModel "github.com/west2-online/fzuhelper-server/kitex_gen/model"
Expand Down Expand Up @@ -183,9 +181,10 @@ func (s *CourseService) handleCourseUpdate(term string, newCourses []*kitexModel
hash := utils.GenerateCourseHash(c.Name, term, c.Teacher, c.ElectiveType, c.RawScheduleRules)
if oldAdjust, exists := hashToAdjust[hash]; exists {
if oldAdjust != c.RawAdjust {
err = s.sendNotifications(c.Name, hash)
if err != nil {
return fmt.Errorf("service.GetCourseList: Send notifications failed: %w", err)
if ok := umeng.EnqueueAsync(func() error {
return s.sendNotifications(c.Name, hash)
}); !ok {
logger.Errorf("umeng async queue full, drop course notification, hash:%v", hash)
}
}
}
Expand All @@ -195,20 +194,15 @@ func (s *CourseService) handleCourseUpdate(term string, newCourses []*kitexModel
}

func (s *CourseService) sendNotifications(courseName, tag string) (err error) {
err = umeng.SendAndroidGroupcastWithGoApp(config.Umeng.Android.AppKey, config.Umeng.Android.AppMasterSecret,
"", fmt.Sprintf("[调课] %v", courseName), "", tag)
err = umeng.SendAndroidGroupcastWithGoApp(fmt.Sprintf("[调课] %v", courseName), "", "", tag, fmt.Sprintf("调课%v", tag[:12]))
if err != nil {
logger.Errorf("service.sendNotifications: Send course updated message to Android failed: %v", err)
return err
}

err = umeng.SendIOSGroupcast(config.Umeng.Android.AppKey, config.Umeng.Android.AppMasterSecret,
"", fmt.Sprintf("[调课] %v", courseName), "", tag)
err = umeng.SendIOSGroupcast(fmt.Sprintf("[调课] %v", courseName), "", "", tag, fmt.Sprintf("调课%v", tag[:12]))
if err != nil {
logger.Errorf("service.sendNotifications: Send course updated message to IOS failed: %v", err)
return err
}
time.Sleep(constants.UmengRateLimitDelay)
logger.Infof("service.sendNotifications: Send course updated message, tag:%v", tag)
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/constants/umeng.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import "time"
const (
UmengURL = "https://msgapi.umeng.com/api/send" // 推送 API
UmengMessageExpireTime = 3 * ONE_DAY // 推送消息过期时间
UmengRateLimitDelay = 30 * time.Second // 用于在发送通知循环中等待,防止被友盟限流
UmengRateLimitDelay = 1 * time.Minute // 用于在发送通知中等待,防止被友盟限流
UmengAsyncQueueSize = 500 // 异步发送通知的队列大小
UmengDailyLimit = 500 // 每日最大请求数
)

// Tag
Expand Down
161 changes: 161 additions & 0 deletions pkg/umeng/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
Copyright 2024 The west2-online Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package umeng

import (
"sync"
"time"

"github.com/west2-online/fzuhelper-server/pkg/constants"
"github.com/west2-online/fzuhelper-server/pkg/logger"
)

// asyncDispatcher 负责异步消费 Umeng 发送任务并执行限流。
// 设计目标:
// 1) 异步:发送端仅入队,不阻塞业务线程或主任务队列。
// 2) 限流:支持最小间隔控制 + 每日配额控制。
// 3) 单例:进程内仅启动一个 dispatcher,确保全局限流一致性。
type asyncDispatcher struct {
// ch 为任务队列通道,元素是“发送函数”。
// 发送函数返回 error,便于统一记录失败日志。
ch chan func() error
// interval 为相邻两次发送的最小间隔。
interval time.Duration
// dailyLimit 为每日最大允许发送次数。
dailyLimit int
// dailyCount 为当天已发送次数。
dailyCount int
// lastResetDate 记录上次重置日期,用于跨日清零计数。
lastResetDate time.Time
// lastRequestTime 记录上一次实际发送的时间,用于间隔限流。
lastRequestTime time.Time
}

var (
// dispatcherOnce 确保 dispatcher 只被初始化一次。
dispatcherOnce sync.Once
// dispatcher 为全局单例实例。
dispatcher *asyncDispatcher
)

// getDispatcher 获取全局 dispatcher 单例。
// 首次调用会完成初始化并启动后台消费协程。
// 该方法为内部使用,外部通过 EnqueueAsync 入队即可。
func getDispatcher() *asyncDispatcher {
dispatcherOnce.Do(func() {
dispatcher = newAsyncDispatcher(constants.UmengAsyncQueueSize, constants.UmengRateLimitDelay, constants.UmengDailyLimit)
// 后台消费协程:串行处理任务,确保限流语义正确。
go dispatcher.run()
})
return dispatcher
}

// newAsyncDispatcher 创建一个新的 dispatcher 实例。
// 参数:
// - queueSize:队列缓冲长度,<=0 时使用默认值。
// - interval:发送最小间隔。
// - dailyLimit:每日最大发送次数,<=0 时使用默认值。
// 返回值仅在 getDispatcher 中使用,避免重复创建。
func newAsyncDispatcher(queueSize int, interval time.Duration, dailyLimit int) *asyncDispatcher {
if queueSize <= 0 {
queueSize = constants.UmengAsyncQueueSize
}
if dailyLimit <= 0 {
dailyLimit = constants.UmengDailyLimit
}
return &asyncDispatcher{
ch: make(chan func() error, queueSize),
interval: interval,
dailyLimit: dailyLimit,
dailyCount: 0,
lastResetDate: time.Now(),
lastRequestTime: time.Now().Add(-interval),
}
}

// EnqueueAsync 将 Umeng 发送任务放入异步队列。
// 特性:
// - 非阻塞:队列满时立即返回 false,不阻塞业务线程。
// - 安全:task 为空时直接返回 false。
// - 单例:内部确保 dispatcher 只初始化一次。
// 用法:将实际发送逻辑包装成闭包传入,例如:
//
// umeng.EnqueueAsync(func() error { return umeng.SendAndroidGroupcastWithGoApp(...) })
func EnqueueAsync(task func() error) bool {
if task == nil {
return false
}
d := getDispatcher()
select {
case d.ch <- task:
return true
default:
return false
}
}

// run 后台消费循环。
// 该循环串行读取队列并执行任务,先限流再发送,保证顺序与配额一致。
// 注意:此方法应仅在后台协程中运行。
func (d *asyncDispatcher) run() {
for task := range d.ch {
d.wait()
if err := task(); err != nil {
logger.Errorf("umeng async task failed: %v", err)
}
}
}

// wait 执行限流等待。
// 逻辑顺序:
// 1) 跨日判断:新的一天重置 dailyCount。
// 2) 每日配额:超过 dailyLimit 则等待到次日零点并重置。
// 3) 间隔限制:确保相邻发送间隔不小于 interval。
// 该方法在后台消费协程中调用,因此可以阻塞而不影响业务线程。
func (d *asyncDispatcher) wait() {
now := time.Now()
if !sameDay(now, d.lastResetDate) {
d.dailyCount = 0
d.lastResetDate = now
}

if d.dailyCount >= d.dailyLimit {
nextDay := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
time.Sleep(time.Until(nextDay))
d.dailyCount = 0
d.lastResetDate = time.Now()
d.lastRequestTime = time.Now().Add(-d.interval)
}

now = time.Now()
elapsed := now.Sub(d.lastRequestTime)
if elapsed < d.interval {
time.Sleep(d.interval - elapsed)
now = time.Now()
}

d.lastRequestTime = now
d.dailyCount++
}

// sameDay 判断两个时间是否在同一天(按本地时区)。
// 用于每日配额的跨日判断。
func sameDay(a, b time.Time) bool {
ay, am, ad := a.Date()
by, bm, bd := b.Date()
return ay == by && am == bm && ad == bd
}
Loading
Loading