Skip to content

Commit 20a93c5

Browse files
committed
Determine VSS schema version at startup
Since we just made some breaking changes to how exactly we persist data via VSS (now using an `aad` that commits to the key and also obfuscating namespaces), we have to detect which schema version we're on to ensure backwards compatibility. To this end, we here start reading a persisted `vss_schema_version` key in `VssStore::new`. If it is present, we just return the encoded value (right now that can only be V1). If it is not present, it can either mean we run for the first time *or* we're on V0, which we determine checking if anything related to the `bdk_wallet` descriptors are present in the store. If we're running for the first time, we also persist the schema version to save us these rather inefficient steps on following startups.
1 parent 37ed5c4 commit 20a93c5

File tree

2 files changed

+152
-20
lines changed

2 files changed

+152
-20
lines changed

src/builder.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,12 @@ impl NodeBuilder {
731731

732732
let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();
733733

734-
let vss_store = VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider);
734+
let vss_store =
735+
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| {
736+
log_error!(logger, "Failed to setup VSS store: {}", e);
737+
BuildError::KVStoreSetupFailed
738+
})?;
739+
735740
build_with_store_internal(
736741
config,
737742
self.chain_data_source_config.as_ref(),

src/io/vss_store.rs

Lines changed: 146 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ use std::time::Duration;
1717

1818
use bdk_chain::Merge;
1919
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
20+
use lightning::impl_writeable_tlv_based_enum;
2021
use lightning::io::{self, Error, ErrorKind};
2122
use lightning::util::persist::{KVStore, KVStoreSync};
23+
use lightning::util::ser::{Readable, Writeable};
2224
use prost::Message;
2325
use rand::RngCore;
2426
use vss_client::client::VssClient;
@@ -55,6 +57,13 @@ enum VssSchemaVersion {
5557
V1,
5658
}
5759

60+
impl_writeable_tlv_based_enum!(VssSchemaVersion,
61+
(0, V0) => {},
62+
(1, V1) => {},
63+
);
64+
65+
const VSS_SCHEMA_VERSION_KEY: &str = "vss_schema_version";
66+
5867
// We set this to a small number of threads that would still allow to make some progress if one
5968
// would hit a blocking case
6069
const INTERNAL_RUNTIME_WORKERS: usize = 2;
@@ -78,23 +87,20 @@ impl VssStore {
7887
pub(crate) fn new(
7988
base_url: String, store_id: String, vss_seed: [u8; 32],
8089
header_provider: Arc<dyn VssHeaderProvider>,
81-
) -> Self {
90+
) -> io::Result<Self> {
8291
let next_version = AtomicU64::new(1);
83-
let internal_runtime = Some(
84-
tokio::runtime::Builder::new_multi_thread()
85-
.enable_all()
86-
.thread_name_fn(|| {
87-
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
88-
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
89-
format!("ldk-node-vss-runtime-{}", id)
90-
})
91-
.worker_threads(INTERNAL_RUNTIME_WORKERS)
92-
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
93-
.build()
94-
.unwrap(),
95-
);
92+
let internal_runtime = tokio::runtime::Builder::new_multi_thread()
93+
.enable_all()
94+
.thread_name_fn(|| {
95+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
96+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
97+
format!("ldk-node-vss-runtime-{}", id)
98+
})
99+
.worker_threads(INTERNAL_RUNTIME_WORKERS)
100+
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
101+
.build()
102+
.unwrap();
96103

97-
let schema_version = VssSchemaVersion::V0;
98104
let (data_encryption_key, obfuscation_master_key) =
99105
derive_data_encryption_and_obfuscation_keys(&vss_seed);
100106
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
@@ -106,6 +112,19 @@ impl VssStore {
106112
header_provider.clone(),
107113
);
108114

115+
let runtime_handle = internal_runtime.handle();
116+
let schema_version = tokio::task::block_in_place(|| {
117+
runtime_handle.block_on(async {
118+
determine_and_write_schema_version(
119+
&blocking_client,
120+
&store_id,
121+
data_encryption_key,
122+
&key_obfuscator,
123+
)
124+
.await
125+
})
126+
})?;
127+
109128
let async_retry_policy = retry_policy();
110129
let async_client =
111130
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
@@ -119,7 +138,7 @@ impl VssStore {
119138
key_obfuscator,
120139
));
121140

122-
Self { inner, next_version, internal_runtime }
141+
Ok(Self { inner, next_version, internal_runtime: Some(internal_runtime) })
123142
}
124143

125144
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -676,6 +695,111 @@ fn retry_policy() -> CustomRetryPolicy {
676695
}) as _)
677696
}
678697

698+
async fn determine_and_write_schema_version(
699+
client: &VssClient<CustomRetryPolicy>, store_id: &String, data_encryption_key: [u8; 32],
700+
key_obfuscator: &KeyObfuscator,
701+
) -> io::Result<VssSchemaVersion> {
702+
// Build the obfuscated `vss_schema_version` key.
703+
let obfuscated_prefix = key_obfuscator.obfuscate(&format! {"{}#{}", "", ""});
704+
let obfuscated_key = key_obfuscator.obfuscate(VSS_SCHEMA_VERSION_KEY);
705+
let store_key = format!("{}#{}", obfuscated_prefix, obfuscated_key);
706+
707+
// Try to read the stored schema version.
708+
let request = GetObjectRequest { store_id: store_id.clone(), key: store_key.clone() };
709+
let resp = match client.get_object(&request).await {
710+
Ok(resp) => Some(resp),
711+
Err(VssError::NoSuchKeyError(..)) => {
712+
// The value is not set.
713+
None
714+
},
715+
Err(e) => {
716+
let msg = format!("Failed to read schema version: {}", e);
717+
return Err(Error::new(ErrorKind::Other, msg));
718+
},
719+
};
720+
721+
if let Some(resp) = resp {
722+
// The schema version was present, so just decrypt the stored data.
723+
724+
// unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise
725+
// it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`]
726+
let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| {
727+
let msg = format!("Failed to decode schema version: {}", e);
728+
Error::new(ErrorKind::Other, msg)
729+
})?;
730+
731+
let storable_builder = StorableBuilder::new(RandEntropySource);
732+
// Schema version was added starting with V1, so if set at all, we use the key as `aad`
733+
let aad = store_key.as_bytes();
734+
let decrypted = storable_builder
735+
.deconstruct(storable, &data_encryption_key, aad)
736+
.map_err(|e| {
737+
let msg = format!("Failed to decode schema version: {}", e);
738+
Error::new(ErrorKind::Other, msg)
739+
})?
740+
.0;
741+
742+
let schema_version: VssSchemaVersion = Readable::read(&mut io::Cursor::new(decrypted))
743+
.map_err(|e| {
744+
let msg = format!("Failed to decode schema version: {}", e);
745+
Error::new(ErrorKind::Other, msg)
746+
})?;
747+
Ok(schema_version)
748+
} else {
749+
// The schema version wasn't present, this either means we're running for the first time *or* it's V0 pre-migration (predating writing of the schema version).
750+
751+
// Check if any `bdk_wallet` data was written by listing keys under the respective
752+
// (unobfuscated) prefix.
753+
const V0_BDK_WALLET_PREFIX: &str = "bdk_wallet#";
754+
let request = ListKeyVersionsRequest {
755+
store_id: store_id.clone(),
756+
key_prefix: Some(V0_BDK_WALLET_PREFIX.to_string()),
757+
page_token: None,
758+
page_size: None,
759+
};
760+
761+
let response = client.list_key_versions(&request).await.map_err(|e| {
762+
let msg = format!("Failed to determine schema version: {}", e);
763+
Error::new(ErrorKind::Other, msg)
764+
})?;
765+
766+
let wallet_data_present = !response.key_versions.is_empty();
767+
if wallet_data_present {
768+
// If the wallet data is present, it means we're not running for the first time.
769+
Ok(VssSchemaVersion::V0)
770+
} else {
771+
// We're running for the first time, write the schema version to save unnecessary IOps
772+
// on future startup.
773+
let schema_version = VssSchemaVersion::V1;
774+
let encoded_version = schema_version.encode();
775+
776+
let storable_builder = StorableBuilder::new(RandEntropySource);
777+
let vss_version = -1;
778+
let aad = store_key.as_bytes();
779+
let storable =
780+
storable_builder.build(encoded_version, vss_version, &data_encryption_key, aad);
781+
782+
let request = PutObjectRequest {
783+
store_id: store_id.clone(),
784+
global_version: None,
785+
transaction_items: vec![KeyValue {
786+
key: store_key,
787+
version: vss_version,
788+
value: storable.encode_to_vec(),
789+
}],
790+
delete_items: vec![],
791+
};
792+
793+
client.put_object(&request).await.map_err(|e| {
794+
let msg = format!("Failed to write schema version: {}", e);
795+
Error::new(ErrorKind::Other, msg)
796+
})?;
797+
798+
Ok(schema_version)
799+
}
800+
}
801+
}
802+
679803
/// A source for generating entropy/randomness using [`rand`].
680804
pub(crate) struct RandEntropySource;
681805

@@ -708,7 +832,8 @@ mod tests {
708832
let mut vss_seed = [0u8; 32];
709833
rng.fill_bytes(&mut vss_seed);
710834
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
711-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
835+
let vss_store =
836+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
712837
do_read_write_remove_list_persist(&vss_store);
713838
}
714839

@@ -720,7 +845,8 @@ mod tests {
720845
let mut vss_seed = [0u8; 32];
721846
rng.fill_bytes(&mut vss_seed);
722847
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
723-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
848+
let vss_store =
849+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
724850

725851
do_read_write_remove_list_persist(&vss_store);
726852
drop(vss_store)
@@ -734,7 +860,8 @@ mod tests {
734860
let mut vss_seed = [0u8; 32];
735861
rng.fill_bytes(&mut vss_seed);
736862
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
737-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
863+
let vss_store =
864+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
738865

739866
let primary_namespace = "test_namespace";
740867
let secondary_namespace = "";

0 commit comments

Comments
 (0)