Skip to content

Commit 2b97854

Browse files
committed
Determine VSS schema version at startup
Since we just made some breaking changes to how exactly we persist data via VSS (now using an `aad` that commits to the key and also obfuscating namespaces), we have to detect which schema version we're on to ensure backwards compatibility. To this end, we here start reading a persisted `vss_schema_version` key in `VssStore::new`. If it is present, we just return the encoded value (right now that can only be V1). If it is not present, it can either mean we run for the first time *or* we're on V0, which we determine checking if anything related to the `bdk_wallet` descriptors are present in the store. If we're running for the first time, we also persist the schema version to save us these rather inefficient steps on following startups.
1 parent e7d8207 commit 2b97854

File tree

2 files changed

+155
-19
lines changed

2 files changed

+155
-19
lines changed

src/builder.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,12 @@ impl NodeBuilder {
731731

732732
let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();
733733

734-
let vss_store = VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider);
734+
let vss_store =
735+
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| {
736+
log_error!(logger, "Failed to setup VSS store: {}", e);
737+
BuildError::KVStoreSetupFailed
738+
})?;
739+
735740
build_with_store_internal(
736741
config,
737742
self.chain_data_source_config.as_ref(),

src/io/vss_store.rs

Lines changed: 149 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
1717

1818
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
19+
use lightning::impl_writeable_tlv_based_enum;
1920
use lightning::io::{self, Error, ErrorKind};
2021
use lightning::util::persist::{KVStore, KVStoreSync};
22+
use lightning::util::ser::{Readable, Writeable};
2123
use prost::Message;
2224
use rand::RngCore;
2325
use vss_client::client::VssClient;
@@ -59,6 +61,13 @@ enum VssSchemaVersion {
5961
V1,
6062
}
6163

64+
impl_writeable_tlv_based_enum!(VssSchemaVersion,
65+
(0, V0) => {},
66+
(1, V1) => {},
67+
);
68+
69+
const VSS_SCHEMA_VERSION_KEY: &str = "vss_schema_version";
70+
6271
// We set this to a small number of threads that would still allow to make some progress if one
6372
// would hit a blocking case
6473
const INTERNAL_RUNTIME_WORKERS: usize = 2;
@@ -82,23 +91,20 @@ impl VssStore {
8291
pub(crate) fn new(
8392
base_url: String, store_id: String, vss_seed: [u8; 32],
8493
header_provider: Arc<dyn VssHeaderProvider>,
85-
) -> Self {
94+
) -> io::Result<Self> {
8695
let next_version = AtomicU64::new(1);
87-
let internal_runtime = Some(
88-
tokio::runtime::Builder::new_multi_thread()
89-
.enable_all()
90-
.thread_name_fn(|| {
91-
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
92-
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
93-
format!("ldk-node-vss-runtime-{}", id)
94-
})
95-
.worker_threads(INTERNAL_RUNTIME_WORKERS)
96-
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
97-
.build()
98-
.unwrap(),
99-
);
96+
let internal_runtime = tokio::runtime::Builder::new_multi_thread()
97+
.enable_all()
98+
.thread_name_fn(|| {
99+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
100+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
101+
format!("ldk-node-vss-runtime-{}", id)
102+
})
103+
.worker_threads(INTERNAL_RUNTIME_WORKERS)
104+
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
105+
.build()
106+
.unwrap();
100107

101-
let schema_version = VssSchemaVersion::V0;
102108
let (data_encryption_key, obfuscation_master_key) =
103109
derive_data_encryption_and_obfuscation_keys(&vss_seed);
104110
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
@@ -110,6 +116,21 @@ impl VssStore {
110116
header_provider.clone(),
111117
);
112118

119+
let runtime_handle = internal_runtime.handle();
120+
let schema_store_id = store_id.clone();
121+
let schema_key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
122+
let (schema_version, blocking_client) = tokio::task::block_in_place(move || {
123+
runtime_handle.block_on(async {
124+
determine_and_write_schema_version(
125+
blocking_client,
126+
schema_store_id,
127+
data_encryption_key,
128+
schema_key_obfuscator,
129+
)
130+
.await
131+
})
132+
})?;
133+
113134
let async_retry_policy = retry_policy();
114135
let async_client =
115136
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
@@ -123,7 +144,7 @@ impl VssStore {
123144
key_obfuscator,
124145
));
125146

126-
Self { inner, next_version, internal_runtime }
147+
Ok(Self { inner, next_version, internal_runtime: Some(internal_runtime) })
127148
}
128149

129150
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -658,6 +679,114 @@ fn retry_policy() -> CustomRetryPolicy {
658679
}) as _)
659680
}
660681

682+
// FIXME: This returns the used client as currently `VssClient`'s `RetryPolicy`s aren't `Clone`. So
683+
// we're forced to take the owned client and return it to be able to reuse the same connection
684+
// later.
685+
async fn determine_and_write_schema_version(
686+
client: VssClient<CustomRetryPolicy>, store_id: String, data_encryption_key: [u8; 32],
687+
key_obfuscator: KeyObfuscator,
688+
) -> io::Result<(VssSchemaVersion, VssClient<CustomRetryPolicy>)> {
689+
// Build the obfuscated `vss_schema_version` key.
690+
let obfuscated_prefix = key_obfuscator.obfuscate(&format! {"{}#{}", "", ""});
691+
let obfuscated_key = key_obfuscator.obfuscate(VSS_SCHEMA_VERSION_KEY);
692+
let store_key = format!("{}#{}", obfuscated_prefix, obfuscated_key);
693+
694+
// Try to read the stored schema version.
695+
let request = GetObjectRequest { store_id: store_id.clone(), key: store_key.clone() };
696+
let resp = match client.get_object(&request).await {
697+
Ok(resp) => Some(resp),
698+
Err(VssError::NoSuchKeyError(..)) => {
699+
// The value is not set.
700+
None
701+
},
702+
Err(e) => {
703+
let msg = format!("Failed to read schema version: {}", e);
704+
return Err(Error::new(ErrorKind::Other, msg));
705+
},
706+
};
707+
708+
if let Some(resp) = resp {
709+
// The schema version was present, so just decrypt the stored data.
710+
711+
// unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise
712+
// it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`]
713+
let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| {
714+
let msg = format!("Failed to decode schema version: {}", e);
715+
Error::new(ErrorKind::Other, msg)
716+
})?;
717+
718+
let storable_builder = StorableBuilder::new(RandEntropySource);
719+
// Schema version was added starting with V1, so if set at all, we use the key as `aad`
720+
let aad = store_key.as_bytes();
721+
let decrypted = storable_builder
722+
.deconstruct(storable, &data_encryption_key, aad)
723+
.map_err(|e| {
724+
let msg = format!("Failed to decode schema version: {}", e);
725+
Error::new(ErrorKind::Other, msg)
726+
})?
727+
.0;
728+
729+
let schema_version: VssSchemaVersion = Readable::read(&mut io::Cursor::new(decrypted))
730+
.map_err(|e| {
731+
let msg = format!("Failed to decode schema version: {}", e);
732+
Error::new(ErrorKind::Other, msg)
733+
})?;
734+
Ok((schema_version, client))
735+
} else {
736+
// The schema version wasn't present, this either means we're running for the first time *or* it's V0 pre-migration (predating writing of the schema version).
737+
738+
// Check if any `bdk_wallet` data was written by listing keys under the respective
739+
// (unobfuscated) prefix.
740+
const V0_BDK_WALLET_PREFIX: &str = "bdk_wallet#";
741+
let request = ListKeyVersionsRequest {
742+
store_id: store_id.clone(),
743+
key_prefix: Some(V0_BDK_WALLET_PREFIX.to_string()),
744+
page_token: None,
745+
page_size: None,
746+
};
747+
748+
let response = client.list_key_versions(&request).await.map_err(|e| {
749+
let msg = format!("Failed to determine schema version: {}", e);
750+
Error::new(ErrorKind::Other, msg)
751+
})?;
752+
753+
let wallet_data_present = !response.key_versions.is_empty();
754+
if wallet_data_present {
755+
// If the wallet data is present, it means we're not running for the first time.
756+
Ok((VssSchemaVersion::V0, client))
757+
} else {
758+
// We're running for the first time, write the schema version to save unnecessary IOps
759+
// on future startup.
760+
let schema_version = VssSchemaVersion::V1;
761+
let encoded_version = schema_version.encode();
762+
763+
let storable_builder = StorableBuilder::new(RandEntropySource);
764+
let vss_version = -1;
765+
let aad = store_key.as_bytes();
766+
let storable =
767+
storable_builder.build(encoded_version, vss_version, &data_encryption_key, aad);
768+
769+
let request = PutObjectRequest {
770+
store_id,
771+
global_version: None,
772+
transaction_items: vec![KeyValue {
773+
key: store_key,
774+
version: vss_version,
775+
value: storable.encode_to_vec(),
776+
}],
777+
delete_items: vec![],
778+
};
779+
780+
client.put_object(&request).await.map_err(|e| {
781+
let msg = format!("Failed to write schema version: {}", e);
782+
Error::new(ErrorKind::Other, msg)
783+
})?;
784+
785+
Ok((schema_version, client))
786+
}
787+
}
788+
}
789+
661790
/// A source for generating entropy/randomness using [`rand`].
662791
pub(crate) struct RandEntropySource;
663792

@@ -690,7 +819,8 @@ mod tests {
690819
let mut vss_seed = [0u8; 32];
691820
rng.fill_bytes(&mut vss_seed);
692821
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
693-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
822+
let vss_store =
823+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
694824
do_read_write_remove_list_persist(&vss_store);
695825
}
696826

@@ -702,7 +832,8 @@ mod tests {
702832
let mut vss_seed = [0u8; 32];
703833
rng.fill_bytes(&mut vss_seed);
704834
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
705-
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
835+
let vss_store =
836+
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
706837

707838
do_read_write_remove_list_persist(&vss_store);
708839
drop(vss_store)

0 commit comments

Comments
 (0)