Skip to content

Commit b0e9e8d

Browse files
feat: sync digest to notification flag (#2990)
1 parent bdd699b commit b0e9e8d

File tree

1 file changed

+85
-0
lines changed

1 file changed

+85
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import fastq from 'fastq';
2+
import '../src/config';
3+
import createOrGetConnection from '../src/db';
4+
5+
const QUEUE_CONCURRENCY = 1;
6+
7+
(async (): Promise<void> => {
8+
const limitArgument = process.argv[2];
9+
const offsetArgument = process.argv[3];
10+
11+
if (!limitArgument || !offsetArgument) {
12+
throw new Error('limit and offset arguments are required');
13+
}
14+
15+
const limit = +limitArgument;
16+
if (Number.isNaN(limit)) {
17+
throw new Error('limit argument is invalid, it should be a number');
18+
}
19+
20+
const offset = +offsetArgument;
21+
if (Number.isNaN(offset)) {
22+
throw new Error('offset argument is invalid, it should be a number');
23+
}
24+
25+
const con = await createOrGetConnection();
26+
27+
try {
28+
console.log(
29+
`Processing users starting from offset ${offset} (limit ${limit})...`,
30+
);
31+
32+
let processedCount = 0;
33+
34+
await con.transaction(async (manager) => {
35+
const digestRepo = manager.getRepository('UserPersonalizedDigest');
36+
37+
const builder = digestRepo
38+
.createQueryBuilder('upd')
39+
.select('upd.userId', 'userId')
40+
.where('upd.type = :briefType')
41+
.andWhere('upd.flags ->> :emailFlag = :emailValue')
42+
.setParameters({
43+
briefType: 'brief',
44+
emailFlag: 'email',
45+
emailValue: 'true',
46+
})
47+
.orderBy('upd.lastSendDate', 'DESC', 'NULLS LAST')
48+
.limit(limit)
49+
.offset(offset);
50+
51+
const stream = await builder.stream();
52+
53+
const updateQueue = fastq.promise(async (userId: string) => {
54+
await manager.query(
55+
`UPDATE public.user
56+
SET "notificationFlags" = jsonb_set("notificationFlags", '{briefing_ready,email}', '"subscribed"')
57+
WHERE id = $1
58+
AND "notificationFlags"->'briefing_ready'->>'email' = 'muted'`,
59+
[userId],
60+
);
61+
62+
processedCount++;
63+
}, QUEUE_CONCURRENCY);
64+
65+
stream.on('data', (digest: { userId: string }) => {
66+
updateQueue.push(digest.userId);
67+
});
68+
69+
await new Promise((resolve, reject) => {
70+
stream.on('error', reject);
71+
stream.on('end', () => resolve(true));
72+
});
73+
await updateQueue.drained();
74+
});
75+
76+
console.log(
77+
`Update was successful. Updated ${processedCount} users (offset ${offset} to ${offset + processedCount - 1}).`,
78+
);
79+
} catch (error) {
80+
console.error('Update failed:', error);
81+
throw error;
82+
}
83+
84+
process.exit(0);
85+
})();

0 commit comments

Comments
 (0)