Skip to content

Commit e3db727

Browse files
committed
fix
1 parent 14df7ef commit e3db727

File tree

10 files changed

+67
-80
lines changed

10 files changed

+67
-80
lines changed

biz/application/service/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package service
22

33
import (
44
"context"
5-
"github.com/xh-polaris/psych-core-api/biz/application/dto/basic"
6-
"github.com/xh-polaris/psych-core-api/biz/application/dto/core_api"
75
"time"
86

7+
"github.com/xh-polaris/psych-core-api/biz/application/dto/basic"
8+
"github.com/xh-polaris/psych-core-api/biz/application/dto/core_api"
99
"github.com/xh-polaris/psych-core-api/biz/infra/mapper/config"
10+
"github.com/xh-polaris/psych-core-api/biz/infra/util"
1011
"github.com/xh-polaris/psych-core-api/biz/infra/util/enum"
1112
"github.com/xh-polaris/psych-core-api/pkg/errorx"
1213
"github.com/xh-polaris/psych-core-api/pkg/logs"
@@ -139,6 +140,7 @@ func (c *ConfigService) ConfigGetByUnitID(ctx context.Context, req *core_api.Con
139140
// 根据权限返回不同DTO
140141
switch req.GetAdmin() {
141142
case true:
143+
util.DPrint("configDAO: %+v\n", configDAO.Chat)
142144
return &core_api.ConfigGetByUnitIdResp{
143145
Config: adminConfig(configDAO),
144146
Code: 0,
@@ -151,7 +153,6 @@ func (c *ConfigService) ConfigGetByUnitID(ctx context.Context, req *core_api.Con
151153
Msg: "success",
152154
}, nil
153155
}
154-
155156
return nil, errorx.New(errno.ErrInternalError)
156157
}
157158

biz/domain/engine/asr.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/base64"
66

7+
"github.com/gorilla/websocket"
78
"github.com/xh-polaris/psych-core-api/biz/infra/util"
89
"github.com/xh-polaris/psych-core-api/pkg/app"
910
"github.com/xh-polaris/psych-core-api/pkg/core"
@@ -44,9 +45,12 @@ func (e *Engine) execASRRecv(ctx context.Context) {
4445
return
4546
default:
4647
text, last, err := e.asr.Receive(ctx)
47-
if err != nil && !wsx.IsNormal(err) { // 出现问题, 需要结束整个链路
48+
// 这里由于ASR的问题, 可能出现正常响应也1006, 所以这里跳过
49+
if err != nil && !wsx.IsNormal(err) && !websocket.IsCloseError(err, websocket.CloseAbnormalClosure) { // 出现问题, 需要结束整个链路
4850
e.unexpected(err, "asr receive err")
4951
return
52+
} else if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
53+
return
5054
}
5155
if err = e.MWrite(core.MResp, &core.Resp{ID: 0, Type: core.RUserText, Content: text}); err != nil { // 写回响应
5256
e.unexpected(err, "asr receive write err")

biz/domain/engine/auth.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func (e *Engine) auth(auth *core.Auth) (bool, error) {
3232
} else {
3333
e.uSession = bson.NewObjectID().Hex()
3434
}
35-
util.DPrint("[engine] [auth] info: %+v, merr: %+v\n", alreadyAuth, merr) // debug
36-
return true, e.MWrite(core.MAuth, alreadyAuth) // 前端收到Auth响应后, 需要显示配置中
35+
util.DPrint("[engine] [auth] info: %+v, merr: %+v, uSession: %s\n", alreadyAuth, merr, e.uSession) // debug
36+
return true, e.MWrite(core.MAuth, alreadyAuth) // 前端收到Auth响应后, 需要显示配置中
3737
}
3838

3939
// 已登录

biz/domain/engine/config.go

Lines changed: 12 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -16,73 +16,32 @@ import (
1616

1717
// config 配置app与workflow
1818
func (e *Engine) config() error {
19-
var err error
20-
var cf *core.Config
21-
var wfc *core.WorkFlowConfig
22-
var configResp *core_api.ConfigGetByUnitIdResp
19+
var (
20+
err error
21+
cf *core.Config
22+
wfc *core.WorkFlowConfig
23+
configResp *core_api.ConfigGetByUnitIdResp
24+
)
2325

2426
// 获取配置
2527
req := &core_api.ConfigGetByUnitIdReq{UnitId: e.info[cst.JsonUnitID].(string), Admin: true}
26-
if e.cfgSvc != nil {
27-
if configResp, err = e.cfgSvc.ConfigGetByUnitID(e.ctx, req); err != nil {
28-
logs.Error("[engine] [%s] UnitAppConfigGetByUnitId err: %v", core.AConfig, err)
29-
return e.MWrite(core.MErr, core.ToErr(errorx.WrapByCode(err, errno.GetConfigErr)))
30-
}
31-
} else {
32-
// 本地fallback: 从 conf.ModelConfig 中选取第一个可用 provider 来构造最小 Config
33-
confLocal := conf.GetConfig()
34-
var chatProvider string
35-
var ttsProvider string
36-
if confLocal != nil && confLocal.ModelConfig != nil {
37-
for k := range confLocal.ModelConfig.Chat {
38-
chatProvider = k
39-
break
40-
}
41-
for k := range confLocal.ModelConfig.TTS {
42-
ttsProvider = k
43-
break
44-
}
45-
}
46-
configResp = &core_api.ConfigGetByUnitIdResp{
47-
Config: &core_api.Config{
48-
Type: "",
49-
Chat: &core_api.ChatApp{
50-
Name: "",
51-
Description: "",
52-
Provider: chatProvider,
53-
AppId: "",
54-
},
55-
Tts: &core_api.TTSApp{
56-
Name: "",
57-
Description: "",
58-
Provider: ttsProvider,
59-
AppId: "",
60-
Speaker: "",
61-
},
62-
Report: &core_api.ReportApp{
63-
Name: "",
64-
Description: "",
65-
Provider: "",
66-
AppId: "",
67-
},
68-
},
69-
Code: 0,
70-
Msg: "local-fallback",
71-
}
28+
if configResp, err = e.cfgSvc.ConfigGetByUnitID(e.ctx, req); err != nil {
29+
logs.Error("[engine] [%s] UnitAppConfigGetByUnitId err: %v", core.AConfig, err)
30+
return e.MWrite(core.MErr, core.ToErr(errorx.WrapByCode(err, errno.GetConfigErr)))
7231
}
32+
util.DPrint("configResp: %+v\n", configResp)
7333

7434
// 构造配置
7535
if cf, wfc, err = e.buildConfig(configResp); err != nil {
7636
logs.Error("[workflow] [config] build config err: %v", err)
7737
return errorx.WrapByCode(err, errno.AppConfigErr, errorx.KV("app", "llm"))
7838
}
79-
8039
// 构造llm
8140
if e.llm, err = app.NewChatApp(e.ctx, e.uSession, wfc.ChatConfig); err != nil {
8241
logs.Error("[workflow] [config] new chatApp err: %v", err)
8342
return errorx.WrapByCode(err, errno.AppConfigErr, errorx.KV("app", "llm"))
8443
}
85-
util.DPrint("llm: %+v", e.llm)
44+
util.DPrint("llm: %+v\n", e.llm)
8645
// 构造asr
8746
if e.asr, err = app.NewASRApp(e.uSession, wfc.ASRConfig); err != nil {
8847
logs.Error("[workflow] [config] new asrApp err: %v", err)
@@ -93,7 +52,7 @@ func (e *Engine) config() error {
9352
logs.Error("[workflow] [config] new asrApp err: %v", err)
9453
return errorx.WrapByCode(err, errno.AppConfigErr, errorx.KV("app", "tts"))
9554
}
96-
util.DPrint("tts: %+v", e.tts)
55+
util.DPrint("tts: %+v\n", e.tts)
9756
// 返回前端
9857
util.DPrint("[engine] [config] workflow config: %+v\n conf: %+v\n", wfc, cf)
9958
return e.MWrite(core.MConfig, cf)

biz/domain/engine/llm.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (e *Engine) execLLM(ctx context.Context, cmd *core.Cmd) (err error) {
2828
}
2929

3030
// 创建用户消息
31-
oids, err := util.ObjectIDsFromHex(e.uSession, e.info[cst.UserID].(string))
31+
oids, err := util.ObjectIDsFromHex(e.uSession, e.info[cst.JsonUserID].(string))
3232
if err != nil {
3333
return errorx.WrapByCode(err, errno.RetrieveHisErr)
3434
}
@@ -44,22 +44,24 @@ func (e *Engine) execLLM(ctx context.Context, cmd *core.Cmd) (err error) {
4444
// 创建模型消息
4545
astMsg := convert.AssistantMMsg(oids[0], oids[1], "", index+1)
4646

47-
util.DPrint("mMsgs:%s", mMsgs)
47+
util.DPrint("mMsgs:%+v", mMsgs)
4848
// 调用大模型
4949
eMsgs := convert.MMsgToEMsgList(mMsgs) // 存储域消息转模型域
50-
//ctx, e.llmCancel = context.WithCancel(ctx)
51-
stream, err := e.llm.Stream(ctx, eMsgs)
50+
51+
var subctx context.Context
52+
subctx, e.llmCancel = context.WithCancel(ctx)
53+
stream, err := e.llm.Stream(subctx, eMsgs)
5254
if err != nil {
53-
return errorx.WrapByCode(err, errno.RetrieveHisErr)
55+
return errorx.WrapByCode(err, errno.LLMStreamErr)
5456
}
5557

5658
// 拷贝流以用作不同用途
5759
streams := stream.Copy(2)
5860
ret, tts := streams[0], streams[1] // 分别用于返回给前端与TTS音频生成
5961
// 返回给前端
60-
go e.execLLMResponse(ctx, cmd.ID, ret, astMsg)
62+
go e.execLLMResponse(subctx, cmd.ID, ret, astMsg)
6163
// 启用tts发送
62-
go e.execTTS(ctx, cmd.ID, tts)
64+
go e.execTTS(subctx, cmd.ID, tts)
6365
return err
6466
}
6567

biz/domain/llm/impl/coze.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package impl
22

33
import (
44
"context"
5+
"errors"
6+
"io"
7+
"net"
58
"net/http"
9+
"time"
610

711
"github.com/cloudwego/eino/components/model"
812
"github.com/cloudwego/eino/schema"
@@ -28,8 +32,26 @@ type CozeModel struct {
2832
botId string
2933
}
3034

35+
var cozeDial = &net.Dialer{
36+
Timeout: 30 * time.Second,
37+
KeepAlive: 30 * time.Second,
38+
}
39+
40+
var cozeHttpCli = &http.Client{
41+
Transport: &http.Transport{
42+
Proxy: http.ProxyFromEnvironment,
43+
DialContext: cozeDial.DialContext,
44+
ForceAttemptHTTP2: true,
45+
MaxIdleConns: 100,
46+
IdleConnTimeout: 90 * time.Second,
47+
TLSHandshakeTimeout: 10 * time.Second,
48+
ExpectContinueTimeout: 1 * time.Second,
49+
},
50+
Timeout: 0,
51+
}
52+
3153
func NewCozeModel(ctx context.Context, url, sk, uid, botId string) (_ model.ToolCallingChatModel, err error) {
32-
cozeCli := coze.NewCozeAPI(coze.NewTokenAuth(sk), coze.WithBaseURL(url), coze.WithHttpClient(http.DefaultClient))
54+
cozeCli := coze.NewCozeAPI(coze.NewTokenAuth(sk), coze.WithBaseURL(url), coze.WithHttpClient(cozeHttpCli))
3355
return &CozeModel{Coze, &cozeCli, uid, botId}, nil
3456
}
3557

@@ -72,7 +94,7 @@ func process(ctx context.Context, reader coze.Stream[coze.ChatEvent], writer *sc
7294
return
7395
default:
7496
if event, err = reader.Recv(); err != nil {
75-
logs.Errorf("[coze] process recv err: %s", err)
97+
logs.CondErrorf(errors.Is(err, io.EOF), "[coze] process recv err: %s", err)
7698
writer.Send(nil, err)
7799
return
78100
}

pkg/app/volc/asr/asr.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package asr
66

77
import (
88
"encoding/json"
9-
"fmt"
109
"net/http"
1110

1211
"github.com/gorilla/websocket"
@@ -127,7 +126,7 @@ func (asr *VcASRApp) Receive(_ context.Context) (text string, last bool, err err
127126
case websocket.TextMessage:
128127
return asr.receiveText(msg)
129128
default:
130-
return "", false, fmt.Errorf("[volc asr] Receive: invalid websocket message")
129+
return "", false, err
131130
}
132131
}
133132
return "", false, err
@@ -150,7 +149,6 @@ func (asr *VcASRApp) Close() (err error) {
150149
// buildHTTPHeader 构造鉴权请求头
151150
func (asr *VcASRApp) buildHTTPHeader() {
152151
asr.header = http.Header{
153-
"X-Tt-Logid": []string{asr.uSession},
154152
"X-Api-Resource-Id": []string{asr.setting.ResourceId},
155153
"X-Api-Access-Key": []string{asr.setting.AccessKey},
156154
"X-Api-App-Key": []string{asr.setting.AppID},

pkg/wsx/helper.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ func (ws *WSClient) classifyErr(err error) error {
5656
return NormalCloseErr
5757
case websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived):
5858
// 为了避免内部错误被隐藏, 此处日志记录错误原因
59-
logs.Errorf("[WSClient] close error%v", err)
59+
logs.Errorf("[WSClient] close error: %v", err)
6060
ws.closed = true
61-
return AbnormalCloseErr
61+
return err
6262
default:
6363
return err
6464
}
@@ -96,7 +96,7 @@ func NewWSClientWithDial(ctx context.Context, url string, header http.Header) (*
9696
// 连接失败若有响应, 打印错误日志
9797
if r != nil {
9898
if body, parseErr := io.ReadAll(r.Body); parseErr == nil {
99-
logs.Error("[WSClient] parse conn resp body:%s", string(body))
99+
logs.Errorf("[WSClient] parse conn resp body:%s", string(body))
100100
}
101101
}
102102
return nil, err

pkg/wsx/hzhelper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (ws *HZWSClient) classifyErr(err error) error {
2323
// 为了避免内部错误被隐藏, 此处日志记录错误原因
2424
logs.Error("[HZWSClient] close error", err)
2525
ws.closed = true
26-
return AbnormalCloseErr
26+
return err
2727
default:
2828
return err
2929
}

test/component/volc_asr_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestVolcASRApp(t *testing.T) {
5555
func sendAudio(ctx context.Context, t *testing.T, asrApp app.ASRApp, file *os.File) {
5656

5757
buf := make([]byte, 3200)
58-
//last := []byte{app.LastASR}
58+
last := []byte{app.LastASR}
5959

6060
i := 0
6161

@@ -92,9 +92,10 @@ func sendAudio(ctx context.Context, t *testing.T, asrApp app.ASRApp, file *os.Fi
9292

9393
END:
9494

95-
//if err := asrApp.Send(ctx, last); err != nil {
96-
// t.Errorf("发送 last 失败: %v", err)
97-
//}
95+
if err := asrApp.Send(ctx, last); err != nil {
96+
t.Errorf("发送 last 失败: %v", err)
97+
}
98+
t.Log("发送Last")
9899
}
99100

100101
func receiveResults(ctx context.Context, t *testing.T, app app.ASRApp) {
@@ -149,5 +150,5 @@ func GetASRApp(t *testing.T) app.ASRApp {
149150
ResultType: GetTestConfig()["VCASRAppResultType"].(string),
150151
}
151152
t.Logf("asr setting: %v", setting)
152-
return asr.NewVcASRApp("vc-asr-test", setting)
153+
return asr.NewVcASRApp("", setting)
153154
}

0 commit comments

Comments
 (0)