Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ const IGNORE_PATTERNS = [
'**/target/**', // Maven/Gradle builds
]

const MAX_UNCOMPRESSED_SRC_SIZE_MB = 250 // 250 MB limit per language per workspace folder
const MAX_UNCOMPRESSED_SRC_SIZE_BYTES = MAX_UNCOMPRESSED_SRC_SIZE_MB * 1024 * 1024 // Convert to bytes
/**
 * Running totals of the source files that were included in, or skipped from, zip artifacts.
 * A single instance is created per processing pass and threaded through to
 * createZipForLanguage, which mutates it as it decides file by file what fits.
 */
interface FileSizeDetails {
// Number of files accepted into a zip so far
includedFileCount: number
// Sum of `contentLength` of accepted files; compared against MAX_UNCOMPRESSED_SRC_SIZE_BYTES before each add
includedSize: number
// Number of files left out because adding them would exceed the size limit
skippedFileCount: number
// Sum of `contentLength` of the files left out (reported as bytes in the warning log)
skippedSize: number
}
const MAX_UNCOMPRESSED_SRC_SIZE_BYTES = 2 * 1024 * 1024 * 1024 // 2 GB

export class ArtifactManager {
private workspace: Workspace
Expand Down Expand Up @@ -81,6 +86,12 @@ export class ArtifactManager {

async addNewDirectories(newDirectories: URI[]): Promise<FileMetadata[]> {
let zipFileMetadata: FileMetadata[] = []
const fileSizeDetails: FileSizeDetails = {
includedFileCount: 0,
includedSize: 0,
skippedFileCount: 0,
skippedSize: 0,
}

for (const directory of newDirectories) {
const workspaceFolder = this.workspaceFolders.find(ws => directory.path.startsWith(URI.parse(ws.uri).path))
Expand All @@ -95,7 +106,12 @@ export class ArtifactManager {
const relativePath = path.relative(workspacePath, directory.path)

const filesByLanguage = await this.processDirectory(workspaceFolder, directory.path, relativePath)
zipFileMetadata = await this.processFilesByLanguage(workspaceFolder, filesByLanguage, relativePath)
zipFileMetadata = await this.processFilesByLanguage(
workspaceFolder,
fileSizeDetails,
filesByLanguage,
relativePath
)
} catch (error) {
this.logging.warn(`Error processing new directory ${directory.path}: ${error}`)
}
Expand Down Expand Up @@ -286,9 +302,12 @@ export class ArtifactManager {
return filesMetadata
}

cleanup(preserveDependencies: boolean = false) {
cleanup(preserveDependencies: boolean = false, workspaceFolders?: WorkspaceFolder[]) {
try {
this.workspaceFolders.forEach(workspaceToRemove => {
if (workspaceFolders === undefined) {
workspaceFolders = this.workspaceFolders
}
workspaceFolders.forEach(workspaceToRemove => {
const workspaceDirPath = path.join(this.tempDirPath, workspaceToRemove.name)

if (preserveDependencies) {
Expand Down Expand Up @@ -430,43 +449,50 @@ export class ArtifactManager {

private async createZipForLanguage(
workspaceFolder: WorkspaceFolder,
fileSizeDetails: FileSizeDetails,
language: CodewhispererLanguage,
files: FileMetadata[],
subDirectory: string = ''
): Promise<FileMetadata> {
): Promise<FileMetadata | undefined> {
const zipDirectoryPath = path.join(this.tempDirPath, workspaceFolder.name, subDirectory)
this.createFolderIfNotExist(zipDirectoryPath)

const zipPath = path.join(zipDirectoryPath, `${language}.zip`)

let currentSize = 0
let skippedSize = 0
let skippedFiles = 0
const filesToInclude: FileMetadata[] = []

// Don't add files to the zip if the total size of uncompressed source code would go over the limit
// Currently there is no ordering on the files. If the first file added to the zip is equal to the limit, only it will be added and no other files will be added
for (const file of files) {
if (currentSize + file.contentLength <= MAX_UNCOMPRESSED_SRC_SIZE_BYTES) {
if (fileSizeDetails.includedSize + file.contentLength <= MAX_UNCOMPRESSED_SRC_SIZE_BYTES) {
filesToInclude.push(file)
currentSize += file.contentLength
fileSizeDetails.includedSize += file.contentLength
fileSizeDetails.includedFileCount += 1
} else {
skippedSize += file.contentLength
skippedFiles += 1
fileSizeDetails.skippedSize += file.contentLength
fileSizeDetails.skippedFileCount += 1
}
}

const zipBuffer = await this.createZipBuffer(filesToInclude)
await fs.promises.writeFile(zipPath, zipBuffer)

const stats = fs.statSync(zipPath)

if (skippedFiles > 0) {
this.log(
`Skipped ${skippedFiles} ${language} files of total size ${skippedSize} bytes due to exceeding the maximum zip size`
)
}

if (filesToInclude.length === 0) {
return undefined
}

const zipBuffer = await this.createZipBuffer(filesToInclude)
await fs.promises.writeFile(zipPath, zipBuffer)

const stats = fs.statSync(zipPath)

return {
filePath: zipPath,
relativePath: path.join(workspaceFolder.name, subDirectory, `files.zip`),
Expand Down Expand Up @@ -569,18 +595,35 @@ export class ArtifactManager {
private async processWorkspaceFolders(workspaceFolders: WorkspaceFolder[]): Promise<FileMetadata[]> {
const startTime = performance.now()
let zipFileMetadata: FileMetadata[] = []
const fileSizeDetails: FileSizeDetails = {
includedFileCount: 0,
includedSize: 0,
skippedFileCount: 0,
skippedSize: 0,
}

for (const workspaceFolder of workspaceFolders) {
const workspacePath = URI.parse(workspaceFolder.uri).path

try {
const filesByLanguage = await this.processDirectory(workspaceFolder, workspacePath)
const fileMetadata = await this.processFilesByLanguage(workspaceFolder, filesByLanguage)
const fileMetadata = await this.processFilesByLanguage(
workspaceFolder,
fileSizeDetails,
filesByLanguage
)
zipFileMetadata.push(...fileMetadata)
} catch (error) {
this.logging.warn(`Error processing workspace folder ${workspacePath}: ${error}`)
}
}
if (fileSizeDetails.skippedFileCount > 0) {
this.logging.warn(
`Skipped ${fileSizeDetails.skippedFileCount} files (total size: ` +
`${fileSizeDetails.skippedSize} bytes) due to exceeding the maximum artifact size`
)
}

const totalTime = performance.now() - startTime
this.log(`Creating workspace source code artifacts took: ${totalTime.toFixed(2)}ms`)

Expand All @@ -589,22 +632,31 @@ export class ArtifactManager {

private async processFilesByLanguage(
workspaceFolder: WorkspaceFolder,
fileSizeDetails: FileSizeDetails,
filesByLanguage: Map<CodewhispererLanguage, FileMetadata[]>,
relativePath?: string
): Promise<FileMetadata[]> {
const zipFileMetadata: FileMetadata[] = []
await this.updateWorkspaceFiles(workspaceFolder, filesByLanguage)

for (const [language, files] of filesByLanguage.entries()) {
// Genrate java .classpath and .project files
// Generate java .classpath and .project files
const processedFiles =
language === 'java' ? await this.processJavaProjectConfig(workspaceFolder, files) : files

const zipMetadata = await this.createZipForLanguage(workspaceFolder, language, processedFiles, relativePath)
this.log(
`Created zip for language ${language} out of ${processedFiles.length} files in ${workspaceFolder.name}`
const zipMetadata = await this.createZipForLanguage(
workspaceFolder,
fileSizeDetails,
language,
processedFiles,
relativePath
)
zipFileMetadata.push(zipMetadata)

if (zipMetadata) {
this.log(
`Created zip for language ${language} out of ${processedFiles.length} files in ${workspaceFolder.name}`
)
zipFileMetadata.push(zipMetadata)
}
}
return zipFileMetadata
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ export class WebSocketClient {
private reconnectAttempts: number = 0
private readonly maxReconnectAttempts: number = 5
private messageQueue: string[] = []
private readonly messageThrottleDelay: number = 100

constructor(url: string, logging: Logging, credentialsProvider: CredentialsProvider) {
this.url = url
Expand Down Expand Up @@ -104,7 +103,11 @@ export class WebSocketClient {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift()
if (message) {
this.send(message).catch(error => this.logging.error(`Error sending message: ${error}`))
try {
this.send(message)
} catch (error) {
this.logging.error(`Error sending message: ${error}`)
}
}
}
}
Expand Down Expand Up @@ -141,23 +144,10 @@ export class WebSocketClient {
}
}

// https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_client_applications#sending_data_to_the_server
// TODO, the approach of delaying websocket messages should be investigated and validated
// The current approach could be susceptible to race conditions that might result in out of order events
// Consider this scenario
// wsClient.send("message1"); // needs throttling, will wait 100ms
// wsClient.send("message2"); // runs immediately without waiting

// What actually happens:
// - Both calls start executing simultaneously
// - Both check timeSinceLastMessage at nearly the same time
// - Both might determine they need to wait
// - They could end up sending in unpredictable order
// It might be better to keep an active queue in the client and expose enqueueMessage instead of send
public async send(message: string): Promise<void> {
public send(message: string): void {
if (this.ws?.readyState === WebSocket.OPEN) {
await new Promise(resolve => setTimeout(resolve, this.messageThrottleDelay))
this.ws.send(message)
this.logging.debug('Message sent successfully to the remote workspace')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: debug logs are disabled by default in production.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't see these messages as valuable enough to present in user logs.

} else {
this.queueMessage(message)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export class DependencyDiscoverer {
private workspaceFolders: WorkspaceFolder[]
public dependencyHandlerRegistry: LanguageDependencyHandler<BaseDependencyInfo>[] = []
private initializedWorkspaceFolder = new Map<WorkspaceFolder, boolean>()
// Create a SharedArrayBuffer with 4 bytes (for a 32-bit unsigned integer) for thread-safe counter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for posterity: I believe we don't need to go with this approach, since we are not using any workers or threads in the code and everything is single-threaded at the moment.

A simple data type like the following would also work here:

type DependencySizeTracker = {
    totalSize: number
    maxSize: number
}

const sizeTracker: DependencySizeTracker = {
    totalSize: 0,
    maxSize: 8 * 1024 * 1024 * 1024
}

The current approach is still fine and I am not suggesting we change it.

// Single-element counter holding the total size of dependencies zipped so far; the same
// array instance is handed to every language dependency handler so they share one total.
// NOTE(review): a Uint32 element wraps modulo 2^32 (4 GiB), yet the handlers compare this
// value against MAX_WORKSPACE_DEPENDENCY_SIZE (8 GiB). As stored, the loaded value can
// never exceed that limit, so the size check cannot trip — consider a wider counter
// (e.g. a plain number or a BigUint64Array). TODO confirm and fix across handlers.
protected dependencyUploadedSizeSum = new Uint32Array(new SharedArrayBuffer(4))

constructor(
workspace: Workspace,
Expand All @@ -21,6 +23,7 @@ export class DependencyDiscoverer {
) {
this.workspaceFolders = workspaceFolders
this.logging = logging
this.dependencyUploadedSizeSum[0] = 0

let jstsHandlerCreated = false
supportedWorkspaceContextLanguages.forEach(language => {
Expand All @@ -29,7 +32,8 @@ export class DependencyDiscoverer {
workspace,
logging,
workspaceFolders,
artifactManager
artifactManager,
this.dependencyUploadedSizeSum
)
if (handler) {
// Share handler for javascript and typescript
Expand Down Expand Up @@ -130,6 +134,13 @@ export class DependencyDiscoverer {
this.logging.log(`Dependency search completed successfully`)
}

/**
 * Resets the shared uploaded-dependency size counter to zero, then asks every
 * registered dependency handler, one after another, to zip its dependency map
 * for the supplied workspace folders.
 *
 * @param folders - Workspace folders forwarded unchanged to each handler's `zipDependencyMap`.
 */
async reSyncDependenciesToS3(folders: WorkspaceFolder[]) {
    // Begin this pass with a zeroed running total.
    Atomics.store(this.dependencyUploadedSizeSum, 0, 0)

    // Handlers are awaited sequentially, in registry order.
    const registeredHandlers = this.dependencyHandlerRegistry
    for (const handler of registeredHandlers) {
        await handler.zipDependencyMap(folders)
    }
}

async handleDependencyUpdateFromLSP(language: string, paths: string[], workspaceRoot?: WorkspaceFolder) {
for (const dependencyHandler of this.dependencyHandlerRegistry) {
if (dependencyHandler.language != language) {
Expand All @@ -144,6 +155,7 @@ export class DependencyDiscoverer {
this.dependencyHandlerRegistry.forEach(dependencyHandler => {
dependencyHandler.dispose()
})
Atomics.store(this.dependencyUploadedSizeSum, 0, 0)
}

public disposeWorkspaceFolder(workspaceFolder: WorkspaceFolder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,23 @@ export abstract class LanguageDependencyHandler<T extends BaseDependencyInfo> {
protected workspaceFolders: WorkspaceFolder[]
// key: workspaceFolder, value: {key: dependency name, value: Dependency}
protected dependencyMap = new Map<WorkspaceFolder, Map<string, Dependency>>()
protected dependencyUploadedSize = new Map<WorkspaceFolder, number>()
protected dependencyUploadedSizeMap = new Map<WorkspaceFolder, number>()
protected dependencyUploadedSizeSum: Uint32Array<SharedArrayBuffer>
protected dependencyWatchers: Map<string, fs.FSWatcher> = new Map<string, fs.FSWatcher>()
protected artifactManager: ArtifactManager
protected dependenciesFolderName: string
protected eventEmitter: EventEmitter
protected readonly MAX_SINGLE_DEPENDENCY_SIZE: number = 500 * 1024 * 1024 // 500 MB
protected readonly MAX_WORKSPACE_DEPENDENCY_SIZE: number = 5 * 1024 * 1024 * 1024 //5 GB
protected readonly MAX_WORKSPACE_DEPENDENCY_SIZE: number = 8 * 1024 * 1024 * 1024 // 8 GB

constructor(
language: CodewhispererLanguage,
workspace: Workspace,
logging: Logging,
workspaceFolders: WorkspaceFolder[],
artifactManager: ArtifactManager,
dependenciesFolderName: string
dependenciesFolderName: string,
dependencyUploadedSizeSum: Uint32Array<SharedArrayBuffer>
) {
this.language = language
this.workspace = workspace
Expand All @@ -54,7 +56,7 @@ export abstract class LanguageDependencyHandler<T extends BaseDependencyInfo> {
this.workspaceFolders.forEach(workSpaceFolder =>
this.dependencyMap.set(workSpaceFolder, new Map<string, Dependency>())
)

this.dependencyUploadedSizeSum = dependencyUploadedSizeSum
this.eventEmitter = new EventEmitter()
}

Expand Down Expand Up @@ -85,14 +87,18 @@ export abstract class LanguageDependencyHandler<T extends BaseDependencyInfo> {
dependencyMap: Map<string, Dependency>
): void

public onDependencyChange(callback: (workspaceFolder: WorkspaceFolder, zips: FileMetadata[]) => void): void {
public onDependencyChange(
callback: (workspaceFolder: WorkspaceFolder, zips: FileMetadata[], addWSFolderPathInS3: boolean) => void
): void {
this.eventEmitter.on('dependencyChange', callback)
}

protected emitDependencyChange(workspaceFolder: WorkspaceFolder, zips: FileMetadata[]): void {
if (zips.length > 0) {
this.logging.log(`Emitting ${this.language} dependency change event for ${workspaceFolder.name}`)
this.eventEmitter.emit('dependencyChange', workspaceFolder, zips)
// If language is JavaScript or TypeScript, we want to preserve the workspaceFolder path in S3 path
const addWSFolderPathInS3 = this.language === 'javascript' || this.language === 'typescript'
this.eventEmitter.emit('dependencyChange', workspaceFolder, zips, addWSFolderPathInS3)
return
}
}
Expand Down Expand Up @@ -170,10 +176,11 @@ export abstract class LanguageDependencyHandler<T extends BaseDependencyInfo> {
}
currentChunk.push(dependency)
currentChunkSize += dependency.size
this.dependencyUploadedSize.set(
this.dependencyUploadedSizeMap.set(
workspaceFolder,
(this.dependencyUploadedSize.get(workspaceFolder) || 0) + dependency.size
(this.dependencyUploadedSizeMap.get(workspaceFolder) || 0) + dependency.size
)
Atomics.add(this.dependencyUploadedSizeSum, 0, dependency.size)
// Mark this dependency that has been zipped
dependency.zipped = true
this.dependencyMap.get(workspaceFolder)?.set(dependency.name, dependency)
Expand Down Expand Up @@ -299,7 +306,7 @@ export abstract class LanguageDependencyHandler<T extends BaseDependencyInfo> {
* However, everytime flare server restarts, this dependency map will be initialized.
*/
private validateWorkspaceDependencySize(workspaceFolder: WorkspaceFolder): boolean {
let uploadedSize = this.dependencyUploadedSize.get(workspaceFolder)
let uploadedSize = Atomics.load(this.dependencyUploadedSizeSum, 0)
if (uploadedSize && this.MAX_WORKSPACE_DEPENDENCY_SIZE < uploadedSize) {
return false
}
Expand All @@ -308,14 +315,15 @@ export abstract class LanguageDependencyHandler<T extends BaseDependencyInfo> {

dispose(): void {
this.dependencyMap.clear()
this.dependencyUploadedSize.clear()
this.dependencyUploadedSizeMap.clear()
this.dependencyWatchers.forEach(watcher => watcher.close())
this.dependencyWatchers.clear()
}

disposeWorkspaceFolder(workspaceFolder: WorkspaceFolder): void {
this.dependencyMap.delete(workspaceFolder)
this.dependencyUploadedSize.delete(workspaceFolder)
Atomics.sub(this.dependencyUploadedSizeSum, 0, this.dependencyUploadedSizeMap.get(workspaceFolder) || 0)
this.dependencyUploadedSizeMap.delete(workspaceFolder)
this.disposeWatchers(workspaceFolder)
this.disposeDependencyInfo(workspaceFolder)
}
Expand Down
Loading
Loading