From 438a8ba281e755f148811feb0580b027a6f3db9f Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 15:43:16 -0500 Subject: [PATCH 01/11] setup response streaming --- eslint.config.mjs | 10 ++ src/api/lambda.ts | 160 +++++++++++++++--------------- terraform/modules/lambdas/main.tf | 2 + 3 files changed, 90 insertions(+), 82 deletions(-) diff --git a/eslint.config.mjs b/eslint.config.mjs index 33195e74..c0dd8c1a 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -7,6 +7,7 @@ import { fileURLToPath } from "node:url"; import js from "@eslint/js"; import { FlatCompat } from "@eslint/eslintrc"; import mantine from "eslint-config-mantine"; +import globals from 'globals'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -74,6 +75,15 @@ export default defineConfig([ files: ["src/api/build.js"], rules: { "import/extensions": "off" }, }, + { + files: ["src/api/lambda.ts"], + languageOptions: { + globals: { + ...globals.node, + 'awslambda': 'readonly' + } + } + }, { files: ["src/ui/*", "src/ui/**/*"], rules: { diff --git a/src/api/lambda.ts b/src/api/lambda.ts index 8e420071..e9b83166 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -1,118 +1,114 @@ import awsLambdaFastify from "@fastify/aws-lambda"; +import { pipeline } from "node:stream/promises"; import init, { instanceId } from "./index.js"; -import { type APIGatewayEvent, type Context } from "aws-lambda"; import { InternalServerError, ValidationError } from "common/errors/index.js"; +// Initialize the proxy with the payloadAsStream option const app = await init(); -const realHandler = awsLambdaFastify(app, { - decorateRequest: false, - serializeLambdaArguments: true, - callbackWaitsForEmptyEventLoop: false, - binaryMimeTypes: ["application/octet-stream", "application/vnd.apple.pkpass"], +const proxy = awsLambdaFastify(app, { + payloadAsStream: true, + decorateRequest: false, // from original code + serializeLambdaArguments: true, // from original code + binaryMimeTypes: ["application/octet-stream", "application/vnd.apple.pkpass"], // from original code }); -type WarmerEvent = { action: "warmer" }; - -/** - * Validates the origin verification header against the current and previous keys. - * @returns {boolean} `true` if the request is valid, otherwise `false`. - */ const validateOriginHeader = ( - originHeader: string | undefined, + originHeader: string, currentKey: string, previousKey: string | undefined, previousKeyExpiresAt: string | undefined, -): boolean => { - // 1. A header must exist to be valid. +) => { if (!originHeader) { return false; } - - // 2. Check against the current key first for an early return on the happy path. if (originHeader === currentKey) { return true; } - - // 3. If it's not the current key, check the previous key during the rotation window. if (previousKey && previousKeyExpiresAt) { const isExpired = new Date() >= new Date(previousKeyExpiresAt); if (originHeader === previousKey && !isExpired) { return true; } } - - // 4. If all checks fail, the header is invalid. return false; }; -const handler = async ( - event: APIGatewayEvent | WarmerEvent, - context: Context, -) => { - if ("action" in event && event.action === "warmer") { - return { instanceId }; - } - event = event as APIGatewayEvent; +// This handler now correctly uses the native streaming support from the packages. +export const handler = awslambda.streamifyResponse( + async (event: any, responseStream: any, context: any) => { + // 1. Handle warmer events + if ("action" in event && event.action === "warmer") { + responseStream.write(JSON.stringify({ instanceId })); + responseStream.end(); + return; + } - const currentKey = process.env.ORIGIN_VERIFY_KEY; - const previousKey = process.env.PREVIOUS_ORIGIN_VERIFY_KEY; - const previousKeyExpiresAt = - process.env.PREVIOUS_ORIGIN_VERIFY_KEY_EXPIRES_AT; + // 2. Perform origin header validation before calling the proxy + const currentKey = process.env.ORIGIN_VERIFY_KEY; + if (currentKey) { + const previousKey = process.env.PREVIOUS_ORIGIN_VERIFY_KEY; + const previousKeyExpiresAt = + process.env.PREVIOUS_ORIGIN_VERIFY_KEY_EXPIRES_AT; - // Log an error if the previous key has expired but is still configured. - if (previousKey && previousKeyExpiresAt) { - if (new Date() >= new Date(previousKeyExpiresAt)) { - console.error( - "Expired previous origin verify key is still present in the environment. Expired at:", + const isValid = validateOriginHeader( + event.headers?.["x-origin-verify"], + currentKey, + previousKey, previousKeyExpiresAt, ); - } - } - // Proceed with verification only if a current key is set. - if (currentKey) { - const isValid = validateOriginHeader( - event.headers?.["x-origin-verify"], - currentKey, - previousKey, - previousKeyExpiresAt, - ); + if (!isValid) { + const error = new ValidationError({ message: "Request is not valid." }); + const body = JSON.stringify(error.toJson()); - if (!isValid) { - const newError = new ValidationError({ - message: "Request is not valid.", - }); - const json = JSON.stringify(newError.toJson()); - return { - statusCode: newError.httpStatusCode, - body: json, - headers: { - "Content-Type": "application/json", - }, - isBase64Encoded: false, - }; + // On validation failure, manually create the response + const meta = { + statusCode: error.httpStatusCode, + headers: { "Content-Type": "application/json" }, + }; + const httpStream = awslambda.HttpResponseStream.from( + responseStream, + meta, + ); + httpStream.write(body); + httpStream.end(); + return; + } + delete event.headers["x-origin-verify"]; } - delete event.headers["x-origin-verify"]; - } + // 3. If validation passes, proxy the request and stream the response + try { + // The proxy returns a body stream and metadata + const { stream: fastifyResponseStream, ...meta } = await proxy( + event, + context, + ); - // If verification is disabled or passed, proceed with the real handler logic. - return await realHandler(event, context).catch((e) => { - console.error(e); - const newError = new InternalServerError({ - message: "Failed to initialize application.", - }); - const json = JSON.stringify(newError.toJson()); - return { - statusCode: newError.httpStatusCode, - body: json, - headers: { - "Content-Type": "application/json", - }, - isBase64Encoded: false, - }; - }); -}; + // Use the helper to apply the status code/headers from Fastify + const httpResponseStream = awslambda.HttpResponseStream.from( + responseStream, + meta, + ); -await app.ready(); -export { handler }; + // Pipe the response from Fastify to the Lambda output stream + await pipeline(fastifyResponseStream, httpResponseStream); + } catch (e) { + console.error("Error during proxy or stream pipeline:", e); + const error = new InternalServerError({ + message: "Failed to process request.", + }); + const body = JSON.stringify(error.toJson()); + const meta = { + statusCode: error.httpStatusCode, + headers: { "Content-Type": "application/json" }, + }; + const httpStream = awslambda.HttpResponseStream.from( + responseStream, + meta, + ); + httpStream.write(body); + httpStream.end(); + } + }, +); diff --git a/terraform/modules/lambdas/main.tf b/terraform/modules/lambdas/main.tf index 9dcd00f6..3f12591b 100644 --- a/terraform/modules/lambdas/main.tf +++ b/terraform/modules/lambdas/main.tf @@ -378,6 +378,7 @@ resource "aws_lambda_function" "sqs_lambda" { resource "aws_lambda_function_url" "api_lambda_function_url" { function_name = aws_lambda_function.api_lambda.function_name authorization_type = "NONE" + invoke_mode = "RESPONSE_STREAM" } // Slow lambda - used for monitoring purposes to avoid triggering lamdba latency alarms @@ -413,6 +414,7 @@ resource "aws_lambda_function" "slow_lambda" { resource "aws_lambda_function_url" "slow_api_lambda_function_url" { function_name = aws_lambda_function.slow_lambda.function_name authorization_type = "NONE" + invoke_mode = "RESPONSE_STREAM" } module "lambda_warmer_main" { From 9b98ce1309c9fa8c385ffbe9e0206ae9d2c9492e Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 16:06:47 -0500 Subject: [PATCH 02/11] fix response streaming --- src/api/lambda.ts | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/src/api/lambda.ts b/src/api/lambda.ts index e9b83166..27ef5170 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -7,8 +7,9 @@ import { InternalServerError, ValidationError } from "common/errors/index.js"; const app = await init(); const proxy = awsLambdaFastify(app, { payloadAsStream: true, - decorateRequest: false, // from original code - serializeLambdaArguments: true, // from original code + decorateRequest: false, + callbackWaitsForEmptyEventLoop: false, + serializeLambdaArguments: true, binaryMimeTypes: ["application/octet-stream", "application/vnd.apple.pkpass"], // from original code }); @@ -79,20 +80,12 @@ export const handler = awslambda.streamifyResponse( // 3. If validation passes, proxy the request and stream the response try { - // The proxy returns a body stream and metadata - const { stream: fastifyResponseStream, ...meta } = await proxy( - event, - context, - ); - - // Use the helper to apply the status code/headers from Fastify - const httpResponseStream = awslambda.HttpResponseStream.from( + const { stream, meta } = await proxy(event, context); + responseStream = awslambda.HttpResponseStream.from( responseStream, - meta, + meta as any, ); - - // Pipe the response from Fastify to the Lambda output stream - await pipeline(fastifyResponseStream, httpResponseStream); + await pipeline(stream, responseStream); } catch (e) { console.error("Error during proxy or stream pipeline:", e); const error = new InternalServerError({ From eb2b632a0782b12a06bfc3c5cbb08a750bf05dc6 Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 16:46:38 -0500 Subject: [PATCH 03/11] Fix issue with Lambda where streaming repsonses always require a body to be present --- src/api/lambda.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/api/lambda.ts b/src/api/lambda.ts index 27ef5170..69ecf1c9 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -2,6 +2,7 @@ import awsLambdaFastify from "@fastify/aws-lambda"; import { pipeline } from "node:stream/promises"; import init, { instanceId } from "./index.js"; import { InternalServerError, ValidationError } from "common/errors/index.js"; +import { Readable } from "node:stream"; // Initialize the proxy with the payloadAsStream option const app = await init(); @@ -85,7 +86,12 @@ export const handler = awslambda.streamifyResponse( responseStream, meta as any, ); - await pipeline(stream, responseStream); + + // Fix issue with Lambda where streaming repsonses always require a body to be present + const body = + stream.readableLength > 0 ? stream : Readable.from(Buffer.from("")); + + await pipeline(body, responseStream); } catch (e) { console.error("Error during proxy or stream pipeline:", e); const error = new InternalServerError({ From 29bada049af95babe2df60d0c71ef520c4934335 Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 17:18:47 -0500 Subject: [PATCH 04/11] fix/add live tests --- tests/live/iam.test.ts | 2 +- tests/live/membership.test.ts | 75 +++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/tests/live/iam.test.ts b/tests/live/iam.test.ts index 2fa84b46..f72f5801 100644 --- a/tests/live/iam.test.ts +++ b/tests/live/iam.test.ts @@ -10,7 +10,7 @@ import { getBaseEndpoint } from "./utils.js"; import { environmentConfig, genericConfig } from "../../src/common/config.js"; const baseEndpoint = getBaseEndpoint(); -test("getting groups", async () => { +test("getting groups", { timeout: 10000 }, async () => { const token = await createJwt(); const response = await fetch(`${baseEndpoint}/api/v1/iam/groups`, { method: "GET", diff --git a/tests/live/membership.test.ts b/tests/live/membership.test.ts index 9a535fc9..bd99c862 100644 --- a/tests/live/membership.test.ts +++ b/tests/live/membership.test.ts @@ -1,5 +1,6 @@ import { expect, test, describe } from "vitest"; import { createJwt, getBaseEndpoint } from "./utils.js"; +import { randomUUID } from "node:crypto"; const baseEndpoint = getBaseEndpoint(); const token = await createJwt(); @@ -232,3 +233,77 @@ describe("Membership API basic checks", async () => { }, ); }); + +test("External Membership List lifecycle test", async () => { + const unixTimestampSeconds = Math.floor(Date.now() / 1000); + const listId = `livetest-${unixTimestampSeconds}`; + let response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "PATCH", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + add: ["acmtest2"], + remove: [], + }), + }, + ); + expect(response.status).toBe(201); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "GET", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + }, + ); + let responseJson = await response.json(); + expect(responseJson).toStrictEqual(["acmtest2"]); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "PATCH", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + add: ["acmtest3"], + remove: ["acmtest2"], + }), + }, + ); + expect(response.status).toEqual(201); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "GET", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + }, + ); + responseJson = await response.json(); + expect(responseJson).toStrictEqual(["acmtest3"]); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "PATCH", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + remove: ["acmtest3"], + add: [], + }), + }, + ); + expect(response.status).toEqual(201); +}); From 95e68cc091bfe1eeee25a1cdf6890f7fa0d522bf Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 17:19:58 -0500 Subject: [PATCH 05/11] increase timeout --- src/ui/util/api.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ui/util/api.ts b/src/ui/util/api.ts index 22433558..f1650c37 100644 --- a/src/ui/util/api.ts +++ b/src/ui/util/api.ts @@ -4,7 +4,7 @@ import { useMemo } from "react"; import { useAuth } from "@ui/components/AuthContext"; import { getRunEnvironmentConfig, ValidService } from "@ui/config"; -export const MAX_API_TIMEOUT_MS = 5000; +export const MAX_API_TIMEOUT_MS = 10000; const createAxiosInstance = (baseURL: string) => axios.create({ From 76126ffb4a717fd2d6cc2404e309fbf6f87fd53a Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 17:24:17 -0500 Subject: [PATCH 06/11] reorg --- src/api/lambda.ts | 38 +++++++++----------------------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/src/api/lambda.ts b/src/api/lambda.ts index 69ecf1c9..79aed5fc 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -79,35 +79,15 @@ export const handler = awslambda.streamifyResponse( delete event.headers["x-origin-verify"]; } - // 3. If validation passes, proxy the request and stream the response - try { - const { stream, meta } = await proxy(event, context); - responseStream = awslambda.HttpResponseStream.from( - responseStream, - meta as any, - ); - - // Fix issue with Lambda where streaming repsonses always require a body to be present - const body = - stream.readableLength > 0 ? stream : Readable.from(Buffer.from("")); + const { stream, meta } = await proxy(event, context); + // Fix issue with Lambda where streaming repsonses always require a body to be present + const body = + stream.readableLength > 0 ? stream : Readable.from(Buffer.from("")); - await pipeline(body, responseStream); - } catch (e) { - console.error("Error during proxy or stream pipeline:", e); - const error = new InternalServerError({ - message: "Failed to process request.", - }); - const body = JSON.stringify(error.toJson()); - const meta = { - statusCode: error.httpStatusCode, - headers: { "Content-Type": "application/json" }, - }; - const httpStream = awslambda.HttpResponseStream.from( - responseStream, - meta, - ); - httpStream.write(body); - httpStream.end(); - } + responseStream = awslambda.HttpResponseStream.from( + responseStream, + meta as any, + ); + await pipeline(body, responseStream); }, ); From 5bc9455b4094feb79148a72a5507f3d2137ac93b Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 17:31:57 -0500 Subject: [PATCH 07/11] reduce api timeout --- src/api/lambda.ts | 4 +++- .../pages/membershipLists/ExternalMemberListManagement.tsx | 6 ------ terraform/modules/lambdas/main.tf | 4 ++-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/api/lambda.ts b/src/api/lambda.ts index 79aed5fc..b28ee17e 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -81,9 +81,11 @@ export const handler = awslambda.streamifyResponse( const { stream, meta } = await proxy(event, context); // Fix issue with Lambda where streaming repsonses always require a body to be present + app.log.warn(`Streamable length: ${stream.readableLength}`); + app.log.warn(`meta: ${JSON.stringify(meta)}`); const body = stream.readableLength > 0 ? stream : Readable.from(Buffer.from("")); - + app.log.warn(`New streamable length: ${body.readableLength}`); responseStream = awslambda.HttpResponseStream.from( responseStream, meta as any, diff --git a/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx b/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx index f9774df3..428432a6 100644 --- a/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx +++ b/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx @@ -148,12 +148,6 @@ const ExternalMemberListManagement: React.FC< color: "green", }); await loadMembers(); - } catch (error) { - notifications.show({ - title: "Update Failed", - message: "Could not save changes to the member list.", - color: "red", - }); } finally { setIsLoading(false); setConfirmationModalOpened(false); diff --git a/terraform/modules/lambdas/main.tf b/terraform/modules/lambdas/main.tf index 3f12591b..0f911969 100644 --- a/terraform/modules/lambdas/main.tf +++ b/terraform/modules/lambdas/main.tf @@ -332,7 +332,7 @@ resource "aws_lambda_function" "api_lambda" { handler = "lambda.handler" runtime = "nodejs22.x" filename = data.archive_file.api_lambda_code.output_path - timeout = 60 + timeout = 15 memory_size = 2048 source_code_hash = data.archive_file.api_lambda_code.output_sha256 environment { @@ -390,7 +390,7 @@ resource "aws_lambda_function" "slow_lambda" { handler = "lambda.handler" runtime = "nodejs22.x" filename = data.archive_file.api_lambda_code.output_path - timeout = 60 + timeout = 15 memory_size = 2048 source_code_hash = data.archive_file.api_lambda_code.output_sha256 logging_config { From 5760948c787e713034b084f46abf5caeb1780cca Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 17:35:32 -0500 Subject: [PATCH 08/11] fix --- src/api/lambda.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/api/lambda.ts b/src/api/lambda.ts index b28ee17e..defa3635 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -81,11 +81,8 @@ export const handler = awslambda.streamifyResponse( const { stream, meta } = await proxy(event, context); // Fix issue with Lambda where streaming repsonses always require a body to be present - app.log.warn(`Streamable length: ${stream.readableLength}`); - app.log.warn(`meta: ${JSON.stringify(meta)}`); const body = - stream.readableLength > 0 ? stream : Readable.from(Buffer.from("")); - app.log.warn(`New streamable length: ${body.readableLength}`); + stream.readableLength > 0 ? stream : Readable.from(Buffer.from(" ")); responseStream = awslambda.HttpResponseStream.from( responseStream, meta as any, From 134a254743977811084613b4639efe05f99de46a Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 17:43:09 -0500 Subject: [PATCH 09/11] use new warmer --- src/api/warmer/lambda.ts | 133 +++++++++++++++++++++++------- terraform/modules/lambdas/main.tf | 4 +- 2 files changed, 106 insertions(+), 31 deletions(-) diff --git a/src/api/warmer/lambda.ts b/src/api/warmer/lambda.ts index fef0f6d2..c476308c 100644 --- a/src/api/warmer/lambda.ts +++ b/src/api/warmer/lambda.ts @@ -1,29 +1,31 @@ -import { LambdaClient, InvokeCommand } from "@aws-sdk/client-lambda"; +import { + LambdaClient, + InvokeCommand, + InvokeWithResponseStreamCommand, +} from "@aws-sdk/client-lambda"; import { TextDecoder } from "util"; +// --- AWS SDK Clients and Utilities --- const lambdaClient = new LambdaClient({}); const textDecoder = new TextDecoder(); +// --- Invocation Logic for Standard Lambdas --- + /** - * Invokes a batch of lambdas concurrently and returns the unique instance IDs found. + * Invokes a batch of standard (non-streaming) Lambdas concurrently. */ -async function invokeBatch( +async function invokeStandardBatch( lambdaName: string, count: number, ): Promise> { - if (count <= 0) { - return new Set(); - } - - console.log(`Firing a batch of ${count} concurrent invocations...`); - - const invocationPromises = Array.from({ length: count }, () => { - const command = new InvokeCommand({ - FunctionName: lambdaName, - Payload: JSON.stringify({ action: "warmer" }), - }); - return lambdaClient.send(command); - }); + const invocationPromises = Array.from({ length: count }, () => + lambdaClient.send( + new InvokeCommand({ + FunctionName: lambdaName, + Payload: JSON.stringify({ action: "warmer" }), + }), + ), + ); const results = await Promise.allSettled(invocationPromises); const foundInstanceIds = new Set(); @@ -37,21 +39,76 @@ async function invokeBatch( foundInstanceIds.add(body.instanceId); } } catch (e) { - console.error("Error parsing payload from target function:", e); + console.error("Error parsing payload from standard function:", e); } } else if (result.status === "rejected") { - console.error("Invocation failed:", result.reason.message); + console.error("Standard invocation failed:", result.reason.message); } }); return foundInstanceIds; } +// --- Invocation Logic for Streaming Lambdas --- + +/** + * Invokes a batch of response-streaming Lambdas concurrently. + */ +async function invokeStreamingBatch( + lambdaName: string, + count: number, +): Promise> { + const invocationPromises = Array.from({ length: count }, () => + lambdaClient.send( + new InvokeWithResponseStreamCommand({ + FunctionName: lambdaName, + Payload: JSON.stringify({ action: "warmer" }), + }), + ), + ); + + const results = await Promise.allSettled(invocationPromises); + const foundInstanceIds = new Set(); + + for (const result of results) { + if (result.status === "fulfilled" && result.value.EventStream) { + try { + const chunks: Uint8Array[] = []; + // Iterate over the EventStream to get data chunks + for await (const event of result.value.EventStream) { + if (event.PayloadChunk && event.PayloadChunk.Payload) { + chunks.push(event.PayloadChunk.Payload); + } + } + + const payloadString = textDecoder.decode(Buffer.concat(chunks)); + const body = JSON.parse(payloadString); + if (body.instanceId) { + foundInstanceIds.add(body.instanceId); + } + } catch (e) { + console.error("Error processing stream from streaming function:", e); + } + } else if (result.status === "rejected") { + console.error("Streaming invocation failed:", result.reason.message); + } + } + + return foundInstanceIds; +} + +// --- Main Lambda Handler --- + +/** + * Main handler that warms a target Lambda function by invoking it multiple times. + * It can handle both standard and response-streaming target functions. + */ export const handler = async (event: {}) => { - const { lambdaName, numInstancesStr, maxWavesStr } = { + const { lambdaName, numInstancesStr, maxWavesStr, isStreaming } = { lambdaName: process.env.LAMBDA_NAME, numInstancesStr: process.env.NUM_INSTANCES, maxWavesStr: process.env.MAX_WAVES, + isStreaming: (process.env.IS_STREAMING || "false").toLowerCase() === "true", // e.g., 'true' or 'false' }; if (!lambdaName || !numInstancesStr) { @@ -59,36 +116,40 @@ export const handler = async (event: {}) => { } const numInstances = parseInt(numInstancesStr, 10); - // Default to 5 waves if MAX_WAVES is not set const maxWaves = parseInt(maxWavesStr || "5", 10); let totalInvocations = 0; let wavesCompleted = 0; const uniqueInstanceIds = new Set(); + console.log(`Warming target: ${lambdaName} (Streaming: ${isStreaming})`); + for (let i = 1; i <= maxWaves; i++) { wavesCompleted = i; - - // Calculate how many more instances are needed const neededCount = numInstances - uniqueInstanceIds.size; if (neededCount <= 0) { console.log("Target met. No more waves needed."); break; } - console.log(`--- Wave ${i} of ${maxWaves} ---`); - const newIds = await invokeBatch(lambdaName, numInstances); + console.log(`-- - Wave ${i} of ${maxWaves} --- `); + + // Choose the correct invoker function based on the flag + const newIds = isStreaming + ? await invokeStreamingBatch(lambdaName, numInstances) + : await invokeStandardBatch(lambdaName, numInstances); + totalInvocations += numInstances; newIds.forEach((id) => uniqueInstanceIds.add(id)); console.log( - `Wave ${i} complete. Found ${uniqueInstanceIds.size} of ${numInstances} unique instances.`, + `Wave ${i} complete.Found ${uniqueInstanceIds.size} of ${numInstances} unique instances.`, ); } console.log( - `Warming complete. Found ${uniqueInstanceIds.size} unique instances from ${totalInvocations} total invocations over ${wavesCompleted} waves.`, + `Warming complete.Found ${uniqueInstanceIds.size} unique instances from ${totalInvocations} total invocations over ${wavesCompleted} waves.`, ); return { @@ -103,9 +164,23 @@ export const handler = async (event: {}) => { }; }; +// --- Local Test Execution Block --- + +// This block runs only when the file is executed directly (e.g., `node index.js`) if (import.meta.url === `file://${process.argv[1]}`) { - process.env.LAMBDA_NAME = "infra-core-api-lambda"; + // --- Configuration for local testing --- + process.env.LAMBDA_NAME = "my-target-lambda-function-name"; process.env.NUM_INSTANCES = "3"; - process.env.MAX_WAVES = "3"; // Configurable number of waves - console.log(await handler({})); + process.env.MAX_WAVES = "5"; + process.env.IS_STREAMING = "false"; // Set to 'true' to test streaming + + console.log("Running warmer in local test mode..."); + handler({}) + .then((result) => { + console.log("\n--- Final Result ---"); + console.log(JSON.parse(result.body)); + }) + .catch((error) => { + console.error("Local test run failed:", error); + }); } diff --git a/terraform/modules/lambdas/main.tf b/terraform/modules/lambdas/main.tf index 0f911969..daf16821 100644 --- a/terraform/modules/lambdas/main.tf +++ b/terraform/modules/lambdas/main.tf @@ -418,12 +418,12 @@ resource "aws_lambda_function_url" "slow_api_lambda_function_url" { } module "lambda_warmer_main" { - source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=v1.0.1" + source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=2677e28b3a8d1b9c8699a2d9c8c36400c1ef9922" function_to_warm = local.core_api_lambda_name } module "lambda_warmer_slow" { - source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=v1.0.1" + source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=2677e28b3a8d1b9c8699a2d9c8c36400c1ef9922" function_to_warm = local.core_api_slow_lambda_name } From 0ffc30a1c59caff56853e1df832eb84c2cc8afc6 Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 18:26:07 -0500 Subject: [PATCH 10/11] fix warmer --- .gitignore | 1 + src/api/lambda.ts | 15 +++++++++------ terraform/modules/lambdas/main.tf | 10 ++++++---- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index a7bb391d..ebd695bf 100644 --- a/.gitignore +++ b/.gitignore @@ -144,3 +144,4 @@ __pycache__ dist_devel/ !src/ui/pages/logs src/api/package.lambda.json +tfplan diff --git a/src/api/lambda.ts b/src/api/lambda.ts index defa3635..606dac11 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -38,10 +38,12 @@ const validateOriginHeader = ( // This handler now correctly uses the native streaming support from the packages. export const handler = awslambda.streamifyResponse( async (event: any, responseStream: any, context: any) => { - // 1. Handle warmer events + context.callbackWaitsForEmptyEventLoop = false; if ("action" in event && event.action === "warmer") { - responseStream.write(JSON.stringify({ instanceId })); - responseStream.end(); + const requestStream = Readable.from( + Buffer.from(JSON.stringify({ instanceId })), + ); + await pipeline(requestStream, responseStream); return; } @@ -68,17 +70,18 @@ export const handler = awslambda.streamifyResponse( statusCode: error.httpStatusCode, headers: { "Content-Type": "application/json" }, }; - const httpStream = awslambda.HttpResponseStream.from( + responseStream = awslambda.HttpResponseStream.from( responseStream, meta, ); - httpStream.write(body); - httpStream.end(); + const requestStream = Readable.from(Buffer.from(body)); + await pipeline(requestStream, responseStream); return; } delete event.headers["x-origin-verify"]; } + console.log("calling proxy function"); const { stream, meta } = await proxy(event, context); // Fix issue with Lambda where streaming repsonses always require a body to be present const body = diff --git a/terraform/modules/lambdas/main.tf b/terraform/modules/lambdas/main.tf index daf16821..189c9926 100644 --- a/terraform/modules/lambdas/main.tf +++ b/terraform/modules/lambdas/main.tf @@ -418,13 +418,15 @@ resource "aws_lambda_function_url" "slow_api_lambda_function_url" { } module "lambda_warmer_main" { - source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=2677e28b3a8d1b9c8699a2d9c8c36400c1ef9922" - function_to_warm = local.core_api_lambda_name + source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=b52f22e32c6c07af9b1b4750a226882aaccc769d" + function_to_warm = local.core_api_lambda_name + is_streaming_lambda = true } module "lambda_warmer_slow" { - source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=2677e28b3a8d1b9c8699a2d9c8c36400c1ef9922" - function_to_warm = local.core_api_slow_lambda_name + source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=b52f22e32c6c07af9b1b4750a226882aaccc769d" + function_to_warm = local.core_api_slow_lambda_name + is_streaming_lambda = true } From 569c87b5fe4dab93045d6d5fe1b4eec5a3d1885b Mon Sep 17 00:00:00 2001 From: Dev Singh Date: Sat, 9 Aug 2025 18:26:46 -0500 Subject: [PATCH 11/11] remove console log --- src/api/lambda.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/api/lambda.ts b/src/api/lambda.ts index 606dac11..ed59070d 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -81,7 +81,6 @@ export const handler = awslambda.streamifyResponse( delete event.headers["x-origin-verify"]; } - console.log("calling proxy function"); const { stream, meta } = await proxy(event, context); // Fix issue with Lambda where streaming repsonses always require a body to be present const body =