Skip to content

Commit 22b31e5

Browse files
committed
feat: 实现 Meilisearch 写入失败补偿/重试机制
- 创建 MeiliWorker:异步写入队列 + 失败重试(最大3次,间隔5秒) - 在 link service 中集成异步索引写入(创建链接时) - 在 link handler 中集成异步删除写入(删除链接时) - 在 router 中初始化并启动 MeiliWorker - 实现 redo.md 2.6:Meilisearch 写入失败补偿/重试/后台任务
1 parent 03daf69 commit 22b31e5

File tree

5 files changed

+216
-4
lines changed

5 files changed

+216
-4
lines changed

internal/app/app.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ func Run() error {
8282
if v2.StatsWorker != nil {
8383
v2.StatsWorker.Start()
8484
}
85+
// 启动 Meilisearch Worker
86+
if v2.LinkService != nil && v2.LinkService.GetMeiliWorker() != nil {
87+
v2.LinkService.GetMeiliWorker().Start()
88+
}
8589
httpv2.RegisterRoutes(router, v2)
8690
}
8791

internal/httpv2/handlers/link_handler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
appcfg "short-link/internal/config"
15+
"short-link/internal/jobs"
1516
"short-link/internal/repo"
1617
"short-link/internal/service"
1718
"short-link/models"
@@ -27,17 +28,19 @@ type LinkHandler struct {
2728
domainRepo *repo.DomainRepo
2829
searchService *service.SearchService
2930
auditLogRepo *repo.AuditLogRepo
31+
meiliWorker *jobs.MeiliWorker
3032
}
3133

3234
// NewLinkHandler 创建 LinkHandler
33-
func NewLinkHandler(cfg *appcfg.Config, linkService *service.LinkService, linkRepo *repo.LinkRepo, domainRepo *repo.DomainRepo, searchService *service.SearchService, auditLogRepo *repo.AuditLogRepo) *LinkHandler {
35+
func NewLinkHandler(cfg *appcfg.Config, linkService *service.LinkService, linkRepo *repo.LinkRepo, domainRepo *repo.DomainRepo, searchService *service.SearchService, auditLogRepo *repo.AuditLogRepo, meiliWorker *jobs.MeiliWorker) *LinkHandler {
3436
return &LinkHandler{
3537
cfg: cfg,
3638
linkService: linkService,
3739
linkRepo: linkRepo,
3840
domainRepo: domainRepo,
3941
searchService: searchService,
4042
auditLogRepo: auditLogRepo,
43+
meiliWorker: meiliWorker,
4144
}
4245
}
4346

@@ -206,6 +209,11 @@ func (h *LinkHandler) DeleteLink(c *gin.Context) {
206209
return
207210
}
208211

212+
// 异步提交 Meilisearch 删除任务(非阻塞)
213+
if h.meiliWorker != nil {
214+
h.meiliWorker.Submit("delete", nil, target.ID)
215+
}
216+
209217
// 记录审计日志(敏感操作)
210218
if h.auditLogRepo != nil {
211219
linkID := target.ID

internal/httpv2/router.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,25 @@ func New() (*Module, error) {
7272
// 初始化异步统计 Worker(批量大小50,等待间隔2秒)
7373
statsWorker := jobs.NewStatsWorker(linkRepo, accessLogRepo, 50, 2*time.Second)
7474

75+
// 初始化 Meilisearch Worker(最大重试3次,重试间隔5秒)
76+
var meiliWorker *jobs.MeiliWorker
77+
meiliWorker, err = jobs.NewMeiliWorker(cfg, 3, 5*time.Second)
78+
if err != nil {
79+
utils.LogWarn("Meilisearch Worker 初始化失败,索引写入将不可用: %v", err)
80+
meiliWorker = nil
81+
}
82+
7583
userService := service.NewUserService(userRepo)
7684
permissionService := service.NewPermissionService(permissionRepo)
77-
linkService := service.NewLinkService(cfg.BaseURL, cfg.MinCodeLength, cfg.MaxCodeLength, linkRepo, domainRepo, settingsRepo, userRepo, accessLogRepo, statsWorker)
85+
linkService := service.NewLinkService(cfg.BaseURL, cfg.MinCodeLength, cfg.MaxCodeLength, linkRepo, domainRepo, settingsRepo, userRepo, accessLogRepo, statsWorker, meiliWorker)
7886
searchService, err := service.NewSearchService(cfg)
7987
if err != nil {
8088
utils.LogWarn("Meilisearch(v2) 初始化失败,搜索功能将不可用: %v", err)
8189
searchService = nil
8290
}
8391

8492
authHandler := handlers.NewAuthHandler(cfg, userService, auditLogRepo)
85-
linkHandler := handlers.NewLinkHandler(cfg, linkService, linkRepo, domainRepo, searchService, auditLogRepo)
93+
linkHandler := handlers.NewLinkHandler(cfg, linkService, linkRepo, domainRepo, searchService, auditLogRepo, meiliWorker)
8694
redirectHandler := handlers.NewRedirectHandler(linkService)
8795
statsHandler := handlers.NewStatsHandler(linkService)
8896

@@ -112,6 +120,9 @@ func (m *Module) Close() {
112120
if m.StatsWorker != nil {
113121
m.StatsWorker.Stop()
114122
}
123+
if m.LinkService != nil && m.LinkService.GetMeiliWorker() != nil {
124+
m.LinkService.GetMeiliWorker().Stop()
125+
}
115126
if m.Pool != nil {
116127
m.Pool.Close()
117128
}

internal/jobs/meili_job.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/**
2+
* Meilisearch 写入任务队列与 Worker(异步写入 + 失败重试)
3+
* 实现 redo.md 2.6:Meilisearch 写入失败补偿/重试/后台任务
4+
*/
5+
package jobs
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"sync"
12+
"time"
13+
14+
"short-link/internal/config"
15+
"short-link/models"
16+
"short-link/utils"
17+
18+
"github.com/meilisearch/meilisearch-go"
19+
)
20+
21+
// MeiliTask Meilisearch 写入任务
22+
type MeiliTask struct {
23+
Action string `json:"action"` // "index", "delete"
24+
Link *models.Link `json:"link,omitempty"`
25+
LinkID int64 `json:"link_id,omitempty"`
26+
}
27+
28+
// MeiliWorker Meilisearch 写入 Worker
29+
type MeiliWorker struct {
30+
taskChan chan *MeiliTask
31+
client *meilisearch.Client
32+
index *meilisearch.Index
33+
maxRetries int
34+
retryDelay time.Duration
35+
wg sync.WaitGroup
36+
ctx context.Context
37+
cancel context.CancelFunc
38+
}
39+
40+
// NewMeiliWorker 创建 Meilisearch Worker
41+
func NewMeiliWorker(cfg *config.Config, maxRetries int, retryDelay time.Duration) (*MeiliWorker, error) {
42+
client := meilisearch.NewClient(meilisearch.ClientConfig{
43+
Host: cfg.MeiliHost,
44+
APIKey: cfg.MeiliKey,
45+
})
46+
47+
// 测试连接
48+
if _, err := client.Health(); err != nil {
49+
return nil, fmt.Errorf("Meilisearch连接失败: %w", err)
50+
}
51+
52+
index := client.Index("links")
53+
ctx, cancel := context.WithCancel(context.Background())
54+
55+
return &MeiliWorker{
56+
taskChan: make(chan *MeiliTask, 1000), // 缓冲1000个任务
57+
client: client,
58+
index: index,
59+
maxRetries: maxRetries,
60+
retryDelay: retryDelay,
61+
ctx: ctx,
62+
cancel: cancel,
63+
}, nil
64+
}
65+
66+
// Submit 提交 Meilisearch 写入任务(非阻塞)
67+
func (w *MeiliWorker) Submit(action string, link *models.Link, linkID int64) {
68+
task := &MeiliTask{
69+
Action: action,
70+
Link: link,
71+
LinkID: linkID,
72+
}
73+
74+
select {
75+
case w.taskChan <- task:
76+
// 成功提交
77+
default:
78+
// 队列满,静默丢弃(避免阻塞主流程)
79+
utils.LogWarn("Meilisearch任务队列已满,丢弃任务: action=%s, link_id=%d", action, linkID)
80+
}
81+
}
82+
83+
// Start 启动 Worker(后台 goroutine)
84+
func (w *MeiliWorker) Start() {
85+
w.wg.Add(1)
86+
go w.run()
87+
utils.LogInfo("Meilisearch Worker 已启动(最大重试次数=%d,重试间隔=%v)", w.maxRetries, w.retryDelay)
88+
}
89+
90+
// run Worker 主循环
91+
func (w *MeiliWorker) run() {
92+
defer w.wg.Done()
93+
94+
for {
95+
select {
96+
case <-w.ctx.Done():
97+
return
98+
99+
case task := <-w.taskChan:
100+
w.processTask(task)
101+
}
102+
}
103+
}
104+
105+
// processTask 处理单个任务(带重试)
106+
func (w *MeiliWorker) processTask(task *MeiliTask) {
107+
var err error
108+
for attempt := 0; attempt < w.maxRetries; attempt++ {
109+
if attempt > 0 {
110+
// 重试前等待
111+
select {
112+
case <-w.ctx.Done():
113+
return
114+
case <-time.After(w.retryDelay * time.Duration(attempt)):
115+
}
116+
}
117+
118+
switch task.Action {
119+
case "index":
120+
err = w.indexLink(task.Link)
121+
case "delete":
122+
err = w.deleteLink(task.LinkID)
123+
default:
124+
utils.LogError("未知的 Meilisearch 任务类型: %s", task.Action)
125+
return
126+
}
127+
128+
if err == nil {
129+
// 成功
130+
return
131+
}
132+
133+
utils.LogWarn("Meilisearch写入失败(尝试 %d/%d): action=%s, link_id=%d, error=%v",
134+
attempt+1, w.maxRetries, task.Action, task.LinkID, err)
135+
}
136+
137+
// 所有重试都失败,记录错误(后续可扩展为死信队列)
138+
utils.LogError("Meilisearch写入最终失败: action=%s, link_id=%d, error=%v", task.Action, task.LinkID, err)
139+
}
140+
141+
// indexLink 索引链接到 Meilisearch
142+
func (w *MeiliWorker) indexLink(link *models.Link) error {
143+
if link == nil {
144+
return fmt.Errorf("link 不能为空")
145+
}
146+
147+
doc := map[string]interface{}{
148+
"id": link.ID,
149+
"code": link.Code,
150+
"original_url": link.OriginalURL,
151+
"title": link.Title,
152+
"user_id": link.UserID,
153+
"domain_id": link.DomainID,
154+
"created_at": link.CreatedAt.Unix(),
155+
}
156+
157+
_, err := w.index.AddDocuments([]map[string]interface{}{doc}, "id")
158+
return err
159+
}
160+
161+
// deleteLink 从 Meilisearch 删除链接
162+
func (w *MeiliWorker) deleteLink(linkID int64) error {
163+
_, err := w.index.DeleteDocument(fmt.Sprintf("%d", linkID))
164+
return err
165+
}
166+
167+
// Stop 停止 Worker
168+
func (w *MeiliWorker) Stop() {
169+
w.cancel()
170+
w.wg.Wait()
171+
utils.LogInfo("Meilisearch Worker 已停止")
172+
}
173+
174+

internal/service/link_service.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type LinkService struct {
3232
userRepo *repo.UserRepo
3333
accessLogRepo *repo.AccessLogRepo
3434
statsWorker *jobs.StatsWorker // 异步统计 worker
35+
meiliWorker *jobs.MeiliWorker // Meilisearch 异步写入 worker
3536

3637
// env 默认值(DB settings 可覆盖)
3738
minCodeLen int
@@ -40,14 +41,15 @@ type LinkService struct {
4041
}
4142

4243
// NewLinkService 创建 LinkService
43-
func NewLinkService(baseURL string, minCodeLen int, maxCodeLen int, linkRepo *repo.LinkRepo, domainRepo *repo.DomainRepo, settingsRepo *repo.SettingsRepo, userRepo *repo.UserRepo, accessLogRepo *repo.AccessLogRepo, statsWorker *jobs.StatsWorker) *LinkService {
44+
func NewLinkService(baseURL string, minCodeLen int, maxCodeLen int, linkRepo *repo.LinkRepo, domainRepo *repo.DomainRepo, settingsRepo *repo.SettingsRepo, userRepo *repo.UserRepo, accessLogRepo *repo.AccessLogRepo, statsWorker *jobs.StatsWorker, meiliWorker *jobs.MeiliWorker) *LinkService {
4445
return &LinkService{
4546
linkRepo: linkRepo,
4647
domainRepo: domainRepo,
4748
settingsRepo: settingsRepo,
4849
userRepo: userRepo,
4950
accessLogRepo: accessLogRepo,
5051
statsWorker: statsWorker,
52+
meiliWorker: meiliWorker,
5153
minCodeLen: minCodeLen,
5254
maxCodeLen: maxCodeLen,
5355
baseURL: baseURL,
@@ -366,6 +368,10 @@ func (s *LinkService) CreateLink(ctx context.Context, userID int64, req *models.
366368
}
367369
return nil, "", fmt.Errorf("创建链接失败: %w", err)
368370
}
371+
// 异步提交 Meilisearch 索引任务(非阻塞)
372+
if s.meiliWorker != nil {
373+
s.meiliWorker.Submit("index", link, link.ID)
374+
}
369375
return link, shortURL, nil
370376
}
371377

@@ -396,6 +402,10 @@ func (s *LinkService) CreateLink(ctx context.Context, userID int64, req *models.
396402
}
397403
err := s.linkRepo.CreateLink(ctx, link)
398404
if err == nil {
405+
// 异步提交 Meilisearch 索引任务(非阻塞)
406+
if s.meiliWorker != nil {
407+
s.meiliWorker.Submit("index", link, link.ID)
408+
}
399409
return link, shortURL, nil
400410
}
401411
if repo.IsUniqueViolation(err) {
@@ -423,4 +433,9 @@ func (s *LinkService) GetStats(ctx context.Context) (*models.LinkStats, error) {
423433
return s.linkRepo.GetLinkStats(ctx)
424434
}
425435

436+
// GetMeiliWorker 获取 Meilisearch Worker(用于启动/停止)
437+
func (s *LinkService) GetMeiliWorker() *jobs.MeiliWorker {
438+
return s.meiliWorker
439+
}
440+
426441

0 commit comments

Comments
 (0)