11import { DurableObject } from "cloudflare:workers" ;
22import { OperationQueueItem , QueryResponse } from "./operation" ;
3+ import { createResponse } from "./utils" ;
34
45export class DatabaseDurableObject extends DurableObject {
56 // Durable storage for the SQL database
@@ -35,27 +36,25 @@ export class DatabaseDurableObject extends DurableObject {
3536 * @param params - Optional parameters for the SQL query.
3637 * @returns A response containing the query result or an error message.
3738 */
38- // async executeExternalQuery(sql: string, params: any[] | undefined): Promise<any> {
39- // try {
40- // const queries = [{ sql, params }];
41- // const response = await enqueueOperation(
42- // queries,
43- // false,
44- // false,
45- // this.operationQueue,
46- // () => processNextOperation(this.sql, this.operationQueue, this.ctx, this.processingOperation)
47- // );
48-
49- // return response;
50- // } catch (error: any) {
51- // console.error('Execute External Query Error:', error);
52- // return null;
53- // }
54- // }
39+ async executeExternalQuery ( sql : string , params : any [ ] | undefined ) : Promise < any > {
40+ try {
41+ const queries = [ { sql, params } ] ;
42+ const response = await this . enqueueOperation (
43+ queries ,
44+ false ,
45+ false ,
46+ this . operationQueue ,
47+ ( ) => this . processNextOperation ( this . sql , this . operationQueue , this . ctx , this . processingOperation )
48+ ) ;
49+
50+ return response ;
51+ } catch ( error : any ) {
52+ console . error ( 'Execute External Query Error:' , error ) ;
53+ return null ;
54+ }
55+ }
5556
5657 public executeQuery ( sql : string , params : any [ ] | undefined , isRaw : boolean ) : QueryResponse {
57- console . log ( 'Executing INTERNAL query: ' , sql , params , isRaw ) ;
58-
5958 try {
6059 let cursor ;
6160
@@ -106,84 +105,84 @@ export class DatabaseDurableObject extends DurableObject {
106105 } ) ;
107106 }
108107
109- // enqueueOperation(
110- // queries: { sql: string; params?: any[] }[],
111- // isTransaction: boolean,
112- // isRaw: boolean,
113- // operationQueue: any[],
114- // processNextOperation: () => Promise<void>
115- // ): Promise<{ result?: any, error?: string | undefined, status: number }> {
116- // const MAX_WAIT_TIME = 25000;
117- // return new Promise((resolve, reject) => {
118- // const timeout = setTimeout(() => {
119- // reject(createResponse(undefined, 'Operation timed out.', 503));
120- // }, MAX_WAIT_TIME);
121-
122- // operationQueue.push({
123- // queries,
124- // isTransaction,
125- // isRaw,
126- // resolve: (value: any) => {
127- // clearTimeout(timeout);
128-
129- // resolve({
130- // result: value,
131- // error: undefined,
132- // status: 200
133- // })
134- // },
135- // reject: (reason?: any) => {
136- // clearTimeout(timeout);
137-
138- // reject({
139- // result: undefined,
140- // error: reason ?? 'Operation failed.',
141- // status: 500
142- // })
143- // }
144- // });
145-
146- // processNextOperation().catch((err) => {
147- // console.error('Error processing operation queue:', err);
148- // });
149- // });
150- // }
151-
152- // async processNextOperation(
153- // sqlInstance: any,
154- // operationQueue: OperationQueueItem[],
155- // ctx: any,
156- // processingOperation: { value: boolean }
157- // ) {
158- // if (processingOperation.value) {
159- // // Already processing an operation
160- // return;
161- // }
162-
163- // if (operationQueue.length === 0) {
164- // // No operations remaining to process
165- // return;
166- // }
167-
168- // processingOperation.value = true;
169- // const { queries, isTransaction, isRaw, resolve, reject } = operationQueue.shift()!;
170-
171- // try {
172- // let result;
173-
174- // if (isTransaction) {
175- // result = await this.executeTransaction(queries, isRaw, sqlInstance, ctx );
176- // } else {
177- // const { sql, params } = queries[0];
178- // result = this.executeQuery(sql, params, isRaw);
179- // }
180-
181- // resolve(result);
182- // } catch (error: any) {
183- // reject(error.message || 'Operation failed.');
184- // } finally {
185- // processingOperation.value = false;
186- // await this.processNextOperation(sqlInstance, operationQueue, ctx, processingOperation);
187- // }
188- // }
108+ enqueueOperation (
109+ queries : { sql : string ; params ?: any [ ] } [ ] ,
110+ isTransaction : boolean ,
111+ isRaw : boolean ,
112+ operationQueue : any [ ] ,
113+ processNextOperation : ( ) => Promise < void >
114+ ) : Promise < { result ?: any , error ?: string | undefined , status : number } > {
115+ const MAX_WAIT_TIME = 25000 ;
116+ return new Promise ( ( resolve , reject ) => {
117+ const timeout = setTimeout ( ( ) => {
118+ reject ( createResponse ( undefined , 'Operation timed out.' , 503 ) ) ;
119+ } , MAX_WAIT_TIME ) ;
120+
121+ operationQueue . push ( {
122+ queries,
123+ isTransaction,
124+ isRaw,
125+ resolve : ( value : any ) => {
126+ clearTimeout ( timeout ) ;
127+
128+ resolve ( {
129+ result : value ,
130+ error : undefined ,
131+ status : 200
132+ } )
133+ } ,
134+ reject : ( reason ?: any ) => {
135+ clearTimeout ( timeout ) ;
136+
137+ reject ( {
138+ result : undefined ,
139+ error : reason ?? 'Operation failed.' ,
140+ status : 500
141+ } )
142+ }
143+ } ) ;
144+
145+ processNextOperation ( ) . catch ( ( err ) => {
146+ console . error ( 'Error processing operation queue:' , err ) ;
147+ } ) ;
148+ } ) ;
149+ }
150+
151+ async processNextOperation (
152+ sqlInstance : any ,
153+ operationQueue : OperationQueueItem [ ] ,
154+ ctx : any ,
155+ processingOperation : { value : boolean }
156+ ) {
157+ if ( processingOperation . value ) {
158+ // Already processing an operation
159+ return ;
160+ }
161+
162+ if ( operationQueue . length === 0 ) {
163+ // No operations remaining to process
164+ return ;
165+ }
166+
167+ processingOperation . value = true ;
168+ const { queries, isTransaction, isRaw, resolve, reject } = operationQueue . shift ( ) ! ;
169+
170+ try {
171+ let result ;
172+
173+ if ( isTransaction ) {
174+ result = await this . executeTransaction ( queries , isRaw ) ;
175+ } else {
176+ const { sql, params } = queries [ 0 ] ;
177+ result = this . executeQuery ( sql , params , isRaw ) ;
178+ }
179+
180+ resolve ( result ) ;
181+ } catch ( error : any ) {
182+ reject ( error . message || 'Operation failed.' ) ;
183+ } finally {
184+ processingOperation . value = false ;
185+ await this . processNextOperation ( sqlInstance , operationQueue , ctx , processingOperation ) ;
186+ }
187+ }
189188}
0 commit comments