Skip to content

Commit 32bcb5e

Browse files
committed
update godot-optimizer to use asset-server
1 parent 45c25aa commit 32bcb5e

File tree

7 files changed

+289
-46
lines changed

7 files changed

+289
-46
lines changed

.env.default

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,11 @@ AWS_ENDPOINT=
1212

1313
# Pipeline monitoring (optional)
1414
MONITORING_URL=
15-
MONITORING_SECRET=
15+
MONITORING_SECRET=
16+
17+
# Fetch retry configuration
18+
FETCH_MAX_RETRIES=3
19+
FETCH_INITIAL_DELAY_MS=1000
20+
FETCH_MAX_DELAY_MS=30000
21+
FETCH_TIMEOUT_MS=60000
22+
FETCH_BACKOFF_MULTIPLIER=2

src/adapters/fetch.ts

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,136 @@
11
import { IFetchComponent } from '@well-known-components/http-server'
2+
import { IConfigComponent, ILoggerComponent } from '@well-known-components/interfaces'
23
import * as nodeFetch from 'node-fetch'
34

4-
export async function createFetchComponent() {
5-
const fetch: IFetchComponent = {
5+
// Error codes that indicate transient network failures
6+
const RETRYABLE_ERROR_CODES = new Set([
7+
'ENOTFOUND', // DNS resolution failure
8+
'ETIMEDOUT', // Connection timeout
9+
'ECONNRESET', // Connection reset
10+
'ECONNREFUSED', // Connection refused
11+
'EPIPE', // Broken pipe
12+
'ENETUNREACH', // Network unreachable
13+
'EHOSTUNREACH', // Host unreachable
14+
'EAI_AGAIN' // DNS temporary failure
15+
])
16+
17+
// HTTP status codes that indicate transient server failures
18+
const RETRYABLE_HTTP_STATUS = new Set([
19+
408, // Request Timeout
20+
429, // Too Many Requests
21+
500, // Internal Server Error
22+
502, // Bad Gateway
23+
503, // Service Unavailable
24+
504 // Gateway Timeout
25+
])
26+
27+
interface FetchConfig {
28+
maxRetries: number
29+
initialDelayMs: number
30+
maxDelayMs: number
31+
timeoutMs: number
32+
backoffMultiplier: number
33+
}
34+
35+
function isRetryableError(error: unknown): boolean {
36+
if (error && typeof error === 'object' && 'code' in error) {
37+
return RETRYABLE_ERROR_CODES.has((error as { code: string }).code)
38+
}
39+
return false
40+
}
41+
42+
function isRetryableStatus(status: number): boolean {
43+
return RETRYABLE_HTTP_STATUS.has(status)
44+
}
45+
46+
function calculateDelay(attempt: number, config: FetchConfig): number {
47+
// Exponential backoff: initialDelay * (multiplier ^ attempt)
48+
const exponentialDelay = config.initialDelayMs * Math.pow(config.backoffMultiplier, attempt)
49+
// Cap at max delay
50+
const cappedDelay = Math.min(exponentialDelay, config.maxDelayMs)
51+
// Add jitter (0-25% of delay)
52+
const jitter = cappedDelay * Math.random() * 0.25
53+
return Math.floor(cappedDelay + jitter)
54+
}
55+
56+
function sleep(ms: number): Promise<void> {
57+
return new Promise((resolve) => setTimeout(resolve, ms))
58+
}
59+
60+
export async function createFetchComponent(deps?: {
61+
config?: IConfigComponent
62+
logs?: ILoggerComponent
63+
}): Promise<IFetchComponent> {
64+
const config = deps?.config
65+
const logger = deps?.logs?.getLogger('fetch')
66+
67+
// Load configuration with defaults
68+
const fetchConfig: FetchConfig = {
69+
maxRetries: parseInt((await config?.getString('FETCH_MAX_RETRIES')) ?? '3', 10),
70+
initialDelayMs: parseInt((await config?.getString('FETCH_INITIAL_DELAY_MS')) ?? '1000', 10),
71+
maxDelayMs: parseInt((await config?.getString('FETCH_MAX_DELAY_MS')) ?? '30000', 10),
72+
timeoutMs: parseInt((await config?.getString('FETCH_TIMEOUT_MS')) ?? '60000', 10),
73+
backoffMultiplier: parseFloat((await config?.getString('FETCH_BACKOFF_MULTIPLIER')) ?? '2')
74+
}
75+
76+
const fetchComponent: IFetchComponent = {
677
async fetch(url: nodeFetch.RequestInfo, init?: nodeFetch.RequestInit): Promise<nodeFetch.Response> {
7-
return nodeFetch.default(url, init)
78+
const urlString = typeof url === 'string' ? url : String(url)
79+
let lastError: Error | undefined
80+
81+
for (let attempt = 0; attempt <= fetchConfig.maxRetries; attempt++) {
82+
// Create AbortController for timeout
83+
const controller = new AbortController()
84+
const timeoutId = setTimeout(() => controller.abort(), fetchConfig.timeoutMs)
85+
86+
try {
87+
// Merge abort signal with any existing signal
88+
const mergedInit: nodeFetch.RequestInit = {
89+
...init,
90+
signal: controller.signal as nodeFetch.RequestInit['signal']
91+
}
92+
93+
const response = await nodeFetch.default(url, mergedInit)
94+
clearTimeout(timeoutId)
95+
96+
// Check for retryable HTTP status
97+
if (isRetryableStatus(response.status) && attempt < fetchConfig.maxRetries) {
98+
const delay = calculateDelay(attempt, fetchConfig)
99+
logger?.warn(
100+
`Retryable HTTP status ${response.status} for ${urlString}, attempt ${attempt + 1}/${fetchConfig.maxRetries + 1}, retrying in ${delay}ms`
101+
)
102+
await sleep(delay)
103+
continue
104+
}
105+
106+
return response
107+
} catch (error) {
108+
clearTimeout(timeoutId)
109+
lastError = error instanceof Error ? error : new Error(String(error))
110+
111+
// Check if error is retryable
112+
const isAbortError = lastError.name === 'AbortError'
113+
const isNetworkError = isRetryableError(error)
114+
115+
if ((isAbortError || isNetworkError) && attempt < fetchConfig.maxRetries) {
116+
const errorCode = isAbortError ? 'TIMEOUT' : ((error as { code?: string }).code ?? 'UNKNOWN')
117+
const delay = calculateDelay(attempt, fetchConfig)
118+
logger?.warn(
119+
`Network error ${errorCode} for ${urlString}, attempt ${attempt + 1}/${fetchConfig.maxRetries + 1}, retrying in ${delay}ms`
120+
)
121+
await sleep(delay)
122+
continue
123+
}
124+
125+
// Not retryable or retries exhausted
126+
throw lastError
127+
}
128+
}
129+
130+
// Should not reach here, but throw last error if we do
131+
throw lastError ?? new Error(`Fetch failed for ${urlString} after ${fetchConfig.maxRetries} retries`)
8132
}
9133
}
10134

11-
return fetch
135+
return fetchComponent
12136
}

src/adapters/monitoring-reporter.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export interface HeartbeatData {
1515
progressPercent?: number
1616
startedAt?: string
1717
isPriority?: boolean
18+
entityType?: 'scene' | 'wearable' | 'emote'
1819
}
1920

2021
export interface JobCompleteData {
@@ -25,6 +26,7 @@ export interface JobCompleteData {
2526
durationMs: number
2627
errorMessage?: string
2728
isPriority?: boolean
29+
entityType?: 'scene' | 'wearable' | 'emote'
2830
}
2931

3032
interface MonitoringReporterComponents {

src/adapters/sqs.ts

Lines changed: 68 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -83,49 +83,92 @@ export function createMemoryQueueAdapter<T>(
8383
}
8484
}
8585

86+
export interface SqsAdapterOptions {
87+
queueUrl: string
88+
priorityQueueUrl?: string
89+
wearableQueueUrl?: string
90+
emoteQueueUrl?: string
91+
queueRegion?: string
92+
endpoint?: string
93+
}
94+
95+
interface QueueInfo {
96+
url: string
97+
name: string
98+
}
99+
86100
export function createSqsAdapter<T>(
87101
components: Pick<AppComponents, 'logs' | 'metrics'>,
88-
options: { queueUrl: string; priorityQueueUrl?: string; queueRegion?: string; endpoint?: string }
102+
options: SqsAdapterOptions
89103
): ITaskQueue<T> {
90104
const logger = components.logs.getLogger(options.queueUrl)
91105
const sqs = new SQSClient({
92106
region: options.queueRegion,
93107
...(options.endpoint && { endpoint: options.endpoint })
94108
})
95109

96-
async function receiveMessage(quantityOfMessages: number): Promise<{ response: any | undefined; queueUsed: string }> {
97-
let response
98-
let queueUsed = ''
110+
// Build list of queues for round-robin polling
111+
// Priority queue is always checked first, then round-robin through entity type queues
112+
const entityQueues: QueueInfo[] = []
113+
if (options.queueUrl) entityQueues.push({ url: options.queueUrl, name: 'scene' })
114+
if (options.wearableQueueUrl) entityQueues.push({ url: options.wearableQueueUrl, name: 'wearable' })
115+
if (options.emoteQueueUrl) entityQueues.push({ url: options.emoteQueueUrl, name: 'emote' })
99116

100-
if (options.priorityQueueUrl) {
101-
try {
102-
response = await sqs.send(
103-
new ReceiveMessageCommand({
104-
MaxNumberOfMessages: quantityOfMessages,
105-
MessageAttributeNames: ['All'],
106-
QueueUrl: options.priorityQueueUrl,
107-
WaitTimeSeconds: 15,
108-
VisibilityTimeout: 3 * 3600
109-
})
110-
)
111-
queueUsed = options.priorityQueueUrl
112-
} catch {}
113-
}
117+
let currentQueueIndex = 0
114118

115-
if (!response || !response.Messages || response.Messages.length < 1) {
116-
response = await sqs.send(
119+
async function tryReceiveFromQueue(queueUrl: string, waitTimeSeconds: number): Promise<any | undefined> {
120+
try {
121+
const response = await sqs.send(
117122
new ReceiveMessageCommand({
118-
MaxNumberOfMessages: quantityOfMessages,
123+
MaxNumberOfMessages: 1,
119124
MessageAttributeNames: ['All'],
120-
QueueUrl: options.queueUrl,
121-
WaitTimeSeconds: 15,
125+
QueueUrl: queueUrl,
126+
WaitTimeSeconds: waitTimeSeconds,
122127
VisibilityTimeout: 3 * 3600
123128
})
124129
)
125-
queueUsed = options.queueUrl
130+
return response?.Messages && response.Messages.length > 0 ? response : undefined
131+
} catch (err) {
132+
logger.debug(`Failed to receive from queue ${queueUrl}`, {
133+
error: err instanceof Error ? err.message : 'Unknown'
134+
})
135+
return undefined
136+
}
137+
}
138+
139+
async function receiveMessage(quantityOfMessages: number): Promise<{ response: any | undefined; queueUsed: string }> {
140+
// First, always check the priority queue
141+
if (options.priorityQueueUrl) {
142+
const response = await tryReceiveFromQueue(options.priorityQueueUrl, 1)
143+
if (response) {
144+
logger.info('Processing from priority queue')
145+
return { response, queueUsed: options.priorityQueueUrl }
146+
}
147+
}
148+
149+
// Round-robin through entity type queues
150+
if (entityQueues.length === 0) {
151+
return { response: undefined, queueUsed: '' }
152+
}
153+
154+
// Try each queue starting from current index
155+
for (let i = 0; i < entityQueues.length; i++) {
156+
const queueIndex = (currentQueueIndex + i) % entityQueues.length
157+
const queue = entityQueues[queueIndex]
158+
const isLastAttempt = i === entityQueues.length - 1
159+
160+
const response = await tryReceiveFromQueue(queue.url, isLastAttempt ? 15 : 1)
161+
if (response) {
162+
logger.info(`Processing from ${queue.name} queue`)
163+
// Move to next queue for next poll (round-robin)
164+
currentQueueIndex = (queueIndex + 1) % entityQueues.length
165+
return { response, queueUsed: queue.url }
166+
}
126167
}
127168

128-
return { response, queueUsed }
169+
// No messages found in any queue, advance to next queue for fairness
170+
currentQueueIndex = (currentQueueIndex + 1) % entityQueues.length
171+
return { response: undefined, queueUsed: '' }
129172
}
130173

131174
return {

src/components.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ async function handleProfile(
169169
for (let i = 0; i < gltfAssets.length; i += concurrentLimit) {
170170
const batch = gltfAssets.slice(i, i + concurrentLimit)
171171

172-
logger.info(`Processing batch ${Math.floor(i / concurrentLimit) + 1}/${Math.ceil(gltfAssets.length / concurrentLimit)} (${batch.length} assets)`)
172+
logger.info(
173+
`Processing batch ${Math.floor(i / concurrentLimit) + 1}/${Math.ceil(gltfAssets.length / concurrentLimit)} (${batch.length} assets)`
174+
)
173175

174176
const batchPromises = batch.map(async (asset) => {
175177
const variant = asset.gltfFile.toLowerCase().includes('male/')
@@ -331,17 +333,25 @@ export async function initComponents(): Promise<AppComponents> {
331333
}
332334
)
333335
const statusChecks = await createStatusCheckComponent({ server, config })
334-
const fetch = await createFetchComponent()
336+
const fetch = await createFetchComponent({ config, logs })
335337

336338
await instrumentHttpServerWithMetrics({ metrics, server, config })
337339

338340
const sqsQueue = await config.getString('TASK_QUEUE')
339341
const prioritySqsQueue = await config.getString('PRIORITY_TASK_QUEUE')
342+
const wearableSqsQueue = await config.getString('WEARABLE_TASK_QUEUE')
343+
const emoteSqsQueue = await config.getString('EMOTE_TASK_QUEUE')
340344
const awsEndpoint = await config.getString('AWS_ENDPOINT')
341345
const taskQueue = sqsQueue
342346
? createSqsAdapter<DeploymentToSqs>(
343347
{ logs, metrics },
344-
{ queueUrl: sqsQueue, priorityQueueUrl: prioritySqsQueue, endpoint: awsEndpoint }
348+
{
349+
queueUrl: sqsQueue,
350+
priorityQueueUrl: prioritySqsQueue,
351+
wearableQueueUrl: wearableSqsQueue,
352+
emoteQueueUrl: emoteSqsQueue,
353+
endpoint: awsEndpoint
354+
}
345355
)
346356
: createMemoryQueueAdapter<DeploymentToSqs>({ logs, metrics }, { queueName: 'ConversionTaskQueue' })
347357

0 commit comments

Comments
 (0)