Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dirty-shrimps-film.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"@opennextjs/cloudflare": patch
"@opennextjs/cloudflare": minor

---

add regional replicas for the sharded tag cache
3 changes: 3 additions & 0 deletions examples/e2e/app-router/open-next.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ export default defineCloudflareConfig({
shardReplication: {
numberOfSoftReplicas: 8,
numberOfHardReplicas: 2,
regionalReplicationOptions: {
defaultRegion: "enam",
},
},
}),
queue: doQueue,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import shardedDOTagCache, { DOId } from "./do-sharded-tag-cache";
import shardedDOTagCache, { AVAILABLE_REGIONS, DOId } from "./do-sharded-tag-cache";

const hasBeenRevalidatedMock = vi.fn();
const writeTagsMock = vi.fn();
Expand All @@ -9,6 +9,8 @@ const getMock = vi
.fn()
.mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
// @ts-expect-error - We define it here only for the test
globalThis.continent = undefined;
const sendDLQMock = vi.fn();
vi.mock("../../cloudflare-context", () => ({
getCloudflareContext: () => ({
Expand All @@ -19,6 +21,10 @@ vi.mock("../../cloudflare-context", () => ({
},
},
ctx: { waitUntil: waitUntilMock },
cf: {
// @ts-expect-error - We define it here only for the test
continent: globalThis.continent,
},
}),
}));

Expand Down Expand Up @@ -108,6 +114,87 @@ describe("DOShardedTagCache", () => {
expect(secondDOId?.replicaId).toBeGreaterThanOrEqual(1);
expect(secondDOId?.replicaId).toBeLessThanOrEqual(2);
});

it("should generate one doIds, but in the default region", () => {
const cache = shardedDOTagCache({
baseShardSize: 4,
shardReplication: {
numberOfSoftReplicas: 2,
numberOfHardReplicas: 2,
regionalReplicationOptions: {
defaultRegion: "enam",
},
},
});
const shardedTagCollection = cache.groupTagsByDO({
tags: ["tag1", "_N_T_/tag1"],
generateAllReplicas: false,
});
expect(shardedTagCollection.length).toBe(2);
const firstDOId = shardedTagCollection[0]?.doId;
const secondDOId = shardedTagCollection[1]?.doId;

expect(firstDOId?.shardId).toBe("tag-soft;shard-3");
expect(firstDOId?.region).toBe("enam");
expect(secondDOId?.shardId).toBe("tag-hard;shard-1");
expect(secondDOId?.region).toBe("enam");

// We still need to check if the last part is between the correct boundaries
expect(firstDOId?.replicaId).toBeGreaterThanOrEqual(1);
expect(firstDOId?.replicaId).toBeLessThanOrEqual(2);

expect(secondDOId?.replicaId).toBeGreaterThanOrEqual(1);
expect(secondDOId?.replicaId).toBeLessThanOrEqual(2);
});

it("should generate one doIds, but in the correct region", () => {
// @ts-expect-error - We define it here only for the test
globalThis.continent = "EU";
const cache = shardedDOTagCache({
baseShardSize: 4,
shardReplication: {
numberOfSoftReplicas: 2,
numberOfHardReplicas: 2,
regionalReplicationOptions: {
defaultRegion: "enam",
},
},
});
const shardedTagCollection = cache.groupTagsByDO({
tags: ["tag1", "_N_T_/tag1"],
generateAllReplicas: false,
});
expect(shardedTagCollection.length).toBe(2);
expect(shardedTagCollection[0]?.doId.region).toBe("weur");
expect(shardedTagCollection[1]?.doId.region).toBe("weur");

//@ts-expect-error - We need to reset the global variable
globalThis.continent = undefined;
});

it("should generate all the appropriate replicas in all the regions with enableRegionalReplication", () => {
const cache = shardedDOTagCache({
baseShardSize: 4,
shardReplication: {
numberOfSoftReplicas: 2,
numberOfHardReplicas: 2,
regionalReplicationOptions: {
defaultRegion: "enam",
},
},
});
const shardedTagCollection = cache.groupTagsByDO({
tags: ["tag1", "_N_T_/tag1"],
generateAllReplicas: true,
});
// 6 regions times 4 shards replica
expect(shardedTagCollection.length).toBe(24);
shardedTagCollection.forEach(({ doId }) => {
expect(AVAILABLE_REGIONS).toContain(doId.region);
// It should end with the region
expect(doId.key).toMatch(/tag-(soft|hard);shard-\d;replica-\d;region-(enam|weur|sam|afr|apac|oc)$/);
});
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export const DEFAULT_NUM_SHARDS = 4;
export const NAME = "do-sharded-tag-cache";

const SOFT_TAG_PREFIX = "_N_T_/";
export const DEFAULT_REGION = "enam" as const;
export const AVAILABLE_REGIONS = ["enam", "weur", "apac", "sam", "afr", "oc"] as const;
type AllowedDurableObjectRegion = (typeof AVAILABLE_REGIONS)[number];

interface ShardedDOTagCacheOptions {
/**
Expand Down Expand Up @@ -63,6 +66,19 @@ interface ShardedDOTagCacheOptions {
shardReplication?: {
numberOfSoftReplicas: number;
numberOfHardReplicas: number;

/**
* Enable regional replication for the shards.
*
* If not set, no regional replication will be performed and durable objects will be created without a location hint
*
* Can be used to reduce latency for users in different regions and to spread the load across multiple regions.
*
* This will increase the number of durable objects created, as each shard will be replicated in all regions.
*/
regionalReplicationOptions?: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we do not really need "Options"?

Suggested change
regionalReplicationOptions?: {
regionalReplication?: {

defaultRegion: AllowedDurableObjectRegion;
};
};

/**
Expand All @@ -78,23 +94,26 @@ interface DOIdOptions {
numberOfReplicas: number;
shardType: "soft" | "hard";
replicaId?: number;
region?: DurableObjectLocationHint;
}

export class DOId {
shardId: string;
replicaId: number;
region?: DurableObjectLocationHint;
constructor(public options: DOIdOptions) {
const { baseShardId, shardType, numberOfReplicas, replicaId } = options;
const { baseShardId, shardType, numberOfReplicas, replicaId, region } = options;
this.shardId = `tag-${shardType};${baseShardId}`;
this.replicaId = replicaId ?? this.generateRandomNumberBetween(1, numberOfReplicas);
this.region = region;
}

private generateRandomNumberBetween(min: number, max: number) {
return Math.floor(Math.random() * (max - min + 1) + min);
}

get key() {
return `${this.shardId};replica-${this.replicaId}`;
return `${this.shardId};replica-${this.replicaId}${this.region ? `;region-${this.region}` : ""}`;
}
}

Expand All @@ -109,20 +128,28 @@ class ShardedDOTagCache implements NextModeTagCache {
readonly numSoftReplicas: number;
readonly numHardReplicas: number;
readonly maxWriteRetries: number;
readonly enableRegionalReplication: boolean;
readonly defaultRegion: AllowedDurableObjectRegion;
localCache?: Cache;

constructor(private opts: ShardedDOTagCacheOptions = { baseShardSize: DEFAULT_NUM_SHARDS }) {
this.numSoftReplicas = opts.shardReplication?.numberOfSoftReplicas ?? 1;
this.numHardReplicas = opts.shardReplication?.numberOfHardReplicas ?? 1;
this.maxWriteRetries = opts.maxWriteRetries ?? DEFAULT_WRITE_RETRIES;
this.enableRegionalReplication = Boolean(opts.shardReplication?.regionalReplicationOptions);
this.defaultRegion = opts.shardReplication?.regionalReplicationOptions?.defaultRegion ?? DEFAULT_REGION;
}

private getDurableObjectStub(doId: DOId) {
const durableObject = getCloudflareContext().env.NEXT_TAG_CACHE_DO_SHARDED;
if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");

const id = durableObject.idFromName(doId.key);
return durableObject.get(id);
debug("[shardedTagCache] - Accessing Durable Object : ", {
key: doId.key,
region: doId.region,
});
return durableObject.get(id, { locationHint: doId.region });
}

/**
Expand All @@ -143,10 +170,14 @@ class ShardedDOTagCache implements NextModeTagCache {
}) {
let replicaIndexes: Array<number | undefined> = [1];
const isSoft = shardType === "soft";
const numReplicas = isSoft ? this.numSoftReplicas : this.numHardReplicas;
replicaIndexes = generateAllReplicas ? Array.from({ length: numReplicas }, (_, i) => i + 1) : [undefined];

return replicaIndexes.flatMap((replicaId) => {
let numReplicas = 1;
if (this.opts.shardReplication) {
numReplicas = isSoft ? this.numSoftReplicas : this.numHardReplicas;
replicaIndexes = generateAllReplicas
? Array.from({ length: numReplicas }, (_, i) => i + 1)
: [undefined];
}
const regionalReplicas = replicaIndexes.flatMap((replicaId) => {
return tags
.filter((tag) => (isSoft ? tag.startsWith(SOFT_TAG_PREFIX) : !tag.startsWith(SOFT_TAG_PREFIX)))
.map((tag) => {
Expand All @@ -161,6 +192,51 @@ class ShardedDOTagCache implements NextModeTagCache {
};
});
});
if (!this.enableRegionalReplication) return regionalReplicas;

// If we have regional replication enabled, we need to further duplicate the shards in all the regions
const regionalReplicasInAllRegions = generateAllReplicas
? regionalReplicas.flatMap(({ doId, tag }) => {
return AVAILABLE_REGIONS.map((region) => {
return {
doId: new DOId({
baseShardId: doId.options.baseShardId,
numberOfReplicas: numReplicas,
shardType,
replicaId: doId.replicaId,
region,
}),
tag,
};
});
})
: regionalReplicas.map(({ doId, tag }) => {
doId.region = this.getClosestRegion();
return { doId, tag };
});
return regionalReplicasInAllRegions;
}

getClosestRegion() {
const continent = getCloudflareContext().cf?.continent;
if (!continent) return this.defaultRegion;
debug("[shardedTagCache] - Continent : ", continent);
switch (continent) {
case "AF":
return "afr";
case "AS":
return "apac";
case "EU":
return "weur";
case "NA":
return "enam";
case "OC":
return "oc";
case "SA":
return "sam";
default:
return this.defaultRegion;
}
}

/**
Expand Down