-
Notifications
You must be signed in to change notification settings - Fork 83
chore: add worker thread for processWorkspaceFolders #2441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
31cde71
3df13bc
26831e9
bd6a856
04b9158
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| const { parentPort } = require('worker_threads') | ||
| const fs = require('fs') | ||
|
|
||
| const uniqueFiles = new Set() | ||
| let filesExceedingMaxSize = 0 | ||
| let maxFileSize | ||
| let remainingIndexSize | ||
|
|
||
// Returns the lower-cased extension of `filepath` (without the dot),
// or '' when the path has no extension: empty/missing paths, names with
// no dot, names ending in a dot, and dotfiles like ".gitignore" whose
// only dot is the leading one.
function getFileExtensionName(filepath) {
    if (!filepath) {
        return ''
    }
    const lastDot = filepath.lastIndexOf('.')
    // No dot at all, or a trailing dot → no extension.
    if (lastDot === -1 || lastDot === filepath.length - 1) {
        return ''
    }
    // A dot only at position 0 is a hidden file, not an extension.
    if (lastDot === 0) {
        return ''
    }
    return filepath.slice(lastDot + 1).toLowerCase()
}
|
|
||
// Message protocol:
//   'init'         { maxFileSizeMB, maxIndexSizeMB } -> replies 'ready'
//   'processBatch' { files, fileExtensions }         -> replies 'batchComplete',
//                  or an early 'result' with reachedLimit=true when the index
//                  budget is exhausted mid-batch
//   'complete'                                       -> replies final 'result'
// Any unexpected failure is reported back as an 'error' message so the main
// thread can fall back rather than hang.
parentPort.on('message', message => {
    const { type, data } = message

    try {
        if (type === 'init') {
            // Convert the MB limits to bytes once, up front.
            const { maxFileSizeMB, maxIndexSizeMB } = data
            const MB_TO_BYTES = 1024 * 1024
            maxFileSize = maxFileSizeMB * MB_TO_BYTES
            remainingIndexSize = maxIndexSizeMB * MB_TO_BYTES
            parentPort.postMessage({ type: 'ready' })
        } else if (type === 'processBatch') {
            const { files, fileExtensions } = data
            // Set membership is O(1) vs Array.includes O(n) per file —
            // significant when scanning hundreds of thousands of files.
            const allowedExtensions = new Set(fileExtensions)

            for (const file of files) {
                const fileExtName = '.' + getFileExtensionName(file)
                if (!uniqueFiles.has(file) && allowedExtensions.has(fileExtName)) {
                    try {
                        const fileSize = fs.statSync(file).size

                        if (fileSize < maxFileSize) {
                            if (remainingIndexSize > fileSize) {
                                uniqueFiles.add(file)
                                remainingIndexSize -= fileSize
                            } else {
                                // Index budget exhausted: report what we have and stop.
                                parentPort.postMessage({
                                    type: 'result',
                                    data: {
                                        files: [...uniqueFiles],
                                        filesExceedingMaxSize,
                                        reachedLimit: true,
                                    },
                                })
                                return
                            }
                        } else {
                            filesExceedingMaxSize++
                        }
                    } catch (error) {
                        // Skip files that can't be accessed
                        // (deleted between listing and stat, permission denied, broken symlink).
                    }
                }
            }

            parentPort.postMessage({ type: 'batchComplete' })
        } else if (type === 'complete') {
            parentPort.postMessage({
                type: 'result',
                data: {
                    files: [...uniqueFiles],
                    filesExceedingMaxSize,
                    reachedLimit: false,
                },
            })
        } else {
            parentPort.postMessage({
                type: 'error',
                error: `Unknown message type: ${type}`,
            })
        }
    } catch (error) {
        parentPort.postMessage({
            type: 'error',
            error: error.message,
        })
    }
})
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -404,10 +404,119 @@ export class LocalProjectContextController { | |
| } | ||
|
|
||
| this.log.info(`Processing ${workspaceFolders.length} workspace folders...`) | ||
| const startTime = Date.now() | ||
|
|
||
| maxFileSizeMB = Math.min(maxFileSizeMB ?? Infinity, this.DEFAULT_MAX_FILE_SIZE_MB) | ||
| maxIndexSizeMB = Math.min(maxIndexSizeMB ?? Infinity, this.DEFAULT_MAX_INDEX_SIZE_MB) | ||
|
|
||
| try { | ||
| const { Worker } = await import('worker_threads') | ||
| const workerPath = path.join(__dirname, 'fileProcessingWorker.js') | ||
|
|
||
| if (!fs.existsSync(workerPath)) { | ||
| throw new Error(`Worker file not found: ${workerPath}`) | ||
| } | ||
|
|
||
| this.log.info(`Processing ${workspaceFolders.length} workspace folders in worker thread`) | ||
| const worker = new Worker(workerPath) | ||
|
|
||
| return await new Promise<string[]>((resolve, reject) => { | ||
| const timeout = setTimeout(() => { | ||
| void worker.terminate() | ||
| reject(new Error('Worker timeout after 5 minutes')) | ||
| }, 300_000) | ||
|
|
||
| let batchesInProgress = 0 | ||
|
|
||
| worker.on('message', msg => { | ||
| if (msg.type === 'ready') { | ||
| // Worker initialized, start sending batches | ||
| sendBatches().catch(reject) | ||
| } else if (msg.type === 'batchComplete') { | ||
| batchesInProgress-- | ||
| } else if (msg.type === 'result') { | ||
| clearTimeout(timeout) | ||
| void worker.terminate() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. try and catch for terminate failures/errors |
||
| const { files, filesExceedingMaxSize, reachedLimit } = msg.data | ||
| const duration = Date.now() - startTime | ||
| if (reachedLimit) { | ||
| this.log.info( | ||
| `Reaching max file collection size limit ${maxIndexSizeMB} MB. ${files.length} files found. ${filesExceedingMaxSize} files exceeded ${maxFileSizeMB} MB (took ${duration}ms)` | ||
| ) | ||
| } else { | ||
| this.log.info( | ||
| `ProcessWorkspaceFolders complete. ${files.length} files found. ${filesExceedingMaxSize} files exceeded ${maxFileSizeMB} MB (took ${duration}ms using worker thread)` | ||
| ) | ||
| } | ||
| resolve(files) | ||
| } else if (msg.type === 'error') { | ||
| clearTimeout(timeout) | ||
| void worker.terminate() | ||
| reject(new Error(msg.error)) | ||
| } | ||
| }) | ||
|
|
||
| worker.on('error', err => { | ||
| clearTimeout(timeout) | ||
| void worker.terminate() | ||
| reject(err) | ||
| }) | ||
|
|
||
| async function sendBatches() { | ||
| const BATCH_SIZE = 10000 | ||
|
|
||
| for (const folder of workspaceFolders!) { | ||
| const folderPath = path.resolve(URI.parse(folder.uri).fsPath) | ||
| const filesInFolder = await listFilesWithGitignore(folderPath) | ||
|
|
||
| for (let i = 0; i < filesInFolder.length; i += BATCH_SIZE) { | ||
| const batch = filesInFolder.slice(i, i + BATCH_SIZE) | ||
| batchesInProgress++ | ||
| worker.postMessage({ | ||
| type: 'processBatch', | ||
| data: { files: batch, fileExtensions }, | ||
| }) | ||
|
|
||
| // Wait if too many batches in progress | ||
| while (batchesInProgress > 5) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move to constants: |
||
| await sleep(10) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Wait for all batches to complete | ||
| while (batchesInProgress > 0) { | ||
| await sleep(10) | ||
| } | ||
|
|
||
| worker.postMessage({ type: 'complete' }) | ||
| } | ||
|
|
||
| worker.postMessage({ | ||
| type: 'init', | ||
| data: { maxFileSizeMB, maxIndexSizeMB }, | ||
| }) | ||
| }) | ||
| } catch (error) { | ||
| this.log.warn(`Worker thread failed, falling back to main thread: ${error}`) | ||
| const result = await this.processWorkspaceFoldersFallback( | ||
| workspaceFolders, | ||
| maxFileSizeMB, | ||
| maxIndexSizeMB, | ||
| fileExtensions | ||
| ) | ||
| const duration = Date.now() - startTime | ||
| this.log.info(`Processing completed in ${duration}ms (fallback)`) | ||
| return result | ||
| } | ||
| } | ||
|
|
||
| private async processWorkspaceFoldersFallback( | ||
| workspaceFolders: WorkspaceFolder[], | ||
| maxFileSizeMB: number, | ||
| maxIndexSizeMB: number, | ||
| fileExtensions?: string[] | ||
| ): Promise<string[]> { | ||
| const sizeConstraints: SizeConstraints = { | ||
| maxFileSize: maxFileSizeMB * this.MB_TO_BYTES, | ||
| remainingIndexSize: maxIndexSizeMB * this.MB_TO_BYTES, | ||
|
|
@@ -429,7 +538,7 @@ export class LocalProjectContextController { | |
| sizeConstraints.remainingIndexSize = sizeConstraints.remainingIndexSize - fileSize | ||
| } else { | ||
| this.log.info( | ||
| `Reaching max file collection size limit ${this.maxIndexSizeMB} MB. ${uniqueFilesToIndex.size} files found. ${filesExceedingMaxSize} files exceeded ${maxFileSizeMB} MB ` | ||
| `Reaching max file collection size limit ${maxIndexSizeMB} MB. ${uniqueFilesToIndex.size} files found. ${filesExceedingMaxSize} files exceeded ${maxFileSizeMB} MB ` | ||
| ) | ||
| return [...uniqueFilesToIndex] | ||
| } | ||
|
|
@@ -446,7 +555,7 @@ export class LocalProjectContextController { | |
| } | ||
|
|
||
| this.log.info( | ||
| `ProcessWorkspaceFolders complete. ${uniqueFilesToIndex.size} files found. ${filesExceedingMaxSize} files exceeded ${maxFileSizeMB} MB` | ||
| `ProcessWorkspaceFolders complete. ${uniqueFilesToIndex.size} files found. ${filesExceedingMaxSize} files exceeded ${maxFileSizeMB} MB (fallback)` | ||
| ) | ||
| return [...uniqueFilesToIndex] | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
confused about this...wouldn't this turn off worker threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No — the worker thread file is copied into the servers folder by package.sh; we are not putting it into the browser bundle via webpack. This also resolves a CI failure in the webpack build.