@@ -9,11 +9,14 @@ use crate::setup;
99use anyhow:: { bail, Result } ;
1010use futures:: FutureExt ;
1111use qdrant_client:: qdrant:: vectors_output:: VectorsOptions ;
12- use qdrant_client:: qdrant:: { NamedVectors , PointStruct , UpsertPointsBuilder , Value as QdrantValue } ;
12+ use qdrant_client:: qdrant:: {
13+ NamedVectors , PointId , PointStruct , UpsertPointsBuilder , Value as QdrantValue ,
14+ } ;
1315use qdrant_client:: qdrant:: { Query , QueryPointsBuilder , ScoredPoint } ;
1416use qdrant_client:: Qdrant ;
1517use serde:: Serialize ;
1618use serde_json:: json;
19+ use uuid:: Uuid ;
1720
1821fn key_value_fields_iter < ' a > (
1922 key_fields_schema : & [ FieldSchema ] ,
@@ -45,8 +48,8 @@ pub struct Executor {
4548
4649impl Executor {
4750 fn new (
48- url : & str ,
49- collection_name : & str ,
51+ url : String ,
52+ collection_name : String ,
5053 key_fields_schema : Vec < FieldSchema > ,
5154 value_fields_schema : Vec < FieldSchema > ,
5255 ) -> Result < Self > {
@@ -56,11 +59,11 @@ impl Executor {
5659 . cloned ( )
5760 . collect :: < Vec < _ > > ( ) ;
5861 Ok ( Self {
59- client : Qdrant :: from_url ( url) . build ( ) ?,
62+ client : Qdrant :: from_url ( & url) . build ( ) ?,
6063 key_fields_schema,
6164 value_fields_schema,
6265 all_fields,
63- collection_name : collection_name . to_string ( ) ,
66+ collection_name,
6467 } )
6568 }
6669}
@@ -71,12 +74,11 @@ impl ExportTargetExecutor for Executor {
7174 let mut points: Vec < PointStruct > = Vec :: with_capacity ( mutation. upserts . len ( ) ) ;
7275 for upsert in mutation. upserts . iter ( ) {
7376 let key_fields = key_value_fields_iter ( & self . key_fields_schema , & upsert. key ) ?;
74- let key_fields = key_values_to_payload ( key_fields, & self . key_fields_schema ) ?;
75- let ( mut payload, vectors) =
77+ let point_id = key_to_point_id ( key_fields) ?;
78+ let ( payload, vectors) =
7679 values_to_payload ( & upsert. value . fields , & self . value_fields_schema ) ?;
77- payload. extend ( key_fields) ;
7880
79- points. push ( PointStruct :: new ( 1 , vectors, payload) ) ;
81+ points. push ( PointStruct :: new ( point_id , vectors, payload) ) ;
8082 }
8183
8284 self . client
@@ -85,27 +87,20 @@ impl ExportTargetExecutor for Executor {
8587 Ok ( ( ) )
8688 }
8789}
90+ fn key_to_point_id ( key_values : & [ KeyValue ] ) -> Result < PointId > {
91+ let point_id = if let Some ( key_value) = key_values. first ( ) {
92+ match key_value {
93+ KeyValue :: Str ( v) => PointId :: from ( v. to_string ( ) ) ,
94+ KeyValue :: Int64 ( v) => PointId :: from ( * v as u64 ) ,
95+ KeyValue :: Uuid ( v) => PointId :: from ( v. to_string ( ) ) ,
96+ _ => bail ! ( "Unsupported Qdrant Point ID key type" ) ,
97+ }
98+ } else {
99+ let uuid = Uuid :: new_v4 ( ) . to_string ( ) ;
100+ PointId :: from ( uuid)
101+ } ;
88102
89- fn key_values_to_payload (
90- key_fields : & [ KeyValue ] ,
91- schema : & [ FieldSchema ] ,
92- ) -> Result < HashMap < String , QdrantValue > > {
93- let mut payload = HashMap :: with_capacity ( key_fields. len ( ) ) ;
94-
95- for ( key_value, field_schema) in key_fields. iter ( ) . zip ( schema. iter ( ) ) {
96- let json_value = match key_value {
97- KeyValue :: Bytes ( v) => String :: from_utf8_lossy ( v) . into ( ) ,
98- KeyValue :: Str ( v) => v. to_string ( ) . into ( ) ,
99- KeyValue :: Bool ( v) => ( * v) . into ( ) ,
100- KeyValue :: Int64 ( v) => ( * v) . into ( ) ,
101- KeyValue :: Uuid ( v) => v. to_string ( ) . into ( ) ,
102- KeyValue :: Range ( v) => json ! ( { "start" : v. start, "end" : v. end } ) ,
103- _ => bail ! ( "Unsupported key value type" ) ,
104- } ;
105- payload. insert ( field_schema. name . clone ( ) , json_value. into ( ) ) ;
106- }
107-
108- Ok ( payload)
103+ Ok ( point_id)
109104}
110105
111106fn values_to_payload (
@@ -303,10 +298,11 @@ impl StorageFactoryBase for Arc<Factory> {
303298 // TODO(Anush008): Add as a field to the Spec
304299 let url = "http://localhost:6334/" ;
305300 let collection_name = spec. collection_name . clone ( ) ;
301+
306302 let executors = async move {
307303 let executor = Arc :: new ( Executor :: new (
308- url,
309- & spec. collection_name . clone ( ) ,
304+ url. to_string ( ) ,
305+ spec. collection_name . clone ( ) ,
310306 key_fields_schema,
311307 value_fields_schema,
312308 ) ?) ;
0 commit comments