Skip to content

Commit 2fef253

Browse files
authored
fix: update last event id after CC transaction completes - WPB-22219 (#4005)
1 parent d0aad98 commit 2fef253

File tree

4 files changed

+89
-11
lines changed

4 files changed

+89
-11
lines changed

WireDomain/Sources/WireDomain/Synchronization/PullPendingUpdateEventsSync.swift

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,11 @@ public struct PullPendingUpdateEventsSync: PullPendingUpdateEventsSyncProtocol {
9191
// We'll insert new events from this index.
9292
let currentIndex = try await store.indexOfLastEventEnvelope() + 1
9393

94+
var lastEnvelopeID: UUID?
95+
9496
// We are decrypting the batch within one core crypto transaction
9597
try await coreCryptoProvider.coreCrypto().perform { context in
9698

97-
var lastEnvelopeID: UUID?
9899
var decryptedEnvelopes: [UpdateEventEnvelope] = []
99100

100101
for envelope in envelopes {
@@ -125,15 +126,18 @@ public struct PullPendingUpdateEventsSync: PullPendingUpdateEventsSyncProtocol {
125126
decryptedEnvelopes,
126127
index: currentIndex
127128
)
129+
}
128130

129-
if let lastEnvelopeID {
130-
// We keep track of the last event id so next time we fetch
131-
// only new events. We don't track transient events because
132-
// these events aren't stored in the backend.
133-
WireLogger.sync.debug("storing last event id", attributes: [.eventEnvelopeID: lastEnvelopeID])
134-
store.storeLastEventID(id: lastEnvelopeID)
135-
}
136-
131+
if let lastEnvelopeID {
132+
// We keep track of the last event id so next time we fetch
133+
// only new events. We don't track transient events because
134+
// these events aren't stored in the backend.
135+
//
136+
// NOTE: it's important the we are updating the last event ID
137+
// after the CC transaction has successfully completed,
138+
// otherwise we risk data loss in case of a crash.
139+
WireLogger.sync.debug("storing last event id", attributes: [.eventEnvelopeID: lastEnvelopeID])
140+
store.storeLastEventID(id: lastEnvelopeID)
137141
}
138142
}
139143

WireDomain/Sources/WireDomain/Synchronization/PullPendingUpdateEventsV2Sync.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ public struct PullPendingUpdateEventsSyncV2: PullPendingUpdateEventsSyncV2Protoc
146146
}
147147
}
148148

149-
// ack
149+
// ack the decrypted events
150+
//
151+
// NOTE: it's important that we ack after the CC transaction has succesfully completed,
152+
// otherwise we risk data loss in case of a crash.
150153
if let lastEnvelope = storedEnvelopes.last?.0 {
151154
try await acknowledgeUntilEnvelope(lastEnvelope, through: pushChannel, batchSize: storedEnvelopes.count)
152155
}

WireDomain/Tests/WireDomainTests/Synchronization/PullPendingUpdateEventsSyncTests.swift

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,52 @@ final class PullPendingUpdateEventsSyncTests: XCTestCase {
120120
XCTAssertEqual(journal[.brokenMLSGroupIDs].first, Scaffolding.mlsGroupID)
121121
}
122122

123+
func testLastEventIDIsNotPersisted_untilTransactionIsCompleted() async throws {
124+
// Mock
125+
store.lastEventID_MockValue = Scaffolding.lastEventID
126+
store.indexOfLastEventEnvelope_MockValue = Scaffolding.indexOfLastEventEnvelope
127+
128+
api.getUpdateEventsSelfClientIDSinceEventID_MockValue = PayloadPager(start: "page2") { start in
129+
switch start {
130+
case "page2":
131+
return Scaffolding.page2
132+
133+
default:
134+
throw "unknown page: \(start ?? "nil")"
135+
}
136+
}
137+
138+
decryptor.decryptEventsInContext_MockMethod = { envelope, _ in
139+
EventDecryptorResult(events: envelope.events, brokenMLSGroupIDs: [Scaffolding.mlsGroupID])
140+
}
141+
142+
store.persistEventEnvelopesIndex_MockMethod = { _, _ in }
143+
store.storeLastEventIDId_MockMethod = { _ in }
144+
store.storeServerTimeDelta_MockMethod = { _ in }
145+
146+
coreCrypto.completeTransactionByDefault = false
147+
148+
// When
149+
let pullingEventsTask = Task { [sut] in
150+
try await sut.pull()
151+
}
152+
153+
// we wait until the sync tries to commit the batch of decrypted events
154+
try await coreCrypto.waitUntilTransactionIsPending()
155+
156+
// Then
157+
try XCTAssertCount(store.storeLastEventIDId_Invocations, count: 0)
158+
159+
coreCrypto.completeAllTransactions()
160+
_ = await pullingEventsTask.result
161+
162+
// after allowing the transaction to complete we should we see
163+
// that the last event ID got persisted
164+
let storeLastEventIDInvocations = store.storeLastEventIDId_Invocations
165+
try XCTAssertCount(storeLastEventIDInvocations, count: 1)
166+
XCTAssertEqual(storeLastEventIDInvocations[0], Scaffolding.envelope4.id)
167+
}
168+
123169
}
124170

125171
private enum Scaffolding {

wire-ios-data-model/Support/Sources/MockSafeCoreCrypto.swift

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ public class MockSafeCoreCrypto: SafeCoreCryptoProtocol {
2424

2525
var coreCrypto: MockCoreCryptoProtocol
2626
var coreCryptoContext: MockCoreCryptoContextProtocol
27+
var completeTransactionByDefault: Bool = true
28+
29+
private var transactionContinuations: [CheckedContinuation<Void, Never>] = []
2730

2831
public init(
2932
coreCrypto: MockCoreCryptoProtocol = .init(),
@@ -48,7 +51,16 @@ public class MockSafeCoreCrypto: SafeCoreCryptoProtocol {
4851
var performAsyncCount = 0
4952
public func perform<T>(_ block: (WireCoreCrypto.CoreCryptoContextProtocol) async throws -> T) async rethrows -> T {
5053
performAsyncCount += 1
51-
return try await block(coreCryptoContext)
54+
55+
let result = try await block(coreCryptoContext)
56+
57+
if !completeTransactionByDefault {
58+
await withCheckedContinuation { continuation in
59+
transactionContinuations.append(continuation)
60+
}
61+
}
62+
63+
return result
5264
}
5365

5466
public func configure(block: (any WireCoreCrypto.CoreCryptoProtocol) async throws -> Void) async throws {
@@ -65,6 +77,19 @@ public class MockSafeCoreCrypto: SafeCoreCryptoProtocol {
6577
try mock(clientID)
6678
}
6779

80+
public func waitUntilTransactionIsPending() async throws {
81+
while transactionContinuations.isEmpty {
82+
try await Task.sleep(nanoseconds: 1_000_000)
83+
}
84+
}
85+
86+
public func completeAllTransactions() {
87+
transactionContinuations.forEach { cont in
88+
cont.resume()
89+
}
90+
91+
}
92+
6893
var tearDownCount = 0
6994
public func tearDown() throws {
7095
tearDownCount += 1

0 commit comments

Comments
 (0)