Skip to content

Commit 247bb76

Browse files
update migrateNotificationFlags script to stream (#2972)
1 parent 491fcee commit 247bb76

File tree

1 file changed

+39
-33
lines changed

1 file changed

+39
-33
lines changed

bin/migrateNotificationFlags.ts

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import fastq from 'fastq';
12
import '../src/config';
23
import createOrGetConnection from '../src/db';
34
import {
@@ -8,6 +9,8 @@ import {
89
import { User } from '../src/entity/user/User';
910
import type { UserNotificationFlags } from '../src/entity/user/User';
1011

12+
const QUEUE_CONCURRENCY = 1;
13+
1114
interface UserData {
1215
id: string;
1316
notificationEmail: boolean;
@@ -73,50 +76,53 @@ function buildNotificationFlags(user: UserData): UserNotificationFlags {
7376
const con = await createOrGetConnection();
7477

7578
try {
76-
const userRepo = con.getRepository(User);
77-
78-
const users = await userRepo
79-
.createQueryBuilder('user')
80-
.select([
81-
'user.id',
82-
'user.notificationEmail',
83-
'user.followingEmail',
84-
'user.followNotifications',
85-
'user.awardEmail',
86-
'user.awardNotifications',
87-
'user.notificationFlags',
88-
])
89-
.orderBy('user.id')
90-
.limit(limit)
91-
.offset(offset)
92-
.getMany();
93-
9479
console.log(
95-
`Processing ${users.length} users (offset ${offset} to ${offset + users.length - 1})...`,
80+
`Processing users starting from offset ${offset} (limit ${limit})...`,
9681
);
9782

83+
let processedCount = 0;
84+
9885
await con.transaction(async (manager) => {
99-
for (const user of users) {
100-
const userData: UserData = {
101-
id: user.id,
102-
notificationEmail: user.notificationEmail,
103-
followingEmail: user.followingEmail,
104-
followNotifications: user.followNotifications,
105-
awardEmail: user.awardEmail,
106-
awardNotifications: user.awardNotifications,
107-
notificationFlags: user.notificationFlags,
108-
};
109-
110-
const newFlags = buildNotificationFlags(userData);
86+
const userRepo = manager.getRepository(User);
87+
88+
const builder = userRepo
89+
.createQueryBuilder('user')
90+
.select('user.id', 'id')
91+
.addSelect('user.notificationEmail', 'notificationEmail')
92+
.addSelect('user.followingEmail', 'followingEmail')
93+
.addSelect('user.followNotifications', 'followNotifications')
94+
.addSelect('user.awardEmail', 'awardEmail')
95+
.addSelect('user.awardNotifications', 'awardNotifications')
96+
.addSelect('user.notificationFlags', 'notificationFlags')
97+
.orderBy('user.id')
98+
.limit(limit)
99+
.offset(offset);
100+
101+
const stream = await builder.stream();
102+
103+
const insertQueue = fastq.promise(async (user: UserData) => {
104+
const newFlags = buildNotificationFlags(user);
111105

112106
await manager
113107
.getRepository(User)
114108
.update({ id: user.id }, { notificationFlags: newFlags });
115-
}
109+
110+
processedCount++;
111+
}, QUEUE_CONCURRENCY);
112+
113+
stream.on('data', (user: UserData) => {
114+
insertQueue.push(user);
115+
});
116+
117+
await new Promise((resolve, reject) => {
118+
stream.on('error', reject);
119+
stream.on('end', () => resolve(true));
120+
});
121+
await insertQueue.drained();
116122
});
117123

118124
console.log(
119-
`Migration completed successfully. Updated ${users.length} users (offset ${offset} to ${offset + users.length - 1}).`,
125+
`Migration completed successfully. Updated ${processedCount} users (offset ${offset} to ${offset + processedCount - 1}).`,
120126
);
121127
} catch (error) {
122128
console.error('Migration failed:', error);

0 commit comments

Comments
 (0)