Skip to content

Commit 9187bd3

Browse files
authored
Merge pull request #764 from bserdar/streams
Streams
2 parents fa600e2 + 4588859 commit 9187bd3

File tree

27 files changed

+803
-645
lines changed

27 files changed

+803
-645
lines changed

core-api/src/main/java/com/redhat/lightblue/Response.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,24 @@ public void setEntityData(JsonNode node) {
210210
}
211211
}
212212

213+
/**
214+
* Adds a document, or array of documents to the entity data
215+
*/
216+
public void addEntityData(JsonNode doc) {
217+
if(doc!=null) {
218+
if(entityData==null) {
219+
entityData=JsonNodeFactory.instance.arrayNode();
220+
}
221+
if(doc instanceof ArrayNode) {
222+
for(Iterator<JsonNode> itr=doc.elements();itr.hasNext();) {
223+
((ArrayNode)entityData).add(itr.next());
224+
}
225+
} else {
226+
((ArrayNode)entityData).add(doc);
227+
}
228+
}
229+
}
230+
213231
/**
214232
* Metadata list for documents in entityData. If there are more
215233
* than one documents, the entitydata and metadata indexes match.

crud/src/main/java/com/redhat/lightblue/assoc/CompositeFindImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.Set;
2222
import java.util.HashSet;
2323
import java.util.List;
24+
import java.util.ArrayList;
25+
import java.util.Arrays;
2426

2527
import java.util.concurrent.Executors;
2628

@@ -32,6 +34,7 @@
3234
import com.redhat.lightblue.crud.CRUDFindResponse;
3335
import com.redhat.lightblue.crud.CRUDFindRequest;
3436
import com.redhat.lightblue.crud.DocCtx;
37+
import com.redhat.lightblue.crud.ListDocumentStream;
3538

3639
import com.redhat.lightblue.metadata.CompositeMetadata;
3740

@@ -48,6 +51,8 @@
4851
import com.redhat.lightblue.assoc.ep.StepResult;
4952
import com.redhat.lightblue.assoc.ep.ResultDocument;
5053
import com.redhat.lightblue.assoc.ep.ExecutionContext;
54+
import com.redhat.lightblue.assoc.ep.MakeDocCtx;
55+
import com.redhat.lightblue.assoc.ep.StepResultDocumentStream;
5156

5257
import com.redhat.lightblue.util.JsonDoc;
5358

@@ -140,7 +145,7 @@ public void explain(OperationContext ctx,
140145
initialize(ctx,req);
141146
ExecutionContext executionContext = new ExecutionContext(ctx,null);
142147
JsonDoc doc=new JsonDoc(executionPlan.explain(executionContext));
143-
ctx.addDocument(doc);
148+
ctx.setDocumentStream(new ListDocumentStream<DocCtx>(Arrays.asList(new DocCtx(doc))));
144149
}
145150

146151
@Override
@@ -158,7 +163,7 @@ public CRUDFindResponse find(OperationContext ctx,
158163
Executors.newWorkStealingPool(parallelism));
159164
try {
160165
StepResult<ResultDocument> results = executionPlan.getResults(executionContext);
161-
results.stream().map(d -> new DocCtx(d.getDoc())).forEach(d -> ctx.addDocument(d));
166+
ctx.setDocumentStream(new StepResultDocumentStream(new MakeDocCtx(results)));
162167
response.setSize(executionContext.getMatchCount());
163168
LOGGER.debug("Composite find: end");
164169
return response;

crud/src/main/java/com/redhat/lightblue/assoc/ep/AbstractSearchStep.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020

2121
import java.util.List;
2222
import java.util.ArrayList;
23+
import java.util.Spliterator;
24+
import java.util.Spliterators;
25+
26+
import java.util.stream.Stream;
27+
import java.util.stream.StreamSupport;
2328

2429
import org.slf4j.Logger;
2530
import org.slf4j.LoggerFactory;
@@ -40,6 +45,7 @@
4045

4146
import com.redhat.lightblue.crud.CRUDFindRequest;
4247
import com.redhat.lightblue.crud.CRUDFindResponse;
48+
import com.redhat.lightblue.crud.DocumentStream;
4349
import com.redhat.lightblue.assoc.Conjunct;
4450

4551
/**
@@ -90,11 +96,15 @@ public void setQueries(List<Conjunct> conjuncts) {
9096

9197
@Override
9298
public StepResult<ResultDocument> getResults(ExecutionContext ctx) {
93-
List<ResultDocument> list = getSearchResults(ctx);
94-
return new ListStepResult(list);
99+
return new StepResult<ResultDocument>() {
100+
@Override
101+
public Stream<ResultDocument> stream() {
102+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(getSearchResults(ctx),Spliterator.IMMUTABLE),false);
103+
}
104+
};
95105
}
96106

97-
protected abstract List<ResultDocument> getSearchResults(ExecutionContext ctx);
107+
protected abstract DocumentStream<ResultDocument> getSearchResults(ExecutionContext ctx);
98108

99109
public OperationContext search(ExecutionContext ctx, CRUDFindRequest req) {
100110
return search(block, ctx, req);
@@ -125,7 +135,7 @@ public OperationContext search(ExecutionBlock block, ExecutionContext ctx, CRUDF
125135
}
126136
LOGGER.debug("execute {}: returning {} documents",
127137
block.getQueryPlanNode().getName(),
128-
searchCtx.getDocuments().size());
138+
response.getSize());
129139
}
130140
return searchCtx;
131141
}

crud/src/main/java/com/redhat/lightblue/assoc/ep/Assemble.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.redhat.lightblue.assoc.AnalyzeQuery;
4747

4848
import com.redhat.lightblue.mindex.MemDocIndex;
49-
import com.redhat.lightblue.mindex.GetIndexKeySpec;
5049
import com.redhat.lightblue.mindex.GetIndexLookupSpec;
5150
import com.redhat.lightblue.mindex.KeySpec;
5251
import com.redhat.lightblue.mindex.LookupSpec;
@@ -350,7 +349,7 @@ private void associateDocsWithIndex(CompositeMetadata childMetadata,
350349
LOGGER.debug("Lookup spec:"+ls);
351350
List<ResultDocument> docs=reorder(childDocs,childIndex.find(ls));
352351
ArrayNode destNode=null;
353-
for (ResultDocument childDoc : childDocs) {
352+
for (ResultDocument childDoc : docs) {
354353
if (qeval.evaluate(childDoc.getDoc()).getResult()) {
355354
destNode=ensureDestNodeExists(parentDoc,destNode,destFieldName);
356355
destNode.add(childDoc.getDoc().getRoot());

crud/src/main/java/com/redhat/lightblue/assoc/ep/Copy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.fasterxml.jackson.databind.node.ObjectNode;
2727
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2828

29+
import com.redhat.lightblue.crud.DocumentStream;
30+
2931
/**
3032
* Copies the results from another step
3133
*/
@@ -59,7 +61,7 @@ public Stream<ResultDocument> stream() {
5961
}
6062

6163
@Override
62-
protected final List<ResultDocument> getSearchResults(ExecutionContext ctx) {
64+
protected final DocumentStream<ResultDocument> getSearchResults(ExecutionContext ctx) {
6365
// This should be called at all
6466
throw new IllegalStateException();
6567
}

crud/src/main/java/com/redhat/lightblue/assoc/ep/JoinSearch.java

Lines changed: 93 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020

2121
import java.util.List;
2222
import java.util.ArrayList;
23+
import java.util.Iterator;
24+
import java.util.NoSuchElementException;
25+
import java.util.function.Consumer;
26+
2327
import com.fasterxml.jackson.databind.JsonNode;
2428
import com.fasterxml.jackson.databind.node.ObjectNode;
2529
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
@@ -32,6 +36,8 @@
3236

3337
import com.redhat.lightblue.mediator.OperationContext;
3438
import com.redhat.lightblue.crud.CRUDFindRequest;
39+
import com.redhat.lightblue.crud.DocumentStream;
40+
import com.redhat.lightblue.crud.DocCtx;
3541

3642
/**
3743
* Performs searches based on the n-tuple of result documents obtained from the
@@ -51,56 +57,105 @@ public JoinSearch(ExecutionBlock block, Source<JoinTuple> source) {
5157
}
5258

5359
@Override
54-
protected List<ResultDocument> getSearchResults(ExecutionContext ctx) {
55-
BatchQueryExecutor executor = new BatchQueryExecutor(256, ctx);
56-
source.getStep().getResults(ctx).stream().forEach(x -> executor.add(x));
57-
return executor.getResults();
60+
protected DocumentStream<ResultDocument> getSearchResults(final ExecutionContext ctx) {
61+
return new BatchQueryIterator(256,ctx);
5862
}
5963

60-
public class BatchQueryExecutor {
64+
/**
65+
* Streaming batch query executor/iterator
66+
*
67+
* When the results are retrieved from the stream, executes a
68+
* batch of queries, computes results, and streams them to the
69+
* caller
70+
*/
71+
private class BatchQueryIterator implements DocumentStream<ResultDocument> {
6172
private final int batchSize;
62-
private List<JoinTuple> jtBatch;
63-
;
64-
private List<QueryExpression> qBatch;
6573
private final ExecutionContext ctx;
66-
private List<ResultDocument> docs = new ArrayList<>();
67-
68-
public BatchQueryExecutor(int batchSize, ExecutionContext ctx) {
69-
this.batchSize = batchSize;
70-
this.jtBatch = new ArrayList<>(batchSize);
71-
this.qBatch = new ArrayList<>(batchSize);
72-
this.ctx = ctx;
74+
private final Iterator<JoinTuple> sourceStream;
75+
private final ArrayList<Consumer<ResultDocument>> listeners=new ArrayList<>();
76+
77+
private DocumentStream<DocCtx> currentIterator;
78+
private boolean done=false; // Are we still iterating, or are we done?
79+
80+
public BatchQueryIterator(int batchSize,ExecutionContext ctx) {
81+
this.batchSize=batchSize;
82+
this.ctx=ctx;
83+
sourceStream=source.getStep().getResults(ctx).stream().iterator();
7384
}
7485

75-
public void add(JoinTuple tuple) {
76-
jtBatch.add(tuple);
77-
qBatch.addAll(Searches.writeQueriesForJoinTuple(tuple, block));
78-
if (qBatch.size() >= batchSize) {
79-
executeBatch();
80-
qBatch = new ArrayList<>(batchSize);
81-
jtBatch = new ArrayList<>(batchSize);
86+
@Override
87+
public boolean hasNext() {
88+
if(!done) {
89+
if(currentIterator==null||!currentIterator.hasNext())
90+
retrieveNextBatch();
91+
if(done)
92+
return false;
93+
else
94+
return currentIterator.hasNext();
95+
} else {
96+
return false;
8297
}
8398
}
84-
85-
public void executeBatch() {
86-
if (!qBatch.isEmpty()) {
87-
QueryExpression q = Searches.combine(NaryLogicalOperator._or, qBatch);
88-
CRUDFindRequest findRequest = new CRUDFindRequest();
89-
findRequest.setQuery(Searches.and(q, query));
90-
findRequest.setProjection(projection);
91-
findRequest.setSort(sort);
92-
findRequest.setFrom(from);
93-
findRequest.setTo(to);
94-
OperationContext opctx = search(ctx, findRequest);
95-
opctx.getDocuments().stream().
96-
forEach(doc -> docs.add(new ResultDocument(block, doc.getOutputDocument())));
99+
100+
@Override
101+
public ResultDocument next() {
102+
if(!done) {
103+
if(currentIterator==null||!currentIterator.hasNext())
104+
retrieveNextBatch();
105+
if(currentIterator!=null) {
106+
ResultDocument doc=new ResultDocument(block,currentIterator.next().getOutputDocument());
107+
for(Consumer<ResultDocument> l:listeners)
108+
l.accept(doc);
109+
return doc;
110+
}
97111
}
112+
throw new NoSuchElementException();
113+
}
114+
115+
@Override
116+
public void close() {
117+
if(currentIterator!=null)
118+
currentIterator.close();
98119
}
99120

100-
public List<ResultDocument> getResults() {
101-
executeBatch();
102-
return docs;
121+
@Override
122+
public void addListener(Consumer<ResultDocument> listener) {
123+
listeners.add(listener);
103124
}
125+
126+
private void retrieveNextBatch() {
127+
do {
128+
int n=0;
129+
ArrayList<QueryExpression> qBatch=new ArrayList<>(batchSize);
130+
if(currentIterator!=null) {
131+
currentIterator.close();
132+
currentIterator=null;
133+
}
134+
while(sourceStream.hasNext()&&n<batchSize) {
135+
JoinTuple t=sourceStream.next();
136+
qBatch.addAll(Searches.writeQueriesForJoinTuple(t, block));
137+
n++;
138+
}
139+
if(!qBatch.isEmpty()) {
140+
QueryExpression q = Searches.combine(NaryLogicalOperator._or, qBatch);
141+
CRUDFindRequest findRequest = new CRUDFindRequest();
142+
findRequest.setQuery(Searches.and(q, query));
143+
findRequest.setProjection(projection);
144+
findRequest.setSort(sort);
145+
findRequest.setFrom(from);
146+
findRequest.setTo(to);
147+
OperationContext opctx = search(ctx, findRequest);
148+
currentIterator=opctx.getDocumentStream();
149+
if(!currentIterator.hasNext()) {
150+
currentIterator.close();
151+
currentIterator=null;
152+
}
153+
} else {
154+
done=true;
155+
}
156+
} while(!done&&currentIterator==null);
157+
}
158+
104159
}
105160

106161
@Override
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.redhat.lightblue.assoc.ep;
2+
3+
import java.util.stream.Stream;
4+
5+
import com.redhat.lightblue.crud.DocCtx;
6+
7+
/**
8+
* Decorator interface that gets a StepResult<ResultDocument> and return StepResult<DocCtx>
9+
*/
10+
public class MakeDocCtx implements StepResult<DocCtx> {
11+
12+
private final StepResult<ResultDocument> result;
13+
14+
public MakeDocCtx(StepResult<ResultDocument> result) {
15+
this.result=result;
16+
}
17+
18+
@Override
19+
public Stream<DocCtx> stream() {
20+
return result.stream().map(d->new DocCtx(d.getDoc()));
21+
}
22+
}

0 commit comments

Comments
 (0)