Skip to content

Commit 4af51d8

Browse files
committed
chore: minor improvements of IterableRequest
1 parent af3dd4f commit 4af51d8

File tree

2 files changed

+82
-36
lines changed

2 files changed

+82
-36
lines changed

src/iterable-request.ts

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ export interface IterableRequestItem {
3434
* Metadata of all columns.
3535
*/
3636
columnMetadata: ColumnMetadataDef;
37+
3738
}
3839

3940
type iteratorPromiseResolveFunction = (value: IteratorResult<IterableRequestItem>) => void;
4041
type iteratorPromiseRejectFunction = (error: Error) => void;
42+
interface IteratorPromiseFunctions {resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction}
4143

4244
// Internal class for the state controller logic of the iterator.
4345
class IterableRequestController {
@@ -54,10 +56,9 @@ class IterableRequestController {
5456
private fifoPauseLevel: number;
5557
private fifoResumeLevel: number;
5658

57-
private promisePending: boolean;
58-
private resolvePromise: iteratorPromiseResolveFunction | undefined;
59-
private rejectPromise: iteratorPromiseRejectFunction | undefined;
59+
private promises: IteratorPromiseFunctions[]; // FIFO of resolve/reject function pairs of pending promises
6060
private terminatorResolve: (() => void) | undefined;
61+
private terminatorPromise: Promise<void> | undefined;
6162

6263
// --- Constructor / Terminator ----------------------------------------------
6364

@@ -73,7 +74,7 @@ class IterableRequestController {
7374
this.fifoPauseLevel = fifoSize;
7475
this.fifoResumeLevel = Math.floor(fifoSize / 2);
7576

76-
this.promisePending = false;
77+
this.promises = [];
7778

7879
request.addListener('row', this.rowEventHandler);
7980
request.addListener('columnMetadata', this.columnMetadataEventHandler);
@@ -86,29 +87,32 @@ class IterableRequestController {
8687
return Promise.resolve();
8788
}
8889
this.request.connection.cancel();
89-
return new Promise<void>((resolve: () => void) => {
90-
this.terminatorResolve = resolve;
91-
});
90+
if (!this.terminatorPromise) {
91+
this.terminatorPromise = new Promise<void>((resolve: () => void) => {
92+
this.terminatorResolve = resolve;
93+
});
94+
}
95+
return this.terminatorPromise;
9296
}
9397

9498
// --- Promise logic ---------------------------------------------------------
9599

96100
private serveError(): boolean {
97-
if (!this.error || !this.promisePending) {
101+
if (!this.error || !this.promises.length) {
98102
return false;
99103
}
100-
this.rejectPromise!(this.error);
101-
this.promisePending = false;
104+
const promise = this.promises.shift()!;
105+
promise.reject(this.error);
102106
return true;
103107
}
104108

105109
private serveRowItem(): boolean {
106-
if (!this.fifo.length || !this.promisePending) {
110+
if (!this.fifo.length || !this.promises.length) {
107111
return false;
108112
}
109113
const item = this.fifo.shift()!;
110-
this.resolvePromise!({ value: item });
111-
this.promisePending = false;
114+
const promise = this.promises.shift()!;
115+
promise.resolve({ value: item });
112116
if (this.fifo.length <= this.fifoResumeLevel && this.requestPaused) {
113117
this.request.resume();
114118
this.requestPaused = false;
@@ -117,35 +121,39 @@ class IterableRequestController {
117121
}
118122

119123
private serveRequestCompletion(): boolean {
120-
if (!this.requestCompleted || !this.promisePending) {
124+
if (!this.requestCompleted || !this.promises.length) {
121125
return false;
122126
}
123-
this.resolvePromise!({ done: true, value: undefined });
124-
this.promisePending = false;
127+
const promise = this.promises.shift()!;
128+
promise.resolve({ done: true, value: undefined });
125129
return true;
126130
}
127131

128-
private servePromise() {
129-
if (this.serveError()) {
130-
return;
131-
}
132+
private serveNextPromise(): boolean {
132133
if (this.serveRowItem()) {
133-
return;
134+
return true;
135+
}
136+
if (this.serveError()) {
137+
return true;
134138
}
135139
if (this.serveRequestCompletion()) {
136-
return; // eslint-disable-line no-useless-return
140+
return true;
141+
}
142+
return false;
143+
}
144+
145+
private servePromises() {
146+
while (true) {
147+
if (!this.serveNextPromise()) {
148+
break;
149+
}
137150
}
138151
}
139152

140153
// This promise executor is called synchronously from within Iterator.next().
141154
public promiseExecutor = (resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction) => {
142-
if (this.promisePending) {
143-
throw new Error('Previous promise is still active.');
144-
}
145-
this.resolvePromise = resolve;
146-
this.rejectPromise = reject;
147-
this.promisePending = true;
148-
this.servePromise();
155+
this.promises.push({ resolve, reject });
156+
this.servePromises();
149157
};
150158

151159
// --- Event handlers --------------------------------------------------------
@@ -161,7 +169,7 @@ class IterableRequestController {
161169
if (error && !this.error) {
162170
this.error = error;
163171
}
164-
this.servePromise();
172+
this.servePromises();
165173
}
166174

167175
private columnMetadataEventHandler = (columnMetadata: ColumnMetadata[] | Record<string, ColumnMetadata>) => {
@@ -175,7 +183,7 @@ class IterableRequestController {
175183
}
176184
if (this.resultSetNo === 0 || !this.columnMetadata) {
177185
this.error = new Error('No columnMetadata event received before row event.');
178-
this.servePromise();
186+
this.servePromises();
179187
return;
180188
}
181189
const item: IterableRequestItem = { row, resultSetNo: this.resultSetNo, columnMetadata: this.columnMetadata };
@@ -184,7 +192,7 @@ class IterableRequestController {
184192
this.request.pause();
185193
this.requestPaused = true;
186194
}
187-
this.servePromise();
195+
this.servePromises();
188196
};
189197

190198
}
@@ -207,9 +215,13 @@ class IterableRequestIterator implements AsyncIterator<IterableRequestItem> {
207215
return Promise.resolve({ value, done: true }); // eslint-disable-line @typescript-eslint/return-await
208216
}
209217

210-
public async throw(_exception?: any): Promise<any> {
218+
public async throw(exception?: any): Promise<any> {
211219
await this.controller.terminate();
212-
return Promise.resolve({ done: true }); // eslint-disable-line @typescript-eslint/return-await
220+
if (exception) {
221+
return Promise.reject(exception); // eslint-disable-line @typescript-eslint/return-await
222+
} else {
223+
return Promise.resolve({ done: true }); // eslint-disable-line @typescript-eslint/return-await
224+
}
213225
}
214226

215227
}

test/integration/iterable-request-test.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { assert } from 'chai';
22

33
import Connection from '../../src/connection';
4+
import { RequestError } from '../../src/errors';
45
import IterableRequest, { type ColumnValue } from '../../src/iterable-request';
56
import { debugOptionsFromEnv } from '../helpers/debug-options-from-env';
67

@@ -96,10 +97,10 @@ describe('Iterable Request Test', function() {
9697

9798
await testForLoop(10000, 500);
9899
await testForLoop(10000, 3);
99-
await testForLoop(10000, 250);
100+
await testForLoop(10000, 250, 100);
100101
await testForLoop(100, 100);
101102

102-
async function testForLoop(n: number, abortCount: number) {
103+
async function testForLoop(n: number, abortCount: number, sleepPos = -1) {
103104
const sql = `
104105
with cte1 as
105106
(select 1 as i union all select i + 1 from cte1 where i < ${n})
@@ -114,6 +115,9 @@ describe('Iterable Request Test', function() {
114115
const row = item.row as ColumnValue[];
115116
const i = row[0].value;
116117
assert(i === ctr + 1);
118+
if (ctr === sleepPos) {
119+
await sleep(250);
120+
}
117121
ctr++;
118122
if (ctr === abortCount) {
119123
break;
@@ -124,4 +128,34 @@ describe('Iterable Request Test', function() {
124128

125129
});
126130

131+
it('tests the error handling logic of the iterable request module', async function() {
132+
const sql = `
133+
select 1
134+
select 2
135+
select 3 / 0
136+
`;
137+
138+
const request = new IterableRequest(sql);
139+
connection.execSql(request);
140+
141+
let ctr = 0;
142+
let errCtr = 0;
143+
try {
144+
for await (const item of request) {
145+
assert(item.resultSetNo === ctr + 1);
146+
const row = item.row as ColumnValue[];
147+
const i = row[0].value;
148+
assert(i === ctr + 1);
149+
ctr++;
150+
}
151+
} catch (err) {
152+
assert.instanceOf(err, RequestError);
153+
assert((err as RequestError).message.toLowerCase().includes('divide by zero'));
154+
errCtr++;
155+
}
156+
assert(ctr === 2);
157+
assert(errCtr === 1);
158+
});
159+
160+
127161
});

0 commit comments

Comments
 (0)