Skip to content

Commit 533dfab

Browse files
committed
Refactor watch active observer into dedicated service
1 parent 45773aa commit 533dfab

File tree

3 files changed

+46
-33
lines changed

3 files changed

+46
-33
lines changed

packages/attachments/src/AttachmentContext.ts

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,36 +12,6 @@ export class AttachmentContext {
1212
this.logger = logger;
1313
}
1414

15-
watchActiveAttachments(onUpdate: () => void): AbortController {
16-
const abortController = new AbortController();
17-
this.db.watchWithCallback(
18-
/* sql */
19-
`
20-
SELECT
21-
*
22-
FROM
23-
${this.tableName}
24-
WHERE
25-
state = ?
26-
OR state = ?
27-
OR state = ?
28-
ORDER BY
29-
timestamp ASC
30-
`,
31-
[AttachmentState.QUEUED_UPLOAD, AttachmentState.QUEUED_DOWNLOAD, AttachmentState.QUEUED_DELETE],
32-
{
33-
onResult: () => {
34-
onUpdate();
35-
}
36-
},
37-
{
38-
signal: abortController.signal
39-
}
40-
);
41-
42-
return abortController;
43-
}
44-
4515
async getActiveAttachments(): Promise<any[]> {
4616
const attachments = await this.db.getAll(
4717
/* sql */

packages/attachments/src/AttachmentQueue.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { RemoteStorageAdapter } from './RemoteStorageAdapter.js';
55
import { ATTACHMENT_TABLE, AttachmentRecord, AttachmentState } from './Schema.js';
66
import { StorageService } from './StorageService.js';
77
import { WatchedAttachmentItem } from './WatchedAttachmentItem.js';
8+
import { AttachmentService } from './AttachmentService.js';
89

910
export class AttachmentQueue {
1011
periodicSyncTimer?: ReturnType<typeof setInterval>;
@@ -20,6 +21,7 @@ export class AttachmentQueue {
2021
downloadAttachments: boolean = true;
2122
watchActiveAbortController?: AbortController;
2223
archivedCacheLimit: number;
24+
attachmentService: AttachmentService;
2325

2426
constructor({
2527
db,
@@ -51,6 +53,7 @@ export class AttachmentQueue {
5153
this.tableName = tableName;
5254
this.storageService = new StorageService(this.context, localStorage, remoteStorage, logger ?? db.logger);
5355
this.syncInterval = syncInterval;
56+
this.attachmentService = new AttachmentService(tableName, db);
5457
this.syncThrottleDuration = syncThrottleDuration;
5558
this.downloadAttachments = downloadAttachments;
5659
this.archivedCacheLimit = archivedCacheLimit;
@@ -69,8 +72,10 @@ export class AttachmentQueue {
6972
}, this.syncInterval);
7073

7174
// Sync storage when there is a change in active attachments
72-
this.watchActiveAbortController = this.context.watchActiveAttachments(async () => {
73-
await this.syncStorage();
75+
this.attachmentService.watchActiveAttachments().registerListener({
76+
onDiff: async () => {
77+
await this.syncStorage();
78+
}
7479
});
7580

7681
// Process attachments when there is a change in watched attachments
@@ -166,7 +171,7 @@ export class AttachmentQueue {
166171
async stopSync(): Promise<void> {
167172
clearInterval(this.periodicSyncTimer);
168173
this.periodicSyncTimer = undefined;
169-
this.watchActiveAbortController?.abort();
174+
await this.attachmentService.watchActiveAttachments().close();
170175
}
171176

172177
async saveFile({
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { AbstractPowerSyncDatabase, DifferentialWatchedQuery } from '@powersync/common';
2+
import { AttachmentRecord, AttachmentState } from './Schema.js';
3+
4+
/**
5+
* Service for querying and watching attachment records in the database.
6+
*/
7+
export class AttachmentService {
8+
constructor(
9+
private tableName: string = 'attachments',
10+
private db: AbstractPowerSyncDatabase
11+
) {}
12+
13+
/**
14+
* Creates a differential watch query for active attachments requiring synchronization.
15+
* @returns Watch query that emits changes for queued uploads, downloads, and deletes
16+
*/
17+
watchActiveAttachments(): DifferentialWatchedQuery<AttachmentRecord> {
18+
const watch = this.db
19+
.query<AttachmentRecord>({
20+
sql: /* sql */ `
21+
SELECT
22+
*
23+
FROM
24+
${this.tableName}
25+
WHERE
26+
state = ?
27+
OR state = ?
28+
OR state = ?
29+
ORDER BY
30+
timestamp ASC
31+
`,
32+
parameters: [AttachmentState.QUEUED_UPLOAD, AttachmentState.QUEUED_DOWNLOAD, AttachmentState.QUEUED_DELETE]
33+
})
34+
.differentialWatch();
35+
36+
return watch;
37+
}
38+
}

0 commit comments

Comments
 (0)