Skip to content

Commit fefaae0

Browse files
committed
Add uploading snapshot function
1 parent e36af14 commit fefaae0

File tree

2 files changed

+73
-11
lines changed

2 files changed

+73
-11
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ tonic = { version = "0.8.3", features = ["tls", "tls-roots"] }
1717
prost = "0.11.8"
1818
prost-types = "0.11.8"
1919
anyhow = "1"
20-
reqwest = { version = "0.11.14", optional = true, features = ["stream"] }
20+
reqwest = { version = "0.11.14", optional = true, features = ["stream", "multipart"] }
2121
futures-util = { version = "0.3.27", optional = true }
2222

2323
[build-dependencies]

src/client.rs

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,26 @@ use crate::qdrant::snapshots_client::SnapshotsClient;
99
use crate::qdrant::value::Kind;
1010
use crate::qdrant::vectors::VectorsOptions;
1111
use crate::qdrant::with_payload_selector::SelectorOptions;
12-
use crate::qdrant::{qdrant_client, with_vectors_selector, AliasOperations, ChangeAliases, ClearPayloadPoints, CollectionOperationResponse, Condition, CountPoints, CountResponse, CreateAlias, CreateCollection, CreateFieldIndexCollection, CreateFullSnapshotRequest, CreateSnapshotRequest, CreateSnapshotResponse, DeleteAlias, DeleteCollection, DeleteFieldIndexCollection, DeleteFullSnapshotRequest, DeletePayloadPoints, DeletePoints, DeleteSnapshotRequest, DeleteSnapshotResponse, FieldCondition, FieldType, Filter, GetCollectionInfoRequest, GetCollectionInfoResponse, GetPoints, GetResponse, HasIdCondition, HealthCheckReply, HealthCheckRequest, IsEmptyCondition, ListAliasesRequest, ListAliasesResponse, ListCollectionAliasesRequest, ListCollectionsRequest, ListCollectionsResponse, ListFullSnapshotsRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListValue, NamedVectors, OptimizersConfigDiff, PayloadIncludeSelector, PayloadIndexParams, PointId, PointStruct, PointsIdsList, PointsOperationResponse, PointsSelector, RecommendBatchPoints, RecommendBatchResponse, RecommendPoints, RecommendResponse, RenameAlias, ScrollPoints, ScrollResponse, SearchBatchPoints, SearchBatchResponse, SearchPoints, SearchResponse, SetPayloadPoints, Struct, UpdateCollection, UpsertPoints, Value, Vector, Vectors, VectorsSelector, WithPayloadSelector, WithVectorsSelector, WriteOrdering, ReadConsistency};
12+
use crate::qdrant::{
13+
qdrant_client, with_vectors_selector, AliasOperations, ChangeAliases, ClearPayloadPoints,
14+
CollectionOperationResponse, Condition, CountPoints, CountResponse, CreateAlias,
15+
CreateCollection, CreateFieldIndexCollection, CreateFullSnapshotRequest, CreateSnapshotRequest,
16+
CreateSnapshotResponse, DeleteAlias, DeleteCollection, DeleteFieldIndexCollection,
17+
DeleteFullSnapshotRequest, DeletePayloadPoints, DeletePoints, DeleteSnapshotRequest,
18+
DeleteSnapshotResponse, FieldCondition, FieldType, Filter, GetCollectionInfoRequest,
19+
GetCollectionInfoResponse, GetPoints, GetResponse, HasIdCondition, HealthCheckReply,
20+
HealthCheckRequest, IsEmptyCondition, ListAliasesRequest, ListAliasesResponse,
21+
ListCollectionAliasesRequest, ListCollectionsRequest, ListCollectionsResponse,
22+
ListFullSnapshotsRequest, ListSnapshotsRequest, ListSnapshotsResponse, ListValue, NamedVectors,
23+
OptimizersConfigDiff, PayloadIncludeSelector, PayloadIndexParams, PointId, PointStruct,
24+
PointsIdsList, PointsOperationResponse, PointsSelector, ReadConsistency, RecommendBatchPoints,
25+
RecommendBatchResponse, RecommendPoints, RecommendResponse, RenameAlias, ScrollPoints,
26+
ScrollResponse, SearchBatchPoints, SearchBatchResponse, SearchPoints, SearchResponse,
27+
SetPayloadPoints, Struct, UpdateCollection, UpsertPoints, Value, Vector, Vectors,
28+
VectorsSelector, WithPayloadSelector, WithVectorsSelector, WriteOrdering,
29+
};
1330
use anyhow::{bail, Result};
31+
use reqwest::multipart::{Form, Part};
1432
use std::collections::HashMap;
1533
use std::future::Future;
1634
use std::path::PathBuf;
@@ -30,7 +48,10 @@ pub struct QdrantClientConfig {
3048

3149
impl QdrantClientConfig {
3250
pub fn from_url(url: &str) -> Self {
33-
QdrantClientConfig { uri: url.to_string(), ..Self::default() }
51+
QdrantClientConfig {
52+
uri: url.to_string(),
53+
..Self::default()
54+
}
3455
}
3556

3657
pub fn set_api_key(&mut self, api_key: &str) {
@@ -499,7 +520,8 @@ impl QdrantClient {
499520
points: Vec<PointStruct>,
500521
ordering: Option<WriteOrdering>,
501522
) -> Result<PointsOperationResponse> {
502-
self._upsert_points(collection_name, &points, false, ordering).await
523+
self._upsert_points(collection_name, &points, false, ordering)
524+
.await
503525
}
504526

505527
pub async fn upsert_points_blocking(
@@ -508,7 +530,8 @@ impl QdrantClient {
508530
points: Vec<PointStruct>,
509531
ordering: Option<WriteOrdering>,
510532
) -> Result<PointsOperationResponse> {
511-
self._upsert_points(collection_name, &points, true, ordering).await
533+
self._upsert_points(collection_name, &points, true, ordering)
534+
.await
512535
}
513536

514537
#[inline]
@@ -517,7 +540,7 @@ impl QdrantClient {
517540
collection_name: impl ToString,
518541
points: &Vec<PointStruct>,
519542
block: bool,
520-
ordering: Option<WriteOrdering>
543+
ordering: Option<WriteOrdering>,
521544
) -> Result<PointsOperationResponse> {
522545
let collection_name = collection_name.to_string();
523546
let collection_name_ref = collection_name.as_str();
@@ -554,7 +577,7 @@ impl QdrantClient {
554577
collection_name: impl ToString,
555578
points: &PointsSelector,
556579
payload: Payload,
557-
ordering: Option<WriteOrdering>
580+
ordering: Option<WriteOrdering>,
558581
) -> Result<PointsOperationResponse> {
559582
self._set_payload(collection_name, points, &payload, true, ordering)
560583
.await
@@ -594,7 +617,7 @@ impl QdrantClient {
594617
collection_name: impl ToString,
595618
points: &PointsSelector,
596619
payload: Payload,
597-
ordering: Option<WriteOrdering>
620+
ordering: Option<WriteOrdering>,
598621
) -> Result<PointsOperationResponse> {
599622
self._overwrite_payload(collection_name, points, &payload, false, ordering)
600623
.await
@@ -618,7 +641,7 @@ impl QdrantClient {
618641
points: &PointsSelector,
619642
payload: &Payload,
620643
block: bool,
621-
ordering: Option<WriteOrdering>
644+
ordering: Option<WriteOrdering>,
622645
) -> Result<PointsOperationResponse> {
623646
let collection_name = collection_name.to_string();
624647
let collection_name_ref = collection_name.as_str();
@@ -800,7 +823,8 @@ impl QdrantClient {
800823
points: &PointsSelector,
801824
ordering: Option<WriteOrdering>,
802825
) -> Result<PointsOperationResponse> {
803-
self._delete_points(collection_name, false, points, ordering).await
826+
self._delete_points(collection_name, false, points, ordering)
827+
.await
804828
}
805829

806830
pub async fn delete_points_blocking(
@@ -809,7 +833,8 @@ impl QdrantClient {
809833
points: &PointsSelector,
810834
ordering: Option<WriteOrdering>,
811835
) -> Result<PointsOperationResponse> {
812-
self._delete_points(collection_name, true, points, ordering).await
836+
self._delete_points(collection_name, true, points, ordering)
837+
.await
813838
}
814839

815840
async fn _delete_points(
@@ -1146,6 +1171,43 @@ impl QdrantClient {
11461171

11471172
Ok(())
11481173
}
1174+
1175+
pub async fn upload_snapshot<T>(
1176+
&self,
1177+
in_path: impl Into<PathBuf>,
1178+
collection_name: T,
1179+
rest_api_uri: Option<T>,
1180+
) -> Result<()>
1181+
where
1182+
T: ToString + Clone,
1183+
{
1184+
let snapshot_path: PathBuf = in_path.into();
1185+
let filename = snapshot_path
1186+
.file_name()
1187+
.and_then(|name| name.to_str().map(|name| name.to_string()));
1188+
1189+
let snapshot_file = std::fs::read(snapshot_path)?;
1190+
1191+
let mut part = Part::bytes(snapshot_file);
1192+
if let Some(filename) = filename {
1193+
part = part.file_name(filename);
1194+
}
1195+
let form = Form::new().part("snapshot", part);
1196+
let client = reqwest::Client::new();
1197+
client
1198+
.post(format!(
1199+
"{}/collcections/{}/snapshots/upload",
1200+
rest_api_uri
1201+
.map(|uri| uri.to_string())
1202+
.unwrap_or_else(|| String::from("http://localhost:6333")),
1203+
collection_name.to_string()
1204+
))
1205+
.multipart(form)
1206+
.send()
1207+
.await?;
1208+
1209+
Ok(())
1210+
}
11491211
}
11501212

11511213
impl PointStruct {

0 commit comments

Comments
 (0)