Skip to content

Commit 3075122

Browse files
committed
fix: Send data events, remove for await handling
1 parent 6d8774f commit 3075122

File tree

1 file changed

+9
-16
lines changed

1 file changed

+9
-16
lines changed

src/scheduler/scheduler.ts

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,26 +40,22 @@ export enum Strategy {
4040
}
4141

4242
class TableResolverStream extends Duplex {
43-
queue: unknown[] = [];
44-
4543
constructor() {
4644
super({ objectMode: true });
4745
}
4846

49-
_read() {
50-
while (this.queue.length > 0) {
51-
this.push(this.queue.shift());
52-
}
53-
if (this.writableEnded) {
54-
// end readable stream if writable stream has ended
55-
this.push(null);
56-
}
57-
}
47+
_read() {}
5848

5949
_write(chunk: unknown, _: string, next: (error?: Error | null) => void) {
60-
this.queue.push(chunk);
50+
this.emit('data', chunk);
6151
next();
6252
}
53+
54+
end(callback?: () => void): this {
55+
this.emit('end');
56+
callback?.();
57+
return this;
58+
}
6359
}
6460

6561
const validateResource = (resource: Resource) => {
@@ -167,10 +163,6 @@ const resolveTable = async (
167163
await queue.add(() => processData(data));
168164
});
169165

170-
stream.on('end', async () => {
171-
await queue.onIdle();
172-
});
173-
174166
try {
175167
await resolverPromise;
176168
} catch (error) {
@@ -182,6 +174,7 @@ const resolveTable = async (
182174
return;
183175
} finally {
184176
stream.end();
177+
await queue.onIdle();
185178
}
186179

187180
logger.info(`done resolving table ${table.name}`);

0 commit comments

Comments
 (0)