Skip to content

Commit 1fc701c

Browse files
kaijchenN3kox
andauthored
feat(es9): add Elasticsearch 9 indexer and retriever components (#640)
* feat(es9): add Elasticsearch 9 indexer and retriever components * add more tests * update readme --------- Co-authored-by: N3ko <xuzhaonan@bytedance.com>
1 parent b9080db commit 1fc701c

31 files changed

+15538
-0
lines changed

components/indexer/es9/README.md

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# ES9 Indexer
2+
3+
English | [中文](README_zh.md)
4+
5+
An Elasticsearch 9.x indexer implementation for [Eino](https://github.com/cloudwego/eino) that implements the `Indexer` interface. This enables seamless integration with Eino's vector storage and retrieval system for enhanced semantic search capabilities.
6+
7+
## Features
8+
9+
- Implements `github.com/cloudwego/eino/components/indexer.Indexer`
10+
- Easy integration with Eino's indexer system
11+
- Configurable Elasticsearch parameters
12+
- Support for vector similarity search
13+
- Bulk indexing operations
14+
- Custom field mapping support
15+
- Flexible document vectorization
16+
17+
## Installation
18+
19+
```bash
20+
go get github.com/cloudwego/eino-ext/components/indexer/es9@latest
21+
```
22+
23+
## Quick Start
24+
25+
Here's a quick example of how to use the indexer. For more details, see components/indexer/es9/examples/indexer/add_documents.go:
26+
27+
```go
28+
import (
29+
"context"
30+
"fmt"
31+
"log"
32+
"os"
33+
34+
"github.com/cloudwego/eino/components/embedding"
35+
"github.com/cloudwego/eino/schema"
36+
"github.com/elastic/go-elasticsearch/v9"
37+
38+
"github.com/cloudwego/eino-ext/components/embedding/ark"
39+
"github.com/cloudwego/eino-ext/components/indexer/es9"
40+
)
41+
42+
const (
43+
indexName = "eino_example"
44+
fieldContent = "content"
45+
fieldContentVector = "content_vector"
46+
fieldExtraLocation = "location"
47+
docExtraLocation = "location"
48+
)
49+
50+
func main() {
51+
ctx := context.Background()
52+
// ES supports multiple ways to connect
53+
username := os.Getenv("ES_USERNAME")
54+
password := os.Getenv("ES_PASSWORD")
55+
56+
// 1. Create ES client
57+
httpCACertPath := os.Getenv("ES_HTTP_CA_CERT_PATH")
58+
var cert []byte
59+
var err error
60+
if httpCACertPath != "" {
61+
cert, err = os.ReadFile(httpCACertPath)
62+
if err != nil {
63+
log.Fatalf("read file failed, err=%v", err)
64+
}
65+
}
66+
67+
client, _ := elasticsearch.NewClient(elasticsearch.Config{
68+
Addresses: []string{"https://localhost:9200"},
69+
Username: username,
70+
Password: password,
71+
CACert: cert,
72+
})
73+
74+
// 2. Create embedding component using ARK
75+
// Replace "ARK_API_KEY", "ARK_REGION", "ARK_MODEL" with your actual config
76+
emb, _ := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
77+
APIKey: os.Getenv("ARK_API_KEY"),
78+
Region: os.Getenv("ARK_REGION"),
79+
Model: os.Getenv("ARK_MODEL"),
80+
})
81+
82+
// 3. Prepare documents
83+
// Documents usually contain at least an ID and Content.
84+
// You can also add extra metadata for filtering or other purposes.
85+
docs := []*schema.Document{
86+
{
87+
ID: "1",
88+
Content: "Eiffel Tower: Located in Paris, France.",
89+
MetaData: map[string]any{
90+
docExtraLocation: "France",
91+
},
92+
},
93+
{
94+
ID: "2",
95+
Content: "The Great Wall: Located in China.",
96+
MetaData: map[string]any{
97+
docExtraLocation: "China",
98+
},
99+
},
100+
}
101+
102+
// 4. Create ES indexer component
103+
indexer, _ := es9.NewIndexer(ctx, &es9.IndexerConfig{
104+
Client: client,
105+
Index: indexName,
106+
BatchSize: 10,
107+
// DocumentToFields specifies how to map document fields to ES fields
108+
DocumentToFields: func(ctx context.Context, doc *schema.Document) (field2Value map[string]es9.FieldValue, err error) {
109+
return map[string]es9.FieldValue{
110+
fieldContent: {
111+
Value: doc.Content,
112+
EmbedKey: fieldContentVector, // vectorize content and save to "content_vector"
113+
},
114+
fieldExtraLocation: {
115+
// Extra metadata field
116+
Value: doc.MetaData[docExtraLocation],
117+
},
118+
}, nil
119+
},
120+
// Provide the embedding component to use for vectorization
121+
Embedding: emb,
122+
})
123+
124+
// 5. Index documents
125+
ids, err := indexer.Store(ctx, docs)
126+
if err != nil {
127+
fmt.Printf("index error: %v\n", err)
128+
return
129+
}
130+
fmt.Println("indexed ids:", ids)
131+
}
132+
```
133+
134+
## Configuration
135+
136+
The indexer can be configured using the `IndexerConfig` struct:
137+
138+
```go
139+
type IndexerConfig struct {
140+
Client *elasticsearch.Client // Required: Elasticsearch client instance
141+
Index string // Required: Index name to store documents
142+
BatchSize int // Optional: Max number of texts per embedding batch (default: 5)
143+
144+
// Required: Function to map Document fields to Elasticsearch fields
145+
DocumentToFields func(ctx context.Context, doc *schema.Document) (map[string]FieldValue, error)
146+
147+
// Optional: only required when vectorization is needed
148+
Embedding embedding.Embedder
149+
}
150+
151+
// FieldValue defines how a field should be stored and vectorized
152+
type FieldValue struct {
153+
Value any // Original value to store
154+
EmbedKey string // If set, Value will be vectorized and saved
155+
Stringify func(val any) (string, error) // Optional: custom string conversion
156+
}
157+
```
158+
159+
## Full Examples
160+
161+
- [Indexer Example](./examples/indexer)
162+
- [Indexer with Sparse Vector Example](./examples/indexer_with_sparse_vector)
163+
164+
## For More Details
165+
166+
- [Eino Documentation](https://www.cloudwego.io/docs/eino/)
167+
- [Elasticsearch Go Client Documentation](https://github.com/elastic/go-elasticsearch)
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# ES9 Indexer
2+
3+
[English](README.md)
4+
5+
为 [Eino](https://github.com/cloudwego/eino) 实现的 Elasticsearch 9.x 索引器,实现了 `Indexer` 接口。这使得可以与 Eino 的向量存储和检索系统无缝集成,从而增强语义搜索能力。
6+
7+
## 功能特性
8+
9+
- 实现 `github.com/cloudwego/eino/components/indexer.Indexer`
10+
- 易于集成 Eino 的索引系统
11+
- 可配置 Elasticsearch 参数
12+
- 支持向量相似度搜索
13+
- 批量索引操作
14+
- 自定义字段映射支持
15+
- 灵活的文档向量化
16+
17+
## 安装
18+
19+
```bash
20+
go get github.com/cloudwego/eino-ext/components/indexer/es9@latest
21+
```
22+
23+
## 快速开始
24+
25+
这里是使用索引器的快速示例,更多细节请阅读 components/indexer/es9/examples/indexer/add_documents.go:
26+
27+
```go
28+
import (
29+
"context"
30+
"fmt"
31+
"log"
32+
"os"
33+
34+
"github.com/cloudwego/eino/components/embedding"
35+
"github.com/cloudwego/eino/schema"
36+
"github.com/elastic/go-elasticsearch/v9"
37+
38+
"github.com/cloudwego/eino-ext/components/embedding/ark"
39+
"github.com/cloudwego/eino-ext/components/indexer/es9"
40+
)
41+
42+
const (
43+
indexName = "eino_example"
44+
fieldContent = "content"
45+
fieldContentVector = "content_vector"
46+
fieldExtraLocation = "location"
47+
docExtraLocation = "location"
48+
)
49+
50+
func main() {
51+
ctx := context.Background()
52+
53+
// ES 支持多种连接方式
54+
username := os.Getenv("ES_USERNAME")
55+
password := os.Getenv("ES_PASSWORD")
56+
httpCACertPath := os.Getenv("ES_HTTP_CA_CERT_PATH")
57+
58+
var cert []byte
59+
var err error
60+
if httpCACertPath != "" {
61+
cert, err = os.ReadFile(httpCACertPath)
62+
if err != nil {
63+
log.Fatalf("read file failed, err=%v", err)
64+
}
65+
}
66+
67+
client, _ := elasticsearch.NewClient(elasticsearch.Config{
68+
Addresses: []string{"https://localhost:9200"},
69+
Username: username,
70+
Password: password,
71+
CACert: cert,
72+
})
73+
74+
// 2. 创建 embedding 组件 (使用 Ark)
75+
// 请将 "ARK_API_KEY", "ARK_REGION", "ARK_MODEL" 替换为实际配置
76+
emb, _ := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
77+
APIKey: os.Getenv("ARK_API_KEY"),
78+
Region: os.Getenv("ARK_REGION"),
79+
Model: os.Getenv("ARK_MODEL"),
80+
})
81+
82+
// 3. 准备文档
83+
// 文档通常包含 ID 和 Content
84+
// 也可以包含额外的 Metadata 用于过滤或其他用途
85+
docs := []*schema.Document{
86+
{
87+
ID: "1",
88+
Content: "Eiffel Tower: Located in Paris, France.",
89+
MetaData: map[string]any{
90+
docExtraLocation: "France",
91+
},
92+
},
93+
{
94+
ID: "2",
95+
Content: "The Great Wall: Located in China.",
96+
MetaData: map[string]any{
97+
docExtraLocation: "China",
98+
},
99+
},
100+
}
101+
102+
// 4. 创建 ES 索引器组件
103+
indexer, _ := es9.NewIndexer(ctx, &es9.IndexerConfig{
104+
Client: client,
105+
Index: indexName,
106+
BatchSize: 10,
107+
// DocumentToFields 指定如何将文档字段映射到 ES 字段
108+
DocumentToFields: func(ctx context.Context, doc *schema.Document) (field2Value map[string]es9.FieldValue, err error) {
109+
return map[string]es9.FieldValue{
110+
fieldContent: {
111+
Value: doc.Content,
112+
EmbedKey: fieldContentVector, // 对文档内容进行向量化并保存到 "content_vector" 字段
113+
},
114+
fieldExtraLocation: {
115+
// 额外的 metadata 字段
116+
Value: doc.MetaData[docExtraLocation],
117+
},
118+
}, nil
119+
},
120+
// 提供 embedding 组件用于向量化
121+
Embedding: emb,
122+
})
123+
124+
// 5. 索引文档
125+
ids, err := indexer.Store(ctx, docs)
126+
if err != nil {
127+
fmt.Printf("index error: %v\n", err)
128+
return
129+
}
130+
fmt.Println("indexed ids:", ids)
131+
}
132+
```
133+
134+
## 配置
135+
136+
可以使用 `IndexerConfig` 结构体配置索引器:
137+
138+
```go
139+
type IndexerConfig struct {
140+
Client *elasticsearch.Client // 必填: Elasticsearch 客户端实例
141+
Index string // 必填: 存储文档的索引名称
142+
BatchSize int // 选填: 用于 embedding 的最大文本数量 (默认: 5)
143+
144+
// 必填: 将 Document 字段映射到 Elasticsearch 字段的函数
145+
DocumentToFields func(ctx context.Context, doc *schema.Document) (map[string]FieldValue, error)
146+
147+
// 选填: 仅在需要向量化时必填
148+
Embedding embedding.Embedder
149+
}
150+
151+
// FieldValue 定义了字段应如何存储和向量化
152+
type FieldValue struct {
153+
Value any // 要存储的原始值
154+
EmbedKey string // 如果设置,Value 将被向量化并保存
155+
Stringify func(val any) (string, error) // 选填: 自定义字符串转换
156+
}
157+
```
158+
159+
## 完整示例
160+
161+
- [Indexer 示例](./examples/indexer)
162+
- [带稀疏向量的 Indexer 示例](./examples/indexer_with_sparse_vector)
163+
164+
## 更多详情
165+
166+
- [Eino 文档](https://www.cloudwego.io/zh/docs/eino/)
167+
- [Elasticsearch Go Client 文档](https://github.com/elastic/go-elasticsearch)

components/indexer/es9/consts.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2025 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package es9
18+
19+
// typ is a package-level string constant naming this implementation.
// NOTE(review): its consumers are outside this file — presumably it is the
// component type name reported by the indexer; confirm against callers.
const typ = "ElasticSearch9"
20+
21+
const (
22+
// defaultBatchSize is the batch size of 5 that the README documents as the
// default for IndexerConfig.BatchSize (max number of texts per embedding batch).
// NOTE(review): the code applying this fallback is outside this file — confirm.
defaultBatchSize = 5
23+
)

0 commit comments

Comments
 (0)