Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dull-lobsters-hope.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

lazily initialize lite client
60 changes: 29 additions & 31 deletions packages/open-next/src/cache/incremental/s3-lite.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { AwsClient } from "aws4fetch";
import path from "path";
import { IgnorableError, RecoverableError } from "utils/error";
Expand All @@ -7,24 +8,33 @@ import { parseNumberFromEnv } from "../../adapters/util";
import { Extension } from "../next-types";
import { IncrementalCache } from "./types";

const {
CACHE_BUCKET_REGION,
CACHE_BUCKET_KEY_PREFIX,
NEXT_BUILD_ID,
CACHE_BUCKET_NAME,
} = process.env;
let awsClient: AwsClient | null = null;

const awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: CACHE_BUCKET_REGION,
retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
});
const getAwsClient = () => {
const { CACHE_BUCKET_REGION } = process.env;
if (awsClient) {
return awsClient;
} else {
awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: CACHE_BUCKET_REGION,
retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
});
return awsClient;
}
};

const awsFetch = customFetchClient(awsClient);
const awsFetch = async (key: string, options: RequestInit) => {
const { CACHE_BUCKET_REGION, CACHE_BUCKET_NAME } = process.env;
const client = getAwsClient();
const url = `https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${key}`;
return customFetchClient(client)(url, options);
};

function buildS3Key(key: string, extension: Extension) {
const { CACHE_BUCKET_KEY_PREFIX, NEXT_BUILD_ID } = process.env;
return path.posix.join(
CACHE_BUCKET_KEY_PREFIX ?? "",
extension === "fetch" ? "__fetch" : "",
Expand All @@ -36,10 +46,7 @@ function buildS3Key(key: string, extension: Extension) {
const incrementalCache: IncrementalCache = {
async get(key, isFetch) {
const result = await awsFetch(
`https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${buildS3Key(
key,
isFetch ? "fetch" : "cache",
)}`,
buildS3Key(key, isFetch ? "fetch" : "cache"),
{
method: "GET",
},
Expand All @@ -61,10 +68,7 @@ const incrementalCache: IncrementalCache = {
},
async set(key, value, isFetch): Promise<void> {
const response = await awsFetch(
`https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${buildS3Key(
key,
isFetch ? "fetch" : "cache",
)}`,
buildS3Key(key, isFetch ? "fetch" : "cache"),
{
method: "PUT",
body: JSON.stringify(value),
Expand All @@ -75,15 +79,9 @@ const incrementalCache: IncrementalCache = {
}
},
async delete(key): Promise<void> {
const response = await awsFetch(
`https://${CACHE_BUCKET_NAME}.s3.${CACHE_BUCKET_REGION}.amazonaws.com/${buildS3Key(
key,
"cache",
)}`,
{
method: "DELETE",
},
);
const response = await awsFetch(buildS3Key(key, "cache"), {
method: "DELETE",
});
if (response.status !== 204) {
throw new RecoverableError(`Failed to delete cache: ${response.status}`);
}
Expand Down
150 changes: 76 additions & 74 deletions packages/open-next/src/cache/tag/dynamodb-lite.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { AwsClient } from "aws4fetch";
import path from "path";
import { RecoverableError } from "utils/error";
Expand All @@ -11,18 +12,46 @@ import {
} from "./constants";
import { TagCache } from "./types";

const { CACHE_BUCKET_REGION, CACHE_DYNAMO_TABLE, NEXT_BUILD_ID } = process.env;
let awsClient: AwsClient | null = null;

const awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: CACHE_BUCKET_REGION,
retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
});
const awsFetch = customFetchClient(awsClient);
const getAwsClient = () => {
const { CACHE_BUCKET_REGION } = process.env;
if (awsClient) {
return awsClient;
} else {
awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: CACHE_BUCKET_REGION,
retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
});
return awsClient;
}
};
const awsFetch = (
body: RequestInit["body"],
type: "query" | "batchWrite" = "query",
) => {
const { CACHE_BUCKET_REGION } = process.env;
const client = getAwsClient();
return customFetchClient(client)(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": `DynamoDB_20120810.${
type === "query" ? "Query" : "BatchWriteItem"
}`,
},
body,
},
);
};

function buildDynamoKey(key: string) {
const { NEXT_BUILD_ID } = process.env;
// FIXME: We should probably use something else than path.join here
// this could transform some fetch cache key into a valid path
return path.posix.join(NEXT_BUILD_ID ?? "", key);
Expand All @@ -40,26 +69,19 @@ const tagCache: TagCache = {
async getByPath(path) {
try {
if (globalThis.disableDynamoDBCache) return [];
const { CACHE_DYNAMO_TABLE, NEXT_BUILD_ID } = process.env;
const result = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.Query",
JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression: "#key = :key",
ExpressionAttributeNames: {
"#key": "path",
},
body: JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression: "#key = :key",
ExpressionAttributeNames: {
"#key": "path",
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(path) },
},
}),
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(path) },
},
}),
);
if (result.status !== 200) {
throw new RecoverableError(
Expand All @@ -80,25 +102,18 @@ const tagCache: TagCache = {
async getByTag(tag) {
try {
if (globalThis.disableDynamoDBCache) return [];
const { CACHE_DYNAMO_TABLE, NEXT_BUILD_ID } = process.env;
const result = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.Query",
JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
KeyConditionExpression: "#tag = :tag",
ExpressionAttributeNames: {
"#tag": "tag",
},
body: JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
KeyConditionExpression: "#tag = :tag",
ExpressionAttributeNames: {
"#tag": "tag",
},
ExpressionAttributeValues: {
":tag": { S: buildDynamoKey(tag) },
},
}),
},
ExpressionAttributeValues: {
":tag": { S: buildDynamoKey(tag) },
},
}),
);
if (result.status !== 200) {
throw new RecoverableError(`Failed to get by tag: ${result.status}`);
Expand All @@ -119,29 +134,22 @@ const tagCache: TagCache = {
async getLastModified(key, lastModified) {
try {
if (globalThis.disableDynamoDBCache) return lastModified ?? Date.now();
const { CACHE_DYNAMO_TABLE } = process.env;
const result = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.Query",
JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression:
"#key = :key AND #revalidatedAt > :lastModified",
ExpressionAttributeNames: {
"#key": "path",
"#revalidatedAt": "revalidatedAt",
},
body: JSON.stringify({
TableName: CACHE_DYNAMO_TABLE,
IndexName: "revalidate",
KeyConditionExpression:
"#key = :key AND #revalidatedAt > :lastModified",
ExpressionAttributeNames: {
"#key": "path",
"#revalidatedAt": "revalidatedAt",
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(key) },
":lastModified": { N: String(lastModified ?? 0) },
},
}),
},
ExpressionAttributeValues: {
":key": { S: buildDynamoKey(key) },
":lastModified": { N: String(lastModified ?? 0) },
},
}),
);
if (result.status !== 200) {
throw new RecoverableError(
Expand All @@ -159,6 +167,7 @@ const tagCache: TagCache = {
},
async writeTags(tags) {
try {
const { CACHE_DYNAMO_TABLE } = process.env;
if (globalThis.disableDynamoDBCache) return;
const dataChunks = chunk(tags, MAX_DYNAMO_BATCH_WRITE_ITEM_COUNT).map(
(Items) => ({
Expand All @@ -181,15 +190,8 @@ const tagCache: TagCache = {
await Promise.all(
paramsChunk.map(async (params) => {
const response = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.BatchWriteItem",
},
body: JSON.stringify(params),
},
JSON.stringify(params),
"batchWrite",
);
if (response.status !== 200) {
throw new RecoverableError(
Expand Down
60 changes: 37 additions & 23 deletions packages/open-next/src/queue/sqs-lite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,48 @@ import { customFetchClient } from "utils/fetch";
import { error } from "../adapters/logger";
import { Queue } from "./types";

// Expected environment variables
const { REVALIDATION_QUEUE_REGION, REVALIDATION_QUEUE_URL } = process.env;
let awsClient: AwsClient | null = null;

const awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: REVALIDATION_QUEUE_REGION,
});
const awsFetch = customFetchClient(awsClient);
const getAwsClient = () => {
if (awsClient) {
return awsClient;
} else {
awsClient = new AwsClient({
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
sessionToken: process.env.AWS_SESSION_TOKEN,
region: process.env.REVALIDATION_QUEUE_REGION,
});
return awsClient;
}
};

const awsFetch = (body: RequestInit["body"]) => {
const { REVALIDATION_QUEUE_REGION } = process.env;
const client = getAwsClient();
return customFetchClient(client)(
`https://sqs.${REVALIDATION_QUEUE_REGION ?? "us-east-1"}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "AmazonSQS.SendMessage",
},
body,
},
);
};
const queue: Queue = {
send: async ({ MessageBody, MessageDeduplicationId, MessageGroupId }) => {
try {
const { REVALIDATION_QUEUE_URL } = process.env;
const result = await awsFetch(
`https://sqs.${REVALIDATION_QUEUE_REGION ?? "us-east-1"}.amazonaws.com`,
{
method: "POST",
headers: {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "AmazonSQS.SendMessage",
},
body: JSON.stringify({
QueueUrl: REVALIDATION_QUEUE_URL,
MessageBody: JSON.stringify(MessageBody),
MessageDeduplicationId,
MessageGroupId,
}),
},
JSON.stringify({
QueueUrl: REVALIDATION_QUEUE_URL,
MessageBody: JSON.stringify(MessageBody),
MessageDeduplicationId,
MessageGroupId,
}),
);
if (result.status !== 200) {
throw new RecoverableError(`Failed to send message: ${result.status}`);
Expand Down
Loading