Skip to content

Commit 1f22cf3

Browse files
committed
Web sockets on workers instead of DO
1 parent 21ddc82 commit 1f22cf3

File tree

6 files changed

+140
-221
lines changed

6 files changed

+140
-221
lines changed

src/do.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { DurableObject } from "cloudflare:workers";
22
import { OperationQueueItem, QueryResponse } from "./operation";
3-
import { createResponse, QueryRequest, QueryTransactionRequest } from "./utils";
43

54
export class DatabaseDurableObject extends DurableObject {
65
// Durable storage for the SQL database

src/handler.ts

Lines changed: 37 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,31 @@
1-
import { DataSource, Source } from ".";
2-
import { handleApiRequest } from "./api";
3-
import { exportTableToCsvRoute } from "./export/csv";
4-
import { dumpDatabaseRoute } from "./export/dump";
5-
import { exportTableToJsonRoute } from "./export/json";
6-
import { importTableFromCsvRoute } from "./import/csv";
7-
import { importDumpRoute } from "./import/dump";
8-
import { importTableFromJsonRoute } from "./import/json";
1+
import { DataSource } from ".";
92
import { LiteREST } from "./literest";
10-
import { executeQuery, executeTransaction, OperationQueueItem } from "./operation";
11-
import { createResponse, createResponseFromOperationResponse, QueryRequest, QueryTransactionRequest } from "./utils";
3+
import { executeQuery, executeTransaction } from "./operation";
4+
import { createResponse, QueryRequest, QueryTransactionRequest } from "./utils";
5+
import { Env } from './'
6+
import { handleApiRequest } from "./api";
127

138
export class Handler {
14-
// Queue of operations to be processed, with each operation containing a list of queries to be executed
15-
private operationQueue: Array<OperationQueueItem> = [];
16-
17-
// Flag to indicate if an operation is currently being processed
18-
private processingOperation: { value: boolean } = { value: false };
19-
20-
// Map of WebSocket connections to their corresponding session IDs
21-
private connections = new Map<string, WebSocket>();
22-
23-
// Instantiate LiteREST
249
private liteREST?: LiteREST;
25-
2610
private dataSource?: DataSource;
2711

28-
// You can inject dependencies via the constructor if needed
29-
constructor(/* dependencies */) {
30-
// Initialize LiteREST for handling /lite routes
31-
// this.liteREST = new LiteREST(
32-
// ctx,
33-
// this.operationQueue,
34-
// this.processingOperation,
35-
// this.sql
36-
// );
37-
}
12+
constructor() { }
3813

39-
// Main method to handle the request
40-
public async handle(request: Request, dataSource: DataSource): Promise<Response> {
14+
public async handle(request: Request, dataSource: DataSource, env: Env): Promise<Response> {
4115
this.dataSource = dataSource;
16+
this.liteREST = new LiteREST(dataSource, env);
4217
const url = new URL(request.url);
4318

44-
// return createResponse(dataSource, undefined, 200);
45-
4619
if (request.method === 'POST' && url.pathname === '/query/raw') {
4720
return this.queryRoute(request, true);
4821
} else if (request.method === 'POST' && url.pathname === '/query') {
4922
return this.queryRoute(request, false);
50-
}
51-
// else if (url.pathname === '/socket') {
52-
// return this.clientConnected();
53-
// } else if (url.pathname.startsWith('/rest')) {
54-
// return await this.liteREST.handleRequest(request);
55-
// } else if (request.method === 'GET' && url.pathname === '/export/dump') {
23+
} else if (url.pathname === '/socket') {
24+
return this.clientConnected();
25+
} else if (url.pathname.startsWith('/rest')) {
26+
return await this.liteREST.handleRequest(request);
27+
28+
// else if (request.method === 'GET' && url.pathname === '/export/dump') {
5629
// return dumpDatabaseRoute(this.sql, this.operationQueue, this.ctx, this.processingOperation);
5730
// } else if (request.method === 'GET' && url.pathname.startsWith('/export/json/')) {
5831
// const tableName = url.pathname.split('/').pop();
@@ -80,18 +53,14 @@ export class Handler {
8053
// return createResponse(undefined, 'Table name is required', 400);
8154
// }
8255
// return importTableFromCsvRoute(this.sql, this.operationQueue, this.ctx, this.processingOperation, tableName, request);
83-
// } else if (url.pathname.startsWith('/api')) {
84-
// return await handleApiRequest(request);
85-
// }
56+
} else if (url.pathname.startsWith('/api')) {
57+
return await handleApiRequest(request);
58+
}
8659

8760
return createResponse(undefined, 'Unknown operation', 400);
8861
}
8962

9063
async queryRoute(request: Request, isRaw: boolean): Promise<Response> {
91-
if (!this.dataSource) {
92-
return createResponse(undefined, 'Data source not found.', 400);
93-
}
94-
9564
try {
9665
const contentType = request.headers.get('Content-Type') || '';
9766
if (!contentType.includes('application/json')) {
@@ -100,108 +69,52 @@ export class Handler {
10069

10170
const { sql, params, transaction } = await request.json() as QueryRequest & QueryTransactionRequest;
10271

103-
// const response = await executeQuery(sql, params, isRaw, this.dataSource);
104-
// return createResponse(response, undefined, 200);
105-
106-
107-
108-
109-
11072
if (Array.isArray(transaction) && transaction.length) {
11173
const queries = transaction.map((queryObj: any) => {
11274
const { sql, params } = queryObj;
11375

11476
if (typeof sql !== 'string' || !sql.trim()) {
11577
throw new Error('Invalid or empty "sql" field in transaction.');
116-
} else if (params !== undefined && !Array.isArray(params)) {
117-
throw new Error('Invalid "params" field in transaction.');
78+
} else if (params !== undefined && !Array.isArray(params) && typeof params !== 'object') {
79+
throw new Error('Invalid "params" field in transaction. Must be an array or object.');
11880
}
11981

12082
return { sql, params };
12183
});
12284

12385
const response = await executeTransaction(queries, isRaw, this.dataSource);
12486
return createResponse(response, undefined, 200);
125-
126-
// try {
127-
// const response = await enqueueOperation(
128-
// queries,
129-
// true,
130-
// isRaw,
131-
// this.operationQueue,
132-
// () => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
133-
// );
134-
135-
// return createResponseFromOperationResponse(response);
136-
// } catch (error: any) {
137-
// return createResponse(undefined, error.error || 'An unexpected error occurred.', error.status || 500);
138-
// }
13987
} else if (typeof sql !== 'string' || !sql.trim()) {
14088
return createResponse(undefined, 'Invalid or empty "sql" field.', 400);
141-
} else if (params !== undefined && !Array.isArray(params)) {
142-
return createResponse(undefined, 'Invalid "params" field.', 400);
89+
} else if (params !== undefined && !Array.isArray(params) && typeof params !== 'object') {
90+
return createResponse(undefined, 'Invalid "params" field. Must be an array or object.', 400);
14391
}
14492

14593
const response = await executeQuery(sql, params, isRaw, this.dataSource);
14694
return createResponse(response, undefined, 200);
147-
148-
// try {
149-
// const queries = [{ sql, params }];
150-
// const response = await enqueueOperation(
151-
// queries,
152-
// false,
153-
// isRaw,
154-
// this.operationQueue,
155-
// () => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
156-
// );
157-
158-
// return createResponseFromOperationResponse(response);
159-
// } catch (error: any) {
160-
// return createResponse(undefined, error.error || 'An unexpected error occurred.', error.status || 500);
161-
// }
16295
} catch (error: any) {
16396
console.error('Query Route Error:', error);
16497
return createResponse(undefined, error || 'An unexpected error occurred.', 500);
16598
}
16699
}
167100

168-
// clientConnected() {
169-
// const webSocketPair = new WebSocketPair();
170-
// const [client, server] = Object.values(webSocketPair);
171-
// const wsSessionId = crypto.randomUUID();
101+
clientConnected() {
102+
const webSocketPair = new WebSocketPair();
103+
const [client, server] = Object.values(webSocketPair);
172104

173-
// this.ctx.acceptWebSocket(server, [wsSessionId]);
174-
// this.connections.set(wsSessionId, client);
105+
server.accept();
106+
server.addEventListener('message', event => {
107+
const { sql, params, action } = JSON.parse(event.data as string);
175108

176-
// return new Response(null, { status: 101, webSocket: client });
177-
// }
178-
179-
// async webSocketMessage(ws: WebSocket, message: any) {
180-
// const { sql, params, action } = JSON.parse(message);
181-
182-
// if (action === 'query') {
183-
// const queries = [{ sql, params }];
184-
// const response = await enqueueOperation(
185-
// queries,
186-
// false,
187-
// false,
188-
// this.operationQueue,
189-
// () => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
190-
// );
191-
192-
// ws.send(JSON.stringify(response.result));
193-
// }
194-
// }
195-
196-
// async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean) {
197-
// // If the client closes the connection, the runtime will invoke the webSocketClose() handler.
198-
// ws.close(code, "StarbaseDB is closing WebSocket connection");
109+
if (action === 'query') {
110+
const executeQueryWrapper = async () => {
111+
const response = await executeQuery(sql, params, false, this.dataSource);
112+
server.send(JSON.stringify(response));
113+
};
114+
executeQueryWrapper();
115+
}
116+
});
199117

200-
// // Remove the WebSocket connection from the map
201-
// const tags = this.ctx.getTags(ws);
202-
// if (tags.length) {
203-
// const wsSessionId = tags[0];
204-
// this.connections.delete(wsSessionId);
205-
// }
206-
// }
118+
return new Response(null, { status: 101, webSocket: client });
119+
}
207120
}

src/index.ts

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
1-
import { DurableObject } from "cloudflare:workers";
2-
import { createResponse, createResponseFromOperationResponse, QueryRequest, QueryTransactionRequest } from './utils';
3-
import { LiteREST } from './literest';
1+
import { createResponse } from './utils';
42
import handleStudioRequest from "./studio";
5-
import { dumpDatabaseRoute } from './export/dump';
6-
import { exportTableToJsonRoute } from './export/json';
7-
import { exportTableToCsvRoute } from './export/csv';
8-
import { importDumpRoute } from './import/dump';
9-
import { importTableFromJsonRoute } from './import/json';
10-
import { importTableFromCsvRoute } from './import/csv';
11-
import { handleApiRequest } from "./api";
123
import { Handler } from "./handler";
134
import { QueryResponse } from "./operation";
145
export { DatabaseDurableObject } from './do';
156

167
const DURABLE_OBJECT_ID = 'sql-durable-object';
178

18-
interface Env {
9+
export interface Env {
1910
AUTHORIZATION_TOKEN: string;
2011
DATABASE_DURABLE_OBJECT: DurableObjectNamespace;
12+
13+
// Studio credentials
2114
STUDIO_USER?: string;
2215
STUDIO_PASS?: string;
16+
17+
// External database source details
18+
EXTERNAL_DB_TYPE?: string;
19+
EXTERNAL_DB_HOST?: string;
20+
EXTERNAL_DB_NAME?: string;
21+
EXTERNAL_DB_USER?: string;
22+
EXTERNAL_DB_PASS?: string;
23+
EXTERNAL_DB_PORT?: string;
24+
2325
// ## DO NOT REMOVE: TEMPLATE INTERFACE ##
2426
}
2527

@@ -33,21 +35,20 @@ export type DataSource = {
3335
request: Request;
3436
internalConnection?: InternalConnection;
3537
externalConnection?: {
36-
// API Key for Outerbase which currently controls querying external data sources
3738
outerbaseApiKey: string;
3839
};
3940
}
4041

42+
interface InternalConnection {
43+
durableObject: DatabaseStub;
44+
}
45+
4146
type DatabaseStub = DurableObjectStub & {
4247
fetch: (init?: RequestInit | Request) => Promise<Response>;
4348
executeQuery(sql: string, params: any[] | undefined, isRaw: boolean): QueryResponse;
4449
executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean): any[];
4550
};
4651

47-
interface InternalConnection {
48-
durableObject: DatabaseStub;
49-
}
50-
5152
export default {
5253
/**
5354
* This is the standard fetch handler for a Cloudflare Worker
@@ -58,7 +59,8 @@ export default {
5859
* @returns The response to be sent back to the client
5960
*/
6061
async fetch(request, env, ctx): Promise<Response> {
61-
const pathname = new URL(request.url).pathname;
62+
const url = new URL(request.url);
63+
const pathname = url.pathname;
6264
const isWebSocket = request.headers.get("Upgrade") === "websocket";
6365

6466
/**
@@ -87,7 +89,6 @@ export default {
8789
* Web socket connections cannot pass in an Authorization header into their requests,
8890
* so we can use a query parameter to validate the connection.
8991
*/
90-
const url = new URL(request.url);
9192
const token = url.searchParams.get('token');
9293

9394
if (token !== env.AUTHORIZATION_TOKEN) {
@@ -99,25 +100,22 @@ export default {
99100
* Retrieve the Durable Object identifier from the environment bindings and instantiate a
100101
* Durable Object stub to interact with the Durable Object.
101102
*/
102-
// Get location hint from wrangler.toml environment variables
103-
// const locationHint = env.DATABASE_LOCATION_HINT ?? 'enam';
104-
// let stub = env.DATABASE_DURABLE_OBJECT.get(id, { locationHint: "enam" });
105103
let id: DurableObjectId = env.DATABASE_DURABLE_OBJECT.idFromName(DURABLE_OBJECT_ID);
106104
let stub = env.DATABASE_DURABLE_OBJECT.get(id);
107105

108-
const source: Source = request.headers.get('X-Starbase-Source') as Source ?? 'internal';
106+
const source: Source = request.headers.get('X-Starbase-Source') as Source ?? url.searchParams.get('source') as Source ?? 'internal';
109107
const dataSource: DataSource = {
110108
source: source,
111109
request: request.clone(),
112110
internalConnection: {
113111
durableObject: stub as unknown as DatabaseStub,
114112
},
115113
externalConnection: {
116-
outerbaseApiKey: request.headers.get('X-Outerbase-Source-Token') ?? '',
114+
outerbaseApiKey: request.headers.get('X-Outerbase-Source-Token') ?? url.searchParams.get('outerbaseApiKey') ?? '',
117115
},
118116
};
119117

120-
const response = await new Handler().handle(request, dataSource);
118+
const response = await new Handler().handle(request, dataSource, env);
121119

122120
// ## DO NOT REMOVE: TEMPLATE ROUTING ##
123121

0 commit comments

Comments
 (0)