diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/artifactManager.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/artifactManager.ts index ae836876d9..f364429119 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/artifactManager.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/artifactManager.ts @@ -47,8 +47,13 @@ const IGNORE_PATTERNS = [ '**/target/**', // Maven/Gradle builds ] -const MAX_UNCOMPRESSED_SRC_SIZE_MB = 250 // 250 MB limit per language per workspace folder -const MAX_UNCOMPRESSED_SRC_SIZE_BYTES = MAX_UNCOMPRESSED_SRC_SIZE_MB * 1024 * 1024 // Convert to bytes +interface FileSizeDetails { + includedFileCount: number + includedSize: number + skippedFileCount: number + skippedSize: number +} +const MAX_UNCOMPRESSED_SRC_SIZE_BYTES = 2 * 1024 * 1024 * 1024 // 2 GB export class ArtifactManager { private workspace: Workspace @@ -81,6 +86,12 @@ export class ArtifactManager { async addNewDirectories(newDirectories: URI[]): Promise { let zipFileMetadata: FileMetadata[] = [] + const fileSizeDetails: FileSizeDetails = { + includedFileCount: 0, + includedSize: 0, + skippedFileCount: 0, + skippedSize: 0, + } for (const directory of newDirectories) { const workspaceFolder = this.workspaceFolders.find(ws => directory.path.startsWith(URI.parse(ws.uri).path)) @@ -95,7 +106,12 @@ export class ArtifactManager { const relativePath = path.relative(workspacePath, directory.path) const filesByLanguage = await this.processDirectory(workspaceFolder, directory.path, relativePath) - zipFileMetadata = await this.processFilesByLanguage(workspaceFolder, filesByLanguage, relativePath) + zipFileMetadata = await this.processFilesByLanguage( + workspaceFolder, + fileSizeDetails, + filesByLanguage, + relativePath + ) } catch (error) { this.logging.warn(`Error processing new directory ${directory.path}: ${error}`) } @@ -286,9 +302,12 @@ export class ArtifactManager { return filesMetadata } - cleanup(preserveDependencies: boolean = false) { + cleanup(preserveDependencies: boolean = false, workspaceFolders?: WorkspaceFolder[]) { try { - this.workspaceFolders.forEach(workspaceToRemove => { + if (workspaceFolders === undefined) { + workspaceFolders = this.workspaceFolders + } + workspaceFolders.forEach(workspaceToRemove => { const workspaceDirPath = path.join(this.tempDirPath, workspaceToRemove.name) if (preserveDependencies) { @@ -430,16 +449,16 @@ export class ArtifactManager { private async createZipForLanguage( workspaceFolder: WorkspaceFolder, + fileSizeDetails: FileSizeDetails, language: CodewhispererLanguage, files: FileMetadata[], subDirectory: string = '' - ): Promise { + ): Promise { const zipDirectoryPath = path.join(this.tempDirPath, workspaceFolder.name, subDirectory) this.createFolderIfNotExist(zipDirectoryPath) const zipPath = path.join(zipDirectoryPath, `${language}.zip`) - let currentSize = 0 let skippedSize = 0 let skippedFiles = 0 const filesToInclude: FileMetadata[] = [] @@ -447,26 +466,33 @@ export class ArtifactManager { // Don't add files to the zip if the total size of uncompressed source code would go over the limit // Currently there is no ordering on the files. If the first file added to the zip is equal to the limit, only it will be added and no other files will be added for (const file of files) { - if (currentSize + file.contentLength <= MAX_UNCOMPRESSED_SRC_SIZE_BYTES) { + if (fileSizeDetails.includedSize + file.contentLength <= MAX_UNCOMPRESSED_SRC_SIZE_BYTES) { filesToInclude.push(file) - currentSize += file.contentLength + fileSizeDetails.includedSize += file.contentLength + fileSizeDetails.includedFileCount += 1 } else { skippedSize += file.contentLength skippedFiles += 1 + fileSizeDetails.skippedSize += file.contentLength + fileSizeDetails.skippedFileCount += 1 } } - const zipBuffer = await this.createZipBuffer(filesToInclude) - await fs.promises.writeFile(zipPath, zipBuffer) - - const stats = fs.statSync(zipPath) - if (skippedFiles > 0) { this.log( `Skipped ${skippedFiles} ${language} files of total size ${skippedSize} bytes due to exceeding the maximum zip size` ) } + if (filesToInclude.length === 0) { + return undefined + } + + const zipBuffer = await this.createZipBuffer(filesToInclude) + await fs.promises.writeFile(zipPath, zipBuffer) + + const stats = fs.statSync(zipPath) + return { filePath: zipPath, relativePath: path.join(workspaceFolder.name, subDirectory, `files.zip`), @@ -569,18 +595,35 @@ export class ArtifactManager { private async processWorkspaceFolders(workspaceFolders: WorkspaceFolder[]): Promise { const startTime = performance.now() let zipFileMetadata: FileMetadata[] = [] + const fileSizeDetails: FileSizeDetails = { + includedFileCount: 0, + includedSize: 0, + skippedFileCount: 0, + skippedSize: 0, + } for (const workspaceFolder of workspaceFolders) { const workspacePath = URI.parse(workspaceFolder.uri).path try { const filesByLanguage = await this.processDirectory(workspaceFolder, workspacePath) - const fileMetadata = await this.processFilesByLanguage(workspaceFolder, filesByLanguage) + const fileMetadata = await this.processFilesByLanguage( + workspaceFolder, + fileSizeDetails, + filesByLanguage + ) zipFileMetadata.push(...fileMetadata) } catch (error) { this.logging.warn(`Error processing workspace folder ${workspacePath}: ${error}`) } } + if (fileSizeDetails.skippedFileCount > 0) { + this.logging.warn( + `Skipped ${fileSizeDetails.skippedFileCount} files (total size: ` + + `${fileSizeDetails.skippedSize} bytes) due to exceeding the maximum artifact size` + ) + } + const totalTime = performance.now() - startTime this.log(`Creating workspace source code artifacts took: ${totalTime.toFixed(2)}ms`) @@ -589,22 +632,31 @@ export class ArtifactManager { private async processFilesByLanguage( workspaceFolder: WorkspaceFolder, + fileSizeDetails: FileSizeDetails, filesByLanguage: Map, relativePath?: string ): Promise { const zipFileMetadata: FileMetadata[] = [] await this.updateWorkspaceFiles(workspaceFolder, filesByLanguage) - for (const [language, files] of filesByLanguage.entries()) { - // Genrate java .classpath and .project files + // Generate java .classpath and .project files const processedFiles = language === 'java' ? await this.processJavaProjectConfig(workspaceFolder, files) : files - const zipMetadata = await this.createZipForLanguage(workspaceFolder, language, processedFiles, relativePath) - this.log( - `Created zip for language ${language} out of ${processedFiles.length} files in ${workspaceFolder.name}` + const zipMetadata = await this.createZipForLanguage( + workspaceFolder, + fileSizeDetails, + language, + processedFiles, + relativePath ) - zipFileMetadata.push(zipMetadata) + + if (zipMetadata) { + this.log( + `Created zip for language ${language} out of ${processedFiles.length} files in ${workspaceFolder.name}` + ) + zipFileMetadata.push(zipMetadata) + } } return zipFileMetadata } diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/client.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/client.ts index a4b7366c90..20c0c74467 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/client.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/client.ts @@ -11,7 +11,6 @@ export class WebSocketClient { private reconnectAttempts: number = 0 private readonly maxReconnectAttempts: number = 5 private messageQueue: string[] = [] - private readonly messageThrottleDelay: number = 100 constructor(url: string, logging: Logging, credentialsProvider: CredentialsProvider) { this.url = url @@ -104,7 +103,11 @@ export class WebSocketClient { while (this.messageQueue.length > 0) { const message = this.messageQueue.shift() if (message) { - this.send(message).catch(error => this.logging.error(`Error sending message: ${error}`)) + try { + this.send(message) + } catch (error) { + this.logging.error(`Error sending message: ${error}`) + } } } } @@ -141,23 +144,10 @@ export class WebSocketClient { } } - // https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_client_applications#sending_data_to_the_server - // TODO, the approach of delaying websocket messages should be investigated and validated - // The current approach could be susceptible to race conditions that might result in out of order events - // Consider this scenario - // wsClient.send("message1"); // needs throttling, will wait 100ms - // wsClient.send("message2"); // runs immediately without waiting - - // What actually happens: - // - Both calls start executing simultaneously - // - Both check timeSinceLastMessage at nearly the same time - // - Both might determine they need to wait - // - They could end up sending in unpredictable order - // It might be better to keep an active queue in the client and expose enqueueMessage instead of send - public async send(message: string): Promise { + public send(message: string): void { if (this.ws?.readyState === WebSocket.OPEN) { - await new Promise(resolve => setTimeout(resolve, this.messageThrottleDelay)) this.ws.send(message) + this.logging.debug('Message sent successfully to the remote workspace') } else { this.queueMessage(message) } diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyDiscoverer.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyDiscoverer.ts index 8d891fc527..f0d41e9801 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyDiscoverer.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyDiscoverer.ts @@ -12,6 +12,8 @@ export class DependencyDiscoverer { private workspaceFolders: WorkspaceFolder[] public dependencyHandlerRegistry: LanguageDependencyHandler[] = [] private initializedWorkspaceFolder = new Map() + // Create a SharedArrayBuffer with 4 bytes (for a 32-bit unsigned integer) for thread-safe counter + protected dependencyUploadedSizeSum = new Uint32Array(new SharedArrayBuffer(4)) constructor( workspace: Workspace, @@ -21,6 +23,7 @@ export class DependencyDiscoverer { ) { this.workspaceFolders = workspaceFolders this.logging = logging + this.dependencyUploadedSizeSum[0] = 0 let jstsHandlerCreated = false supportedWorkspaceContextLanguages.forEach(language => { @@ -29,7 +32,8 @@ export class DependencyDiscoverer { workspace, logging, workspaceFolders, - artifactManager + artifactManager, + this.dependencyUploadedSizeSum ) if (handler) { // Share handler for javascript and typescript @@ -130,6 +134,13 @@ export class DependencyDiscoverer { this.logging.log(`Dependency search completed successfully`) } + async reSyncDependenciesToS3(folders: WorkspaceFolder[]) { + Atomics.store(this.dependencyUploadedSizeSum, 0, 0) + for (const dependencyHandler of this.dependencyHandlerRegistry) { + await dependencyHandler.zipDependencyMap(folders) + } + } + async handleDependencyUpdateFromLSP(language: string, paths: string[], workspaceRoot?: WorkspaceFolder) { for (const dependencyHandler of this.dependencyHandlerRegistry) { if (dependencyHandler.language != language) { @@ -144,6 +155,7 @@ export class DependencyDiscoverer { this.dependencyHandlerRegistry.forEach(dependencyHandler => { dependencyHandler.dispose() }) + Atomics.store(this.dependencyUploadedSizeSum, 0, 0) } public disposeWorkspaceFolder(workspaceFolder: WorkspaceFolder) { diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandler.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandler.ts index f7b73927b7..44a5c093bb 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandler.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandler.ts @@ -26,13 +26,14 @@ export abstract class LanguageDependencyHandler { protected workspaceFolders: WorkspaceFolder[] // key: workspaceFolder, value: {key: dependency name, value: Dependency} protected dependencyMap = new Map>() - protected dependencyUploadedSize = new Map() + protected dependencyUploadedSizeMap = new Map() + protected dependencyUploadedSizeSum: Uint32Array protected dependencyWatchers: Map = new Map() protected artifactManager: ArtifactManager protected dependenciesFolderName: string protected eventEmitter: EventEmitter protected readonly MAX_SINGLE_DEPENDENCY_SIZE: number = 500 * 1024 * 1024 // 500 MB - protected readonly MAX_WORKSPACE_DEPENDENCY_SIZE: number = 5 * 1024 * 1024 * 1024 //5 GB + protected readonly MAX_WORKSPACE_DEPENDENCY_SIZE: number = 8 * 1024 * 1024 * 1024 // 8 GB constructor( language: CodewhispererLanguage, @@ -40,7 +41,8 @@ export abstract class LanguageDependencyHandler { logging: Logging, workspaceFolders: WorkspaceFolder[], artifactManager: ArtifactManager, - dependenciesFolderName: string + dependenciesFolderName: string, + dependencyUploadedSizeSum: Uint32Array ) { this.language = language this.workspace = workspace @@ -54,7 +56,7 @@ export abstract class LanguageDependencyHandler { this.workspaceFolders.forEach(workSpaceFolder => this.dependencyMap.set(workSpaceFolder, new Map()) ) - + this.dependencyUploadedSizeSum = dependencyUploadedSizeSum this.eventEmitter = new EventEmitter() } @@ -85,14 +87,18 @@ export abstract class LanguageDependencyHandler { dependencyMap: Map ): void - public onDependencyChange(callback: (workspaceFolder: WorkspaceFolder, zips: FileMetadata[]) => void): void { + public onDependencyChange( + callback: (workspaceFolder: WorkspaceFolder, zips: FileMetadata[], addWSFolderPathInS3: boolean) => void + ): void { this.eventEmitter.on('dependencyChange', callback) } protected emitDependencyChange(workspaceFolder: WorkspaceFolder, zips: FileMetadata[]): void { if (zips.length > 0) { this.logging.log(`Emitting ${this.language} dependency change event for ${workspaceFolder.name}`) - this.eventEmitter.emit('dependencyChange', workspaceFolder, zips) + // If language is JavaScript or TypeScript, we want to preserve the workspaceFolder path in S3 path + const addWSFolderPathInS3 = this.language === 'javascript' || this.language === 'typescript' + this.eventEmitter.emit('dependencyChange', workspaceFolder, zips, addWSFolderPathInS3) return } } @@ -170,10 +176,11 @@ export abstract class LanguageDependencyHandler { } currentChunk.push(dependency) currentChunkSize += dependency.size - this.dependencyUploadedSize.set( + this.dependencyUploadedSizeMap.set( workspaceFolder, - (this.dependencyUploadedSize.get(workspaceFolder) || 0) + dependency.size + (this.dependencyUploadedSizeMap.get(workspaceFolder) || 0) + dependency.size ) + Atomics.add(this.dependencyUploadedSizeSum, 0, dependency.size) // Mark this dependency that has been zipped dependency.zipped = true this.dependencyMap.get(workspaceFolder)?.set(dependency.name, dependency) @@ -299,7 +306,7 @@ export abstract class LanguageDependencyHandler { * However, everytime flare server restarts, this dependency map will be initialized. */ private validateWorkspaceDependencySize(workspaceFolder: WorkspaceFolder): boolean { - let uploadedSize = this.dependencyUploadedSize.get(workspaceFolder) + let uploadedSize = Atomics.load(this.dependencyUploadedSizeSum, 0) if (uploadedSize && this.MAX_WORKSPACE_DEPENDENCY_SIZE < uploadedSize) { return false } @@ -308,14 +315,15 @@ export abstract class LanguageDependencyHandler { dispose(): void { this.dependencyMap.clear() - this.dependencyUploadedSize.clear() + this.dependencyUploadedSizeMap.clear() this.dependencyWatchers.forEach(watcher => watcher.close()) this.dependencyWatchers.clear() } disposeWorkspaceFolder(workspaceFolder: WorkspaceFolder): void { this.dependencyMap.delete(workspaceFolder) - this.dependencyUploadedSize.delete(workspaceFolder) + Atomics.sub(this.dependencyUploadedSizeSum, 0, this.dependencyUploadedSizeMap.get(workspaceFolder) || 0) + this.dependencyUploadedSizeMap.delete(workspaceFolder) this.disposeWatchers(workspaceFolder) this.disposeDependencyInfo(workspaceFolder) } diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandlerFactory.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandlerFactory.ts index 8053283088..d5cadbe520 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandlerFactory.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/dependency/dependencyHandler/LanguageDependencyHandlerFactory.ts @@ -12,7 +12,8 @@ export class DependencyHandlerFactory { workspace: Workspace, logging: Logging, workspaceFolders: WorkspaceFolder[], - artifactManager: ArtifactManager + artifactManager: ArtifactManager, + dependencyUploadedSizeSum: Uint32Array ): LanguageDependencyHandler | null { switch (language.toLowerCase()) { case 'python': @@ -22,7 +23,8 @@ export class DependencyHandlerFactory { logging, workspaceFolders, artifactManager, - 'site-packages' + 'site-packages', + dependencyUploadedSizeSum ) case 'javascript': case 'typescript': @@ -32,7 +34,8 @@ export class DependencyHandlerFactory { logging, workspaceFolders, artifactManager, - 'node_modules' + 'node_modules', + dependencyUploadedSizeSum ) case 'java': return new JavaDependencyHandler( @@ -41,7 +44,8 @@ export class DependencyHandlerFactory { logging, workspaceFolders, artifactManager, - 'dependencies' + 'dependencies', + dependencyUploadedSizeSum ) default: return null diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/javaManager.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/javaManager.ts index d37ad5e976..08956a6ef7 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/javaManager.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/javaManager.ts @@ -861,11 +861,10 @@ export class EclipseConfigGenerator { private async initializeProjectFiles(): Promise { try { const eclipseFiles = ['.project', '.classpath'] - for (const fileName of eclipseFiles) { const pattern = path.join(this.workspacePath, '**', fileName) const files = await glob(pattern, { - ignore: IGNORE_PATTERNS, + ignore: IGNORE_PATTERNS.filter(p => p !== '**/.*'), onlyFiles: true, followSymbolicLinks: false, dot: true, diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceContextServer.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceContextServer.ts index 9e523afc0c..cfc68f0348 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceContextServer.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceContextServer.ts @@ -5,7 +5,7 @@ import { Server, WorkspaceFolder, } from '@aws/language-server-runtimes/server-interface' -import { cleanUrl, getRelativePath, isDirectory, isEmptyDirectory, isLoggedInUsingBearerToken } from './util' +import { cleanUrl, isDirectory, isEmptyDirectory, isLoggedInUsingBearerToken } from './util' import { ArtifactManager, FileMetadata, SUPPORTED_WORKSPACE_CONTEXT_LANGUAGES } from './artifactManager' import { WorkspaceFolderManager } from './workspaceFolderManager' import { URI } from 'vscode-uri' @@ -20,6 +20,7 @@ const Q_CONTEXT_CONFIGURATION_SECTION = 'aws.q.workspaceContext' export const WorkspaceContextServer = (): Server => features => { const { credentialsProvider, workspace, logging, lsp, runtime, sdkInitializator } = features + let workspaceIdentifier: string = '' let workspaceFolders: WorkspaceFolder[] = [] let artifactManager: ArtifactManager let dependencyDiscoverer: DependencyDiscoverer @@ -31,6 +32,11 @@ export const WorkspaceContextServer = (): Server => features => { let amazonQServiceManager: AmazonQTokenServiceManager lsp.addInitializer((params: InitializeParams) => { + workspaceIdentifier = params.initializationOptions?.aws?.contextConfiguration?.workspaceIdentifier || '' + if (!workspaceIdentifier) { + logging.warn(`No workspaceIdentifier set!`) + } + const folders = workspace.getAllWorkspaceFolders() workspaceFolders = folders || params.workspaceFolders || [] @@ -76,22 +82,21 @@ export const WorkspaceContextServer = (): Server => features => { lsp.extensions.onGetConfigurationFromServer( async (params: GetConfigurationFromServerParams, token: CancellationToken) => { if (params.section === Q_CONTEXT_CONFIGURATION_SECTION) { - const workspaceMap = workspaceFolderManager.getWorkspaces() - - // Filter workspaces to only include those with websocket connected. - // To reduce the error of GenerateCompletions getting repoMap from server-side workspace context, - // with websocket connected, at least workspace/didChangeWorkspaceFolders websocket request has been sent. - // When the workspace is reopened and server-side was prepared, compared to filter with READY state of workspace, - // this filter would add delay to wait for websocket connection being established - const workspaceArray = Array.from(workspaceMap) - .filter(([_, workspaceState]) => workspaceState.webSocketClient?.isConnected()) - .map(([workspaceRoot, workspaceState]) => ({ - workspaceRoot, - workspaceId: workspaceState.workspaceId ?? '', - })) + // Only append workspaceId to GenerateCompletions when WebSocket client is connected + if ( + !workspaceFolderManager.getWorkspaceState().webSocketClient?.isConnected || + !workspaceFolderManager.getWorkspaceState().workspaceId + ) { + return { + workspaces: [], + } + } return { - workspaces: workspaceArray, + workspaces: workspaceFolders.map(workspaceFolder => ({ + workspaceRoot: workspaceFolder.uri, + workspaceId: workspaceFolderManager.getWorkspaceState().workspaceId, + })), } } return { @@ -154,7 +159,8 @@ export const WorkspaceContextServer = (): Server => features => { isOptedIn && isLoggedInUsingBearerToken(credentialsProvider) && abTestingEnabled && - !workspaceFolderManager.getOptOutStatus() + !workspaceFolderManager.getOptOutStatus() && + workspaceIdentifier ) } @@ -169,7 +175,8 @@ export const WorkspaceContextServer = (): Server => features => { artifactManager, dependencyDiscoverer, workspaceFolders, - credentialsProvider + credentialsProvider, + workspaceIdentifier ) await updateConfiguration() @@ -232,6 +239,9 @@ export const WorkspaceContextServer = (): Server => features => { return } + workspaceFolderManager.initializeWorkspaceStatusMonitor().catch(error => { + logging.error(`Error while initializing workspace status monitoring: ${error}`) + }) logging.log(`Workspace context workflow initialized`) artifactManager.updateWorkspaceFolders(workspaceFolders) workspaceFolderManager.processNewWorkspaceFolders(workspaceFolders).catch(error => { @@ -262,14 +272,14 @@ export const WorkspaceContextServer = (): Server => features => { logging.log(`Received didSave event for ${event.textDocument.uri}`) - const result = workspaceFolderManager.getWorkspaceDetailsWithId(event.textDocument.uri, workspaceFolders) - if (!result) { - logging.log(`No workspace found for ${event.textDocument.uri} discarding the save event`) + const workspaceFolder = workspaceFolderManager.getWorkspaceFolder(event.textDocument.uri, workspaceFolders) + if (!workspaceFolder) { + logging.log(`No workspaceFolder found for ${event.textDocument.uri} discarding the save event`) return } - const { workspaceDetails, workspaceRoot } = result + const workspaceId = await workspaceFolderManager.waitForRemoteWorkspaceId() - const fileMetadata = await artifactManager.processNewFile(workspaceRoot, event.textDocument.uri) + const fileMetadata = await artifactManager.processNewFile(workspaceFolder, event.textDocument.uri) const s3Url = await workspaceFolderManager.uploadToS3(fileMetadata) if (!s3Url) { return @@ -278,22 +288,18 @@ export const WorkspaceContextServer = (): Server => features => { const message = JSON.stringify({ method: 'textDocument/didSave', params: { - textDocument: { uri: getRelativePath(fileMetadata.workspaceFolder, event.textDocument.uri) }, + textDocument: { + uri: event.textDocument.uri, + }, workspaceChangeMetadata: { - workspaceId: workspaceDetails.workspaceId, + workspaceId: workspaceId, s3Path: cleanUrl(s3Url), programmingLanguage: programmingLanguage, }, }, }) - if (!workspaceDetails.webSocketClient) { - logging.log(`WebSocket client is not connected yet: ${workspaceRoot.uri}, adding didSave message to queue`) - workspaceDetails.messageQueue?.push(message) - } else { - workspaceDetails.webSocketClient.send(message).catch(error => { - logging.error(`Error while sending didSave message: ${error}`) - }) - } + const workspaceState = workspaceFolderManager.getWorkspaceState() + workspaceState.messageQueue.push(message) }) lsp.workspace.onDidCreateFiles(async event => { @@ -302,13 +308,13 @@ export const WorkspaceContextServer = (): Server => features => { } logging.log(`Received didCreateFiles event of length ${event.files.length}`) + const workspaceState = workspaceFolderManager.getWorkspaceState() for (const file of event.files) { const isDir = isDirectory(file.uri) - const result = workspaceFolderManager.getWorkspaceDetailsWithId(file.uri, workspaceFolders) - if (!result) { + const workspaceFolder = workspaceFolderManager.getWorkspaceFolder(file.uri, workspaceFolders) + if (!workspaceFolder) { continue } - const { workspaceDetails, workspaceRoot } = result let filesMetadata: FileMetadata[] = [] if (isDir && isEmptyDirectory(file.uri)) { @@ -316,9 +322,10 @@ export const WorkspaceContextServer = (): Server => features => { } else if (isDir) { filesMetadata = await artifactManager.addNewDirectories([URI.parse(file.uri)]) } else { - filesMetadata = [await artifactManager.processNewFile(workspaceRoot, file.uri)] + filesMetadata = [await artifactManager.processNewFile(workspaceFolder, file.uri)] } + const workspaceId = await workspaceFolderManager.waitForRemoteWorkspaceId() for (const fileMetadata of filesMetadata) { const s3Url = await workspaceFolderManager.uploadToS3(fileMetadata) if (!s3Url) { @@ -330,26 +337,17 @@ export const WorkspaceContextServer = (): Server => features => { params: { files: [ { - uri: getRelativePath(fileMetadata.workspaceFolder, file.uri), + uri: file.uri, }, ], workspaceChangeMetadata: { - workspaceId: workspaceDetails.workspaceId, + workspaceId: workspaceId, s3Path: cleanUrl(s3Url), programmingLanguage: fileMetadata.language, }, }, }) - if (!workspaceDetails.webSocketClient) { - logging.log( - `WebSocket client is not connected yet: ${workspaceRoot.uri}, adding didCreateFiles message to queue` - ) - workspaceDetails.messageQueue?.push(message) - } else { - workspaceDetails.webSocketClient.send(message).catch(error => { - logging.error(`Error while sending didCreateFiles message: ${error}`) - }) - } + workspaceState.messageQueue.push(message) } } }) @@ -361,20 +359,21 @@ export const WorkspaceContextServer = (): Server => features => { logging.log(`Received didDeleteFiles event of length ${event.files.length}`) + const workspaceState = workspaceFolderManager.getWorkspaceState() for (const file of event.files) { - const result = workspaceFolderManager.getWorkspaceDetailsWithId(file.uri, workspaceFolders) - if (!result) { + const workspaceFolder = workspaceFolderManager.getWorkspaceFolder(file.uri, workspaceFolders) + if (!workspaceFolder) { logging.log(`Workspace details not found for deleted file: ${file.uri}`) continue } - const { workspaceDetails, workspaceRoot } = result - const programmingLanguages = artifactManager.handleDeletedPathAndGetLanguages(file.uri, workspaceRoot) + const programmingLanguages = artifactManager.handleDeletedPathAndGetLanguages(file.uri, workspaceFolder) if (programmingLanguages.length === 0) { logging.log(`No programming languages determined for: ${file.uri}`) continue } + const workspaceId = await workspaceFolderManager.waitForRemoteWorkspaceId() // Send notification for each programming language for (const language of programmingLanguages) { const message = JSON.stringify({ @@ -382,25 +381,16 @@ export const WorkspaceContextServer = (): Server => features => { params: { files: [ { - uri: getRelativePath(workspaceRoot, file.uri), + uri: file.uri, }, ], workspaceChangeMetadata: { - workspaceId: workspaceDetails.workspaceId, + workspaceId: workspaceId, programmingLanguage: language, }, }, }) - if (!workspaceDetails.webSocketClient) { - logging.log( - `WebSocket client is not connected yet: ${workspaceRoot.uri}, adding didDeleteFiles message to queue` - ) - workspaceDetails.messageQueue?.push(message) - } else { - workspaceDetails.webSocketClient.send(message).catch(error => { - logging.error(`Error while sending didDeleteFiles message: ${error}`) - }) - } + workspaceState.messageQueue.push(message) } } }) @@ -412,14 +402,16 @@ export const WorkspaceContextServer = (): Server => features => { logging.log(`Received didRenameFiles event of length ${event.files.length}`) + const workspaceState = workspaceFolderManager.getWorkspaceState() for (const file of event.files) { - const result = workspaceFolderManager.getWorkspaceDetailsWithId(file.newUri, workspaceFolders) - if (!result) { + const workspaceFolder = workspaceFolderManager.getWorkspaceFolder(file.newUri, workspaceFolders) + if (!workspaceFolder) { continue } - const { workspaceDetails, workspaceRoot } = result - const filesMetadata = await artifactManager.handleRename(workspaceRoot, file.oldUri, file.newUri) + const filesMetadata = await artifactManager.handleRename(workspaceFolder, file.oldUri, file.newUri) + + const workspaceId = await workspaceFolderManager.waitForRemoteWorkspaceId() for (const fileMetadata of filesMetadata) { const s3Url = await workspaceFolderManager.uploadToS3(fileMetadata) if (!s3Url) { @@ -430,27 +422,18 @@ export const WorkspaceContextServer = (): Server => features => { params: { files: [ { - old_uri: getRelativePath(fileMetadata.workspaceFolder, file.oldUri), - new_uri: getRelativePath(fileMetadata.workspaceFolder, file.newUri), + old_uri: file.oldUri, + new_uri: file.newUri, }, ], workspaceChangeMetadata: { - workspaceId: workspaceDetails.workspaceId, + workspaceId: workspaceId, s3Path: cleanUrl(s3Url), programmingLanguage: fileMetadata.language, }, }, }) - if (!workspaceDetails.webSocketClient) { - logging.log( - `WebSocket client is not connected yet: ${workspaceRoot.uri}, adding didRenameFiles message to queue` - ) - workspaceDetails.messageQueue?.push(message) - } else { - workspaceDetails.webSocketClient.send(message).catch(error => { - logging.error(`Error while sending didRenameFiles message: ${error}`) - }) - } + workspaceState.messageQueue.push(message) } } }) diff --git a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceFolderManager.ts b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceFolderManager.ts index acfaf28f4f..9272c4fb76 100644 --- a/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceFolderManager.ts +++ b/server/aws-lsp-codewhisperer/src/language-server/workspaceContext/workspaceFolderManager.ts @@ -17,13 +17,14 @@ import { } from './util' import { DependencyDiscoverer } from './dependency/dependencyDiscoverer' import { AmazonQTokenServiceManager } from '../../shared/amazonQServiceManager/AmazonQTokenServiceManager' +import { URI } from 'vscode-uri' +import path = require('path') interface WorkspaceState { remoteWorkspaceState: WorkspaceStatus + messageQueue: any[] webSocketClient?: WebSocketClient workspaceId?: string - requiresS3Upload?: boolean - messageQueue?: any[] } type WorkspaceRoot = string @@ -33,14 +34,21 @@ export class WorkspaceFolderManager { private logging: Logging private artifactManager: ArtifactManager private dependencyDiscoverer: DependencyDiscoverer - private workspaceMap: Map private static instance: WorkspaceFolderManager | undefined + private readonly workspaceIdentifier: string + private workspaceState: WorkspaceState + private remoteWorkspaceIdPromise: Promise + private remoteWorkspaceIdResolver!: (id: string) => void + private remoteWorkspaceIdRejecter!: (reason: Error) => void private workspaceFolders: WorkspaceFolder[] private credentialsProvider: CredentialsProvider private readonly INITIAL_CHECK_INTERVAL = 40 * 1000 // 40 seconds - private readonly INITIAL_TIMEOUT = 2 * 60 * 1000 // 2 minutes + private readonly INITIAL_CONNECTION_TIMEOUT = 2 * 60 * 1000 // 2 minutes private readonly CONTINUOUS_MONITOR_INTERVAL = 5 * 60 * 1000 // 5 minutes - private monitorIntervals: Map = new Map() + private readonly MESSAGE_PUBLISH_INTERVAL: number = 100 // 100 milliseconds + private continuousMonitorInterval: NodeJS.Timeout | undefined + private optOutMonitorInterval: NodeJS.Timeout | undefined + private messageQueueConsumerInterval: NodeJS.Timeout | undefined private isOptedOut: boolean = false static createInstance( @@ -49,7 +57,8 @@ export class WorkspaceFolderManager { artifactManager: ArtifactManager, dependencyDiscoverer: DependencyDiscoverer, workspaceFolders: WorkspaceFolder[], - credentialsProvider: CredentialsProvider + credentialsProvider: CredentialsProvider, + workspaceIdentifier: string ): WorkspaceFolderManager { if (!this.instance) { this.instance = new WorkspaceFolderManager( @@ -58,7 +67,8 @@ export class WorkspaceFolderManager { artifactManager, dependencyDiscoverer, workspaceFolders, - credentialsProvider + credentialsProvider, + workspaceIdentifier ) } return this.instance @@ -74,23 +84,24 @@ export class WorkspaceFolderManager { artifactManager: ArtifactManager, dependencyDiscoverer: DependencyDiscoverer, workspaceFolders: WorkspaceFolder[], - credentialsProvider: CredentialsProvider + credentialsProvider: CredentialsProvider, + workspaceIdentifier: string ) { this.serviceManager = serviceManager this.logging = logging this.artifactManager = artifactManager this.dependencyDiscoverer = dependencyDiscoverer - this.workspaceMap = new Map() this.workspaceFolders = workspaceFolders this.credentialsProvider = credentialsProvider + this.workspaceIdentifier = workspaceIdentifier this.dependencyDiscoverer.dependencyHandlerRegistry.forEach(handler => { - handler.onDependencyChange(async (workspaceFolder, zips) => { + handler.onDependencyChange(async (workspaceFolder, zips, addWSFolderPathInS3) => { try { this.logging.log(`Dependency change detected in ${workspaceFolder.uri}`) // Process the dependencies - await this.handleDependencyChanges(zips) + await this.handleDependencyChanges(zips, addWSFolderPathInS3) // Clean up only after successful processing await handler.cleanupZipFiles(zips) @@ -99,6 +110,29 @@ export class WorkspaceFolderManager { } }) }) + + this.remoteWorkspaceIdPromise = new Promise((resolve, reject) => { + this.remoteWorkspaceIdResolver = resolve + this.remoteWorkspaceIdRejecter = reject + }) + this.workspaceState = { + remoteWorkspaceState: 'CREATION_PENDING', + messageQueue: [], + } + + this.messageQueueConsumerInterval = setInterval(() => { + if (this.workspaceState.webSocketClient && this.workspaceState.webSocketClient.isConnected()) { + const message = this.workspaceState.messageQueue[0] + if (message) { + try { + this.workspaceState.webSocketClient.send(message) + this.workspaceState.messageQueue.shift() + } catch (error) { + this.logging.error(`Error sending message: ${error}`) + } + } + } + }, this.MESSAGE_PUBLISH_INTERVAL) } /** @@ -110,57 +144,18 @@ export class WorkspaceFolderManager { this.workspaceFolders = workspaceFolders } - getWorkspaces(): Map { - return this.workspaceMap - } - - getWorkspaceDetailsWithId( - fileUri: string, - workspaceFolders?: WorkspaceFolder[] - ): { workspaceDetails: WorkspaceState; workspaceRoot: WorkspaceFolder } | null { - const workspaceRoot = findWorkspaceRootFolder(fileUri, workspaceFolders ?? this.workspaceFolders) - if (!workspaceRoot) { - return null - } - - const workspaceDetails = this.getWorkspaces().get(workspaceRoot.uri) - if (!workspaceDetails) { - this.logging.log(`Workspace details not found for workspace folder ${workspaceRoot.uri}`) - return null - } - - if (!workspaceDetails.workspaceId) { - this.logging.log( - `Workspace initialization in progress - workspaceId not yet assigned for: ${workspaceRoot.uri}` - ) - return { workspaceDetails, workspaceRoot } - } - - return { workspaceDetails, workspaceRoot } - } - - getWorkspaceFolder(fileUri: string): WorkspaceFolder | undefined { - return findWorkspaceRootFolder(fileUri, this.workspaceFolders) - } - - getWorkspaceId(workspaceFolder?: WorkspaceFolder): string | undefined { - if (!workspaceFolder) { - return undefined - } - const workspaceDetails = this.getWorkspaces().get(workspaceFolder.uri) - if (!workspaceDetails || !workspaceDetails.workspaceId) { - this.logging.log( - `Unable to retrieve workspaceId - workspace initialization incomplete for: ${workspaceFolder.uri}` - ) - return undefined - } - return workspaceDetails.workspaceId + getWorkspaceFolder(fileUri: string, workspaceFolders?: WorkspaceFolder[]): WorkspaceFolder | undefined { + return findWorkspaceRootFolder(fileUri, workspaceFolders ?? this.workspaceFolders) } getOptOutStatus(): boolean { return this.isOptedOut } + getWorkspaceState(): WorkspaceState { + return this.workspaceState + } + async processNewWorkspaceFolders(folders: WorkspaceFolder[]) { // Check if user is opted in before trying to process any files const { optOut } = await this.listWorkspaceMetadata() @@ -172,53 +167,37 @@ export class WorkspaceFolderManager { return } - // CreateWorkspace and Setup state machine workflow - for (const folder of folders) { - await this.handleNewWorkspace(folder.uri).catch(e => { - this.logging.warn(`Error processing new workspace ${folder.uri} with error: ${e}`) - }) - } - // Snapshot the workspace - this.snapshotWorkspace(folders).catch(e => { - this.logging.warn(`Error during snapshot workspace: ${e}`) + // Wait for remote workspace id + await this.waitForRemoteWorkspaceId() + + // Sync workspace source codes + await this.syncSourceCodesToS3(folders).catch(e => { + this.logging.warn(`Error during syncing workspace source codes: ${e}`) }) - } - private async snapshotWorkspace(folders: WorkspaceFolder[]) { - let sourceCodeMetadata: FileMetadata[] = [] - sourceCodeMetadata = await this.artifactManager.addWorkspaceFolders(folders) - // Kick off dependency discovery but don't wait + // Kick off dependency discovery but don't wait this.dependencyDiscoverer.searchDependencies(folders).catch(e => { this.logging.warn(`Error during dependency discovery: ${e}`) }) + } - const fileMetadataMap: Map = new Map() - sourceCodeMetadata.forEach((fileMetadata: FileMetadata) => { - let metadata = fileMetadataMap.get(fileMetadata.workspaceFolder.uri) - if (!metadata) { - metadata = [] - fileMetadataMap.set(fileMetadata.workspaceFolder.uri, metadata) - } - metadata.push(fileMetadata) - }) + private async syncSourceCodesToS3(folders: WorkspaceFolder[]) { + let sourceCodeMetadata: FileMetadata[] = [] + sourceCodeMetadata = await this.artifactManager.addWorkspaceFolders(folders) - folders.forEach(folder => { - const workspaceDetails = this.getWorkspaces().get(folder.uri) - if (workspaceDetails) { - workspaceDetails.requiresS3Upload = true - } - }) - await this.uploadWithTimeout(fileMetadataMap) + await this.uploadS3AndQueueEvents(sourceCodeMetadata) + this.artifactManager.cleanup(true, folders) } - async uploadToS3(fileMetadata: FileMetadata): Promise { + async uploadToS3(fileMetadata: FileMetadata, addWSFolderPathInS3: boolean = true): Promise { let relativePath = fileMetadata.relativePath.replace(fileMetadata.workspaceFolder.name, '') relativePath = relativePath.startsWith('/') ? relativePath.slice(1) : relativePath - const workspaceId = this.getWorkspaces().get(fileMetadata.workspaceFolder.uri)?.workspaceId ?? '' + if (addWSFolderPathInS3) { + relativePath = path.join(URI.parse(fileMetadata.workspaceFolder.uri).path.slice(1), relativePath) + } + const workspaceId = this.workspaceState.workspaceId if (!workspaceId) { - this.logging.warn( - `Workspace ID is not found for folder ${fileMetadata.workspaceFolder.uri}, skipping S3 upload` - ) + this.logging.warn(`Workspace ID is not found, skipping S3 upload`) return } @@ -254,15 +233,9 @@ export class WorkspaceFolderManager { } async clearAllWorkspaceResources() { - for (const workspace of this.monitorIntervals.keys()) { - this.stopMonitoring(workspace) - } - - for (const { webSocketClient } of this.workspaceMap.values()) { - webSocketClient?.destroyClient() - } - - this.workspaceMap.clear() + this.stopContinuousMonitoring() + this.resetRemoteWorkspaceId() + this.workspaceState.webSocketClient?.destroyClient() this.artifactManager.cleanup() this.dependencyDiscoverer.dispose() } @@ -273,93 +246,42 @@ export class WorkspaceFolderManager { * @param workspaceFolder */ async processWorkspaceFoldersDeletion(workspaceFolders: WorkspaceFolder[]) { + const workspaceId = await this.waitForRemoteWorkspaceId() for (const folder of workspaceFolders) { - const workspaceDetails = this.workspaceMap.get(folder.uri) - const websocketClient = workspaceDetails?.webSocketClient - const languagesMap = this.artifactManager.getLanguagesForWorkspaceFolder(folder) const programmingLanguages = languagesMap ? Array.from(languagesMap.keys()) : [] - if (websocketClient) { - for (const language of programmingLanguages) { - // Wait for message being sent before disconnecting - await websocketClient - .send( - JSON.stringify({ - method: 'workspace/didChangeWorkspaceFolders', - params: { - workspaceFoldersChangeEvent: { - added: [], - removed: [ - { - uri: '/', - name: folder.name, - }, - ], - }, - workspaceChangeMetadata: { - workspaceId: this.getWorkspaces().get(folder.uri)?.workspaceId ?? '', - programmingLanguage: language, - }, + for (const language of programmingLanguages) { + const message = JSON.stringify({ + method: 'workspace/didChangeWorkspaceFolders', + params: { + workspaceFoldersChangeEvent: { + added: [], + removed: [ + { + uri: folder.uri, + name: folder.name, }, - }) - ) - .catch(e => { - this.logging.error(`Error sending didChangeWorkspaceFolders message: ${e}`) - }) - } - websocketClient.disconnect() + ], + }, + workspaceChangeMetadata: { + workspaceId: workspaceId, + programmingLanguage: language, + }, + }, + }) + this.workspaceState.messageQueue.push(message) } - this.removeWorkspaceEntry(folder.uri) this.dependencyDiscoverer.disposeWorkspaceFolder(folder) - this.stopMonitoring(folder.uri) } await this.artifactManager.removeWorkspaceFolders(workspaceFolders) } - processRemoteWorkspaceRefresh(workspaceFolders: WorkspaceFolder[]) { - for (const folder of workspaceFolders) { - const workspaceDetails = this.workspaceMap.get(folder.uri) - const websocketClient = workspaceDetails?.webSocketClient - if (websocketClient) { - websocketClient.destroyClient() - } - this.removeWorkspaceEntry(folder.uri) - - this.dependencyDiscoverer.disposeWorkspaceFolder(folder) - } - } - - private updateWorkspaceEntry(workspaceRoot: WorkspaceRoot, workspaceState: WorkspaceState) { - if (!workspaceState.messageQueue) { - workspaceState.messageQueue = [] - } - - if (!this.workspaceMap.has(workspaceRoot)) { - workspaceState.requiresS3Upload = true - this.workspaceMap.set(workspaceRoot, workspaceState) - } else { - const existingWorkspaceState = this.workspaceMap.get(workspaceRoot) - if (existingWorkspaceState) { - existingWorkspaceState.remoteWorkspaceState = - workspaceState.remoteWorkspaceState ?? existingWorkspaceState.remoteWorkspaceState - existingWorkspaceState.webSocketClient = - workspaceState.webSocketClient ?? existingWorkspaceState.webSocketClient - existingWorkspaceState.workspaceId = workspaceState.workspaceId ?? existingWorkspaceState.workspaceId - existingWorkspaceState.messageQueue = workspaceState.messageQueue ?? existingWorkspaceState.messageQueue - } - } - } - - private removeWorkspaceEntry(workspaceRoot: WorkspaceRoot) { - this.workspaceMap.delete(workspaceRoot) - } - - private async handleDependencyChanges(zips: FileMetadata[]): Promise { + private async handleDependencyChanges(zips: FileMetadata[], addWSFolderPathInS3: boolean): Promise { this.logging.log(`Processing ${zips.length} dependency changes`) for (const zip of zips) { try { - const s3Url = await this.uploadToS3(zip) + const s3Url = await this.uploadToS3(zip, addWSFolderPathInS3) if (!s3Url) { continue } @@ -371,55 +293,22 @@ export class WorkspaceFolderManager { } private notifyDependencyChange(fileMetadata: FileMetadata, s3Url: string) { - const workspaceDetails = this.getWorkspaces().get(fileMetadata.workspaceFolder.uri) - - if (!workspaceDetails) { - return - } - const message = JSON.stringify({ method: 'didChangeDependencyPaths', params: { event: { paths: [] }, workspaceChangeMetadata: { - workspaceId: workspaceDetails.workspaceId, + workspaceId: this.workspaceState.workspaceId, s3Path: cleanUrl(s3Url), programmingLanguage: fileMetadata.language, }, }, }) - if (!workspaceDetails.webSocketClient) { - this.logging.log( - `WebSocket client is not connected yet: ${fileMetadata.workspaceFolder.uri} adding didChangeDependencyPaths message to queue` - ) - workspaceDetails.messageQueue?.push(message) - } else { - workspaceDetails.webSocketClient.send(message).catch(e => { - this.logging.error(`Error sending didChangeDependencyPaths message: ${e}`) - }) - } - } - - private async createNewWorkspace(workspace: WorkspaceRoot) { - const createWorkspaceResult = await this.createWorkspace(workspace) - const workspaceDetails = createWorkspaceResult.response - if (!workspaceDetails) { - this.logging.warn(`Failed to create remote workspace for ${workspace}`) - return createWorkspaceResult - } - - this.updateWorkspaceEntry(workspace, { - remoteWorkspaceState: workspaceDetails.workspace.workspaceStatus, - workspaceId: workspaceDetails.workspace.workspaceId, - }) - - return createWorkspaceResult + this.workspaceState.messageQueue.push(message) } - private async establishConnection(workspace: WorkspaceRoot, existingMetadata: WorkspaceMetadata) { - const existingState = this.workspaceMap.get(workspace) - + private async establishConnection(existingMetadata: WorkspaceMetadata) { if (!existingMetadata.environmentId) { throw new Error('No environment ID found for ready workspace') } @@ -430,91 +319,58 @@ export class WorkspaceFolderManager { const websocketUrl = existingMetadata.environmentAddress this.logging.log(`Establishing connection to ${websocketUrl}`) - if (existingState?.webSocketClient) { - const websocketConnectionState = existingState.webSocketClient.getWebsocketReadyState() + if (this.workspaceState.webSocketClient) { + const websocketConnectionState = this.workspaceState.webSocketClient.getWebsocketReadyState() if (websocketConnectionState === 'OPEN') { - this.logging.log(`Active connection already exists for ${workspace}`) + this.logging.log(`Active websocket connection already exists.}`) return } // If the client exists but isn't connected, it might be in the process of connecting if (websocketConnectionState === 'CONNECTING') { - this.logging.log(`Connection attempt already in progress for ${workspace}`) + this.logging.log(`Connection attempt already in progress.`) return } } const webSocketClient = new WebSocketClient(websocketUrl, this.logging, this.credentialsProvider) - this.updateWorkspaceEntry(workspace, { - remoteWorkspaceState: 'CONNECTED', - webSocketClient, - messageQueue: existingState?.messageQueue || [], - }) - - await this.processMessagesInQueue(workspace) + this.workspaceState.remoteWorkspaceState = 'CONNECTED' + this.workspaceState.webSocketClient = webSocketClient } - private async handleNewWorkspace(workspace: WorkspaceRoot, queueEvents?: any[]) { - this.logging.log(`Processing new workspace ${workspace}`) - - // First check if the workspace already exists in the workspace map - const existingWorkspace = this.workspaceMap.get(workspace) - if (existingWorkspace) { - this.logging.log(`Workspace ${workspace} already exists in memory`) - return - } - - // Check if workspace exists remotely - const { metadata, optOut } = await this.listWorkspaceMetadata(workspace) + async initializeWorkspaceStatusMonitor() { + this.logging.log(`Initializing workspace status check for workspace [${this.workspaceIdentifier}]`) + // Perform a one-time checkRemoteWorkspaceStatusAndReact first + // Pass skipUploads as true since it would be handled by processNewWorkspaceFolders + await this.checkRemoteWorkspaceStatusAndReact(true) - if (optOut) { - this.logging.log(`Not creating a new workspace ${workspace}, user is opted out`) - this.isOptedOut = true - await this.clearAllWorkspaceResources() - await this.startOptOutMonitor() - return - } - - if (metadata) { - // Workspace exists remotely, add to map with current state - this.updateWorkspaceEntry(workspace, { - workspaceId: metadata.workspaceId, - remoteWorkspaceState: metadata.workspaceStatus, - messageQueue: queueEvents ?? [], - }) - // We don't attempt a connection here even if remote workspace is ready and leave the connection attempt to the workspace status monitor - } else { - // Create new workspace - const createWorkspaceResult = await this.createNewWorkspace(workspace) - if (createWorkspaceResult.error && createWorkspaceResult.error.retryable) { - this.logging.log(`Workspace creation failed with retryable error, starting monitor`) - // todo, consider whether we should add the failed env to the map or not and what to do in the create failure case - this.updateWorkspaceEntry(workspace, { - remoteWorkspaceState: 'CREATION_PENDING', - messageQueue: queueEvents ?? [], - }) - } + // Set up continuous monitoring which periodically invokes checkRemoteWorkspaceStatusAndReact + if (!this.isOptedOut) { + this.logging.log(`Starting continuous monitor for workspace [${this.workspaceIdentifier}]`) + const intervalId = setInterval(async () => { + try { + await this.checkRemoteWorkspaceStatusAndReact() + } catch (error) { + this.logging.error(`Error monitoring workspace status: ${error}`) + } + }, this.CONTINUOUS_MONITOR_INTERVAL) + this.continuousMonitorInterval = intervalId } - - this.startWorkspaceStatusMonitor(workspace).catch(error => { - this.logging.error(`Error starting workspace monitor for ${workspace}: ${error}`) - }) } - private async waitForInitialConnection(workspace: WorkspaceRoot): Promise { - this.logging.log(`Waiting for initial connection to ${workspace}`) + private async waitForInitialConnection(): Promise { + this.logging.log(`Waiting for initial connection to remote workspace`) return new Promise(resolve => { const startTime = Date.now() const intervalId = setInterval(async () => { try { - const workspaceState = this.workspaceMap.get(workspace) - if (!workspaceState) { - this.logging.log(`Workspace ${workspace} no longer exists, stopping monitors for workspace`) + if (Date.now() - startTime >= this.INITIAL_CONNECTION_TIMEOUT) { + this.logging.warn(`Initial connection timeout.`) clearInterval(intervalId) return resolve(false) } - const { metadata, optOut } = await this.listWorkspaceMetadata(workspace) + const { metadata, optOut } = await this.listWorkspaceMetadata(this.workspaceIdentifier) if (optOut) { this.logging.log(`User opted out during initial connection`) @@ -529,16 +385,13 @@ export class WorkspaceFolderManager { return } - this.updateWorkspaceEntry(workspace, { - ...workspaceState, - remoteWorkspaceState: metadata.workspaceStatus, - }) + this.workspaceState.remoteWorkspaceState = metadata.workspaceStatus switch (metadata.workspaceStatus) { case 'READY': - const client = workspaceState.webSocketClient + const client = this.workspaceState.webSocketClient if (!client || !client.isConnected()) { - await this.establishConnection(workspace, metadata) + await this.establishConnection(metadata) } clearInterval(intervalId) return resolve(true) @@ -546,26 +399,17 @@ export class WorkspaceFolderManager { // Continue polling break case 'CREATED': - const createWorkspaceResult = await this.createNewWorkspace(workspace) - // If createWorkspace call returns a retyrable error, next interval will retry - // If the call returns non-retryable error, we will re-throw the error and it will stop the interval - if (createWorkspaceResult.error && !createWorkspaceResult.error.retryable) { - throw createWorkspaceResult.error.originalError - } - break + clearInterval(intervalId) + return resolve(false) default: this.logging.warn(`Unknown workspace status: ${metadata.workspaceStatus}`) clearInterval(intervalId) return resolve(false) } - - if (Date.now() - startTime >= this.INITIAL_TIMEOUT) { - this.logging.warn(`Initial connection timeout for workspace ${workspace}`) - clearInterval(intervalId) - return resolve(false) - } } catch (error: any) { - this.logging.error(`Error during initial connection for workspace ${workspace}: ${error}`) + this.logging.error( + `Error during initializing connection for workspace [${this.workspaceIdentifier}]: ${error}` + ) clearInterval(intervalId) return resolve(false) } @@ -573,121 +417,128 @@ export class WorkspaceFolderManager { }) } - private startContinuousMonitor(workspace: WorkspaceRoot) { - this.logging.log(`Starting continuous monitor for workspace ${workspace}`) - const intervalId = setInterval(async () => { - try { - const workspaceState = this.workspaceMap.get(workspace) - if (!workspaceState) { - // Previously we stop monitoring the workspace if it no longer exists - // But now we want to try give it a chance to re-create the workspace - // this.stopMonitoring(workspace) - return - } + private async checkRemoteWorkspaceStatusAndReact(skipUploads: boolean = false) { + this.logging.log(`Checking remote workspace status for workspace [${this.workspaceIdentifier}]`) + const { metadata, optOut } = await this.listWorkspaceMetadata(this.workspaceIdentifier) + + if (optOut) { + this.logging.log('User opted out, clearing all resources and starting opt-out monitor') + this.isOptedOut = true + await this.clearAllWorkspaceResources() + await this.startOptOutMonitor() + return + } - const { metadata, optOut } = await this.listWorkspaceMetadata(workspace) + if (!metadata) { + // Workspace no longer exists, Recreate it. + this.resetRemoteWorkspaceId() // workspaceId would change if remote record is gone + await this.handleWorkspaceCreatedState(skipUploads) + return + } - if (optOut) { - this.logging.log('User opted out, clearing all resources and starting opt-out monitor') - this.isOptedOut = true - await this.clearAllWorkspaceResources() - await this.startOptOutMonitor() - return - } + this.workspaceState.remoteWorkspaceState = metadata.workspaceStatus + if (this.workspaceState.workspaceId === undefined) { + this.workspaceState.workspaceId = metadata.workspaceId + this.remoteWorkspaceIdResolver(this.workspaceState.workspaceId) + } - if (!metadata) { - // Workspace no longer exists, Recreate it. - await this.handleWorkspaceCreatedState(workspace) - return + switch (metadata.workspaceStatus) { + case 'READY': + // Check if connection exists + const client = this.workspaceState.webSocketClient + if (!client || !client.isConnected()) { + this.logging.log( + `Workspace is ready but no connection exists or connection lost. Re-establishing connection...` + ) + await this.establishConnection(metadata) } + break + case 'PENDING': + // Schedule an initial connection when pending + await this.waitForInitialConnection() + break + case 'CREATED': + // Workspace has no environment, Recreate it. + await this.handleWorkspaceCreatedState(skipUploads) + break + default: + this.logging.warn(`Unknown workspace status: ${metadata.workspaceStatus}`) + } + } - this.updateWorkspaceEntry(workspace, { - ...workspaceState, - remoteWorkspaceState: metadata.workspaceStatus, - }) + async waitForRemoteWorkspaceId(): Promise { + // If workspaceId is already set, return it immediately + if (this.workspaceState.workspaceId) { + return this.workspaceState.workspaceId + } - switch (metadata.workspaceStatus) { - case 'READY': - // Check if connection exists - const client = workspaceState.webSocketClient - if (!client || !client.isConnected()) { - this.logging.log( - `Workspace ${workspace} is ready but no connection exists or connection lost. Re-establishing connection...` - ) - await this.establishConnection(workspace, metadata) - } - break - case 'PENDING': - // Do nothing while pending - break - case 'CREATED': - // Workspace has no environment, Recreate it. - await this.handleWorkspaceCreatedState(workspace) - break - default: - this.logging.warn(`Unknown workspace status: ${metadata.workspaceStatus}`) - } - } catch (error) { - this.logging.error(`Error monitoring workspace ${workspace}: ${error}`) - } - }, this.CONTINUOUS_MONITOR_INTERVAL) + // Otherwise, wait for the promise to resolve or catch the rejection and retry + try { + return await this.remoteWorkspaceIdPromise + } catch (error) { + this.logging.log(`Waiting for a new remote workspaceId`) + return this.waitForRemoteWorkspaceId() + } + } + + private resetRemoteWorkspaceId() { + this.workspaceState.workspaceId = undefined - this.monitorIntervals.set(workspace, intervalId) + // Store the old rejecter + const oldRejecter = this.remoteWorkspaceIdRejecter + + // Create new promise first + this.remoteWorkspaceIdPromise = new Promise((resolve, reject) => { + this.remoteWorkspaceIdResolver = resolve + this.remoteWorkspaceIdRejecter = reject + }) + + // Then reject the old promise if it exists + if (oldRejecter) { + oldRejecter(new Error('Remote workspaceId reset requested')) + } } private async startOptOutMonitor() { - const intervalId = setInterval(async () => { - try { - const { optOut } = await this.listWorkspaceMetadata() + if (this.optOutMonitorInterval === undefined) { + const intervalId = setInterval(async () => { + try { + const { optOut } = await this.listWorkspaceMetadata() - if (!optOut) { - this.isOptedOut = false - this.logging.log('User opted back in, stopping opt-out monitor and re-initializing workspace') - clearInterval(intervalId) - // Process all workspace folders - await this.processNewWorkspaceFolders(this.workspaceFolders) + if (!optOut) { + this.isOptedOut = false + this.logging.log('User opted back in, stopping opt-out monitor and re-initializing workspace') + clearInterval(intervalId) + this.optOutMonitorInterval = undefined + await this.initializeWorkspaceStatusMonitor() + } + } catch (error) { + this.logging.error(`Error in opt-out monitor: ${error}`) } - } catch (error) { - this.logging.error(`Error in opt-out monitor: ${error}`) - } - }, this.CONTINUOUS_MONITOR_INTERVAL) + }, this.CONTINUOUS_MONITOR_INTERVAL) + this.optOutMonitorInterval = intervalId + } } - /** - * Handles the workspace creation flow when a remote workspace is in CREATED state. - * Attempts to create the workspace and schedules a quick connection check on success. - * If the initial creation fails with a retryable error, attempts one retry before - * falling back to the regular polling cycle. - * - * The flow is: - * 1. Attempt initial workspace creation - * 2. On success: Schedule a quick check to establish connection - * 3. On retryable error: Attempt one immediate retry - * 4. On retry success: Schedule a quick check - * 5. On retry failure: Wait for next regular polling cycle - * - * @param workspace - The workspace to re-create - */ - private async handleWorkspaceCreatedState(workspace: WorkspaceRoot): Promise { + private async handleWorkspaceCreatedState(skipUploads: boolean = false): Promise { + this.logging.log(`No READY / PENDING remote workspace found, creating a new one`) // If remote state is CREATED, call create API to create a new workspace - // snapshot the workspace for the new environment - // TODO: this workspace root in the below snapshot function needs to be changes - // after we consolidate workspaceFolder into one Workspace. - const folder = this.getWorkspaceFolder(workspace) - if (folder) { - this.processRemoteWorkspaceRefresh([folder]) - } - const initialResult = await this.createNewWorkspace(workspace) - if (folder) { - this.snapshotWorkspace([folder]).catch(e => { - this.logging.warn(`Error during snapshot workspace: ${e}`) - }) + if (this.workspaceState.webSocketClient) { + this.workspaceState.webSocketClient.destroyClient() + this.workspaceState.webSocketClient = undefined } + const initialResult = await this.createNewWorkspace() - // If creation succeeds, schedule a single connection attempt to happen in 30 seconds + // If creation succeeds, establish connection if (initialResult.response) { - this.logging.log(`Workspace ${workspace} created successfully, scheduling quick check for connection`) - this.scheduleQuickCheck(workspace) + this.logging.log(`Workspace [${this.workspaceIdentifier}] created successfully, establishing connection`) + await this.waitForInitialConnection() + if (!skipUploads) { + await this.syncSourceCodesToS3(this.workspaceFolders) + this.dependencyDiscoverer.reSyncDependenciesToS3(this.workspaceFolders).catch(e => { + this.logging.warn(`Error during re-syncing dependencies: ${e}`) + }) + } return } @@ -697,98 +548,50 @@ export class WorkspaceFolderManager { return } - this.logging.warn( - `Retryable error for workspace ${workspace} creation: ${initialResult.error}. Attempting single retry...` - ) - const retryResult = await this.createNewWorkspace(workspace) + this.logging.warn(`Retryable error for workspace creation: ${initialResult.error}. Attempting single retry...`) + const retryResult = await this.createNewWorkspace() + // If re-creation fails, wait for the next polling cycle if (retryResult.error) { this.logging.warn( - `Retry failed for workspace ${workspace}: ${retryResult.error}. Will wait for next polling cycle` + `Workspace creation retry failed: ${retryResult.error}. Will wait for the next polling cycle` ) return } - this.logging.log(`Retry succeeded for workspace ${workspace}, scheduling quick check for connection`) - this.scheduleQuickCheck(workspace) - } - - /** - * Schedules a one-time check after workspace creation to establish connection as soon as possible. - * This avoids waiting for the regular 5-minute polling interval when we know the workspace - * should be ready soon. Default check delay is 40 seconds after successful workspace creation. - * - * @param workspace - The workspace to check - * @param delayMs - Optional delay in milliseconds before the check (defaults to 40 seconds) - */ - private scheduleQuickCheck(workspace: WorkspaceRoot, delayMs: number = this.INITIAL_CHECK_INTERVAL) { - this.logging.log(`Scheduling quick check for workspace ${workspace} in ${delayMs}ms`) - setTimeout(async () => { - try { - const workspaceState = this.workspaceMap.get(workspace) - if (!workspaceState) { - this.logging.log(`Workspace state not found for: ${workspace} during quick check`) - return - } - - const { metadata, optOut } = await this.listWorkspaceMetadata(workspace) - - if (optOut) { - this.logging.log(`User is opted out during quick check`) - this.isOptedOut = true - await this.clearAllWorkspaceResources() - await this.startOptOutMonitor() - return - } - - if (!metadata) { - this.logging.log(`No metadata available for workspace: ${workspace} during quick check`) - return - } - - if (metadata.workspaceStatus === 'READY') { - this.logging.log(`Quick check found workspace ${workspace} is ready, attempting connection`) - await this.establishConnection(workspace, metadata) - } else { - this.logging.log(`Quick check found workspace ${workspace} state is ${metadata.workspaceStatus}`) - } - } catch (error) { - this.logging.error(`Error during quick check for workspace ${workspace}: ${error}`) - } - }, delayMs) + this.logging.log(`Retry succeeded for workspace creation, establishing connection`) + await this.waitForInitialConnection() + if (!skipUploads) { + await this.syncSourceCodesToS3(this.workspaceFolders) + this.dependencyDiscoverer.reSyncDependenciesToS3(this.workspaceFolders).catch(e => { + this.logging.warn(`Error during re-syncing dependencies: ${e}`) + }) + } } - private stopMonitoring(workspace: WorkspaceRoot) { - this.logging.log(`Stopping monitoring for workspace ${workspace}`) - const intervalId = this.monitorIntervals.get(workspace) - if (intervalId) { - clearInterval(intervalId) - this.monitorIntervals.delete(workspace) + private stopContinuousMonitoring() { + this.logging.log(`Stopping monitoring for workspace [${this.workspaceIdentifier}]`) + if (this.continuousMonitorInterval) { + clearInterval(this.continuousMonitorInterval) + this.continuousMonitorInterval = undefined } } - private async startWorkspaceStatusMonitor(workspace: WorkspaceRoot) { - const success = await this.waitForInitialConnection(workspace) - this.logging.log( - `Initial connection ${success ? 'successful' : 'failed'} for ${workspace}, starting continuous monitor` - ) - if (!success && this.isOptedOut) { - // If initial connection fails due to opt out, do not start the continuous monitor - // The opt-out monitor will already be started - return + private async createNewWorkspace() { + const createWorkspaceResult = await this.createWorkspace(this.workspaceIdentifier) + const workspaceDetails = createWorkspaceResult.response + if (!workspaceDetails) { + this.logging.warn(`Failed to create remote workspace for [${this.workspaceIdentifier}]`) + return createWorkspaceResult } - this.startContinuousMonitor(workspace) - } - // could this cause messages to be lost?????? - private async processMessagesInQueue(workspaceRoot: WorkspaceRoot) { - const workspaceDetails = this.workspaceMap.get(workspaceRoot) - while (workspaceDetails?.messageQueue && workspaceDetails.messageQueue.length > 0) { - const message = workspaceDetails.messageQueue.shift() - await workspaceDetails.webSocketClient?.send(message).catch(error => { - this.logging.error(`Error sending message: ${error}`) - }) + this.workspaceState.remoteWorkspaceState = workspaceDetails.workspace.workspaceStatus + if (this.workspaceState.workspaceId === undefined) { + this.workspaceState.workspaceId = workspaceDetails.workspace.workspaceId + this.remoteWorkspaceIdResolver(this.workspaceState.workspaceId) } + + return createWorkspaceResult } /** @@ -800,50 +603,41 @@ export class WorkspaceFolderManager { if (filesMetadata.length == 0) { return } - const inMemoryQueueEvents: any[] = [] for (const fileMetadata of filesMetadata) { try { const s3Url = await this.uploadToS3(fileMetadata) if (!s3Url) { this.logging.warn( - `Failed to get S3 URL for file in workspace: ${fileMetadata.workspaceFolder.name}` + `Failed to get S3 URL for file in workspaceFolder: ${fileMetadata.workspaceFolder.name}` ) continue } this.logging.log( - `Successfully uploaded to S3: workspace=${fileMetadata.workspaceFolder.name} language=${fileMetadata.language}` + `Successfully uploaded to S3: workspaceFolder=${fileMetadata.workspaceFolder.name} language=${fileMetadata.language}` ) - const workspaceId = this.getWorkspaces().get(fileMetadata.workspaceFolder.uri)?.workspaceId - - if (!workspaceId) { - this.logging.warn(`No workspace ID found for URI: ${fileMetadata.workspaceFolder.uri}`) - } - const event = JSON.stringify({ method: 'workspace/didChangeWorkspaceFolders', params: { workspaceFoldersChangeEvent: { added: [ { - uri: '/', + uri: fileMetadata.workspaceFolder.uri, name: fileMetadata.workspaceFolder.name, }, ], removed: [], }, workspaceChangeMetadata: { - workspaceId: workspaceId ?? '', + workspaceId: this.workspaceState.workspaceId, s3Path: cleanUrl(s3Url), programmingLanguage: fileMetadata.language, }, }, }) - - // We add this event to the front of the queue here to prevent any race condition that might put events before the didChangeWorkspaceFolders event - inMemoryQueueEvents.unshift(event) + this.workspaceState.messageQueue.push(event) this.logging.log(`Added didChangeWorkspaceFolders event to queue`) } catch (error) { this.logging.error( @@ -851,79 +645,6 @@ export class WorkspaceFolderManager { ) } } - - try { - const workspaceDetails = this.getWorkspaces().get(filesMetadata[0].workspaceFolder.uri) - - if (!workspaceDetails) { - this.logging.error(`No workspace details found for URI: ${filesMetadata[0].workspaceFolder.uri}`) - return - } - - if (workspaceDetails.webSocketClient) { - inMemoryQueueEvents.forEach((event, index) => { - try { - workspaceDetails.webSocketClient?.send(event).catch(error => { - this.logging.error( - `Error sending event: ${error instanceof Error ? error.message : 'Unknown error'}, eventIndex=${index}` - ) - }) - this.logging.log(`Successfully sent event ${index + 1}/${inMemoryQueueEvents.length}`) - } catch (error) { - this.logging.error( - `Failed to send event via WebSocket:${error instanceof Error ? error.message : 'Unknown error'}, eventIndex=${index}` - ) - } - }) - } else { - if (workspaceDetails.messageQueue) { - workspaceDetails.messageQueue.push(...inMemoryQueueEvents) - this.logging.log(`Added ${inMemoryQueueEvents.length} events to message queue`) - } else { - this.logging.warn('No message queue available to store events') - } - } - } catch (error) { - this.logging.error(`Error in final processing: ${error instanceof Error ? error.message : 'Unknown error'}`) - } - - this.logging.log(`Completed processing ${inMemoryQueueEvents.length} queued WebSocket events`) - } - - private async uploadWithTimeout(fileMetadataMap: Map) { - const keys = [...fileMetadataMap.keys()] - const totalWorkspaces = keys.length - let workspacesWithS3UploadComplete = 0 - - for (const key of keys) { - const workspaceDetails = this.getWorkspaces().get(key) - if (!workspaceDetails) { - continue - } - - if (workspaceDetails.workspaceId && workspaceDetails.requiresS3Upload) { - this.logging.log(`Starting S3 upload for ${key}, workspace id: ${workspaceDetails.workspaceId}`) - await this.uploadS3AndQueueEvents(fileMetadataMap.get(key) ?? []) - workspaceDetails.requiresS3Upload = false - } - // This if condition needs to be separate because workspacesWithS3UploadComplete variable is set to 0 every time this function is called - // If this function is run once and uploads some of the workspace folders, we need to ensure we don't forget about already uploaded folders the next time the function is run - if (workspaceDetails.workspaceId) { - workspacesWithS3UploadComplete++ - } - } - - if (totalWorkspaces !== workspacesWithS3UploadComplete) { - // Schedule next check if not all workspaces are complete - // Notice that we don't await the uploadWithTimeout now, it is fire and forget at the moment - setTimeout(() => this.uploadWithTimeout(fileMetadataMap), 3000) - } else { - this.logging.log(`All workspaces with S3 upload complete`) - // Clean up source code zip files after S3 upload - // Preserve dependencies because they might still be processing - // LanguageDependencyHandler is responsible for deleting dependency zips - this.artifactManager.cleanup(true) - } } // TODO, this function is unused at the moment