 use graph::blockchain::block_stream::FirehoseCursor;
 use graph::data::subgraph::schema::DeploymentCreate;
+use graph::data::value::Word;
+use graph::data_source::CausalityRegion;
 use graph::schema::InputSchema;
 use lazy_static::lazy_static;
+use std::collections::BTreeSet;
 use std::marker::PhantomData;
 use test_store::*;
 
-use graph::components::store::{DeploymentLocator, EntityKey, WritableStore};
+use graph::components::store::{DeploymentLocator, DerivedEntityQuery, EntityKey, WritableStore};
 use graph::data::subgraph::*;
 use graph::semver::Version;
 use graph::{entity, prelude::*};
@@ -125,47 +128,113 @@ async fn pause_writer(deployment: &DeploymentLocator) {
     writable::allow_steps(deployment, 0).await;
 }
 
-async fn resume_writer(deployment: &DeploymentLocator, steps: usize) {
-    writable::allow_steps(deployment, steps).await;
-    flush(deployment).await.unwrap();
-}
-
-#[test]
-fn tracker() {
-    run_test(|store, writable, deployment| async move {
+/// Test that looking up entities when several changes to the same entity
+/// are queued works. When `batch` is true, the changes all reside in one
+/// batch. If it is false, each change is in its own batch.
+///
+/// `read_count` lets us look up entities in different ways to exercise
+/// different methods in `WritableStore`
+fn get_with_pending<F>(batch: bool, read_count: F)
+where
+    F: Send + Fn(&dyn WritableStore) -> i32 + Sync + 'static,
+{
+    run_test(move |store, writable, deployment| async move {
         let subgraph_store = store.subgraph_store();
 
-        let read_count = || {
-            let counter = writable.get(&count_key("1")).unwrap().unwrap();
-            counter.get("count").unwrap().as_int().unwrap()
-        };
+        let read_count = || read_count(writable.as_ref());
+
+        if !batch {
+            writable.deployment_synced().unwrap();
+        }
+
         for count in 1..4 {
             insert_count(&subgraph_store, &deployment, count).await;
         }
+
+        // Test reading back with pending writes to the same entity
         pause_writer(&deployment).await;
+        for count in 4..7 {
+            insert_count(&subgraph_store, &deployment, count).await;
+        }
+        assert_eq!(6, read_count());
 
-        // Test reading back with a pending write
-        insert_count(&subgraph_store, &deployment, 4).await;
-        assert_eq!(4, read_count());
-        resume_writer(&deployment, 1).await;
-        assert_eq!(4, read_count());
+        writable.flush().await.unwrap();
+        assert_eq!(6, read_count());
 
-        // Test reading back with a pending revert
+        // Test reading back with pending writes and a pending revert
+        for count in 7..10 {
+            insert_count(&subgraph_store, &deployment, count).await;
+        }
         writable
             .revert_block_operations(block_pointer(2), FirehoseCursor::None)
             .await
             .unwrap();
 
         assert_eq!(2, read_count());
 
-        resume_writer(&deployment, 1).await;
-        assert_eq!(2, read_count());
-
-        // There shouldn't be anything left to do, but make sure of that
         writable.flush().await.unwrap();
+        assert_eq!(2, read_count());
     })
 }
 
+/// Get the count using `WritableStore::get_many`
+fn count_get_many(writable: &dyn WritableStore) -> i32 {
+    let key = count_key("1");
+    let keys = BTreeSet::from_iter(vec![key.clone()]);
+    let counter = writable.get_many(keys).unwrap().get(&key).unwrap().clone();
+    counter.get("count").unwrap().as_int().unwrap()
+}
+
+/// Get the count using `WritableStore::get`
+fn count_get(writable: &dyn WritableStore) -> i32 {
+    let counter = writable.get(&count_key("1")).unwrap().unwrap();
+    counter.get("count").unwrap().as_int().unwrap()
+}
+
+fn count_get_derived(writable: &dyn WritableStore) -> i32 {
+    let key = count_key("1");
+    let query = DerivedEntityQuery {
+        entity_type: key.entity_type.clone(),
+        entity_field: Word::from("id"),
+        value: key.entity_id.clone(),
+        id_is_bytes: false,
+        causality_region: CausalityRegion::ONCHAIN,
+    };
+    let map = writable.get_derived(&query).unwrap();
+    let counter = map.get(&key).unwrap();
+    counter.get("count").unwrap().as_int().unwrap()
+}
+
+#[test]
+fn get_batch() {
+    get_with_pending(true, count_get);
+}
+
+#[test]
+fn get_nobatch() {
+    get_with_pending(false, count_get);
+}
+
+#[test]
+fn get_many_batch() {
+    get_with_pending(true, count_get_many);
+}
+
+#[test]
+fn get_many_nobatch() {
+    get_with_pending(false, count_get_many);
+}
+
+#[test]
+fn get_derived_batch() {
+    get_with_pending(true, count_get_derived);
+}
+
+#[test]
+fn get_derived_nobatch() {
+    get_with_pending(false, count_get_derived);
+}
+
 #[test]
 fn restart() {
     run_test(|store, writable, deployment| async move {
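Note: because `get_with_pending` only requires `F: Send + Fn(&dyn WritableStore) -> i32 + Sync + 'static`, another read path can be exercised with an inline closure instead of a named helper. The test below is a hypothetical sketch, not part of this diff; it reuses the `count_key` helper and the same `WritableStore::get` call as `count_get` above.

    #[test]
    fn get_closure_batch() {
        // Hypothetical example: read the counter through an inline closure,
        // equivalent to `count_get` above.
        get_with_pending(true, |writable: &dyn WritableStore| {
            let counter = writable.get(&count_key("1")).unwrap().unwrap();
            counter.get("count").unwrap().as_int().unwrap()
        });
    }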