Commit fa0209e

Cache Read Queries
2 parents 89d4eb6 + 2e4c328 commit fa0209e

5 files changed: +170 -26 lines changed
package.json

Lines changed: 2 additions & 1 deletion
@@ -19,6 +19,7 @@
     "@outerbase/sdk": "2.0.0-rc.3",
     "mongodb": "^6.11.0",
     "mysql2": "^3.11.4",
-    "pg": "^8.13.1"
+    "pg": "^8.13.1",
+    "node-sql-parser": "^4.18.0"
   }
 }
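
The new node-sql-parser dependency is what the cache layer uses to decide whether a statement is safe to cache: it parses the SQL into an AST and only read-only statements are considered. A simplified sketch of that check; isCacheableRead is a hypothetical helper, and the commit's hasModifyingStatement in src/cache/index.ts walks the full AST recursively instead:

    import { Parser } from 'node-sql-parser';

    const parser = new Parser();

    // Treat a statement as cacheable only if every parsed node is a SELECT.
    function isCacheableRead(sql: string): boolean {
        const ast = parser.astify(sql);          // may return a single node or an array
        const nodes = Array.isArray(ast) ? ast : [ast];
        return nodes.every(node => node.type?.toLowerCase() === 'select');
    }

    isCacheableRead('SELECT * FROM users');        // true
    isCacheableRead('UPDATE users SET name = ?');  // false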

pnpm-lock.yaml

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default.

src/cache/index.ts

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+import { DataSource, Source } from "../types";
+const parser = new (require('node-sql-parser').Parser)();
+
+async function createCacheTable(dataSource?: DataSource) {
+    const statement = `
+        CREATE TABLE IF NOT EXISTS "main"."tmp_cache"(
+            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+            "timestamp" REAL NOT NULL,
+            "ttl" INTEGER NOT NULL,
+            "query" TEXT UNIQUE NOT NULL,
+            "results" TEXT
+        );
+    `
+
+    await dataSource?.internalConnection?.durableObject.executeQuery(statement, undefined, false)
+}
+
+function hasModifyingStatement(ast: any): boolean {
+    // Check if current node is a modifying statement
+    if (ast.type && ['insert', 'update', 'delete'].includes(ast.type.toLowerCase())) {
+        return true;
+    }
+
+    // Recursively check all properties of the AST
+    for (const key in ast) {
+        if (typeof ast[key] === 'object' && ast[key] !== null) {
+            if (Array.isArray(ast[key])) {
+                if (ast[key].some(item => hasModifyingStatement(item))) {
+                    return true;
+                }
+            } else if (hasModifyingStatement(ast[key])) {
+                return true;
+            }
+        }
+    }
+
+    return false;
+}
+
+export async function beforeQueryCache(sql: string, params?: any[], dataSource?: DataSource): Promise<any | null> {
+    // Currently we do not support caching queries that have dynamic parameters
+    if (params?.length) return null
+
+    let ast = parser.astify(sql);
+
+    if (!hasModifyingStatement(ast) && dataSource?.source === Source.external && dataSource?.request.headers.has('X-Starbase-Cache')) {
+        await createCacheTable(dataSource)
+        const fetchCacheStatement = `SELECT timestamp, ttl, query, results FROM tmp_cache WHERE query = ?`
+        const result = await dataSource.internalConnection?.durableObject.executeQuery(fetchCacheStatement, [sql], false) as any[];
+
+        if (result?.length) {
+            const { timestamp, ttl, results } = result[0];
+            const expirationTime = new Date(timestamp).getTime() + (ttl * 1000);
+
+            if (Date.now() < expirationTime) {
+                return JSON.parse(results)
+            }
+        }
+    }
+
+    return null
+}
+
+// Serialized RPC arguments are limited to 1MiB in size at the moment for Cloudflare
+// Workers. An error may occur if we attempt to cache a result value greater than that
+// size, but this is noted here to disclose the restriction. Potential optimizations to
+// look into include using Cloudflare Cache, though we need to find a way to cache the
+// response safely for our use case. Another option is a separate service for queues,
+// or another way to ingest results directly into the Durable Object.
+export async function afterQueryCache(sql: string, params: any[] | undefined, result: any, dataSource?: DataSource) {
+    // Currently we do not support caching queries that have dynamic parameters
+    if (params?.length) return;
+
+    try {
+        let ast = parser.astify(sql);
+
+        // If any modifying query exists within our SQL statement then we shouldn't proceed
+        if (hasModifyingStatement(ast) ||
+            !(dataSource?.source === Source.external && dataSource?.request.headers.has('X-Starbase-Cache'))) return;
+
+        const timestamp = Date.now();
+        const results = JSON.stringify(result);
+
+        const exists = await dataSource.internalConnection?.durableObject.executeQuery(
+            'SELECT 1 FROM tmp_cache WHERE query = ? LIMIT 1',
+            [sql],
+            false
+        ) as any[];
+
+        const query = exists?.length
+            ? { sql: 'UPDATE tmp_cache SET timestamp = ?, results = ? WHERE query = ?', params: [timestamp, results, sql] }
+            : { sql: 'INSERT INTO tmp_cache (timestamp, ttl, query, results) VALUES (?, ?, ?, ?)', params: [timestamp, 60, sql, results] };
+
+        await dataSource.internalConnection?.durableObject.executeQuery(query.sql, query.params, false);
+    } catch (error) {
+        console.error('Error in cache operation:', error);
+        return;
+    }
+}
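
Caching is opt-in per request: both hooks only run for parameter-less queries against an external source when the incoming request carries an X-Starbase-Cache header (any value works, since only headers.has() is checked). A hedged client-side sketch; the endpoint URL, bearer token, and request body shape are assumptions, only the header name comes from this commit:

    // Hypothetical caller: URL, auth, and body shape are placeholders.
    const response = await fetch('https://starbasedb.example.workers.dev/query', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer <token>',
            'X-Starbase-Cache': 'true',
        },
        body: JSON.stringify({ sql: 'SELECT * FROM users' }),
    });

    // The first call hits the external database and is written to tmp_cache;
    // repeating the identical SQL within the 60-second TTL is served from the cache.
    const data = await response.json();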

src/index.ts

Lines changed: 16 additions & 1 deletion
@@ -114,7 +114,11 @@ export default {
                 outerbaseApiKey: env.OUTERBASE_API_KEY ?? ''
             }
         };
+
+        // Non-blocking operation to remove expired cache entries from our DO
+        expireCache(dataSource)
 
+        // Return the final response to our user
         return await new Handler().handle(request, dataSource, env);
     } catch (error) {
         // Return error response to client
@@ -124,5 +128,16 @@ export default {
                 400
             );
         }
-    },
+
+        function expireCache(dataSource: DataSource) {
+            ctx.waitUntil((async () => {
+                try {
+                    const cleanupSQL = `DELETE FROM tmp_cache WHERE timestamp + (ttl * 1000) < ?`;
+                    await dataSource.internalConnection?.durableObject.executeQuery(cleanupSQL, [Date.now()], false);
+                } catch (err) {
+                    console.error('Error cleaning up expired cache entries:', err);
+                }
+            })());
+        }
+    }
 } satisfies ExportedHandler<Env>;
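
The cleanup and the read path share one expiry rule: timestamp is stored as a Date.now() millisecond value and ttl in seconds, so an entry is stale once timestamp + ttl * 1000 has passed. A minimal sketch of that rule; isCacheEntryExpired is a hypothetical helper, not part of the commit:

    // timestamp is in milliseconds (Date.now()); ttl is in seconds.
    function isCacheEntryExpired(timestamp: number, ttl: number, now = Date.now()): boolean {
        return now >= timestamp + ttl * 1000;
    }

    // With the commit's default TTL of 60 seconds:
    isCacheEntryExpired(Date.now() - 30_000, 60); // false, beforeQueryCache would return the cached rows
    isCacheEntryExpired(Date.now() - 90_000, 60); // true, expireCache's DELETE would remove the row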

src/operation.ts

Lines changed: 38 additions & 24 deletions
@@ -8,6 +8,9 @@ import { CloudflareD1Connection, MongoDBConnection, MySQLConnection, PostgreSQLC
 import { DataSource } from './types';
 import { Env } from './'
 import { MongoClient } from 'mongodb';
+import { afterQueryCache, beforeQueryCache } from './cache';
+
+let globalConnection: any = null;
 
 export type OperationQueueItem = {
     queries: { sql: string; params?: any[] }[];
@@ -46,7 +49,7 @@ async function afterQuery(sql: string, result: any, isRaw: boolean, dataSource?:
     result = isRaw ? transformRawResults(result, 'from') : result;
 
     // ## DO NOT REMOVE: POST QUERY HOOK ##
-
+
     return isRaw ? transformRawResults(result, 'to') : result;
 }
 
@@ -132,6 +135,13 @@ export async function executeQuery(sql: string, params: any | undefined, isRaw:
     }
 
     const { sql: updatedSQL, params: updatedParams } = await beforeQuery(sql, params, dataSource, env)
+
+    // If a cached version of this query request exists, this function will fetch the cached results.
+    const cache = await beforeQueryCache(updatedSQL, updatedParams, dataSource)
+    if (cache) {
+        return cache
+    }
+
     let response;
 
     if (dataSource.source === 'internal') {
@@ -140,6 +150,9 @@ export async function executeQuery(sql: string, params: any | undefined, isRaw:
         response = await executeExternalQuery(updatedSQL, updatedParams, isRaw, dataSource, env);
     }
 
+    // If this is a cacheable query, this function will handle that logic.
+    await afterQueryCache(sql, updatedParams, response, dataSource)
+
     return await afterQuery(updatedSQL, response, isRaw, dataSource, env);
 }
 
@@ -248,30 +261,31 @@ export async function executeSDKQuery(sql: string, params: any | undefined, isRa
         return []
     }
 
-    let db;
-
-    if (env?.EXTERNAL_DB_TYPE === 'postgres') {
-        const { database } = await createSDKPostgresConnection(env)
-        db = database
-    } else if (env?.EXTERNAL_DB_TYPE === 'mysql' && env) {
-        const { database } = await createSDKMySQLConnection(env)
-        db = database
-    } else if (env?.EXTERNAL_DB_TYPE === 'mongo' && env) {
-        const { database } = await createSDKMongoConnection(env)
-        db = database
-    } else if (env?.EXTERNAL_DB_TYPE === 'sqlite' && env?.EXTERNAL_DB_CLOUDFLARE_API_KEY && env) {
-        const { database } = await createSDKCloudflareConnection(env)
-        db = database
-    } else if (env?.EXTERNAL_DB_TYPE === 'sqlite' && env?.EXTERNAL_DB_STARBASEDB_URI && env) {
-        const { database } = await createSDKStarbaseConnection(env)
-        db = database
-    } else if (env?.EXTERNAL_DB_TYPE === 'sqlite' && env?.EXTERNAL_DB_TURSO_URI && env) {
-        const { database } = await createSDKTursoConnection(env)
-        db = database
+    // Initialize connection if it doesn't exist
+    if (!globalConnection) {
+        if (env?.EXTERNAL_DB_TYPE === 'postgres') {
+            const { database } = await createSDKPostgresConnection(env)
+            globalConnection = database;
+        } else if (env?.EXTERNAL_DB_TYPE === 'mysql' && env) {
+            const { database } = await createSDKMySQLConnection(env)
+            globalConnection = database;
+        } else if (env?.EXTERNAL_DB_TYPE === 'mongo' && env) {
+            const { database } = await createSDKMongoConnection(env)
+            globalConnection = database;
+        } else if (env?.EXTERNAL_DB_TYPE === 'sqlite' && env?.EXTERNAL_DB_CLOUDFLARE_API_KEY && env) {
+            const { database } = await createSDKCloudflareConnection(env)
+            globalConnection = database;
+        } else if (env?.EXTERNAL_DB_TYPE === 'sqlite' && env?.EXTERNAL_DB_STARBASEDB_URI && env) {
+            const { database } = await createSDKStarbaseConnection(env)
+            globalConnection = database;
+        } else if (env?.EXTERNAL_DB_TYPE === 'sqlite' && env?.EXTERNAL_DB_TURSO_URI && env) {
+            const { database } = await createSDKTursoConnection(env)
+            globalConnection = database;
+        }
+
+        await globalConnection.connect();
     }
 
-    await db.connect();
-    const { data } = await db.raw(sql, params);
-
+    const { data } = await globalConnection.raw(sql, params);
     return data;
 }
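
The executeSDKQuery change replaces the per-call db handle with a module-level globalConnection, so the SDK connection is created and connected once per Worker isolate and reused by subsequent queries instead of reconnecting on every call. A generic sketch of that lazy-initialization pattern; the Connection type and create callback are placeholders, not the Outerbase SDK API:

    type Connection = {
        connect(): Promise<void>;
        raw(sql: string, params?: any[]): Promise<{ data: any[] }>;
    };

    let globalConnection: Connection | null = null;

    async function getConnection(create: () => Promise<Connection>): Promise<Connection> {
        if (!globalConnection) {
            globalConnection = await create();
            await globalConnection.connect(); // paid once per isolate, not once per query
        }
        return globalConnection;
    }

Module scope in Workers persists across requests handled by the same isolate but not across isolates, so this avoids repeated connection setup rather than guaranteeing a single shared connection.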