Skip to content

Commit 95f477e

Browse files
Fix quad event not being emitted anymore
1 parent 0699221 commit 95f477e

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

lib/StreamingStore.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,14 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
7575
quad.graph,
7676
)) {
7777
for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) {
78+
/**
79+
* The pendingStream emits 'quad' events even before it is initialized.
80+
* This allows us to detect when matching quads are added to the store,
81+
* without having to consume the stream returned by the `match` method.
82+
*/
83+
pendingStream.emit('quad', quad);
7884
if ((<any> pendingStream).isInitialized) {
7985
pendingStream.push(quad);
80-
pendingStream.emit('quad', quad);
8186
}
8287
}
8388
}
@@ -116,6 +121,9 @@ implements RDF.Source<Q>, RDF.Sink<RDF.Stream<Q>, EventEmitter> {
116121
const pendingStream = new PassThrough({ objectMode: true });
117122
this.pendingStreams.addPatternListener(pendingStream, subject, predicate, object, graph);
118123
stream = Readable.from(StreamingStore.concatStreams([ storeResult, pendingStream ]));
124+
pendingStream.on('quad', quad => {
125+
stream.emit('quad', quad);
126+
});
119127
(<any> stream)._pipeSource = storeResult;
120128

121129
// This is an ugly hack to annotate pendingStream with the isInitialized once the store stream started being read.

test/StreamingStore-test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'jest-rdf';
2+
import type * as RDF from '@rdfjs/types';
23
import arrayifyStream from 'arrayify-stream';
34
import { promisifyEventEmitter } from 'event-emitter-promisify/dist';
45
import { Store } from 'n3';
@@ -555,6 +556,31 @@ describe('StreamingStore', () => {
555556
expect(listener).toHaveBeenCalledTimes(1);
556557
});
557558

559+
it('should emit a quad event when new matching quads are imported to the store', async() => {
560+
await promisifyEventEmitter(store.import(streamifyArray([
561+
quad('s1', 'p1', 'o1'),
562+
quad('s2', 'p2', 'o2'),
563+
])));
564+
565+
const stream = store.match(DF.namedNode('s1'));
566+
const quads: RDF.Quad[] = [];
567+
stream.on('quad', importedQuad => {
568+
quads.push(importedQuad);
569+
});
570+
571+
await promisifyEventEmitter(store.import(streamifyArray([
572+
quad('s1', 'p3', 'o3'),
573+
quad('s4', 'p4', 'o4'),
574+
])));
575+
576+
store.end();
577+
expect(quads).toStrictEqual(
578+
[
579+
quad('s1', 'p3', 'o3'),
580+
],
581+
);
582+
});
583+
558584
it('handle the reporting of the ending of the store', () => {
559585
expect(store.hasEnded()).toBe(false);
560586
store.end();

0 commit comments

Comments
 (0)