Skip to content

Commit 21ddc82

Browse files
committed
Latest logic shifts
1 parent 8d57caa commit 21ddc82

File tree

4 files changed

+101
-80
lines changed

4 files changed

+101
-80
lines changed

src/do.ts

Lines changed: 78 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { createResponse, QueryRequest, QueryTransactionRequest } from "./utils";
55
export class DatabaseDurableObject extends DurableObject {
66
// Durable storage for the SQL database
77
public sql: SqlStorage;
8+
public storage: DurableObjectStorage;
89

910
// Queue of operations to be processed, with each operation containing a list of queries to be executed
1011
private operationQueue: Array<OperationQueueItem> = [];
@@ -22,6 +23,7 @@ export class DatabaseDurableObject extends DurableObject {
2223
constructor(ctx: DurableObjectState, env: Env) {
2324
super(ctx, env);
2425
this.sql = ctx.storage.sql;
26+
this.storage = ctx.storage;
2527
}
2628

2729
/**
@@ -86,8 +88,8 @@ export class DatabaseDurableObject extends DurableObject {
8688
}
8789
}
8890

89-
executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean, sqlInstance: any, ctx: any): Promise<any[]> {
90-
return ctx.storage.transactionSync(() => {
91+
public executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean): any[] {
92+
return this.storage.transactionSync(() => {
9193
const results = [];
9294

9395
try {
@@ -105,84 +107,84 @@ export class DatabaseDurableObject extends DurableObject {
105107
});
106108
}
107109

108-
enqueueOperation(
109-
queries: { sql: string; params?: any[] }[],
110-
isTransaction: boolean,
111-
isRaw: boolean,
112-
operationQueue: any[],
113-
processNextOperation: () => Promise<void>
114-
): Promise<{ result?: any, error?: string | undefined, status: number }> {
115-
const MAX_WAIT_TIME = 25000;
116-
return new Promise((resolve, reject) => {
117-
const timeout = setTimeout(() => {
118-
reject(createResponse(undefined, 'Operation timed out.', 503));
119-
}, MAX_WAIT_TIME);
120-
121-
operationQueue.push({
122-
queries,
123-
isTransaction,
124-
isRaw,
125-
resolve: (value: any) => {
126-
clearTimeout(timeout);
127-
128-
resolve({
129-
result: value,
130-
error: undefined,
131-
status: 200
132-
})
133-
},
134-
reject: (reason?: any) => {
135-
clearTimeout(timeout);
136-
137-
reject({
138-
result: undefined,
139-
error: reason ?? 'Operation failed.',
140-
status: 500
141-
})
142-
}
143-
});
144-
145-
processNextOperation().catch((err) => {
146-
console.error('Error processing operation queue:', err);
147-
});
148-
});
149-
}
110+
// enqueueOperation(
111+
// queries: { sql: string; params?: any[] }[],
112+
// isTransaction: boolean,
113+
// isRaw: boolean,
114+
// operationQueue: any[],
115+
// processNextOperation: () => Promise<void>
116+
// ): Promise<{ result?: any, error?: string | undefined, status: number }> {
117+
// const MAX_WAIT_TIME = 25000;
118+
// return new Promise((resolve, reject) => {
119+
// const timeout = setTimeout(() => {
120+
// reject(createResponse(undefined, 'Operation timed out.', 503));
121+
// }, MAX_WAIT_TIME);
122+
123+
// operationQueue.push({
124+
// queries,
125+
// isTransaction,
126+
// isRaw,
127+
// resolve: (value: any) => {
128+
// clearTimeout(timeout);
129+
130+
// resolve({
131+
// result: value,
132+
// error: undefined,
133+
// status: 200
134+
// })
135+
// },
136+
// reject: (reason?: any) => {
137+
// clearTimeout(timeout);
138+
139+
// reject({
140+
// result: undefined,
141+
// error: reason ?? 'Operation failed.',
142+
// status: 500
143+
// })
144+
// }
145+
// });
146+
147+
// processNextOperation().catch((err) => {
148+
// console.error('Error processing operation queue:', err);
149+
// });
150+
// });
151+
// }
150152

151-
async processNextOperation(
152-
sqlInstance: any,
153-
operationQueue: OperationQueueItem[],
154-
ctx: any,
155-
processingOperation: { value: boolean }
156-
) {
157-
if (processingOperation.value) {
158-
// Already processing an operation
159-
return;
160-
}
153+
// async processNextOperation(
154+
// sqlInstance: any,
155+
// operationQueue: OperationQueueItem[],
156+
// ctx: any,
157+
// processingOperation: { value: boolean }
158+
// ) {
159+
// if (processingOperation.value) {
160+
// // Already processing an operation
161+
// return;
162+
// }
161163

162-
if (operationQueue.length === 0) {
163-
// No operations remaining to process
164-
return;
165-
}
164+
// if (operationQueue.length === 0) {
165+
// // No operations remaining to process
166+
// return;
167+
// }
166168

167-
processingOperation.value = true;
168-
const { queries, isTransaction, isRaw, resolve, reject } = operationQueue.shift()!;
169+
// processingOperation.value = true;
170+
// const { queries, isTransaction, isRaw, resolve, reject } = operationQueue.shift()!;
169171

170-
try {
171-
let result;
172+
// try {
173+
// let result;
172174

173-
if (isTransaction) {
174-
result = await this.executeTransaction(queries, isRaw, sqlInstance, ctx);
175-
} else {
176-
const { sql, params } = queries[0];
177-
result = this.executeQuery(sql, params, isRaw);
178-
}
175+
// if (isTransaction) {
176+
// result = await this.executeTransaction(queries, isRaw, sqlInstance, ctx);
177+
// } else {
178+
// const { sql, params } = queries[0];
179+
// result = this.executeQuery(sql, params, isRaw);
180+
// }
179181

180-
resolve(result);
181-
} catch (error: any) {
182-
reject(error.message || 'Operation failed.');
183-
} finally {
184-
processingOperation.value = false;
185-
await this.processNextOperation(sqlInstance, operationQueue, ctx, processingOperation);
186-
}
187-
}
182+
// resolve(result);
183+
// } catch (error: any) {
184+
// reject(error.message || 'Operation failed.');
185+
// } finally {
186+
// processingOperation.value = false;
187+
// await this.processNextOperation(sqlInstance, operationQueue, ctx, processingOperation);
188+
// }
189+
// }
188190
}

src/handler.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { importTableFromCsvRoute } from "./import/csv";
77
import { importDumpRoute } from "./import/dump";
88
import { importTableFromJsonRoute } from "./import/json";
99
import { LiteREST } from "./literest";
10-
import { executeQuery, OperationQueueItem } from "./operation";
10+
import { executeQuery, executeTransaction, OperationQueueItem } from "./operation";
1111
import { createResponse, createResponseFromOperationResponse, QueryRequest, QueryTransactionRequest } from "./utils";
1212

1313
export class Handler {
@@ -88,9 +88,6 @@ export class Handler {
8888
}
8989

9090
async queryRoute(request: Request, isRaw: boolean): Promise<Response> {
91-
// TODO:
92-
// Is it for the `internal` or `external` source?
93-
9491
if (!this.dataSource) {
9592
return createResponse(undefined, 'Data source not found.', 400);
9693
}
@@ -123,6 +120,9 @@ export class Handler {
123120
return { sql, params };
124121
});
125122

123+
const response = await executeTransaction(queries, isRaw, this.dataSource);
124+
return createResponse(response, undefined, 200);
125+
126126
// try {
127127
// const response = await enqueueOperation(
128128
// queries,

src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export type DataSource = {
4141
type DatabaseStub = DurableObjectStub & {
4242
fetch: (init?: RequestInit | Request) => Promise<Response>;
4343
executeQuery(sql: string, params: any[] | undefined, isRaw: boolean): QueryResponse;
44+
executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean): any[];
4445
};
4546

4647
interface InternalConnection {
@@ -98,6 +99,9 @@ export default {
9899
* Retrieve the Durable Object identifier from the environment bindings and instantiate a
99100
* Durable Object stub to interact with the Durable Object.
100101
*/
102+
// Get location hint from wrangler.toml environment variables
103+
// const locationHint = env.DATABASE_LOCATION_HINT ?? 'enam';
104+
// let stub = env.DATABASE_DURABLE_OBJECT.get(id, { locationHint: "enam" });
101105
let id: DurableObjectId = env.DATABASE_DURABLE_OBJECT.idFromName(DURABLE_OBJECT_ID);
102106
let stub = env.DATABASE_DURABLE_OBJECT.get(id);
103107

src/operation.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,18 @@ export async function executeQuery(sql: string, params: any[] | undefined, isRaw
4949
return items;
5050
}
5151
}
52+
53+
export async function executeTransaction(queries: { sql: string; params?: any[] }[], isRaw: boolean, dataSource: DataSource): Promise<QueryResponse> {
54+
if (dataSource.source === 'internal') {
55+
const response = await dataSource.internalConnection?.durableObject.executeTransaction(queries, isRaw);
56+
return response ?? [];
57+
} else {
58+
if (!dataSource.externalConnection) {
59+
throw new Error('External connection not found.');
60+
}
61+
62+
// TODO: Implement transaction for external source
63+
}
64+
65+
return [];
66+
}

0 commit comments

Comments
 (0)