Skip to content

Commit 7036415

Browse files
authored
qfix: A better fulltext service logs (#9606)
Signed-off-by: Andrey Sobolev <[email protected]>
1 parent dd4f987 commit 7036415

File tree

7 files changed

+141
-94
lines changed

7 files changed

+141
-94
lines changed

packages/measurements-otlp/src/telemetry.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
context,
1919
metrics as otelMetrics,
2020
Span,
21+
SpanStatusCode,
2122
trace,
2223
type Context,
2324
type Gauge,
@@ -178,6 +179,10 @@ export class OpenTelemetryMetricsContext implements MeasureContext {
178179
if (span !== undefined) {
179180
void value.catch((err) => {
180181
span?.recordException(err)
182+
span?.setStatus({
183+
code: SpanStatusCode.ERROR,
184+
message: err.message
185+
})
181186
})
182187
}
183188
return value.finally(() => {
@@ -219,6 +224,10 @@ export class OpenTelemetryMetricsContext implements MeasureContext {
219224
} catch (err: any) {
220225
if (span !== undefined) {
221226
span.recordException(err)
227+
span?.setStatus({
228+
code: SpanStatusCode.ERROR,
229+
message: err.message
230+
})
222231
}
223232
throw err
224233
} finally {

pods/fulltext/src/manager.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,35 @@ export class WorkspaceManager {
246246
await this.withIndexer(this.ctx, ws, token, true, async (indexer) => {
247247
await indexer.dropWorkspace()
248248
const toIndex = await indexer.getIndexClassess()
249-
for (const { domain, classes } of toIndex) {
250-
await control.heartbeat()
251-
await indexer.reindex(this.ctx, domain, classes, control)
252-
}
249+
this.ctx.info('reindex starting full', { workspace: ws })
250+
await this.ctx.with(
251+
'reindex-workspace',
252+
{},
253+
async (ctx) => {
254+
for (const { domain, classes } of toIndex) {
255+
try {
256+
await control.heartbeat()
257+
await indexer.reindex(ctx, domain, classes, control)
258+
} catch (err: any) {
259+
ctx.error('failed to reindex domain', { workspace: ws })
260+
throw err
261+
}
262+
}
263+
},
264+
{ workspace: ws }
265+
)
266+
this.ctx.info('reindex full done', { workspace: ws })
253267
})
254268
} else if (mm.type === QueueWorkspaceEvent.Reindex) {
255269
const mmd = mm as QueueWorkspaceReindexMessage
256270
if (!this.restoring.has(ws)) {
257271
await this.withIndexer(this.ctx, ws, token, true, async (indexer) => {
258-
await indexer.reindex(this.ctx, mmd.domain, mmd.classes, control)
272+
try {
273+
await indexer.reindex(this.ctx, mmd.domain, mmd.classes, control)
274+
} catch (err: any) {
275+
this.ctx.error('failed to reindex domain', { workspace: ws })
276+
throw err
277+
}
259278
})
260279
}
261280
}

server/core/src/content.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
//
1515

1616
import { type MeasureContext, type WorkspaceUuid } from '@hcengineering/core'
17-
import { type Readable } from 'stream'
1817
import { type ContentTextAdapterConfiguration } from './configuration'
1918
import { type ContentTextAdapter } from './types'
2019

@@ -24,7 +23,7 @@ class ContentAdapter implements ContentTextAdapter {
2423
private readonly defaultAdapter: ContentTextAdapter
2524
) {}
2625

27-
content (ctx: MeasureContext, workspace: WorkspaceUuid, name: string, type: string, doc: Readable): Promise<string> {
26+
content (ctx: MeasureContext, workspace: WorkspaceUuid, name: string, type: string, doc: Buffer): Promise<string> {
2827
const adapter = this.adapters.get(type) ?? this.defaultAdapter
2928
return adapter.content(ctx, workspace, name, type, doc)
3029
}

server/core/src/types.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515

1616
import { type ServerApi as CommunicationApi } from '@hcengineering/communication-sdk-types'
1717
import {
18-
type AccountRole,
1918
type Account,
19+
type AccountRole,
2020
type AccountUuid,
2121
type Branding,
2222
type Class,
@@ -54,7 +54,6 @@ import type { Asset, Resource } from '@hcengineering/platform'
5454
import type { LiveQuery } from '@hcengineering/query'
5555
import type { RateLimitInfo, ReqId, Request, Response } from '@hcengineering/rpc'
5656
import type { Token } from '@hcengineering/server-token'
57-
import { type Readable } from 'stream'
5857

5958
import type { DbAdapter, DomainHelper } from './adapter'
6059
import { type PlatformQueue, type PlatformQueueProducer, type QueueTopic } from './queue'
@@ -422,7 +421,7 @@ export type FullTextAdapterFactory = (url: string) => Promise<FullTextAdapter>
422421
* @public
423422
*/
424423
export interface ContentTextAdapter {
425-
content: (ctx: MeasureContext, workspace: WorkspaceUuid, name: string, type: string, doc: Readable) => Promise<string>
424+
content: (ctx: MeasureContext, workspace: WorkspaceUuid, name: string, type: string, doc: Buffer) => Promise<string>
426425
}
427426

428427
/**

server/indexer/src/indexer/indexer.ts

Lines changed: 63 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import core, {
2424
type Doc,
2525
docKey,
2626
type Domain,
27+
DOMAIN_COLLABORATOR,
2728
DOMAIN_MODEL,
2829
type FullTextSearchContext,
2930
getFullTextIndexableAttributes,
@@ -247,6 +248,10 @@ export class FullTextIndexPipeline implements FullTextPipeline {
247248
}
248249

249250
const byDomain = groupByArray(allIndexed, (it) => this.hierarchy.getDomain(it))
251+
252+
// Drop domains that are excluded from the returned index list (currently only DOMAIN_COLLABORATOR)
253+
byDomain.delete(DOMAIN_COLLABORATOR)
254+
250255
return Array.from(byDomain.entries())
251256
.sort((a, b) => {
252257
const ap = domainPriorities[a[0]] ?? 0
@@ -265,21 +270,19 @@ export class FullTextIndexPipeline implements FullTextPipeline {
265270
classes: Ref<Class<Doc>>[],
266271
control?: ConsumerControl
267272
): Promise<void> {
268-
ctx.warn('verify document structure', { workspace: this.workspace.uuid })
273+
ctx.warn('reindex verify document structure', { domain, workspace: this.workspace.uuid })
269274

270275
let processed = 0
271276
let processedCommunication = 0
272277
let hasCards = false
273-
await ctx.with('reindex-domain', { domain }, async (ctx) => {
278+
await ctx.with('reindex domain', { domain }, async (ctx) => {
274279
// Iterate over all domain documents and add appropriate entries
275280
const allDocs = this.storage.rawFind(ctx, domain)
276281
try {
277-
let lastPrint = 0
282+
let lastPrint = platformNow()
278283
const pushQueue = new ElasticPushQueue(this.fulltextAdapter, this.workspace, ctx, control)
279284
while (true) {
280-
if (control !== undefined) {
281-
await control?.heartbeat()
282-
}
285+
await control?.heartbeat()
283286
const docs = await allDocs.find(ctx)
284287
if (docs.length === 0) {
285288
break
@@ -307,7 +310,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
307310

308311
const now = platformNow()
309312
if (now - lastPrint > printThresholdMs) {
310-
ctx.info('processed', { processed, elapsed: Math.round(now - lastPrint), domain })
313+
ctx.info('processed', {
314+
processed,
315+
elapsed: Math.round(now - lastPrint),
316+
domain,
317+
workspace: this.workspace.uuid
318+
})
311319
lastPrint = now
312320
}
313321
}
@@ -441,7 +449,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
441449
}
442450
const indexedDoc = createIndexedDoc(doc, this.hierarchy.findAllMixins(doc), doc.space)
443451

444-
await rateLimit.exec(async () => {
452+
await rateLimit.add(async () => {
445453
await ctx.with('process-document', { _class: doc._class }, async (ctx) => {
446454
try {
447455
// Collect all indexable values
@@ -452,7 +460,10 @@ export class FullTextIndexPipeline implements FullTextPipeline {
452460

453461
for (const [, v] of Object.entries(content)) {
454462
if (v.attr.type._class === core.class.TypeBlob) {
455-
await this.processBlob(ctx, v, doc, indexedDoc)
463+
await ctx.with('process-blob', {}, (ctx) => this.processBlob(ctx, v, doc, indexedDoc), {
464+
attr: v.attr.name,
465+
value: v.value
466+
})
456467
continue
457468
}
458469

@@ -545,7 +556,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
545556
let processed = 0
546557
const cardsInfo = new Map<CardID, { space: Ref<Space>, _class: Ref<Class<Doc>> }>()
547558
const rateLimit = new RateLimiter(10)
548-
let lastPrint = 0
559+
let lastPrint = platformNow()
549560
await ctx.with('process-message-groups', {}, async (ctx) => {
550561
let groups = await communicationApi.findMessagesGroups(this.communicationSession, {
551562
limit: messageGroupsLimit,
@@ -590,7 +601,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
590601
if (message.removed) {
591602
continue
592603
}
593-
await rateLimit.exec(async () => {
604+
await rateLimit.add(async () => {
594605
await this.processCommunicationMessage(
595606
ctx,
596607
pushQueue,
@@ -603,7 +614,11 @@ export class FullTextIndexPipeline implements FullTextPipeline {
603614
processed += 1
604615
const now = platformNow()
605616
if (now - lastPrint > printThresholdMs) {
606-
ctx.info('processed', { processedCommunication: processed, elapsed: Math.round(now - lastPrint) })
617+
ctx.info('processed', {
618+
processedCommunication: processed,
619+
elapsed: Math.round(now - lastPrint),
620+
workspace: this.workspace.uuid
621+
})
607622
lastPrint = now
608623
}
609624
}
@@ -652,7 +667,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
652667
if (this.cancelling) {
653668
return processed
654669
}
655-
await rateLimit.exec(async () => {
670+
await rateLimit.add(async () => {
656671
await this.processCommunicationMessage(
657672
ctx,
658673
pushQueue,
@@ -672,7 +687,11 @@ export class FullTextIndexPipeline implements FullTextPipeline {
672687
processed += 1
673688
const now = platformNow()
674689
if (now - lastPrint > printThresholdMs) {
675-
ctx.info('processed', { processedCommunication: processed, elapsed: Math.round(now - lastPrint) })
690+
ctx.info('processed', {
691+
processedCommunication: processed,
692+
elapsed: Math.round(now - lastPrint),
693+
workspace: this.workspace.uuid
694+
})
676695
lastPrint = now
677696
}
678697
}
@@ -992,7 +1011,6 @@ export class FullTextIndexPipeline implements FullTextPipeline {
9921011
}
9931012
}
9941013

995-
@withContext('process-blob')
9961014
private async processBlob (
9971015
ctx: MeasureContext<any>,
9981016
v: { value: any, attr: AnyAttribute },
@@ -1005,6 +1023,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
10051023
if (ref === '' || ref.startsWith('http://') || ref.startsWith('https://')) {
10061024
return
10071025
}
1026+
if (ref.startsWith('{')) {
1027+
return
1028+
}
10081029
if (v.attr.name === 'avatar' || v.attr.attributeOf === contactPlugin.class.Contact) {
10091030
return
10101031
}
@@ -1126,10 +1147,8 @@ export class FullTextIndexPipeline implements FullTextPipeline {
11261147
// We have blob, we need to decode it to string.
11271148
const contentType = (docInfo.contentType ?? defaultContentType).split(';')[0]
11281149

1129-
if (
1130-
(contentType.includes('text/') && contentType !== 'text/rtf') ||
1131-
contentType.includes('application/vnd.github.VERSION.diff')
1132-
) {
1150+
const ct = contentType.toLocaleLowerCase()
1151+
if ((ct.includes('text/') && contentType !== 'text/rtf') || ct.includes('application/vnd.github.version.diff')) {
11331152
await this.handleTextBlob(ctx, docInfo, indexedDoc)
11341153
} else if (isBlobAllowed(contentType)) {
11351154
await this.handleBlob(ctx, docInfo, indexedDoc)
@@ -1140,25 +1159,31 @@ export class FullTextIndexPipeline implements FullTextPipeline {
11401159
private async handleBlob (ctx: MeasureContext<any>, docInfo: Blob | undefined, indexedDoc: IndexedDoc): Promise<void> {
11411160
if (docInfo !== undefined) {
11421161
const contentType = (docInfo.contentType ?? '').split(';')[0]
1143-
const readable = await this.storageAdapter?.get(ctx, this.workspace, docInfo._id)
11441162

1145-
if (readable !== undefined) {
1146-
try {
1147-
let textContent = await ctx.with('fetch', {}, () =>
1148-
this.contentAdapter.content(ctx, this.workspace.uuid, docInfo._id, contentType, readable)
1149-
)
1150-
textContent = textContent
1151-
.split(/ +|\t+|\f+/)
1152-
.filter((it) => it)
1153-
.join(' ')
1154-
.split(/\n\n+/)
1155-
.join('\n')
1156-
1157-
indexedDoc.fulltextSummary += '\n' + textContent
1158-
} finally {
1159-
readable?.destroy()
1160-
}
1163+
if (docInfo.size > 30 * 1024 * 1024) {
1164+
throw new Error('Blob size exceeds limit of 30MB')
11611165
}
1166+
const buffer = Buffer.concat(
1167+
await ctx.with('fetch', {}, (ctx) => this.storageAdapter?.read(ctx, this.workspace, docInfo._id))
1168+
)
1169+
let textContent = await ctx.with(
1170+
'to-text',
1171+
{},
1172+
(ctx) => this.contentAdapter.content(ctx, this.workspace.uuid, docInfo._id, contentType, buffer),
1173+
{
1174+
workspace: this.workspace.uuid,
1175+
blobId: docInfo._id,
1176+
contentType
1177+
}
1178+
)
1179+
textContent = textContent
1180+
.split(/ +|\t+|\f+/)
1181+
.filter((it) => it)
1182+
.join(' ')
1183+
.split(/\n\n+/)
1184+
.join('\n')
1185+
1186+
indexedDoc.fulltextSummary += '\n' + textContent
11621187
}
11631188
}
11641189

@@ -1190,6 +1215,7 @@ function isBlobAllowed (contentType: string): boolean {
11901215
!contentType.includes('binary/octet-stream') &&
11911216
!contentType.includes('application/octet-stream') &&
11921217
!contentType.includes('application/zip') &&
1193-
!contentType.includes('application/x-zip-compressed')
1218+
!contentType.includes('application/x-zip-compressed') &&
1219+
!contentType.includes('application/link-preview')
11941220
)
11951221
}

server/indexer/src/rekoni.ts

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,40 +12,29 @@ export async function createRekoniAdapter (url: string): Promise<ContentTextAdap
1212
workspace: WorkspaceUuid,
1313
name: string,
1414
type: string,
15-
doc
15+
body: Buffer
1616
): Promise<string> => {
1717
const token = generateToken(systemAccountUuid, workspace, { service: 'rekoni' })
1818
try {
1919
// Node doesn't support Readable with fetch.
20-
const chunks: any[] = []
21-
let len = 0
22-
await new Promise<void>((resolve, reject) => {
23-
doc.on('data', (chunk) => {
24-
len += (chunk as Buffer).length
25-
chunks.push(chunk)
26-
if (len > 30 * 1024 * 1024) {
27-
reject(new Error('file to big for content processing'))
28-
}
29-
})
30-
doc.on('end', () => {
31-
resolve()
32-
})
33-
doc.on('error', (err) => {
34-
reject(err)
35-
})
36-
})
3720

38-
const body: Buffer = Buffer.concat(chunks)
39-
const r = await (
40-
await fetch(`${url}/toText?name=${encodeURIComponent(name)}&type=${encodeURIComponent(type)}`, {
41-
method: 'POST',
42-
body,
43-
headers: {
44-
Authorization: 'Bearer ' + token,
45-
'Content-type': 'application/octet-stream'
46-
}
47-
} as any)
48-
).json()
21+
const r = await ctx.with(
22+
'rekoni',
23+
{},
24+
async (ctx) =>
25+
await (
26+
await fetch(`${url}/toText?name=${encodeURIComponent(name)}&type=${encodeURIComponent(type)}`, {
27+
method: 'POST',
28+
body,
29+
keepalive: true,
30+
headers: {
31+
Authorization: 'Bearer ' + token,
32+
'Content-type': 'application/octet-stream'
33+
}
34+
} as any)
35+
).json(),
36+
{ url: `${url}/toText`, name, type }
37+
)
4938
if (r.error !== undefined) {
5039
throw new Error(JSON.stringify(r.error))
5140
}

0 commit comments

Comments
 (0)