Skip to content

Commit 6c5d357

Browse files
committed
Determine VSS schema version at startup
Since we just made some breaking changes to how exactly we persist data via VSS (now using an `aad` that commits to the key and also obfuscating namespaces), we have to detect which schema version we're on to ensure backwards compatibility. To this end, we here start reading a persisted `vss_schema_version` key in `VssStore::new`. If it is present, we just return the encoded value (right now that can only be V1). If it is not present, it can either mean we run for the first time *or* we're on V0, which we determine checking if anything related to the `bdk_wallet` descriptors are present in the store. If we're running for the first time, we also persist the schema version to save us these rather inefficient steps on following startups.
1 parent cfc162f commit 6c5d357

File tree

2 files changed

+157
-20
lines changed

2 files changed

+157
-20
lines changed

src/builder.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,12 @@ impl NodeBuilder {
731731

732732
let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();
733733

734-
let vss_store = VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider);
734+
let vss_store =
735+
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| {
736+
log_error!(logger, "Failed to setup VSS store: {}", e);
737+
BuildError::KVStoreSetupFailed
738+
})?;
739+
735740
build_with_store_internal(
736741
config,
737742
self.chain_data_source_config.as_ref(),

src/io/vss_store.rs

Lines changed: 151 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ use std::time::Duration;
1717

1818
use bdk_chain::Merge;
1919
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
20+
use lightning::impl_writeable_tlv_based_enum;
2021
use lightning::io::{self, Error, ErrorKind};
2122
use lightning::util::persist::{KVStore, KVStoreSync};
23+
use lightning::util::ser::{Readable, Writeable};
2224
use prost::Message;
2325
use rand::RngCore;
2426
use vss_client::client::VssClient;
@@ -55,6 +57,13 @@ enum VssSchemaVersion {
5557
V1,
5658
}
5759

60+
impl_writeable_tlv_based_enum!(VssSchemaVersion,
61+
(0, V0) => {},
62+
(1, V1) => {},
63+
);
64+
65+
const VSS_SCHEMA_VERSION_KEY: &str = "vss_schema_version";
66+
5867
// We set this to a small number of threads that would still allow to make some progress if one
5968
// would hit a blocking case
6069
const INTERNAL_RUNTIME_WORKERS: usize = 2;
@@ -78,23 +87,20 @@ impl VssStore {
7887
pub(crate) fn new(
7988
base_url: String, store_id: String, vss_seed: [u8; 32],
8089
header_provider: Arc<dyn VssHeaderProvider>,
81-
) -> Self {
90+
) -> io::Result<Self> {
8291
let next_version = AtomicU64::new(1);
83-
let internal_runtime = Some(
84-
tokio::runtime::Builder::new_multi_thread()
85-
.enable_all()
86-
.thread_name_fn(|| {
87-
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
88-
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
89-
format!("ldk-node-vss-runtime-{}", id)
90-
})
91-
.worker_threads(INTERNAL_RUNTIME_WORKERS)
92-
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
93-
.build()
94-
.unwrap(),
95-
);
92+
let internal_runtime = tokio::runtime::Builder::new_multi_thread()
93+
.enable_all()
94+
.thread_name_fn(|| {
95+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
96+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
97+
format!("ldk-node-vss-runtime-{}", id)
98+
})
99+
.worker_threads(INTERNAL_RUNTIME_WORKERS)
100+
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
101+
.build()
102+
.unwrap();
96103

97-
let schema_version = VssSchemaVersion::V0;
98104
let (data_encryption_key, obfuscation_master_key) =
99105
derive_data_encryption_and_obfuscation_keys(&vss_seed);
100106
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
@@ -106,6 +112,21 @@ impl VssStore {
106112
header_provider.clone(),
107113
);
108114

115+
let runtime_handle = internal_runtime.handle();
116+
let schema_store_id = store_id.clone();
117+
let schema_key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
118+
let (schema_version, blocking_client) = tokio::task::block_in_place(move || {
119+
runtime_handle.block_on(async {
120+
determine_and_write_schema_version(
121+
blocking_client,
122+
schema_store_id,
123+
data_encryption_key,
124+
schema_key_obfuscator,
125+
)
126+
.await
127+
})
128+
})?;
129+
109130
let async_retry_policy = retry_policy();
110131
let async_client =
111132
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
@@ -119,7 +140,7 @@ impl VssStore {
119140
key_obfuscator,
120141
));
121142

122-
Self { inner, next_version, internal_runtime }
143+
Ok(Self { inner, next_version, internal_runtime: Some(internal_runtime) })
123144
}
124145

125146
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -676,6 +697,114 @@ fn retry_policy() -> CustomRetryPolicy {
676697
}) as _)
677698
}
678699

700+
// FIXME: This returns the used client as currently `VssClient`'s `RetryPolicy`s aren't `Clone`. So
701+
// we're forced to take the owned client and return it to be able to reuse the same connection
702+
// later.
703+
async fn determine_and_write_schema_version(
704+
client: VssClient<CustomRetryPolicy>, store_id: String, data_encryption_key: [u8; 32],
705+
key_obfuscator: KeyObfuscator,
706+
) -> io::Result<(VssSchemaVersion, VssClient<CustomRetryPolicy>)> {
707+
// Build the obfuscated `vss_schema_version` key.
708+
let obfuscated_prefix = key_obfuscator.obfuscate(&format! {"{}#{}", "", ""});
709+
let obfuscated_key = key_obfuscator.obfuscate(VSS_SCHEMA_VERSION_KEY);
710+
let store_key = format!("{}#{}", obfuscated_prefix, obfuscated_key);
711+
712+
// Try to read the stored schema version.
713+
let request = GetObjectRequest { store_id: store_id.clone(), key: store_key.clone() };
714+
let resp = match client.get_object(&request).await {
715+
Ok(resp) => Some(resp),
716+
Err(VssError::NoSuchKeyError(..)) => {
717+
// The value is not set.
718+
None
719+
},
720+
Err(e) => {
721+
let msg = format!("Failed to read schema version: {}", e);
722+
return Err(Error::new(ErrorKind::Other, msg));
723+
},
724+
};
725+
726+
if let Some(resp) = resp {
727+
// The schema version was present, so just decrypt the stored data.
728+
729+
// unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise
730+
// it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`]
731+
let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| {
732+
let msg = format!("Failed to decode schema version: {}", e);
733+
Error::new(ErrorKind::Other, msg)
734+
})?;
735+
736+
let storable_builder = StorableBuilder::new(RandEntropySource);
737+
// Schema version was added starting with V1, so if set at all, we use the key as `aad`
738+
let aad = store_key.as_bytes();
739+
let decrypted = storable_builder
740+
.deconstruct(storable, &data_encryption_key, aad)
741+
.map_err(|e| {
742+
let msg = format!("Failed to decode schema version: {}", e);
743+
Error::new(ErrorKind::Other, msg)
744+
})?
745+
.0;
746+
747+
let schema_version: VssSchemaVersion = Readable::read(&mut io::Cursor::new(decrypted))
748+
.map_err(|e| {
749+
let msg = format!("Failed to decode schema version: {}", e);
750+
Error::new(ErrorKind::Other, msg)
751+
})?;
752+
Ok((schema_version, client))
753+
} else {
754+
// The schema version wasn't present, this either means we're running for the first time *or* it's V0 pre-migration (predating writing of the schema version).
755+
756+
// Check if any `bdk_wallet` data was written by listing keys under the respective
757+
// (unobfuscated) prefix.
758+
const V0_BDK_WALLET_PREFIX: &str = "bdk_wallet#";
759+
let request = ListKeyVersionsRequest {
760+
store_id: store_id.clone(),
761+
key_prefix: Some(V0_BDK_WALLET_PREFIX.to_string()),
762+
page_token: None,
763+
page_size: None,
764+
};
765+
766+
let response = client.list_key_versions(&request).await.map_err(|e| {
767+
let msg = format!("Failed to determine schema version: {}", e);
768+
Error::new(ErrorKind::Other, msg)
769+
})?;
770+
771+
let wallet_data_present = !response.key_versions.is_empty();
772+
if wallet_data_present {
773+
// If the wallet data is present, it means we're not running for the first time.
774+
Ok((VssSchemaVersion::V0, client))
775+
} else {
776+
// We're running for the first time, write the schema version to save unnecessary IOps
777+
// on future startup.
778+
let schema_version = VssSchemaVersion::V1;
779+
let encoded_version = schema_version.encode();
780+
781+
let storable_builder = StorableBuilder::new(RandEntropySource);
782+
let vss_version = -1;
783+
let aad = store_key.as_bytes();
784+
let storable =
785+
storable_builder.build(encoded_version, vss_version, &data_encryption_key, aad);
786+
787+
let request = PutObjectRequest {
788+
store_id,
789+
global_version: None,
790+
transaction_items: vec![KeyValue {
791+
key: store_key,
792+
version: vss_version,
793+
value: storable.encode_to_vec(),
794+
}],
795+
delete_items: vec![],
796+
};
797+
798+
client.put_object(&request).await.map_err(|e| {
799+
let msg = format!("Failed to write schema version: {}", e);
800+
Error::new(ErrorKind::Other, msg)
801+
})?;
802+
803+
Ok((schema_version, client))
804+
}
805+
}
806+
}
807+
679808
/// A source for generating entropy/randomness using [`rand`].
680809
pub(crate) struct RandEntropySource;
681810

@@ -708,7 +837,8 @@ mod tests {
708837
let mut vss_seed = [0u8; 32];
709838
rng.fill_bytes(&mut vss_seed);
710839
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
711-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
840+
let vss_store =
841+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
712842
do_read_write_remove_list_persist(&vss_store);
713843
}
714844

@@ -720,7 +850,8 @@ mod tests {
720850
let mut vss_seed = [0u8; 32];
721851
rng.fill_bytes(&mut vss_seed);
722852
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
723-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
853+
let vss_store =
854+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
724855

725856
do_read_write_remove_list_persist(&vss_store);
726857
drop(vss_store)
@@ -734,7 +865,8 @@ mod tests {
734865
let mut vss_seed = [0u8; 32];
735866
rng.fill_bytes(&mut vss_seed);
736867
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
737-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
868+
let vss_store =
869+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
738870

739871
let primary_namespace = "test_namespace";
740872
let secondary_namespace = "";

0 commit comments

Comments
 (0)