|
12 | 12 | // See the License for the specific language governing permissions and |
13 | 13 | // limitations under the License. |
14 | 14 |
|
15 | | -use std::fmt; |
16 | | -use std::sync::atomic; |
| 15 | +use std::ops::Deref; |
| 16 | +use std::ops::DerefMut; |
17 | 17 | use std::sync::Arc; |
18 | 18 |
|
19 | | -use databend_common_base::runtime::spawn_named; |
20 | 19 | use databend_common_meta_client::ClientHandle; |
21 | | -use databend_common_meta_types::SeqV; |
22 | | -use futures::FutureExt; |
23 | | -use log::debug; |
24 | | -use tokio::sync::oneshot; |
25 | | -use tokio::sync::Mutex; |
26 | | -use tokio::sync::MutexGuard; |
27 | | -use tokio::task::JoinHandle; |
28 | 20 |
|
29 | | -use crate::cache_data::CacheData; |
30 | | -use crate::errors::Unsupported; |
31 | | -use crate::event_watcher::EventWatcher; |
| 21 | +use crate::meta_cache_types::MetaCacheTypes; |
| 22 | +use crate::meta_client_source; |
32 | 23 |
|
33 | | -/// Cache implemented on top of the distributed meta-service. |
34 | | -/// |
35 | | -/// This cache provides a local view of data stored in the meta-service, with automatic |
36 | | -/// background updates when the underlying data changes. |
37 | | -/// |
38 | | -/// ## Features |
39 | | -/// |
40 | | -/// - **Automatic Synchronization**: Background watcher task keeps local cache in sync with meta-service |
41 | | -/// - **Concurrency Control**: Two-level concurrency control mechanism for safe access |
42 | | -/// - **Safe Reconnection**: Automatic recovery from connection failures with state consistency |
43 | | -/// - **Consistent Initialization**: Ensures cache is fully initialized before use |
44 | | -/// |
45 | | -/// ## Concurrency Control |
46 | | -/// |
47 | | -/// The cache employs a two-level concurrency control mechanism: |
48 | | -/// |
49 | | -/// 1. **Internal Lock (Mutex)**: Protects concurrent access between user operations and the |
50 | | -/// background cache updater. This lock is held briefly during each operation. |
51 | | -/// |
52 | | -/// 2. **External Lock (Method Design)**: Public methods require `&mut self` even for read-only |
53 | | -/// operations. This prevents concurrent access to the cache instance from multiple call sites. |
54 | | -/// External synchronization should be implemented by the caller if needed. |
55 | | -/// |
56 | | -/// This design intentionally separates concerns: |
57 | | -/// - The internal lock handles short-term, fine-grained synchronization with the updater |
58 | | -/// - The external lock requirement (`&mut self`) enables longer-duration access patterns |
59 | | -/// without blocking the background updater unnecessarily |
60 | | -/// |
61 | | -/// Note that despite requiring `&mut self`, all operations are logically read-only |
62 | | -/// with respect to the cache's public API. |
63 | | -/// |
64 | | -/// ## Error Handling |
65 | | -/// |
66 | | -/// - Background watcher task automatically recovers from errors by: |
67 | | -/// - Resetting the cache state |
68 | | -/// - Re-establishing the watch stream |
69 | | -/// - Re-fetching all data to ensure consistency |
70 | | -/// - Users are shielded from transient errors through the abstraction |
71 | 24 | pub struct Cache { |
72 | | - /// The dir path to store the cache ids, without trailing slash. |
73 | | - /// |
74 | | - /// Such as `foo`, not `foo/` |
75 | | - prefix: String, |
76 | | - |
77 | | - /// The background watcher task handle. |
78 | | - watcher_task_handle: Option<JoinHandle<()>>, |
79 | | - |
80 | | - /// The sender to cancel the background watcher task. |
81 | | - /// |
82 | | - /// When this sender is dropped, the corresponding receiver becomes ready, |
83 | | - /// which signals the background task to terminate gracefully. |
84 | | - #[allow(dead_code)] |
85 | | - watcher_cancel_tx: oneshot::Sender<()>, |
86 | | - |
87 | | - data: Arc<Mutex<Result<CacheData, Unsupported>>>, |
| 25 | + pub(crate) inner: sub_cache::Cache<MetaCacheTypes>, |
| 26 | +} |
88 | 27 |
|
89 | | - /// A process-wide unique identifier for the cache. Used for debugging purposes. |
90 | | - uniq: u64, |
| 28 | +impl Deref for Cache { |
| 29 | + type Target = sub_cache::Cache<MetaCacheTypes>; |
91 | 30 |
|
92 | | - /// The name for this cache instance, for debugging. |
93 | | - name: String, |
| 31 | + fn deref(&self) -> &Self::Target { |
| 32 | + &self.inner |
| 33 | + } |
94 | 34 | } |
95 | 35 |
|
96 | | -impl fmt::Display for Cache { |
97 | | - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
98 | | - write!( |
99 | | - f, |
100 | | - "Cache({})({}/)[uniq={}]", |
101 | | - self.name, self.prefix, self.uniq |
102 | | - ) |
| 36 | +impl DerefMut for Cache { |
| 37 | + fn deref_mut(&mut self) -> &mut Self::Target { |
| 38 | + &mut self.inner |
103 | 39 | } |
104 | 40 | } |
105 | 41 |
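The new `Cache` is a thin newtype over `sub_cache::Cache<MetaCacheTypes>`, and the `Deref`/`DerefMut` impls above forward method calls to the inner cache so existing call sites keep working unchanged. Below is a minimal, self-contained sketch of the same delegation pattern; the `InnerCache`/`CacheWrapper` names and their methods are illustrative stand-ins, not part of the crate:

```rust
use std::ops::{Deref, DerefMut};

/// Stand-in for `sub_cache::Cache<MetaCacheTypes>` in this sketch.
struct InnerCache {
    last_seq: u64,
}

impl InnerCache {
    fn current_seq(&self) -> u64 {
        self.last_seq
    }

    fn bump(&mut self) {
        self.last_seq += 1;
    }
}

/// Thin wrapper mirroring the `Cache { inner }` shape above.
struct CacheWrapper {
    inner: InnerCache,
}

impl Deref for CacheWrapper {
    type Target = InnerCache;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for CacheWrapper {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

fn main() {
    let mut cache = CacheWrapper {
        inner: InnerCache { last_seq: 41 },
    };
    cache.bump(); // resolves through `DerefMut` to the inner cache
    assert_eq!(cache.current_seq(), 42); // resolves through `Deref`
}
```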
|
106 | 42 | impl Cache { |
107 | | - /// Create a new cache. |
| 43 | + /// Create a new cache with the given prefix and meta client. |
108 | 44 | /// |
109 | | - /// The created cache starts to watch key-value change event. |
110 | | - /// It does not return until initialization is started. |
111 | | - /// Thus, it is safe to access the data once this method has returned, because initialization holds a lock. |
| 45 | + /// The cache will start watching for changes immediately. |
112 | 46 | /// |
113 | 47 | /// # Parameters |
114 | 48 | /// |
115 | | - /// * `meta_client` - The metadata client to interact with the remote meta-service. |
116 | | - /// * `prefix` - The prefix of the cache name and also the directory name to store in meta-service. |
117 | | - /// * `ctx` - The context info of the cache, used for debugging purposes. |
118 | | - /// |
119 | | - /// This method spawns a background task to watch the meta-service key-value change events. |
120 | | - /// The task will be notified to quit when this instance is dropped. |
| 49 | + /// * `meta_client` - The meta client to interact with the remote data store. |
| 50 | + /// * `prefix` - The prefix for the cache, used to identify the cache instance. |
| 51 | + /// * `name` - The name of the cache, used for debugging and logging purposes. |
121 | 52 | pub async fn new( |
122 | 53 | meta_client: Arc<ClientHandle>, |
123 | 54 | prefix: impl ToString, |
124 | 55 | name: impl ToString, |
125 | 56 | ) -> Self { |
126 | | - let prefix = prefix.to_string(); |
127 | | - let prefix = prefix.trim_end_matches('/').to_string(); |
128 | | - |
129 | | - let (cancel_tx, cancel_rx) = oneshot::channel::<()>(); |
130 | | - |
131 | | - static UNIQ: atomic::AtomicU64 = atomic::AtomicU64::new(0); |
132 | | - let uniq = UNIQ.fetch_add(1, atomic::Ordering::SeqCst); |
133 | | - |
134 | | - let mut cache = Cache { |
| 57 | + let name = name.to_string(); |
| 58 | + let inner = sub_cache::Cache::new( |
| 59 | + meta_client_source::MetaClientSource { |
| 60 | + client: meta_client, |
| 61 | + name: name.clone(), |
| 62 | + }, |
135 | 63 | prefix, |
136 | | - watcher_task_handle: None, |
137 | | - watcher_cancel_tx: cancel_tx, |
138 | | - data: Arc::new(Mutex::new(Err(Unsupported::new("Cache not initialized")))), |
139 | | - uniq, |
140 | | - name: name.to_string(), |
141 | | - }; |
142 | | - |
143 | | - cache.spawn_watcher_task(meta_client, cancel_rx).await; |
144 | | - |
145 | | - cache |
146 | | - } |
147 | | - |
148 | | - /// Get a SeqV from the cache by key. |
149 | | - pub async fn try_get(&mut self, key: &str) -> Result<Option<SeqV>, Unsupported> { |
150 | | - debug!("Cache::access: try_get({})", key); |
151 | | - self.try_access(|cache_data| cache_data.data.get(key).cloned()) |
152 | | - .await |
153 | | - } |
154 | | - |
155 | | - /// Get the last sequence number of the cache. |
156 | | - pub async fn try_last_seq(&mut self) -> Result<u64, Unsupported> { |
157 | | - self.try_access(|cache_data| cache_data.last_seq).await |
158 | | - } |
159 | | - |
160 | | - /// List all entries in the cache directory. |
161 | | - pub async fn try_list_dir(&mut self, prefix: &str) -> Result<Vec<(String, SeqV)>, Unsupported> { |
162 | | - let prefix = prefix.trim_end_matches('/'); |
163 | | - let left = format!("{}/", prefix); |
164 | | - let right = format!("{}0", prefix); |
165 | | - |
166 | | - debug!("Cache::access: try_list_dir({})", prefix); |
167 | | - |
168 | | - self.try_access(|cache_data| { |
169 | | - cache_data |
170 | | - .data |
171 | | - .range(left..right) |
172 | | - .map(|(k, v)| (k.to_string(), v.clone())) |
173 | | - .collect() |
174 | | - }) |
175 | | - .await |
176 | | - } |
177 | | - |
178 | | - /// Get the internal cache data. |
179 | | - pub async fn cache_data(&mut self) -> MutexGuard<'_, Result<CacheData, Unsupported>> { |
180 | | - self.data.lock().await |
181 | | - } |
182 | | - |
183 | | - /// Access the cache data in read-only mode. |
184 | | - pub async fn try_access<T>( |
185 | | - &mut self, |
186 | | - f: impl FnOnce(&CacheData) -> T, |
187 | | - ) -> Result<T, Unsupported> { |
188 | | - let guard = self.data.lock().await; |
189 | | - |
190 | | - let g = guard.as_ref().map_err(|e| e.clone())?; |
191 | | - |
192 | | - let t = f(g); |
193 | | - Ok(t) |
194 | | - } |
195 | | - |
196 | | - /// Spawns a background task to watch the meta-service key-value change events and feed them to the cache. |
197 | | - /// |
198 | | - /// It does not return until a full copy of the cache is received. |
199 | | - async fn spawn_watcher_task( |
200 | | - &mut self, |
201 | | - meta_client: Arc<ClientHandle>, |
202 | | - cancel_rx: oneshot::Receiver<()>, |
203 | | - ) { |
204 | | - let (left, right) = self.key_range(); |
205 | | - |
206 | | - let watcher_name = format!("{}-watcher", self); |
207 | | - let watcher = EventWatcher { |
208 | | - left, |
209 | | - right, |
210 | | - meta_client, |
211 | | - data: self.data.clone(), |
212 | | - name: watcher_name.to_string(), |
213 | | - }; |
214 | | - |
215 | | - // For receiving a signal when the cache has started to initialize and is safe to use: |
216 | | - // i.e., if the user acquires the data lock, they see a complete view of the data (fully initialized). |
217 | | - let (started_tx, started_rx) = oneshot::channel::<()>(); |
218 | | - |
219 | | - let task_name = watcher_name.to_string(); |
220 | | - let fu = watcher.main(Some(started_tx), cancel_rx.map(|_| ())); |
221 | | - |
222 | | - let handle = spawn_named(fu, task_name); |
223 | | - self.watcher_task_handle = Some(handle); |
224 | | - |
225 | | - // Wait for the sending end to be dropped, indicating that the cache has started to initialize. |
226 | | - started_rx.await.ok(); |
227 | | - } |
228 | | - |
229 | | - /// The left-close right-open range for the cached keys. |
230 | | - /// |
231 | | - /// Since `'0'` is the next char after `'/'`, |
232 | | - /// `[prefix + "/", prefix + "0")` is the range of the cache ids. |
233 | | - fn key_range(&self) -> (String, String) { |
234 | | - let left = self.prefix.clone() + "/"; |
235 | | - let right = self.prefix.clone() + "0"; |
| 64 | + name, |
| 65 | + ) |
| 66 | + .await; |
236 | 67 |
|
237 | | - (left, right) |
| 68 | + Cache { inner } |
238 | 69 | } |
239 | 70 | } |
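For callers, the constructor keeps its three-argument shape but now wires the meta client into the generic cache through `MetaClientSource`. A hedged usage sketch, assuming an `Arc<ClientHandle>` is already available from the surrounding code; the prefix and name values here are illustrative only:

```rust
use std::sync::Arc;

use databend_common_meta_client::ClientHandle;

// `Cache` is the wrapper type defined in this file.
async fn build_cache(meta_client: Arc<ClientHandle>) -> Cache {
    // The prefix identifies the cache's key space in the meta-service;
    // the name is used for debugging and logging only.
    Cache::new(meta_client, "foo/my_cache", "my-cache-example").await
}
```

Because of the `Deref`/`DerefMut` impls, methods of the inner `sub_cache::Cache<MetaCacheTypes>` can then be called directly on the returned value.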