Skip to content

Commit e17b046

Browse files
authored
fix(etcd): KeyValueStore etcd uses our client not the lib directly (#4199)
Signed-off-by: Graham King <[email protected]>
1 parent 560bb2f commit e17b046

File tree

1 file changed

+31
-30
lines changed
  • lib/runtime/src/storage/key_value_store

1 file changed

+31
-30
lines changed

lib/runtime/src/storage/key_value_store/etcd.rs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::time::Duration;
77

88
use crate::{
99
storage::key_value_store::{Key, KeyValue, WatchEvent},
10-
transports::etcd::Client,
10+
transports::etcd,
1111
};
1212
use async_stream::stream;
1313
use async_trait::async_trait;
@@ -17,11 +17,11 @@ use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome};
1717

1818
#[derive(Clone)]
1919
pub struct EtcdStore {
20-
client: Client,
20+
client: etcd::Client,
2121
}
2222

2323
impl EtcdStore {
24-
pub fn new(client: Client) -> Self {
24+
pub fn new(client: etcd::Client) -> Self {
2525
Self { client }
2626
}
2727
}
@@ -61,7 +61,7 @@ impl KeyValueStore for EtcdStore {
6161
}
6262

6363
pub struct EtcdBucket {
64-
client: Client,
64+
client: etcd::Client,
6565
bucket_name: String,
6666
}
6767

@@ -114,36 +114,37 @@ impl KeyValueBucket for EtcdBucket {
114114
) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
115115
let prefix = make_key(&self.bucket_name, &"".into());
116116
tracing::trace!("etcd watch: {prefix}");
117-
let (watcher, mut watch_stream) = self
117+
let watcher = self
118118
.client
119-
.etcd_client()
120-
.clone()
121-
.watch(prefix.as_bytes(), Some(WatchOptions::new().with_prefix()))
119+
.kv_watch_prefix(&prefix)
122120
.await
123121
.map_err(|e| StoreError::EtcdError(e.to_string()))?;
122+
let (_, mut watch_stream) = watcher.dissolve();
124123
let output = stream! {
125-
let _watcher = watcher; // Keep it alive. Not sure if necessary.
126-
while let Ok(Some(resp)) = watch_stream.message().await {
127-
for e in resp.events() {
128-
let Some(kv) = e.kv() else {
129-
continue;
130-
};
131-
let (k_bytes, v_bytes) = kv.clone().into_key_value();
132-
let key = match String::from_utf8(k_bytes) {
133-
Ok(k) => k,
134-
Err(err) => {
135-
tracing::error!(%err, prefix, "Invalid UTF8 in etcd key");
136-
continue;
137-
}
138-
};
139-
match e.event_type() {
140-
EventType::Put => {
141-
let item = KeyValue::new(key, v_bytes.into());
142-
yield WatchEvent::Put(item);
143-
}
144-
EventType::Delete => {
145-
yield WatchEvent::Delete(Key::from_raw(key));
146-
}
124+
while let Some(event) = watch_stream.recv().await {
125+
match event {
126+
etcd::WatchEvent::Put(kv) => {
127+
let (k, v) = kv.into_key_value();
128+
let key = match String::from_utf8(k) {
129+
Ok(k) => k,
130+
Err(err) => {
131+
tracing::error!(%err, prefix, "Invalid UTF8 in etcd key");
132+
continue;
133+
}
134+
};
135+
let item = KeyValue::new(key, v.into());
136+
yield WatchEvent::Put(item);
137+
}
138+
etcd::WatchEvent::Delete(kv) => {
139+
let (k, _) = kv.into_key_value();
140+
let key = match String::from_utf8(k) {
141+
Ok(k) => k,
142+
Err(err) => {
143+
tracing::error!(%err, prefix, "Invalid UTF8 in etcd key");
144+
continue;
145+
}
146+
};
147+
yield WatchEvent::Delete(Key::from_raw(key));
147148
}
148149
}
149150
}

0 commit comments

Comments
 (0)