Skip to content

Commit b315e38

Browse files
committed
Parallelize ChannelMonitorUpdate loading
When reading `ChannelMonitor`s from a `MonitorUpdatingPersister` on startup, we have to make sure to load any `ChannelMonitorUpdate`s and re-apply them as well. Now that we know which `ChannelMonitorUpdate`s to load from `list`ing the entries from the `KVStore` we can parallelize the reads themselves, which we do here. Now, loading all `ChannelMonitor`s from an async `KVStore` requires only three full RTTs - one to list the set of `ChannelMonitor`s, one to both fetch the `ChanelMonitor` and list the set of `ChannelMonitorUpdate`s, and one to fetch all the `ChannelMonitorUpdate`s (with the last one skipped when there are no `ChannelMonitorUpdate`s to read).
1 parent af61d84 commit b315e38

File tree

1 file changed

+21
-17
lines changed

1 file changed

+21
-17
lines changed

lightning/src/util/persist.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,23 +1106,27 @@ where
11061106
list_res?.into_iter().map(|name| UpdateName::new(name)).collect();
11071107
let mut updates = updates?;
11081108
updates.sort_unstable();
1109-
// TODO: Parallelize this loop
1110-
for update_name in updates {
1111-
if update_name.0 > current_update_id {
1112-
let update = self.read_monitor_update(monitor_key, &update_name).await?;
1113-
monitor
1114-
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1115-
.map_err(|e| {
1116-
log_error!(
1117-
self.logger,
1118-
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1119-
monitor_key,
1120-
update_name.as_str(),
1121-
e
1122-
);
1123-
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1124-
})?;
1125-
}
1109+
let updates_to_load = updates.iter().filter(|update| update.0 > current_update_id);
1110+
let mut update_futures = Vec::with_capacity(updates_to_load.clone().count());
1111+
for update_name in updates_to_load {
1112+
update_futures.push(ResultFuture::Pending(Box::pin(async move {
1113+
(update_name, self.read_monitor_update(monitor_key, update_name).await)
1114+
})));
1115+
}
1116+
for (update_name, update_res) in MultiResultFuturePoller::new(update_futures).await {
1117+
let update = update_res?;
1118+
monitor
1119+
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1120+
.map_err(|e| {
1121+
log_error!(
1122+
self.logger,
1123+
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1124+
monitor_key,
1125+
update_name.as_str(),
1126+
e
1127+
);
1128+
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1129+
})?;
11261130
}
11271131
Ok(Some((block_hash, monitor)))
11281132
}

0 commit comments

Comments
 (0)