Skip to content

Commit 781254e

Browse files
committed
Merge branch 'feature-compositedatasource' into develop
* feature-compositedatasource: Bump HDT dependency to 1.3.0 rename exactCount to hasExactCount Interpret triplecount-exactness in ExternalHdtDatasource Cache exact count operation in CompositeDatasource Require exact counts for determining the starting datasource in CompositeDatasource Emit whether or not the triple counts are exact in the metadata callback Fix failing HDT count test Allow datasources to be hidden from the index Allow composed datasources to be disabled Fix infinite loop for metadata retrieval when composing HdtDatasources Add CompositeDatasource This allows multiple datasources to be grouped in one virtual datasource.
2 parents 01700e7 + 930f3d7 commit 781254e

File tree

10 files changed

+393
-12
lines changed

10 files changed

+393
-12
lines changed

config/config-composite.json

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{
2+
"title": "My Composite Linked Data Fragments server",
3+
4+
"datasources": {
5+
"test-composite": {
6+
"title": "Composite Test",
7+
"type": "CompositeDatasource",
8+
"description": "A test composite datasource",
9+
"settings": {
10+
"references": [ "hdt", "ttl", "jsonld", "hdtext" ]
11+
}
12+
},
13+
"hdt": {
14+
"hide": true,
15+
"title": "HDT",
16+
"type": "HdtDatasource",
17+
"description": "A test HDT datasource",
18+
"settings": { "file": "test/assets/test.hdt" }
19+
},
20+
"ttl": {
21+
"hide": true,
22+
"title": "Turtle",
23+
"type": "TurtleDatasource",
24+
"description": "A test turtle datasource",
25+
"settings": { "file": "test/assets/test.ttl" }
26+
},
27+
"jsonld": {
28+
"hide": true,
29+
"title": "JSONLD",
30+
"type": "JsonLdDatasource",
31+
"description": "A test jsonld datasource",
32+
"settings": { "file": "test/assets/test.jsonld" }
33+
},
34+
"hdtext": {
35+
"hide": true,
36+
"title": "HDT-EXT",
37+
"type": "ExternalHdtDatasource",
38+
"description": "A blank test HDT datasource",
39+
"settings": { "file": "test/assets/test-blank.hdt" }
40+
}
41+
}
42+
}
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*! @license ©2016 Ruben Taelman - Multimedia Lab / iMinds / Ghent University */
2+
3+
/** A CompositeDatasource delegates queries to a consecutive list of datasources. */
4+
5+
var Datasource = require('./Datasource'),
6+
LRU = require('lru-cache');
7+
8+
// Creates a new CompositeDatasource, a virtual datasource that groups
// the datasources named in `options.references`.
//   options.references   list of datasource ids, in the order they are queried
//   options.datasources  map from datasource id to its entry; an entry with
//                        `enabled: false` is left out of the composition
// Throws if `references` is missing or names an unknown datasource.
function CompositeDatasource(options) {
  // Support invocation without `new`
  if (!(this instanceof CompositeDatasource))
    return new CompositeDatasource(options);
  Datasource.call(this, options);

  var references = options.references;
  if (!references)
    throw new Error("A CompositeDatasource requires a `references` array of datasource id's in its settings.");

  // Keep both a lookup table and the ordered list of names,
  // since the order of the references determines the query order
  var available = options.datasources, members = {}, memberNames = [];
  this._datasources = members;
  this._datasourceNames = memberNames;
  for (var r = 0; r < references.length; r++) {
    var name = references[r], member = available[name];
    if (!member)
      throw new Error("No datasource " + name + " could be found!");
    // Only an explicit `enabled: false` excludes a datasource
    if (member.enabled === false)
      continue;
    members[name] = member;
    memberNames.push(name);
  }

  // Cache of exact counts used by _getExactCount
  this._countCache = LRU({ max: 1000, maxAge: 1000 * 60 * 60 * 3 });
}
Datasource.extend(CompositeDatasource);
35+
36+
// Checks whether the data source can evaluate the given query,
// which is the case as soon as one of the composed datasources supports it
CompositeDatasource.prototype.supportsQuery = function (query) {
  var supported = false;
  for (var name in this._datasources) {
    if (this._getDatasourceByName(name).supportsQuery(query)) {
      supported = true;
      break;
    }
  }
  return supported;
};
45+
46+
// Looks up the datasource instance registered under the given name;
// the name must be one of the datasources inside this composition
CompositeDatasource.prototype._getDatasourceByName = function(datasourceName) {
  var entry = this._datasources[datasourceName];
  return entry.datasource;
};
50+
51+
// Looks up a datasource instance by its position (id) inside this composition,
// i.e., its index in the ordered list of composed datasource names
CompositeDatasource.prototype._getDatasourceById = function(datasourceIndex) {
  var datasourceName = this._datasourceNames[datasourceIndex];
  var entry = this._datasources[datasourceName];
  return entry.datasource;
};
55+
56+
// Counts all triples matching the query's pattern in the given datasource to get an exact count.
//   datasource  the (composed) datasource to count in
//   query       only its subject, predicate and object are used
//   cb          called with the exact count as single argument
// Counts above 1000 are cached per datasource and pattern.
CompositeDatasource.prototype._getExactCount = function(datasource, query, cb) {
  // Determine the position of the datasource inside this composition.
  // The cache is shared by all composed datasources, so the key must identify the datasource;
  // otherwise the count of one datasource would be returned for another one with the same pattern.
  var datasourceIndex = -1;
  for (var i = 0; i < this._datasourceNames.length; i++) {
    if (this._datasources[this._datasourceNames[i]].datasource === datasource) {
      datasourceIndex = i;
      break;
    }
  }
  // Datasources outside of this composition cannot be told apart, so they are never cached
  var cacheable = datasourceIndex >= 0;

  // Try to find a cache match
  var cacheKey = datasourceIndex + "|" + query.subject + "|" + query.predicate + "|" + query.object;
  var cache = this._countCache, count = cacheable ? cache.get(cacheKey) : undefined;
  if (count) return setImmediate(cb, count);

  // Otherwise, count all the triples manually
  var emptyQuery = { offset: 0, subject: query.subject, predicate: query.predicate, object: query.object };
  var exactCount = 0;
  var countingTripleStream = { push: function (triple) {
    if (triple) {
      exactCount++;
    } else {
      // A falsy value marks the end of the results.
      // Cache large values; small ones are calculated fast anyway
      if (cacheable && exactCount > 1000)
        cache.set(cacheKey, exactCount);
      cb(exactCount);
    }
  }};
  datasource._executeQuery(emptyQuery, countingTripleStream, noop);
};
78+
79+
// Recursively find all required datasource composition info to perform a query.
// Every composed datasource is asked, in order, for the count of the query's pattern.
// The callback will provide the parameters:
//   Datasource id to start querying from (-1 if the offset lies beyond all results)
//   The offset to use to start querying from the given datasource id (-1 if there is none)
//   The total count for all datasources
//   Whether that total count is exact (a boolean)
CompositeDatasource.prototype._getDatasourceInfo = function(query, absoluteOffset, cb) {
  var self = this;
  // Only the metadata is of interest here, so request as few triples as possible
  var emptyQuery = {
    offset: 0, limit: 1,
    subject: query.subject, predicate: query.predicate, object: query.object
  };
  // The total of zero datasources is exact, hence the initial `true`
  return findRecursive(0, absoluteOffset, -1, -1, 0, true);

  function findRecursive(datasourceIndex, offset, chosenDatasource, chosenOffset, totalCount, hasExactCount) {
    if (datasourceIndex >= self._datasourceNames.length) {
      // We checked all datasources, return our accumulated information
      cb(chosenDatasource, chosenOffset, totalCount, hasExactCount);
      return;
    }

    var emptyTripleStream = { push: noop };
    var datasource = self._getDatasourceById(datasourceIndex);
    datasource._executeQuery(emptyQuery, emptyTripleStream, function (metadata) {
      var count = metadata.totalCount;
      // Datasources that do not report exactness are treated as inexact
      var exact = !!metadata.hasExactCount;
      // If we are still looking for an appropriate datasource, we need exact counts!
      if (offset > 0 && !exact) {
        self._getExactCount(datasource, query, function (exactCount) {
          count = exactCount;
          exact = true;
          continueRecursion();
        });
      } else {
        continueRecursion();
      }

      function continueRecursion() {
        // The first datasource in which the remaining offset falls is where querying starts;
        // in all other cases the accumulated choice is forwarded unchanged
        var startsHere = chosenDatasource < 0 && offset < count;
        var nextChosenDatasource = startsHere ? datasourceIndex : chosenDatasource;
        var nextChosenOffset = startsHere ? offset : chosenOffset;
        // Go check the next datasource on a fresh stack
        setImmediate(function () {
          findRecursive(datasourceIndex + 1, offset - count, nextChosenDatasource, nextChosenOffset,
            totalCount + count, hasExactCount && exact);
        });
      }
    });
  }
};
132+
133+
// Does nothing; used as a placeholder wherever a callback or push function is required
function noop() {
  return undefined;
}
134+
135+
// Writes the results of the query to the given triple stream.
// The query starts at the first datasource in which the requested offset falls;
// when that datasource runs out of results before the limit is reached,
// the following datasources are queried in order to fill up the page.
// The metadata callback is called once with the total count over all datasources
// and whether or not that count is exact.
// Note that the `push` function of the given triple stream is replaced.
CompositeDatasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
  var offset = query.offset || 0, limit = query.limit || Infinity;
  var self = this;
  this._getDatasourceInfo(query, offset, function (datasourceIndex, relativeOffset, totalCount, hasExactCount) {
    metadataCallback({ totalCount: totalCount, hasExactCount: hasExactCount });

    // No valid datasource has been found, so there are no results
    if (datasourceIndex < 0) {
      tripleStream.push(null);
      return;
    }

    // Send query to first applicable datasource and optionally emit triples from consecutive datasources
    var emitted = 0;

    // Modify our triple stream so that if all results from one datasource have arrived,
    // check if we haven't reached the limit and if so, trigger a new query for the next datasource.
    tripleStream.push = makeSmartPush(tripleStream, function (localEmittedCount) {
      // This is called after the last element of a datasource has been pushed

      // If we haven't reached our limit, try to fill it with other datasource query results.
      emitted += localEmittedCount;
      datasourceIndex++;
      if (emitted < limit && datasourceIndex < self._datasourceNames.length) {
        queryDatasource(datasourceIndex, 0, limit - emitted);
        return true;
      }
      return false;
    });

    // Initiate query to the first datasource.
    queryDatasource(datasourceIndex, relativeOffset, limit);
  });

  // Sends the pattern of the query to the composed datasource with the given id;
  // the metadata of individual datasources is ignored here
  function queryDatasource(index, subOffset, subLimit) {
    var subQuery = { offset: subOffset, limit: subLimit,
                     subject: query.subject, predicate: query.predicate, object: query.object };
    self._getDatasourceById(index)._executeQuery(subQuery, tripleStream, function () {});
  }

  // Replaces a tripleStream.push
  // It takes the tripleStream as first argument and a callback as second argument.
  // The callback will be called when the push function is called with a falsy value,
  // with as argument the number of elements pushed since the previous falsy value.
  // Returning a falsy value inside the callback will delegate the falsy value to the original
  // push function anyways.
  function makeSmartPush(stream, nullCb) {
    var count = 0;
    var originalPush = stream.push;
    return function (element) {
      if (element) {
        count++;
        originalPush.call(stream, element);
        return;
      }
      // Reset the counter before invoking the callback: the callback may synchronously
      // start the next datasource, whose elements must not be added to this count
      // (otherwise the emitted total is over-counted and the page is cut short).
      var emittedByDatasource = count;
      count = 0;
      if (!nullCb(emittedByDatasource))
        originalPush.call(stream, element);
    };
  }
};
195+
196+
module.exports = CompositeDatasource;

lib/datasources/ExternalHdtDatasource.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ ExternalHdtDatasource.prototype._executeQuery = function (query, tripleStream, m
4343
'--', hdtFile], { stdio: ['ignore', 'pipe', 'ignore'] });
4444
// Parse the result triples
4545
hdt.stdout.setEncoding('utf8');
46-
var parser = new N3Parser(), tripleCount = 0, estimatedTotalCount = 0;
46+
var parser = new N3Parser(), tripleCount = 0, estimatedTotalCount = 0, hasExactCount = true;
4747
parser.parse(hdt.stdout, function (error, triple) {
4848
if (error)
4949
tripleStream.emit('error', new Error('Invalid query result: ' + error.message));
@@ -53,7 +53,7 @@ ExternalHdtDatasource.prototype._executeQuery = function (query, tripleStream, m
5353
// Ensure the estimated total count is as least as large as the number of triples
5454
if (tripleCount && estimatedTotalCount < offset + tripleCount)
5555
estimatedTotalCount = offset + (tripleCount < query.limit ? tripleCount : 2 * tripleCount);
56-
metadataCallback({ totalCount: estimatedTotalCount });
56+
metadataCallback({ totalCount: estimatedTotalCount, hasExactCount: hasExactCount });
5757
tripleStream.push(null);
5858
}
5959
});
@@ -62,6 +62,7 @@ ExternalHdtDatasource.prototype._executeQuery = function (query, tripleStream, m
6262
// Extract the estimated number of total matches from the first (comment) line
6363
hdt.stdout.once('data', function (header) {
6464
estimatedTotalCount = parseInt(header.match(/\d+/), 10) || 0;
65+
hasExactCount = header.indexOf("estimated") < 0;
6566
});
6667

6768
// Report query errors

lib/datasources/HdtDatasource.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ HdtDatasource.prototype._initialize = function (done) {
3333
HdtDatasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
3434
this._hdtDocument.search(query.subject, query.predicate, query.object,
3535
{ limit: query.limit, offset: query.offset },
36-
function (error, triples, estimatedTotalCount) {
36+
function (error, triples, estimatedTotalCount, hasExactCount) {
3737
if (error) return tripleStream.emit('error', error);
3838
// Ensure the estimated total count is as least as large as the number of triples
3939
var tripleCount = triples.length, offset = query.offset || 0;
4040
if (tripleCount && estimatedTotalCount < offset + tripleCount)
4141
estimatedTotalCount = offset + (tripleCount < query.limit ? tripleCount : 2 * tripleCount);
42-
metadataCallback({ totalCount: estimatedTotalCount });
42+
metadataCallback({ totalCount: estimatedTotalCount, hasExactCount: hasExactCount });
4343
// Add the triples to the stream
4444
for (var i = 0; i < tripleCount; i++)
4545
tripleStream.push(triples[i]);

lib/datasources/IndexDatasource.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ MemoryDatasource.extend(IndexDatasource);
2222
IndexDatasource.prototype._getAllTriples = function (addTriple, done) {
2323
for (var name in this._datasources) {
2424
var datasource = this._datasources[name], datasourceUrl = datasource.url;
25-
triple(datasourceUrl, rdf + 'type', voID + 'Dataset');
26-
triple(datasourceUrl, rdfs + 'label', datasource.title, true);
27-
triple(datasourceUrl, dc + 'title', datasource.title, true);
28-
triple(datasourceUrl, dc + 'description', datasource.description, true);
25+
if (!datasource.hide) {
26+
triple(datasourceUrl, rdf + 'type', voID + 'Dataset');
27+
triple(datasourceUrl, rdfs + 'label', datasource.title, true);
28+
triple(datasourceUrl, dc + 'title', datasource.title, true);
29+
triple(datasourceUrl, dc + 'description', datasource.description, true);
30+
}
2931
}
3032
function triple(subject, predicate, object, isLiteral) {
3133
if (subject && predicate && object)

lib/datasources/MemoryDatasource.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ MemoryDatasource.prototype._executeQuery = function (query, tripleStream, metada
2929
var offset = query.offset || 0, limit = query.limit || Infinity,
3030
triples = this._tripleStore.findByIRI(query.subject, query.predicate, query.object);
3131
// Send the metadata
32-
metadataCallback({ totalCount: triples.length });
32+
metadataCallback({ totalCount: triples.length, hasExactCount: true });
3333
// Send the requested subset of triples
3434
for (var i = offset, l = Math.min(offset + limit, triples.length); i < l; i++)
3535
tripleStream.push(triples[i]);

lib/datasources/SparqlDatasource.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ SparqlDatasource.prototype._executeQuery = function (query, tripleStream, metada
5555
// Determine the total number of matching triples
5656
this._getPatternCount(sparqlPattern, function (error, totalCount) {
5757
if (error) emitError(error);
58-
else if (typeof totalCount === 'number') metadataCallback({ totalCount: totalCount });
58+
else if (typeof totalCount === 'number') metadataCallback({ totalCount: totalCount, hasExactCount: true });
5959
});
6060

6161
// Emits an error on the triple stream

lib/views/triplepatternfragments/index.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ <h2>Available datasets</h2>
1414
<dl class="datasets">
1515
<% for (var datasourceName in datasources) {
1616
var datasource = datasources[datasourceName];
17-
if (datasource.role === 'index') continue; %>
17+
if (datasource.role === 'index' || datasource.hide) continue; %>
1818
<dt><a href="<%= datasource.url.replace(/#.*/, '') %>"><%= datasource.title %></a></dt>
1919
<dd><%= datasource.description || ' ' %></dd>
2020
<% } %>

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
"uritemplate": "^0.3.0"
3232
},
3333
"optionalDependencies": {
34-
"hdt": "^1.1.0",
34+
"hdt": "^1.3.0",
3535
"access-log": "^0.3.9"
3636
},
3737
"devDependencies": {

0 commit comments

Comments
 (0)