Skip to content

Commit d33f793

Browse files
authored
Parallelize pre key fetches
1 parent 9c0b466 commit d33f793

File tree

4 files changed

+130
-60
lines changed

4 files changed

+130
-60
lines changed

SignalServiceKit/Messages/MessageSender+Errors.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ public import LibSignalClient
88

99
public enum MessageSenderError: Error, IsRetryableProvider, UserErrorDescriptionProvider {
1010
case prekeyRateLimit
11-
case missingDevice
1211
case blockedContactRecipient
1312
case threadMissing
1413

@@ -19,7 +18,7 @@ public enum MessageSenderError: Error, IsRetryableProvider, UserErrorDescription
1918
"ERROR_DESCRIPTION_MESSAGE_SEND_FAILED_DUE_TO_BLOCK_LIST",
2019
comment: "Error message indicating that message send failed due to block list"
2120
)
22-
case .prekeyRateLimit, .missingDevice, .threadMissing:
21+
case .prekeyRateLimit, .threadMissing:
2322
return OWSLocalizedString(
2423
"MESSAGE_STATUS_SEND_FAILED",
2524
comment: "Label indicating that a message failed to send."
@@ -35,8 +34,6 @@ public enum MessageSenderError: Error, IsRetryableProvider, UserErrorDescription
3534
// TODO: Retry with backoff.
3635
// TODO: Can we honor a retry delay hint from the response?
3736
return true
38-
case .missingDevice:
39-
return true
4037
case .blockedContactRecipient:
4138
return false
4239
case .threadMissing:

SignalServiceKit/Messages/MessageSender.swift

Lines changed: 126 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -71,23 +71,32 @@ public class MessageSender {
7171
/// Establishes a session with the recipient if one doesn't already exist.
7272
private func createSession(
7373
serviceId: ServiceId,
74-
deviceId: DeviceId,
74+
deviceId: PreKeyDevice,
7575
sealedSenderParameters: SealedSenderParameters?
7676
) async throws {
7777
do {
78-
let preKeyBundle = try await makePrekeyRequest(
78+
var preKeyBundle = try await makePreKeyRequest(
7979
serviceId: serviceId,
8080
deviceId: deviceId,
8181
sealedSenderParameters: sealedSenderParameters
8282
)
8383

8484
try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
85-
try self._createSession(
86-
for: preKeyBundle,
87-
serviceId: serviceId,
88-
deviceId: deviceId,
89-
transaction: tx
90-
)
85+
switch deviceId {
86+
case .all:
87+
self.updateDevices(
88+
serviceId: serviceId,
89+
deviceIds: preKeyBundle.devices.map(\.deviceId),
90+
tx: tx
91+
)
92+
case .specific(let deviceId):
93+
owsAssertDebug(preKeyBundle.devices.map(\.deviceId) == [deviceId], "Server returned unexpected device bundles.")
94+
preKeyBundle.devices.removeAll(where: { $0.deviceId != deviceId })
95+
guard preKeyBundle.devices.map(\.deviceId) == [deviceId] else {
96+
throw OWSAssertionError("The server didn't return a bundle for the device we requested.")
97+
}
98+
}
99+
try self._createSessions(for: preKeyBundle, serviceId: serviceId, tx: tx)
91100
}
92101
} catch {
93102
switch error {
@@ -117,21 +126,24 @@ public class MessageSender {
117126
}
118127
}
119128

120-
private func makePrekeyRequest(
129+
private enum PreKeyDevice {
130+
case all
131+
case specific(DeviceId)
132+
}
133+
134+
private func makePreKeyRequest(
121135
serviceId: ServiceId,
122-
deviceId: DeviceId,
136+
deviceId: PreKeyDevice,
123137
sealedSenderParameters: SealedSenderParameters?
124138
) async throws -> SignalServiceKit.PreKeyBundle {
125-
Logger.info("serviceId: \(serviceId).\(deviceId)")
126-
127139
// As an optimization, skip the request if an error is guaranteed.
128140
if willDefinitelyHaveUntrustedIdentityError(for: serviceId) {
129-
Logger.info("Skipping prekey request due to untrusted identity.")
141+
Logger.warn("Skipping prekey request due to untrusted identity.")
130142
throw UntrustedIdentityError(serviceId: serviceId)
131143
}
132144

133145
if willLikelyHaveInvalidKeySignatureError(for: serviceId) {
134-
Logger.info("Skipping prekey request due to invalid prekey signature.")
146+
Logger.warn("Skipping prekey request due to invalid prekey signature.")
135147

136148
// Check if this error is happening repeatedly. If so, return an
137149
// InvalidKeySignatureError as a terminal failure.
@@ -159,8 +171,15 @@ public class MessageSender {
159171
)
160172

161173
do {
174+
let deviceIdParam: String
175+
switch deviceId {
176+
case .all:
177+
deviceIdParam = "*"
178+
case .specific(let deviceId):
179+
deviceIdParam = String(deviceId.rawValue)
180+
}
162181
let result = try await requestMaker.makeRequest {
163-
return OWSRequestFactory.recipientPreKeyRequest(serviceId: serviceId, deviceId: deviceId, auth: $0)
182+
return OWSRequestFactory.recipientPreKeyRequest(serviceId: serviceId, deviceId: deviceIdParam, auth: $0)
164183
}
165184
guard let responseData = result.response.responseBodyData else {
166185
throw OWSAssertionError("Prekey fetch missing response object.")
@@ -171,8 +190,6 @@ public class MessageSender {
171190
return bundle
172191
} catch {
173192
switch error.httpStatusCode {
174-
case 404:
175-
throw MessageSenderError.missingDevice
176193
case 429:
177194
throw MessageSenderError.prekeyRateLimit
178195
default:
@@ -181,23 +198,31 @@ public class MessageSender {
181198
}
182199
}
183200

184-
private func _createSession(
201+
private func _createSessions(
185202
for preKeyBundle: SignalServiceKit.PreKeyBundle,
186203
serviceId: ServiceId,
187-
deviceId: DeviceId,
188-
transaction: DBWriteTransaction
204+
tx: DBWriteTransaction
189205
) throws {
190206
assert(!Thread.isMainThread)
191207

208+
for deviceBundle in preKeyBundle.devices {
209+
try _createSession(for: deviceBundle, serviceId: serviceId, identityKey: preKeyBundle.identityKey, tx: tx)
210+
}
211+
}
212+
213+
private func _createSession(
214+
for deviceBundle: SignalServiceKit.PreKeyBundle.PreKeyDeviceBundle,
215+
serviceId: ServiceId,
216+
identityKey: IdentityKey,
217+
tx transaction: DBWriteTransaction
218+
) throws {
219+
let deviceId = deviceBundle.deviceId
220+
192221
if try validSession(for: serviceId, deviceId: deviceId, tx: transaction) != nil {
193222
Logger.warn("Session already exists for \(serviceId), deviceId: \(deviceId).")
194223
return
195224
}
196225

197-
guard let deviceBundle = preKeyBundle.devices.first(where: { $0.deviceId == deviceId }) else {
198-
throw OWSAssertionError("Server didn't provide a bundle for the requested device.")
199-
}
200-
201226
Logger.info("Creating session for \(serviceId), deviceId: \(deviceId); signed \(deviceBundle.signedPreKey.keyId), one-time \(deviceBundle.preKey?.keyId as Optional), kyber \(deviceBundle.pqPreKey.keyId as Optional)")
202227

203228
let bundle: LibSignalClient.PreKeyBundle
@@ -210,7 +235,7 @@ public class MessageSender {
210235
signedPrekeyId: deviceBundle.signedPreKey.keyId,
211236
signedPrekey: deviceBundle.signedPreKey.publicKey,
212237
signedPrekeySignature: deviceBundle.signedPreKey.signature,
213-
identity: preKeyBundle.identityKey,
238+
identity: identityKey,
214239
kyberPrekeyId: deviceBundle.pqPreKey.keyId,
215240
kyberPrekey: deviceBundle.pqPreKey.publicKey,
216241
kyberPrekeySignature: deviceBundle.pqPreKey.signature
@@ -222,7 +247,7 @@ public class MessageSender {
222247
signedPrekeyId: deviceBundle.signedPreKey.keyId,
223248
signedPrekey: deviceBundle.signedPreKey.publicKey,
224249
signedPrekeySignature: deviceBundle.signedPreKey.signature,
225-
identity: preKeyBundle.identityKey,
250+
identity: identityKey,
226251
kyberPrekeyId: deviceBundle.pqPreKey.keyId,
227252
kyberPrekey: deviceBundle.pqPreKey.publicKey,
228253
kyberPrekeySignature: deviceBundle.pqPreKey.signature
@@ -244,7 +269,7 @@ public class MessageSender {
244269
Logger.warn("Found untrusted identity for \(serviceId)")
245270
handleUntrustedIdentityKeyError(
246271
serviceId: serviceId,
247-
preKeyBundle: preKeyBundle,
272+
identityKey: identityKey,
248273
transaction: transaction
249274
)
250275
throw UntrustedIdentityError(serviceId: serviceId)
@@ -268,11 +293,11 @@ public class MessageSender {
268293

269294
private func handleUntrustedIdentityKeyError(
270295
serviceId: ServiceId,
271-
preKeyBundle: SignalServiceKit.PreKeyBundle,
296+
identityKey: IdentityKey,
272297
transaction tx: DBWriteTransaction
273298
) {
274299
let identityManager = DependenciesBridge.shared.identityManager
275-
identityManager.saveIdentityKey(preKeyBundle.identityKey, for: serviceId, tx: tx)
300+
identityManager.saveIdentityKey(identityKey, for: serviceId, tx: tx)
276301
}
277302

278303
/// If true, we expect fetching a bundle will fail no matter what it contains.
@@ -1413,26 +1438,44 @@ public class MessageSender {
14131438
throw MessageSenderNoSessionForTransientMessageError()
14141439
}
14151440

1416-
Logger.info("Fetching pre keys for some of \(serviceId)'s devices: \(missingSessions.map(\.deviceId))")
1417-
1418-
for missingSession in missingSessions {
1441+
// If we don't have *any* sessions, we can do less work by asking the
1442+
// server for all of them at the same time. (This also helps establish the
1443+
// initial list of devices when contacting someone for the first time.)
1444+
if deviceMessages.isEmpty {
14191445
do {
14201446
try await createSession(
14211447
serviceId: serviceId,
1422-
deviceId: missingSession.deviceId,
1448+
deviceId: .all,
14231449
sealedSenderParameters: sealedSenderParameters
14241450
)
1425-
} catch MessageSenderError.missingDevice {
1426-
// If we have an invalid device exception, remove this device from the
1427-
// recipient and suppress the error.
1428-
await databaseStorage.awaitableWrite { tx in
1429-
self.updateDevices(
1430-
serviceId: serviceId,
1431-
devicesToAdd: [],
1432-
devicesToRemove: [missingSession.deviceId],
1433-
transaction: tx
1434-
)
1451+
} catch where error.httpStatusCode == 404 {
1452+
try await handle404(serviceId: serviceId, isSelfSend: isSelfSend)
1453+
}
1454+
} else {
1455+
try await withThrowingTaskGroup { taskGroup in
1456+
for missingSession in missingSessions {
1457+
taskGroup.addTask {
1458+
do {
1459+
try await self.createSession(
1460+
serviceId: serviceId,
1461+
deviceId: .specific(missingSession.deviceId),
1462+
sealedSenderParameters: sealedSenderParameters
1463+
)
1464+
} catch where error.httpStatusCode == 404 {
1465+
// If we have an invalid device exception, remove this device from the
1466+
// recipient and suppress the error.
1467+
await databaseStorage.awaitableWrite { tx in
1468+
self.updateDevices(
1469+
serviceId: serviceId,
1470+
devicesToAdd: [],
1471+
devicesToRemove: [missingSession.deviceId],
1472+
transaction: tx
1473+
)
1474+
}
1475+
}
1476+
}
14351477
}
1478+
try await taskGroup.waitForAll()
14361479
}
14371480
}
14381481

@@ -1640,11 +1683,7 @@ public class MessageSender {
16401683

16411684
switch responseError.httpStatusCode {
16421685
case 404:
1643-
if !messageSend.isSelfSend {
1644-
try await checkIfAccountExists(serviceId: messageSend.serviceId)
1645-
}
1646-
Logger.warn("Server endpoints disagree about registration status for \(messageSend.serviceId). Backing off and retrying…")
1647-
throw OWSRetryableMessageSenderError()
1686+
try await handle404(serviceId: messageSend.serviceId, isSelfSend: messageSend.isSelfSend)
16481687
case 409:
16491688
let response = try MismatchedDevices.parse(responseError.httpResponseData ?? Data())
16501689
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
@@ -1678,6 +1717,14 @@ public class MessageSender {
16781717
}
16791718
}
16801719

1720+
private func handle404(serviceId: ServiceId, isSelfSend: Bool) async throws -> Never {
1721+
if !isSelfSend {
1722+
try await checkIfAccountExists(serviceId: serviceId)
1723+
}
1724+
Logger.warn("Server endpoints disagree about registration status for \(serviceId). Backing off and retrying…")
1725+
throw OWSRetryableMessageSenderError()
1726+
}
1727+
16811728
// MARK: - Unregistered, Missing, & Stale Devices
16821729

16831730
func handleMismatchedDevices(serviceId: ServiceId, missingDevices: [DeviceId], extraDevices: [DeviceId], tx: DBWriteTransaction) {
@@ -1698,31 +1745,57 @@ public class MessageSender {
16981745
}
16991746
}
17001747

1748+
private func updateDevices(
1749+
serviceId: ServiceId,
1750+
deviceIds: [DeviceId],
1751+
tx: DBWriteTransaction
1752+
) {
1753+
let recipientFetcher = DependenciesBridge.shared.recipientFetcher
1754+
let recipient = recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: tx)
1755+
self._updateDevices(
1756+
serviceId: serviceId,
1757+
recipient: recipient,
1758+
devicesToAdd: Array(Set(deviceIds).subtracting(recipient.deviceIds)),
1759+
devicesToRemove: Array(Set(recipient.deviceIds).subtracting(deviceIds)),
1760+
tx: tx
1761+
)
1762+
}
1763+
17011764
func updateDevices(
17021765
serviceId: ServiceId,
17031766
devicesToAdd: [DeviceId],
17041767
devicesToRemove: [DeviceId],
1705-
transaction: DBWriteTransaction
1768+
transaction tx: DBWriteTransaction
1769+
) {
1770+
let recipientFetcher = DependenciesBridge.shared.recipientFetcher
1771+
let recipient = recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: tx)
1772+
self._updateDevices(serviceId: serviceId, recipient: recipient, devicesToAdd: devicesToAdd, devicesToRemove: devicesToRemove, tx: tx)
1773+
}
1774+
1775+
private func _updateDevices(
1776+
serviceId: ServiceId,
1777+
recipient: SignalRecipient,
1778+
devicesToAdd: [DeviceId],
1779+
devicesToRemove: [DeviceId],
1780+
tx: DBWriteTransaction
17061781
) {
17071782
AssertNotOnMainThread()
17081783
owsAssertDebug(Set(devicesToAdd).isDisjoint(with: devicesToRemove))
17091784

1710-
let recipientFetcher = DependenciesBridge.shared.recipientFetcher
1711-
let recipient = recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: transaction)
17121785
let recipientManager = DependenciesBridge.shared.recipientManager
17131786
recipientManager.modifyAndSave(
17141787
recipient,
17151788
deviceIdsToAdd: devicesToAdd,
17161789
deviceIdsToRemove: devicesToRemove,
17171790
shouldUpdateStorageService: true,
1718-
tx: transaction
1791+
tx: tx
17191792
)
17201793

17211794
if !devicesToRemove.isEmpty {
17221795
Logger.info("Archiving sessions for extra devices: \(devicesToRemove)")
17231796
let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
17241797
for deviceId in devicesToRemove {
1725-
sessionStore.archiveSession(for: serviceId, deviceId: deviceId, tx: transaction)
1798+
sessionStore.archiveSession(for: serviceId, deviceId: deviceId, tx: tx)
17261799
}
17271800
}
17281801
}

SignalServiceKit/Network/API/Requests/OWSRequestFactory.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ public enum OWSRequestFactory {
487487
return TSRequest(url: URL(string: path)!, method: "GET", parameters: [:])
488488
}
489489

490-
static func recipientPreKeyRequest(serviceId: ServiceId, deviceId: DeviceId, auth: TSRequest.SealedSenderAuth?) -> TSRequest {
490+
static func recipientPreKeyRequest(serviceId: ServiceId, deviceId: String, auth: TSRequest.SealedSenderAuth?) -> TSRequest {
491491
let path = "\(self.textSecureKeysAPI)/\(serviceId.serviceIdString)/\(deviceId)"
492492

493493
var request = TSRequest(url: URL(string: path)!, method: "GET", parameters: [:])

SignalServiceKit/Storage/AxolotlStore/Model/PreKeyBundle.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import Foundation
77
import LibSignalClient
88

99
struct PreKeyBundle: Decodable {
10-
let identityKey: IdentityKey
11-
let devices: [PreKeyDeviceBundle]
10+
var identityKey: IdentityKey
11+
var devices: [PreKeyDeviceBundle]
1212

1313
enum CodingKeys: CodingKey {
1414
case identityKey

0 commit comments

Comments
 (0)