-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathmod.rs
More file actions
234 lines (217 loc) · 7.8 KB
/
mod.rs
File metadata and controls
234 lines (217 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
pub(crate) mod builders;
mod collection;
pub mod config;
mod conversions;
pub mod error;
mod index;
mod payload;
mod points;
mod query;
mod search;
mod sharding_keys;
mod snapshot;
mod version_check;
use std::future::Future;
use std::sync::Arc;
use std::thread;
use tonic::codegen::InterceptedService;
use tonic::transport::{Channel, Uri};
use tonic::Status;
use crate::auth::MetadataInterceptor;
use crate::channel_pool::ChannelPool;
use crate::qdrant::{qdrant_client, HealthCheckReply, HealthCheckRequest};
use crate::qdrant_client::config::QdrantConfig;
use crate::qdrant_client::version_check::is_compatible;
use crate::QdrantError;
/// Result type returned by [`Qdrant`] client operations.
///
/// Shorthand for [`Result`] with the error type fixed to [`QdrantError`].
pub type QdrantResult<T> = Result<T, QdrantError>;
/// A builder for [`Qdrant`].
///
/// This is an alias of [`QdrantConfig`]: the configuration type doubles as the builder that
/// [`Qdrant::from_url`] hands out.
pub type QdrantBuilder = QdrantConfig;
/// API client to interact with a [Qdrant](https://qdrant.tech/) server.
///
/// Connects to a Qdrant server and provides an API interface.
///
/// # Set up
///
/// Set up a [`Qdrant`] client to connect to a Qdrant instance with just a [URL](Qdrant::from_url):
///
/// ```no_run
/// use qdrant_client::Qdrant;
///
///# async fn connect() -> Result<(), qdrant_client::QdrantError> {
/// let client = Qdrant::from_url("http://localhost:6334").build()?;
///# Ok(())
///# }
/// ```
///
/// Or use a [URL](Qdrant::from_url), an [API key](fn@QdrantBuilder::api_key) and a
/// [timeout](fn@QdrantBuilder::timeout):
///
/// ```no_run
/// use qdrant_client::Qdrant;
///
///# async fn connect() -> Result<(), qdrant_client::QdrantError> {
/// let client = Qdrant::from_url("http://localhost:6334")
///     .api_key(std::env::var("QDRANT_API_KEY"))
///     .timeout(std::time::Duration::from_secs(10))
///     .build()?;
///# Ok(())
///# }
/// ```
///
/// # Operations
///
/// Categories:
///
/// - [Collection operations](Self#collection-operations) - manage collections, aliases and cluster configuration
/// - [Point operations](Self#point-operations) - manage points and vectors
/// - [Payload operations](Self#payload-operations) - manage point payloads
/// - [Query operations](Self#query-operations) - query (search) points using universal search
/// - [Index operations](Self#index-operations) - manage field and payload indices
/// - [Snapshot operations](Self#snapshot-operations) - manage instance or collection snapshots
/// - [Shard key operations](Self#sharding-key-operations) - manage shard keys
///
/// Common operations include:
///
/// - [`create_collection`](Self::create_collection) - create a new collection
/// - [`upsert_points`](Self::upsert_points) - insert or update points
/// - [`query`](Self::query) - query points with similarity search
///
/// # Cloning
///
/// The client is [`Clone`]; the connection pool is held behind an [`Arc`], so clones share
/// the same pool instead of creating a new one.
#[derive(Clone)]
pub struct Qdrant {
/// Configuration this client was constructed with (URI, timeouts, API key, ...).
pub config: QdrantConfig,
/// Internal connection pool, shared between clones of this client through the `Arc`.
channel: Arc<ChannelPool>,
}
/// # Construct and connect
///
/// Methods to construct a new Qdrant client.
impl Qdrant {
/// Create a new Qdrant client.
///
/// Constructs the client based on the given [`QdrantConfig`](config::QdrantConfig).
///
/// If `config.check_compatibility` is set, a best-effort health check is issued first through
/// a temporary single-channel pool, and the reported server version is compared with this
/// crate's version. An incompatible version, or any failure to obtain the server version,
/// only produces a warning on standard error; it never fails construction and never panics.
///
/// # Errors
///
/// Returns an error if `config.uri` cannot be parsed as a [`Uri`].
pub fn new(config: QdrantConfig) -> QdrantResult<Self> {
    // Parse once up front: an invalid URI is reported before any thread or runtime is created,
    // and the parsed value is reused for both the probe and the final pool.
    let uri = config.uri.parse::<Uri>()?;

    if config.check_compatibility {
        // Temporary client used only for the compatibility probe.
        let probe = Self {
            channel: Arc::new(ChannelPool::new(
                uri.clone(),
                config.timeout,
                config.connect_timeout,
                config.keep_alive_while_idle,
                1, // No need to create a pool for the compatibility check.
            )),
            config: config.clone(),
        };

        // We're in sync context, spawn temporary runtime in thread to do async health check.
        // A panic inside that thread is treated like any other probe failure instead of being
        // propagated: the check is best-effort and must not take the caller down.
        let server_version = thread::scope(|s| {
            s.spawn(|| {
                tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .enable_time()
                    .build()
                    .map_err(QdrantError::Io)?
                    .block_on(probe.health_check())
            })
            .join()
        })
        .ok()
        .and_then(|reply| reply.ok())
        .map(|info| info.version);

        // Warnings go to stderr so that a library diagnostic never ends up in the
        // standard output of the embedding program.
        match server_version {
            Some(server_version) => {
                let client_version = env!("CARGO_PKG_VERSION").to_string();
                if !is_compatible(Some(&client_version), Some(&server_version)) {
                    eprintln!("Client version {client_version} is not compatible with server version {server_version}. \
                        Major versions should match and minor version difference must not exceed 1. \
                        Set check_compatibility=false to skip version check.");
                }
            }
            None => {
                eprintln!(
                    "Failed to obtain server version. \
                    Unable to check client-server compatibility. \
                    Set check_compatibility=false to skip version check."
                );
            }
        }
    }

    let channel = ChannelPool::new(
        uri,
        config.timeout,
        config.connect_timeout,
        config.keep_alive_while_idle,
        config.pool_size,
    );

    Ok(Self {
        channel: Arc::new(channel),
        config,
    })
}
/// Build a new Qdrant client with the given URL.
///
/// ```no_run
/// use qdrant_client::Qdrant;
///
///# async fn connect() -> Result<(), qdrant_client::QdrantError> {
/// let client = Qdrant::from_url("http://localhost:6334").build()?;
///# Ok(())
///# }
/// ```
///
/// See more ways to set up the client [here](Self#set-up).
pub fn from_url(url: &str) -> QdrantBuilder {
QdrantBuilder::from_url(url)
}
/// Wrap `channel` in an [`InterceptedService`] driven by a [`MetadataInterceptor`].
///
/// The interceptor is built from clones of the configured API key and custom headers.
fn with_api_key(&self, channel: Channel) -> InterceptedService<Channel, MetadataInterceptor> {
    let api_key = self.config.api_key.clone();
    let custom_headers = self.config.custom_headers.clone();
    InterceptedService::new(channel, MetadataInterceptor::new(api_key, custom_headers))
}
/// Run `f` against the raw root Qdrant API client.
///
/// A channel is taken from the pool and wrapped with the metadata interceptor, then turned
/// into a root API client whose maximum decoding message size is `usize::MAX`. When a
/// compression is configured it is applied to both sent and accepted messages. The client is
/// then handed to `f`, and the outcome of `f` is returned as a [`QdrantResult`].
async fn with_root_qdrant_client<T, O: Future<Output = Result<T, Status>>>(
    &self,
    f: impl Fn(qdrant_client::QdrantClient<InterceptedService<Channel, MetadataInterceptor>>) -> O,
) -> QdrantResult<T> {
    let outcome = self
        .channel
        .with_channel(
            |channel| {
                let plain = qdrant_client::QdrantClient::new(self.with_api_key(channel))
                    .max_decoding_message_size(usize::MAX);
                let api = match self.config.compression {
                    Some(encoding) => plain
                        .send_compressed(encoding.into())
                        .accept_compressed(encoding.into()),
                    None => plain,
                };
                f(api)
            },
            // NOTE(review): presumably the "allow retry" switch of the pool — confirm against
            // `ChannelPool::with_channel`.
            true,
        )
        .await?;
    Ok(outcome)
}
/// Health check.
///
/// Sends a health check request to the server and returns its reply, which carries server
/// information such as the current version and commit.
///
/// ```no_run
///# use qdrant_client::{Qdrant, QdrantError};
///# async fn check_health(client: &Qdrant)
///# -> Result<(), QdrantError> {
/// client.health_check().await?;
///# Ok(())
///# }
/// ```
pub async fn health_check(&self) -> QdrantResult<HealthCheckReply> {
    let reply = self
        .with_root_qdrant_client(|mut qdrant_api| async move {
            qdrant_api
                .health_check(HealthCheckRequest {})
                .await
                .map(|response| response.into_inner())
        })
        .await;
    reply
}
}