Skip to content

Commit d4261ee

Browse files
Anush008pw-ppodhajski
authored andcommitted
refactor: More review updates
Signed-off-by: Anush008 <[email protected]>
1 parent eead16b commit d4261ee

File tree

4 files changed

+70
-75
lines changed

4 files changed

+70
-75
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
66
## [Unreleased]
77

88
### Added
9-
- New external integration with [Qdrant](https://qdrant.tech/)
9+
- New external integration with [Qdrant](https://qdrant.tech/).
1010
- `pw.io.mysql.write` method for writing to MySQL. It supports two output table types: stream of changes and a realtime-updated data snapshot.
1111

1212
### Changed

src/engine/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::connectors::synchronization::Error as InputSynchronizationError;
1111
use crate::persistence::Error as PersistenceBackendError;
1212

1313
use crate::connectors::data_storage::{ReadError, WriteError};
14+
use crate::external_integration::IndexingError;
1415

1516
#[allow(clippy::module_name_repetitions)]
1617
pub type DynError = Box<dyn error::Error + Send + Sync>;
@@ -143,6 +144,9 @@ pub enum Error {
143144
#[error("input synchronization failed: {0}")]
144145
InputSynchronization(#[from] InputSynchronizationError),
145146

147+
#[error("indexing has failed: {0}")]
148+
Indexing(#[from] IndexingError),
149+
146150
#[error("precision for HyperLogLogPlus should be between 4 and 18 but is {0}")]
147151
HyperLogLogPlusInvalidPrecision(usize),
148152

src/external_integration/mod.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,6 @@ pub enum IndexingError {
6868
Qdrant(#[from] qdrant_client::QdrantError),
6969
}
7070

71-
impl From<IndexingError> for Error {
72-
fn from(error: IndexingError) -> Self {
73-
Error::Other(Box::new(error))
74-
}
75-
}
76-
7771
impl IndexDerivedImpl {
7872
pub fn new(
7973
inner: Box<dyn ExternalIndex>,

src/external_integration/qdrant_integration.rs

Lines changed: 65 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ impl QdrantIndex {
3434
vector_size: usize,
3535
api_key: Option<String>,
3636
) -> Result<Self, Error> {
37-
let runtime = create_async_tokio_runtime()
38-
.map_err(|e| Error::Other(format!("Failed to create async runtime: {e}").into()))?;
37+
let runtime = create_async_tokio_runtime().map_err(IndexingError::from)?;
3938

4039
let client = Qdrant::from_url(url)
4140
.api_key(api_key)
@@ -84,83 +83,82 @@ impl QdrantIndex {
8483
)
8584
.await?;
8685

87-
let mut results = Vec::with_capacity(search_result.result.len());
88-
for point in search_result.result {
89-
let Some(point_id) = point.id else {
90-
warn!("Qdrant returned point without ID, ignoring");
91-
continue;
92-
};
93-
94-
let Some(point_id_options) = point_id.point_id_options else {
95-
warn!("Qdrant returned point ID without options, ignoring");
96-
continue;
97-
};
98-
99-
let id = match point_id_options {
100-
PointIdOptions::Num(num) => num,
101-
PointIdOptions::Uuid(_) => {
102-
warn!("Qdrant returned UUID point ID, expected numeric ID");
103-
continue;
104-
}
105-
};
106-
107-
let Some(key) = self.key_to_id_mapper.get_key_for_id(id) else {
108-
warn!("Qdrant index returned a nonexistent ID {id}, ignoring");
109-
continue;
110-
};
111-
112-
results.push(KeyScoreMatch {
113-
key,
114-
score: f64::from(point.score),
115-
});
116-
}
86+
let results = search_result
87+
.result
88+
.into_iter()
89+
.filter_map(|point| {
90+
let Some(point_id) = point.id else {
91+
warn!("Qdrant returned point without ID, ignoring");
92+
return None;
93+
};
94+
95+
let Some(point_id_options) = point_id.point_id_options else {
96+
warn!("Qdrant returned point ID without options, ignoring");
97+
return None;
98+
};
99+
100+
let id = match point_id_options {
101+
PointIdOptions::Num(num) => num,
102+
PointIdOptions::Uuid(_) => {
103+
warn!("Qdrant returned UUID point ID, expected numeric ID");
104+
return None;
105+
}
106+
};
107+
108+
let Some(key) = self.key_to_id_mapper.get_key_for_id(id) else {
109+
warn!("Qdrant index returned a nonexistent ID {id}, ignoring");
110+
return None;
111+
};
112+
113+
Some(KeyScoreMatch {
114+
key,
115+
score: f64::from(point.score),
116+
})
117+
})
118+
.collect();
117119

118120
Ok(results)
119121
}
120122

121123
#[allow(clippy::cast_possible_truncation)]
122124
fn add_batch(&mut self, data: Vec<(Key, Vec<f64>)>) -> Result<(), IndexingError> {
123-
let mut points = Vec::with_capacity(data.len());
124-
125-
for (key, vec_data) in data {
126-
if vec_data.len() != self.vector_size {
127-
return Err(IndexingError::Io(std::io::Error::new(
128-
std::io::ErrorKind::InvalidData,
129-
format!(
130-
"Vector size mismatch: expected {}, got {}",
131-
self.vector_size,
132-
vec_data.len()
133-
),
134-
)));
135-
}
125+
let points: Result<Vec<_>, IndexingError> = data
126+
.into_iter()
127+
.map(|(key, vec_data)| {
128+
if vec_data.len() != self.vector_size {
129+
return Err(IndexingError::Io(std::io::Error::new(
130+
std::io::ErrorKind::InvalidData,
131+
format!(
132+
"Vector size mismatch: expected {}, got {}",
133+
self.vector_size,
134+
vec_data.len()
135+
),
136+
)));
137+
}
136138

137-
let key_id = self.key_to_id_mapper.get_next_free_u64_id(key);
138-
let vec_f32: Vec<f32> = vec_data.iter().map(|v| *v as f32).collect();
139-
points.push(PointStruct::new(
140-
key_id,
141-
vec_f32,
142-
HashMap::<String, Value>::new(),
143-
));
144-
}
139+
let key_id = self.key_to_id_mapper.get_next_free_u64_id(key);
140+
let vec_f32: Vec<f32> = vec_data.iter().map(|v| *v as f32).collect();
141+
Ok(PointStruct::new(
142+
key_id,
143+
vec_f32,
144+
HashMap::<String, Value>::new(),
145+
))
146+
})
147+
.collect();
145148

146149
self.runtime.block_on(
147150
self.client
148-
.upsert_points(UpsertPointsBuilder::new(&self.collection_name, points)),
151+
.upsert_points(UpsertPointsBuilder::new(&self.collection_name, points?)),
149152
)?;
150153

151154
Ok(())
152155
}
153156

154157
fn remove_batch(&mut self, keys: Vec<Key>) -> Result<Vec<u64>, IndexingError> {
155-
let mut key_ids = Vec::with_capacity(keys.len());
156-
let mut missing_keys = Vec::new();
157-
158-
for key in keys {
159-
match self.key_to_id_mapper.remove_key(key) {
160-
Ok(key_id) => key_ids.push(key_id),
161-
Err(_) => missing_keys.push(key),
162-
}
163-
}
158+
let key_ids: Vec<u64> = keys
159+
.into_iter()
160+
.filter_map(|key| self.key_to_id_mapper.remove_key(key).ok())
161+
.collect();
164162

165163
if !key_ids.is_empty() {
166164
self.runtime.block_on(self.client.delete_points(
@@ -221,11 +219,10 @@ impl NonFilteringExternalIndex<Vec<f64>, Vec<f64>> for QdrantIndex {
221219
let keys: Vec<Key> = queries.iter().map(|(k, _, _)| *k).collect();
222220

223221
let results = self.runtime.block_on(async {
224-
let mut futures = Vec::with_capacity(queries.len());
225-
226-
for (_, data, limit) in queries {
227-
futures.push(self.search_one_async(data, *limit));
228-
}
222+
let futures: Vec<_> = queries
223+
.iter()
224+
.map(|(_, data, limit)| self.search_one_async(data, *limit))
225+
.collect();
229226

230227
futures::future::join_all(futures).await
231228
});

0 commit comments

Comments
 (0)