Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
],
"dependencies": {
"buffer": "^6.0.3",
"events": "^3.3.0",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already a transitive dependency via readable-stream

"queue-microtask": "^1.1.2",
"readable-stream": "^4.0.0"
},
Expand Down
49 changes: 48 additions & 1 deletion src/N3Store.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// **N3Store** objects store N3 quads by graph in memory.
import EventEmitter from 'events';
import { Readable } from 'readable-stream';
import { default as N3DataFactory, termToId, termFromId } from './N3DataFactory';
import namespaces from './IRIs';
Expand Down Expand Up @@ -410,8 +411,54 @@ export default class N3Store {

// ### `import` adds a stream of quads to the store
import(stream) {
// Add quads to the store as they arrive
stream.on('data', quad => { this.addQuad(quad); });
return stream;

// Create a promise that resolves when the stream ends
const promise = new Promise((resolve, reject) => {
// Create proxy that combines N3Store with EventEmitter capabilities
const storeProxy = new Proxy(this, {
get(target, prop, receiver) {
return Reflect.get(prop in EventEmitter.prototype ? stream : target, prop, receiver);
},
});

// Check if stream is already closed/ended
if (stream.readableEnded || stream.destroyed || stream.readable === false || stream.closed || stream.ended) {
// Resolve immediately if stream is already closed
resolve(storeProxy);
}
else {
// Otherwise, wait for end/error events
// eslint-disable-next-line func-style
const onEnd = () => {
// eslint-disable-next-line no-use-before-define
stream.removeListener('error', onError);
resolve(storeProxy);
};

// eslint-disable-next-line func-style
const onError = err => {
stream.removeListener('end', onEnd);
reject(err);
};

stream.once('end', onEnd);
stream.once('error', onError);
Comment on lines +451 to +452
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to only create this promise when then, catch, or finally were used in the below proxy - and thus only start listening to the stream at that point in time.

Unfortunately, that turned out to not be an option; as EventEmitters error if the error event is called and there are no error listeners - which causes the last added test case to break.

}
});

// Return a proxy that acts as both stream and promise without mutating the stream object
return new Proxy(stream, {
get(target, prop) {
// Forward Promise methods to the promise object
if (prop === 'then' || prop === 'catch' || prop === 'finally') {
return promise[prop].bind(promise);
}
// All other properties and methods are from the stream
return target[prop];
},
});
}

// ### `removeQuad` removes a quad from the store if it exists
Expand Down
125 changes: 125 additions & 0 deletions test/N3Store-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,131 @@ describe('Store', () => {
it('should have size 2', () => { expect(empty.size).toEqual(2); });
});

describe('#import promise', () => {
it('should have size 2', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);

expect((await new Store().import(stream)).size).toEqual(2);
});
});

describe('N3Store import', () => {
it('should not add "end" or "error" listeners until await is called', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);
const store = new Store();

const newListners = [];
stream.on('newListener', (event, listener) => { newListners.push({ event, listener }); });

const importPromise = store.import(stream);

expect(stream.listenerCount('end')).toEqual(1);
expect(stream.listenerCount('error')).toEqual(1);

expect(newListners).toEqual([
{ event: 'data', listener: expect.any(Function) },
{ event: 'end', listener: expect.any(Function) },
{ event: 'error', listener: expect.any(Function) },
]);

await importPromise;

expect(stream.listenerCount('end')).toEqual(0);
expect(stream.listenerCount('error')).toEqual(0);

// Checking that 'end' and 'error' listeners were added and then removed in importing
expect(newListners).toEqual([
{ event: 'data', listener: expect.any(Function) },
{ event: 'end', listener: expect.any(Function) },
{ event: 'error', listener: expect.any(Function) },
]);
});

it('should still be a functional event emitter before and after awaiting', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);
const store = new Store();
const importPromise = store.import(stream);

let receivedPreAwait = false;
stream.on('testEvent', data => {
receivedPreAwait = true;
expect(data).toBe('testData');
});
stream.emit('testEvent', 'testData');
expect(receivedPreAwait).toBe(true);

await importPromise;

let receivedPostAwait = false;
stream.on('testEvent2', data => {
receivedPostAwait = true;
expect(data).toBe('testDataAfterAwait');
});
stream.emit('testEvent2', 'testDataAfterAwait');
expect(receivedPostAwait).toBe(true);
});

it('should not add store attributes to the stream and vice-versa; pre and post import', async () => {
const stream = new ArrayReader([
new Quad(new NamedNode('s1'), new NamedNode('p2'), new NamedNode('o2')),
new Quad(new NamedNode('s1'), new NamedNode('p1'), new NamedNode('o1')),
]);
const store = new Store();
const importPromise = store.import(stream);

expect(stream.addQuad).toBeUndefined();
expect(store.addQuad).not.toBeUndefined();

expect(store.on).toBeUndefined();
expect(stream.on).not.toBeUndefined();

const res = await importPromise;

expect(stream.addQuad).toBeUndefined();
expect(store.addQuad).not.toBeUndefined();
expect(res.addQuad).not.toBeUndefined();
expect(importPromise.addQuad).toBeUndefined();

expect(store.on).toBeUndefined();
expect(stream.on).not.toBeUndefined();
expect(res.on).not.toBeUndefined();
});

it('should resolve the promise if importing a completed stream', async () => {
const stream = new ArrayReader([]);
const store = new Store();

stream.on('data', () => {});

await new Promise(resolve => {
stream.on('end', resolve);
stream.on('error', resolve);
});

const importResult = await store.import(stream);

expect(importResult).toBeTruthy();
});

it('should reject if there is an error in the stream', async () => {
const stream = new ArrayReader([]);
const store = new Store();
const imported = store.import(stream);
stream.emit('error', 'Test error');

await expect(imported).rejects.toEqual('Test error');
});
});

describe('#forEach', () => {
it('should iterate over quads', () => {
let count = 0;
Expand Down