Skip to content

Commit 1af44a8

Browse files
committed
Add CompositeDatasource
This allows multiple datasources to be grouped in one virtual datasource.
1 parent 01700e7 commit 1af44a8

File tree

3 files changed

+327
-0
lines changed

3 files changed

+327
-0
lines changed

config/config-composite.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"title": "My Composite Linked Data Fragments server",
3+
4+
"datasources": {
5+
"test-composite": {
6+
"title": "Composite Test",
7+
"type": "CompositeDatasource",
8+
"description": "A test composite datasource",
9+
"settings": {
10+
"references": [ "hdt", "ttl", "jsonld" ]
11+
}
12+
},
13+
"hdt": {
14+
"title": "HDT",
15+
"type": "HdtDatasource",
16+
"description": "A test HDT datasource",
17+
"settings": { "file": "test/assets/test.hdt" }
18+
},
19+
"ttl": {
20+
"title": "Turtle",
21+
"type": "TurtleDatasource",
22+
"description": "A test turtle datasource",
23+
"settings": { "file": "test/assets/test.ttl" }
24+
},
25+
"jsonld": {
26+
"title": "JSONLD",
27+
"type": "JsonLdDatasource",
28+
"description": "A test jsonld datasource",
29+
"settings": { "file": "test/assets/test.jsonld" }
30+
}
31+
}
32+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*! @license ©2016 Ruben Taelman - Multimedia Lab / iMinds / Ghent University */
2+
3+
/** A CompositeDatasource delegates queries to an consecutive list of datasources. */
4+
5+
var Datasource = require('./Datasource');
6+
7+
// Creates a new CompositeDatasource
8+
function CompositeDatasource(options) {
9+
if (!(this instanceof CompositeDatasource))
10+
return new CompositeDatasource(options);
11+
Datasource.call(this, options);
12+
13+
if (!options.references) {
14+
throw new Error("A CompositeDatasource requires a `references` array of datasource id's in its settings.");
15+
}
16+
17+
var allDatasources = options.datasources;
18+
this._datasources = {};
19+
this._datasourceNames = [];
20+
for (var i = 0; i < options.references.length; i++) {
21+
var datasourceName = options.references[i];
22+
var datasource = allDatasources[datasourceName];
23+
if (!datasource) {
24+
throw new Error("No datasource " + datasourceName + " could be found!");
25+
}
26+
if (datasource.enabled === false) {
27+
throw new Error("Datasource " + datasourceName + " must be enabled!");
28+
}
29+
this._datasources[datasourceName] = datasource;
30+
this._datasourceNames.push(datasourceName);
31+
}
32+
}
33+
Datasource.extend(CompositeDatasource);
34+
35+
// Checks whether the data source can evaluate the given query
36+
CompositeDatasource.prototype.supportsQuery = function (query) {
37+
for (var datasourceName in this._datasources) {
38+
if (this._getDatasourceByName(datasourceName).supportsQuery(query)) {
39+
return true;
40+
}
41+
}
42+
return false;
43+
};
44+
45+
// Find a datasource by datasource name
46+
CompositeDatasource.prototype._getDatasourceByName = function(datasourceName) {
47+
return this._datasources[datasourceName].datasource;
48+
};
49+
50+
// Find a datasource by datasource id inside this composition
51+
CompositeDatasource.prototype._getDatasourceById = function(datasourceIndex) {
52+
return this._datasources[this._datasourceNames[datasourceIndex]].datasource;
53+
};
54+
55+
// Recursively find all required datasource composition info to perform a query.
56+
// The callback will provide the parameters:
57+
// Datasource id to start querying from
58+
// The offset to use to start querying from the given datasource id
59+
// The total count for all datasources
60+
CompositeDatasource.prototype._getDatasourceInfo = function(query, absoluteOffset, cb) {
61+
var self = this;
62+
var emptyQuery = {
63+
offset: 0, limit: 0,
64+
subject: query.subject, predicate: query.predicate, object: query.object
65+
};
66+
return findRecursive(0, absoluteOffset, -1, -1, 0, cb);
67+
68+
function findRecursive(datasourceIndex, offset, chosenDatasource, chosenOffset, totalCount) {
69+
if (datasourceIndex >= self._datasourceNames.length) {
70+
// We checked all datasources, return our accumulated information
71+
cb(chosenDatasource, chosenOffset, totalCount);
72+
} else {
73+
var emptyTripleStream = { push: noop };
74+
self._getDatasourceById(datasourceIndex)._executeQuery(emptyQuery, emptyTripleStream, function (metadata) {
75+
var count = metadata.totalCount;
76+
if (chosenDatasource < 0 && offset < count) {
77+
// We can start querying from this datasource
78+
setImmediate(function () {
79+
findRecursive(datasourceIndex + 1, offset - count, datasourceIndex, offset, totalCount + count);
80+
});
81+
} else {
82+
// We forward our accumulated information and go check the next datasource
83+
setImmediate(function () {
84+
findRecursive(datasourceIndex + 1, offset - count, chosenDatasource, chosenOffset, totalCount + count);
85+
});
86+
}
87+
});
88+
}
89+
}
90+
};
91+
92+
function noop() {}
93+
94+
// Writes the results of the query to the given triple stream
95+
CompositeDatasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
96+
var offset = query.offset || 0, limit = query.limit || Infinity;
97+
var self = this;
98+
this._getDatasourceInfo(query, offset, function(datasourceIndex, relativeOffset, totalCount) {
99+
if (datasourceIndex < 0) {
100+
// No valid datasource has been found
101+
metadataCallback({ totalCount: totalCount });
102+
tripleStream.push(null);
103+
} else {
104+
// Send query to first applicable datasource and optionally emit triples from consecutive datasources
105+
metadataCallback({ totalCount: totalCount });
106+
var emitted = 0;
107+
108+
// Modify our triple stream so that if all results from one datasource have arrived,
109+
// check if we haven't reached the limit and if so, trigger a new query for the next datasource.
110+
tripleStream.push = makeSmartPush(tripleStream, function(localEmittedCount) {
111+
// This is called after the last element has been pushed
112+
113+
// If we haven't reached our limit, try to fill it with other datasource query results.
114+
emitted += localEmittedCount;
115+
datasourceIndex++;
116+
if (emitted < limit && datasourceIndex < self._datasourceNames.length) {
117+
var localLimit = limit - emitted;
118+
var subQuery = { offset: 0, limit: localLimit,
119+
subject: query.subject, predicate: query.predicate, object: query.object };
120+
self._getDatasourceById(datasourceIndex)._executeQuery(subQuery, tripleStream, function(){});
121+
return true;
122+
} else {
123+
return false;
124+
}
125+
});
126+
127+
// Initiate query to the first datasource.
128+
var subQuery = { offset: relativeOffset, limit: limit,
129+
subject: query.subject, predicate: query.predicate, object: query.object };
130+
self._getDatasourceById(datasourceIndex)._executeQuery(subQuery, tripleStream, function(){});
131+
}
132+
});
133+
134+
// Replaces a tripleStream.push
135+
// It takes the tripleStream as first argument and a callback as second argument.
136+
// The callback will be called when the push function is called with a falsy value.
137+
// Returning a falsy value inside the callback will delegate the falsy value to the original
138+
// push function anyways.
139+
function makeSmartPush(self, nullCb) {
140+
var count = 0;
141+
var originalPush = self.push;
142+
return function(element) {
143+
if (!element) {
144+
if(!nullCb(count)) {
145+
originalPush.call(self, element);
146+
}
147+
} else {
148+
count++;
149+
originalPush.call(self, element);
150+
}
151+
};
152+
}
153+
};
154+
155+
module.exports = CompositeDatasource;
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
var Datasource = require('../../lib/datasources/Datasource'),
2+
HdtDatasource = require('../../lib/datasources/HdtDatasource'),
3+
TurtleDatasource = require('../../lib/datasources/TurtleDatasource'),
4+
CompositeDatasource = require('../../lib/datasources/CompositeDatasource'),
5+
path = require('path');
6+
7+
var exampleHdtFile = path.join(__dirname, '../assets/test.hdt');
8+
var exampleHdtFileWithBlanks = path.join(__dirname, '../assets/test-blank.hdt');
9+
var exampleTurtleUrl = 'file://' + path.join(__dirname, '../assets/test.ttl');
10+
11+
describe('CompositeDatasource', function () {
12+
var datasources = {
13+
"data_0": { datasource: new HdtDatasource({ file: exampleHdtFile }), size: 132 },
14+
"data_1": { datasource: new HdtDatasource({ file: exampleHdtFileWithBlanks }), size: 6 },
15+
"data_2": { datasource: new TurtleDatasource({ url: exampleTurtleUrl }), size: 132 }
16+
};
17+
var references = Object.keys(datasources);
18+
var totalSize = Object.keys(datasources).reduce(function(acc, key) {
19+
return acc + datasources[key].size;
20+
}, 0);
21+
22+
describe('The CompositeDatasource module', function () {
23+
it('should be a function', function () {
24+
CompositeDatasource.should.be.a('function');
25+
});
26+
27+
it('should be an CompositeDatasource constructor', function (done) {
28+
var instance = new CompositeDatasource({ datasources: datasources, references: references });
29+
instance.should.be.an.instanceof(CompositeDatasource);
30+
instance.close(done);
31+
});
32+
33+
it('should create CompositeDatasource objects', function (done) {
34+
var instance = new CompositeDatasource({ datasources: datasources, references: references });
35+
instance.should.be.an.instanceof(CompositeDatasource);
36+
instance.close(done);
37+
});
38+
39+
it('should create Datasource objects', function (done) {
40+
var instance = new CompositeDatasource({ datasources: datasources, references: references });
41+
instance.should.be.an.instanceof(Datasource);
42+
instance.close(done);
43+
});
44+
});
45+
46+
describe('A CompositeDatasource instance for two HdtDatasources', function () {
47+
var datasource, getDatasource = function () { return datasource; };
48+
before(function (done) {
49+
datasource = new CompositeDatasource({ datasources: datasources, references: references });
50+
datasource.on('initialized', done);
51+
});
52+
after(function (done) {
53+
datasource.close(done);
54+
});
55+
56+
itShouldExecute(getDatasource,
57+
'the empty query',
58+
{ features: { triplePattern: true } },
59+
totalSize, totalSize);
60+
61+
itShouldExecute(getDatasource,
62+
'the empty query with a limit',
63+
{ limit: 10, features: { triplePattern: true, limit: true } },
64+
10, totalSize);
65+
66+
itShouldExecute(getDatasource,
67+
'the empty query with an offset of 10',
68+
{ offset: 10, features: { triplePattern: true, offset: true } },
69+
totalSize - 10, totalSize);
70+
71+
itShouldExecute(getDatasource,
72+
'the empty query with an offset of 100',
73+
{ offset: 100, features: { triplePattern: true, offset: true } },
74+
totalSize - 100, totalSize);
75+
76+
itShouldExecute(getDatasource,
77+
'the empty query with an offset of 200',
78+
{ offset: 200, features: { triplePattern: true, offset: true } },
79+
totalSize - 200, totalSize);
80+
81+
itShouldExecute(getDatasource,
82+
'a query for an existing subject',
83+
{ subject: 'http://example.org/s1', limit: 10, features: { triplePattern: true, limit: true } },
84+
10, 200);
85+
86+
itShouldExecute(getDatasource,
87+
'a query for a non-existing subject',
88+
{ subject: 'http://example.org/p1', limit: 10, features: { triplePattern: true, limit: true } },
89+
0, 0);
90+
91+
itShouldExecute(getDatasource,
92+
'a query for an existing predicate',
93+
{ predicate: 'http://example.org/p1', limit: 10, features: { triplePattern: true, limit: true } },
94+
10, 220 + 110); // 220 is an inexact count from the HDT files, the 110 from the ttl file is exact.
95+
96+
itShouldExecute(getDatasource,
97+
'a query for a non-existing predicate',
98+
{ predicate: 'http://example.org/s1', limit: 10, features: { triplePattern: true, limit: true } },
99+
0, 0);
100+
101+
itShouldExecute(getDatasource,
102+
'a query for an existing object',
103+
{ object: 'http://example.org/o001', limit: 10, features: { triplePattern: true, limit: true } },
104+
6, 6);
105+
106+
itShouldExecute(getDatasource,
107+
'a query for a non-existing object',
108+
{ object: 'http://example.org/s1', limit: 10, features: { triplePattern: true, limit: true } },
109+
0, 0);
110+
});
111+
});
112+
113+
function itShouldExecute(getDatasource, name, query,
114+
expectedResultsCount, expectedTotalCount, expectedTriples) {
115+
describe('executing ' + name, function () {
116+
var resultsCount = 0, totalCount, triples = [];
117+
before(function (done) {
118+
var result = getDatasource().select(query);
119+
result.on('metadata', function (metadata) { totalCount = metadata.totalCount; });
120+
result.on('data', function (triple) { resultsCount++; expectedTriples && triples.push(triple); });
121+
result.on('end', done);
122+
});
123+
124+
it('should return the expected number of triples', function () {
125+
expect(resultsCount).to.equal(expectedResultsCount);
126+
});
127+
128+
it('should emit the expected total number of triples', function () {
129+
expect(totalCount).to.equal(expectedTotalCount);
130+
});
131+
132+
if (expectedTriples) {
133+
it('should emit the expected triples', function () {
134+
expect(triples.length).to.equal(expectedTriples.length);
135+
for (var i = 0; i < expectedTriples.length; i++)
136+
triples[i].should.deep.equal(expectedTriples[i]);
137+
});
138+
}
139+
});
140+
}

0 commit comments

Comments
 (0)