Commit d95ab43

feat: Redis source (#44)
* feat: Redis
* docs: Updated README.md
* docs: Updated README.md
* test: Redis integration tests
* chore: go mod tidy
* test: Updated implementation
* docs: Updated README.md
* chore: Removed cmd_test/ dir

---------

Signed-off-by: Anush008 <[email protected]>
1 parent 304bd85 commit d95ab43

8 files changed: +521 -6 lines changed

README.md

Lines changed: 59 additions & 2 deletions
@@ -11,6 +11,7 @@ CLI tool for migrating data to [Qdrant](http://qdrant.tech) with support for res
 * [Pinecone](https://www.pinecone.io/)
 * [Milvus](https://milvus.io/)
 * [Weaviate](https://weaviate.io/)
+* [Redis](https://redis.io)
 * Another [Qdrant](http://qdrant.tech/) instance

 ## Installation
@@ -95,7 +96,7 @@ docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration

 Migrate data from a **Pinecone** database to **Qdrant**:

-> IMPORTANT ⚠️
+> IMPORTANT ⚠️:
 > Only Pinecone serverless indexes support listing all vectors for migration. [Reference](https://docs.pinecone.io/reference/api/2025-01/data-plane/list)

 ### 📥 Example
@@ -207,7 +208,7 @@ Migrate data from a **Weaviate** database to **Qdrant**:
 ### 📥 Example

 > Important ⚠️:
-> Weaviate does not expose vector dimensions and distance metric after a collection is created. [Reference](https://forum.weaviate.io/t/get-vector-dimension-of-a-collection/1769/).
+> Weaviate [does not expose](https://forum.weaviate.io/t/get-vector-dimension-of-a-collection/1769/) vector dimensions and distance metric after a collection is created.
 > Therefore, you must [manually create](https://qdrant.tech/documentation/concepts/collections/#create-a-collection) a Qdrant collection before starting the migration.
 > Ensure that the **vector dimensions in Qdrant exactly match** those used in Weaviate.

@@ -261,6 +262,62 @@ docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration

 </details>

+<details>
+
+<summary><h3>From Redis</h3></summary>
+
+Migrate data from a **Redis** database to **Qdrant**:
+
+> Important ⚠️:
+> Redis does not expose vector configurations after an index is created.
+> Therefore, you must [manually create](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) a Qdrant collection before starting the migration.
+> Ensure that the **vector names and dimensions in Qdrant exactly match** those used in Redis.
+
+### 📥 Example
+
+```bash
+migration redis \
+    --redis.index 'index_name' \
+    --redis.addr 'localhost:6379' \
+    --qdrant.url 'http://localhost:6334' \
+    --qdrant.collection 'target-collection' \
+    --migration.batch-size 100
+```
+
+With Docker:
+
+```bash
+docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration redis \
+    --redis.index 'index_name' \
+    ...
+```
+
+#### Redis Options
+
+| Flag | Description |
+| --------------------- | ----------------------------------------------------------------------- |
+| `--redis.index` | Redis index name |
+| `--redis.addr` | Redis address in the format `host:port` (default: `localhost:6379`) |
+| `--redis.protocol` | Redis protocol version (default: `2`) |
+| `--redis.password` | Password to authenticate requests. Optional. |
+| `--redis.username` | Username to authenticate requests. Optional. |
+| `--redis.client-name` | Will execute the `CLIENT SETNAME <NAME>` for each connection. Optional. |
+| `--redis.db` | Database to be selected after connecting to the server. Optional. |
+| `--redis.network` | Redis network type (`tcp` or `unix`, default: `tcp`) |
+
+#### Qdrant Options
+
+| Flag | Description |
+| ------------------------------- | --------------------------------------------------------------- |
+| `--qdrant.url` | Qdrant gRPC URL. Default: `"http://localhost:6334"` |
+| `--qdrant.collection` | Target collection name |
+| `--qdrant.api-key` | Qdrant API key |
+| `--qdrant.id-field` | Field storing Redis IDs in Qdrant. Default: `"__id__"` |
+
+* See [Shared Migration Options](#shared-migration-options) for common migration parameters.
+
+</details>
+
 <details>
 <summary><h3>From Another Qdrant Instance</h3></summary>

cmd/migrate_from_redis.go

Lines changed: 243 additions & 0 deletions
@@ -0,0 +1,243 @@
+package cmd
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"log"
+	"math"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+
+	"github.com/pterm/pterm"
+	"github.com/redis/go-redis/v9"
+
+	"github.com/qdrant/go-client/qdrant"
+
+	"github.com/qdrant/migration/pkg/commons"
+)
+
+type MigrateFromRedisCmd struct {
+	Redis     commons.RedisConfig     `embed:"" prefix:"redis."`
+	Qdrant    commons.QdrantConfig    `embed:"" prefix:"qdrant."`
+	Migration commons.MigrationConfig `embed:"" prefix:"migration."`
+	IdField   string                  `prefix:"qdrant." help:"Field storing Redis IDs in Qdrant." default:"__id__"`
+
+	targetHost string
+	targetPort int
+	targetTLS  bool
+}
+
+func (r *MigrateFromRedisCmd) Parse() error {
+	var err error
+	r.targetHost, r.targetPort, r.targetTLS, err = parseQdrantUrl(r.Qdrant.Url)
+	if err != nil {
+		return fmt.Errorf("failed to parse target URL: %w", err)
+	}
+
+	return nil
+}
+
+func (r *MigrateFromRedisCmd) Validate() error {
+	return validateBatchSize(r.Migration.BatchSize)
+}
+
+func (r *MigrateFromRedisCmd) Run(globals *Globals) error {
+	pterm.DefaultHeader.WithFullWidth().Println("Redis Vector to Qdrant Data Migration")
+
+	err := r.Parse()
+	if err != nil {
+		return fmt.Errorf("failed to parse input: %w", err)
+	}
+
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
+	defer stop()
+
+	rdb := redis.NewClient(&redis.Options{
+		Addr:       r.Redis.Addr,
+		Username:   r.Redis.Username,
+		Password:   r.Redis.Password,
+		DB:         r.Redis.DB,
+		Protocol:   r.Redis.Protocol,
+		Network:    r.Redis.Network,
+		ClientName: r.Redis.ClientName,
+	})
+	defer rdb.Close()
+
+	targetClient, err := connectToQdrant(globals, r.targetHost, r.targetPort, r.Qdrant.APIKey, r.targetTLS)
+	if err != nil {
+		return fmt.Errorf("failed to connect to Qdrant target: %w", err)
+	}
+	defer targetClient.Close()
+
+	targetCollectionExists, err := targetClient.CollectionExists(ctx, r.Qdrant.Collection)
+	if err != nil {
+		return fmt.Errorf("failed to check if collection exists: %w", err)
+	}
+	if !targetCollectionExists {
+		return fmt.Errorf("target collection '%s' does not exist in Qdrant", r.Qdrant.Collection)
+	}
+
+	err = commons.PrepareOffsetsCollection(ctx, r.Migration.OffsetsCollection, targetClient)
+	if err != nil {
+		return fmt.Errorf("failed to prepare migration marker collection: %w", err)
+	}
+
+	displayMigrationStart("redis", r.Redis.Index, r.Qdrant.Collection)
+
+	sourcePointCount, err := r.countRedisDocuments(ctx, rdb)
+	if err != nil {
+		return fmt.Errorf("failed to count documents in Redis index: %w", err)
+	}
+
+	err = r.migrateData(ctx, rdb, targetClient, sourcePointCount)
+	if err != nil {
+		return fmt.Errorf("failed to migrate data: %w", err)
+	}
+
+	targetPointCount, err := targetClient.Count(ctx, &qdrant.CountPoints{
+		CollectionName: r.Qdrant.Collection,
+		Exact:          qdrant.PtrOf(true),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to count points in target: %w", err)
+	}
+
+	pterm.Info.Printfln("Target collection has %d points\n", targetPointCount)
+
+	return nil
+}
+
+func (r *MigrateFromRedisCmd) countRedisDocuments(ctx context.Context, rdb *redis.Client) (uint64, error) {
+	info, err := rdb.FTInfo(ctx, r.Redis.Index).Result()
+	if err != nil {
+		return 0, fmt.Errorf("failed to get Redis index info: %w", err)
+	}
+
+	pterm.Info.Printfln("Found Redis index '%s' with %d documents", r.Redis.Index, info.NumDocs)
+	return uint64(info.NumDocs), nil
+}
+
+func (r *MigrateFromRedisCmd) migrateData(ctx context.Context, rdb *redis.Client, targetClient *qdrant.Client, sourcePointCount uint64) error {
+	batchSize := r.Migration.BatchSize
+
+	var currentOffset uint64 = 0
+
+	if !r.Migration.Restart {
+		_, offsetStored, err := commons.GetStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, r.Redis.Index)
+		if err != nil {
+			return fmt.Errorf("failed to get start offset: %w", err)
+		}
+		currentOffset = offsetStored
+	}
+
+	bar, _ := pterm.DefaultProgressbar.WithTotal(int(sourcePointCount)).Start()
+	displayMigrationProgress(bar, currentOffset)
+
+	info, err := rdb.FTInfo(ctx, r.Redis.Index).Result()
+	if err != nil {
+		return fmt.Errorf("failed to get index info: %w", err)
+	}
+
+	attrTypes := make(map[string]string)
+	for _, attr := range info.Attributes {
+		attrTypes[attr.Identifier] = attr.Type
+	}
+
+	for {
+		res, err := rdb.FTSearchWithArgs(ctx, r.Redis.Index, "*", &redis.FTSearchOptions{
+			LimitOffset: int(currentOffset),
+			Limit:       int(batchSize),
+		}).Result()
+		if err != nil {
+			return fmt.Errorf("failed to search Redis: %w", err)
+		}
+
+		count := len(res.Docs)
+		if count == 0 {
+			break
+		}
+
+		targetPoints := make([]*qdrant.PointStruct, 0, count)
+
+		for i := 0; i < count; i++ {
+			doc := res.Docs[i]
+
+			parsedFields := make(map[string]interface{})
+			vectorMap := make(map[string]*qdrant.Vector)
+
+			for fieldName, rawVal := range doc.Fields {
+				attrType := attrTypes[fieldName]
+
+				if attrType == redis.SearchFieldTypeVector.String() {
+					vec := bytesToFloats([]byte(rawVal))
+					vectorMap[fieldName] = qdrant.NewVectorDense(vec)
+				} else {
+					parsedFields[fieldName] = parseFieldValue(attrType, rawVal)
+				}
+			}
+
+			point := &qdrant.PointStruct{
+				Id:      arbitraryIDToUUID(doc.ID),
+				Vectors: qdrant.NewVectorsMap(vectorMap),
+			}
+
+			payload := qdrant.NewValueMap(parsedFields)
+			payload[r.IdField] = qdrant.NewValueString(doc.ID)
+			point.Payload = payload
+
+			targetPoints = append(targetPoints, point)
+		}
+
+		if len(targetPoints) > 0 {
+			_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
+				CollectionName: r.Qdrant.Collection,
+				Points:         targetPoints,
+				Wait:           qdrant.PtrOf(true),
+			})
+			if err != nil {
+				return fmt.Errorf("failed to insert data into target: %w", err)
+			}
+		}
+
+		currentOffset += uint64(count)
+		// Just a placeholder ID for offset tracking.
+		// We're only using the offset count
+		offsetId := qdrant.NewIDNum(0)
+		err = commons.StoreStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, r.Redis.Index, offsetId, currentOffset)
+		if err != nil {
+			return fmt.Errorf("failed to store offset: %w", err)
+		}
+
+		bar.Add(count)
+	}
+
+	pterm.Success.Printfln("Data migration finished successfully")
+	return nil
+}
+
+func bytesToFloats(b []byte) []float32 {
+	if len(b)%4 != 0 {
+		log.Printf("Warning: byte slice length %d is not a multiple of 4, truncating", len(b))
+		b = b[:len(b)-(len(b)%4)]
+	}
+
+	fs := make([]float32, len(b)/4)
+	for i := 0; i < len(fs); i++ {
+		bits := binary.LittleEndian.Uint32(b[i*4 : (i+1)*4])
+		fs[i] = math.Float32frombits(bits)
+	}
+	return fs
+}
+
+func parseFieldValue(attrType string, val string) interface{} {
+	// redis.SearchFieldTypeVector is handled
+	// before invoking this function.
+	if attrType == redis.SearchFieldTypeNumeric.String() {
+		f, _ := strconv.ParseFloat(val, 64)
+		return f
+	}
+	return val
+}
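
The batch loop in `migrateData` above reads from a stored offset, writes a batch, and then records the new offset, so an interrupted run can pick up where it stopped. The sketch below shows that control flow with in-memory stand-ins only. `offsetStore`, `migrate`, and the `failAt` parameter are hypothetical names invented for illustration; they are not part of the commit or of any library. It also assumes the source returns documents in the same order on every call, which offset-based resumption needs.

```go
package main

import "fmt"

// offsetStore is a hypothetical stand-in for the offsets collection kept in Qdrant.
type offsetStore map[string]uint64

// migrate copies src into dst in batches, starting at the offset stored under key,
// and saves the new offset after each batch so that a later call can resume.
// failAt simulates an interruption once at least that many documents are copied
// (0 disables the simulated failure).
func migrate(src []string, dst *[]string, store offsetStore, key string, batchSize, failAt int) error {
	if batchSize <= 0 {
		return fmt.Errorf("batch size must be positive, got %d", batchSize)
	}
	offset := int(store[key])
	for offset < len(src) {
		end := offset + batchSize
		if end > len(src) {
			end = len(src)
		}
		*dst = append(*dst, src[offset:end]...)
		offset = end
		store[key] = uint64(offset) // persist progress only after the batch is written
		if failAt > 0 && offset >= failAt {
			return fmt.Errorf("interrupted at offset %d", offset)
		}
	}
	return nil
}

func main() {
	src := []string{"doc:1", "doc:2", "doc:3", "doc:4", "doc:5"}
	var dst []string
	store := offsetStore{}

	// First run stops early; the stored offset records how far it got.
	err := migrate(src, &dst, store, "index_name", 2, 4)
	fmt.Println(err, store["index_name"], len(dst))

	// Second run resumes from the stored offset and copies only the remainder.
	err = migrate(src, &dst, store, "index_name", 2, 0)
	fmt.Println(err, store["index_name"], len(dst))
}
```

Storing the offset only after a successful write means a crash can repeat a batch but should not skip one. In the commit, a repeated batch is upserted under IDs derived from the Redis document ID, so a replay overwrites points rather than adding new ones, provided `arbitraryIDToUUID` is deterministic.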

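`bytesToFloats` above treats each vector field as a packed run of little-endian 32-bit floats. The round trip below is a minimal sketch of that layout. `floatsToBytes` and `decodeFloat32LE` are hypothetical helpers written for this example; the decoder mirrors the commit's logic without the log call. It assumes the index stores `FLOAT32` vectors, and a blob written with another element type would decode to meaningless values.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// floatsToBytes is a hypothetical helper (not in the commit) that packs float32
// values as little-endian bytes, the layout the decoder below expects.
func floatsToBytes(fs []float32) []byte {
	b := make([]byte, 4*len(fs))
	for i, f := range fs {
		binary.LittleEndian.PutUint32(b[i*4:], math.Float32bits(f))
	}
	return b
}

// decodeFloat32LE mirrors bytesToFloats from the commit without logging:
// trailing bytes that do not fill a complete 4-byte group are dropped.
func decodeFloat32LE(b []byte) []float32 {
	b = b[:len(b)-len(b)%4]
	fs := make([]float32, len(b)/4)
	for i := range fs {
		fs[i] = math.Float32frombits(binary.LittleEndian.Uint32(b[i*4:]))
	}
	return fs
}

func main() {
	blob := floatsToBytes([]float32{0.25, -1.5, 3})
	// Three float32 values occupy 12 bytes and decode back to the same values.
	fmt.Println(len(blob), decodeFloat32LE(blob))
}
```

The byte length is also a quick dimension check: a blob of `n` bytes holds `n/4` components, which should equal the size configured for the matching named vector in the Qdrant collection.
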
cmd/root.go

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ type CLI struct {
 	Pinecone MigrateFromPineconeCmd `cmd:"" help:"Migrate data from a Pinecone database to Qdrant."`
 	Chroma   MigrateFromChromaCmd   `cmd:"" help:"Migrate data from a Chroma database to Qdrant."`
 	Weaviate MigrateFromWeaviateCmd `cmd:"" help:"Migrate data from a Weaviate database to Qdrant."`
+	Redis    MigrateFromRedisCmd    `cmd:"" help:"Migrate data from a Redis database to Qdrant."`
 }

 func Execute(projectVersion, projectBuild string) {

go.mod

Lines changed: 2 additions & 0 deletions
@@ -13,6 +13,7 @@ require (
 	github.com/pinecone-io/go-pinecone/v3 v3.1.0
 	github.com/pterm/pterm v0.12.80
 	github.com/qdrant/go-client v1.14.0
+	github.com/redis/go-redis/v9 v9.9.0
 	github.com/stretchr/testify v1.10.0
 	github.com/testcontainers/testcontainers-go v0.36.0
 	github.com/weaviate/weaviate v1.27.0
@@ -46,6 +47,7 @@ require (
 	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
 	github.com/cpuguy83/dockercfg v0.3.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	github.com/distribution/reference v0.6.0 // indirect
 	github.com/docker/docker v28.0.1+incompatible // indirect
 	github.com/docker/go-connections v0.5.0 // indirect

go.sum

Lines changed: 8 additions & 0 deletions
@@ -52,6 +52,10 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
 github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
 github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
 github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
+github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
+github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
+github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
 github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -90,6 +94,8 @@ github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
 github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
 github.com/docker/docker v28.0.1+incompatible h1:FCHjSRdXhNRFjlHMTv4jUNlIBbTeRjrWfeFuJp7jpo0=
@@ -374,6 +380,8 @@ github.com/pterm/pterm v0.12.80 h1:mM55B+GnKUnLMUSqhdINe4s6tOuVQIetQ3my8JGyAIg=
 github.com/pterm/pterm v0.12.80/go.mod h1:c6DeF9bSnOSeFPZlfs4ZRAFcf5SCoTwvwQ5xaKGQlHo=
 github.com/qdrant/go-client v1.14.0 h1:cyz9OOooAexudw5w69LRe9vKCQFYJvaFvt9icOciI1U=
 github.com/qdrant/go-client v1.14.0/go.mod h1:iO8ts78jL4x6LDHFOViyYWELVtIBDTjOykBmiOTHLnQ=
+github.com/redis/go-redis/v9 v9.9.0 h1:URbPQ4xVQSQhZ27WMQVmZSo3uT3pL+4IdHVcYq2nVfM=
+github.com/redis/go-redis/v9 v9.9.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
 github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
