Skip to content

Commit b8605aa

Browse files
author
Burak Serdar
committed
Stream results
1 parent e7863cb commit b8605aa

File tree

22 files changed

+503
-492
lines changed

22 files changed

+503
-492
lines changed

core-api/src/main/java/com/redhat/lightblue/Response.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,24 @@ public void setEntityData(JsonNode node) {
210210
}
211211
}
212212

213+
/**
214+
* Adds a document, or array of documents to the entity data
215+
*/
216+
public void addEntityData(JsonNode doc) {
217+
if(doc!=null) {
218+
if(entityData==null) {
219+
entityData=JsonNodeFactory.instance.arrayNode();
220+
}
221+
if(doc instanceof ArrayNode) {
222+
for(Iterator<JsonNode> itr=doc.elements();itr.hasNext();) {
223+
((ArrayNode)entityData).add(itr.next());
224+
}
225+
} else {
226+
((ArrayNode)entityData).add(doc);
227+
}
228+
}
229+
}
230+
213231
/**
214232
* Metadata list for documents in entityData. If there are more
215233
* than one documents, the entitydata and metadata indexes match.

crud/src/main/java/com/redhat/lightblue/assoc/CompositeFindImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.redhat.lightblue.crud.CRUDFindResponse;
3333
import com.redhat.lightblue.crud.CRUDFindRequest;
3434
import com.redhat.lightblue.crud.DocCtx;
35+
import com.redhat.lightblue.crud.ListDocumentStream;
3536

3637
import com.redhat.lightblue.metadata.CompositeMetadata;
3738

@@ -48,6 +49,8 @@
4849
import com.redhat.lightblue.assoc.ep.StepResult;
4950
import com.redhat.lightblue.assoc.ep.ResultDocument;
5051
import com.redhat.lightblue.assoc.ep.ExecutionContext;
52+
import com.redhat.lightblue.assoc.ep.MakeDocCtx;
53+
import com.redhat.lightblue.assoc.ep.StepResultDocumentStream;
5154

5255
import com.redhat.lightblue.util.JsonDoc;
5356

@@ -140,7 +143,7 @@ public void explain(OperationContext ctx,
140143
initialize(ctx,req);
141144
ExecutionContext executionContext = new ExecutionContext(ctx,null);
142145
JsonDoc doc=new JsonDoc(executionPlan.explain(executionContext));
143-
ctx.addDocument(doc);
146+
ctx.setDocumentStream(new ListDocumentStream(new DocCtx(doc)));
144147
}
145148

146149
@Override
@@ -158,7 +161,7 @@ public CRUDFindResponse find(OperationContext ctx,
158161
Executors.newWorkStealingPool(parallelism));
159162
try {
160163
StepResult<ResultDocument> results = executionPlan.getResults(executionContext);
161-
results.stream().map(d -> new DocCtx(d.getDoc())).forEach(d -> ctx.addDocument(d));
164+
ctx.setDocumentStream(new StepResultDocumentStream(new MakeDocCtx(results)));
162165
response.setSize(executionContext.getMatchCount());
163166
LOGGER.debug("Composite find: end");
164167
return response;

crud/src/main/java/com/redhat/lightblue/assoc/ep/AbstractSearchStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import com.redhat.lightblue.crud.CRUDFindRequest;
4242
import com.redhat.lightblue.crud.CRUDFindResponse;
43+
import com.redhat.lightblue.crud.DocumentStream;
4344
import com.redhat.lightblue.assoc.Conjunct;
4445

4546
/**
@@ -90,11 +91,10 @@ public void setQueries(List<Conjunct> conjuncts) {
9091

9192
@Override
9293
public StepResult<ResultDocument> getResults(ExecutionContext ctx) {
93-
List<ResultDocument> list = getSearchResults(ctx);
94-
return new ListStepResult(list);
94+
return new DocumentStreamStepResult<ResultDocument>(getSearchResults(ctx));
9595
}
9696

97-
protected abstract List<ResultDocument> getSearchResults(ExecutionContext ctx);
97+
protected abstract DocumentStream<ResultDocument> getSearchResults(ExecutionContext ctx);
9898

9999
public OperationContext search(ExecutionContext ctx, CRUDFindRequest req) {
100100
return search(block, ctx, req);
@@ -125,7 +125,7 @@ public OperationContext search(ExecutionBlock block, ExecutionContext ctx, CRUDF
125125
}
126126
LOGGER.debug("execute {}: returning {} documents",
127127
block.getQueryPlanNode().getName(),
128-
searchCtx.getDocuments().size());
128+
response.getSize());
129129
}
130130
return searchCtx;
131131
}

crud/src/main/java/com/redhat/lightblue/assoc/ep/Assemble.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.redhat.lightblue.assoc.AnalyzeQuery;
4747

4848
import com.redhat.lightblue.mindex.MemDocIndex;
49-
import com.redhat.lightblue.mindex.GetIndexKeySpec;
5049
import com.redhat.lightblue.mindex.GetIndexLookupSpec;
5150
import com.redhat.lightblue.mindex.KeySpec;
5251
import com.redhat.lightblue.mindex.LookupSpec;
@@ -350,7 +349,7 @@ private void associateDocsWithIndex(CompositeMetadata childMetadata,
350349
LOGGER.debug("Lookup spec:"+ls);
351350
List<ResultDocument> docs=reorder(childDocs,childIndex.find(ls));
352351
ArrayNode destNode=null;
353-
for (ResultDocument childDoc : childDocs) {
352+
for (ResultDocument childDoc : docs) {
354353
if (qeval.evaluate(childDoc.getDoc()).getResult()) {
355354
destNode=ensureDestNodeExists(parentDoc,destNode,destFieldName);
356355
destNode.add(childDoc.getDoc().getRoot());

crud/src/main/java/com/redhat/lightblue/assoc/ep/Copy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.fasterxml.jackson.databind.node.ObjectNode;
2727
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2828

29+
import com.redhat.lightblue.crud.DocumentStream;
30+
2931
/**
3032
* Copies the results from another step
3133
*/
@@ -59,7 +61,7 @@ public Stream<ResultDocument> stream() {
5961
}
6062

6163
@Override
62-
protected final List<ResultDocument> getSearchResults(ExecutionContext ctx) {
64+
protected final DocumentStream<ResultDocument> getSearchResults(ExecutionContext ctx) {
6365
// This should be called at all
6466
throw new IllegalStateException();
6567
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.redhat.lightblue.assoc.ep;
2+
3+
import java.util.List;
4+
import java.util.Spliterators;
5+
import java.util.Spliterator;
6+
import java.util.stream.Stream;
7+
import java.util.stream.StreamSupport;
8+
9+
import com.redhat.lightblue.crud.DocumentStream;
10+
11+
/**
12+
* A step result backed by a document stream
13+
*/
14+
public class DocumentStreamStepResult<T> implements StepResult<T> {
15+
16+
protected DocumentStream<T> stream;
17+
18+
public DocumentStreamStepResult(DocumentStream<T> stream) {
19+
this.stream=stream;
20+
}
21+
22+
@Override
23+
public Stream<T> stream() {
24+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(stream.getDocuments(),Spliterator.IMMUTABLE),false);
25+
}
26+
}

crud/src/main/java/com/redhat/lightblue/assoc/ep/JoinSearch.java

Lines changed: 78 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
import java.util.List;
2222
import java.util.ArrayList;
23+
import java.util.Iterator;
24+
import java.util.NoSuchElementException;
25+
2326
import com.fasterxml.jackson.databind.JsonNode;
2427
import com.fasterxml.jackson.databind.node.ObjectNode;
2528
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
@@ -32,6 +35,8 @@
3235

3336
import com.redhat.lightblue.mediator.OperationContext;
3437
import com.redhat.lightblue.crud.CRUDFindRequest;
38+
import com.redhat.lightblue.crud.DocumentStream;
39+
import com.redhat.lightblue.crud.DocCtx;
3540

3641
/**
3742
* Performs searches based on the n-tuple of result documents obtained from the
@@ -51,56 +56,89 @@ public JoinSearch(ExecutionBlock block, Source<JoinTuple> source) {
5156
}
5257

5358
@Override
54-
protected List<ResultDocument> getSearchResults(ExecutionContext ctx) {
55-
BatchQueryExecutor executor = new BatchQueryExecutor(256, ctx);
56-
source.getStep().getResults(ctx).stream().forEach(x -> executor.add(x));
57-
return executor.getResults();
59+
protected DocumentStream<ResultDocument> getSearchResults(final ExecutionContext ctx) {
60+
return new DocumentStream<ResultDocument>() {
61+
@Override
62+
public Iterator<ResultDocument> getDocuments() {
63+
return new BatchQueryIterator(256,ctx);
64+
}
65+
};
5866
}
5967

60-
public class BatchQueryExecutor {
68+
/**
69+
* Streaming batch query executor/iterator
70+
*
71+
* When the results are retrieved from the stream, executes a
72+
* batch of queries, computes results, and streams them to the
73+
* caller
74+
*/
75+
private class BatchQueryIterator implements Iterator<ResultDocument> {
6176
private final int batchSize;
62-
private List<JoinTuple> jtBatch;
63-
;
64-
private List<QueryExpression> qBatch;
6577
private final ExecutionContext ctx;
66-
private List<ResultDocument> docs = new ArrayList<>();
67-
68-
public BatchQueryExecutor(int batchSize, ExecutionContext ctx) {
69-
this.batchSize = batchSize;
70-
this.jtBatch = new ArrayList<>(batchSize);
71-
this.qBatch = new ArrayList<>(batchSize);
72-
this.ctx = ctx;
78+
private final Iterator<JoinTuple> sourceStream;
79+
80+
private Iterator<DocCtx> currentIterator;
81+
private boolean done=false; // Are we still iterating, or are we done?
82+
83+
public BatchQueryIterator(int batchSize,ExecutionContext ctx) {
84+
this.batchSize=batchSize;
85+
this.ctx=ctx;
86+
sourceStream=source.getStep().getResults(ctx).stream().iterator();
7387
}
7488

75-
public void add(JoinTuple tuple) {
76-
jtBatch.add(tuple);
77-
qBatch.addAll(Searches.writeQueriesForJoinTuple(tuple, block));
78-
if (qBatch.size() >= batchSize) {
79-
executeBatch();
80-
qBatch = new ArrayList<>(batchSize);
81-
jtBatch = new ArrayList<>(batchSize);
89+
@Override
90+
public boolean hasNext() {
91+
if(!done) {
92+
if(currentIterator==null||!currentIterator.hasNext())
93+
retrieveNextBatch();
94+
if(done)
95+
return false;
96+
else
97+
return currentIterator.hasNext();
98+
} else {
99+
return false;
82100
}
83101
}
84-
85-
public void executeBatch() {
86-
if (!qBatch.isEmpty()) {
87-
QueryExpression q = Searches.combine(NaryLogicalOperator._or, qBatch);
88-
CRUDFindRequest findRequest = new CRUDFindRequest();
89-
findRequest.setQuery(Searches.and(q, query));
90-
findRequest.setProjection(projection);
91-
findRequest.setSort(sort);
92-
findRequest.setFrom(from);
93-
findRequest.setTo(to);
94-
OperationContext opctx = search(ctx, findRequest);
95-
opctx.getDocuments().stream().
96-
forEach(doc -> docs.add(new ResultDocument(block, doc.getOutputDocument())));
102+
103+
@Override
104+
public ResultDocument next() {
105+
if(!done) {
106+
if(currentIterator==null||!currentIterator.hasNext())
107+
retrieveNextBatch();
108+
if(currentIterator!=null)
109+
return new ResultDocument(block,currentIterator.next().getOutputDocument());
97110
}
111+
throw new NoSuchElementException();
98112
}
99-
100-
public List<ResultDocument> getResults() {
101-
executeBatch();
102-
return docs;
103-
}
113+
114+
private void retrieveNextBatch() {
115+
do {
116+
int n=0;
117+
ArrayList<QueryExpression> qBatch=new ArrayList<>(batchSize);
118+
currentIterator=null;
119+
while(sourceStream.hasNext()&&n<batchSize) {
120+
JoinTuple t=sourceStream.next();
121+
qBatch.addAll(Searches.writeQueriesForJoinTuple(t, block));
122+
n++;
123+
}
124+
if(!qBatch.isEmpty()) {
125+
QueryExpression q = Searches.combine(NaryLogicalOperator._or, qBatch);
126+
CRUDFindRequest findRequest = new CRUDFindRequest();
127+
findRequest.setQuery(Searches.and(q, query));
128+
findRequest.setProjection(projection);
129+
findRequest.setSort(sort);
130+
findRequest.setFrom(from);
131+
findRequest.setTo(to);
132+
OperationContext opctx = search(ctx, findRequest);
133+
currentIterator=opctx.getDocumentStream().getDocuments();
134+
if(!currentIterator.hasNext())
135+
currentIterator=null;
136+
} else {
137+
done=true;
138+
}
139+
} while(!done&&currentIterator==null);
140+
}
141+
104142
}
105143

106144
@Override
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.redhat.lightblue.assoc.ep;
2+
3+
import java.util.stream.Stream;
4+
5+
import com.redhat.lightblue.crud.DocCtx;
6+
7+
/**
8+
* Decorator interface that gets a StepResult<ResultDocument> and return StepResult<DocCtx>
9+
*/
10+
public class MakeDocCtx implements StepResult<DocCtx> {
11+
12+
private final StepResult<ResultDocument> result;
13+
14+
public MakeDocCtx(StepResult<ResultDocument> result) {
15+
this.result=result;
16+
}
17+
18+
@Override
19+
public Stream<DocCtx> stream() {
20+
return result.stream().map(d->new DocCtx(d.getDoc()));
21+
}
22+
}

crud/src/main/java/com/redhat/lightblue/assoc/ep/Search.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.List;
2222
import java.util.ArrayList;
23+
import java.util.Iterator;
2324

2425
import java.util.stream.Collectors;
2526

@@ -36,6 +37,9 @@
3637

3738
import com.redhat.lightblue.crud.CRUDFindRequest;
3839
import com.redhat.lightblue.crud.DocCtx;
40+
import com.redhat.lightblue.crud.DocumentStream;
41+
import com.redhat.lightblue.crud.MapDocumentStream;
42+
import com.redhat.lightblue.crud.ListDocumentStream;
3943

4044
import com.redhat.lightblue.util.JsonDoc;
4145

@@ -51,11 +55,9 @@ public class Search extends AbstractSearchStep {
5155
public Search(ExecutionBlock block) {
5256
super(block);
5357
}
54-
55-
protected List<ResultDocument> postProcess(OperationContext result, ExecutionContext ctx) {
56-
return result.getDocuments().stream().
57-
map(doc -> new ResultDocument(block, doc.getOutputDocument())).
58-
collect(Collectors.toList());
58+
59+
protected DocumentStream<ResultDocument> postProcess(OperationContext result, ExecutionContext ctx) {
60+
return new MapDocumentStream<DocCtx,ResultDocument>(result.getDocumentStream(),x->new ResultDocument(block,x.getOutputDocument()));
5961
}
6062

6163
public OperationContext search(ExecutionContext ctx) {
@@ -78,12 +80,12 @@ protected CRUDFindRequest buildFindRequest(ExecutionContext ctx) {
7880
}
7981

8082
@Override
81-
protected List<ResultDocument> getSearchResults(ExecutionContext ctx) {
83+
protected DocumentStream<ResultDocument> getSearchResults(ExecutionContext ctx) {
8284
OperationContext result = search(ctx);
8385
if (result != null) {
8486
return postProcess(result, ctx);
8587
} else {
86-
return new ArrayList<>();
88+
return new ListDocumentStream<ResultDocument>(new ArrayList<>());
8789
}
8890
}
8991

@@ -94,8 +96,14 @@ public JsonNode explain(ExecutionContext ctx) {
9496
OperationContext searchCtx = ctx.getOperationContext().
9597
getDerivedOperationContext(block.getMetadata().getName(), req);
9698
new SimpleFindImpl(block.getMetadata(), searchCtx.getFactory()).explain(searchCtx,req);
97-
List<JsonDoc> docs = searchCtx.getOutputDocumentsWithoutErrors();
98-
if(docs!=null&&!docs.isEmpty()) {
99+
Iterator<DocCtx> itr=searchCtx.getDocumentStream().getDocuments();
100+
List<JsonDoc> docs=new ArrayList<>();
101+
while(itr.hasNext()) {
102+
DocCtx doc=itr.next();
103+
if(!doc.hasErrors())
104+
docs.add(doc.getOutputDocument());
105+
}
106+
if(!docs.isEmpty()) {
99107
if(docs.size()==1) {
100108
node.set("implementation",docs.get(0).getRoot());
101109
} else {

0 commit comments

Comments
 (0)