
Commit b6a9d23

Track completed unit count for backup attachment upload progress
1 parent d33f793 commit b6a9d23

8 files changed (+160, −37 lines)

SignalServiceKit/Backups/Attachments/BackupAttachmentUploadProgress.swift

Lines changed: 29 additions & 4 deletions
@@ -82,6 +82,7 @@ public actor BackupAttachmentUploadProgressImpl: BackupAttachmentUploadProgress
         let queueSnapshot = try self.computeRemainingUnuploadedByteCount()
         let sink = OWSProgress.createSink(block)
         let source = await sink.addSource(withLabel: "", unitCount: queueSnapshot.totalByteCount)
+        source.incrementCompletedUnitCount(by: queueSnapshot.completedByteCount)
         let observer = Observer(
             queueSnapshot: queueSnapshot,
             sink,
@@ -226,6 +227,7 @@ public actor BackupAttachmentUploadProgressImpl: BackupAttachmentUploadProgress
 
     fileprivate struct UploadQueueSnapshot {
         let totalByteCount: UInt64
+        let completedByteCount: UInt64
         // We want to ignore updates from uploads that were scheduled after
         // we started observing. Take advantage of sequential row ids by
         // ignoring updates from ids that came after initial setup.
@@ -234,24 +236,46 @@ public actor BackupAttachmentUploadProgressImpl: BackupAttachmentUploadProgress
 
     private nonisolated func computeRemainingUnuploadedByteCount() throws -> UploadQueueSnapshot {
         return try db.read { tx in
-            var totalByteCount: UInt64 = 0
+            var remainingByteCount: UInt64 = 0
+            var completedByteCount: UInt64 = 0
             var maxRowId: Int64?
 
-            let cursor = try QueuedBackupAttachmentUpload
+            var cursor = try QueuedBackupAttachmentUpload
+                .filter(
+                    Column(QueuedBackupAttachmentUpload.CodingKeys.state)
+                        == QueuedBackupAttachmentUpload.State.ready.rawValue
+                )
                 // Don't coun't thumbnails in progress
                 .filter(Column(QueuedBackupAttachmentUpload.CodingKeys.isFullsize) == true)
                 .fetchCursor(tx.database)
 
             while let uploadRecord = try cursor.next() {
-                totalByteCount += UInt64(uploadRecord.estimatedByteCount)
+                remainingByteCount += UInt64(uploadRecord.estimatedByteCount)
                 if let existingMaxRowId = maxRowId {
                     maxRowId = max(existingMaxRowId, uploadRecord.id!)
                 } else {
                     maxRowId = uploadRecord.id
                 }
             }
 
-            return UploadQueueSnapshot(totalByteCount: totalByteCount, maxRowId: maxRowId)
+            cursor = try QueuedBackupAttachmentUpload
+                .filter(
+                    Column(QueuedBackupAttachmentUpload.CodingKeys.state)
+                        == QueuedBackupAttachmentUpload.State.done.rawValue
+                )
+                // Don't coun't thumbnails in progress
+                .filter(Column(QueuedBackupAttachmentUpload.CodingKeys.isFullsize) == true)
+                .fetchCursor(tx.database)
+
+            while let uploadRecord = try cursor.next() {
+                completedByteCount += UInt64(uploadRecord.estimatedByteCount)
+            }
+
+            return UploadQueueSnapshot(
+                totalByteCount: remainingByteCount + completedByteCount,
+                completedByteCount: completedByteCount,
+                maxRowId: maxRowId
+            )
         }
     }
 }
@@ -277,6 +301,7 @@ open class BackupAttachmentUploadProgressMock: BackupAttachmentUploadProgress {
         return BackupAttachmentUploadProgressObserver(
             queueSnapshot: .init(
                 totalByteCount: 100,
+                completedByteCount: 0,
                 maxRowId: nil
             ),
             sink: sink,
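The key change above is that the progress source is seeded with the bytes of rows already in the done state, so the total stays fixed while the completed count starts from what has already been uploaded. A minimal, self-contained sketch of that accounting; the ProgressSource type below is an illustrative stand-in, not the real OWSProgress API, and the byte counts are made up:

// Illustrative stand-ins for the snapshot and progress source used above;
// the real OWSProgress types have a richer API.
struct UploadQueueSnapshot {
    let totalByteCount: UInt64      // ready + done fullsize rows
    let completedByteCount: UInt64  // rows already marked done
}

final class ProgressSource {
    let totalUnitCount: UInt64
    private(set) var completedUnitCount: UInt64 = 0

    init(unitCount: UInt64) { self.totalUnitCount = unitCount }

    func incrementCompletedUnitCount(by count: UInt64) {
        completedUnitCount = min(completedUnitCount + count, totalUnitCount)
    }

    var percentComplete: Double {
        totalUnitCount == 0 ? 1 : Double(completedUnitCount) / Double(totalUnitCount)
    }
}

// Hypothetical byte counts: 180 bytes still queued, 120 bytes already uploaded.
let snapshot = UploadQueueSnapshot(totalByteCount: 300, completedByteCount: 120)
let source = ProgressSource(unitCount: snapshot.totalByteCount)
source.incrementCompletedUnitCount(by: snapshot.completedByteCount)
print(source.percentComplete) // 0.4 before any further upload finishes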

SignalServiceKit/Backups/Attachments/BackupAttachmentUploadQueueRunner.swift

Lines changed: 3 additions & 1 deletion
@@ -756,7 +756,9 @@ class BackupAttachmentUploadQueueRunnerImpl: BackupAttachmentUploadQueueRunner {
     }
 
     func removeRecord(_ record: TaskRecord, tx: DBWriteTransaction) throws {
-        try backupAttachmentUploadStore.removeQueuedUpload(
+        // We don't actually delete records when finishing; we just mark
+        // them done so we can still keep track of their byte count.
+        try backupAttachmentUploadStore.markUploadDone(
             for: record.record.attachmentRowId,
             fullsize: record.record.isFullsize,
             tx: tx
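Why keep finished rows instead of deleting them, as the new comment says: if done rows vanished, a progress observer built later (for example after a restart) would compute its snapshot from the remaining rows only and the reported fraction would fall back to zero. A tiny arithmetic sketch with made-up byte counts:

// Hypothetical numbers: three 100-byte fullsize uploads, one already finished.
let remainingBytes: UInt64 = 200
let doneBytes: UInt64 = 100

// Done row deleted: a fresh snapshot sees only the remaining 200 bytes and
// restarts the bar at 0%.
let fractionIfDeleted = 0.0 / Double(remainingBytes)

// Done row kept (state = done): the snapshot reports 100 of 300 bytes right away.
let fractionIfKept = Double(doneBytes) / Double(remainingBytes + doneBytes)

print(fractionIfDeleted, fractionIfKept) // 0.0 0.333...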

SignalServiceKit/Backups/Attachments/BackupAttachmentUploadStore.swift

Lines changed: 16 additions & 9 deletions
@@ -13,7 +13,7 @@ public protocol BackupAttachmentUploadStore {
     /// If the same attachment is already enqueued, updates it to the greater of the old and new owner's timestamp.
     ///
     /// Doesn't actually trigger an upload; callers must later call `fetchNextUpload`, complete the upload of
-    /// both the fullsize and thumbnail as needed, and then call `removeQueuedUpload` once finished.
+    /// both the fullsize and thumbnail as needed, and then call `markUploadDone` once finished.
     /// Note that the upload operation can (and will) be separately durably enqueued in AttachmentUploadQueue,
     /// that's fine and doesn't change how this queue works.
     func enqueue(
@@ -36,7 +36,7 @@ public protocol BackupAttachmentUploadStore {
     /// Remove the upload from the queue. Should be called once uploaded (or permanently failed).
     /// - returns the removed record, if any.
     @discardableResult
-    func removeQueuedUpload(
+    func markUploadDone(
         for attachmentId: Attachment.IDType,
         fullsize: Bool,
         tx: DBWriteTransaction
@@ -79,8 +79,12 @@ public class BackupAttachmentUploadStoreImpl: BackupAttachmentUploadStore {
             .fetchOne(db)
 
         if var existingRecord {
-            // Only update if the new one has higher priority; otherwise leave untouched.
-            if newRecord.highestPriorityOwnerType.isHigherPriority(than: existingRecord.highestPriorityOwnerType) {
+            // Only update if done or the new one has higher priority; otherwise leave untouched.
+            let shouldUpdate = switch existingRecord.state {
+            case .done: true
+            case .ready: newRecord.highestPriorityOwnerType.isHigherPriority(than: existingRecord.highestPriorityOwnerType)
+            }
+            if shouldUpdate {
                 existingRecord.highestPriorityOwnerType = newRecord.highestPriorityOwnerType
                 try existingRecord.update(db)
             }
@@ -104,26 +108,29 @@ public class BackupAttachmentUploadStoreImpl: BackupAttachmentUploadStore {
             tx.database,
             sql: """
                 SELECT * FROM \(QueuedBackupAttachmentUpload.databaseTableName)
-                WHERE \(QueuedBackupAttachmentUpload.CodingKeys.isFullsize.rawValue) = ?
+                WHERE
+                    \(QueuedBackupAttachmentUpload.CodingKeys.state.rawValue) = ?
+                    AND \(QueuedBackupAttachmentUpload.CodingKeys.isFullsize.rawValue) = ?
                 ORDER BY
                     \(QueuedBackupAttachmentUpload.CodingKeys.maxOwnerTimestamp.rawValue) DESC NULLS FIRST
                 LIMIT ?
                 """,
-            arguments: [isFullsize, count]
+            arguments: [QueuedBackupAttachmentUpload.State.ready.rawValue, isFullsize, count]
         )
     }
 
     @discardableResult
-    public func removeQueuedUpload(
+    public func markUploadDone(
         for attachmentId: Attachment.IDType,
         fullsize: Bool,
         tx: DBWriteTransaction
     ) throws -> QueuedBackupAttachmentUpload? {
-        let record = try QueuedBackupAttachmentUpload
+        var record = try QueuedBackupAttachmentUpload
            .filter(Column(QueuedBackupAttachmentUpload.CodingKeys.attachmentRowId) == attachmentId)
            .filter(Column(QueuedBackupAttachmentUpload.CodingKeys.isFullsize) == fullsize)
            .fetchOne(tx.database)
-        try record?.delete(tx.database)
+        record?.state = .done
+        try record?.update(tx.database)
        return record
    }
 }
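One subtlety in the enqueue() change above: when the same attachment is enqueued again, an existing record in the done state is always refreshed, whereas a still-ready record is only updated when the new owner has higher priority. A simplified sketch of just that decision; the priority type and its ordering are placeholders, not the real highestPriorityOwnerType logic:

// Placeholder types; the real store compares owner types and timestamps.
enum UploadState { case ready, done }

enum OwnerPriority {
    case low, high
    func isHigherPriority(than other: OwnerPriority) -> Bool {
        self == .high && other == .low
    }
}

// Mirrors the switch added to enqueue(): done records are always refreshed,
// ready records only on a priority upgrade.
func shouldUpdate(existing: UploadState, existingPriority: OwnerPriority, newPriority: OwnerPriority) -> Bool {
    switch existing {
    case .done: return true
    case .ready: return newPriority.isHigherPriority(than: existingPriority)
    }
}

assert(shouldUpdate(existing: .done, existingPriority: .high, newPriority: .low))
assert(!shouldUpdate(existing: .ready, existingPriority: .high, newPriority: .low))
assert(shouldUpdate(existing: .ready, existingPriority: .low, newPriority: .high))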

SignalServiceKit/Backups/Attachments/BackupAttachmentUploadStoreTests.swift

Lines changed: 31 additions & 12 deletions
@@ -186,7 +186,7 @@ class BackupAttachmentUploadStoreTests: XCTestCase {
         )
 
         dequeuedRecords = try store.fetchNextUploads(
-            count: UInt(timestamps.count - 1),
+            count: UInt(timestamps.count),
             isFullsize: true,
             tx: tx
         )
@@ -203,11 +203,12 @@ class BackupAttachmentUploadStoreTests: XCTestCase {
         })
 
         // We should have gotten entries in timestamp order
-        XCTAssertEqual(dequeuedTimestamps, Array(sortedTimestamps.prefix(sortedTimestamps.count - 1)))
+        XCTAssertEqual(dequeuedTimestamps, Array(sortedTimestamps.prefix(sortedTimestamps.count)))
 
         try db.write { tx in
-            try dequeuedRecords.forEach { record in
-                try store.removeQueuedUpload(
+            // Finish all but one
+            try dequeuedRecords.prefix(timestamps.count - 1).forEach { record in
+                try store.markUploadDone(
                     for: record.attachmentRowId,
                     fullsize: true,
                     tx: tx
@@ -216,9 +217,26 @@ class BackupAttachmentUploadStoreTests: XCTestCase {
         }
 
         try db.read { tx in
-            // all rows but one should be deleted.
+            // Since not all rows are done, they should all stick around.
+            let records = try QueuedBackupAttachmentUpload.fetchAll(tx.database)
+            XCTAssertEqual(5, records.count)
+            XCTAssertEqual(4, records.filter({ $0.state == .done }).count)
+            XCTAssertEqual(1, records.filter({ $0.state == .ready }).count)
+        }
+
+        try db.write { tx in
+            // Finish the last one
+            _ = try store.markUploadDone(
+                for: dequeuedRecords.last!.attachmentRowId,
+                fullsize: true,
+                tx: tx
+            )
+        }
+
+        try db.read { tx in
+            // all rows but one should now be deleted.
             XCTAssertEqual(
-                1,
+                0,
                 try QueuedBackupAttachmentUpload.fetchCount(tx.database)
             )
         }
@@ -285,7 +303,7 @@ class BackupAttachmentUploadStoreTests: XCTestCase {
 
         try db.write { tx in
             try dequeuedRecords.forEach { record in
-                try store.removeQueuedUpload(
+                try store.markUploadDone(
                     for: record.attachmentRowId,
                     fullsize: record.isFullsize,
                     tx: tx
@@ -294,11 +312,12 @@ class BackupAttachmentUploadStoreTests: XCTestCase {
         }
 
         try db.read { tx in
-            // all fullsize rows should be deleted.
-            XCTAssertEqual(
-                4,
-                try QueuedBackupAttachmentUpload.fetchCount(tx.database)
-            )
+            // All fullsize rows should be done
+            let records = try QueuedBackupAttachmentUpload.fetchAll(tx.database)
+            XCTAssertEqual(8, records.count)
+            XCTAssertEqual(4, records.filter(\.isFullsize.negated).count)
+            XCTAssertEqual(4, records.filter(\.isFullsize).count)
+            XCTAssertEqual(4, records.filter({ $0.isFullsize && $0.state == .done }).count)
         }
     }
 

SignalServiceKit/Backups/Attachments/BackupListMediaManager.swift

Lines changed: 3 additions & 3 deletions
@@ -879,16 +879,16 @@
             // Since we now know this is uploaded, we can go ahead and remove
             // from the upload queue if present.
             if
-                let removedRecord = try backupAttachmentUploadStore.removeQueuedUpload(
+                let finishedRecord = try backupAttachmentUploadStore.markUploadDone(
                     for: attachment.id,
                     fullsize: isThumbnail.negated,
                     tx: tx
                 )
             {
-                if removedRecord.isFullsize {
+                if finishedRecord.isFullsize {
                     Task {
                         await backupAttachmentUploadProgress.didFinishUploadOfFullsizeAttachment(
-                            uploadRecord: removedRecord
+                            uploadRecord: finishedRecord
                         )
                     }
                 }

SignalServiceKit/Backups/Attachments/QueuedBackupAttachmentUpload.swift

Lines changed: 12 additions & 0 deletions
@@ -64,6 +64,8 @@ public struct QueuedBackupAttachmentUpload: Codable, FetchableRecord, MutablePersistableRecord
     /// Minimum timestamp at which this upload can be retried (if it failed in the past)
     public var minRetryTimestamp: UInt64
 
+    public var state: State
+
     public enum OwnerType {
         case threadWallpaper
         /// Timestamp of the newest message that owns this attachment.
@@ -80,6 +82,11 @@ public struct QueuedBackupAttachmentUpload: Codable, FetchableRecord, MutablePersistableRecord
         }
     }
 
+    public enum State: Int, Codable {
+        case ready = 0
+        case done = 1
+    }
+
     public init(
         id: Int64? = nil,
         attachmentRowId: Attachment.IDType,
@@ -88,6 +95,7 @@ public struct QueuedBackupAttachmentUpload: Codable, FetchableRecord, MutablePersistableRecord
         estimatedByteCount: UInt32,
         numRetries: UInt32 = 0,
         minRetryTimestamp: UInt64 = 0,
+        state: State = .ready,
     ) {
         self.id = id
         self.attachmentRowId = attachmentRowId
@@ -96,6 +104,7 @@ public struct QueuedBackupAttachmentUpload: Codable, FetchableRecord, MutablePersistableRecord
         self.estimatedByteCount = estimatedByteCount
         self.numRetries = numRetries
         self.minRetryTimestamp = minRetryTimestamp
+        self.state = state
     }
 
     // MARK: FetchableRecord
@@ -124,6 +133,7 @@ public struct QueuedBackupAttachmentUpload: Codable, FetchableRecord, MutablePersistableRecord
         case estimatedByteCount
         case numRetries
         case minRetryTimestamp
+        case state
     }
 
     public init(from decoder: any Decoder) throws {
@@ -140,6 +150,7 @@ public struct QueuedBackupAttachmentUpload: Codable, FetchableRecord, MutablePersistableRecord
         self.estimatedByteCount = try container.decode(UInt32.self, forKey: .estimatedByteCount)
         self.numRetries = try container.decode(UInt32.self, forKey: .numRetries)
         self.minRetryTimestamp = try container.decode(UInt64.self, forKey: .minRetryTimestamp)
+        self.state = try container.decode(State.self, forKey: .state)
     }
 
     public func encode(to encoder: any Encoder) throws {
@@ -156,5 +167,6 @@ public struct QueuedBackupAttachmentUpload: Codable, FetchableRecord, MutablePersistableRecord
         try container.encode(estimatedByteCount, forKey: .estimatedByteCount)
         try container.encode(numRetries, forKey: .numRetries)
         try container.encode(minRetryTimestamp, forKey: .minRetryTimestamp)
+        try container.encode(state, forKey: .state)
     }
 }
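The new state property is an Int-backed Codable enum, which GRDB's Codable record support stores as a plain INTEGER column (matching the schema change below). A rough, self-contained sketch of that mapping against an in-memory database; the table and type names here are illustrative, not the real record:

import GRDB

// Illustrative record; not the real QueuedBackupAttachmentUpload.
struct DemoUpload: Codable, FetchableRecord, PersistableRecord {
    static let databaseTableName = "DemoUpload"
    var id: Int64?
    var state: State

    enum State: Int, Codable {
        case ready = 0
        case done = 1
    }
}

func demoStateColumn() throws {
    let dbQueue = try DatabaseQueue() // in-memory database
    try dbQueue.write { db in
        try db.create(table: DemoUpload.databaseTableName) { t in
            t.autoIncrementedPrimaryKey("id")
            t.column("state", .integer).notNull().defaults(to: 0)
        }
        try DemoUpload(id: nil, state: .ready).insert(db)
        try DemoUpload(id: nil, state: .done).insert(db)

        // Filter on the enum's raw value, as the store's queries do.
        let readyCount = try DemoUpload
            .filter(Column("state") == DemoUpload.State.ready.rawValue)
            .fetchCount(db)
        print(readyCount) // 1
    }
}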

SignalServiceKit/Resources/schema.sql

Lines changed: 30 additions & 7 deletions
@@ -2295,6 +2295,7 @@ CREATE
 ,"isFullsize" BOOLEAN NOT NULL
 ,"numRetries" INTEGER NOT NULL DEFAULT 0
 ,"minRetryTimestamp" INTEGER NOT NULL DEFAULT 0
+,"state" INTEGER DEFAULT 0
 )
 ;
 
@@ -2393,13 +2394,6 @@ CREATE
 )
 ;
 
-CREATE
-INDEX "index_BackupAttachmentUploadQueue_on_isFullsize_maxOwnerTimestamp"
-ON "BackupAttachmentUploadQueue"("isFullsize"
-,"maxOwnerTimestamp"
-)
-;
-
 CREATE
 TABLE
 IF NOT EXISTS "Poll" (
@@ -2467,3 +2461,32 @@ CREATE
 ON "PollVote"("optionId"
 )
 ;
+
+CREATE
+INDEX "index_BackupAttachmentUploadQueue_on_state_isFullsize_maxOwnerTimestamp"
+ON "BackupAttachmentUploadQueue"("state"
+,"isFullsize"
+,"maxOwnerTimestamp"
+)
+;
+
+CREATE
+TRIGGER __BackupAttachmentUploadQueue_au AFTER UPDATE
+OF state
+ON BackupAttachmentUploadQueue BEGIN DELETE
+FROM
+BackupAttachmentUploadQueue
+WHERE
+state = 1
+AND NOT EXISTS (
+SELECT
+id
+FROM
+BackupAttachmentUploadQueue
+WHERE
+state = 0
+)
+;
+
+END
+;
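The new trigger is what actually empties the queue: marking a row done has no visible effect while any ready (state = 0) row remains, but once the last ready row flips to done, every done row is deleted in one pass, which is the behavior the updated tests assert. A rough sketch of that behavior against a scratch in-memory table; the table and trigger names here are illustrative, not the real schema:

import GRDB

func demoCleanupTrigger() throws {
    let dbQueue = try DatabaseQueue() // in-memory database
    try dbQueue.write { db in
        // Scratch table plus a trigger equivalent to the one added above.
        try db.execute(sql: """
            CREATE TABLE DemoQueue (id INTEGER PRIMARY KEY, state INTEGER DEFAULT 0);
            CREATE TRIGGER DemoQueue_au AFTER UPDATE OF state ON DemoQueue
            BEGIN
                DELETE FROM DemoQueue
                WHERE state = 1
                    AND NOT EXISTS (SELECT id FROM DemoQueue WHERE state = 0);
            END;
            INSERT INTO DemoQueue (state) VALUES (0), (0);
            """)

        try db.execute(sql: "UPDATE DemoQueue SET state = 1 WHERE id = 1")
        // One ready row remains, so the done row is kept.
        print(try Int.fetchOne(db, sql: "SELECT COUNT(*) FROM DemoQueue")!) // 2

        try db.execute(sql: "UPDATE DemoQueue SET state = 1 WHERE id = 2")
        // No ready rows remain, so the trigger clears every done row.
        print(try Int.fetchOne(db, sql: "SELECT COUNT(*) FROM DemoQueue")!) // 0
    }
}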
