Skip to content

Commit 566c150

Browse files
committed
cleanup(qdrant): remove query related logic for qdrant
1 parent b2d6de5 commit 566c150

File tree

3 files changed

+17
-170
lines changed

3 files changed

+17
-170
lines changed

examples/text_embedding_qdrant/main.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def text_to_embedding(
2626
@cocoindex.flow_def(name="TextEmbeddingWithQdrant")
2727
def text_embedding_flow(
2828
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
29-
):
29+
) -> None:
3030
"""
3131
Define an example flow that embeds text into a vector database.
3232
"""
@@ -65,7 +65,7 @@ def text_embedding_flow(
6565
)
6666

6767

68-
def _main():
68+
def _main() -> None:
6969
# Initialize Qdrant client
7070
client = QdrantClient(url=QDRANT_GRPC_URL, prefer_grpc=True)
7171

@@ -87,6 +87,8 @@ def _main():
8787
for result in search_results:
8888
score = result.score
8989
payload = result.payload
90+
if payload is None:
91+
continue
9092
print(f"[{score:.3f}] {payload['filename']}")
9193
print(f" {payload['text']}")
9294
print("---")

src/ops/registration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result
1717
functions::extract_by_llm::Factory.register(registry)?;
1818

1919
storages::postgres::Factory::default().register(registry)?;
20-
Arc::new(storages::qdrant::Factory::default()).register(registry)?;
20+
storages::qdrant::register(registry)?;
2121
storages::kuzu::register(registry, reqwest_client)?;
2222

2323
storages::neo4j::Factory::new().register(registry)?;

src/ops/storages/qdrant.rs

Lines changed: 12 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,26 @@ use std::convert::Infallible;
33
use std::fmt::Display;
44
use std::sync::Arc;
55

6-
use crate::base::duration::parse_duration;
6+
use crate::ops::registry::ExecutorFactoryRegistry;
77
use crate::ops::sdk::*;
88
use crate::setup;
99
use anyhow::{Result, bail};
1010
use futures::FutureExt;
1111
use qdrant_client::Qdrant;
12-
use qdrant_client::qdrant::vectors_output::VectorsOptions;
1312
use qdrant_client::qdrant::{
1413
DeletePointsBuilder, NamedVectors, PointId, PointStruct, PointsIdsList, UpsertPointsBuilder,
1514
Value as QdrantValue,
1615
};
17-
use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint};
1816
use serde_json::json;
1917

2018
#[derive(Debug, Deserialize, Clone)]
21-
pub struct Spec {
19+
struct Spec {
2220
collection_name: String,
2321
grpc_url: String,
2422
api_key: Option<String>,
2523
}
2624

27-
pub struct ExportContext {
25+
struct ExportContext {
2826
client: Qdrant,
2927
collection_name: String,
3028
value_fields_schema: Vec<FieldSchema>,
@@ -129,8 +127,8 @@ fn values_to_payload(
129127
BasicValue::Range(v) => json!({ "start": v.start, "end": v.end }),
130128
BasicValue::Uuid(v) => v.to_string().into(),
131129
BasicValue::Date(v) => v.to_string().into(),
132-
BasicValue::LocalDateTime(v) => v.to_string().into(),
133130
BasicValue::Time(v) => v.to_string().into(),
131+
BasicValue::LocalDateTime(v) => v.to_string().into(),
134132
BasicValue::OffsetDateTime(v) => v.to_string().into(),
135133
BasicValue::TimeDelta(v) => v.to_string().into(),
136134
BasicValue::Json(v) => (**v).clone(),
@@ -163,167 +161,11 @@ fn convert_to_vector(v: Vec<BasicValue>) -> Vec<f32> {
163161
.collect()
164162
}
165163

166-
fn into_value(point: &ScoredPoint, schema: &FieldSchema) -> Result<Value> {
167-
let field_name = &schema.name;
168-
let typ = schema.value_type.typ.clone();
169-
let value = match typ {
170-
ValueType::Basic(basic_type) => {
171-
let basic_value = match basic_type {
172-
BasicValueType::Str => point.payload.get(field_name).and_then(|v| {
173-
v.as_str()
174-
.map(|s| BasicValue::Str(Arc::from(s.to_string())))
175-
}),
176-
BasicValueType::Bool => point
177-
.payload
178-
.get(field_name)
179-
.and_then(|v| v.as_bool().map(BasicValue::Bool)),
180-
181-
BasicValueType::Int64 => point
182-
.payload
183-
.get(field_name)
184-
.and_then(|v| v.as_integer().map(BasicValue::Int64)),
185-
186-
BasicValueType::Float32 => point
187-
.payload
188-
.get(field_name)
189-
.and_then(|v| v.as_double().map(|f| BasicValue::Float32(f as f32))),
190-
191-
BasicValueType::Float64 => point
192-
.payload
193-
.get(field_name)
194-
.and_then(|v| v.as_double().map(BasicValue::Float64)),
195-
196-
BasicValueType::TimeDelta => point.payload.get(field_name).and_then(|v| {
197-
v.as_str()
198-
.and_then(|s| parse_duration(s).ok().map(BasicValue::TimeDelta))
199-
}),
200-
201-
BasicValueType::Json => point
202-
.payload
203-
.get(field_name)
204-
.map(|v| BasicValue::Json(Arc::from(v.clone().into_json()))),
205-
206-
BasicValueType::Vector(_) => point
207-
.vectors
208-
.as_ref()
209-
.and_then(|v| v.vectors_options.as_ref())
210-
.and_then(|vectors_options| match vectors_options {
211-
VectorsOptions::Vector(vector) => {
212-
let values = vector
213-
.data
214-
.iter()
215-
.map(|f| BasicValue::Float32(*f))
216-
.collect::<Vec<_>>();
217-
Some(BasicValue::Vector(Arc::from(values)))
218-
}
219-
VectorsOptions::Vectors(vectors) => {
220-
vectors.vectors.get(field_name).map(|vector| {
221-
let values = vector
222-
.data
223-
.iter()
224-
.map(|f| BasicValue::Float32(*f))
225-
.collect::<Vec<_>>();
226-
BasicValue::Vector(Arc::from(values))
227-
})
228-
}
229-
}),
230-
231-
BasicValueType::Uuid => point
232-
.payload
233-
.get(field_name)
234-
.and_then(|v| v.as_str()?.parse().ok().map(BasicValue::Uuid)),
235-
236-
BasicValueType::Date => point
237-
.payload
238-
.get(field_name)
239-
.and_then(|v| v.as_str()?.parse().ok().map(BasicValue::Date)),
240-
241-
BasicValueType::Time => point
242-
.payload
243-
.get(field_name)
244-
.and_then(|v| v.as_str()?.parse().ok().map(BasicValue::Time)),
245-
246-
BasicValueType::LocalDateTime => point
247-
.payload
248-
.get(field_name)
249-
.and_then(|v| v.as_str()?.parse().ok().map(BasicValue::LocalDateTime)),
250-
251-
BasicValueType::OffsetDateTime => point
252-
.payload
253-
.get(field_name)
254-
.and_then(|v| v.as_str()?.parse().ok().map(BasicValue::OffsetDateTime)),
255-
256-
BasicValueType::Range => point.payload.get(field_name).and_then(|v| {
257-
v.as_struct().and_then(|s| {
258-
let start = s.fields.get("start").and_then(|f| f.as_integer());
259-
let end = s.fields.get("end").and_then(|f| f.as_integer());
260-
261-
match (start, end) {
262-
(Some(start), Some(end)) => Some(BasicValue::Range(RangeValue {
263-
start: start as usize,
264-
end: end as usize,
265-
})),
266-
_ => None,
267-
}
268-
})
269-
}),
270-
_ => {
271-
anyhow::bail!("Unsupported value type")
272-
}
273-
};
274-
basic_value.map(Value::Basic)
275-
}
276-
_ => point
277-
.payload
278-
.get(field_name)
279-
.map(|v| Value::from_json(v.clone().into_json(), &typ))
280-
.transpose()?,
281-
};
282-
283-
let final_value = if let Some(v) = value { v } else { Value::Null };
284-
Ok(final_value)
285-
}
286-
287-
#[async_trait]
288-
impl QueryTarget for ExportContext {
289-
async fn search(&self, query: VectorMatchQuery) -> Result<QueryResults> {
290-
let points = self
291-
.client
292-
.query(
293-
QueryPointsBuilder::new(&self.collection_name)
294-
.query(Query::new_nearest(query.vector))
295-
.limit(query.limit as u64)
296-
.using(query.vector_field_name)
297-
.with_payload(true)
298-
.with_vectors(true),
299-
)
300-
.await?
301-
.result;
302-
303-
let results = points
304-
.iter()
305-
.map(|point| {
306-
let score = point.score as f64;
307-
let data = self
308-
.all_fields
309-
.iter()
310-
.map(|schema| into_value(point, schema))
311-
.collect::<Result<Vec<_>>>()?;
312-
Ok(QueryResult { data, score })
313-
})
314-
.collect::<Result<Vec<QueryResult>>>()?;
315-
Ok(QueryResults {
316-
fields: self.all_fields.clone(),
317-
results,
318-
})
319-
}
320-
}
321-
322164
#[derive(Default)]
323-
pub struct Factory {}
165+
struct Factory {}
324166

325167
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
326-
pub struct CollectionId {
168+
struct CollectionId {
327169
collection_name: String,
328170
}
329171

@@ -335,7 +177,7 @@ impl Display for CollectionId {
335177
}
336178

337179
#[async_trait]
338-
impl StorageFactoryBase for Arc<Factory> {
180+
impl StorageFactoryBase for Factory {
339181
type Spec = Spec;
340182
type DeclarationSpec = ();
341183
type SetupState = ();
@@ -375,11 +217,10 @@ impl StorageFactoryBase for Arc<Factory> {
375217
d.key_fields_schema,
376218
d.value_fields_schema,
377219
)?);
378-
let query_target = export_context.clone();
379220
let executors = async move {
380221
Ok(TypedExportTargetExecutors {
381222
export_context,
382-
query_target: Some(query_target as Arc<dyn QueryTarget>),
223+
query_target: None,
383224
})
384225
};
385226
Ok(TypedExportDataCollectionBuildOutput {
@@ -435,3 +276,7 @@ impl StorageFactoryBase for Arc<Factory> {
435276
Err(anyhow!("Qdrant does not support setup changes"))
436277
}
437278
}
279+
280+
pub fn register(registry: &mut ExecutorFactoryRegistry) -> Result<()> {
281+
Factory {}.register(registry)
282+
}

0 commit comments

Comments
 (0)