@@ -1085,6 +1085,10 @@ where
1085
1085
if persist_update {
1086
1086
let monitor_key = monitor_name. to_string ( ) ;
1087
1087
let update_name = UpdateName :: from ( update. update_id ) ;
1088
+ // Note that this is NOT an async function, but rather calls the *sync* KVStore
1089
+ // write method, allowing it to do its queueing immediately, and then return a
1090
+ // future for the completion of the write. This ensures monitor persistence
1091
+ // ordering is preserved.
1088
1092
res_a = Some ( self . kv_store . write (
1089
1093
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
1090
1094
monitor_key. as_str ( ) ,
@@ -1093,6 +1097,10 @@ where
1093
1097
) ) ;
1094
1098
} else {
1095
1099
// We could write this update, but it meets criteria of our design that calls for a full monitor write.
1100
+ // Note that this is NOT an async function, but rather calls the *sync* KVStore
1101
+ // write method, allowing it to do its queueing immediately, and then return a
1102
+ // future for the completion of the write. This ensures monitor persistence
1103
+ // ordering is preserved. This, thus, must happen before any await we do below.
1096
1104
let write_fut = self . persist_new_channel ( monitor_name, monitor) ;
1097
1105
let latest_update_id = monitor. get_latest_update_id ( ) ;
1098
1106
@@ -1120,6 +1128,9 @@ where
1120
1128
res_c = Some ( self . persist_new_channel ( monitor_name, monitor) ) ;
1121
1129
}
1122
1130
async move {
1131
+ // Complete any pending future(s). Note that to keep one return type we have to end
1132
+ // with a single async move block that we return, rather than trying to return the
1133
+ // individual futures themselves.
1123
1134
if let Some ( a) = res_a {
1124
1135
a. await ?;
1125
1136
}
0 commit comments