Skip to content

Commit 37ed5c4

Browse files
committed
Introduce two separate VssClients for async/blocking contexts
To avoid any blocking cross-runtime behavior that could arise from reusing a single client's TCP connections in different runtime contexts, we here split out the `VssStore` behavior to use one dedicated `VssClient` per context. I.e., we're now using two connections/connection pools and make sure only the `blocking_client` is used in `KVStoreSync` contexts, and `async_client` in `KVStore` contexts.
1 parent 37ee256 commit 37ed5c4

File tree

1 file changed

+77
-37
lines changed

1 file changed

+77
-37
lines changed

src/io/vss_store.rs

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -98,24 +98,22 @@ impl VssStore {
9898
let (data_encryption_key, obfuscation_master_key) =
9999
derive_data_encryption_and_obfuscation_keys(&vss_seed);
100100
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
101-
let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
102-
.with_max_attempts(100)
103-
.with_max_total_delay(Duration::from_secs(180))
104-
.with_max_jitter(Duration::from_millis(100))
105-
.skip_retry_on_error(Box::new(|e: &VssError| {
106-
matches!(
107-
e,
108-
VssError::NoSuchKeyError(..)
109-
| VssError::InvalidRequestError(..)
110-
| VssError::ConflictError(..)
111-
)
112-
}) as _);
113101

114-
let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
102+
let sync_retry_policy = retry_policy();
103+
let blocking_client = VssClient::new_with_headers(
104+
base_url.clone(),
105+
sync_retry_policy,
106+
header_provider.clone(),
107+
);
108+
109+
let async_retry_policy = retry_policy();
110+
let async_client =
111+
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
115112

116113
let inner = Arc::new(VssStoreInner::new(
117114
schema_version,
118-
client,
115+
blocking_client,
116+
async_client,
119117
store_id,
120118
data_encryption_key,
121119
key_obfuscator,
@@ -164,8 +162,11 @@ impl KVStoreSync for VssStore {
164162
let secondary_namespace = secondary_namespace.to_string();
165163
let key = key.to_string();
166164
let inner = Arc::clone(&self.inner);
167-
let fut =
168-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
165+
let fut = async move {
166+
inner
167+
.read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key)
168+
.await
169+
};
169170
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
170171
}
171172

@@ -186,6 +187,7 @@ impl KVStoreSync for VssStore {
186187
let fut = async move {
187188
inner
188189
.write_internal(
190+
&inner.blocking_client,
189191
inner_lock_ref,
190192
locking_key,
191193
version,
@@ -216,6 +218,7 @@ impl KVStoreSync for VssStore {
216218
let fut = async move {
217219
inner
218220
.remove_internal(
221+
&inner.blocking_client,
219222
inner_lock_ref,
220223
locking_key,
221224
version,
@@ -238,7 +241,11 @@ impl KVStoreSync for VssStore {
238241
let primary_namespace = primary_namespace.to_string();
239242
let secondary_namespace = secondary_namespace.to_string();
240243
let inner = Arc::clone(&self.inner);
241-
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
244+
let fut = async move {
245+
inner
246+
.list_internal(&inner.blocking_client, primary_namespace, secondary_namespace)
247+
.await
248+
};
242249
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
243250
}
244251
}
@@ -251,9 +258,11 @@ impl KVStore for VssStore {
251258
let secondary_namespace = secondary_namespace.to_string();
252259
let key = key.to_string();
253260
let inner = Arc::clone(&self.inner);
254-
Box::pin(
255-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await },
256-
)
261+
Box::pin(async move {
262+
inner
263+
.read_internal(&inner.async_client, primary_namespace, secondary_namespace, key)
264+
.await
265+
})
257266
}
258267
fn write(
259268
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
@@ -267,6 +276,7 @@ impl KVStore for VssStore {
267276
Box::pin(async move {
268277
inner
269278
.write_internal(
279+
&inner.async_client,
270280
inner_lock_ref,
271281
locking_key,
272282
version,
@@ -290,6 +300,7 @@ impl KVStore for VssStore {
290300
Box::pin(async move {
291301
inner
292302
.remove_internal(
303+
&inner.async_client,
293304
inner_lock_ref,
294305
locking_key,
295306
version,
@@ -307,7 +318,9 @@ impl KVStore for VssStore {
307318
let primary_namespace = primary_namespace.to_string();
308319
let secondary_namespace = secondary_namespace.to_string();
309320
let inner = Arc::clone(&self.inner);
310-
Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await })
321+
Box::pin(async move {
322+
inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await
323+
})
311324
}
312325
}
313326

@@ -320,7 +333,10 @@ impl Drop for VssStore {
320333

321334
struct VssStoreInner {
322335
schema_version: VssSchemaVersion,
323-
client: VssClient<CustomRetryPolicy>,
336+
blocking_client: VssClient<CustomRetryPolicy>,
337+
// A secondary client that will only be used for async persistence via `KVStore`, to ensure TCP
338+
// connections aren't shared between our outer and the internal runtime.
339+
async_client: VssClient<CustomRetryPolicy>,
324340
store_id: String,
325341
data_encryption_key: [u8; 32],
326342
key_obfuscator: KeyObfuscator,
@@ -332,14 +348,16 @@ struct VssStoreInner {
332348

333349
impl VssStoreInner {
334350
pub(crate) fn new(
335-
schema_version: VssSchemaVersion, client: VssClient<CustomRetryPolicy>, store_id: String,
351+
schema_version: VssSchemaVersion, blocking_client: VssClient<CustomRetryPolicy>,
352+
async_client: VssClient<CustomRetryPolicy>, store_id: String,
336353
data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator,
337354
) -> Self {
338355
let locks = Mutex::new(HashMap::new());
339356
let pending_lazy_deletes = Mutex::new(Vec::new());
340357
Self {
341358
schema_version,
342-
client,
359+
blocking_client,
360+
async_client,
343361
store_id,
344362
data_encryption_key,
345363
key_obfuscator,
@@ -405,7 +423,8 @@ impl VssStoreInner {
405423
}
406424

407425
async fn list_all_keys(
408-
&self, primary_namespace: &str, secondary_namespace: &str,
426+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str,
427+
secondary_namespace: &str,
409428
) -> io::Result<Vec<String>> {
410429
let mut page_token = None;
411430
let mut keys = vec![];
@@ -418,7 +437,7 @@ impl VssStoreInner {
418437
page_size: None,
419438
};
420439

421-
let response = self.client.list_key_versions(&request).await.map_err(|e| {
440+
let response = client.list_key_versions(&request).await.map_err(|e| {
422441
let msg = format!(
423442
"Failed to list keys in {}/{}: {}",
424443
primary_namespace, secondary_namespace, e
@@ -435,13 +454,14 @@ impl VssStoreInner {
435454
}
436455

437456
async fn read_internal(
438-
&self, primary_namespace: String, secondary_namespace: String, key: String,
457+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
458+
secondary_namespace: String, key: String,
439459
) -> io::Result<Vec<u8>> {
440460
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
441461

442462
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
443463
let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() };
444-
let resp = self.client.get_object(&request).await.map_err(|e| {
464+
let resp = client.get_object(&request).await.map_err(|e| {
445465
let msg = format!(
446466
"Failed to read from key {}/{}/{}: {}",
447467
primary_namespace, secondary_namespace, key, e
@@ -470,8 +490,9 @@ impl VssStoreInner {
470490
}
471491

472492
async fn write_internal(
473-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
474-
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
493+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
494+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
495+
key: String, buf: Vec<u8>,
475496
) -> io::Result<()> {
476497
check_namespace_key_validity(
477498
&primary_namespace,
@@ -505,7 +526,7 @@ impl VssStoreInner {
505526
};
506527

507528
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
508-
self.client.put_object(&request).await.map_err(|e| {
529+
client.put_object(&request).await.map_err(|e| {
509530
// Restore delete items so they'll be retried on next write.
510531
if !delete_items.is_empty() {
511532
self.pending_lazy_deletes.lock().unwrap().extend(delete_items);
@@ -524,8 +545,9 @@ impl VssStoreInner {
524545
}
525546

526547
async fn remove_internal(
527-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
528-
primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
548+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
549+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
550+
key: String, lazy: bool,
529551
) -> io::Result<()> {
530552
check_namespace_key_validity(
531553
&primary_namespace,
@@ -548,7 +570,7 @@ impl VssStoreInner {
548570
let request =
549571
DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) };
550572

551-
self.client.delete_object(&request).await.map_err(|e| {
573+
client.delete_object(&request).await.map_err(|e| {
552574
let msg = format!(
553575
"Failed to delete key {}/{}/{}: {}",
554576
primary_namespace, secondary_namespace, key, e
@@ -562,12 +584,15 @@ impl VssStoreInner {
562584
}
563585

564586
async fn list_internal(
565-
&self, primary_namespace: String, secondary_namespace: String,
587+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
588+
secondary_namespace: String,
566589
) -> io::Result<Vec<String>> {
567590
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
568591

569-
let keys =
570-
self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| {
592+
let keys = self
593+
.list_all_keys(client, &primary_namespace, &secondary_namespace)
594+
.await
595+
.map_err(|e| {
571596
let msg = format!(
572597
"Failed to retrieve keys in namespace: {}/{} : {}",
573598
primary_namespace, secondary_namespace, e
@@ -636,6 +661,21 @@ fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32]
636661
(k1, k2)
637662
}
638663

664+
fn retry_policy() -> CustomRetryPolicy {
665+
ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
666+
.with_max_attempts(100)
667+
.with_max_total_delay(Duration::from_secs(180))
668+
.with_max_jitter(Duration::from_millis(100))
669+
.skip_retry_on_error(Box::new(|e: &VssError| {
670+
matches!(
671+
e,
672+
VssError::NoSuchKeyError(..)
673+
| VssError::InvalidRequestError(..)
674+
| VssError::ConflictError(..)
675+
)
676+
}) as _)
677+
}
678+
639679
/// A source for generating entropy/randomness using [`rand`].
640680
pub(crate) struct RandEntropySource;
641681

0 commit comments

Comments
 (0)