Skip to content

Commit 2015fc5

Browse files
committed
feat: Qdrant storage
Signed-off-by: Anush008 <[email protected]>
1 parent 6e8244d commit 2015fc5

File tree

3 files changed

+195
-0
lines changed

3 files changed

+195
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,4 @@ hyper-rustls = { version = "0.27.5" }
8787
yup-oauth2 = "12.1.0"
8888
rustls = { version = "0.23.25" }
8989
http-body-util = "0.1.3"
90+
qdrant-client = "1.13.0"

src/ops/storages/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod postgres;
2+
pub mod qdrant;

src/ops/storages/qdrant.rs

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
use std::sync::Arc;
2+
3+
use crate::base::spec::*;
4+
use crate::ops::sdk::*;
5+
use crate::setup;
6+
use crate::utils::db::ValidIdentifier;
7+
use anyhow::Result;
8+
use derivative::Derivative;
9+
use futures::FutureExt;
10+
use serde::Serialize;
11+
12+
#[derive(Debug, Deserialize)]
13+
pub struct Spec {
14+
qdrant_url: Option<String>,
15+
collection_name: Option<String>,
16+
}
17+
const BIND_LIMIT: usize = 65535;
18+
19+
pub struct Executor {
20+
collection_name: ValidIdentifier,
21+
key_fields_schema: Vec<FieldSchema>,
22+
value_fields_schema: Vec<FieldSchema>,
23+
}
24+
25+
impl Executor {
26+
fn new(
27+
collection_name: String,
28+
key_fields_schema: Vec<FieldSchema>,
29+
value_fields_schema: Vec<FieldSchema>,
30+
) -> Result<Self> {
31+
let collection_name = ValidIdentifier::try_from(collection_name)?;
32+
Ok(Self {
33+
key_fields_schema,
34+
value_fields_schema,
35+
collection_name,
36+
})
37+
}
38+
}
39+
40+
#[async_trait]
41+
impl ExportTargetExecutor for Executor {
42+
async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()> {
43+
let num_parameters = self.key_fields_schema.len() + self.value_fields_schema.len();
44+
for _upsert_chunk in mutation.upserts.chunks(BIND_LIMIT / num_parameters) {}
45+
46+
// TODO: Find a way to batch delete.
47+
for _delete_key in mutation.delete_keys.iter() {}
48+
49+
Ok(())
50+
}
51+
}
52+
53+
#[async_trait]
54+
impl QueryTarget for Executor {
55+
async fn search(&self, _query: VectorMatchQuery) -> Result<QueryResults> {
56+
Ok(QueryResults {
57+
fields: vec![],
58+
results: vec![],
59+
})
60+
}
61+
}
62+
63+
#[derive(Default)]
64+
pub struct Factory {}
65+
66+
67+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
68+
pub struct TableId {
69+
database_url: Option<String>,
70+
collection_name: String,
71+
}
72+
73+
#[derive(Debug, Clone, Serialize, Deserialize)]
74+
pub struct SetupState {}
75+
76+
#[derive(Derivative)]
77+
#[derivative(Debug)]
78+
pub struct SetupStatusCheck {
79+
#[derivative(Debug = "ignore")]
80+
table_id: TableId,
81+
82+
desired_state: Option<SetupState>,
83+
}
84+
85+
impl SetupStatusCheck {
86+
fn new(table_id: TableId, desired_state: Option<SetupState>) -> Self {
87+
Self {
88+
table_id,
89+
desired_state,
90+
}
91+
}
92+
}
93+
94+
#[async_trait]
95+
impl setup::ResourceSetupStatusCheck for SetupStatusCheck {
96+
type Key = TableId;
97+
type State = SetupState;
98+
99+
fn describe_resource(&self) -> String {
100+
format!("Qdrant table {}", "TABLE ID")
101+
}
102+
103+
fn key(&self) -> &Self::Key {
104+
&self.table_id
105+
}
106+
107+
fn desired_state(&self) -> Option<&Self::State> {
108+
self.desired_state.as_ref()
109+
}
110+
111+
fn describe_changes(&self) -> Vec<String> {
112+
vec![]
113+
}
114+
115+
fn change_type(&self) -> setup::SetupChangeType {
116+
setup::SetupChangeType::NoChange
117+
}
118+
119+
async fn apply_change(&self) -> Result<()> {
120+
Ok(())
121+
}
122+
}
123+
124+
impl StorageFactoryBase for Arc<Factory> {
125+
type Spec = Spec;
126+
type SetupState = SetupState;
127+
type Key = TableId;
128+
129+
fn name(&self) -> &str {
130+
"Qdrant"
131+
}
132+
133+
fn build(
134+
self: Arc<Self>,
135+
name: String,
136+
target_id: i32,
137+
spec: Spec,
138+
key_fields_schema: Vec<FieldSchema>,
139+
value_fields_schema: Vec<FieldSchema>,
140+
storage_options: IndexOptions,
141+
context: Arc<FlowInstanceContext>,
142+
) -> Result<(
143+
(TableId, SetupState),
144+
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
145+
)> {
146+
let _ = storage_options;
147+
let table_id = TableId {
148+
database_url: spec.qdrant_url.clone(),
149+
collection_name: spec.collection_name.unwrap_or_else(|| {
150+
format!("{}__{}__{}", context.flow_instance_name, name, target_id)
151+
}),
152+
};
153+
let setup_state = SetupState {};
154+
let collection_name = table_id.collection_name.clone();
155+
let executors = async move {
156+
let executor = Arc::new(Executor::new(
157+
collection_name,
158+
key_fields_schema,
159+
value_fields_schema,
160+
)?);
161+
let query_target = executor.clone();
162+
Ok((
163+
executor as Arc<dyn ExportTargetExecutor>,
164+
Some(query_target as Arc<dyn QueryTarget>),
165+
))
166+
};
167+
Ok(((table_id, setup_state), executors.boxed()))
168+
}
169+
170+
fn check_setup_status(
171+
&self,
172+
key: TableId,
173+
desired: Option<SetupState>,
174+
existing: setup::CombinedState<SetupState>,
175+
) -> Result<
176+
impl setup::ResourceSetupStatusCheck<Key = TableId, State = SetupState> + 'static,
177+
> {
178+
let _ = existing;
179+
Ok(SetupStatusCheck::new(key, desired))
180+
}
181+
182+
fn will_keep_all_existing_data(
183+
&self,
184+
_name: &str,
185+
_target_id: i32,
186+
desired: &SetupState,
187+
existing: &SetupState,
188+
) -> Result<bool> {
189+
let _ = existing;
190+
let _ = desired;
191+
Ok(true)
192+
}
193+
}

0 commit comments

Comments
 (0)