Commit 5244856

refactor: DRY updates (#29)
* refactor: Qdrant to Qdrant impl
* chore: qdrant.PtrOf
* Bump golangci/golangci-lint-action from 7 to 8 (#22)

  Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 7 to 8.
  - [Release notes](https://github.com/golangci/golangci-lint-action/releases)
  - [Commits](golangci/golangci-lint-action@v7...v8)

  updated-dependencies:
  - dependency-name: golangci/golangci-lint-action
    dependency-version: '8'
    dependency-type: direct:production
    update-type: version-update:semver-major
* Bump github.com/grpc-ecosystem/go-grpc-middleware/v2 from 2.3.1 to 2.3.2 (#21)

  Bumps [github.com/grpc-ecosystem/go-grpc-middleware/v2](https://github.com/grpc-ecosystem/go-grpc-middleware) from 2.3.1 to 2.3.2.
  - [Release notes](https://github.com/grpc-ecosystem/go-grpc-middleware/releases)
  - [Commits](grpc-ecosystem/go-grpc-middleware@v2.3.1...v2.3.2)

  updated-dependencies:
  - dependency-name: github.com/grpc-ecosystem/go-grpc-middleware/v2
    dependency-version: 2.3.2
    dependency-type: direct:production
    update-type: version-update:semver-patch
* chore: DRY
* chore: Prettify table
* chore: Nitfix
* docs: comment
* refactor: Don't change M
* refactor: Updated commons.go
* feat: Milvus Source
* refactor: Use nested flags
* chore: file renamed
* test: Milvus source
* chore: Formatting
* chore: Use time.RFC3339
* refactor: Renamed PrepareMigrationOffsetsCollection
* DRY
* chore: Use %q to log within quotes
* refactor: Rename to offsetId

Signed-off-by: Anush008 <anushshetty90@gmail.com>
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
1 parent f352dd3 commit 5244856

4 files changed: +128 -124 lines changed

README.md

Lines changed: 30 additions & 7 deletions
@@ -2,17 +2,18 @@

 CLI tool for migrating data to [Qdrant](http://qdrant.tech) with support for resumable transfers in case of interruptions.

-> [!WARNING]
-> This project is in beta. The API may change in future releases.
+Easily move your data to Qdrant from other vector storages. With support for resumable migration, even interrupted processes can continue smoothly.

-## Supported Sources
+Supported sources:

-* [Milvus](https://milvus.io)
-* Another [Qdrant](http://qdrant.tech) instance
+* Milvus
+* Another Qdrant instance

 ## Installation

-You can run this tool on any machine with connectivity to both the source and the Qdrant database. For best performance, use a machine with a fast network and minimal latency to both endpoints.
+The easiest way to run the qdrant-migration tool is as a container. You can run it any machine where you have connectivity to both the source and the target Qdrant databases. For optimal performance, you should run the tool on a machine with a fast network connection and minimum latency to both databases.
+
+To pull the latest image run:

 #### Binaries

@@ -23,7 +24,29 @@ Each release includes **precompiled binaries** for all major OS and CPU architec
 To get the latest Docker image run the following command.

 ```bash
-docker pull registry.cloud.qdrant.io/library/qdrant-migration
+$ docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration qdrant --help
+Usage: migration qdrant --source-url=STRING --source-collection=STRING --target-url=STRING --target-collection=STRING [flags]
+
+Migrate data from a Qdrant database to Qdrant.
+
+Flags:
+-h, --help Show context-sensitive help.
+--debug Enable debug mode.
+--trace Enable trace mode.
+--skip-tls-verification Skip TLS verification.
+--version Print version information and quit
+
+--source-url=STRING Source gRPC URL, e.g. https://your-qdrant-hostname:6334
+--source-collection=STRING Source collection
+--source-api-key=STRING Source API key ($SOURCE_API_KEY)
+--target-url=STRING Target gRPC URL, e.g. https://your-qdrant-hostname:6334
+--target-collection=STRING Target collection
+--target-api-key=STRING Target API key ($TARGET_API_KEY)
+-b, --batch-size=50 Batch size
+-c, --create-target-collection Create the target collection if it does not exist
+--ensure-payload-indexes Ensure payload indexes are created
+--migration-offsets-collection-name="_migration_offsets" Collection where the current migration offset should be stored
+--restart-migration Restart the migration and do not continue from last offset
 ```

 ## How To Migrate?
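
The README text in this diff advertises resumable migration, and the help output mentions an offsets collection and a `--restart-migration` flag. As a rough illustration of that checkpointing idea (not the tool's actual implementation), the sketch below copies items in batches and records progress after each batch. The `checkpoint` map is a hypothetical in-memory stand-in for the offsets collection, and `migrate`, `maxBatches`, and the other names are assumptions made for this example.

```go
package main

import "fmt"

// checkpoint is a hypothetical in-memory stand-in for the offsets collection:
// it maps a source name to the number of items already copied.
type checkpoint map[string]int

// migrate copies src into dst in batches and records progress after each batch.
// maxBatches simulates an interruption: the run stops after that many batches
// (a negative value means "no limit"). It reports whether the copy finished.
func migrate(src []string, dst map[int]string, cp checkpoint, name string,
	batchSize int, restart bool, maxBatches int) (bool, error) {
	if batchSize < 1 {
		return false, fmt.Errorf("batch size must be greater than 0")
	}
	if restart {
		delete(cp, name) // forget stored progress and start from the beginning
	}

	start := cp[name] // zero when no progress has been stored yet
	for start < len(src) {
		if maxBatches == 0 {
			return false, nil // simulated interruption
		}
		end := start + batchSize
		if end > len(src) {
			end = len(src)
		}
		for i := start; i < end; i++ {
			dst[i] = src[i] // keyed write, so repeating a batch is harmless
		}
		cp[name] = end // persist progress only after the batch is written
		start = end
		if maxBatches > 0 {
			maxBatches--
		}
	}
	return true, nil
}

func main() {
	src := []string{"a", "b", "c", "d", "e"}
	dst := map[int]string{}
	cp := checkpoint{}

	// First run is "interrupted" after one batch of two items.
	done, _ := migrate(src, dst, cp, "demo", 2, false, 1)
	fmt.Println(done, cp["demo"], len(dst)) // false 2 2

	// Second run resumes from the stored offset and finishes.
	done, _ = migrate(src, dst, cp, "demo", 2, false, -1)
	fmt.Println(done, cp["demo"], len(dst)) // true 5 5
}
```

Storing the offset only after a batch has been written means an interruption can at worst repeat one batch, which is safe here because writes are keyed by position.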

cmd/migrate_from_milvus.go

Lines changed: 24 additions & 62 deletions
@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"net/url"
 	"os"
 	"os/signal"
 	"strconv"
@@ -32,27 +31,17 @@ type MigrateFromMilvusCmd struct {
 }

 func (r *MigrateFromMilvusCmd) Parse() error {
-	targetUrl, err := url.Parse(r.Qdrant.Url)
+	var err error
+	r.targetHost, r.targetPort, r.targetTLS, err = parseQdrantUrl(r.Qdrant.Url)
 	if err != nil {
 		return fmt.Errorf("failed to parse target URL: %w", err)
 	}

-	r.targetHost = targetUrl.Hostname()
-	r.targetTLS = targetUrl.Scheme == HTTPS
-	r.targetPort, err = getPort(targetUrl)
-	if err != nil {
-		return fmt.Errorf("failed to parse target port: %w", err)
-	}
-
 	return nil
 }

 func (r *MigrateFromMilvusCmd) Validate() error {
-	if r.Migration.BatchSize < 1 {
-		return fmt.Errorf("batch size must be greater than 0")
-	}
-
-	return nil
+	return validateBatchSize(r.Migration.BatchSize)
 }

 func (r *MigrateFromMilvusCmd) Run(globals *Globals) error {
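
The hunk above replaces inline URL parsing and batch-size checks with calls to `parseQdrantUrl` and `validateBatchSize`, whose bodies are not part of this file's diff. Based only on the removed lines, such helpers might look roughly like the sketch below. The `Sketch` suffixes, the `httpsScheme` constant, the `int` port type, and the fallback to port 6334 (taken from the README's example URL) are assumptions, not the repository's code; the removed code delegated port handling to a `getPort` helper that is not shown here.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// httpsScheme is an assumed value for the HTTPS constant used in the removed code.
const httpsScheme = "https"

// parseQdrantURLSketch splits a URL such as https://your-qdrant-hostname:6334
// into host, port, and a TLS flag. Hypothetical stand-in for parseQdrantUrl.
func parseQdrantURLSketch(raw string) (host string, port int, tls bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", 0, false, fmt.Errorf("failed to parse URL: %w", err)
	}
	host = u.Hostname()
	if host == "" {
		return "", 0, false, fmt.Errorf("URL %q has no host", raw)
	}
	tls = u.Scheme == httpsScheme

	port = 6334 // assumption: default to the gRPC port shown in the README example
	if p := u.Port(); p != "" {
		port, err = strconv.Atoi(p)
		if err != nil {
			return "", 0, false, fmt.Errorf("failed to parse port: %w", err)
		}
	}
	return host, port, tls, nil
}

// validateBatchSizeSketch mirrors the check removed from Validate above.
func validateBatchSizeSketch(batchSize int) error {
	if batchSize < 1 {
		return fmt.Errorf("batch size must be greater than 0")
	}
	return nil
}

func main() {
	host, port, tls, err := parseQdrantURLSketch("https://your-qdrant-hostname:6334")
	fmt.Println(host, port, tls, err)
	fmt.Println(validateBatchSizeSketch(0))
}
```

Returning all three values from one function lets each command's `Parse` method wrap a single error, which matches the shape of the added lines in the hunk.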
@@ -91,28 +80,14 @@ func (r *MigrateFromMilvusCmd) Run(globals *Globals) error {
 		return fmt.Errorf("error preparing target collection: %w", err)
 	}

-	targetPointCount, err := targetClient.Count(ctx, &qdrant.CountPoints{
-		CollectionName: r.Qdrant.Collection,
-		Exact: qdrant.PtrOf(true),
-	})
-	if err != nil {
-		return fmt.Errorf("failed to count points in target: %w", err)
-	}
-
-	pterm.DefaultSection.Println("Starting data migration")
-
-	_ = pterm.DefaultTable.WithHasHeader().WithData(pterm.TableData{
-		{"Type", "Provider", "Collection", "Points"},
-		{"Source", "milvus", r.Milvus.Collection, strconv.FormatUint(sourcePointCount, 10)},
-		{"Target", "qdrant", r.Qdrant.Collection, strconv.FormatUint(targetPointCount, 10)},
-	}).Render()
+	displayMigrationStart("milvus", r.Milvus.Collection, r.Qdrant.Collection)

 	err = r.migrateData(ctx, sourceClient, targetClient, sourcePointCount)
 	if err != nil {
 		return fmt.Errorf("failed to migrate data: %w", err)
 	}

-	targetPointCount, err = targetClient.Count(ctx, &qdrant.CountPoints{
+	targetPointCount, err := targetClient.Count(ctx, &qdrant.CountPoints{
 		CollectionName: r.Qdrant.Collection,
 		Exact: qdrant.PtrOf(true),
 	})
@@ -154,7 +129,6 @@ func (r *MigrateFromMilvusCmd) countMilvusVectors(ctx context.Context, client *m
 	}

 	return count, nil
-
 }

 func (r *MigrateFromMilvusCmd) prepareTargetCollection(ctx context.Context, sourceClient *milvusclient.Client, targetClient *qdrant.Client) error {
@@ -168,7 +142,7 @@ func (r *MigrateFromMilvusCmd) prepareTargetCollection(ctx context.Context, sour
 	}

 	if targetCollectionExists {
-		pterm.Info.Printfln("Target collection '%s' already exists. Skipping creation.", r.Qdrant.Collection)
+		pterm.Info.Printfln("Target collection %q already exists. Skipping creation.", r.Qdrant.Collection)
 		return nil
 	}

@@ -203,36 +177,29 @@ func (r *MigrateFromMilvusCmd) prepareTargetCollection(ctx context.Context, sour
 		return fmt.Errorf("failed to create target collection: %w", err)
 	}

-	pterm.Success.Printfln("Created target collection '%s'", r.Qdrant.Collection)
+	pterm.Success.Printfln("Created target collection %q", r.Qdrant.Collection)
 	return nil
 }

 func (r *MigrateFromMilvusCmd) migrateData(ctx context.Context, sourceClient *milvusclient.Client, targetClient *qdrant.Client, sourcePointCount uint64) error {
 	startTime := time.Now()
 	batchSize := r.Migration.BatchSize

-	var lastID *qdrant.PointId
+	var offsetID *qdrant.PointId
 	offsetCount := uint64(0)
 	var err error

 	if !r.Migration.Restart {
-		offsetId, offsetStored, err := commons.GetStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, r.Milvus.Collection, r.Migration.Restart)
+		id, count, err := commons.GetStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, r.Milvus.Collection)
 		if err != nil {
 			return fmt.Errorf("failed to get start offset: %w", err)
 		}
-		offsetCount = offsetStored
-		lastID = offsetId
+		offsetCount = count
+		offsetID = id
 	}

 	bar, _ := pterm.DefaultProgressbar.WithTotal(int(sourcePointCount)).Start()
-	if offsetCount > 0 {
-		pterm.Info.Printfln("Starting from offset %d", offsetCount)
-		bar.Add(int(offsetCount))
-	} else {
-		pterm.Info.Printfln("Starting from beginning")
-	}
-
-	fmt.Print("\n")
+	displayMigrationProgress(bar, offsetCount)

 	schema, err := sourceClient.DescribeCollection(ctx, milvusclient.NewDescribeCollectionOption(r.Milvus.Collection))
 	if err != nil {
@@ -245,31 +212,28 @@ func (r *MigrateFromMilvusCmd) migrateData(ctx context.Context, sourceClient *mi

 	for {
 		filter := ""
-		if lastID != nil {
+		if offsetID != nil {
 			switch pkType {
 			case entity.FieldTypeInt64:
-				filter = fmt.Sprintf("%s > %d", pkName, lastID.GetNum())
+				filter = fmt.Sprintf("%s > %d", pkName, offsetID.GetNum())
 			case entity.FieldTypeVarChar:
-				filter = fmt.Sprintf("%s > '%s'", pkName, lastID.GetUuid())
-			default:
-				return fmt.Errorf("unsupported primary key type: %v", pkType)
+				filter = fmt.Sprintf("%s > '%s'", pkName, offsetID.GetUuid())
 			}
 		}

 		result, err := sourceClient.Query(ctx, milvusclient.NewQueryOption(r.Milvus.Collection).
 			WithFilter(filter).
 			WithOutputFields("*").
-			WithLimit(batchSize),
-		)
+			WithLimit(batchSize))
 		if err != nil {
-			return fmt.Errorf("query failed: %w", err)
+			return fmt.Errorf("failed to query Milvus: %w", err)
 		}
+
 		if result.ResultCount == 0 {
 			break
 		}

 		var targetPoints []*qdrant.PointStruct
-
 		for i := 0; i < result.ResultCount; i++ {
 			point := &qdrant.PointStruct{}
 			vectors := make(map[string]*qdrant.Vector)
@@ -286,12 +250,12 @@ func (r *MigrateFromMilvusCmd) migrateData(ctx context.Context, sourceClient *mi
 					switch col.Type() {
 					case entity.FieldTypeVarChar:
 						uuid := value.(string)
-						lastID = qdrant.NewID(uuid)
-						point.Id = lastID
+						offsetID = qdrant.NewID(uuid)
+						point.Id = offsetID
 					case entity.FieldTypeInt64:
 						num := value.(int64)
-						lastID = qdrant.NewIDNum(uint64(num))
-						point.Id = lastID
+						offsetID = qdrant.NewIDNum(uint64(num))
+						point.Id = offsetID
 					}
 					continue
 				}
@@ -318,13 +282,12 @@ func (r *MigrateFromMilvusCmd) migrateData(ctx context.Context, sourceClient *mi
 			Points: targetPoints,
 			Wait: qdrant.PtrOf(true),
 		})
-
 		if err != nil {
 			return fmt.Errorf("failed to insert data into target: %w", err)
 		}

 		offsetCount += uint64(len(targetPoints))
-		err = commons.StoreStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, r.Milvus.Collection, lastID, offsetCount)
+		err = commons.StoreStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, r.Milvus.Collection, offsetID, offsetCount)
 		if err != nil {
 			return fmt.Errorf("failed to store offset: %w", err)
 		}
@@ -338,12 +301,11 @@ func (r *MigrateFromMilvusCmd) migrateData(ctx context.Context, sourceClient *mi
 		// If one minute elapsed get updated sourcePointCount.
 		// Useful if any new points were added to the source during migration.
 		if time.Since(startTime) > time.Minute {
-			sourcePointCount, err := r.countMilvusVectors(ctx, sourceClient)
+			sourcePointCount, err = r.countMilvusVectors(ctx, sourceClient)
 			if err != nil {
 				return fmt.Errorf("failed to count points in source: %w", err)
 			}
 			bar.Total = int(sourcePointCount)
-			startTime = time.Now()
 		}
 	}
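
The migration loop in this diff pages through Milvus by filtering on the primary key: each query asks for rows whose key is greater than the last key seen, rather than using a numeric offset. The sketch below isolates that filter-building step so it can be read and tested on its own. `nextPageFilter` is a hypothetical helper, not a function from the repository, and it takes plain Go values instead of `*qdrant.PointId`. It keeps an explicit error for unsupported key types, similar to the `default` branch that the diff removes.

```go
package main

import "fmt"

// nextPageFilter returns a filter expression selecting rows whose primary key
// is greater than last. A nil last means no offset yet, so the first page is
// requested with an empty filter. Hypothetical helper for illustration only.
func nextPageFilter(pkName string, last any) (string, error) {
	switch v := last.(type) {
	case nil:
		return "", nil
	case int64:
		return fmt.Sprintf("%s > %d", pkName, v), nil
	case string:
		// Same quoting as the diff; no escaping of quotes inside v is attempted.
		return fmt.Sprintf("%s > '%s'", pkName, v), nil
	default:
		return "", fmt.Errorf("unsupported primary key type: %T", v)
	}
}

func main() {
	first, _ := nextPageFilter("id", nil)
	fmt.Printf("%q\n", first) // ""

	byNum, _ := nextPageFilter("id", int64(42))
	fmt.Println(byNum) // id > 42

	byStr, _ := nextPageFilter("id", "abc")
	fmt.Println(byStr) // id > 'abc'

	_, err := nextPageFilter("id", 3.14)
	fmt.Println(err) // unsupported primary key type: float64
}
```

Filtering on the key keeps each page query independent of how many rows were already copied, which is what lets the stored offset ID serve as the resume point.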
