Skip to content

Commit b87dd9c

Browse files
authored
feat: Migrate from S3 Vectors (#83)
* feat: Migrate from S3 vectors Signed-off-by: Anush008 <anushshetty90@gmail.com> * Update README.md * docs: Updated README.md Signed-off-by: Anush008 <anushshetty90@gmail.com> * test: S3 Vectors source (#84) * docs: Updated README.md Signed-off-by: Anush008 <anushshetty90@gmail.com> * docs: Updated README.md * test: S3 vectors source Signed-off-by: Anush008 <anushshetty90@gmail.com> * test: Reuse bucket Signed-off-by: Anush008 <anushshetty90@gmail.com> * test: Configurable bucket name Signed-off-by: Anush008 <anushshetty90@gmail.com> --------- Signed-off-by: Anush008 <anushshetty90@gmail.com> * test: Move index deletion to t.CleanUp() Signed-off-by: Anush008 <anushshetty90@gmail.com> --------- Signed-off-by: Anush008 <anushshetty90@gmail.com>
1 parent 20abf1e commit b87dd9c

File tree

10 files changed

+569
-6
lines changed

10 files changed

+569
-6
lines changed

.github/workflows/pr-workflow.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ jobs:
4343
cache-dependency-path: go.sum
4444

4545
- name: Unit tests
46+
env:
47+
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
48+
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
49+
AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
50+
AWS_S3_VECTOR_BUCKET: ${{ secrets.AWS_S3_VECTOR_BUCKET }}
4651
run: |
4752
make test_unit
4853

README.md

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ CLI tool for migrating data to [Qdrant](http://qdrant.tech) with support for res
1414
* Redis
1515
* MongoDB
1616
* OpenSearch
17-
* Postgres
17+
* Postgres (pgvector)
18+
* S3 Vectors
1819
* Another Qdrant instance
1920

2021
## Installation
@@ -402,6 +403,47 @@ docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration
402403

403404
</details>
404405

406+
<details>
407+
<summary><h3>From S3 Vectors</h3></summary>
408+
409+
Migrate data from an **S3 Vectors** index to **Qdrant**:
410+
411+
### 📥 Example
412+
413+
> Important:
414+
> Set your AWS credentials using the AWS CLI's [configure](https://docs.aws.amazon.com/cli/latest/reference/configure/#examples) command or [environment variables](https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html).
415+
416+
```bash
417+
docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration s3 \
418+
--s3.bucket 'your-bucket-name' \
419+
--s3.index 'your-index-name' \
420+
--qdrant.url 'http://localhost:6334' \
421+
--qdrant.api-key 'optional-qdrant-api-key' \
422+
--qdrant.collection 'target-collection' \
423+
--migration.batch-size 64
424+
```
425+
426+
#### S3 Vectors Options
427+
428+
| Flag | Description |
429+
| ----------------| -------------------------------- |
430+
| `--s3.bucket` | S3 Vectors bucket name (required) |
431+
| `--s3.index` | S3 Vectors index name (required) |
432+
433+
#### Qdrant Options
434+
435+
| Flag | Description |
436+
| ----------------------- | ------------------------------------------------------------- |
437+
| `--qdrant.url` | Qdrant gRPC URL. Default: `"http://localhost:6334"` |
438+
| `--qdrant.collection` | Target collection name |
439+
| `--qdrant.api-key` | Qdrant API key (optional) |
440+
| `--qdrant.id-field` | Field storing S3 IDs in Qdrant. Default: `"__id__"` |
441+
| `--qdrant.dense-vector` | Name of the dense vector in Qdrant. Default: `"dense_vector"` |
442+
443+
* See [Shared Migration Options](#shared-migration-options) for common migration parameters.
444+
445+
</details>
446+
405447
<details>
406448
<summary><h3>From Another Qdrant Instance</h3></summary>
407449

cmd/migrate_from_s3_vectors.go

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
10+
"github.com/aws/aws-sdk-go-v2/config"
11+
"github.com/aws/aws-sdk-go-v2/service/s3vectors"
12+
"github.com/aws/aws-sdk-go-v2/service/s3vectors/types"
13+
"github.com/aws/smithy-go/document"
14+
"github.com/pterm/pterm"
15+
16+
"github.com/qdrant/go-client/qdrant"
17+
18+
"github.com/qdrant/migration/pkg/commons"
19+
)
20+
21+
type MigrateFromS3VectorsCmd struct {
22+
S3 commons.S3VectorsConfig `embed:"" prefix:"s3."`
23+
Qdrant commons.QdrantConfig `embed:"" prefix:"qdrant."`
24+
Migration commons.MigrationConfig `embed:"" prefix:"migration."`
25+
IdField string `prefix:"qdrant." help:"Field storing S3 IDs in Qdrant." default:"__id__"`
26+
DenseVector string `prefix:"qdrant." help:"Name of the dense vector in Qdrant" default:"dense_vector"`
27+
28+
targetHost string
29+
targetPort int
30+
targetTLS bool
31+
}
32+
33+
func (r *MigrateFromS3VectorsCmd) Parse() error {
34+
var err error
35+
r.targetHost, r.targetPort, r.targetTLS, err = parseQdrantUrl(r.Qdrant.Url)
36+
if err != nil {
37+
return fmt.Errorf("failed to parse target URL: %w", err)
38+
}
39+
40+
return nil
41+
}
42+
43+
func (r *MigrateFromS3VectorsCmd) Validate() error {
44+
return validateBatchSize(r.Migration.BatchSize)
45+
}
46+
47+
func (r *MigrateFromS3VectorsCmd) Run(globals *Globals) error {
48+
pterm.DefaultHeader.WithFullWidth().Println("S3 Vectors to Qdrant Data Migration")
49+
50+
err := r.Parse()
51+
if err != nil {
52+
return fmt.Errorf("failed to parse input: %w", err)
53+
}
54+
55+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
56+
defer stop()
57+
58+
sourceClient, err := r.connectToS3Vectors()
59+
if err != nil {
60+
return fmt.Errorf("failed to connect to S3 Vectors source: %w", err)
61+
}
62+
63+
targetClient, err := connectToQdrant(globals, r.targetHost, r.targetPort, r.Qdrant.APIKey, r.targetTLS, 0)
64+
if err != nil {
65+
return fmt.Errorf("failed to connect to Qdrant target: %w", err)
66+
}
67+
68+
err = commons.PrepareOffsetsCollection(ctx, r.Migration.OffsetsCollection, targetClient)
69+
if err != nil {
70+
return fmt.Errorf("failed to prepare migration marker collection: %w", err)
71+
}
72+
73+
err = r.prepareTargetCollection(ctx, sourceClient, targetClient)
74+
if err != nil {
75+
return fmt.Errorf("error preparing target collection: %w", err)
76+
}
77+
78+
displayMigrationStart("s3-vectors", fmt.Sprintf("%s/%s", r.S3.Bucket, r.S3.Index), r.Qdrant.Collection)
79+
80+
err = r.migrateData(ctx, sourceClient, targetClient)
81+
if err != nil {
82+
return fmt.Errorf("failed to migrate data: %w", err)
83+
}
84+
85+
targetPointCount, err := targetClient.Count(ctx, &qdrant.CountPoints{
86+
CollectionName: r.Qdrant.Collection,
87+
Exact: qdrant.PtrOf(true),
88+
})
89+
if err != nil {
90+
return fmt.Errorf("failed to count points in target: %w", err)
91+
}
92+
93+
pterm.Info.Printfln("Target collection has %d points\n", targetPointCount)
94+
95+
return nil
96+
}
97+
98+
func (r *MigrateFromS3VectorsCmd) connectToS3Vectors() (*s3vectors.Client, error) {
99+
ctx := context.Background()
100+
sdkConfig, err := config.LoadDefaultConfig(ctx)
101+
if err != nil {
102+
return nil, fmt.Errorf("failed to load AWS config: %w", err)
103+
}
104+
105+
return s3vectors.NewFromConfig(sdkConfig), nil
106+
}
107+
108+
func (r *MigrateFromS3VectorsCmd) prepareTargetCollection(ctx context.Context, sourceClient *s3vectors.Client, targetClient *qdrant.Client) error {
109+
if !r.Migration.CreateCollection {
110+
return nil
111+
}
112+
113+
targetCollectionExists, err := targetClient.CollectionExists(ctx, r.Qdrant.Collection)
114+
if err != nil {
115+
return fmt.Errorf("failed to check if collection exists: %w", err)
116+
}
117+
118+
if targetCollectionExists {
119+
pterm.Info.Printfln("Target collection '%s' already exists. Skipping creation.", r.Qdrant.Collection)
120+
return nil
121+
}
122+
123+
indexInfo, err := sourceClient.GetIndex(ctx, &s3vectors.GetIndexInput{
124+
IndexName: &r.S3.Index,
125+
VectorBucketName: &r.S3.Bucket,
126+
})
127+
if err != nil {
128+
return fmt.Errorf("failed to get S3 Vectors index: %w", err)
129+
}
130+
131+
distanceMapping := map[types.DistanceMetric]qdrant.Distance{
132+
types.DistanceMetricCosine: qdrant.Distance_Cosine,
133+
types.DistanceMetricEuclidean: qdrant.Distance_Euclid,
134+
}
135+
136+
dist, ok := distanceMapping[indexInfo.Index.DistanceMetric]
137+
if !ok {
138+
return fmt.Errorf("unsupported distance metric: %v", indexInfo.Index.DistanceMetric)
139+
}
140+
141+
createReq := &qdrant.CreateCollection{
142+
CollectionName: r.Qdrant.Collection,
143+
VectorsConfig: qdrant.NewVectorsConfigMap(map[string]*qdrant.VectorParams{
144+
r.DenseVector: {
145+
Size: uint64(*indexInfo.Index.Dimension),
146+
Distance: dist,
147+
},
148+
}),
149+
}
150+
151+
if err := targetClient.CreateCollection(ctx, createReq); err != nil {
152+
return fmt.Errorf("failed to create target collection: %w", err)
153+
}
154+
155+
pterm.Success.Printfln("Created target collection '%s'", r.Qdrant.Collection)
156+
return nil
157+
}
158+
159+
func (r *MigrateFromS3VectorsCmd) migrateData(ctx context.Context, sourceClient *s3vectors.Client, targetClient *qdrant.Client) error {
160+
batchSize := r.Migration.BatchSize
161+
162+
var offsetId *qdrant.PointId
163+
offsetCount := uint64(0)
164+
165+
sourceIdentifier := fmt.Sprintf("%s/%s", r.S3.Bucket, r.S3.Index)
166+
167+
if !r.Migration.Restart {
168+
id, offsetStored, err := commons.GetStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, sourceIdentifier)
169+
if err != nil {
170+
return fmt.Errorf("failed to get start offset: %w", err)
171+
}
172+
offsetCount = offsetStored
173+
offsetId = id
174+
}
175+
176+
spinner, _ := pterm.DefaultSpinner.Start("Migrating data")
177+
if offsetCount > 0 {
178+
spinner.UpdateText(fmt.Sprintf("Resuming migration from %d points", offsetCount))
179+
}
180+
181+
for {
182+
limit := int32(batchSize)
183+
var nextToken *string
184+
if offsetId != nil {
185+
token := offsetId.GetUuid()
186+
nextToken = &token
187+
}
188+
listRes, err := sourceClient.ListVectors(ctx, &s3vectors.ListVectorsInput{
189+
IndexName: &r.S3.Index,
190+
VectorBucketName: &r.S3.Bucket,
191+
MaxResults: &limit,
192+
NextToken: nextToken,
193+
ReturnData: true,
194+
ReturnMetadata: true,
195+
})
196+
if err != nil {
197+
return fmt.Errorf("failed to list vectors from S3 Vectors: %w", err)
198+
}
199+
200+
if len(listRes.Vectors) == 0 {
201+
break
202+
}
203+
204+
var targetPoints []*qdrant.PointStruct
205+
for _, vec := range listRes.Vectors {
206+
point := &qdrant.PointStruct{
207+
Id: arbitraryIDToUUID(*vec.Key),
208+
}
209+
210+
vData, ok := vec.Data.(*types.VectorDataMemberFloat32)
211+
if !ok {
212+
return fmt.Errorf("unexpected vector data type")
213+
}
214+
215+
vectorMap := map[string]*qdrant.Vector{
216+
r.DenseVector: qdrant.NewVector(vData.Value...),
217+
}
218+
point.Vectors = qdrant.NewVectorsMap(vectorMap)
219+
220+
payload := make(map[string]*qdrant.Value)
221+
if vec.Metadata != nil {
222+
var metaMap map[string]any
223+
err = vec.Metadata.UnmarshalSmithyDocument(&metaMap)
224+
if err != nil {
225+
return fmt.Errorf("failed to unmarshal metadata: %w", err)
226+
}
227+
metaMap = convertSmithyDocumentNumbers(metaMap).(map[string]any)
228+
payload = qdrant.NewValueMap(metaMap)
229+
}
230+
payload[r.IdField] = qdrant.NewValueString(*vec.Key)
231+
point.Payload = payload
232+
233+
targetPoints = append(targetPoints, point)
234+
}
235+
236+
_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
237+
CollectionName: r.Qdrant.Collection,
238+
Points: targetPoints,
239+
Wait: qdrant.PtrOf(true),
240+
})
241+
if err != nil {
242+
return fmt.Errorf("failed to insert data into target: %w", err)
243+
}
244+
245+
offsetCount += uint64(len(targetPoints))
246+
if listRes.NextToken != nil {
247+
offsetId = qdrant.NewID(*listRes.NextToken)
248+
err = commons.StoreStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, sourceIdentifier, offsetId, offsetCount)
249+
if err != nil {
250+
return fmt.Errorf("failed to store offset: %w", err)
251+
}
252+
}
253+
254+
spinner.UpdateText(fmt.Sprintf("Migrated %d points", offsetCount))
255+
256+
if listRes.NextToken == nil {
257+
break
258+
}
259+
}
260+
261+
spinner.Success("Migration finished successfully")
262+
return nil
263+
}
264+
265+
// Recursively converts document.Number values to float64 or int64 for Qdrant compatibility.
266+
func convertSmithyDocumentNumbers(v any) any {
267+
if v == nil {
268+
return nil
269+
}
270+
271+
switch val := v.(type) {
272+
case map[string]any:
273+
res := make(map[string]any, len(val))
274+
for k, v2 := range val {
275+
res[k] = convertSmithyDocumentNumbers(v2)
276+
}
277+
return res
278+
case []any:
279+
res := make([]any, len(val))
280+
for i, v2 := range val {
281+
res[i] = convertSmithyDocumentNumbers(v2)
282+
}
283+
return res
284+
case document.Number:
285+
// Try int64 first, fallback to float64
286+
if i, err := val.Int64(); err == nil {
287+
return i
288+
}
289+
if f, err := val.Float64(); err == nil {
290+
return f
291+
}
292+
return val.String() // fallback: string
293+
default:
294+
return v
295+
}
296+
}

cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type CLI struct {
2626
Mongodb MigrateFromMongoDBCmd `cmd:"" help:"Migrate data from a Mongo database to Qdrant."`
2727
OpenSearch MigrateFromOpenSearchCmd `cmd:"" name:"opensearch" help:"Migrate data from an OpenSearch database to Qdrant."`
2828
PG MigrateFromPGCmd `cmd:"" name:"pg" help:"Migrate data from a PostgreSQL database to Qdrant."`
29+
S3Vectors MigrateFromS3VectorsCmd `cmd:"" name:"s3" help:"Migrate data from S3 Vectors to Qdrant."`
2930
}
3031

3132
func Execute(projectVersion, projectBuild string) {

0 commit comments

Comments
 (0)