Skip to content

Commit 4027f46

Browse files
Fix memory usage by batching syncTasks
1 parent ab1e6f8 commit 4027f46

File tree

8 files changed

+149
-70
lines changed

8 files changed

+149
-70
lines changed

ts/background.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ import { AttachmentDownloadManager } from './jobs/AttachmentDownloadManager';
186186
import { onCallLinkUpdateSync } from './util/onCallLinkUpdateSync';
187187
import { CallMode } from './types/CallDisposition';
188188
import type { SyncTaskType } from './util/syncTasks';
189-
import { queueSyncTasks } from './util/syncTasks';
189+
import { queueSyncTasks, runAllSyncTasks } from './util/syncTasks';
190190
import type { ViewSyncTaskType } from './messageModifiers/ViewSyncs';
191191
import type { ReceiptSyncTaskType } from './messageModifiers/MessageReceipts';
192192
import type { ReadSyncTaskType } from './messageModifiers/ReadSyncs';
@@ -1469,15 +1469,7 @@ export async function startApp(): Promise<void> {
14691469
}
14701470
log.info('Expiration start timestamp cleanup: complete');
14711471

1472-
{
1473-
log.info('Startup/syncTasks: Fetching tasks');
1474-
const syncTasks = await DataWriter.getAllSyncTasks();
1475-
1476-
log.info(`Startup/syncTasks: Queueing ${syncTasks.length} sync tasks`);
1477-
await queueSyncTasks(syncTasks, DataWriter.removeSyncTaskById);
1478-
1479-
log.info('Startup/syncTasks: Done');
1480-
}
1472+
await runAllSyncTasks();
14811473

14821474
log.info('listening for registration events');
14831475
window.Whisper.events.on('registration_done', () => {

ts/sql/Interface.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,10 @@ type WritableInterface = {
845845

846846
removeSyncTaskById: (id: string) => void;
847847
saveSyncTasks: (tasks: Array<SyncTaskType>) => void;
848-
getAllSyncTasks: () => Array<SyncTaskType>;
848+
dequeueOldestSyncTasks: (previousRowId: number | null) => {
849+
tasks: Array<SyncTaskType>;
850+
lastRowId: number | null;
851+
};
849852

850853
getAllUnprocessedIds: () => Array<string>;
851854
getUnprocessedByIdsAndIncrementAttempts: (

ts/sql/Server.ts

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ import {
119119
} from '../util/search';
120120
import type { SyncTaskType } from '../util/syncTasks';
121121
import { MAX_SYNC_TASK_ATTEMPTS } from '../util/syncTasks.types';
122-
import { isMoreRecentThan } from '../util/timestamp';
123122
import type {
124123
AdjacentMessagesByConversationOptionsType,
125124
BackupCdnMediaObjectType,
@@ -475,7 +474,7 @@ export const DataWriter: ServerWritableInterface = {
475474

476475
removeSyncTaskById,
477476
saveSyncTasks,
478-
getAllSyncTasks,
477+
dequeueOldestSyncTasks,
479478

480479
getUnprocessedByIdsAndIncrementAttempts,
481480
getAllUnprocessedIds,
@@ -2158,47 +2157,68 @@ function saveSyncTask(db: WritableDB, task: SyncTaskType): void {
21582157

21592158
db.prepare(query).run(parameters);
21602159
}
2161-
export function getAllSyncTasks(db: WritableDB): Array<SyncTaskType> {
2160+
2161+
export function dequeueOldestSyncTasks(
2162+
db: WritableDB,
2163+
previousRowId: number | null
2164+
): { tasks: Array<SyncTaskType>; lastRowId: number | null } {
21622165
return db.transaction(() => {
2163-
const [selectAllQuery] = sql`
2164-
SELECT * FROM syncTasks ORDER BY createdAt ASC, sentAt ASC, id ASC
2166+
const orderBy = sqlFragment`ORDER BY rowid ASC`;
2167+
const limit = sqlFragment`LIMIT 10000`;
2168+
const predicate = sqlFragment`rowid > ${previousRowId ?? 0}`;
2169+
2170+
const [deleteOldQuery, deleteOldParams] = sql`
2171+
DELETE FROM syncTasks
2172+
WHERE
2173+
attempts >= ${MAX_SYNC_TASK_ATTEMPTS} AND
2174+
createdAt < ${Date.now() - durations.WEEK}
21652175
`;
21662176

2167-
const rows = db.prepare(selectAllQuery).all();
2177+
const result = db.prepare(deleteOldQuery).run(deleteOldParams);
21682178

2169-
const tasks: Array<SyncTaskType> = rows.map(row => ({
2170-
...row,
2171-
data: jsonToObject(row.data),
2172-
}));
2179+
if (result.changes > 0) {
2180+
logger.info(
2181+
`dequeueOldestSyncTasks: Deleted ${result.changes} expired sync tasks`
2182+
);
2183+
}
21732184

2174-
const [query] = sql`
2175-
UPDATE syncTasks
2176-
SET attempts = attempts + 1
2185+
const [selectAllQuery, selectAllParams] = sql`
2186+
SELECT rowid, * FROM syncTasks
2187+
WHERE ${predicate}
2188+
${orderBy}
2189+
${limit}
21772190
`;
2178-
db.prepare(query).run();
21792191

2180-
const [toDelete, toReturn] = partition(tasks, task => {
2181-
if (
2182-
isNormalNumber(task.attempts) &&
2183-
task.attempts < MAX_SYNC_TASK_ATTEMPTS
2184-
) {
2185-
return false;
2186-
}
2187-
if (isMoreRecentThan(task.createdAt, durations.WEEK)) {
2188-
return false;
2189-
}
2192+
const rows = db.prepare(selectAllQuery).all(selectAllParams);
2193+
if (!rows.length) {
2194+
return { tasks: [], lastRowId: null };
2195+
}
21902196

2191-
return true;
2197+
const firstRowId = rows.at(0)?.rowid;
2198+
const lastRowId = rows.at(-1)?.rowid;
2199+
2200+
strictAssert(firstRowId, 'dequeueOldestSyncTasks: firstRowId is null');
2201+
strictAssert(lastRowId, 'dequeueOldestSyncTasks: lastRowId is null');
2202+
2203+
const tasks: Array<SyncTaskType> = rows.map(row => {
2204+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
2205+
const { rowid: _rowid, ...rest } = row;
2206+
return {
2207+
...rest,
2208+
data: jsonToObject(row.data),
2209+
};
21922210
});
21932211

2194-
if (toDelete.length > 0) {
2195-
logger.warn(`getAllSyncTasks: Removing ${toDelete.length} expired tasks`);
2196-
toDelete.forEach(task => {
2197-
removeSyncTaskById(db, task.id);
2198-
});
2199-
}
2212+
const [updateQuery, updateParams] = sql`
2213+
UPDATE syncTasks
2214+
SET attempts = attempts + 1
2215+
WHERE rowid >= ${firstRowId}
2216+
AND rowid <= ${lastRowId}
2217+
`;
2218+
2219+
db.prepare(updateQuery).run(updateParams);
22002220

2201-
return toReturn;
2221+
return { tasks, lastRowId };
22022222
})();
22032223
}
22042224

@@ -7498,7 +7518,7 @@ function enableMessageInsertTriggersAndBackfill(db: WritableDB): void {
74987518
VALUES
74997519
(new.rowid, new.body);
75007520
END;
7501-
7521+
75027522
DROP TRIGGER IF EXISTS messages_on_insert_insert_mentions;
75037523
CREATE TRIGGER messages_on_insert_insert_mentions AFTER INSERT ON messages
75047524
BEGIN
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2024 Signal Messenger, LLC
2+
// SPDX-License-Identifier: AGPL-3.0-only
3+
import type { Database } from '@signalapp/better-sqlite3';
4+
import type { LoggerType } from '../../types/Logging';
5+
import { sql } from '../util';
6+
7+
export const version = 1260;
8+
9+
export function updateToSchemaVersion1260(
10+
currentVersion: number,
11+
db: Database,
12+
logger: LoggerType
13+
): void {
14+
if (currentVersion >= 1260) {
15+
return;
16+
}
17+
18+
db.transaction(() => {
19+
const [query] = sql`
20+
DROP INDEX IF EXISTS syncTasks_order;
21+
CREATE INDEX syncTasks_delete ON syncTasks (attempts DESC);
22+
`;
23+
24+
db.exec(query);
25+
26+
db.pragma('user_version = 1260');
27+
})();
28+
29+
logger.info('updateToSchemaVersion1260: success!');
30+
}

ts/sql/migrations/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,11 @@ import { updateToSchemaVersion1210 } from './1210-call-history-started-id';
101101
import { updateToSchemaVersion1220 } from './1220-blob-sessions';
102102
import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index';
103103
import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table';
104+
import { updateToSchemaVersion1250 } from './1250-defunct-call-links-storage';
104105
import {
105-
updateToSchemaVersion1250,
106+
updateToSchemaVersion1260,
106107
version as MAX_VERSION,
107-
} from './1250-defunct-call-links-storage';
108+
} from './1260-sync-tasks-rowid';
108109
import { DataWriter } from '../Server';
109110

110111
function updateToSchemaVersion1(
@@ -2076,6 +2077,7 @@ export const SCHEMA_VERSIONS = [
20762077
updateToSchemaVersion1230,
20772078
updateToSchemaVersion1240,
20782079
updateToSchemaVersion1250,
2080+
updateToSchemaVersion1260,
20792081
];
20802082

20812083
export class DBVersionFromFutureError extends Error {

ts/sql/util.ts

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import { isNumber, last } from 'lodash';
66
import type { ReadableDB, WritableDB } from './Interface';
7+
import type { LoggerType } from '../types/Logging';
78

89
export type EmptyQuery = [];
910
export type ArrayQuery = Array<ReadonlyArray<null | number | bigint | string>>;
@@ -162,17 +163,6 @@ export function sql(
162163
return [fragment, fragmentParams];
163164
}
164165

165-
type QueryPlanRow = Readonly<{
166-
id: number;
167-
parent: number;
168-
details: string;
169-
}>;
170-
171-
type QueryPlan = Readonly<{
172-
query: string;
173-
plan: ReadonlyArray<QueryPlanRow>;
174-
}>;
175-
176166
/**
177167
* Returns typed objects of the query plan for the given query.
178168
*
@@ -189,11 +179,19 @@ type QueryPlan = Readonly<{
189179
*/
190180
export function explainQueryPlan(
191181
db: ReadableDB,
182+
logger: LoggerType,
192183
template: QueryTemplate
193-
): QueryPlan {
184+
): QueryTemplate {
194185
const [query, params] = template;
195186
const plan = db.prepare(`EXPLAIN QUERY PLAN ${query}`).all(params);
196-
return { query, plan };
187+
logger.info('EXPLAIN QUERY PLAN');
188+
for (const line of query.split('\n')) {
189+
logger.info(line);
190+
}
191+
for (const row of plan) {
192+
logger.info(`id=${row.id}, parent=${row.parent}, detail=${row.detail}`);
193+
}
194+
return [query, params];
197195
}
198196

199197
//

ts/test-node/sql/migration_1060_test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { assert } from 'chai';
55
import { v4 as generateGuid } from 'uuid';
66

77
import {
8-
getAllSyncTasks,
8+
dequeueOldestSyncTasks,
99
getMostRecentAddressableMessages,
1010
removeSyncTaskById,
1111
saveSyncTasks,
@@ -217,23 +217,23 @@ describe('SQL/updateToSchemaVersion1060', () => {
217217

218218
saveSyncTasks(db, expected);
219219

220-
const actual = getAllSyncTasks(db);
221-
assert.deepEqual(expected, actual, 'before delete');
220+
const actual = dequeueOldestSyncTasks(db, null);
221+
assert.deepEqual(expected, actual.tasks, 'before delete');
222222

223223
removeSyncTaskById(db, expected[1].id);
224224

225-
const actualAfterDelete = getAllSyncTasks(db);
225+
const actualAfterDelete = dequeueOldestSyncTasks(db, null);
226226
assert.deepEqual(
227227
[
228228
{ ...expected[0], attempts: 2 },
229229
{ ...expected[2], attempts: 4 },
230230
],
231-
actualAfterDelete,
231+
actualAfterDelete.tasks,
232232
'after delete'
233233
);
234234
});
235235

236-
it('getAllSyncTasksSync expired tasks', () => {
236+
it('dequeueOldestSyncTasks expired tasks', () => {
237237
const now = Date.now();
238238
const twoWeeksAgo = now - WEEK * 2;
239239
const expected: Array<SyncTaskType> = [
@@ -289,10 +289,10 @@ describe('SQL/updateToSchemaVersion1060', () => {
289289

290290
saveSyncTasks(db, expected);
291291

292-
const actual = getAllSyncTasks(db);
292+
const actual = dequeueOldestSyncTasks(db, null);
293293

294-
assert.lengthOf(actual, 3);
295-
assert.deepEqual([expected[1], expected[2], expected[3]], actual);
294+
assert.lengthOf(actual.tasks, 3);
295+
assert.deepEqual([expected[1], expected[2], expected[3]], actual.tasks);
296296
});
297297
});
298298
});

ts/util/syncTasks.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
viewSyncTaskSchema,
3232
} from '../messageModifiers/ViewSyncs';
3333
import { safeParseUnknown } from './schemas';
34+
import { DataWriter } from '../sql/Client';
3435

3536
const syncTaskDataSchema = z.union([
3637
deleteMessageSchema,
@@ -85,7 +86,7 @@ export async function queueSyncTasks(
8586
log.error(`${innerLogId}: Schema not found. Deleting.`);
8687
// eslint-disable-next-line no-await-in-loop
8788
await removeSyncTaskById(id);
88-
return;
89+
continue;
8990
}
9091
const parseResult = safeParseUnknown(syncTaskDataSchema, data);
9192
if (!parseResult.success) {
@@ -94,7 +95,7 @@ export async function queueSyncTasks(
9495
);
9596
// eslint-disable-next-line no-await-in-loop
9697
await removeSyncTaskById(id);
97-
return;
98+
continue;
9899
}
99100

100101
const { data: parsed } = parseResult;
@@ -223,4 +224,37 @@ export async function queueSyncTasks(
223224
await removeSyncTaskById(id);
224225
}
225226
}
227+
228+
// Note: There may still be some tasks in the database, but we expect to be
229+
// called again some time later to process them.
230+
}
231+
232+
async function processSyncTasksBatch(
233+
logId: string,
234+
previousRowId: number | null
235+
): Promise<number | null> {
236+
log.info('syncTasks: Fetching tasks');
237+
const result = await DataWriter.dequeueOldestSyncTasks(previousRowId);
238+
const syncTasks = result.tasks;
239+
240+
if (syncTasks.length === 0) {
241+
log.info(`${logId}/syncTasks: No sync tasks to process, stopping`);
242+
} else {
243+
log.info(`${logId}/syncTasks: Queueing ${syncTasks.length} sync tasks`);
244+
await queueSyncTasks(syncTasks, DataWriter.removeSyncTaskById);
245+
}
246+
247+
return result.lastRowId;
248+
}
249+
250+
const A_TICK = Promise.resolve();
251+
252+
export async function runAllSyncTasks(): Promise<void> {
253+
let lastRowId: number | null = null;
254+
do {
255+
// eslint-disable-next-line no-await-in-loop
256+
lastRowId = await processSyncTasksBatch('Startup', lastRowId);
257+
// eslint-disable-next-line no-await-in-loop
258+
await A_TICK;
259+
} while (lastRowId != null);
226260
}

0 commit comments

Comments
 (0)