Skip to content

Commit a560353

Browse files
committed
fix memory leaks and add godot restart between entities
- use streaming uploads for S3 instead of loading files into memory - add HTTP/HTTPS agents with connection pooling limits - consume response bodies on error to free connections - clean up file streams in finally blocks - restart godot after each entity to prevent memory accumulation - add better error logging for failed asset processing - handle entities without GLTF as success with 0 assets
1 parent 32bcb5e commit a560353

File tree

14 files changed

+4887
-2294
lines changed

14 files changed

+4887
-2294
lines changed

.env.default

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,9 @@ FETCH_MAX_RETRIES=3
1919
FETCH_INITIAL_DELAY_MS=1000
2020
FETCH_MAX_DELAY_MS=30000
2121
FETCH_TIMEOUT_MS=60000
22-
FETCH_BACKOFF_MULTIPLIER=2
22+
FETCH_BACKOFF_MULTIPLIER=2
23+
24+
# Asset server configuration
25+
ASSET_SERVER_URL=http://localhost:8080
26+
ASSET_SERVER_TIMEOUT_MS=600000
27+
ASSET_SERVER_CONCURRENT_BUNDLES=4

package-lock.json

Lines changed: 3514 additions & 1408 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"@aws-sdk/client-s3": "^3.726.1",
2727
"@aws-sdk/client-sns": "^3.699.0",
2828
"@aws-sdk/client-sqs": "^3.699.0",
29+
"@aws-sdk/lib-storage": "^3.726.1",
2930
"@dcl/schemas": "^7.4.1",
3031
"@well-known-components/env-config-provider": "^1.1.1",
3132
"@well-known-components/http-server": "^2.0.0",

src/adapters/asset-server.ts

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { IFetchComponent } from '@well-known-components/http-server'
22
import { ILoggerComponent } from '@well-known-components/interfaces'
3+
import { exec } from 'child_process'
4+
import { promisify } from 'util'
5+
6+
const execAsync = promisify(exec)
37

48
export type AssetType = 'scene' | 'wearable' | 'emote' | 'texture'
59

@@ -9,6 +13,7 @@ export interface IAssetServerComponent {
913
processAssets(params: ProcessAssetsParams): Promise<ProcessAssetsResponse>
1014
getBatchStatus(batchId: string): Promise<BatchStatus>
1115
waitForCompletion(batchId: string, timeoutMs?: number): Promise<BatchStatus>
16+
restartGodot(): Promise<boolean>
1217
}
1318

1419
export type ProcessSceneParams = {
@@ -85,6 +90,8 @@ export function createAssetServerComponent(
8590
async function isReady(): Promise<boolean> {
8691
try {
8792
const response = await fetch.fetch(`${baseUrl}/health`)
93+
// Consume response body to free the connection
94+
await response.text().catch(() => {})
8895
return response.ok
8996
} catch {
9097
return false
@@ -173,17 +180,65 @@ export function createAssetServerComponent(
173180
return status
174181
}
175182

176-
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
183+
// Use a cancellable sleep pattern
184+
await new Promise<void>((resolve) => {
185+
const timeoutId = setTimeout(() => resolve(), pollIntervalMs)
186+
// Ensure timeout is unreferenced so it doesn't keep the process alive
187+
if (timeoutId.unref) {
188+
timeoutId.unref()
189+
}
190+
})
177191
}
178192

179193
throw new Error(`Timeout waiting for batch ${batchId} to complete after ${timeoutMs}ms`)
180194
}
181195

196+
async function restartGodot(): Promise<boolean> {
197+
const port = process.env.ASSET_SERVER_PORT || '8080'
198+
logger.info('Restarting Godot asset-server...')
199+
200+
try {
201+
// Kill existing Godot process - use simpler pattern
202+
try {
203+
await execAsync('pkill -9 -f "decentraland.godot.client" || true')
204+
logger.info('Sent kill signal to Godot process')
205+
} catch {
206+
// Ignore errors - process might not exist
207+
}
208+
209+
// Wait for process to fully terminate
210+
await new Promise((resolve) => setTimeout(resolve, 2000))
211+
212+
// Start new Godot process in background
213+
const godotCmd = `/app/decentraland.godot.client.x86_64 --headless --asset-server --asset-server-port ${port}`
214+
const child = exec(godotCmd)
215+
216+
// Detach from child process - we don't need to track it
217+
child.unref()
218+
219+
// Wait for server to be ready (up to 60 seconds)
220+
for (let i = 0; i < 60; i++) {
221+
await new Promise((resolve) => setTimeout(resolve, 1000))
222+
if (await isReady()) {
223+
logger.info('Godot asset-server is ready after restart')
224+
return true
225+
}
226+
}
227+
228+
logger.error('Godot asset-server failed to become ready after restart')
229+
return false
230+
} catch (error) {
231+
logger.error('Failed to restart Godot', { error: error instanceof Error ? error.message : String(error) })
232+
return false
233+
}
234+
}
235+
182236
return {
183237
isReady,
184238
processScene,
185239
processAssets,
186240
getBatchStatus,
187-
waitForCompletion
241+
waitForCompletion,
242+
restartGodot
188243
}
189244
}

src/adapters/fetch.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,23 @@
11
import { IFetchComponent } from '@well-known-components/http-server'
22
import { IConfigComponent, ILoggerComponent } from '@well-known-components/interfaces'
33
import * as nodeFetch from 'node-fetch'
4+
import http from 'http'
5+
import https from 'https'
6+
7+
// Create HTTP agents with limited connection pooling to prevent memory leaks
8+
const httpAgent = new http.Agent({
9+
keepAlive: true,
10+
maxSockets: 10,
11+
maxFreeSockets: 5,
12+
timeout: 60000
13+
})
14+
15+
const httpsAgent = new https.Agent({
16+
keepAlive: true,
17+
maxSockets: 10,
18+
maxFreeSockets: 5,
19+
timeout: 60000
20+
})
421

522
// Error codes that indicate transient network failures
623
const RETRYABLE_ERROR_CODES = new Set([
@@ -84,17 +101,28 @@ export async function createFetchComponent(deps?: {
84101
const timeoutId = setTimeout(() => controller.abort(), fetchConfig.timeoutMs)
85102

86103
try {
87-
// Merge abort signal with any existing signal
104+
// Select appropriate agent based on protocol
105+
const isHttps = urlString.startsWith('https')
106+
const agent = isHttps ? httpsAgent : httpAgent
107+
108+
// Merge abort signal and agent with any existing options
88109
const mergedInit: nodeFetch.RequestInit = {
89110
...init,
90-
signal: controller.signal as nodeFetch.RequestInit['signal']
111+
signal: controller.signal as nodeFetch.RequestInit['signal'],
112+
agent
91113
}
92114

93115
const response = await nodeFetch.default(url, mergedInit)
94116
clearTimeout(timeoutId)
95117

96118
// Check for retryable HTTP status
97119
if (isRetryableStatus(response.status) && attempt < fetchConfig.maxRetries) {
120+
// Consume and discard the response body to free up the connection
121+
try {
122+
await response.text()
123+
} catch {
124+
// Ignore errors when consuming body
125+
}
98126
const delay = calculateDelay(attempt, fetchConfig)
99127
logger?.warn(
100128
`Retryable HTTP status ${response.status} for ${urlString}, attempt ${attempt + 1}/${fetchConfig.maxRetries + 1}, retrying in ${delay}ms`

src/adapters/monitoring-reporter.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,24 +65,28 @@ export function createMonitoringReporter(
6565
return
6666
}
6767

68+
let controller: AbortController | null = new AbortController()
69+
const timeoutId = setTimeout(() => controller?.abort(), 5000)
70+
6871
try {
6972
const url = `${monitoringUrl}${endpoint}`
70-
const controller = new AbortController()
71-
const timeoutId = setTimeout(() => controller.abort(), 5000)
7273

73-
await fetch.fetch(url, {
74+
const response = await fetch.fetch(url, {
7475
method: 'POST',
7576
headers: { 'Content-Type': 'application/json' },
7677
body: JSON.stringify({ ...data, secret: monitoringSecret }),
7778
signal: controller.signal
7879
})
79-
80-
clearTimeout(timeoutId)
80+
// Consume response body to free the connection
81+
await response.text().catch(() => {})
8182
} catch (error) {
8283
// Silently ignore - monitoring should never block pipeline
8384
logger.debug('Monitoring report failed (non-blocking)', {
8485
error: error instanceof Error ? error.message : 'Unknown error'
8586
})
87+
} finally {
88+
clearTimeout(timeoutId)
89+
controller = null
8690
}
8791
}
8892

src/adapters/storage.ts

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { IBaseComponent } from '@well-known-components/interfaces'
2-
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
2+
import { S3Client } from '@aws-sdk/client-s3'
3+
import { Upload } from '@aws-sdk/lib-storage'
4+
import { createReadStream } from 'fs'
35
import { mkdir, readFile, writeFile } from 'fs/promises'
46
import path from 'path'
57
import { AppComponents } from '../types'
@@ -28,20 +30,32 @@ export async function createS3StorageComponent(
2830
return {
2931
storeFile: async function (key: string, filePath: string) {
3032
const keyWithPrefix = `${formattedPrefix}${key}`
33+
let fileStream: ReturnType<typeof createReadStream> | null = null
3134

3235
try {
33-
const fileContent = await readFile(filePath)
34-
const command = new PutObjectCommand({
35-
Bucket: bucketName,
36-
Key: keyWithPrefix,
37-
Body: fileContent
36+
// Use streaming upload to avoid loading entire file into memory
37+
fileStream = createReadStream(filePath)
38+
39+
const upload = new Upload({
40+
client: s3Client,
41+
params: {
42+
Bucket: bucketName,
43+
Key: keyWithPrefix,
44+
Body: fileStream
45+
}
3846
})
3947

40-
await s3Client.send(command)
48+
await upload.done()
4149
logger.info(`Stored file ${keyWithPrefix} in S3`)
4250
} catch (error) {
4351
logger.error(`Error storing file ${keyWithPrefix} in S3`)
4452
logger.error(error as any)
53+
} finally {
54+
// Ensure stream is closed
55+
if (fileStream) {
56+
fileStream.destroy()
57+
fileStream = null
58+
}
4559
}
4660
},
4761

@@ -52,20 +66,23 @@ export async function createS3StorageComponent(
5266
let success = false
5367

5468
while (attempt < 3 && !success) {
69+
let fileStream: ReturnType<typeof createReadStream> | null = null
5570
try {
56-
// Read the file content
57-
const fileContent = await readFile(filePath)
58-
59-
// Create and send the S3 command
60-
const command = new PutObjectCommand({
61-
Bucket: bucketName,
62-
Key: keyWithPrefix,
63-
Body: fileContent
71+
// Use streaming upload to avoid loading entire file into memory
72+
fileStream = createReadStream(filePath)
73+
74+
const upload = new Upload({
75+
client: s3Client,
76+
params: {
77+
Bucket: bucketName,
78+
Key: keyWithPrefix,
79+
Body: fileStream
80+
}
6481
})
6582

66-
await s3Client.send(command)
83+
await upload.done()
6784

68-
// If the command succeeds, log the success and exit the retry loop
85+
// If the upload succeeds, log the success and exit the retry loop
6986
logger.info(`Successfully stored file ${keyWithPrefix} in S3`)
7087
success = true
7188
} catch (error) {
@@ -77,6 +94,12 @@ export async function createS3StorageComponent(
7794
logger.error(`Failed to store file ${keyWithPrefix} after 3 attempts`)
7895
logger.error(error as any)
7996
}
97+
} finally {
98+
// Ensure stream is closed
99+
if (fileStream) {
100+
fileStream.destroy()
101+
fileStream = null
102+
}
80103
}
81104
}
82105

src/components.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { AwsCredentialIdentity } from '@smithy/types'
1616
import { createMonitoringReporter } from './adapters/monitoring-reporter'
1717
import { getProcessMethod } from './service'
1818
import { createAssetServerComponent } from './adapters/asset-server'
19+
import { rm } from 'fs/promises'
1920

2021
// Helper function to convert URN with token ID to pointer without token ID
2122
function urnToPointer(urn: string): string {
@@ -63,6 +64,8 @@ async function handleProfile(
6364

6465
const profileResponse = await fetch.fetch(profileUrl)
6566
if (!profileResponse.ok) {
67+
// Consume body to free the connection
68+
await profileResponse.text().catch(() => {})
6669
throw new Error(`Failed to fetch profile: ${profileResponse.statusText}`)
6770
}
6871

@@ -111,6 +114,8 @@ async function handleProfile(
111114
})
112115

113116
if (!entitiesResponse.ok) {
117+
// Consume body to free the connection
118+
await entitiesResponse.text().catch(() => {})
114119
throw new Error(`Failed to fetch entities: ${entitiesResponse.statusText}`)
115120
}
116121

@@ -201,6 +206,8 @@ async function handleProfile(
201206
if (result.status === 'completed' && result.zip_path) {
202207
const s3Key = `${asset.gltfHash}-mobile.zip`
203208
await storage.storeFile(s3Key, result.zip_path)
209+
// Clean up temp file from asset-server
210+
await rm(result.zip_path, { force: true }).catch(() => {})
204211
logger.info(`Completed: ${asset.pointer} ${variant} -> ${s3Key}`)
205212
return { success: true, hash: asset.gltfHash }
206213
} else {
@@ -250,6 +257,8 @@ async function handleEntityId(
250257
})
251258

252259
if (!response.ok) {
260+
// Consume body to free the connection
261+
await response.text().catch(() => {})
253262
throw new Error(`Failed to fetch entity ID: ${response.statusText}`)
254263
}
255264

@@ -282,6 +291,8 @@ async function handleEntityId(
282291
})
283292

284293
if (!response.ok) {
294+
// Consume body to free the connection
295+
await response.text().catch(() => {})
285296
throw new Error(`Failed to fetch world data: ${response.statusText}`)
286297
}
287298

0 commit comments

Comments
 (0)