@@ -12,28 +12,57 @@ export class Query<T> {
12
12
private correlationId : string ;
13
13
private sql : string ;
14
14
private isPrepared : boolean = false ;
15
- private parameters : any [ ] ;
15
+ private parameters : any [ ] | undefined ;
16
16
private rowsToFetch : number = 100 ;
17
17
private isCLCommand : boolean ;
18
18
private state : QueryState = QueryState . NOT_YET_RUN ;
19
19
20
- constructor ( private job : SQLJob , query : string , opts : QueryOptions = { isClCommand : false , parameters : undefined } ) {
20
+ public shouldAutoClose : boolean ;
21
+
22
+ constructor ( private job : SQLJob , query : string , opts : QueryOptions = { isClCommand : false , parameters : undefined , autoClose : false } ) {
21
23
this . job = job ;
22
24
this . isPrepared = ( undefined !== opts . parameters ) ;
23
25
this . parameters = opts . parameters ;
24
26
this . sql = query ;
25
27
this . isCLCommand = opts . isClCommand ;
28
+ this . shouldAutoClose = opts . autoClose ;
29
+
26
30
Query . globalQueryList . push ( this ) ;
27
31
}
32
+
28
33
public static byId ( id : string ) {
29
34
return ( undefined === id || '' === id ) ? undefined : Query . globalQueryList . find ( query => query . correlationId === id ) ;
30
35
}
36
+
37
+ public static getOpenIds ( forJob ?: string ) {
38
+ return this . globalQueryList
39
+ . filter ( q => q . job . id === forJob || forJob === undefined )
40
+ . map ( q => q . correlationId ) ;
41
+ }
42
+
43
+ public static async cleanup ( ) {
44
+ let closePromises = [ ] ;
45
+
46
+ // First, let's check to see if we should also cleanup
47
+ // any cursors that remain open, and we've been told to close
48
+ for ( const query of this . globalQueryList ) {
49
+ if ( query . shouldAutoClose ) {
50
+ closePromises . push ( query . close ( ) )
51
+ }
52
+ } ;
53
+
54
+ await Promise . all ( closePromises ) ;
55
+
56
+ // Automatically remove any queries done and dusted. They're useless.
57
+ this . globalQueryList = this . globalQueryList . filter ( q => q . getState ( ) !== QueryState . RUN_DONE ) ;
58
+ }
59
+
31
60
public async run ( rowsToFetch : number = this . rowsToFetch ) : Promise < QueryResult < T > > {
32
61
switch ( this . state ) {
33
- case QueryState . RUN_MORE_DATA_AVAILABLE :
34
- throw new Error ( 'Statement has already been run' ) ;
35
- case QueryState . RUN_DONE :
36
- throw new Error ( 'Statement has already been fully run' ) ;
62
+ case QueryState . RUN_MORE_DATA_AVAILABLE :
63
+ throw new Error ( 'Statement has already been run' ) ;
64
+ case QueryState . RUN_DONE :
65
+ throw new Error ( 'Statement has already been fully run' ) ;
37
66
}
38
67
let queryObject ;
39
68
if ( this . isCLCommand ) {
@@ -55,7 +84,7 @@ export class Query<T> {
55
84
let result = await this . job . send ( JSON . stringify ( queryObject ) ) ;
56
85
let queryResult : QueryResult < T > = JSON . parse ( result ) ;
57
86
58
- this . state = queryResult . is_done ? QueryState . RUN_DONE : QueryState . RUN_MORE_DATA_AVAILABLE ;
87
+ this . state = queryResult . is_done ? QueryState . RUN_DONE : QueryState . RUN_MORE_DATA_AVAILABLE ;
59
88
60
89
if ( queryResult . success !== true && ! this . isCLCommand ) {
61
90
this . state = QueryState . ERROR ;
@@ -64,13 +93,14 @@ export class Query<T> {
64
93
this . correlationId = queryResult . id ;
65
94
return queryResult ;
66
95
}
96
+
67
97
public async fetchMore ( rowsToFetch : number = this . rowsToFetch ) : Promise < QueryResult < T > > {
68
98
//TODO: verify that the SQL job hasn't changed
69
99
switch ( this . state ) {
70
- case QueryState . NOT_YET_RUN :
71
- throw new Error ( 'Statement has not yet been run' ) ;
72
- case QueryState . RUN_DONE :
73
- throw new Error ( 'Statement has already been fully run' ) ;
100
+ case QueryState . NOT_YET_RUN :
101
+ throw new Error ( 'Statement has not yet been run' ) ;
102
+ case QueryState . RUN_DONE :
103
+ throw new Error ( 'Statement has already been fully run' ) ;
74
104
}
75
105
let queryObject = {
76
106
id : SQLJob . getNewUniqueRequestId ( `fetchMore` ) ,
@@ -84,20 +114,32 @@ export class Query<T> {
84
114
let result = await this . job . send ( JSON . stringify ( queryObject ) ) ;
85
115
86
116
let queryResult : QueryResult < T > = JSON . parse ( result ) ;
87
- this . state = queryResult . is_done ? QueryState . RUN_DONE : QueryState . RUN_MORE_DATA_AVAILABLE ;
117
+ this . state = queryResult . is_done ? QueryState . RUN_DONE : QueryState . RUN_MORE_DATA_AVAILABLE ;
88
118
89
119
if ( queryResult . success !== true ) {
90
120
this . state = QueryState . ERROR ;
91
121
throw new Error ( queryResult . error || `Failed to run query (unknown error)` ) ;
92
122
}
93
123
return queryResult ;
94
124
}
125
+
95
126
public async close ( ) {
96
- this . state = QueryState . RUN_DONE ;
127
+ if ( this . correlationId && this . state !== QueryState . RUN_DONE ) {
128
+ this . state = QueryState . RUN_DONE ;
129
+ let queryObject = {
130
+ id : SQLJob . getNewUniqueRequestId ( `sqlclose` ) ,
131
+ cont_id : this . correlationId ,
132
+ type : `sqlclose` ,
133
+ } ;
134
+
135
+ return this . job . send ( JSON . stringify ( queryObject ) ) ;
136
+ }
97
137
}
138
+
98
139
public getId ( ) : string {
99
140
return this . correlationId ;
100
141
}
142
+
101
143
public getState ( ) : QueryState {
102
144
return this . state ;
103
145
}
0 commit comments