diff --git a/.gitignore b/.gitignore index a7bb391d..ebd695bf 100644 --- a/.gitignore +++ b/.gitignore @@ -144,3 +144,4 @@ __pycache__ dist_devel/ !src/ui/pages/logs src/api/package.lambda.json +tfplan diff --git a/eslint.config.mjs b/eslint.config.mjs index 33195e74..c0dd8c1a 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -7,6 +7,7 @@ import { fileURLToPath } from "node:url"; import js from "@eslint/js"; import { FlatCompat } from "@eslint/eslintrc"; import mantine from "eslint-config-mantine"; +import globals from 'globals'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -74,6 +75,15 @@ export default defineConfig([ files: ["src/api/build.js"], rules: { "import/extensions": "off" }, }, + { + files: ["src/api/lambda.ts"], + languageOptions: { + globals: { + ...globals.node, + 'awslambda': 'readonly' + } + } + }, { files: ["src/ui/*", "src/ui/**/*"], rules: { diff --git a/src/api/lambda.ts b/src/api/lambda.ts index 8e420071..ed59070d 100644 --- a/src/api/lambda.ts +++ b/src/api/lambda.ts @@ -1,118 +1,94 @@ import awsLambdaFastify from "@fastify/aws-lambda"; +import { pipeline } from "node:stream/promises"; import init, { instanceId } from "./index.js"; -import { type APIGatewayEvent, type Context } from "aws-lambda"; import { InternalServerError, ValidationError } from "common/errors/index.js"; +import { Readable } from "node:stream"; +// Initialize the proxy with the payloadAsStream option const app = await init(); -const realHandler = awsLambdaFastify(app, { +const proxy = awsLambdaFastify(app, { + payloadAsStream: true, decorateRequest: false, - serializeLambdaArguments: true, callbackWaitsForEmptyEventLoop: false, - binaryMimeTypes: ["application/octet-stream", "application/vnd.apple.pkpass"], + serializeLambdaArguments: true, + binaryMimeTypes: ["application/octet-stream", "application/vnd.apple.pkpass"], // from original code }); -type WarmerEvent = { action: "warmer" }; - -/** - * Validates the origin verification header against the current and previous keys. - * @returns {boolean} `true` if the request is valid, otherwise `false`. - */ const validateOriginHeader = ( - originHeader: string | undefined, + originHeader: string, currentKey: string, previousKey: string | undefined, previousKeyExpiresAt: string | undefined, -): boolean => { - // 1. A header must exist to be valid. +) => { if (!originHeader) { return false; } - - // 2. Check against the current key first for an early return on the happy path. if (originHeader === currentKey) { return true; } - - // 3. If it's not the current key, check the previous key during the rotation window. if (previousKey && previousKeyExpiresAt) { const isExpired = new Date() >= new Date(previousKeyExpiresAt); if (originHeader === previousKey && !isExpired) { return true; } } - - // 4. If all checks fail, the header is invalid. return false; }; -const handler = async ( - event: APIGatewayEvent | WarmerEvent, - context: Context, -) => { - if ("action" in event && event.action === "warmer") { - return { instanceId }; - } - event = event as APIGatewayEvent; +// This handler now correctly uses the native streaming support from the packages. +export const handler = awslambda.streamifyResponse( + async (event: any, responseStream: any, context: any) => { + context.callbackWaitsForEmptyEventLoop = false; + if ("action" in event && event.action === "warmer") { + const requestStream = Readable.from( + Buffer.from(JSON.stringify({ instanceId })), + ); + await pipeline(requestStream, responseStream); + return; + } - const currentKey = process.env.ORIGIN_VERIFY_KEY; - const previousKey = process.env.PREVIOUS_ORIGIN_VERIFY_KEY; - const previousKeyExpiresAt = - process.env.PREVIOUS_ORIGIN_VERIFY_KEY_EXPIRES_AT; + // 2. Perform origin header validation before calling the proxy + const currentKey = process.env.ORIGIN_VERIFY_KEY; + if (currentKey) { + const previousKey = process.env.PREVIOUS_ORIGIN_VERIFY_KEY; + const previousKeyExpiresAt = + process.env.PREVIOUS_ORIGIN_VERIFY_KEY_EXPIRES_AT; - // Log an error if the previous key has expired but is still configured. - if (previousKey && previousKeyExpiresAt) { - if (new Date() >= new Date(previousKeyExpiresAt)) { - console.error( - "Expired previous origin verify key is still present in the environment. Expired at:", + const isValid = validateOriginHeader( + event.headers?.["x-origin-verify"], + currentKey, + previousKey, previousKeyExpiresAt, ); - } - } - // Proceed with verification only if a current key is set. - if (currentKey) { - const isValid = validateOriginHeader( - event.headers?.["x-origin-verify"], - currentKey, - previousKey, - previousKeyExpiresAt, - ); + if (!isValid) { + const error = new ValidationError({ message: "Request is not valid." }); + const body = JSON.stringify(error.toJson()); - if (!isValid) { - const newError = new ValidationError({ - message: "Request is not valid.", - }); - const json = JSON.stringify(newError.toJson()); - return { - statusCode: newError.httpStatusCode, - body: json, - headers: { - "Content-Type": "application/json", - }, - isBase64Encoded: false, - }; + // On validation failure, manually create the response + const meta = { + statusCode: error.httpStatusCode, + headers: { "Content-Type": "application/json" }, + }; + responseStream = awslambda.HttpResponseStream.from( + responseStream, + meta, + ); + const requestStream = Readable.from(Buffer.from(body)); + await pipeline(requestStream, responseStream); + return; + } + delete event.headers["x-origin-verify"]; } - delete event.headers["x-origin-verify"]; - } - - // If verification is disabled or passed, proceed with the real handler logic. - return await realHandler(event, context).catch((e) => { - console.error(e); - const newError = new InternalServerError({ - message: "Failed to initialize application.", - }); - const json = JSON.stringify(newError.toJson()); - return { - statusCode: newError.httpStatusCode, - body: json, - headers: { - "Content-Type": "application/json", - }, - isBase64Encoded: false, - }; - }); -}; - -await app.ready(); -export { handler }; + const { stream, meta } = await proxy(event, context); + // Fix issue with Lambda where streaming repsonses always require a body to be present + const body = + stream.readableLength > 0 ? stream : Readable.from(Buffer.from(" ")); + responseStream = awslambda.HttpResponseStream.from( + responseStream, + meta as any, + ); + await pipeline(body, responseStream); + }, +); diff --git a/src/api/warmer/lambda.ts b/src/api/warmer/lambda.ts index fef0f6d2..c476308c 100644 --- a/src/api/warmer/lambda.ts +++ b/src/api/warmer/lambda.ts @@ -1,29 +1,31 @@ -import { LambdaClient, InvokeCommand } from "@aws-sdk/client-lambda"; +import { + LambdaClient, + InvokeCommand, + InvokeWithResponseStreamCommand, +} from "@aws-sdk/client-lambda"; import { TextDecoder } from "util"; +// --- AWS SDK Clients and Utilities --- const lambdaClient = new LambdaClient({}); const textDecoder = new TextDecoder(); +// --- Invocation Logic for Standard Lambdas --- + /** - * Invokes a batch of lambdas concurrently and returns the unique instance IDs found. + * Invokes a batch of standard (non-streaming) Lambdas concurrently. */ -async function invokeBatch( +async function invokeStandardBatch( lambdaName: string, count: number, ): Promise> { - if (count <= 0) { - return new Set(); - } - - console.log(`Firing a batch of ${count} concurrent invocations...`); - - const invocationPromises = Array.from({ length: count }, () => { - const command = new InvokeCommand({ - FunctionName: lambdaName, - Payload: JSON.stringify({ action: "warmer" }), - }); - return lambdaClient.send(command); - }); + const invocationPromises = Array.from({ length: count }, () => + lambdaClient.send( + new InvokeCommand({ + FunctionName: lambdaName, + Payload: JSON.stringify({ action: "warmer" }), + }), + ), + ); const results = await Promise.allSettled(invocationPromises); const foundInstanceIds = new Set(); @@ -37,21 +39,76 @@ async function invokeBatch( foundInstanceIds.add(body.instanceId); } } catch (e) { - console.error("Error parsing payload from target function:", e); + console.error("Error parsing payload from standard function:", e); } } else if (result.status === "rejected") { - console.error("Invocation failed:", result.reason.message); + console.error("Standard invocation failed:", result.reason.message); } }); return foundInstanceIds; } +// --- Invocation Logic for Streaming Lambdas --- + +/** + * Invokes a batch of response-streaming Lambdas concurrently. + */ +async function invokeStreamingBatch( + lambdaName: string, + count: number, +): Promise> { + const invocationPromises = Array.from({ length: count }, () => + lambdaClient.send( + new InvokeWithResponseStreamCommand({ + FunctionName: lambdaName, + Payload: JSON.stringify({ action: "warmer" }), + }), + ), + ); + + const results = await Promise.allSettled(invocationPromises); + const foundInstanceIds = new Set(); + + for (const result of results) { + if (result.status === "fulfilled" && result.value.EventStream) { + try { + const chunks: Uint8Array[] = []; + // Iterate over the EventStream to get data chunks + for await (const event of result.value.EventStream) { + if (event.PayloadChunk && event.PayloadChunk.Payload) { + chunks.push(event.PayloadChunk.Payload); + } + } + + const payloadString = textDecoder.decode(Buffer.concat(chunks)); + const body = JSON.parse(payloadString); + if (body.instanceId) { + foundInstanceIds.add(body.instanceId); + } + } catch (e) { + console.error("Error processing stream from streaming function:", e); + } + } else if (result.status === "rejected") { + console.error("Streaming invocation failed:", result.reason.message); + } + } + + return foundInstanceIds; +} + +// --- Main Lambda Handler --- + +/** + * Main handler that warms a target Lambda function by invoking it multiple times. + * It can handle both standard and response-streaming target functions. + */ export const handler = async (event: {}) => { - const { lambdaName, numInstancesStr, maxWavesStr } = { + const { lambdaName, numInstancesStr, maxWavesStr, isStreaming } = { lambdaName: process.env.LAMBDA_NAME, numInstancesStr: process.env.NUM_INSTANCES, maxWavesStr: process.env.MAX_WAVES, + isStreaming: (process.env.IS_STREAMING || "false").toLowerCase() === "true", // e.g., 'true' or 'false' }; if (!lambdaName || !numInstancesStr) { @@ -59,36 +116,40 @@ export const handler = async (event: {}) => { } const numInstances = parseInt(numInstancesStr, 10); - // Default to 5 waves if MAX_WAVES is not set const maxWaves = parseInt(maxWavesStr || "5", 10); let totalInvocations = 0; let wavesCompleted = 0; const uniqueInstanceIds = new Set(); + console.log(`Warming target: ${lambdaName} (Streaming: ${isStreaming})`); + for (let i = 1; i <= maxWaves; i++) { wavesCompleted = i; - - // Calculate how many more instances are needed const neededCount = numInstances - uniqueInstanceIds.size; if (neededCount <= 0) { console.log("Target met. No more waves needed."); break; } - console.log(`--- Wave ${i} of ${maxWaves} ---`); - const newIds = await invokeBatch(lambdaName, numInstances); + console.log(`-- - Wave ${i} of ${maxWaves} --- `); + + // Choose the correct invoker function based on the flag + const newIds = isStreaming + ? await invokeStreamingBatch(lambdaName, numInstances) + : await invokeStandardBatch(lambdaName, numInstances); + totalInvocations += numInstances; newIds.forEach((id) => uniqueInstanceIds.add(id)); console.log( - `Wave ${i} complete. Found ${uniqueInstanceIds.size} of ${numInstances} unique instances.`, + `Wave ${i} complete.Found ${uniqueInstanceIds.size} of ${numInstances} unique instances.`, ); } console.log( - `Warming complete. Found ${uniqueInstanceIds.size} unique instances from ${totalInvocations} total invocations over ${wavesCompleted} waves.`, + `Warming complete.Found ${uniqueInstanceIds.size} unique instances from ${totalInvocations} total invocations over ${wavesCompleted} waves.`, ); return { @@ -103,9 +164,23 @@ export const handler = async (event: {}) => { }; }; +// --- Local Test Execution Block --- + +// This block runs only when the file is executed directly (e.g., `node index.js`) if (import.meta.url === `file://${process.argv[1]}`) { - process.env.LAMBDA_NAME = "infra-core-api-lambda"; + // --- Configuration for local testing --- + process.env.LAMBDA_NAME = "my-target-lambda-function-name"; process.env.NUM_INSTANCES = "3"; - process.env.MAX_WAVES = "3"; // Configurable number of waves - console.log(await handler({})); + process.env.MAX_WAVES = "5"; + process.env.IS_STREAMING = "false"; // Set to 'true' to test streaming + + console.log("Running warmer in local test mode..."); + handler({}) + .then((result) => { + console.log("\n--- Final Result ---"); + console.log(JSON.parse(result.body)); + }) + .catch((error) => { + console.error("Local test run failed:", error); + }); } diff --git a/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx b/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx index f9774df3..428432a6 100644 --- a/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx +++ b/src/ui/pages/membershipLists/ExternalMemberListManagement.tsx @@ -148,12 +148,6 @@ const ExternalMemberListManagement: React.FC< color: "green", }); await loadMembers(); - } catch (error) { - notifications.show({ - title: "Update Failed", - message: "Could not save changes to the member list.", - color: "red", - }); } finally { setIsLoading(false); setConfirmationModalOpened(false); diff --git a/src/ui/util/api.ts b/src/ui/util/api.ts index 22433558..f1650c37 100644 --- a/src/ui/util/api.ts +++ b/src/ui/util/api.ts @@ -4,7 +4,7 @@ import { useMemo } from "react"; import { useAuth } from "@ui/components/AuthContext"; import { getRunEnvironmentConfig, ValidService } from "@ui/config"; -export const MAX_API_TIMEOUT_MS = 5000; +export const MAX_API_TIMEOUT_MS = 10000; const createAxiosInstance = (baseURL: string) => axios.create({ diff --git a/terraform/modules/lambdas/main.tf b/terraform/modules/lambdas/main.tf index 9dcd00f6..189c9926 100644 --- a/terraform/modules/lambdas/main.tf +++ b/terraform/modules/lambdas/main.tf @@ -332,7 +332,7 @@ resource "aws_lambda_function" "api_lambda" { handler = "lambda.handler" runtime = "nodejs22.x" filename = data.archive_file.api_lambda_code.output_path - timeout = 60 + timeout = 15 memory_size = 2048 source_code_hash = data.archive_file.api_lambda_code.output_sha256 environment { @@ -378,6 +378,7 @@ resource "aws_lambda_function" "sqs_lambda" { resource "aws_lambda_function_url" "api_lambda_function_url" { function_name = aws_lambda_function.api_lambda.function_name authorization_type = "NONE" + invoke_mode = "RESPONSE_STREAM" } // Slow lambda - used for monitoring purposes to avoid triggering lamdba latency alarms @@ -389,7 +390,7 @@ resource "aws_lambda_function" "slow_lambda" { handler = "lambda.handler" runtime = "nodejs22.x" filename = data.archive_file.api_lambda_code.output_path - timeout = 60 + timeout = 15 memory_size = 2048 source_code_hash = data.archive_file.api_lambda_code.output_sha256 logging_config { @@ -413,16 +414,19 @@ resource "aws_lambda_function" "slow_lambda" { resource "aws_lambda_function_url" "slow_api_lambda_function_url" { function_name = aws_lambda_function.slow_lambda.function_name authorization_type = "NONE" + invoke_mode = "RESPONSE_STREAM" } module "lambda_warmer_main" { - source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=v1.0.1" - function_to_warm = local.core_api_lambda_name + source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=b52f22e32c6c07af9b1b4750a226882aaccc769d" + function_to_warm = local.core_api_lambda_name + is_streaming_lambda = true } module "lambda_warmer_slow" { - source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=v1.0.1" - function_to_warm = local.core_api_slow_lambda_name + source = "github.com/acm-uiuc/terraform-modules/lambda-warmer?ref=b52f22e32c6c07af9b1b4750a226882aaccc769d" + function_to_warm = local.core_api_slow_lambda_name + is_streaming_lambda = true } diff --git a/tests/live/iam.test.ts b/tests/live/iam.test.ts index 2fa84b46..f72f5801 100644 --- a/tests/live/iam.test.ts +++ b/tests/live/iam.test.ts @@ -10,7 +10,7 @@ import { getBaseEndpoint } from "./utils.js"; import { environmentConfig, genericConfig } from "../../src/common/config.js"; const baseEndpoint = getBaseEndpoint(); -test("getting groups", async () => { +test("getting groups", { timeout: 10000 }, async () => { const token = await createJwt(); const response = await fetch(`${baseEndpoint}/api/v1/iam/groups`, { method: "GET", diff --git a/tests/live/membership.test.ts b/tests/live/membership.test.ts index 9a535fc9..bd99c862 100644 --- a/tests/live/membership.test.ts +++ b/tests/live/membership.test.ts @@ -1,5 +1,6 @@ import { expect, test, describe } from "vitest"; import { createJwt, getBaseEndpoint } from "./utils.js"; +import { randomUUID } from "node:crypto"; const baseEndpoint = getBaseEndpoint(); const token = await createJwt(); @@ -232,3 +233,77 @@ describe("Membership API basic checks", async () => { }, ); }); + +test("External Membership List lifecycle test", async () => { + const unixTimestampSeconds = Math.floor(Date.now() / 1000); + const listId = `livetest-${unixTimestampSeconds}`; + let response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "PATCH", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + add: ["acmtest2"], + remove: [], + }), + }, + ); + expect(response.status).toBe(201); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "GET", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + }, + ); + let responseJson = await response.json(); + expect(responseJson).toStrictEqual(["acmtest2"]); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "PATCH", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + add: ["acmtest3"], + remove: ["acmtest2"], + }), + }, + ); + expect(response.status).toEqual(201); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "GET", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + }, + ); + responseJson = await response.json(); + expect(responseJson).toStrictEqual(["acmtest3"]); + response = await fetch( + `${baseEndpoint}/api/v1/membership/externalList/${listId}`, + { + method: "PATCH", + headers: { + authorization: `Bearer ${token}`, + "content-type": "application/json", + }, + body: JSON.stringify({ + remove: ["acmtest3"], + add: [], + }), + }, + ); + expect(response.status).toEqual(201); +});