Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions crates/key-value-azure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Store for AzureCosmosStore {
}

async fn exists(&self, key: &str) -> Result<bool, Error> {
Ok(self.get_entity::<Key>(key).await?.is_some())
Ok(self.get_entity_by_id::<Key>(key).await?.is_some())
}

async fn get_keys(&self) -> Result<Vec<String>, Error> {
Expand Down Expand Up @@ -424,6 +424,28 @@ impl AzureCosmosStore {
.map(|(p, _)| p.clone()))
}

async fn get_entity_by_id<F>(&self, key: &str) -> Result<Option<F>, Error>
where
F: CosmosEntity + Send + Sync + serde::de::DeserializeOwned + Clone,
{
let query = self
.client
.query_documents(Query::new(self.get_id_query(key)))
.query_cross_partition(true)
.max_item_count(1);

// There can be no duplicated keys, so we create the stream and only take the first result.
let mut stream = query.into_stream::<F>();
let Some(res) = stream.next().await else {
return Ok(None);
};
Ok(res
.map_err(log_error)?
.results
.first()
.map(|(p, _)| p.clone()))
}

async fn get_keys(&self) -> Result<Vec<String>, Error> {
let query = self
.client
Expand All @@ -446,8 +468,14 @@ impl AzureCosmosStore {
query
}

fn get_id_query(&self, key: &str) -> String {
let mut query = format!("SELECT c.id FROM c WHERE c.id='{key}'");
self.append_store_id(&mut query, true);
query
}

fn get_keys_query(&self) -> String {
let mut query = "SELECT * FROM c".to_owned();
let mut query = "SELECT c.id, c.store_id FROM c".to_owned();
self.append_store_id(&mut query, false);
query
}
Expand Down
Loading