Skip to content

Commit 33b597d

Browse files
authored
fix: properly close iterators (#9705)
Signed-off-by: Alexander Onnikov <[email protected]>
1 parent bdd6d66 commit 33b597d

File tree

7 files changed

+145
-131
lines changed

7 files changed

+145
-131
lines changed

dev/tool/src/clean.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -179,21 +179,25 @@ export async function fixMinioBW (
179179
const from = new Date(new Date().setDate(new Date().getDate() - 7)).getTime()
180180
const list = await storageService.listStream(ctx, wsIds)
181181
let removed = 0
182-
while (true) {
183-
const objs = await list.next()
184-
if (objs.length === 0) {
185-
break
186-
}
187-
for (const obj of objs) {
188-
if (obj.modifiedOn < from) continue
189-
if ((obj._id as string).includes('%preview%')) {
190-
await storageService.remove(ctx, wsIds, [obj._id])
191-
removed++
192-
if (removed % 100 === 0) {
193-
console.log('removed: ', removed)
182+
try {
183+
while (true) {
184+
const objs = await list.next()
185+
if (objs.length === 0) {
186+
break
187+
}
188+
for (const obj of objs) {
189+
if (obj.modifiedOn < from) continue
190+
if ((obj._id as string).includes('%preview%')) {
191+
await storageService.remove(ctx, wsIds, [obj._id])
192+
removed++
193+
if (removed % 100 === 0) {
194+
console.log('removed: ', removed)
195+
}
194196
}
195197
}
196198
}
199+
} finally {
200+
await list.close()
197201
}
198202
console.log('FINISH, removed: ', removed)
199203
}

dev/tool/src/storage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ async function processAdapter (
215215
}
216216
printStats()
217217
} finally {
218-
await iterator.close()
218+
await Promise.all([iterator.close(), targetIterator.close()])
219219
}
220220
}
221221

models/contact/src/migration.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -402,11 +402,6 @@ async function ensureGlobalPersonsForLocalAccounts (client: MigrationClient): Pr
402402
async function createUserProfiles (client: MigrationClient): Promise<void> {
403403
client.logger.log('creating user profiles for persons...', {})
404404

405-
const personsIterator = await client.traverse<Person>(DOMAIN_CONTACT, {
406-
_class: contact.class.Person,
407-
profile: { $exists: false }
408-
})
409-
410405
const lastCard = (
411406
await client.find<Card>(
412407
DOMAIN_CARD,
@@ -416,6 +411,11 @@ async function createUserProfiles (client: MigrationClient): Promise<void> {
416411
)[0]
417412
let prevRank = lastCard?.rank
418413

414+
const personsIterator = await client.traverse<Person>(DOMAIN_CONTACT, {
415+
_class: contact.class.Person,
416+
profile: { $exists: false }
417+
})
418+
419419
try {
420420
while (true) {
421421
const docs = await personsIterator.next(200)

models/server-activity/src/migration.ts

Lines changed: 91 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -119,121 +119,125 @@ async function createDocUpdateMessages (client: MigrationClient): Promise<void>
119119

120120
async function generateFor (_class: Ref<Class<Doc>>, documents: MigrationIterator<Doc>): Promise<void> {
121121
const classNotFound = new Set<string>()
122-
while (true) {
123-
const docs = await documents.next(100)
124122

125-
if (docs == null || docs.length === 0) {
126-
break
127-
}
128-
const allTransactions = await getAllObjectTransactions(
129-
txClient,
130-
_class,
131-
docs.map((it) => it._id)
132-
)
123+
try {
124+
while (true) {
125+
const docs = await documents.next(100)
133126

134-
// We need to find parent collection objects if missing
135-
const byClass = new Map<Ref<Class<Doc>>, Set<Ref<Doc>>>()
136-
for (const vv of allTransactions.values()) {
137-
for (const v of vv) {
138-
try {
139-
const _cl = client.hierarchy.getBaseClass(v.objectClass)
140-
const s = byClass.get(_cl) ?? new Set()
141-
s.add(v.objectId)
142-
byClass.set(_cl, s)
143-
} catch {
144-
const has = classNotFound.has(v.objectClass)
145-
if (!has) {
146-
classNotFound.add(v.objectClass)
147-
console.log('class not found:', v.objectClass)
148-
}
149-
continue
150-
}
127+
if (docs == null || docs.length === 0) {
128+
break
129+
}
130+
const allTransactions = await getAllObjectTransactions(
131+
txClient,
132+
_class,
133+
docs.map((it) => it._id)
134+
)
151135

152-
if (v.attachedToClass !== undefined && v.attachedTo !== undefined) {
136+
// We need to find parent collection objects if missing
137+
const byClass = new Map<Ref<Class<Doc>>, Set<Ref<Doc>>>()
138+
for (const vv of allTransactions.values()) {
139+
for (const v of vv) {
153140
try {
154-
const _cl = client.hierarchy.getBaseClass(v.attachedToClass)
141+
const _cl = client.hierarchy.getBaseClass(v.objectClass)
155142
const s = byClass.get(_cl) ?? new Set()
156-
s.add(v.attachedTo)
143+
s.add(v.objectId)
157144
byClass.set(_cl, s)
158145
} catch {
159-
const objClass = v.attachedToClass
160-
const has = classNotFound.has(objClass)
146+
const has = classNotFound.has(v.objectClass)
161147
if (!has) {
162-
classNotFound.add(objClass)
163-
console.log('class not found:', objClass)
148+
classNotFound.add(v.objectClass)
149+
console.log('class not found:', v.objectClass)
150+
}
151+
continue
152+
}
153+
154+
if (v.attachedToClass !== undefined && v.attachedTo !== undefined) {
155+
try {
156+
const _cl = client.hierarchy.getBaseClass(v.attachedToClass)
157+
const s = byClass.get(_cl) ?? new Set()
158+
s.add(v.attachedTo)
159+
byClass.set(_cl, s)
160+
} catch {
161+
const objClass = v.attachedToClass
162+
const has = classNotFound.has(objClass)
163+
if (!has) {
164+
classNotFound.add(objClass)
165+
console.log('class not found:', objClass)
166+
}
164167
}
165168
}
166169
}
167170
}
168-
}
169171

170-
const docIds: Map<Ref<Doc>, Doc | null> = toIdMap(docs)
172+
const docIds: Map<Ref<Doc>, Doc | null> = toIdMap(docs)
171173

172-
for (const [_class, classDocs] of byClass.entries()) {
173-
const ids: Ref<Doc>[] = Array.from(classDocs.values()).filter((it) => !docIds.has(it))
174-
if (ids.length > 0) {
175-
for (const di of ids) {
176-
docIds.set(di, null)
177-
}
178-
const edocs = await txClient.findAll(txClient.ctx, _class, { _id: { $in: ids } })
179-
for (const ed of edocs) {
180-
docIds.set(ed._id, ed)
174+
for (const [_class, classDocs] of byClass.entries()) {
175+
const ids: Ref<Doc>[] = Array.from(classDocs.values()).filter((it) => !docIds.has(it))
176+
if (ids.length > 0) {
177+
for (const di of ids) {
178+
docIds.set(di, null)
179+
}
180+
const edocs = await txClient.findAll(txClient.ctx, _class, { _id: { $in: ids } })
181+
for (const ed of edocs) {
182+
docIds.set(ed._id, ed)
183+
}
181184
}
182185
}
183-
}
184186

185-
const docCache = {
186-
docs: docIds,
187-
transactions: allTransactions
188-
}
189-
const txIds = new Set<Ref<Tx>>()
190-
for (const d of docs) {
191-
processed += 1
192-
if (processed % 1000 === 0) {
193-
console.log('processed', processed)
187+
const docCache = {
188+
docs: docIds,
189+
transactions: allTransactions
194190
}
195-
const transactions = allTransactions.get(d._id) ?? []
196-
for (const tx of transactions) {
197-
txIds.add(tx._id)
191+
const txIds = new Set<Ref<Tx>>()
192+
for (const d of docs) {
193+
processed += 1
194+
if (processed % 1000 === 0) {
195+
console.log('processed', processed)
196+
}
197+
const transactions = allTransactions.get(d._id) ?? []
198+
for (const tx of transactions) {
199+
txIds.add(tx._id)
200+
}
198201
}
199-
}
200202

201-
const ids = (
202-
await client.find<DocUpdateMessage>(
203-
DOMAIN_ACTIVITY,
204-
{ _class: activity.class.DocUpdateMessage, txId: { $in: Array.from(txIds) as Ref<TxCUD<Doc>>[] } },
205-
{ projection: { _id: 1, txId: 1 } }
206-
)
207-
).map((p) => p.txId as Ref<Tx>)
203+
const ids = (
204+
await client.find<DocUpdateMessage>(
205+
DOMAIN_ACTIVITY,
206+
{ _class: activity.class.DocUpdateMessage, txId: { $in: Array.from(txIds) as Ref<TxCUD<Doc>>[] } },
207+
{ projection: { _id: 1, txId: 1 } }
208+
)
209+
).map((p) => p.txId as Ref<Tx>)
208210

209-
const existsMessages = new Set(ids)
211+
const existsMessages = new Set(ids)
210212

211-
for (const d of docs) {
212-
processed += 1
213-
if (processed % 1000 === 0) {
214-
console.log('processed', processed)
215-
}
216-
const transactions = allTransactions.get(d._id) ?? []
217-
for (const tx of transactions) {
218-
if (!client.hierarchy.hasClass(tx.objectClass)) {
219-
const objClass = tx.objectClass
220-
const has = classNotFound.has(objClass)
221-
if (!has) {
222-
classNotFound.add(objClass)
223-
console.log('class not found:', objClass)
224-
}
225-
continue
213+
for (const d of docs) {
214+
processed += 1
215+
if (processed % 1000 === 0) {
216+
console.log('processed', processed)
226217
}
218+
const transactions = allTransactions.get(d._id) ?? []
219+
for (const tx of transactions) {
220+
if (!client.hierarchy.hasClass(tx.objectClass)) {
221+
const objClass = tx.objectClass
222+
const has = classNotFound.has(objClass)
223+
if (!has) {
224+
classNotFound.add(objClass)
225+
console.log('class not found:', objClass)
226+
}
227+
continue
228+
}
227229

228-
try {
229-
await generateDocUpdateMessageByTx(tx, notificationControl, client, docCache, existsMessages)
230-
} catch (e: any) {
231-
console.error('error processing:', d._id, e.stack)
230+
try {
231+
await generateDocUpdateMessageByTx(tx, notificationControl, client, docCache, existsMessages)
232+
} catch (e: any) {
233+
console.error('error processing:', d._id, e.stack)
234+
}
232235
}
233236
}
234237
}
238+
} finally {
239+
await documents.close()
235240
}
236-
await documents.close()
237241
}
238242

239243
for (const activityClass of activityDocClasses) {

packages/storage/src/index.ts

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -182,25 +182,28 @@ export async function removeAllObjects (
182182
ctx.warn('removing all objects from workspace', wsIds)
183183
// We need to list all files and delete them
184184
const iterator = await storage.listStream(ctx, wsIds)
185-
let bulk: string[] = []
186-
while (true) {
187-
const objs = await iterator.next()
188-
if (objs.length === 0) {
189-
break
190-
}
191-
for (const obj of objs) {
192-
bulk.push(obj._id)
193-
if (bulk.length > 50) {
194-
await storage.remove(ctx, wsIds, bulk)
195-
bulk = []
185+
try {
186+
let bulk: string[] = []
187+
while (true) {
188+
const objs = await iterator.next()
189+
if (objs.length === 0) {
190+
break
191+
}
192+
for (const obj of objs) {
193+
bulk.push(obj._id)
194+
if (bulk.length > 50) {
195+
await storage.remove(ctx, wsIds, bulk)
196+
bulk = []
197+
}
196198
}
197199
}
200+
if (bulk.length > 0) {
201+
await storage.remove(ctx, wsIds, bulk)
202+
bulk = []
203+
}
204+
} finally {
205+
await iterator.close()
198206
}
199-
if (bulk.length > 0) {
200-
await storage.remove(ctx, wsIds, bulk)
201-
bulk = []
202-
}
203-
await iterator.close()
204207
}
205208

206209
export async function objectsToArray (
@@ -210,16 +213,19 @@ export async function objectsToArray (
210213
): Promise<ListBlobResult[]> {
211214
// We need to list all files and delete them
212215
const iterator = await storage.listStream(ctx, wsIds)
213-
const bulk: ListBlobResult[] = []
214-
while (true) {
215-
const obj = await iterator.next()
216-
if (obj.length === 0) {
217-
break
216+
try {
217+
const bulk: ListBlobResult[] = []
218+
while (true) {
219+
const obj = await iterator.next()
220+
if (obj.length === 0) {
221+
break
222+
}
223+
bulk.push(...obj)
218224
}
219-
bulk.push(...obj)
225+
return bulk
226+
} finally {
227+
await iterator.close()
220228
}
221-
await iterator.close()
222-
return bulk
223229
}
224230

225231
export function getDataId (wsIds: WorkspaceIds): WorkspaceDataId {

server/minio/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ export class MinioService implements StorageAdapter {
176176
try {
177177
await removeAllObjects(ctx, this, wsIds)
178178
} catch (err: any) {
179-
ctx.error('failed t oclean all objecrs', { error: err })
179+
ctx.error('failed to clean all objects', { error: err })
180180
}
181181
if (this.opt.rootBucket === undefined) {
182182
// Also delete a bucket

server/s3/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ export class S3Service implements StorageAdapter {
227227
try {
228228
await removeAllObjects(ctx, this, wsIds)
229229
} catch (err: any) {
230-
ctx.error('failed t oclean all objecrs', { error: err })
230+
ctx.error('failed to clean all objects', { error: err })
231231
}
232232
if (this.opt.rootBucket === undefined) {
233233
// We should also delete bucket

0 commit comments

Comments
 (0)