Skip to content

Commit d7f419b

Browse files
committed
chore: Support all Value::Basic values
Signed-off-by: Anush008 <[email protected]>
1 parent c5f6a9d commit d7f419b

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

src/ops/storages/qdrant.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl ExportTargetExecutor for Executor {
8282
}
8383

8484
self.client
85-
.upsert_points(UpsertPointsBuilder::new(&self.collection_name, points))
85+
.upsert_points(UpsertPointsBuilder::new(&self.collection_name, points).wait(true))
8686
.await?;
8787
Ok(())
8888
}
@@ -115,20 +115,25 @@ fn values_to_payload(
115115

116116
match value {
117117
Value::Basic(basic_value) => {
118-
let json_value = match basic_value {
118+
let json_value: serde_json::Value = match basic_value {
119119
BasicValue::Bytes(v) => String::from_utf8_lossy(v).into(),
120120
BasicValue::Str(v) => v.clone().to_string().into(),
121121
BasicValue::Bool(v) => (*v).into(),
122122
BasicValue::Int64(v) => (*v).into(),
123123
BasicValue::Float32(v) => (*v as f64).into(),
124124
BasicValue::Float64(v) => (*v).into(),
125125
BasicValue::Range(v) => json!({ "start": v.start, "end": v.end }),
126+
BasicValue::Uuid(v) => v.to_string().into(),
127+
BasicValue::Date(v) => v.to_string().into(),
128+
BasicValue::LocalDateTime(v) => v.to_string().into(),
129+
BasicValue::Time(v) => v.to_string().into(),
130+
BasicValue::OffsetDateTime(v) => v.to_string().into(),
131+
BasicValue::Json(v) => (**v).clone(),
126132
BasicValue::Vector(v) => {
127133
let vector = convert_to_vector(v.to_vec());
128134
vectors = vectors.add_vector(field_name, vector);
129135
continue;
130136
}
131-
_ => bail!("Unsupported BasicValue type in Value::Basic"),
132137
};
133138
payload.insert(field_name.clone(), json_value.into());
134139
}
@@ -296,6 +301,14 @@ impl StorageFactoryBase for Arc<Factory> {
296301
_context: Arc<FlowInstanceContext>,
297302
) -> Result<ExportTargetBuildOutput<Self>> {
298303
// TODO(Anush008): Add as a field to the Spec
304+
305+
if key_fields_schema.len() > 1 {
306+
api_bail!(
307+
"Expected only one primary key for the point ID. Got {}.",
308+
key_fields_schema.len()
309+
)
310+
}
311+
299312
let url = "http://localhost:6334/";
300313
let collection_name = spec.collection_name.clone();
301314

@@ -324,7 +337,8 @@ impl StorageFactoryBase for Arc<Factory> {
324337
_key: String,
325338
_desired: Option<()>,
326339
_existing: setup::CombinedState<()>,
327-
) -> Result<impl setup::ResourceSetupStatusCheck<String, ()> + 'static> {
340+
_auth_registry: &Arc<AuthRegistry>,
341+
) -> Result<impl setup::ResourceSetupStatusCheck + 'static> {
328342
Err(anyhow!(
329343
"Set `setup_by_user` to `true` to use Qdrant storage"
330344
)) as Result<Infallible, _>
@@ -337,4 +351,8 @@ impl StorageFactoryBase for Arc<Factory> {
337351
) -> Result<SetupStateCompatibility> {
338352
Ok(SetupStateCompatibility::Compatible)
339353
}
354+
355+
fn describe_resource(&self, key: &String) -> Result<String> {
356+
Ok(format!("Qdrant collection {}", key))
357+
}
340358
}

0 commit comments

Comments
 (0)