Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion backend/domain/knowledge/entity/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ type WhereSliceOpt struct {
DocumentID int64
DocumentIDs []int64
Keyword *string
Sequence int64
PageSize int64
Offset int64
NotEmpty *bool
}

// WherePhotoSliceOpt holds the filter and paging options accepted by
// KnowledgeDocumentSliceDAO.ListPhotoSlice. Every field is optional: a zero or
// nil value leaves the corresponding condition out of the query.
type WherePhotoSliceOpt struct {
// KnowledgeID limits the result to slices of this knowledge base; 0 disables
// the filter.
KnowledgeID int64
// DocumentIDs limits the result to slices belonging to any of these
// documents; an empty list disables the filter.
DocumentIDs []int64
// Limit is the maximum number of rows to return; nil or 0 means no limit is
// applied.
Limit *int
// Offset is the number of rows to skip; nil or 0 means no offset is applied.
Offset *int
// HasCaption selects slices by whether their content is set: true keeps
// slices with non-empty content, false keeps slices with empty content, and
// nil applies no content filter.
HasCaption *bool
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,11 @@ func (dao *KnowledgeDocumentSliceDAO) FindSliceByCondition(ctx context.Context,

if opts.PageSize != 0 {
do = do.Limit(int(opts.PageSize))
do = do.Offset(int(opts.Sequence)).Order(s.Sequence.Asc())
}
if opts.Offset != 0 {
do = do.Offset(int(opts.Offset))
}
do = do.Order(s.Sequence.Asc())
if opts.NotEmpty != nil {
if ptr.From(opts.NotEmpty) {
do = do.Where(s.Content.Neq(""))
Expand Down Expand Up @@ -319,3 +322,44 @@ func (dao *KnowledgeDocumentSliceDAO) GetLastSequence(ctx context.Context, docum
}
return resp.Sequence, nil
}

// ListPhotoSlice queries document slices matching opts and returns the
// requested page together with the total number of matching rows.
//
// Filters are applied only when set: KnowledgeID when non-zero, DocumentIDs
// when non-empty, and HasCaption when non-nil (true selects slices whose
// content is non-empty, false selects slices whose content is empty). Limit and
// Offset are applied when they are non-nil and non-zero. Rows are ordered by
// UpdatedAt, newest first.
//
// opts must not be nil.
func (dao *KnowledgeDocumentSliceDAO) ListPhotoSlice(ctx context.Context, opts *entity.WherePhotoSliceOpt) ([]*model.KnowledgeDocumentSlice, int64, error) {
	tbl := dao.Query.KnowledgeDocumentSlice
	query := tbl.WithContext(ctx)

	if opts.KnowledgeID != 0 {
		query = query.Where(tbl.KnowledgeID.Eq(opts.KnowledgeID))
	}
	if len(opts.DocumentIDs) > 0 {
		query = query.Where(tbl.DocumentID.In(opts.DocumentIDs...))
	}

	// ptr.From is relied on to yield 0 for a nil pointer, so an unset value and
	// an explicit zero both leave paging untouched.
	if limit := ptr.From(opts.Limit); limit != 0 {
		query = query.Limit(limit)
	}
	if offset := ptr.From(opts.Offset); offset != 0 {
		query = query.Offset(offset)
	}

	if opts.HasCaption != nil {
		contentCond := tbl.Content.Eq("")
		if ptr.From(opts.HasCaption) {
			contentCond = tbl.Content.Neq("")
		}
		query = query.Where(contentCond)
	}

	query = query.Order(tbl.UpdatedAt.Desc())

	rows, err := query.Find()
	if err != nil {
		return nil, 0, err
	}

	// The -1 arguments are intended to cancel the paging set above so that the
	// count covers every matching row, not just the returned page.
	total, err := query.Limit(-1).Offset(-1).Count()
	if err != nil {
		return nil, 0, err
	}

	return rows, total, nil
}

// BatchCreateWithTX inserts slices through the caller-supplied transaction tx,
// writing them in batches of 100 rows. It returns nil without touching the
// database when slices is empty. Committing or rolling back tx remains the
// caller's responsibility.
func (dao *KnowledgeDocumentSliceDAO) BatchCreateWithTX(ctx context.Context, tx *gorm.DB, slices []*model.KnowledgeDocumentSlice) error {
	if len(slices) == 0 {
		return nil
	}

	// Rows written per INSERT statement.
	const batchSize = 100

	// No Debug() here: it would force every generated INSERT (including slice
	// content) into the log regardless of the configured logger level.
	return tx.WithContext(ctx).
		Model(&model.KnowledgeDocumentSlice{}).
		CreateInBatches(slices, batchSize).
		Error
}
39 changes: 31 additions & 8 deletions backend/domain/knowledge/processor/impl/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ type baseDocProcessor struct {
documentSource *entity.DocumentSource

// Drop DB model
TableName string
docModels []*model.KnowledgeDocument
TableName string
docModels []*model.KnowledgeDocument
imageSlices []*model.KnowledgeDocumentSlice

storage storage.Storage
knowledgeRepo repository.KnowledgeRepo
Expand All @@ -69,14 +70,14 @@ func (p *baseDocProcessor) BeforeCreate() error {

func (p *baseDocProcessor) BuildDBModel() error {
p.docModels = make([]*model.KnowledgeDocument, 0, len(p.Documents))
ids, err := p.idgen.GenMultiIDs(p.ctx, len(p.Documents))
if err != nil {
logs.CtxErrorf(p.ctx, "gen ids failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeIDGenCode)
}
for i := range p.Documents {
id, err := p.idgen.GenID(p.ctx)
if err != nil {
logs.CtxErrorf(p.ctx, "gen id failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeIDGenCode)
}
docModel := &model.KnowledgeDocument{
ID: ids[i],
ID: id,
KnowledgeID: p.Documents[i].KnowledgeID,
Name: p.Documents[i].Name,
FileExtension: string(p.Documents[i].FileExtension),
Expand All @@ -95,6 +96,23 @@ func (p *baseDocProcessor) BuildDBModel() error {
}
p.Documents[i].ID = docModel.ID
p.docModels = append(p.docModels, docModel)
if p.Documents[i].Type == knowledge.DocumentTypeImage {
id, err := p.idgen.GenID(p.ctx)
if err != nil {
logs.CtxErrorf(p.ctx, "gen id failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeIDGenCode)
}
p.imageSlices = append(p.imageSlices, &model.KnowledgeDocumentSlice{
ID: id,
KnowledgeID: p.Documents[i].KnowledgeID,
DocumentID: p.Documents[i].ID,
CreatedAt: time.Now().UnixMilli(),
UpdatedAt: time.Now().UnixMilli(),
CreatorID: p.UserID,
SpaceID: p.SpaceID,
Status: int32(knowledge.SliceStatusInit),
})
}
}

return nil
Expand Down Expand Up @@ -142,6 +160,11 @@ func (p *baseDocProcessor) InsertDBModel() (err error) {
logs.CtxErrorf(ctx, "create document failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
}
err = p.sliceRepo.BatchCreateWithTX(ctx, tx, p.imageSlices)
if err != nil {
logs.CtxErrorf(ctx, "update knowledge failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
}
err = p.knowledgeRepo.UpdateWithTx(ctx, tx, p.Documents[0].KnowledgeID, map[string]interface{}{
"updated_at": time.Now().UnixMilli(),
})
Expand Down
4 changes: 2 additions & 2 deletions backend/domain/knowledge/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ type KnowledgeDocumentSliceRepo interface {
Create(ctx context.Context, slice *model.KnowledgeDocumentSlice) error
Update(ctx context.Context, slice *model.KnowledgeDocumentSlice) error
Delete(ctx context.Context, slice *model.KnowledgeDocumentSlice) error

BatchCreateWithTX(ctx context.Context, tx *gorm.DB, slices []*model.KnowledgeDocumentSlice) error
BatchCreate(ctx context.Context, slices []*model.KnowledgeDocumentSlice) error
BatchSetStatus(ctx context.Context, ids []int64, status int32, reason string) error
DeleteByDocument(ctx context.Context, documentID int64) error
MGetSlices(ctx context.Context, sliceIDs []int64) ([]*model.KnowledgeDocumentSlice, error)

ListPhotoSlice(ctx context.Context, opts *entity.WherePhotoSliceOpt) ([]*model.KnowledgeDocumentSlice, int64, error)
FindSliceByCondition(ctx context.Context, opts *entity.WhereSliceOpt) (
[]*model.KnowledgeDocumentSlice, int64, error)
GetDocumentSliceIDs(ctx context.Context, docIDs []int64) (sliceIDs []int64, err error)
Expand Down
118 changes: 77 additions & 41 deletions backend/domain/knowledge/service/event_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,24 +190,26 @@ func (k *knowledgeSVC) indexDocument(ctx context.Context, event *entity.Event) (
collectionName := getCollectionName(doc.KnowledgeID)

if !doc.IsAppend {
ids, err := k.sliceRepo.GetDocumentSliceIDs(ctx, []int64{doc.ID})
if err != nil {
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("get document slice ids failed, err: %v", err)))
}
if len(ids) > 0 {
if err = k.sliceRepo.DeleteByDocument(ctx, doc.ID); err != nil {
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("delete document slice failed, err: %v", err)))
if doc.Type != knowledge.DocumentTypeImage {
ids, err := k.sliceRepo.GetDocumentSliceIDs(ctx, []int64{doc.ID})
if err != nil {
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("get document slice ids failed, err: %v", err)))
}
for _, manager := range k.searchStoreManagers {
s, err := manager.GetSearchStore(ctx, collectionName)
if err != nil {
return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("get search store failed, err: %v", err)))
if len(ids) > 0 {
if err = k.sliceRepo.DeleteByDocument(ctx, doc.ID); err != nil {
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("delete document slice failed, err: %v", err)))
}
if err := s.Delete(ctx, slices.Transform(event.SliceIDs, func(id int64) string {
return strconv.FormatInt(id, 10)
})); err != nil {
logs.Errorf("[indexDocument] delete knowledge failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("delete search store failed, err: %v", err)))
for _, manager := range k.searchStoreManagers {
s, err := manager.GetSearchStore(ctx, collectionName)
if err != nil {
return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("get search store failed, err: %v", err)))
}
if err := s.Delete(ctx, slices.Transform(event.SliceIDs, func(id int64) string {
return strconv.FormatInt(id, 10)
})); err != nil {
logs.Errorf("[indexDocument] delete knowledge failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("delete search store failed, err: %v", err)))
}
}
}
}
Expand Down Expand Up @@ -298,34 +300,68 @@ func (k *knowledgeSVC) indexDocument(ctx context.Context, event *entity.Event) (
seqOffset += 1
}

sliceModels := make([]*model.KnowledgeDocumentSlice, 0, len(parseResult))
for i, src := range parseResult {
now := time.Now().UnixMilli()
sliceModel := &model.KnowledgeDocumentSlice{
ID: allIDs[i],
KnowledgeID: doc.KnowledgeID,
DocumentID: doc.ID,
Content: parseResult[i].Content,
Sequence: seqOffset + float64(i),
CreatedAt: now,
UpdatedAt: now,
CreatorID: doc.CreatorID,
SpaceID: doc.SpaceID,
Status: int32(model.SliceStatusProcessing),
FailReason: "",
}
if doc.Type == knowledge.DocumentTypeTable {
sliceEntity, err := convertFn(src, doc.KnowledgeID, doc.ID, doc.CreatorID)
if doc.Type == knowledge.DocumentTypeImage {
if len(parseResult) != 0 {
slices, _, err := k.sliceRepo.FindSliceByCondition(ctx, &entity.WhereSliceOpt{DocumentID: doc.ID})
if err != nil {
logs.CtxErrorf(ctx, "[indexDocument] convert document failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", fmt.Sprintf("convert document failed, err: %v", err)))
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("find slice failed, err: %v", err)))
}
var slice *model.KnowledgeDocumentSlice
if len(slices) > 0 {
slice = slices[0]
slice.Content = parseResult[0].Content
} else {
id, err := k.idgen.GenID(ctx)
if err != nil {
return errorx.New(errno.ErrKnowledgeIDGenCode, errorx.KV("msg", fmt.Sprintf("GenID failed, err: %v", err)))
}
slice = &model.KnowledgeDocumentSlice{
ID: id,
KnowledgeID: doc.KnowledgeID,
DocumentID: doc.ID,
Content: parseResult[0].Content,
CreatedAt: time.Now().UnixMilli(),
UpdatedAt: time.Now().UnixMilli(),
CreatorID: doc.CreatorID,
SpaceID: doc.SpaceID,
Status: int32(model.SliceStatusProcessing),
FailReason: "",
}
}
if err = k.sliceRepo.Update(ctx, slice); err != nil {
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("update slice failed, err: %v", err)))
}
sliceModel.Content = sliceEntity.GetSliceContent()
}
sliceModels = append(sliceModels, sliceModel)
}
if err = k.sliceRepo.BatchCreate(ctx, sliceModels); err != nil {
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("batch create slice failed, err: %v", err)))
} else {
sliceModels := make([]*model.KnowledgeDocumentSlice, 0, len(parseResult))
for i, src := range parseResult {
now := time.Now().UnixMilli()
sliceModel := &model.KnowledgeDocumentSlice{
ID: allIDs[i],
KnowledgeID: doc.KnowledgeID,
DocumentID: doc.ID,
Content: parseResult[i].Content,
Sequence: seqOffset + float64(i),
CreatedAt: now,
UpdatedAt: now,
CreatorID: doc.CreatorID,
SpaceID: doc.SpaceID,
Status: int32(model.SliceStatusProcessing),
FailReason: "",
}
if doc.Type == knowledge.DocumentTypeTable {
sliceEntity, err := convertFn(src, doc.KnowledgeID, doc.ID, doc.CreatorID)
if err != nil {
logs.CtxErrorf(ctx, "[indexDocument] convert document failed, err: %v", err)
return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", fmt.Sprintf("convert document failed, err: %v", err)))
}
sliceModel.Content = sliceEntity.GetSliceContent()
}
sliceModels = append(sliceModels, sliceModel)
}
if err = k.sliceRepo.BatchCreate(ctx, sliceModels); err != nil {
return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("batch create slice failed, err: %v", err)))
}
}

defer func() {
Expand Down
11 changes: 5 additions & 6 deletions backend/domain/knowledge/service/knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,9 +876,8 @@ func (k *knowledgeSVC) ListSlice(ctx context.Context, request *ListSliceRequest)
KnowledgeID: ptr.From(request.KnowledgeID),
DocumentID: ptr.From(request.DocumentID),
Keyword: request.Keyword,
Sequence: request.Sequence,
Offset: request.Sequence,
PageSize: request.Limit,
Offset: request.Offset,
})
if err != nil {
logs.CtxErrorf(ctx, "list slice failed, err: %v", err)
Expand Down Expand Up @@ -1375,12 +1374,12 @@ func (k *knowledgeSVC) ListPhotoSlice(ctx context.Context, request *ListPhotoSli
if request == nil {
return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "request is empty"))
}
sliceArr, total, err := k.sliceRepo.FindSliceByCondition(ctx, &entity.WhereSliceOpt{
sliceArr, total, err := k.sliceRepo.ListPhotoSlice(ctx, &entity.WherePhotoSliceOpt{
KnowledgeID: request.KnowledgeID,
DocumentIDs: request.DocumentIDs,
Offset: int64(ptr.From(request.Offset)),
PageSize: int64(ptr.From(request.Limit)),
NotEmpty: request.HasCaption,
Offset: request.Offset,
Limit: request.Limit,
HasCaption: request.HasCaption,
})
if err != nil {
return nil, errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
Expand Down
Loading