Skip to content

Commit e2c3619

Browse files
committed
fix scheduler
1 parent 4b69699 commit e2c3619

File tree

4 files changed

+102
-37
lines changed

4 files changed

+102
-37
lines changed

backend/internal/dingtalk/bot.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ import (
1111
"sync"
1212
"time"
1313

14+
"sop-chat/internal/dingtalksdk/chatbot"
15+
dingclient "sop-chat/internal/dingtalksdk/client"
16+
1417
cmsclient "github.com/alibabacloud-go/cms-20240330/v6/client"
1518
openapiutil "github.com/alibabacloud-go/darabonba-openapi/v2/utils"
1619
"github.com/alibabacloud-go/tea/tea"
17-
"sop-chat/internal/dingtalksdk/chatbot"
18-
dingclient "sop-chat/internal/dingtalksdk/client"
1920

2021
"sop-chat/internal/config"
2122
"sop-chat/pkg/sopchat"
@@ -98,6 +99,7 @@ func (b *Bot) UpdateConfig(newCfg *config.DingTalkConfig) {
9899
}
99100

100101
// Start 启动钉钉 Stream 连接(非阻塞:SDK 内部以 goroutine 运行消息循环)
102+
// 连接失败时会自动重试,最多重试 5 次,每次间隔指数递增
101103
func (b *Bot) Start() error {
102104
b.cliMu.Lock()
103105
defer b.cliMu.Unlock()
@@ -106,19 +108,44 @@ func (b *Bot) Start() error {
106108
return nil // 已在运行,幂等
107109
}
108110

109-
cli := dingclient.NewStreamClient(
110-
dingclient.WithAppCredential(dingclient.NewAppCredentialConfig(b.dtConfig.ClientId, b.dtConfig.ClientSecret)),
111-
dingclient.WithUserAgent(dingclient.NewDingtalkGoSDKUserAgent()),
112-
dingclient.WithAutoReconnect(true), // 断线自动重连,直到 Stop() 被调用
113-
)
114-
cli.RegisterChatBotCallbackRouter(b.onMessage)
111+
// 重试参数
112+
maxRetries := 5
113+
baseDelay := time.Second
114+
maxDelay := 30 * time.Second
115115

116-
if err := cli.Start(context.Background()); err != nil {
117-
return err
116+
var lastErr error
117+
for attempt := 0; attempt < maxRetries; attempt++ {
118+
if attempt > 0 {
119+
// 指数退避:1s, 2s, 4s, 8s, 16s...
120+
delay := baseDelay << uint(attempt-1)
121+
if delay > maxDelay {
122+
delay = maxDelay
123+
}
124+
log.Printf("[DingTalk] 机器人启动失败,%v 后重试(第 %d/%d 次)...", delay, attempt+1, maxRetries)
125+
time.Sleep(delay)
126+
}
127+
128+
cli := dingclient.NewStreamClient(
129+
dingclient.WithAppCredential(dingclient.NewAppCredentialConfig(b.dtConfig.ClientId, b.dtConfig.ClientSecret)),
130+
dingclient.WithUserAgent(dingclient.NewDingtalkGoSDKUserAgent()),
131+
dingclient.WithAutoReconnect(true), // 断线自动重连,直到 Stop() 被调用
132+
)
133+
cli.RegisterChatBotCallbackRouter(b.onMessage)
134+
135+
if err := cli.Start(context.Background()); err != nil {
136+
lastErr = err
137+
log.Printf("[DingTalk] 机器人启动失败 (clientId=%s, attempt=%d): %v", b.dtConfig.ClientId, attempt+1, err)
138+
// 清理失败的客户端,防止资源泄漏
139+
cli.Close()
140+
continue
141+
}
142+
143+
b.cli = cli
144+
log.Printf("[DingTalk] 机器人已启动,绑定数字员工: %s", b.dtConfig.EmployeeName)
145+
return nil
118146
}
119-
b.cli = cli
120-
log.Printf("[DingTalk] 机器人已启动,绑定数字员工: %s", b.dtConfig.EmployeeName)
121-
return nil
147+
148+
return fmt.Errorf("钉钉机器人启动失败,已重试 %d 次: %w", maxRetries, lastErr)
122149
}
123150

124151
// Stop 停止钉钉 Stream 连接,禁用自动重连后关闭 WebSocket

backend/internal/dingtalksdk/client/client.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"sop-chat/internal/dingtalksdk/card"
109
"io"
1110
"net/http"
1211
"net/url"
12+
"sop-chat/internal/dingtalksdk/card"
1313
"sync"
1414
"time"
1515

@@ -92,18 +92,17 @@ func (cli *StreamClient) Start(ctx context.Context) error {
9292

9393
header := make(http.Header)
9494

95-
var dialer *websocket.Dialer
95+
// 始终创建新的 Dialer,避免使用共享的 DefaultDialer 可能导致的问题
96+
dialer := &websocket.Dialer{
97+
HandshakeTimeout: 10 * time.Second,
98+
}
9699

97-
if len(cli.proxy) == 0 {
98-
dialer = websocket.DefaultDialer
99-
} else {
100+
if len(cli.proxy) > 0 {
100101
proxyURL, err := url.Parse(cli.proxy)
101102
if err != nil {
102103
return err
103104
}
104-
dialer = &websocket.Dialer{
105-
Proxy: http.ProxyURL(proxyURL),
106-
}
105+
dialer.Proxy = http.ProxyURL(proxyURL)
107106
}
108107

109108
conn, resp, err := dialer.Dial(wssUrl, header)

backend/internal/scheduler/scheduler.go

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,14 @@ func (s *Scheduler) runTask(task config.ScheduledTaskConfig) {
136136
clientCfg := s.clientCfg
137137
s.mu.Unlock()
138138

139-
log.Printf("[Scheduler] 开始执行任务: %q employee=%q webhook=%s product=%q project=%q workspace=%q region=%q",
140-
task.Name, task.EmployeeName, task.Webhook.Type, task.Product, task.Project, task.Workspace, task.Region)
139+
log.Printf("[Scheduler] ========== 任务触发 ==========")
140+
log.Printf("[Scheduler] 任务名称: %q", task.Name)
141+
log.Printf("[Scheduler] Cron 表达式: %q", task.Cron)
142+
log.Printf("[Scheduler] 数字员工: %q", task.EmployeeName)
143+
log.Printf("[Scheduler] Webhook: type=%s url=%s", task.Webhook.Type, maskURL(task.Webhook.URL))
144+
log.Printf("[Scheduler] 产品配置: product=%q project=%q workspace=%q region=%q",
145+
task.Product, task.Project, task.Workspace, task.Region)
146+
141147
prompt := task.Prompt
142148
if task.ConciseReply {
143149
prompt += "\n\n简化最终输出 适合聊天工具上阅读"
@@ -159,28 +165,30 @@ func (s *Scheduler) runTask(task config.ScheduledTaskConfig) {
159165
taskWorkspace := task.Workspace
160166
taskRegion := task.Region
161167

168+
log.Printf("[Scheduler] 任务 %q 开始查询数字员工...", task.Name)
162169
reply, err := queryEmployee(clientCfg, task.Name, task.EmployeeName, prompt, s.timezone, taskProduct, taskProject, taskWorkspace, taskRegion)
163170
if err != nil {
164171
log.Printf("[Scheduler] 任务 %q 查询数字员工失败: %v", task.Name, err)
165172
return
166173
}
167174

168-
log.Printf("[Scheduler] 任务 %q 数字员工原始响应(%d 字): %s",
169-
task.Name, len([]rune(reply)), reply)
175+
log.Printf("[Scheduler] 任务 %q 数字员工响应完成(%d 字)", task.Name, len([]rune(reply)))
170176

171177
if reply == "" {
172178
log.Printf("[Scheduler] 任务 %q 数字员工返回空响应,跳过发送", task.Name)
173179
return
174180
}
175181

182+
log.Printf("[Scheduler] 任务 %q 开始发送 Webhook (type=%s)...", task.Name, task.Webhook.Type)
176183
raw, err := sendToWebhook(task.Webhook, reply)
177184
if err != nil {
178185
log.Printf("[Scheduler] 任务 %q 发送 webhook 失败: %v(平台响应: %s)", task.Name, err, raw)
179186
return
180187
}
181188

182189
elapsed := time.Since(startTime).Round(time.Millisecond)
183-
log.Printf("[Scheduler] 任务 %q 执行完成,耗时 %s,webhook 平台响应: %s", task.Name, elapsed, raw)
190+
log.Printf("[Scheduler] 任务 %q 执行完成,耗时 %s", task.Name, elapsed)
191+
log.Printf("[Scheduler] ========== 任务结束 ==========")
184192
}
185193

186194
// QueryEmployee 向数字员工发送消息,等待完整响应并返回文本(公开,供外部触发测试使用)
@@ -195,29 +203,37 @@ func QueryEmployeeWithVariables(clientCfg *config.ClientConfig, employeeName, me
195203

196204
// queryEmployee 向数字员工发送消息,等待完整响应并返回文本
197205
func queryEmployee(clientCfg *config.ClientConfig, taskName, employeeName, message string, loc *time.Location, product, project, workspace, region string) (string, error) {
206+
log.Printf("[Scheduler] queryEmployee 开始: task=%q employee=%q", taskName, employeeName)
207+
198208
sopClient, err := client.NewCMSClient(&client.Config{
199209
AccessKeyId: clientCfg.AccessKeyId,
200210
AccessKeySecret: clientCfg.AccessKeySecret,
201211
Endpoint: clientCfg.Endpoint,
202212
})
203213
if err != nil {
214+
log.Printf("[Scheduler] queryEmployee 创建 CMS 客户端失败: %v", err)
204215
return "", fmt.Errorf("创建 CMS 客户端失败: %w", err)
205216
}
217+
log.Printf("[Scheduler] queryEmployee CMS 客户端创建成功")
206218
cms := sopClient.CmsClient
207219

208220
// CMS API 要求必须传有效 ThreadId,先创建一次性线程
209221
threadTitle := fmt.Sprintf("[定时任务] %s @ %s", taskName, time.Now().In(loc).Format("2006-01-02 15:04:05"))
222+
log.Printf("[Scheduler] queryEmployee 创建线程: %s", threadTitle)
210223
threadResp, err := sopClient.CreateThread(&sopchat.ThreadConfig{
211224
EmployeeName: employeeName,
212225
Title: threadTitle,
213226
})
214227
if err != nil {
228+
log.Printf("[Scheduler] queryEmployee 创建线程失败: %v", err)
215229
return "", fmt.Errorf("创建线程失败: %w", err)
216230
}
217231
if threadResp.Body == nil || threadResp.Body.ThreadId == nil || *threadResp.Body.ThreadId == "" {
232+
log.Printf("[Scheduler] queryEmployee CreateThread 返回了空的 ThreadId")
218233
return "", fmt.Errorf("CreateThread 返回了空的 ThreadId")
219234
}
220235
threadId := *threadResp.Body.ThreadId
236+
log.Printf("[Scheduler] queryEmployee 线程创建成功: threadId=%s", threadId)
221237
nowTS := time.Now().Unix()
222238
variables := map[string]interface{}{
223239
"timeStamp": fmt.Sprintf("%d", nowTS),
@@ -265,9 +281,11 @@ func queryEmployee(clientCfg *config.ClientConfig, taskName, employeeName, messa
265281

266282
startSSE := time.Now()
267283

268-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
284+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
269285
defer cancel()
270286

287+
log.Printf("[Scheduler] queryEmployee 开始 SSE 流式请求: employee=%q threadId=%s timeout=30m", employeeName, threadId)
288+
271289
responseChan := make(chan *cmsclient.CreateChatResponse)
272290
errorChan := make(chan error)
273291

@@ -276,6 +294,7 @@ func queryEmployee(clientCfg *config.ClientConfig, taskName, employeeName, messa
276294

277295
var textParts []string
278296
responseCount, msgCount := 0, 0
297+
lastProgressLog := time.Now()
279298

280299
for {
281300
select {
@@ -286,12 +305,19 @@ func queryEmployee(clientCfg *config.ClientConfig, taskName, employeeName, messa
286305
case response, ok := <-responseChan:
287306
if !ok {
288307
result := strings.Join(textParts, "")
289-
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s prompt=%q 耗时 %s 共 %d 帧 文本 %d 字",
290-
employeeName, threadId, message, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
308+
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s 耗时 %s 共 %d 帧 文本 %d 字",
309+
employeeName, threadId, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
291310
return result, nil
292311
}
293312
responseCount++
294313

314+
// 每 30 秒打印一次进度
315+
if time.Since(lastProgressLog) > 30*time.Second {
316+
log.Printf("[Scheduler] queryEmployee 进行中: employee=%q 已收 %d 帧 %d 消息 耗时 %s",
317+
employeeName, responseCount, msgCount, time.Since(startSSE).Round(time.Second))
318+
lastProgressLog = time.Now()
319+
}
320+
295321
if response.StatusCode != nil && *response.StatusCode != 200 {
296322
log.Printf("[Scheduler] queryEmployee 响应状态码异常: %d", *response.StatusCode)
297323
}
@@ -302,8 +328,8 @@ func queryEmployee(clientCfg *config.ClientConfig, taskName, employeeName, messa
302328
// 检测 done 消息
303329
if sopchat.IsDoneMessage(response.Body) {
304330
result := strings.Join(textParts, "")
305-
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s prompt=%q 耗时 %s 共 %d 帧 文本 %d 字",
306-
employeeName, threadId, message, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
331+
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s 耗时 %s 共 %d 帧 文本 %d 字",
332+
employeeName, threadId, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
307333
return result, nil
308334
}
309335

@@ -329,18 +355,27 @@ func queryEmployee(clientCfg *config.ClientConfig, taskName, employeeName, messa
329355
case err, ok := <-errorChan:
330356
if !ok {
331357
result := strings.Join(textParts, "")
332-
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s prompt=%q 耗时 %s 共 %d 帧 文本 %d 字",
333-
employeeName, threadId, message, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
358+
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s 耗时 %s 共 %d 帧 文本 %d 字",
359+
employeeName, threadId, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
334360
return result, nil
335361
}
336362
if err != nil {
337-
log.Printf("[Scheduler] queryEmployee SSE error: %v", err)
363+
log.Printf("[Scheduler] queryEmployee SSE 错误: %v", err)
338364
return strings.Join(textParts, ""), err
339365
}
340366
result := strings.Join(textParts, "")
341-
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s prompt=%q 耗时 %s 共 %d 帧 文本 %d 字",
342-
employeeName, threadId, message, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
367+
log.Printf("[Scheduler] queryEmployee 完成: employee=%q threadId=%s 耗时 %s 共 %d 帧 文本 %d 字",
368+
employeeName, threadId, time.Since(startSSE).Round(time.Millisecond), responseCount, len([]rune(result)))
343369
return result, nil
344370
}
345371
}
346372
}
373+
374+
// maskURL 遮蔽 URL 中的敏感信息,只显示域名和路径的前后部分
375+
func maskURL(url string) string {
376+
if len(url) < 30 {
377+
return url
378+
}
379+
// 保留前 20 个字符和后 10 个字符
380+
return url[:20] + "..." + url[len(url)-10:]
381+
}

backend/internal/wecom/longconn.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,11 @@ func (b *LongConnBot) connect() error {
215215

216216
log.Printf("[WeCom-LongConn] 正在连接 %s ...", wsURL)
217217

218-
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
218+
// 创建新的 Dialer,避免使用共享的 DefaultDialer
219+
dialer := &websocket.Dialer{
220+
HandshakeTimeout: 10 * time.Second,
221+
}
222+
conn, _, err := dialer.Dial(wsURL, nil)
219223
if err != nil {
220224
return fmt.Errorf("WebSocket 连接失败: %w", err)
221225
}

0 commit comments

Comments
 (0)