|
| 1 | +import { DurableObject } from "cloudflare:workers"; |
| 2 | + |
| 3 | +import { internalPurgeCacheByTags } from "../overrides/internal"; |
| 4 | + |
| 5 | +const DEFAULT_BUFFER_TIME = 5; // seconds |
| 6 | + |
| 7 | +export class BucketCachePurge extends DurableObject<CloudflareEnv> { |
| 8 | + bufferTimeInSeconds: number; |
| 9 | + |
| 10 | + constructor(state: DurableObjectState, env: CloudflareEnv) { |
| 11 | + super(state, env); |
| 12 | + this.bufferTimeInSeconds = env.CACHE_BUFFER_TIME_IN_SECONDS |
| 13 | + ? parseInt(env.CACHE_BUFFER_TIME_IN_SECONDS) |
| 14 | + : DEFAULT_BUFFER_TIME; // Default buffer time |
| 15 | + |
| 16 | + // Initialize the sql table if it doesn't exist |
| 17 | + state.storage.sql.exec(` |
| 18 | + CREATE TABLE IF NOT EXISTS cache_purge ( |
| 19 | + tag TEXT NOT NULL |
| 20 | + ); |
| 21 | + CREATE UNIQUE INDEX IF NOT EXISTS tag_index ON cache_purge (tag); |
| 22 | + `); |
| 23 | + } |
| 24 | + |
| 25 | + async purgeCacheByTags(tags: string[]) { |
| 26 | + for (const tag of tags) { |
| 27 | + // Insert the tag into the sql table |
| 28 | + this.ctx.storage.sql.exec( |
| 29 | + ` |
| 30 | + INSERT OR REPLACE INTO cache_purge (tag) |
| 31 | + VALUES (?)`, |
| 32 | + [tag] |
| 33 | + ); |
| 34 | + } |
| 35 | + const nextAlarm = await this.ctx.storage.getAlarm(); |
| 36 | + if (!nextAlarm) { |
| 37 | + // Set an alarm to trigger the cache purge |
| 38 | + this.ctx.storage.setAlarm(Date.now() + this.bufferTimeInSeconds * 1000); |
| 39 | + } |
| 40 | + } |
| 41 | + |
| 42 | + override async alarm() { |
| 43 | + let tags = this.ctx.storage.sql |
| 44 | + .exec<{ tag: string }>( |
| 45 | + ` |
| 46 | + SELECT * FROM cache_purge LIMIT 100 |
| 47 | + ` |
| 48 | + ) |
| 49 | + .toArray(); |
| 50 | + do { |
| 51 | + await internalPurgeCacheByTags( |
| 52 | + this.env, |
| 53 | + tags.map((row) => row.tag) |
| 54 | + ); |
| 55 | + // Delete the tags from the sql table |
| 56 | + this.ctx.storage.sql.exec( |
| 57 | + ` |
| 58 | + DELETE FROM cache_purge |
| 59 | + WHERE tag IN (${tags.map(() => "?").join(",")}) |
| 60 | + `, |
| 61 | + tags.map((row) => row.tag) |
| 62 | + ); |
| 63 | + if (tags.length < 100) { |
| 64 | + // If we have less than 100 tags, we can stop |
| 65 | + tags = []; |
| 66 | + } else { |
| 67 | + // Otherwise, we need to get the next 100 tags |
| 68 | + tags = this.ctx.storage.sql |
| 69 | + .exec<{ tag: string }>( |
| 70 | + ` |
| 71 | + SELECT * FROM cache_purge LIMIT 100 |
| 72 | + ` |
| 73 | + ) |
| 74 | + .toArray(); |
| 75 | + } |
| 76 | + } while (tags.length > 0); |
| 77 | + } |
| 78 | +} |
0 commit comments