diff --git a/app/favorite/cmd/main.go b/app/favorite/cmd/main.go index 9626c6f..79e5f6f 100644 --- a/app/favorite/cmd/main.go +++ b/app/favorite/cmd/main.go @@ -1,7 +1,7 @@ package main import ( - "fmt" + "github.com/pkg/errors" "net" "github.com/sirupsen/logrus" @@ -36,11 +36,13 @@ func main() { if err != nil { panic(err) } - if _, err := etcdRegister.Register(node, 10); err != nil { - panic(fmt.Sprintf("start service failed, err: %v", err)) + if _, err = etcdRegister.Register(node, 10); err != nil { + logs.LogrusObj.Errorf("start service failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + logs.LogrusObj.Errorf("stack trace: \n%+v\n", err) + panic(err) } logrus.Info("service started listen on ", grpcAddress) - if err := server.Serve(lis); err != nil { + if err = server.Serve(lis); err != nil { panic(err) } } diff --git a/app/favorite/internal/repository/db/dao/favorite.go b/app/favorite/internal/repository/db/dao/favorite.go index f816793..41328ef 100644 --- a/app/favorite/internal/repository/db/dao/favorite.go +++ b/app/favorite/internal/repository/db/dao/favorite.go @@ -2,11 +2,11 @@ package dao import ( "context" + "github.com/pkg/errors" "gorm.io/gorm" favoritePb "github.com/CocaineCong/tangseng/idl/pb/favorite" - log "github.com/CocaineCong/tangseng/pkg/logger" "github.com/CocaineCong/tangseng/repository/mysql/db" "github.com/CocaineCong/tangseng/repository/mysql/model" ) @@ -23,7 +23,7 @@ func (dao *FavoriteDao) ListFavorite(req *favoritePb.FavoriteListReq) (r []*mode err = dao.DB.Model(&model.Favorite{}). Where("user_id = ?", req.UserId).Find(&r).Error if err != nil { - return + return r, errors.Wrapf(err, "failed to query favorite list, userId = %v ", req.UserId) } return } @@ -34,8 +34,7 @@ func (dao *FavoriteDao) CreateFavorite(req *favoritePb.FavoriteCreateReq) (err e UserID: req.UserId, } if err = dao.DB.Create(&favorite).Error; err != nil { - log.LogrusObj.Error("Insert Favorite Error:" + err.Error()) - return + return errors.Wrapf(err, "failed to create favorite, userId = %v ", req.UserId) } return @@ -44,7 +43,9 @@ func (dao *FavoriteDao) CreateFavorite(req *favoritePb.FavoriteCreateReq) (err e func (dao *FavoriteDao) DeleteFavorite(req *favoritePb.FavoriteDeleteReq) (err error) { err = dao.DB.Where("favorite_id = ?", req.FavoriteId). Delete(model.Favorite{}).Error - + if err != nil { + return errors.Wrapf(err, "failed to delete favorite, favoriteId = %v", req.FavoriteId) + } return } @@ -53,7 +54,7 @@ func (dao *FavoriteDao) UpdateFavorite(req *favoritePb.FavoriteUpdateReq) (err e fMap["favorite_name"] = req.FavoriteName err = dao.DB.Where("favorite_id = ?", req.FavoriteId).Updates(&fMap).Error if err != nil { - return + return errors.Wrapf(err, "failed to update favorite, favoriteId = %v ", req.FavoriteId) } return diff --git a/app/favorite/internal/repository/db/dao/favortie_detail.go b/app/favorite/internal/repository/db/dao/favortie_detail.go index 0a46e76..3b019d9 100644 --- a/app/favorite/internal/repository/db/dao/favortie_detail.go +++ b/app/favorite/internal/repository/db/dao/favortie_detail.go @@ -2,6 +2,7 @@ package dao import ( "context" + "github.com/pkg/errors" "gorm.io/gorm" @@ -23,7 +24,7 @@ func (dao *FavoriteDetailDao) CreateFavoriteDetail(req *favoritePb.FavoriteDetai var f []*model.Favorite err = dao.DB.Where("favorite_id = ?", req.FavoriteId).Find(&f).Error if err != nil { - return + return errors.Wrapf(err, "failed to query favorite, favoriteId = %v ", req.FavoriteId) } fd := model.FavoriteDetail{ @@ -34,26 +35,42 @@ func (dao *FavoriteDetailDao) CreateFavoriteDetail(req *favoritePb.FavoriteDetai Favorite: f, } err = dao.DB.Model(&model.FavoriteDetail{}).Create(&fd).Error - + if err != nil { + return errors.Wrapf(err, "failed to create favoriteDetail,userID = %v,urlId = %v", req.UserId, req.UrlId) + } return } func (dao *FavoriteDetailDao) ListFavoriteDetail(req *favoritePb.FavoriteDetailListReq) (r []*model.Favorite, err error) { var f []*model.Favorite - dao.DB.Where("user_id = ?", req.UserId).Find(&f) + err = dao.DB.Where("user_id = ?", req.UserId).Find(&f).Error + if err != nil { + return r, errors.Wrapf(err, "failed to query favorite, userId = %v ", req.UserId) + } for _, v := range f { - _ = dao.DB.Model(&v).Association("FavoriteDetail").Find(&v.FavoriteDetail) + err = dao.DB.Model(&v).Association("FavoriteDetail").Find(&v.FavoriteDetail) r = append(r, v) } - + if err != nil { + err = errors.Wrapf(err, "failed to query favoriteDetail") + } return } -func (dao *FavoriteDetailDao) DeleteFavoriteDetail(req *favoritePb.FavoriteDetailDeleteReq) error { +func (dao *FavoriteDetailDao) DeleteFavoriteDetail(req *favoritePb.FavoriteDetailDeleteReq) (err error) { var f model.Favorite var fd model.FavoriteDetail - dao.DB.Where("favorite_id = ?", req.FavoriteId).First(&f) - dao.DB.Where("favorite_detail_id = ?", req.FavoriteDetailId).First(&fd) - err := dao.DB.Model(&f).Association("FavoriteDetail").Delete(&fd) - return err + err = dao.DB.Where("favorite_id = ?", req.FavoriteId).First(&f).Error + if err != nil { + return errors.Wrapf(err, "failed to query favorite, favoriteId = %v ", req.FavoriteId) + } + err = dao.DB.Where("favorite_detail_id = ?", req.FavoriteDetailId).First(&fd).Error + if err != nil { + return errors.Wrapf(err, "failed to query favoriteDetail, favoriteDetailId = %v ", req.FavoriteDetailId) + } + err = dao.DB.Model(&f).Association("FavoriteDetail").Delete(&fd) + if err != nil { + return errors.Wrapf(err, "failed to delete favoriteDetail, favoriteDetailId = %v ", req.FavoriteDetailId) + } + return } diff --git a/app/favorite/internal/service/favorite.go b/app/favorite/internal/service/favorite.go index 6541717..fbc6b8c 100644 --- a/app/favorite/internal/service/favorite.go +++ b/app/favorite/internal/service/favorite.go @@ -2,6 +2,7 @@ package service import ( "context" + "github.com/pkg/errors" "sync" "github.com/CocaineCong/tangseng/app/favorite/internal/repository/db/dao" @@ -29,6 +30,7 @@ func (s *FavoriteSrv) FavoriteCreate(ctx context.Context, req *pb.FavoriteCreate resp.Code = e.SUCCESS err = dao.NewFavoriteDao(ctx).CreateFavorite(req) if err != nil { + err = errors.WithMessage(err, "dao.CreateFavorite error") resp.Error = err.Error() return } @@ -43,6 +45,7 @@ func (s *FavoriteSrv) FavoriteList(ctx context.Context, req *pb.FavoriteListReq) resp.Code = e.SUCCESS if err != nil { resp.Code = e.ERROR + err = errors.WithMessage(err, "dao.ListFavorite error") return } for i := range f { @@ -61,12 +64,13 @@ func (s *FavoriteSrv) FavoriteUpdate(ctx context.Context, req *pb.FavoriteUpdate err = dao.NewFavoriteDao(ctx).UpdateFavorite(req) if err != nil { resp.Code = e.ERROR + err = errors.WithMessage(err, "dao.UpdateFavorite error") resp.Error = err.Error() return } resp.Msg = e.GetMsg(int(resp.Code)) - return resp, nil + return } func (s *FavoriteSrv) FavoriteDelete(ctx context.Context, req *pb.FavoriteDeleteReq) (resp *pb.FavoriteCommonResponse, err error) { @@ -75,6 +79,7 @@ func (s *FavoriteSrv) FavoriteDelete(ctx context.Context, req *pb.FavoriteDelete err = dao.NewFavoriteDao(ctx).DeleteFavorite(req) if err != nil { resp.Code = e.ERROR + err = errors.WithMessage(err, "dao.DeleteFavorite error") resp.Error = err.Error() return } @@ -89,6 +94,7 @@ func (s *FavoriteSrv) FavoriteDetailCreate(ctx context.Context, req *pb.Favorite err = dao.NewFavoriteDetailDao(ctx).CreateFavoriteDetail(req) if err != nil { resp.Code = e.ERROR + err = errors.WithMessage(err, "dao.CreateFavoriteDetail error") resp.Error = err.Error() return } @@ -102,6 +108,7 @@ func (s *FavoriteSrv) FavoriteDetailDelete(ctx context.Context, req *pb.Favorite err = dao.NewFavoriteDetailDao(ctx).DeleteFavoriteDetail(req) if err != nil { resp.Code = e.ERROR + err = errors.WithMessage(err, "dao.DeleteFavoriteDetail error") resp.Error = err.Error() return } @@ -116,6 +123,7 @@ func (s *FavoriteSrv) FavoriteDetailList(ctx context.Context, req *pb.FavoriteDe fdResp, err := dao.NewFavoriteDetailDao(ctx).ListFavoriteDetail(req) if err != nil { resp.Code = e.ERROR + err = errors.WithMessage(err, "dao.ListFavoriteDetail error") return } diff --git a/app/gateway/http/favorites.go b/app/gateway/http/favorites.go index 06a444a..bb08e3c 100644 --- a/app/gateway/http/favorites.go +++ b/app/gateway/http/favorites.go @@ -1,6 +1,7 @@ package http import ( + "github.com/pkg/errors" "net/http" "github.com/gin-gonic/gin" @@ -20,14 +21,16 @@ func ListFavorite(ctx *gin.Context) { } user, err := ctl.GetUserInfo(ctx.Request.Context()) if err != nil { - log.LogrusObj.Errorf("GetUserInfo:%v", err) + log.LogrusObj.Errorf("ctl.GetUserInfo failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "获取用户信息错误")) return } req.UserId = user.Id r, err := rpc.FavoriteList(ctx, &req) if err != nil { - log.LogrusObj.Errorf("FavoriteList:%v", err) + log.LogrusObj.Errorf("rpc.FavoriteList failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "FavoriteList RPC服务调用错误")) return } @@ -44,14 +47,16 @@ func CreateFavorite(ctx *gin.Context) { } user, err := ctl.GetUserInfo(ctx.Request.Context()) if err != nil { - log.LogrusObj.Errorf("GetUserInfo:%v", err) + log.LogrusObj.Errorf("ctl.GetUserInfo failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "获取用户信息错误")) return } req.UserId = user.Id r, err := rpc.FavoriteCreate(ctx, &req) if err != nil { - log.LogrusObj.Errorf("FavoriteCreate:%v", err) + log.LogrusObj.Errorf("rpc.FavoriteCreate failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "FavoriteCreateReq RPC服务调用错误")) return } @@ -68,14 +73,16 @@ func UpdateFavorite(ctx *gin.Context) { } user, err := ctl.GetUserInfo(ctx.Request.Context()) if err != nil { - log.LogrusObj.Errorf("GetUserInfo:%v", err) + log.LogrusObj.Errorf("ctl.GetUserInfo failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "获取用户信息错误")) return } req.UserId = user.Id r, err := rpc.FavoriteCreate(ctx, &req) if err != nil { - log.LogrusObj.Errorf("FavoriteCreate:%v", err) + log.LogrusObj.Errorf("rpc.FavoriteCreate failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "UpdateFavorite RPC服务调用错误")) return } @@ -92,14 +99,16 @@ func DeleteFavorite(ctx *gin.Context) { } user, err := ctl.GetUserInfo(ctx.Request.Context()) if err != nil { - log.LogrusObj.Errorf("GetUserInfo:%v", err) + log.LogrusObj.Errorf("ctl.GetUserInfo failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "获取用户信息错误")) return } req.UserId = user.Id r, err := rpc.FavoriteDelete(ctx, &req) if err != nil { - log.LogrusObj.Errorf("FavoriteDelete:%v", err) + log.LogrusObj.Errorf("rpc.FavoriteDelete failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "DeleteFavorite RPC服务调用错误")) return } @@ -116,14 +125,16 @@ func ListFavoriteDetail(ctx *gin.Context) { } user, err := ctl.GetUserInfo(ctx.Request.Context()) if err != nil { - log.LogrusObj.Errorf("GetUserInfo:%v", err) + log.LogrusObj.Errorf("ctl.GetUserInfo failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "获取用户信息错误")) return } req.UserId = user.Id r, err := rpc.FavoriteDetailList(ctx, &req) if err != nil { - log.LogrusObj.Errorf("FavoriteDetailList:%v", err) + log.LogrusObj.Errorf("rpc.FavoriteDetailList failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "FavoriteDetailList RPC服务调用错误")) return } @@ -140,14 +151,16 @@ func CreateFavoriteDetail(ctx *gin.Context) { } user, err := ctl.GetUserInfo(ctx.Request.Context()) if err != nil { - log.LogrusObj.Errorf("GetUserInfo:%v", err) + log.LogrusObj.Errorf("ctl.GetUserInfo failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "获取用户信息错误")) return } req.UserId = user.Id r, err := rpc.FavoriteDetailCreate(ctx, &req) if err != nil { - log.LogrusObj.Errorf("FavoriteDetailCreate:%v", err) + log.LogrusObj.Errorf("rpc.FavoriteDetailCreate failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "FavoriteDetailCreate RPC服务调用错误")) return } @@ -164,14 +177,16 @@ func DeleteFavoriteDetail(ctx *gin.Context) { } user, err := ctl.GetUserInfo(ctx.Request.Context()) if err != nil { - log.LogrusObj.Errorf("GetUserInfo:%v", err) + log.LogrusObj.Errorf("ctl.GetUserInfo failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "获取用户信息错误")) return } req.UserId = user.Id r, err := rpc.FavoriteDetailDelete(ctx, &req) if err != nil { - log.LogrusObj.Errorf("FavoriteDetailDelete:%v", err) + log.LogrusObj.Errorf("rpc.FavoriteDetailDelete failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "FavoriteDetailDelete RPC服务调用错误")) return } diff --git a/app/gateway/http/index_platform.go b/app/gateway/http/index_platform.go index 261c36c..015ef3d 100644 --- a/app/gateway/http/index_platform.go +++ b/app/gateway/http/index_platform.go @@ -1,6 +1,7 @@ package http import ( + "github.com/pkg/errors" "net/http" "github.com/gin-gonic/gin" @@ -21,7 +22,8 @@ func BuildIndexByFiles(ctx *gin.Context) { r, err := rpc.BuildIndex(ctx, &req) if err != nil { - log.LogrusObj.Errorf("BuildIndexByFiles:%v", err) + log.LogrusObj.Errorf("rpc.BuildIndex failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "BuildIndexByFiles RPC服务调用错误")) return } diff --git a/app/gateway/http/search_engine.go b/app/gateway/http/search_engine.go index 60bbe14..ea80ce1 100644 --- a/app/gateway/http/search_engine.go +++ b/app/gateway/http/search_engine.go @@ -1,6 +1,7 @@ package http import ( + "github.com/pkg/errors" "net/http" "github.com/gin-gonic/gin" @@ -22,7 +23,8 @@ func SearchEngineSearch(ctx *gin.Context) { r, err := rpc.SearchEngineSearch(ctx, req) if err != nil { - log.LogrusObj.Errorf("SearchEngineSearch:%v", err) + log.LogrusObj.Errorf("rpc.SearchEngineSearch failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "SearchEngineSearch RPC服务调用错误")) return } @@ -41,7 +43,8 @@ func WordAssociation(ctx *gin.Context) { r, err := rpc.WordAssociation(ctx, req) if err != nil { - log.LogrusObj.Errorf("WordAssociation:%v", err) + log.LogrusObj.Errorf("rpc.WordAssociation failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "WordAssociation RPC服务调用错误")) return } diff --git a/app/gateway/http/search_vector.go b/app/gateway/http/search_vector.go index b38c2f5..9690b76 100644 --- a/app/gateway/http/search_vector.go +++ b/app/gateway/http/search_vector.go @@ -1,6 +1,7 @@ package http import ( + "github.com/pkg/errors" "net/http" "github.com/gin-gonic/gin" @@ -22,7 +23,8 @@ func SearchVector(ctx *gin.Context) { r, err := rpc.SearchVector(ctx, &req) if err != nil { - log.LogrusObj.Errorf("SearchVector:%v", err) + log.LogrusObj.Errorf("rpc.SearchVector failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "SearchVector RPC服务调用错误")) return } diff --git a/app/gateway/http/user.go b/app/gateway/http/user.go index e9625c4..96b3973 100644 --- a/app/gateway/http/user.go +++ b/app/gateway/http/user.go @@ -1,6 +1,7 @@ package http import ( + "github.com/pkg/errors" "net/http" "github.com/gin-gonic/gin" @@ -23,7 +24,8 @@ func UserRegister(ctx *gin.Context) { } r, err := rpc.UserRegister(ctx, &userReq) if err != nil { - log.LogrusObj.Errorf("UserRegister:%v", err) + log.LogrusObj.Errorf("rpc.UserRegister failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "UserRegister RPC服务调用错误")) return } @@ -42,14 +44,16 @@ func UserLogin(ctx *gin.Context) { userResp, err := rpc.UserLogin(ctx, &req) if err != nil { - log.LogrusObj.Errorf("RPC UserLogin:%v", err) + log.LogrusObj.Errorf("rpc.UserLogin failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "UserLogin RPC服务调用错误")) return } aToken, rToken, err := jwt.GenerateToken(userResp.UserDetail.UserId, userResp.UserDetail.UserName) if err != nil { - log.LogrusObj.Errorf("RPC GenerateToken:%v", err) + log.LogrusObj.Errorf("jwt.GenerateToken failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "加密错误")) return } diff --git a/app/gateway/rpc/favorite.go b/app/gateway/rpc/favorite.go index ff029c6..45336b2 100644 --- a/app/gateway/rpc/favorite.go +++ b/app/gateway/rpc/favorite.go @@ -2,7 +2,7 @@ package rpc import ( "context" - "errors" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/consts/e" favoritePb "github.com/CocaineCong/tangseng/idl/pb/favorite" @@ -11,10 +11,11 @@ import ( func FavoriteCreate(ctx context.Context, req *favoritePb.FavoriteCreateReq) (resp *favoritePb.FavoriteCommonResponse, err error) { resp, err = FavoriteClient.FavoriteCreate(ctx, req) if err != nil { + err = errors.WithMessage(err, "FavoriteClient.FavoriteCreate error") return } if resp.Code != e.SUCCESS { - err = errors.New(resp.Error) + err = errors.Wrap(errors.New(resp.Error), "resp.Code is not success") return } @@ -24,11 +25,12 @@ func FavoriteCreate(ctx context.Context, req *favoritePb.FavoriteCreateReq) (res func FavoriteUpdate(ctx context.Context, req *favoritePb.FavoriteUpdateReq) (resp *favoritePb.FavoriteCommonResponse, err error) { resp, err = FavoriteClient.FavoriteUpdate(ctx, req) if err != nil { + err = errors.WithMessage(err, "FavoriteClient.FavoriteUpdate error") return } if resp.Code != e.SUCCESS { - err = errors.New(resp.Error) + err = errors.Wrap(errors.New(resp.Error), "resp.Code is not success") return } @@ -38,10 +40,11 @@ func FavoriteUpdate(ctx context.Context, req *favoritePb.FavoriteUpdateReq) (res func FavoriteList(ctx context.Context, req *favoritePb.FavoriteListReq) (resp *favoritePb.FavoriteListResponse, err error) { resp, err = FavoriteClient.FavoriteList(ctx, req) if err != nil { + err = errors.WithMessage(err, "FavoriteClient.FavoriteList error") return } if resp.Code != e.SUCCESS { - err = errors.New("FavoriteList 出现错误") // TODO 整个错误 proto + err = errors.Wrap(errors.New("FavoriteList 出现错误"), "resp.Code is not success") // TODO 整个错误 proto return } @@ -51,10 +54,11 @@ func FavoriteList(ctx context.Context, req *favoritePb.FavoriteListReq) (resp *f func FavoriteDelete(ctx context.Context, req *favoritePb.FavoriteDeleteReq) (resp *favoritePb.FavoriteCommonResponse, err error) { resp, err = FavoriteClient.FavoriteDelete(ctx, req) if err != nil { + err = errors.WithMessage(err, "FavoriteClient.FavoriteDelete error") return } if resp.Code != e.SUCCESS { - err = errors.New(resp.Error) + err = errors.Wrap(errors.New(resp.Error), "resp.Code is not success") return } @@ -64,10 +68,11 @@ func FavoriteDelete(ctx context.Context, req *favoritePb.FavoriteDeleteReq) (res func FavoriteDetailList(ctx context.Context, req *favoritePb.FavoriteDetailListReq) (resp *favoritePb.FavoriteDetailListResponse, err error) { resp, err = FavoriteClient.FavoriteDetailList(ctx, req) if err != nil { + err = errors.WithMessage(err, "FavoriteClient.FavoriteDetailList error") return } if resp.Code != e.SUCCESS { - err = errors.New("出现错误") + err = errors.Wrap(errors.New("出现错误"), "resp.Code is not success") return } @@ -77,10 +82,11 @@ func FavoriteDetailList(ctx context.Context, req *favoritePb.FavoriteDetailListR func FavoriteDetailDelete(ctx context.Context, req *favoritePb.FavoriteDetailDeleteReq) (resp *favoritePb.FavoriteCommonResponse, err error) { resp, err = FavoriteClient.FavoriteDetailDelete(ctx, req) if err != nil { + err = errors.WithMessage(err, "FavoriteClient.FavoriteDetailDelete error") return } if resp.Code != e.SUCCESS { - err = errors.New(resp.Error) + err = errors.Wrap(errors.New(resp.Error), "resp.Code is not success") return } @@ -90,10 +96,11 @@ func FavoriteDetailDelete(ctx context.Context, req *favoritePb.FavoriteDetailDel func FavoriteDetailCreate(ctx context.Context, req *favoritePb.FavoriteDetailCreateReq) (resp *favoritePb.FavoriteCommonResponse, err error) { resp, err = FavoriteClient.FavoriteDetailCreate(ctx, req) if err != nil { + err = errors.WithMessage(err, "FavoriteClient.FavoriteDetailCreate error") return } if resp.Code != e.SUCCESS { - err = errors.New(resp.Error) + err = errors.Wrap(errors.New(resp.Error), "resp.Code is not success") return } diff --git a/app/gateway/rpc/index_platform.go b/app/gateway/rpc/index_platform.go index 715cb3a..4a857a0 100644 --- a/app/gateway/rpc/index_platform.go +++ b/app/gateway/rpc/index_platform.go @@ -2,16 +2,16 @@ package rpc import ( "context" + "github.com/pkg/errors" pb "github.com/CocaineCong/tangseng/idl/pb/index_platform" - log "github.com/CocaineCong/tangseng/pkg/logger" ) // BuildIndex 建立索引的RPC调用 func BuildIndex(ctx context.Context, req *pb.BuildIndexReq) (resp *pb.BuildIndexResp, err error) { resp, err = IndexPlatformClient.BuildIndexService(ctx, req) if err != nil { - log.LogrusObj.Error("BuildIndex-BuildIndexService", err) + err = errors.WithMessage(err, "IndexPlatformClient.BuildIndexService err") return } diff --git a/app/gateway/rpc/init.go b/app/gateway/rpc/init.go index fd565e4..761af8e 100644 --- a/app/gateway/rpc/init.go +++ b/app/gateway/rpc/init.go @@ -3,6 +3,7 @@ package rpc import ( "context" "fmt" + "github.com/pkg/errors" "log" "time" @@ -84,5 +85,6 @@ func connectServer(serviceName string) (conn *grpc.ClientConn, err error) { } conn, err = grpc.DialContext(ctx, addr, opts...) + err = errors.Wrapf(err, "failed to connect to gRPC service,address is %v", addr) return } diff --git a/app/gateway/rpc/search_engine.go b/app/gateway/rpc/search_engine.go index 7b77d1e..fd4b4b5 100644 --- a/app/gateway/rpc/search_engine.go +++ b/app/gateway/rpc/search_engine.go @@ -2,7 +2,7 @@ package rpc import ( "context" - "errors" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/consts/e" pb "github.com/CocaineCong/tangseng/idl/pb/search_engine" @@ -11,11 +11,12 @@ import ( func SearchEngineSearch(ctx context.Context, req *pb.SearchEngineRequest) (r *pb.SearchEngineResponse, err error) { r, err = SearchEngineClient.SearchEngineSearch(ctx, req) if err != nil { + err = errors.WithMessage(err, "SearchEngineClient.SearchEngineSearch error") return } if r.Code != e.SUCCESS { - err = errors.New(r.Msg) + err = errors.Wrap(errors.New(r.Msg), "r.Code is unsuccessful") return } @@ -25,11 +26,12 @@ func SearchEngineSearch(ctx context.Context, req *pb.SearchEngineRequest) (r *pb func WordAssociation(ctx context.Context, req *pb.SearchEngineRequest) (r *pb.WordAssociationResponse, err error) { r, err = SearchEngineClient.WordAssociation(ctx, req) if err != nil { + err = errors.WithMessage(err, "SearchEngineClient.WordAssociation error") return } if r.Code != e.SUCCESS { - err = errors.New(r.Msg) + err = errors.Wrap(errors.New(r.Msg), "r.Code is unsuccessful") return } diff --git a/app/gateway/rpc/search_vector.go b/app/gateway/rpc/search_vector.go index d607567..4e220a9 100644 --- a/app/gateway/rpc/search_vector.go +++ b/app/gateway/rpc/search_vector.go @@ -2,7 +2,7 @@ package rpc import ( "context" - "errors" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/consts/e" pb "github.com/CocaineCong/tangseng/idl/pb/search_vector" @@ -11,11 +11,12 @@ import ( func SearchVector(ctx context.Context, req *pb.SearchVectorRequest) (resp *pb.SearchVectorResponse, err error) { resp, err = SearchVectorClient.SearchVector(ctx, req) if err != nil { + err = errors.WithMessage(err, "SearchEngineClient.SearchVector error") return } if resp.Code != e.SUCCESS { - err = errors.New(resp.Msg) + err = errors.Wrap(errors.New(resp.Msg), "resp.Code is unsuccessful") return } diff --git a/app/gateway/rpc/user.go b/app/gateway/rpc/user.go index 6a79866..f8b198c 100644 --- a/app/gateway/rpc/user.go +++ b/app/gateway/rpc/user.go @@ -2,7 +2,7 @@ package rpc import ( "context" - "errors" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/consts/e" userPb "github.com/CocaineCong/tangseng/idl/pb/user" @@ -11,11 +11,12 @@ import ( func UserLogin(ctx context.Context, req *userPb.UserLoginReq) (resp *userPb.UserDetailResponse, err error) { r, err := UserClient.UserLogin(ctx, req) if err != nil { + err = errors.WithMessage(err, "UserClient.UserLogin error") return } if r.Code != e.SUCCESS { - err = errors.New("登陆失败") + err = errors.Wrap(errors.New("登陆失败"), "r.Code is unsuccessful") return } @@ -25,11 +26,12 @@ func UserLogin(ctx context.Context, req *userPb.UserLoginReq) (resp *userPb.User func UserRegister(ctx context.Context, req *userPb.UserRegisterReq) (resp *userPb.UserCommonResponse, err error) { r, err := UserClient.UserRegister(ctx, req) if err != nil { + err = errors.WithMessage(err, "UserClient.UserRegister error") return } if r.Code != e.SUCCESS { - err = errors.New(r.Msg) + err = errors.Wrap(errors.New(r.Msg), "r.Code is unsuccessful") return } diff --git a/app/index_platform/cmd/job/inverted_index_merge.go b/app/index_platform/cmd/job/inverted_index_merge.go index 963ccb9..c4dad07 100644 --- a/app/index_platform/cmd/job/inverted_index_merge.go +++ b/app/index_platform/cmd/job/inverted_index_merge.go @@ -3,6 +3,7 @@ package job import ( "context" "fmt" + "github.com/pkg/errors" "os" "sort" @@ -30,10 +31,13 @@ func MergeInvertedIndexDay2Month(ctx context.Context) (err error) { fromPaths, err := redis.ListInvertedPath(ctx, []string{invertedIndexDayKey}) if err != nil { - logs.LogrusObj.Errorln(err) + return errors.WithMessage(err, "redis.ListInvertedPath error") } - - return mergeInvertedIndex(ctx, []string{invertedIndexDayKey}, fromPaths, invertedIndexMonthKey, consts.MergeTypeInvertedIndexDay2Month) + err = mergeInvertedIndex(ctx, []string{invertedIndexDayKey}, fromPaths, invertedIndexMonthKey, consts.MergeTypeInvertedIndexDay2Month) + if err != nil { + return errors.WithMessage(err, "mergeInvertedIndex error") + } + return } // MergeInvertedIndexMonth2Season 增量合并全量, 合并完就会删掉原有的,合并到这个季度 @@ -42,21 +46,24 @@ func MergeInvertedIndexMonth2Season(ctx context.Context) (err error) { invertedIndexSeasonKey := redis.InvertedIndexDbPathSeasonKey monthKeys, err := redis.ListAllPrefixKey(ctx, invertedIndexMonthKey) if err != nil { - logs.LogrusObj.Error(err) + return errors.WithMessage(err, "redis.ListAllPrefixKey error") } // 获取所有的月份的key fromPaths, err := redis.ListInvertedIndexByPrefixKey(ctx, invertedIndexMonthKey) if err != nil { - logs.LogrusObj.Errorln(err) + return errors.WithMessage(err, "redis.ListInvertedIndexByPrefixKey error") } - - return mergeInvertedIndex(ctx, monthKeys, fromPaths, invertedIndexSeasonKey, consts.MergeTypeInvertedIndexMonth2Season) + err = mergeInvertedIndex(ctx, monthKeys, fromPaths, invertedIndexSeasonKey, consts.MergeTypeInvertedIndexMonth2Season) + if err != nil { + return errors.WithMessage(err, "mergeInvertedIndex error") + } + return } // mergeInvertedIndex fromPathKeys 所需要合并的key, fromPaths 需要合并的所有地址(就是key对应的地址),toPathKey 合并完之后的存储该地址的key,mergeType,合并类型 func mergeInvertedIndex(ctx context.Context, fromPathKeys, fromPaths []string, savePathKey string, mergeType int) (err error) { invertedIndex := cmap.New[*roaring.Bitmap]() // 倒排索引 - _, _ = mapreduce.MapReduce(func(source chan<- []*types.InvertedInfo) { + _, err = mapreduce.MapReduce(func(source chan<- []*types.InvertedInfo) { // 获取所有的inverted db for _, path := range fromPaths { invertedDb := storage.NewInvertedDB(path) @@ -83,6 +90,10 @@ func mergeInvertedIndex(ctx context.Context, fromPathKeys, fromPaths []string, s } }) + if err != nil { + return errors.WithMessage(err, "mapreduce.MapReduce error") + } + // 生成所需要存储的key storageBaseName := "" switch mergeType { @@ -117,15 +128,13 @@ func mergeInvertedIndex(ctx context.Context, fromPathKeys, fromPaths []string, s // 保存新生成的索引数据地址 err = redis.SetInvertedPath(ctx, savePathKey, outName) if err != nil { - logs.LogrusObj.Error(err) - return + return errors.WithMessage(err, "redis.SetInvertedPath error") } // 删除旧纬度数据 err = redis.BatchDeleteInvertedIndexPath(ctx, fromPathKeys) if err != nil { - logs.LogrusObj.Error(err) - return + return errors.WithMessage(err, "redis.BatchDeleteInvertedPath error") } return diff --git a/app/index_platform/cmd/job/register.go b/app/index_platform/cmd/job/register.go index 64e3343..e889c6d 100644 --- a/app/index_platform/cmd/job/register.go +++ b/app/index_platform/cmd/job/register.go @@ -2,6 +2,7 @@ package job import ( "context" + log "github.com/CocaineCong/tangseng/pkg/logger" "github.com/robfig/cron/v3" ) @@ -12,9 +13,14 @@ func RegisterJob(ctx context.Context) { // 优化搜索引擎查询的速度,毕竟搜索引擎实时性只有查询。 // 每周的周日凌晨3点,把这一周的所有天数都合并到这个月中 - _, _ = c.AddJob("0 0 3 ? * SUN", &Command{Name: "MergeInvertedIndexDay2Month", Exec: MergeInvertedIndexDay2Month, Context: ctx}) + _, err := c.AddJob("0 0 3 ? * SUN", &Command{Name: "MergeInvertedIndexDay2Month", Exec: MergeInvertedIndexDay2Month, Context: ctx}) + if err != nil { + log.LogrusObj.Errorln(err) + } // 每个月的最后一天的凌晨5点20分,把这一个月的索引数据都合并到这个季度中 - _, _ = c.AddJob("0 20 5 L * ?", &Command{Name: "MergeInvertedIndexMonth2Season", Exec: MergeInvertedIndexMonth2Season, Context: ctx}) - + _, err = c.AddJob("0 20 5 L * ?", &Command{Name: "MergeInvertedIndexMonth2Season", Exec: MergeInvertedIndexMonth2Season, Context: ctx}) + if err != nil { + log.LogrusObj.Errorln(err) + } c.Start() } diff --git a/app/index_platform/cmd/kfk_register/inverted_index.go b/app/index_platform/cmd/kfk_register/inverted_index.go index 6baebe1..50b1036 100644 --- a/app/index_platform/cmd/kfk_register/inverted_index.go +++ b/app/index_platform/cmd/kfk_register/inverted_index.go @@ -2,7 +2,8 @@ package kfk_register import ( "context" - "fmt" + log "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/consts" "github.com/CocaineCong/tangseng/pkg/kfk/consume" @@ -11,6 +12,7 @@ import ( func RunInvertedIndex(ctx context.Context) { err := consume.ForwardIndexKafkaConsume(ctx, consts.KafkaCSVLoaderTopic, consts.KafkaCSVLoaderGroupId, consts.KafkaAssignorRoundRobin) if err != nil { - fmt.Println("RunInvertedIndex-ForwardIndexKafkaConsume err :", err) + log.LogrusObj.Errorf("consume.ForwardIndexKafkaConsume failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) } } diff --git a/app/index_platform/cmd/kfk_register/trie_tree.go b/app/index_platform/cmd/kfk_register/trie_tree.go index 63c5551..27727ca 100644 --- a/app/index_platform/cmd/kfk_register/trie_tree.go +++ b/app/index_platform/cmd/kfk_register/trie_tree.go @@ -2,15 +2,16 @@ package kfk_register import ( "context" - "fmt" - "github.com/CocaineCong/tangseng/consts" "github.com/CocaineCong/tangseng/pkg/kfk/consume" + log "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" ) func RunTireTree(ctx context.Context) { err := consume.TrieTreeKafkaConsume(ctx, consts.KafkaTrieTreeTopic, consts.KafkaTrieTreeGroupId, consts.KafkaAssignorRoundRobin) if err != nil { - fmt.Println("RunTireTree-TrieTreeKafkaConsume :", err) + log.LogrusObj.Errorf("consume.TrieTreeKafkaConsume failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) } } diff --git a/app/index_platform/cmd/main.go b/app/index_platform/cmd/main.go index 58a3d84..327ae27 100644 --- a/app/index_platform/cmd/main.go +++ b/app/index_platform/cmd/main.go @@ -2,7 +2,7 @@ package main import ( "context" - "fmt" + "github.com/pkg/errors" "net" "github.com/sirupsen/logrus" @@ -46,7 +46,9 @@ func main() { panic(err) } if _, err = etcdRegister.Register(node, 10); err != nil { - panic(fmt.Sprintf("start service failed, err: %v", err)) + logs.LogrusObj.Errorf("start service failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + logs.LogrusObj.Errorf("stack trace: \n%+v\n", err) + panic(err) } logrus.Info("service started listen on ", grpcAddress) if err = server.Serve(lis); err != nil { diff --git a/app/index_platform/input_data/data2kfk.go b/app/index_platform/input_data/data2kfk.go index 07550da..7e30682 100644 --- a/app/index_platform/input_data/data2kfk.go +++ b/app/index_platform/input_data/data2kfk.go @@ -3,8 +3,8 @@ package input_data import ( "github.com/CocaineCong/tangseng/consts" "github.com/CocaineCong/tangseng/pkg/kfk" - logs "github.com/CocaineCong/tangseng/pkg/logger" "github.com/CocaineCong/tangseng/types" + "github.com/pkg/errors" ) // DocData2Kfk Doc数据处理 @@ -12,8 +12,7 @@ func DocData2Kfk(doc *types.Document) (err error) { doctByte, _ := doc.MarshalJSON() err = kfk.KafkaProducer(consts.KafkaCSVLoaderTopic, doctByte) if err != nil { - logs.LogrusObj.Errorf("DocData2Kfk-KafkaCSVLoaderTopic :%+v", err) - return + return errors.WithMessagef(err, "DocData2Kfk-KafkaCSVLoaderTopic :%v", err) } return @@ -26,8 +25,7 @@ func DocTrie2Kfk(tokens []string) (err error) { } if err != nil { - logs.LogrusObj.Errorf("DocTrie2Kfk-KafkaTrieTreeTopic :%+v", err) - return + return errors.WithMessagef(err, "DocTrie2Kfk-KafkaTrieTreeTopic :%v", err) } return diff --git a/app/index_platform/repository/db/dao/data2mysql.go b/app/index_platform/repository/db/dao/data2mysql.go index 41435c8..dd00dd4 100644 --- a/app/index_platform/repository/db/dao/data2mysql.go +++ b/app/index_platform/repository/db/dao/data2mysql.go @@ -2,6 +2,7 @@ package dao import ( "context" + "github.com/pkg/errors" "sync" "time" @@ -99,7 +100,7 @@ func (d *MySqlDirectUpload) StreamUpload() (count int, err error) { err = NewInputDataDao(d.ctx).BatchCreateInputData(d.upData) if err != nil { - log.LogrusObj.Error("BatchCreateInputData ", err) + return count, errors.WithMessage(err, "BatchCreateInputData error") } // 重制 updata diff --git a/app/index_platform/repository/db/dao/input_data.go b/app/index_platform/repository/db/dao/input_data.go index 443ccfe..1814a28 100644 --- a/app/index_platform/repository/db/dao/input_data.go +++ b/app/index_platform/repository/db/dao/input_data.go @@ -2,6 +2,7 @@ package dao import ( "context" + "github.com/pkg/errors" "gorm.io/gorm" @@ -19,23 +20,35 @@ func NewInputDataDao(ctx context.Context) *InputDataDao { } func (d *InputDataDao) CreateInputData(in *model.InputData) (err error) { - return d.DB.Model(&model.InputData{}).Create(&in).Error + err = d.DB.Model(&model.InputData{}).Create(&in).Error + if err != nil { + return errors.Wrap(err, "failed to create inputData") + } + return } func (d *InputDataDao) BatchCreateInputData(in []*model.InputData) (err error) { - return d.DB.Model(&model.InputData{}).CreateInBatches(&in, consts.BatchCreateSize).Error + err = d.DB.Model(&model.InputData{}).CreateInBatches(&in, consts.BatchCreateSize).Error + if err != nil { + return errors.Wrap(err, "failed to batch create inputData") + } + return } func (d *InputDataDao) ListInputData() (in []*model.InputData, err error) { err = d.DB.Model(&model.InputData{}).Where("is_index = ?", false). Find(&in).Error - + if err != nil { + err = errors.Wrap(err, "failed to query inputData") + } return } func (d *InputDataDao) UpdateInputDataByIds(ids []int64) (err error) { err = d.DB.Model(&model.InputData{}).Where("id IN ?", ids). Update("is_index", true).Error - + if err != nil { + err = errors.Wrap(err, "failed to update inputData") + } return } diff --git a/app/index_platform/repository/starrock/bi_dao/input_data.go b/app/index_platform/repository/starrock/bi_dao/input_data.go index b70170f..b078f97 100644 --- a/app/index_platform/repository/starrock/bi_dao/input_data.go +++ b/app/index_platform/repository/starrock/bi_dao/input_data.go @@ -2,6 +2,7 @@ package bi_dao import ( "context" + "github.com/pkg/errors" "gorm.io/gorm" @@ -21,5 +22,5 @@ func (dao *StarRocksDao) ListDataRocks() (r []*types.Data2Starrocks, err error) sql := "SELECT * FROM input_data" err = dao.DB.Raw(sql).Find(&r).Error - return + return r, errors.Wrap(err, "failed to find data") } diff --git a/app/index_platform/repository/starrock/starrocks.go b/app/index_platform/repository/starrock/starrocks.go index 70e705b..0a1d630 100644 --- a/app/index_platform/repository/starrock/starrocks.go +++ b/app/index_platform/repository/starrock/starrocks.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "github.com/pkg/errors" "net/http" "strings" "sync" @@ -104,12 +105,12 @@ func (d *DirectUpload) StreamUpload() (count int, err error) { }, ",") _, err = write.WriteString(line + rowDelimiter) if err != nil { - log.LogrusObj.Errorf("WriteString Error") + return count, errors.Wrap(err, "WriteString Error") } } err = write.Flush() if err != nil { - log.LogrusObj.Errorf("write.Flush() :%+v", err) + return count, errors.Wrapf(err, "write.Flush() :%v", err) } // check 机制 @@ -119,7 +120,7 @@ func (d *DirectUpload) StreamUpload() (count int, err error) { req.Header = v.Header req.Body, err = v.GetBody() if err != nil { - log.LogrusObj.Errorf("starrock woker") + err = errors.Wrapf(err, "starrock woker") } return err }, @@ -144,7 +145,7 @@ func (d *DirectUpload) StreamUpload() (count int, err error) { }).SetBody(sb.Bytes()).SetContentLength(true). Put("https://{host}/api/{db}/{table}/_stream_load") if err != nil { - log.LogrusObj.Errorf("stream load error :%+v", err) + err = errors.Wrap(err, "stream load error") } if hp.StatusCode() != http.StatusOK { diff --git a/app/index_platform/repository/storage/bolt.go b/app/index_platform/repository/storage/bolt.go index 1d2f0c7..3c753ad 100644 --- a/app/index_platform/repository/storage/bolt.go +++ b/app/index_platform/repository/storage/bolt.go @@ -1,6 +1,7 @@ package storage import ( + "github.com/pkg/errors" bolt "go.etcd.io/bbolt" ) @@ -9,9 +10,9 @@ func Put(db *bolt.DB, bucket string, key []byte, value []byte) error { return db.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucketIfNotExists([]byte(bucket)) if err != nil { - return err + return errors.Wrap(err, "failed to create bucket") } - return b.Put(key, value) + return errors.Wrap(b.Put(key, value), "failed to put data") }) } @@ -27,5 +28,5 @@ func Get(db *bolt.DB, bucket string, key []byte) (r []byte, err error) { return }) - return + return r, errors.Wrap(err, "failed to get data") } diff --git a/app/index_platform/repository/storage/inverted_db.go b/app/index_platform/repository/storage/inverted_db.go index 3c8bc24..3be28e9 100644 --- a/app/index_platform/repository/storage/inverted_db.go +++ b/app/index_platform/repository/storage/inverted_db.go @@ -1,8 +1,7 @@ package storage import ( - "errors" - "fmt" + "github.com/pkg/errors" "os" "github.com/RoaringBitmap/roaring" @@ -44,17 +43,21 @@ func NewInvertedDB(invertedName string) *InvertedDB { // StoragePostings 存储 倒排索引表 func (t *InvertedDB) StoragePostings(token string, values []byte) (err error) { - return t.PutInverted([]byte(token), values) + err = t.PutInverted([]byte(token), values) + return errors.WithMessage(err, "putInverted error") } // PutInverted 插入term -func (t *InvertedDB) PutInverted(key, value []byte) error { - return Put(t.db, consts.InvertedBucket, key, value) +func (t *InvertedDB) PutInverted(key, value []byte) (err error) { + err = Put(t.db, consts.InvertedBucket, key, value) + return errors.WithMessage(err, "put error") } // GetInverted 通过term获取value func (t *InvertedDB) GetInverted(key []byte) (value []byte, err error) { - return Get(t.db, consts.InvertedBucket, key) + value, err = Get(t.db, consts.InvertedBucket, key) + err = errors.WithMessage(err, "get error") + return } func (t *InvertedDB) GetAllInverted() (p []*types.InvertedInfo, err error) { @@ -65,15 +68,19 @@ func (t *InvertedDB) GetAllInverted() (p []*types.InvertedInfo, err error) { func (t *InvertedDB) GetInvertedInfo(token string) (p *types.InvertedInfo, err error) { c, err := t.GetInverted([]byte(token)) if err != nil { + err = errors.WithMessage(err, "getInverted error") return } if len(c) == 0 { - err = errors.New("暂无此token") + err = errors.Wrap(errors.New("暂无此token"), "len(c) equal to zero") return } output := roaring.New() - _ = output.UnmarshalBinary(c) + err = output.UnmarshalBinary(c) + if err != nil { + err = errors.Wrap(err, "failed to unmarshalBinary") + } p = &types.InvertedInfo{ Token: token, DocIds: output, @@ -86,7 +93,7 @@ func (t *InvertedDB) GetInvertedDoc(offset int64, size int64) ([]byte, error) { page := os.Getpagesize() b, err := Mmap(int(t.file.Fd()), offset/int64(page), int(offset+size)) if err != nil { - return nil, fmt.Errorf("GetDocinfo Mmap err: %v", err) + return nil, errors.WithMessage(errors.Errorf("GetDocinfo Mmap err: %v", err), "mmap error") } return b[offset : offset+size], nil } diff --git a/app/index_platform/repository/storage/mmap.go b/app/index_platform/repository/storage/mmap.go index 87b7809..d01d91b 100644 --- a/app/index_platform/repository/storage/mmap.go +++ b/app/index_platform/repository/storage/mmap.go @@ -1,6 +1,7 @@ package storage import ( + "github.com/pkg/errors" "syscall" ) @@ -8,5 +9,6 @@ import ( // 映射后的内存可以像普通的字节切片一样进行读取和写入操作,而不需要额外的文件读写操作。 // 这对于处理大文件或需要频繁访问文件内容的场景非常有用,因为避免了多次磁盘读写操作,提高了性能。 func Mmap(fd int, offset int64, length int) ([]byte, error) { - return syscall.Mmap(fd, offset, length, syscall.PROT_READ, syscall.MAP_SHARED) + data, err := syscall.Mmap(fd, offset, length, syscall.PROT_READ, syscall.MAP_SHARED) + return data, errors.Wrap(err, "failed to mmap file") } diff --git a/app/index_platform/repository/storage/trie_db.go b/app/index_platform/repository/storage/trie_db.go index 727d642..ed6bbf7 100644 --- a/app/index_platform/repository/storage/trie_db.go +++ b/app/index_platform/repository/storage/trie_db.go @@ -1,6 +1,7 @@ package storage import ( + "github.com/pkg/errors" "os" bolt "go.etcd.io/bbolt" @@ -34,34 +35,39 @@ func NewTrieDB(filePath string) *TrieDB { // TODO: 先都放在一个下面吧 func (d *TrieDB) StorageDict(trieTree *trie.Trie) (err error) { trieByte, _ := trieTree.Root.Children.MarshalJSON() err = d.PutTrieTree([]byte(consts.TrieTreeBucket), trieByte) - - return + return errors.WithMessage(err, "putTrieTree error") } // GetTrieTreeInfo 获取 trie tree func (d *TrieDB) GetTrieTreeInfo() (trieTree *trie.Trie, err error) { v, err := d.GetTrieTree([]byte(consts.TrieTreeBucket)) if err != nil { + err = errors.WithMessage(err, "getTrieTree error") return } trieTree = trie.NewTrie() err = trieTree.UnmarshalJSON(v) - return + return trieTree, errors.Wrap(err, "failed to unmarshal data") } // PutTrieTree 存储 -func (d *TrieDB) PutTrieTree(key, value []byte) error { - return Put(d.db, consts.TrieTreeBucket, key, value) +func (d *TrieDB) PutTrieTree(key, value []byte) (err error) { + err = Put(d.db, consts.TrieTreeBucket, key, value) + return errors.WithMessage(err, "put error") } // GetTrieTree 通过term获取value func (d *TrieDB) GetTrieTree(key []byte) (value []byte, err error) { - return Get(d.db, consts.TrieTreeBucket, key) + value, err = Get(d.db, consts.TrieTreeBucket, key) + if err != nil { + err = errors.WithMessage(err, "get error") + } + return } // Close 关闭db func (d *TrieDB) Close() error { - return d.db.Close() + return errors.Wrap(d.db.Close(), "failed to close") } diff --git a/app/index_platform/service/index_platform.go b/app/index_platform/service/index_platform.go index 91e6e46..cf85b01 100644 --- a/app/index_platform/service/index_platform.go +++ b/app/index_platform/service/index_platform.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "github.com/pkg/errors" "hash/fnv" "os" "sort" @@ -142,6 +143,7 @@ func (s *IndexPlatformSrv) BuildIndexService(ctx context.Context, req *pb.BuildI err = storeDictTrieByHash(newCtx, dictTrie) if err != nil { logs.LogrusObj.Error("storeDictTrieByHash error ", err) + logs.LogrusObj.Errorf("stack trace: \n%+v\n", err) } }() @@ -170,8 +172,7 @@ func storeInvertedIndexByHash(ctx context.Context, invertedIndex cmap.Concurrent err = redis.PushInvertedPath(ctx, redis.InvertedIndexDbPathDayKey, []string{outName}) if err != nil { - logs.LogrusObj.Error(err) - return + return errors.WithMessage(err, "redis.PushInvertedPath error") } // TODO: hash 分片存储, 目前只是根据天数分库,一天的数据都放到同一个库中,感觉这样还是不太行,还是按照每小时或者ihash进行分库,以下同理 @@ -197,15 +198,13 @@ func storeDictTrieByHash(ctx context.Context, dict *trie.Trie) (err error) { trieDB := storage.NewTrieDB(outName) err = trieDB.StorageDict(dict) if err != nil { - logs.LogrusObj.Error(err) - return + return errors.WithMessage(err, "storageDict error") } _ = trieDB.Close() err = redis.PushInvertedPath(ctx, redis.TireTreeDbPathDayKey, []string{outName}) if err != nil { - logs.LogrusObj.Error(err) - return + return errors.WithMessage(err, "redis.PushInvertedPath error") } return diff --git a/app/mapreduce/cmd/main.go b/app/mapreduce/cmd/main.go index f941aa4..0df16c7 100644 --- a/app/mapreduce/cmd/main.go +++ b/app/mapreduce/cmd/main.go @@ -1,7 +1,7 @@ package main import ( - "fmt" + "github.com/pkg/errors" "net" "github.com/sirupsen/logrus" @@ -42,7 +42,8 @@ func main() { panic(err) } if _, err = etcdRegister.Register(node, 10); err != nil { - panic(fmt.Sprintf("start service failed, err: %v", err)) + logs.LogrusObj.Errorf("start service failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + logs.LogrusObj.Errorf("stack trace: \n%+v\n", err) } logrus.Info("service started listen on ", grpcAddress) if err = server.Serve(lis); err != nil { diff --git a/app/mapreduce/master/mapreduce.go b/app/mapreduce/master/mapreduce.go index 0b6fa29..c418da9 100644 --- a/app/mapreduce/master/mapreduce.go +++ b/app/mapreduce/master/mapreduce.go @@ -2,6 +2,7 @@ package master import ( "context" + "github.com/pkg/errors" "math" "sync" "time" @@ -166,6 +167,7 @@ func (m *MasterSrv) MasterTaskCompleted(ctx context.Context, req *mapreduce.MapR m.TaskMeta[int(req.TaskNumber)].TaskStatus = types.Completed err = m.processTaskResult(req) // always success haha and hope u so :) if err != nil { + err = errors.WithMessage(err, "processTaskResult error") resp.Code = e.ERROR resp.Message = "map finish failed" return diff --git a/app/mapreduce/rpc/init.go b/app/mapreduce/rpc/init.go index 996bd20..ea19e20 100644 --- a/app/mapreduce/rpc/init.go +++ b/app/mapreduce/rpc/init.go @@ -3,6 +3,8 @@ package rpc import ( "context" "fmt" + logs "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" "log" "time" @@ -37,6 +39,7 @@ func initClient(serviceName string, client interface{}) { conn, err := connectServer(serviceName) if err != nil { + logs.LogrusObj.Errorf("start service failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) panic(err) } @@ -61,5 +64,8 @@ func connectServer(serviceName string) (conn *grpc.ClientConn, err error) { } conn, err = grpc.DialContext(ctx, addr, opts...) + if err != nil { + err = errors.Wrap(err, "failed to connect grpc") + } return } diff --git a/app/mapreduce/rpc/mapreduce.go b/app/mapreduce/rpc/mapreduce.go index a08b2c2..1989817 100644 --- a/app/mapreduce/rpc/mapreduce.go +++ b/app/mapreduce/rpc/mapreduce.go @@ -2,16 +2,16 @@ package rpc import ( "context" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/idl/pb/mapreduce" - log "github.com/CocaineCong/tangseng/pkg/logger" ) // MasterAssignTask 通过 master 发送任务 func MasterAssignTask(ctx context.Context, taskReq *mapreduce.MapReduceTask) (resp *mapreduce.MapReduceTask, err error) { resp, err = MapReduceClient.MasterAssignTask(ctx, taskReq) if err != nil { - log.LogrusObj.Error("MasterAssignTask-MapReduceClient", err) + err = errors.WithMessage(err, "MasterAssignTask-MapReduceClient error") return } @@ -22,7 +22,7 @@ func MasterAssignTask(ctx context.Context, taskReq *mapreduce.MapReduceTask) (re func MasterTaskCompleted(ctx context.Context, task *mapreduce.MapReduceTask) (resp *mapreduce.MasterTaskCompletedResp, err error) { resp, err = MapReduceClient.MasterTaskCompleted(ctx, task) if err != nil { - log.LogrusObj.Error("MasterTaskCompleted-MapReduceClient", err) + err = errors.WithMessage(err, "MapReduceClient.MasterTaskCompleted error") return } diff --git a/app/mapreduce/woker/map.go b/app/mapreduce/woker/map.go index 0cb9466..d1c3158 100644 --- a/app/mapreduce/woker/map.go +++ b/app/mapreduce/woker/map.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/pkg/errors" "os" "path/filepath" @@ -39,7 +40,8 @@ func mapper(ctx context.Context, task *mapreduce.MapReduceTask, mapf func(string task.Intermediates = mapOutput _, err = TaskCompleted(ctx, task) if err != nil { - fmt.Println("mapper-TaskCompleted", err) + log.LogrusObj.Errorf("TaskCompleted failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) } } diff --git a/app/mapreduce/woker/worker.go b/app/mapreduce/woker/worker.go index 708b2c6..9df1cc5 100644 --- a/app/mapreduce/woker/worker.go +++ b/app/mapreduce/woker/worker.go @@ -3,6 +3,7 @@ package woker import ( "context" "fmt" + "github.com/pkg/errors" "hash/fnv" "time" @@ -21,7 +22,8 @@ func Worker(ctx context.Context, mapf func(string, string) []*types.KeyValue, re // worker从master获取任务 task, err := getTask(ctx) if err != nil { - log.LogrusObj.Error("Worker-getTask", err) + log.LogrusObj.Errorf("getTask failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + log.LogrusObj.Errorf("stack trace: \n%+v\n", err) return } fmt.Println("Worker task", task) @@ -54,12 +56,12 @@ func getTask(ctx context.Context) (resp *mapreduce.MapReduceTask, err error) { taskReq := &mapreduce.MapReduceTask{} resp, err = rpc.MasterAssignTask(ctx, taskReq) fmt.Println("getTask Resp") - + err = errors.WithMessage(err, "MasterAssignTask error") return } func TaskCompleted(ctx context.Context, task *mapreduce.MapReduceTask) (reply *mapreduce.MasterTaskCompletedResp, err error) { reply, err = rpc.MasterTaskCompleted(ctx, task) - + err = errors.WithMessage(err, "MasterTaskCompleted error") return } diff --git a/app/search_engine/cmd/main.go b/app/search_engine/cmd/main.go index 0225929..c8fb999 100644 --- a/app/search_engine/cmd/main.go +++ b/app/search_engine/cmd/main.go @@ -2,7 +2,8 @@ package main import ( "context" - "fmt" + logs "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" "net" "github.com/sirupsen/logrus" @@ -46,7 +47,8 @@ func main() { panic(err) } if _, err := etcdRegister.Register(node, 10); err != nil { - panic(fmt.Sprintf("start service failed, err: %v", err)) + logs.LogrusObj.Errorf("start service failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + logs.LogrusObj.Errorf("stack trace: \n%+v\n", err) } logrus.Info("service started listen on ", grpcAddress) if err := server.Serve(lis); err != nil { diff --git a/app/search_engine/ranking/tf_idf.go b/app/search_engine/ranking/tf_idf.go index 6ded2ec..f9d7c00 100644 --- a/app/search_engine/ranking/tf_idf.go +++ b/app/search_engine/ranking/tf_idf.go @@ -1,6 +1,7 @@ package ranking import ( + "github.com/pkg/errors" "math" "github.com/CocaineCong/tangseng/app/search_engine/analyzer" @@ -70,7 +71,9 @@ func CalculateScoreTFIDF(token string, searchItem []*types.SearchItem) (resp []* index++ } }) - + if err != nil { + err = errors.WithMessage(err, "mapreduce error") + } return } diff --git a/app/search_engine/repository/db/dao/input_data.go b/app/search_engine/repository/db/dao/input_data.go index 76c3024..09aa3dd 100644 --- a/app/search_engine/repository/db/dao/input_data.go +++ b/app/search_engine/repository/db/dao/input_data.go @@ -2,6 +2,7 @@ package dao import ( "context" + "github.com/pkg/errors" "gorm.io/gorm" @@ -20,17 +21,27 @@ func NewInputDataDao(ctx context.Context) *InputDataDao { } func (d *InputDataDao) CreateInputData(in *model.InputData) (err error) { - return d.DB.Model(&model.InputData{}).Create(&in).Error + err = d.DB.Model(&model.InputData{}).Create(&in).Error + if err != nil { + return errors.Wrap(err, "failed to create inputData") + } + return } func (d *InputDataDao) BatchCreateInputData(in []*model.InputData) (err error) { - return d.DB.Model(&model.InputData{}).CreateInBatches(&in, consts.BatchCreateSize).Error + err = d.DB.Model(&model.InputData{}).CreateInBatches(&in, consts.BatchCreateSize).Error + if err != nil { + return errors.Wrap(err, "failed to batch create inputData ") + } + return } func (d *InputDataDao) ListInputData() (in []*model.InputData, err error) { err = d.DB.Model(&model.InputData{}).Where("is_index = ?", false). Find(&in).Error - + if err != nil { + err = errors.Wrap(err, "failed to query inputData") + } return } @@ -44,13 +55,17 @@ func (d *InputDataDao) ListInputDataByDocIds(docIds []int64) (in []*types.Search "url," + "score AS content_score"). Find(&in).Error - + if err != nil { + err = errors.Wrap(err, "failed to query inputData by docIds") + } return } func (d *InputDataDao) UpdateInputDataByIds(ids []int64) (err error) { err = d.DB.Model(&model.InputData{}).Where("id IN ?", ids). Update("is_index", true).Error - + if err != nil { + err = errors.Wrap(err, "failed to update inputData") + } return } diff --git a/app/search_engine/repository/starrock/bi_dao/input_data.go b/app/search_engine/repository/starrock/bi_dao/input_data.go index b70170f..d27288b 100644 --- a/app/search_engine/repository/starrock/bi_dao/input_data.go +++ b/app/search_engine/repository/starrock/bi_dao/input_data.go @@ -2,6 +2,7 @@ package bi_dao import ( "context" + "github.com/pkg/errors" "gorm.io/gorm" @@ -20,6 +21,8 @@ func NewStarRocksDao(ctx context.Context) *StarRocksDao { func (dao *StarRocksDao) ListDataRocks() (r []*types.Data2Starrocks, err error) { sql := "SELECT * FROM input_data" err = dao.DB.Raw(sql).Find(&r).Error - + if err != nil { + err = errors.Wrap(err, "failed to get data") + } return } diff --git a/app/search_engine/repository/starrock/starrocks.go b/app/search_engine/repository/starrock/starrocks.go index bedb018..dc135e9 100644 --- a/app/search_engine/repository/starrock/starrocks.go +++ b/app/search_engine/repository/starrock/starrocks.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "github.com/pkg/errors" "net/http" "strings" "sync" @@ -103,12 +104,14 @@ func (d *DirectUpload) StreamUpload() (count int, err error) { }, ",") _, err = write.WriteString(line + rowDelimiter) if err != nil { - log.LogrusObj.Errorf("WriteString Error") + err = errors.Wrap(err, "failed to write string") + return } } err = write.Flush() if err != nil { - log.LogrusObj.Errorf("write.Flush() :%+v", err) + err = errors.Wrap(err, "failed to flush data") + return } // check 机制 @@ -118,7 +121,7 @@ func (d *DirectUpload) StreamUpload() (count int, err error) { req.Header = v.Header req.Body, err = v.GetBody() if err != nil { - log.LogrusObj.Errorf("starrock woker") + err = errors.Wrapf(err, "starrock woker") } return err }, @@ -143,7 +146,8 @@ func (d *DirectUpload) StreamUpload() (count int, err error) { }).SetBody(sb.Bytes()).SetContentLength(true). Put("https://{host}/api/{db}/{table}/_stream_load") if err != nil { - log.LogrusObj.Errorf("stream load error :%+v", err) + err = errors.Wrap(err, "failed to load stream") + return } if hp.StatusCode() != http.StatusOK { diff --git a/app/search_engine/repository/storage/bolt.go b/app/search_engine/repository/storage/bolt.go index 9b3fd37..a73078d 100644 --- a/app/search_engine/repository/storage/bolt.go +++ b/app/search_engine/repository/storage/bolt.go @@ -1,6 +1,7 @@ package storage import ( + "github.com/pkg/errors" bolt "go.etcd.io/bbolt" ) @@ -9,9 +10,13 @@ func Put(db *bolt.DB, bucket string, key []byte, value []byte) error { return db.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucketIfNotExists([]byte(bucket)) if err != nil { - return err + return errors.Wrap(err, "failed to create bucket") } - return b.Put(key, value) + err = b.Put(key, value) + if err != nil { + err = errors.Wrap(err, "failed to put data") + } + return err }) } @@ -29,6 +34,8 @@ func Get(db *bolt.DB, bucket string, key []byte) (r []byte, err error) { } return }) - + if err != nil { + err = errors.Wrap(err, "view error") + } return } diff --git a/app/search_engine/repository/storage/dict_db.go b/app/search_engine/repository/storage/dict_db.go index 591e040..489ab5d 100644 --- a/app/search_engine/repository/storage/dict_db.go +++ b/app/search_engine/repository/storage/dict_db.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "context" + "github.com/pkg/errors" "os" bolt "go.etcd.io/bbolt" @@ -61,11 +62,12 @@ func NewTrieDB(filePath string) *TrieDB { // TODO: 先都放在一个下面吧 func (d *TrieDB) StorageDict(trieTree *trie.Trie) (err error) { tt, err := trieTree.MarshalJSON() if err != nil { + err = errors.Wrap(err, "failed marshal data") return } - - err = d.PutTrieTree([]byte(consts.TrieTreeBucket), tt) - + if err = d.PutTrieTree([]byte(consts.TrieTreeBucket), tt); err != nil { + err = errors.Wrap(err, "failed to put trie tree") + } return } @@ -73,11 +75,13 @@ func (d *TrieDB) StorageDict(trieTree *trie.Trie) (err error) { func (d *TrieDB) GetTrieTreeDict() (trieTree *trie.Trie, err error) { v, err := d.GetTrieTree([]byte(consts.TrieTreeBucket)) if err != nil { + err = errors.WithMessage(err, "getTrieTree error") return } replaced := bytes.Replace(v, []byte("children"), []byte("children_recall"), -1) node, err := trie.ParseTrieNode(string(replaced)) if err != nil { + err = errors.WithMessage(err, "ParseTrieNode error") return } @@ -88,16 +92,25 @@ func (d *TrieDB) GetTrieTreeDict() (trieTree *trie.Trie, err error) { } // PutTrieTree 存储 -func (d *TrieDB) PutTrieTree(key, value []byte) error { - return Put(d.db, consts.TrieTreeBucket, key, value) +func (d *TrieDB) PutTrieTree(key, value []byte) (err error) { + err = Put(d.db, consts.TrieTreeBucket, key, value) + if err != nil { + err = errors.WithMessage(err, "put error") + } + return } // GetTrieTree 通过term获取value func (d *TrieDB) GetTrieTree(key []byte) (value []byte, err error) { - return Get(d.db, consts.TrieTreeBucket, key) + value, err = Get(d.db, consts.TrieTreeBucket, key) + err = errors.WithMessage(err, "get error") + return } // Close 关闭db -func (d *TrieDB) Close() error { - return d.db.Close() +func (d *TrieDB) Close() (err error) { + if err = d.db.Close(); err != nil { + err = errors.WithMessage(d.db.Close(), "close error") + } + return } diff --git a/app/search_engine/repository/storage/inverted_db.go b/app/search_engine/repository/storage/inverted_db.go index 66a4de8..1222533 100644 --- a/app/search_engine/repository/storage/inverted_db.go +++ b/app/search_engine/repository/storage/inverted_db.go @@ -2,7 +2,7 @@ package storage import ( "context" - "fmt" + "github.com/pkg/errors" "os" bolt "go.etcd.io/bbolt" @@ -71,7 +71,11 @@ func NewInvertedDB(termName, postingsName string) *InvertedDB { // GetInverted 通过term获取value func (t *InvertedDB) GetInverted(key []byte) (value []byte, err error) { - return Get(t.db, consts.InvertedBucket, key) + value, err = Get(t.db, consts.InvertedBucket, key) + if err != nil { + err = errors.WithMessage(err, "get error") + } + return } // GetInvertedDoc 根据地址获取读取文件 @@ -79,7 +83,7 @@ func (t *InvertedDB) GetInvertedDoc(offset int64, size int64) ([]byte, error) { page := os.Getpagesize() b, err := Mmap(int(t.file.Fd()), offset/int64(page), int(offset+size)) if err != nil { - return nil, fmt.Errorf("GetDocinfo Mmap err: %v", err) + return nil, errors.WithMessage(err, "GetDocinfo Mmap error") } return b[offset : offset+size], nil } diff --git a/app/search_engine/repository/storage/mmap.go b/app/search_engine/repository/storage/mmap.go index 5da2bc5..fabe4b5 100644 --- a/app/search_engine/repository/storage/mmap.go +++ b/app/search_engine/repository/storage/mmap.go @@ -4,6 +4,7 @@ package storage import ( + "github.com/pkg/errors" "syscall" ) @@ -11,5 +12,6 @@ import ( // 映射后的内存可以像普通的字节切片一样进行读取和写入操作,而不需要额外的文件读写操作。 // 这对于处理大文件或需要频繁访问文件内容的场景非常有用,因为避免了多次磁盘读写操作,提高了性能。 func Mmap(fd int, offset int64, length int) ([]byte, error) { - return syscall.Mmap(fd, offset, length, syscall.PROT_READ, syscall.MAP_SHARED) + data, err := syscall.Mmap(fd, offset, length, syscall.PROT_READ, syscall.MAP_SHARED) + return data, errors.Wrap(err, "failed to mmap") } diff --git a/app/search_engine/rpc/init.go b/app/search_engine/rpc/init.go index c831d2e..cc4cd0b 100644 --- a/app/search_engine/rpc/init.go +++ b/app/search_engine/rpc/init.go @@ -3,6 +3,8 @@ package rpc import ( "context" "fmt" + logs "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" "log" "time" @@ -42,6 +44,7 @@ func initClient(serviceName string, client interface{}) { conn, err := connectServer(serviceName) if err != nil { + logs.LogrusObj.Errorf("start service failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) panic(err) } @@ -66,5 +69,6 @@ func connectServer(serviceName string) (conn *grpc.ClientConn, err error) { } conn, err = grpc.DialContext(ctx, addr, opts...) + err = errors.Wrap(err, "failed to connect grpc") return } diff --git a/app/search_engine/rpc/vector.go b/app/search_engine/rpc/vector.go index d607567..1da6b5b 100644 --- a/app/search_engine/rpc/vector.go +++ b/app/search_engine/rpc/vector.go @@ -2,7 +2,7 @@ package rpc import ( "context" - "errors" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/consts/e" pb "github.com/CocaineCong/tangseng/idl/pb/search_vector" @@ -11,11 +11,12 @@ import ( func SearchVector(ctx context.Context, req *pb.SearchVectorRequest) (resp *pb.SearchVectorResponse, err error) { resp, err = SearchVectorClient.SearchVector(ctx, req) if err != nil { + err = errors.Wrap(err, "failed to search vector client") return } if resp.Code != e.SUCCESS { - err = errors.New(resp.Msg) + err = errors.Wrap(errors.New(resp.Msg), "resp.Code is not success") return } diff --git a/app/search_engine/service/recall/query.go b/app/search_engine/service/recall/query.go index c8e421e..ed241d1 100644 --- a/app/search_engine/service/recall/query.go +++ b/app/search_engine/service/recall/query.go @@ -2,8 +2,8 @@ package recall import ( "context" + "github.com/pkg/errors" - log "github.com/CocaineCong/tangseng/pkg/logger" "github.com/CocaineCong/tangseng/types" ) @@ -12,7 +12,7 @@ func SearchRecall(ctx context.Context, query string) (res []*types.SearchItem, e recallService := NewRecall() res, err = recallService.Search(ctx, query) if err != nil { - log.LogrusObj.Errorf("SearchRecall-NewRecallServ:%+v", err) + err = errors.WithMessage(err, "SearchRecall-NewRecallServ error") return } @@ -24,7 +24,7 @@ func SearchQuery(query string) (res []string, err error) { recallService := NewRecall() res, err = recallService.SearchQueryWord(query) if err != nil { - log.LogrusObj.Errorf("SearchRecall-NewRecallServ:%+v", err) + err = errors.WithMessage(err, "SearchRecall-NewRecallServ error") return } diff --git a/app/search_engine/service/recall/recall.go b/app/search_engine/service/recall/recall.go index 38601b9..b552179 100644 --- a/app/search_engine/service/recall/recall.go +++ b/app/search_engine/service/recall/recall.go @@ -2,6 +2,7 @@ package recall import ( "context" + "github.com/pkg/errors" "sort" "github.com/RoaringBitmap/roaring" @@ -31,21 +32,21 @@ func NewRecall() *Recall { func (r *Recall) Search(ctx context.Context, query string) (resp []*types.SearchItem, err error) { splitQuery, err := analyzer.GseCutForRecall(query) if err != nil { - log.LogrusObj.Errorf("text2postingslists err: %v", err) + err = errors.WithMessagef(err, "text2postingslists error") return } // 倒排库搜索 res, err := r.searchDoc(ctx, splitQuery) if err != nil { - log.LogrusObj.Errorf("searchDoc err: %v", err) + err = errors.WithMessage(err, "searchDoc error") return } // 向量库搜索 vRes, err := r.SearchVector(ctx, splitQuery) if err != nil { - log.LogrusObj.Errorf("SearchVector err: %v", err) + err = errors.WithMessage(err, "searchVector error") return } @@ -92,7 +93,7 @@ func (r *Recall) SearchVector(ctx context.Context, queries []string) (docIds []i req := &pb.SearchVectorRequest{Query: queries} vectorResp, err := rpc.SearchVector(ctx, req) if err != nil { - log.LogrusObj.Errorln(err) + err = errors.WithMessage(err, "searchVector error") return } docIds = make([]int64, len(vectorResp.DocIds)) @@ -108,7 +109,7 @@ func (r *Recall) SearchQueryWord(query string) (resp []string, err error) { for _, trieDb := range storage.GlobalTrieDB { trie, errx := trieDb.GetTrieTreeDict() if errx != nil { - log.LogrusObj.Errorln(errx) + err = errors.WithMessage(errx, "GetTrieTreeDict error") continue } queryTrie := trie.FindAllByPrefixForRecall(query) @@ -129,7 +130,7 @@ func (r *Recall) searchDoc(ctx context.Context, tokens []string) (recalls []int6 // 如果缓存不存在,就去索引表里面读取 postingsList, err = fetchPostingsByToken(token) if err != nil { - log.LogrusObj.Errorln(err) + err = errors.WithMessage(err, "fetchPostingsByToken error") continue } else { // 如果缓存存在,就直接读缓存,不用担心实时性问题,缓存10分钟清空一次,这延迟是能接受到 @@ -172,7 +173,7 @@ func fetchPostingsByToken(token string) (postingsList []*types.PostingsList, err for _, inverted := range storage.GlobalInvertedDB { docIds, errx := inverted.GetInverted([]byte(token)) if errx != nil { - log.LogrusObj.Errorln(errx) + err = errors.WithMessage(err, "getInverted error") continue } output := roaring.New() diff --git a/app/search_engine/service/search_engine.go b/app/search_engine/service/search_engine.go index aef1716..7646a4f 100644 --- a/app/search_engine/service/search_engine.go +++ b/app/search_engine/service/search_engine.go @@ -2,12 +2,12 @@ package service import ( "context" + "github.com/pingcap/errors" "sync" "github.com/CocaineCong/tangseng/app/search_engine/service/recall" "github.com/CocaineCong/tangseng/consts/e" pb "github.com/CocaineCong/tangseng/idl/pb/search_engine" - log "github.com/CocaineCong/tangseng/pkg/logger" "github.com/CocaineCong/tangseng/types" ) @@ -34,7 +34,7 @@ func (s *SearchEngineSrv) SearchEngineSearch(ctx context.Context, req *pb.Search if err != nil { resp.Code = e.ERROR resp.Msg = err.Error() - log.LogrusObj.Error("SearchEngineSearch-recall.SearchRecall", err) + err = errors.WithMessage(err, "SearchEngineSearch-recall.SearchRecall error") return } @@ -42,7 +42,7 @@ func (s *SearchEngineSrv) SearchEngineSearch(ctx context.Context, req *pb.Search if err != nil { resp.Code = e.ERROR resp.Msg = err.Error() - log.LogrusObj.Error("SearchEngineSearch-BuildSearchEngineResp", err) + err = errors.WithMessage(err, "SearchEngineSearch-BuildSearchEngineResp error") return } resp.Count = int64(len(sResult)) @@ -59,7 +59,7 @@ func (s *SearchEngineSrv) WordAssociation(ctx context.Context, req *pb.SearchEng if err != nil { resp.Code = e.ERROR resp.Msg = err.Error() - log.LogrusObj.Error("SearchEngineSearch-WordAssociation", err) + err = errors.WithMessage(err, "SearchEngineSearch-WordAssociation error") return } resp.WordAssociationList = associationList diff --git a/app/user/cmd/main.go b/app/user/cmd/main.go index cc02aaf..56b4a27 100644 --- a/app/user/cmd/main.go +++ b/app/user/cmd/main.go @@ -1,7 +1,8 @@ package main import ( - "fmt" + logs "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" "net" "github.com/sirupsen/logrus" @@ -36,7 +37,8 @@ func main() { panic(err) } if _, err := etcdRegister.Register(userNode, 10); err != nil { - panic(fmt.Sprintf("start service failed, err: %v", err)) + logs.LogrusObj.Errorf("start service failed, original error: %T %v", errors.Cause(err), errors.Cause(err)) + logs.LogrusObj.Errorf("stack trace: \n%+v\n", err) } logrus.Info("service started listen on ", grpcAddress) if err := server.Serve(lis); err != nil { diff --git a/app/user/internal/repository/db/dao/user.go b/app/user/internal/repository/db/dao/user.go index dab0a72..02694a8 100644 --- a/app/user/internal/repository/db/dao/user.go +++ b/app/user/internal/repository/db/dao/user.go @@ -2,12 +2,11 @@ package dao import ( "context" - "errors" + "github.com/pkg/errors" "gorm.io/gorm" userPb "github.com/CocaineCong/tangseng/idl/pb/user" - log "github.com/CocaineCong/tangseng/pkg/logger" "github.com/CocaineCong/tangseng/repository/mysql/db" "github.com/CocaineCong/tangseng/repository/mysql/model" ) @@ -24,7 +23,9 @@ func NewUserDao(ctx context.Context) *UserDao { func (dao *UserDao) GetUserInfo(req *userPb.UserLoginReq) (r *model.User, err error) { err = dao.Model(&model.User{}).Where("user_name=?", req.UserName). First(&r).Error - + if err != nil { + err = errors.Wrapf(err, "failed to get user info, userName = %v", req.UserName) + } return } @@ -34,7 +35,7 @@ func (dao *UserDao) CreateUser(req *userPb.UserRegisterReq) (err error) { var count int64 dao.Model(&model.User{}).Where("user_name = ?", req.UserName).Count(&count) if count != 0 { - return errors.New("UserName Exist") + return errors.Wrapf(errors.New("UserName Exist"), "failed to create user, userName = %v", req.UserName) } user = model.User{ @@ -43,8 +44,7 @@ func (dao *UserDao) CreateUser(req *userPb.UserRegisterReq) (err error) { } _ = user.SetPassword(req.Password) if err = dao.Model(&model.User{}).Create(&user).Error; err != nil { - log.LogrusObj.Error("Insert User Error:" + err.Error()) - return + return errors.Wrap(err, "failed to create user") } return diff --git a/app/user/internal/service/user.go b/app/user/internal/service/user.go index d3d76d0..f0b6b8c 100644 --- a/app/user/internal/service/user.go +++ b/app/user/internal/service/user.go @@ -2,6 +2,7 @@ package service import ( "context" + "github.com/pkg/errors" "sync" "github.com/CocaineCong/tangseng/app/user/internal/repository/db/dao" @@ -29,6 +30,7 @@ func (u *UserSrv) UserLogin(ctx context.Context, req *pb.UserLoginReq) (resp *pb r, err := dao.NewUserDao(ctx).GetUserInfo(req) if err != nil { resp.Code = e2.ERROR + err = errors.WithMessage(err, "getUserInfo error") return } resp.UserDetail = &pb.UserResp{ @@ -45,6 +47,7 @@ func (u *UserSrv) UserRegister(ctx context.Context, req *pb.UserRegisterReq) (re err = dao.NewUserDao(ctx).CreateUser(req) if err != nil { resp.Code = e2.ERROR + err = errors.WithMessage(err, "createUser error") return } resp.Data = e2.GetMsg(int(resp.Code)) diff --git a/pkg/ctl/user_info.go b/pkg/ctl/user_info.go index 5e3ac21..8775f11 100644 --- a/pkg/ctl/user_info.go +++ b/pkg/ctl/user_info.go @@ -2,7 +2,7 @@ package ctl import ( "context" - "errors" + "github.com/pkg/errors" "github.com/CocaineCong/tangseng/consts" ) @@ -15,7 +15,7 @@ type UserInfo struct { func GetUserInfo(ctx context.Context) (*UserInfo, error) { user, ok := FromContext(ctx) if !ok { - return nil, errors.New("获取用户信息错误") + return nil, errors.Wrap(errors.New("获取用户信息错误"), "FromContext error") } return user, nil } diff --git a/pkg/discovery/instance.go b/pkg/discovery/instance.go index e53f740..3741945 100644 --- a/pkg/discovery/instance.go +++ b/pkg/discovery/instance.go @@ -2,8 +2,8 @@ package discovery import ( "encoding/json" - "errors" "fmt" + "github.com/pkg/errors" "strings" "google.golang.org/grpc/resolver" @@ -31,7 +31,7 @@ func BuildRegisterPath(server Server) string { func ParseValue(value []byte) (Server, error) { server := Server{} if err := json.Unmarshal(value, &server); err != nil { - return server, err + return server, errors.Wrap(err, "failed to unmarshal value") } return server, nil @@ -41,7 +41,7 @@ func SplitPath(path string) (Server, error) { server := Server{} strs := strings.Split(path, "/") if len(strs) == 0 { - return server, errors.New("invalid path") + return server, errors.Wrap(errors.New("invalid path"), "Split error") } server.Addr = strs[len(strs)-1] diff --git a/pkg/discovery/register.go b/pkg/discovery/register.go index 0a1aba5..c1e05de 100644 --- a/pkg/discovery/register.go +++ b/pkg/discovery/register.go @@ -3,7 +3,7 @@ package discovery import ( "context" "encoding/json" - "errors" + "github.com/pkg/errors" "net/http" "strconv" "strings" @@ -41,21 +41,21 @@ func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) var err error if strings.Split(srvInfo.Addr, ":")[0] == "" { - return nil, errors.New("invalid ip address") + return nil, errors.Wrap(errors.New("invalid ip address"), "Split error") } if r.cli, err = clientv3.New(clientv3.Config{ Endpoints: r.EtcdAddrs, DialTimeout: time.Duration(r.DialTimeout) * time.Second, }); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create new etcd client") } r.srvInfo = srvInfo r.srvTTL = ttl if err = r.register(); err != nil { - return nil, err + return nil, errors.WithMessage(err, "register error") } r.closeCh = make(chan struct{}) @@ -72,24 +72,26 @@ func (r *Register) register() error { // 在etcd创建一个续期的lease对象 leaseResp, err := r.cli.Grant(ctx, r.srvTTL) if err != nil { - return err + return errors.Wrap(err, "failed to create lease") } r.leasesID = leaseResp.ID // 开启自动续期KeepAlive if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil { - return err + return errors.Wrap(err, "failed to establish KeepAlive for lease") } data, err := json.Marshal(r.srvInfo) if err != nil { - return err + return errors.Wrap(err, "failed to marshal srvInfo") } _, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID)) - - return err + if err != nil { + return errors.Wrap(err, "failed to write service registration data to etcd") + } + return nil } // Stop stop register @@ -100,7 +102,10 @@ func (r *Register) Stop() { // unregister 删除节点 func (r *Register) unregister() error { _, err := r.cli.Delete(context.Background(), BuildRegisterPath(r.srvInfo)) - return err + if err != nil { + return errors.Wrap(err, "failed to unregister") + } + return nil } // 监听服务地址列表的变化 @@ -151,7 +156,7 @@ func (r *Register) UpdateHandler() http.HandlerFunc { } _, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID)) - return err + return errors.WithMessage(err, "put error") } if err := update(); err != nil { @@ -167,13 +172,13 @@ func (r *Register) UpdateHandler() http.HandlerFunc { func (r *Register) GetServerInfo() (Server, error) { resp, err := r.cli.Get(context.Background(), BuildRegisterPath(r.srvInfo)) if err != nil { - return r.srvInfo, err + return r.srvInfo, errors.Wrap(err, "failed to get server info") } server := Server{} if resp.Count >= 1 { if err := json.Unmarshal(resp.Kvs[0].Value, &server); err != nil { - return server, err + return server, errors.Wrap(err, "failed to unmarshal resp") } } diff --git a/pkg/discovery/resolver.go b/pkg/discovery/resolver.go index e62f32d..073965e 100644 --- a/pkg/discovery/resolver.go +++ b/pkg/discovery/resolver.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "github.com/pkg/errors" "time" "github.com/sirupsen/logrus" @@ -50,7 +51,7 @@ func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts re r.keyPrifix = BuildPrefix(Server{Name: target.Endpoint(), Version: target.URL.Host}) if _, err := r.start(); err != nil { - return nil, err + return nil, errors.WithMessage(err, "start error") } return r, nil } @@ -71,14 +72,14 @@ func (r *Resolver) start() (chan<- struct{}, error) { DialTimeout: time.Duration(r.DialTimeout) * time.Second, }) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create new etcd client") } resolver.Register(r) r.closeCh = make(chan struct{}) if err = r.sync(); err != nil { - return nil, err + return nil, errors.WithMessage(err, "failed to sync") } go r.watch() @@ -144,7 +145,7 @@ func (r *Resolver) sync() error { defer cancel() res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix()) if err != nil { - return err + return errors.Wrap(err, "failed to get data") } r.srvAddrsList = []resolver.Address{} @@ -156,5 +157,6 @@ func (r *Resolver) sync() error { addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight} r.srvAddrsList = append(r.srvAddrsList, addr) } - return r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList}) + err = r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList}) + return errors.Wrap(err, "failed to update state") } diff --git a/pkg/jwt/jwt.go b/pkg/jwt/jwt.go index fd59d86..98feba1 100644 --- a/pkg/jwt/jwt.go +++ b/pkg/jwt/jwt.go @@ -1,7 +1,7 @@ package jwt import ( - "errors" + "github.com/pkg/errors" "time" "github.com/dgrijalva/jwt-go" @@ -33,7 +33,7 @@ func GenerateToken(id int64, username string) (accessToken, refreshToken string, // 加密并获得完整的编码后的字符串token accessToken, err = jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(jwtSecret) if err != nil { - return "", "", err + return "", "", errors.Wrap(err, "failed to get accessToken") } refreshToken, err = jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.StandardClaims{ @@ -41,7 +41,7 @@ func GenerateToken(id int64, username string) (accessToken, refreshToken string, Issuer: "search-engine", }).SignedString(jwtSecret) if err != nil { - return "", "", err + return "", "", errors.Wrap(err, "failed to get refreshToken") } return accessToken, refreshToken, err @@ -57,19 +57,19 @@ func ParseToken(token string) (*Claims, error) { return claims, nil } } - return nil, err + return nil, errors.Wrap(err, "failed to parse token") } // ParseRefreshToken 验证用户token func ParseRefreshToken(aToken, rToken string) (newAToken, newRToken string, err error) { accessClaim, err := ParseToken(aToken) if err != nil { - return + return newAToken, newRToken, errors.WithMessage(err, "failed to parse accessToken") } refreshClaim, err := ParseToken(rToken) if err != nil { - return + return newAToken, newRToken, errors.WithMessage(err, "failed to parse refreshToken") } if accessClaim.ExpiresAt > time.Now().Unix() { @@ -83,5 +83,5 @@ func ParseRefreshToken(aToken, rToken string) (newAToken, newRToken string, err } // 如果两者都过期了,重新登陆 - return "", "", errors.New("身份过期,重新登陆") + return "", "", errors.WithMessage(errors.New("身份过期,重新登陆"), "failed to parse refreshToken") } diff --git a/pkg/kfk/init.go b/pkg/kfk/init.go index 5854b94..6863107 100644 --- a/pkg/kfk/init.go +++ b/pkg/kfk/init.go @@ -1,6 +1,7 @@ package kfk import ( + logs "github.com/CocaineCong/tangseng/pkg/logger" "github.com/IBM/sarama" "github.com/CocaineCong/tangseng/config" @@ -13,6 +14,7 @@ func InitKafka() { con.Producer.Return.Successes = true kafkaClient, err := sarama.NewClient(config.Conf.Kafka.Address, con) if err != nil { + logs.LogrusObj.Errorln(err) return } GobalKafka = kafkaClient diff --git a/pkg/kfk/produce.go b/pkg/kfk/produce.go index 10d32fb..3bc27ba 100644 --- a/pkg/kfk/produce.go +++ b/pkg/kfk/produce.go @@ -1,6 +1,7 @@ package kfk import ( + "github.com/pkg/errors" _ "net/http/pprof" "github.com/IBM/sarama" @@ -10,7 +11,7 @@ import ( func KafkaProducer(topic string, msg []byte) (err error) { producer, err := sarama.NewSyncProducerFromClient(GobalKafka) if err != nil { - return + return errors.Wrap(err, "failed to create Kafka producer") } message := &sarama.ProducerMessage{ Topic: topic, @@ -18,7 +19,7 @@ func KafkaProducer(topic string, msg []byte) (err error) { } _, _, err = producer.SendMessage(message) if err != nil { - return + return errors.Wrap(err, "failed to send message") } return } @@ -27,11 +28,11 @@ func KafkaProducer(topic string, msg []byte) (err error) { func KafkaProducers(messages []*sarama.ProducerMessage) (err error) { producer, err := sarama.NewSyncProducerFromClient(GobalKafka) if err != nil { - return + return errors.Wrap(err, "failed to create Kafka producer") } err = producer.SendMessages(messages) if err != nil { - return + return errors.Wrap(err, "failed to send messages") } return } diff --git a/pkg/mapreduce/mapreduce.go b/pkg/mapreduce/mapreduce.go index 7de22ad..a905070 100644 --- a/pkg/mapreduce/mapreduce.go +++ b/pkg/mapreduce/mapreduce.go @@ -2,7 +2,8 @@ package mapreduce import ( "context" - "errors" + log "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" "sync" "sync/atomic" ) @@ -16,8 +17,9 @@ const ( ) var ( - ErrCancelWithNil = errors.New("mapreduce cancelled with nil") - ErrReduceNoOutput = errors.New("reduce not writing value") + ErrCancelWithNil = errors.New("mapreduce cancelled with nil") + ErrReduceNoOutput = errors.New("reduce not writing value") + ErrWriteMoreThanOneProduce = errors.New("more than one element written in reducer") ) type ( @@ -52,11 +54,11 @@ type ( } ) -func MapReduce[T, U, V any](gen GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V], opts ...Option) (V, error) { +func MapReduce[T, U, V any](gen GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V], opts ...Option) (v V, err error) { panicChan := &onceChan{channel: make(chan any)} source := buildSource(gen, panicChan) - - return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) + v, err = mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) + return v, errors.WithMessage(err, "mapReduceWithPanicChan error") } func newOptions() *mapReduceOptions { @@ -82,7 +84,8 @@ func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, m defer func() { // reducer can only write once, if more, panic for range output { - panic("more than one element written in reducer") + log.LogrusObj.Errorln(ErrWriteMoreThanOneProduce) + panic(ErrWriteMoreThanOneProduce) } }() @@ -153,7 +156,7 @@ func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, m } } - return + return val, errors.Wrap(err, "mapReduceWithPanicChan error") } func once(fn func(error)) func(error) { diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 4804354..ba3a6e3 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -3,6 +3,7 @@ package retry import ( "context" "fmt" + "github.com/pkg/errors" "sync" "time" ) @@ -40,7 +41,7 @@ func NewRetryOption(ctx context.Context, gapTime time.Duration, retryCount int, func (r *RetryOption) Retry(ctx context.Context, req interface{}) (resp interface{}, err error) { if r.RetryFunc == nil { - return + return resp, errors.Wrap(errors.New("RetryFunc is nil"), "failed to retry") } for i := 0; i < r.RetryCount; i++ { @@ -55,5 +56,5 @@ func (r *RetryOption) Retry(ctx context.Context, req interface{}) (resp interfac break } - return + return resp, errors.Wrap(err, "needRetry or errx is not nil") } diff --git a/pkg/trie/trie_recall.go b/pkg/trie/trie_recall.go index 8729c99..a2e0aac 100644 --- a/pkg/trie/trie_recall.go +++ b/pkg/trie/trie_recall.go @@ -2,8 +2,8 @@ package trie import ( "encoding/json" - "errors" "fmt" + "github.com/pkg/errors" ) // FindAllByPrefixForRecall 召回专用的,通过前缀获取所有的节点 @@ -52,7 +52,7 @@ func ParseTrieNode(str string) (*TrieNode, error) { var data map[string]interface{} err := json.Unmarshal([]byte(str), &data) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to unmarshal data") } node := &TrieNode{ @@ -63,12 +63,12 @@ func ParseTrieNode(str string) (*TrieNode, error) { for key, value := range data { childData, ok := value.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("invalid child data for key: %s", key) + return nil, errors.Wrap(errors.Errorf("invalid child data for key: %s", key), "failed to assert type") } childNode, err := parseTrieNodeChild(childData) if err != nil { - return nil, err + return nil, errors.WithMessage(err, "parseTrieNodeChild error") } node.ChildrenRecall[key] = childNode @@ -92,18 +92,18 @@ func parseTrieNodeChild(data map[string]interface{}) (*TrieNode, error) { childrenData, ok := data["children_recall"].(map[string]interface{}) if !ok { - return nil, errors.New("invalid children data") + return nil, errors.Wrap(errors.New("invalid children data"), "failed to assert type") } for key, value := range childrenData { childData, ok := value.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("invalid child data for key: %s", key) + return nil, errors.Wrap(errors.Errorf("invalid child data for key: %s", key), "failed to assert type") } childNode, err := parseTrieNodeChild(childData) if err != nil { - return nil, err + return nil, errors.WithMessage(err, "parseTrieNodeChild error") } node.ChildrenRecall[key] = childNode diff --git a/pkg/util/codec/binary_byte.go b/pkg/util/codec/binary_byte.go index 689eab8..5198b0e 100644 --- a/pkg/util/codec/binary_byte.go +++ b/pkg/util/codec/binary_byte.go @@ -4,12 +4,11 @@ import ( "bytes" "encoding/binary" "encoding/gob" - "errors" "fmt" + "github.com/pkg/errors" "github.com/bytedance/sonic" - log "github.com/CocaineCong/tangseng/pkg/logger" "github.com/CocaineCong/tangseng/types" ) @@ -20,13 +19,12 @@ func BinaryWrite(buf *bytes.Buffer, v any) (err error) { // log.Debug("docid size:", size) fmt.Println("size", size) if size <= 0 { - log.LogrusObj.Errorf("encodePostings binary.Size err,size: %v", size) - return + return errors.Wrap(errors.Errorf("encodePostings binary.Size err,size: %v", size), "binary size error") } err = binary.Write(buf, binary.LittleEndian, v) if err != nil { - fmt.Println(err) + err = errors.Wrap(err, "BinaryWrite error") } return @@ -35,13 +33,12 @@ func BinaryWrite(buf *bytes.Buffer, v any) (err error) { // GobWrite 将所有的类型 转成 bytes.Buffer 类型,易于存储// TODO change func GobWrite(v any) (buf *bytes.Buffer, err error) { if v == nil { - err = errors.New("BinaryWrite the value is nil") - return + return buf, errors.Wrap(errors.New("BinaryWrite the value is nil"), "GobWrite error") } buf = new(bytes.Buffer) enc := gob.NewEncoder(buf) if err = enc.Encode(v); err != nil { - return + err = errors.Wrap(err, "encode error") } return @@ -51,7 +48,9 @@ func GobWrite(v any) (buf *bytes.Buffer, err error) { func DecodePostings(buf []byte) (p *types.InvertedIndexValue, err error) { p = new(types.InvertedIndexValue) err = sonic.Unmarshal(buf, &p) - + if err != nil { + err = errors.Wrap(err, "unmarshal error") + } return } @@ -59,22 +58,25 @@ func DecodePostings(buf []byte) (p *types.InvertedIndexValue, err error) { func EncodePostings(postings *types.InvertedIndexValue) (buf []byte, err error) { buf, err = sonic.Marshal(postings) if err != nil { - log.LogrusObj.Errorf("sonic.Marshal err:%v,postings:%+v", err, postings) - return + err = errors.Wrap(errors.Errorf("sonic.Marshal err:%v,postings:%+v", err, postings), "marshal error") } - return } // BinaryEncoding 二进制编码 func BinaryEncoding(buf *bytes.Buffer, v any) (err error) { err = gob.NewEncoder(buf).Encode(v) + if err != nil { + err = errors.Wrap(err, "binaryEncoding error") + } return } // BinaryDecoding 二进制解码 func BinaryDecoding(buf *bytes.Buffer, v any) (err error) { err = gob.NewDecoder(buf).Decode(v) - + if err != nil { + err = errors.Wrap(err, "binaryDecoding error") + } return } diff --git a/pkg/util/se/utils.go b/pkg/util/se/utils.go index 392b92f..a2f5c52 100644 --- a/pkg/util/se/utils.go +++ b/pkg/util/se/utils.go @@ -5,6 +5,8 @@ import ( "crypto/md5" "encoding/binary" "fmt" + log "github.com/CocaineCong/tangseng/pkg/logger" + "github.com/pkg/errors" "io" "os" "path" @@ -66,26 +68,36 @@ func GetWd() string { func CopyFile(src, dst string) (int64, error) { sourceFileStat, err := os.Stat(src) if err != nil { - return 0, err + return 0, errors.Wrap(err, "os.Stat error") } if !sourceFileStat.Mode().IsRegular() { - return 0, fmt.Errorf("%s is not a regular file", src) + return 0, errors.Wrap(errors.Errorf("%s is not a regular file", src), "sourceFile IsRegular error") } source, err := os.Open(src) if err != nil { - return 0, err + return 0, errors.Wrap(err, "os.open error") } - defer source.Close() + defer func(source *os.File) { + err := source.Close() + if err != nil { + log.LogrusObj.Errorln(err) + } + }(source) destination, err := os.Create(dst) if err != nil { - return 0, err + return 0, errors.Wrap(err, "os.Create error") } - defer destination.Close() + defer func(destination *os.File) { + err := destination.Close() + if err != nil { + log.LogrusObj.Errorln(err) + } + }(destination) nBytes, err := io.Copy(destination, source) - return nBytes, err + return nBytes, errors.Wrap(err, "os.Copy error") } func ArrayUnique(arr []string) []string { diff --git a/pkg/wrappers/favorites_wrapper.go b/pkg/wrappers/favorites_wrapper.go index 2d860c8..1a35be5 100644 --- a/pkg/wrappers/favorites_wrapper.go +++ b/pkg/wrappers/favorites_wrapper.go @@ -2,6 +2,7 @@ package wrappers import ( "context" + "github.com/pkg/errors" "github.com/afex/hystrix-go/hystrix" "github.com/micro/go-micro/v2/client" @@ -21,11 +22,12 @@ func (wrapper *favoriteWrapper) Call(ctx context.Context, req client.Request, re SleepWindow: 5000, // 过多长时间,熔断器再次检测是否开启,单位毫秒ms(默认5秒) } hystrix.ConfigureCommand(cmdName, config) - return hystrix.Do(cmdName, func() error { + err := hystrix.Do(cmdName, func() error { return wrapper.Client.Call(ctx, req, resp) }, func(err error) error { return err }) + return errors.Wrap(err, "call error") } func NewFavoriteWrapper(c client.Client) client.Client { diff --git a/pkg/wrappers/user_wrapper.go b/pkg/wrappers/user_wrapper.go index f2893a6..205605c 100644 --- a/pkg/wrappers/user_wrapper.go +++ b/pkg/wrappers/user_wrapper.go @@ -2,6 +2,7 @@ package wrappers import ( "context" + "github.com/pkg/errors" "github.com/afex/hystrix-go/hystrix" "github.com/micro/go-micro/v2/client" @@ -20,11 +21,12 @@ func (wrapper *userWrapper) Call(ctx context.Context, req client.Request, resp i SleepWindow: 5000, // 过多长时间,熔断器再次检测是否开启,单位毫秒ms(默认5秒) } hystrix.ConfigureCommand(cmdName, config) - return hystrix.Do(cmdName, func() error { - return wrapper.Client.Call(ctx, req, resp) + err := hystrix.Do(cmdName, func() error { + return errors.Wrap(wrapper.Client.Call(ctx, req, resp), "failed to call") }, func(err error) error { return err }) + return errors.WithMessage(err, "call error") } // NewUserWrapper 初始化Wrapper diff --git a/repository/mysql/db/db_init.go b/repository/mysql/db/db_init.go index 797d720..0c8e532 100644 --- a/repository/mysql/db/db_init.go +++ b/repository/mysql/db/db_init.go @@ -2,7 +2,6 @@ package db import ( "context" - "fmt" "strings" "time" @@ -29,8 +28,7 @@ func InitDB() { dsn := strings.Join([]string{username, ":", password, "@tcp(", host, ":", port, ")/", database, "?charset=" + charset + "&parseTime=true"}, "") err := Database(dsn) if err != nil { - fmt.Println(err) - log.LogrusObj.Error(err) + log.LogrusObj.Errorln(err) } } @@ -55,6 +53,7 @@ func Database(connString string) error { }, }) if err != nil { + log.LogrusObj.Errorln(err) panic(err) } sqlDB, _ := db.DB() diff --git a/repository/mysql/model/user.go b/repository/mysql/model/user.go index 133dbaf..043775d 100644 --- a/repository/mysql/model/user.go +++ b/repository/mysql/model/user.go @@ -1,6 +1,7 @@ package model import ( + "github.com/pkg/errors" "golang.org/x/crypto/bcrypt" "github.com/CocaineCong/tangseng/consts" @@ -17,7 +18,7 @@ type User struct { func (user *User) SetPassword(password string) error { bytes, err := bcrypt.GenerateFromPassword([]byte(password), consts.PassWordCost) if err != nil { - return err + return errors.Wrap(err, "failed to generate password") } user.PasswordDigest = string(bytes) return nil diff --git a/repository/redis/inverted_index.go b/repository/redis/inverted_index.go index e866c31..38f3b46 100644 --- a/repository/redis/inverted_index.go +++ b/repository/redis/inverted_index.go @@ -2,6 +2,7 @@ package redis import ( "context" + "github.com/pkg/errors" "github.com/RoaringBitmap/roaring" "github.com/redis/go-redis/v9" @@ -12,7 +13,7 @@ func PushInvertedPath(ctx context.Context, key string, paths []string) (err erro for _, v := range paths { err = RedisClient.LPush(ctx, key, v).Err() if err != nil { - return err + return errors.Wrap(err, "failed to push inverted path in redis") } } @@ -23,7 +24,7 @@ func PushInvertedPath(ctx context.Context, key string, paths []string) (err erro func SetInvertedPath(ctx context.Context, key string, path string) (err error) { err = RedisClient.Set(ctx, key, path, redis.KeepTTL).Err() if err != nil { - return err + return errors.Wrap(err, "failed to set inverted path in redis") } return @@ -33,7 +34,7 @@ func SetInvertedPath(ctx context.Context, key string, path string) (err error) { func GetInvertedPath(ctx context.Context, key string) (path string, err error) { path, err = RedisClient.Get(ctx, key).Result() if err != nil { - return + return path, errors.Wrap(err, "failed to get inverted path") } return @@ -46,7 +47,7 @@ func ListInvertedPath(ctx context.Context, keys []string) (paths []string, err e case InvertedIndexDbPathDayKey, TireTreeDbPathDayKey: results := RedisClient.LRange(ctx, key, 0, -1) if err != nil { - return + return paths, errors.Wrap(err, "failed to get value") } paths = append(paths, results.Val()...) case InvertedIndexDbPathMonthKey, InvertedIndexDbPathSeasonKey, @@ -56,7 +57,7 @@ func ListInvertedPath(ctx context.Context, keys []string) (paths []string, err e results, errx := ListInvertedIndexByPrefixKey(ctx, prefixKey) if errx != nil { err = errx - return + return paths, errors.Wrap(errx, "failed to list inverted index") } paths = append(paths, results...) default: @@ -69,7 +70,11 @@ func ListInvertedPath(ctx context.Context, keys []string) (paths []string, err e // DeleteInvertedIndexPath 删除 inverted index path func DeleteInvertedIndexPath(ctx context.Context, key string) (err error) { - return RedisClient.Del(ctx, key).Err() + err = RedisClient.Del(ctx, key).Err() + if err != nil { + return errors.Wrap(err, "failed to delete inverted index path") + } + return } // BatchDeleteInvertedIndexPath 批量删除 inverted index path @@ -77,26 +82,32 @@ func BatchDeleteInvertedIndexPath(ctx context.Context, keys []string) (err error for _, key := range keys { _ = DeleteInvertedIndexPath(ctx, key) } - + if err != nil { + return errors.Wrap(err, "failed to batch delete inverted index path") + } return } // SetInvertedIndexTokenDocIds 缓存搜索过的结果 // TODO:后面嵌入LRU func SetInvertedIndexTokenDocIds(ctx context.Context, token string, docIds *roaring.Bitmap) (err error) { docIdsByte, _ := docIds.MarshalBinary() - return RedisClient.Set(ctx, getQueryTokenDocIdsKey(token), docIdsByte, QueryTokenDocIdsDefaultTimeout).Err() + err = RedisClient.Set(ctx, getQueryTokenDocIdsKey(token), docIdsByte, QueryTokenDocIdsDefaultTimeout).Err() + if err != nil { + return errors.Wrap(err, "failed to set inverted index token docIds") + } + return } // GetInvertedIndexTokenDocIds 获取缓存的结果 func GetInvertedIndexTokenDocIds(ctx context.Context, token string) (docIds *roaring.Bitmap, err error) { res, err := RedisClient.Get(ctx, getQueryTokenDocIdsKey(token)).Result() if err != nil { - return + return docIds, errors.Wrap(err, "failed to get query token docIds key from Redis") } docIds = roaring.NewBitmap() err = docIds.UnmarshalBinary([]byte(res)) if err != nil { - return + return docIds, errors.Wrap(err, "failed to unmarshal binary") } return @@ -104,14 +115,18 @@ func GetInvertedIndexTokenDocIds(ctx context.Context, token string) (docIds *roa // PushInvertedIndexToken 存储用户搜索的历史记录 docs ids // TODO:后面嵌入LRU func PushInvertedIndexToken(ctx context.Context, userId int64, token string) (err error) { - return RedisClient.LPush(ctx, getUserQueryTokenKey(userId), token).Err() + err = RedisClient.LPush(ctx, getUserQueryTokenKey(userId), token).Err() + if err != nil { + return errors.Wrap(err, "failed to push inverted index token") + } + return } // ListInvertedIndexToken 获取用户搜索的历史记录 func ListInvertedIndexToken(ctx context.Context, userId int64) (tokens []string, err error) { tokens, err = RedisClient.LRange(ctx, getUserQueryTokenKey(userId), 0, -1).Result() if err != nil { - return + return tokens, errors.Wrap(err, "failed to list inverted index token") } return @@ -122,7 +137,7 @@ func PushInvertedMonthPath(ctx context.Context, key string, paths []string) (err for _, v := range paths { err = RedisClient.LPush(ctx, key, v).Err() if err != nil { - return err + return errors.Wrap(err, "failed to push inverted month path") } } @@ -138,7 +153,9 @@ func ListInvertedIndexByPrefixKey(ctx context.Context, prefixKey string) (paths value, _ := RedisClient.Get(ctx, key).Result() paths = append(paths, value) } - + if err != nil { + err = errors.Wrap(err, "failed to get value") + } return } @@ -150,6 +167,8 @@ func ListAllPrefixKey(ctx context.Context, prefixKey string) (paths []string, er key := iter.Val() paths = append(paths, key) } - + if err != nil { + err = errors.Wrap(err, "failed to get value") + } return } diff --git a/repository/vector/faiss/model.go b/repository/vector/faiss/model.go index 085066d..02f9939 100644 --- a/repository/vector/faiss/model.go +++ b/repository/vector/faiss/model.go @@ -2,6 +2,7 @@ package faiss import ( "context" + "github.com/pkg/errors" "time" "github.com/CocaineCong/tangseng/config" @@ -22,13 +23,17 @@ func (m *FaissModel) Init(ctx context.Context) (err error) { vConfig := config.Conf.Vector client, err := NewVectorClient(ctx, vConfig.ServerAddress, time.Millisecond*time.Duration(vConfig.Timeout)) if err != nil { - return + return errors.Wrap(err, "failed to create new vector client") } m.client = client return } -func (m *FaissModel) Run(data interface{}) (interface{}, error) { - return m.client.Search(data) +func (m *FaissModel) Run(data interface{}) (resp interface{}, err error) { + resp, err = m.client.Search(data) + if err != nil { + err = errors.WithMessage(err, "search error") + } + return } diff --git a/repository/vector/faiss/vector.go b/repository/vector/faiss/vector.go index 5d72bb2..dc3e25d 100644 --- a/repository/vector/faiss/vector.go +++ b/repository/vector/faiss/vector.go @@ -2,13 +2,13 @@ package faiss import ( "context" + "github.com/pkg/errors" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "github.com/CocaineCong/tangseng/idl/pb/vector_retrieval" - logs "github.com/CocaineCong/tangseng/pkg/logger" ) type VectorClient struct { @@ -21,7 +21,7 @@ type VectorClient struct { func NewVectorClient(ctx context.Context, address string, timeout time.Duration) (client *VectorClient, err error) { conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - return + return client, errors.Wrap(err, "failed to connect with grpc") } client = &VectorClient{ @@ -37,14 +37,13 @@ func NewVectorClient(ctx context.Context, address string, timeout time.Duration) func (c *VectorClient) Search(req interface{}) (resp *pb.VectorResp, err error) { request, ok := req.(*pb.VectorReq) if !ok { - return + return resp, errors.Wrap(errors.New("unexpected request type"), "failed to assert req as pb.VectorReq") } ctx, cancl := context.WithTimeout(c.ctx, c.Timeout) defer cancl() resp, err = c.VectorClient.Search(ctx, request) if err != nil { - logs.LogrusObj.Errorln("VectorClient-Search ", err) - return + err = errors.Wrap(err, "failed to VectorClient-search") } return diff --git a/repository/vector/milvus/model.go b/repository/vector/milvus/model.go index 9fca94c..50f8f87 100644 --- a/repository/vector/milvus/model.go +++ b/repository/vector/milvus/model.go @@ -3,12 +3,12 @@ package milvus import ( "context" "fmt" + "github.com/pkg/errors" "time" "github.com/milvus-io/milvus-sdk-go/v2/client" "github.com/CocaineCong/tangseng/config" - logs "github.com/CocaineCong/tangseng/pkg/logger" ) type MilvusModel struct { @@ -27,8 +27,7 @@ func (m *MilvusModel) Init() (err error) { defer cancel() milvusClient, err := client.NewGrpcClient(ctx, fmt.Sprintf("%s:%s", mConfig.Host, mConfig.Port)) if err != nil { - logs.LogrusObj.Errorln(err) - return + return errors.Wrap(err, "failed to create new grpc client") } m.client = milvusClient @@ -38,6 +37,7 @@ func (m *MilvusModel) Init() (err error) { func (m *MilvusModel) Search(req interface{}) (resp interface{}, err error) { request, ok := req.(*MilvusRequest) if !ok { + err = errors.Wrap(errors.New("unexpected request type"), "failed to assert req as MilvusRequest") return }