Skip to content

Commit a07035e

Browse files
author
Burak Serdar
committed
Streams v2: non-rewindable streaming
1 parent b8605aa commit a07035e

File tree

19 files changed

+238
-236
lines changed

19 files changed

+238
-236
lines changed

crud/src/main/java/com/redhat/lightblue/assoc/CompositeFindImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Set;
2222
import java.util.HashSet;
2323
import java.util.List;
24+
import java.util.ArrayList;
2425

2526
import java.util.concurrent.Executors;
2627

@@ -143,7 +144,9 @@ public void explain(OperationContext ctx,
143144
initialize(ctx,req);
144145
ExecutionContext executionContext = new ExecutionContext(ctx,null);
145146
JsonDoc doc=new JsonDoc(executionPlan.explain(executionContext));
146-
ctx.setDocumentStream(new ListDocumentStream(new DocCtx(doc)));
147+
ArrayList<DocCtx> l=new ArrayList<>(1);
148+
l.add(new DocCtx(doc));
149+
ctx.setDocumentStream(new ListDocumentStream<DocCtx>(l));
147150
}
148151

149152
@Override

crud/src/main/java/com/redhat/lightblue/assoc/ep/AbstractSearchStep.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020

2121
import java.util.List;
2222
import java.util.ArrayList;
23+
import java.util.Spliterator;
24+
import java.util.Spliterators;
25+
26+
import java.util.stream.Stream;
27+
import java.util.stream.StreamSupport;
2328

2429
import org.slf4j.Logger;
2530
import org.slf4j.LoggerFactory;
@@ -91,7 +96,12 @@ public void setQueries(List<Conjunct> conjuncts) {
9196

9297
@Override
9398
public StepResult<ResultDocument> getResults(ExecutionContext ctx) {
94-
return new DocumentStreamStepResult<ResultDocument>(getSearchResults(ctx));
99+
return new StepResult<ResultDocument>() {
100+
@Override
101+
public Stream<ResultDocument> stream() {
102+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(getSearchResults(ctx),Spliterator.IMMUTABLE),false);
103+
}
104+
};
95105
}
96106

97107
protected abstract DocumentStream<ResultDocument> getSearchResults(ExecutionContext ctx);

crud/src/main/java/com/redhat/lightblue/assoc/ep/DocumentStreamStepResult.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

crud/src/main/java/com/redhat/lightblue/assoc/ep/JoinSearch.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,7 @@ public JoinSearch(ExecutionBlock block, Source<JoinTuple> source) {
5757

5858
@Override
5959
protected DocumentStream<ResultDocument> getSearchResults(final ExecutionContext ctx) {
60-
return new DocumentStream<ResultDocument>() {
61-
@Override
62-
public Iterator<ResultDocument> getDocuments() {
63-
return new BatchQueryIterator(256,ctx);
64-
}
65-
};
60+
return new BatchQueryIterator(256,ctx);
6661
}
6762

6863
/**
@@ -72,12 +67,12 @@ public Iterator<ResultDocument> getDocuments() {
7267
* batch of queries, computes results, and streams them to the
7368
* caller
7469
*/
75-
private class BatchQueryIterator implements Iterator<ResultDocument> {
70+
private class BatchQueryIterator implements DocumentStream<ResultDocument> {
7671
private final int batchSize;
7772
private final ExecutionContext ctx;
7873
private final Iterator<JoinTuple> sourceStream;
7974

80-
private Iterator<DocCtx> currentIterator;
75+
private DocumentStream<DocCtx> currentIterator;
8176
private boolean done=false; // Are we still iterating, or are we done?
8277

8378
public BatchQueryIterator(int batchSize,ExecutionContext ctx) {
@@ -110,12 +105,21 @@ public ResultDocument next() {
110105
}
111106
throw new NoSuchElementException();
112107
}
108+
109+
@Override
110+
public void close() {
111+
if(currentIterator!=null)
112+
currentIterator.close();
113+
}
113114

114115
private void retrieveNextBatch() {
115116
do {
116117
int n=0;
117118
ArrayList<QueryExpression> qBatch=new ArrayList<>(batchSize);
118-
currentIterator=null;
119+
if(currentIterator!=null) {
120+
currentIterator.close();
121+
currentIterator=null;
122+
}
119123
while(sourceStream.hasNext()&&n<batchSize) {
120124
JoinTuple t=sourceStream.next();
121125
qBatch.addAll(Searches.writeQueriesForJoinTuple(t, block));
@@ -130,9 +134,11 @@ private void retrieveNextBatch() {
130134
findRequest.setFrom(from);
131135
findRequest.setTo(to);
132136
OperationContext opctx = search(ctx, findRequest);
133-
currentIterator=opctx.getDocumentStream().getDocuments();
134-
if(!currentIterator.hasNext())
137+
currentIterator=opctx.getDocumentStream();
138+
if(!currentIterator.hasNext()) {
139+
currentIterator.close();
135140
currentIterator=null;
141+
}
136142
} else {
137143
done=true;
138144
}

crud/src/main/java/com/redhat/lightblue/assoc/ep/Search.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import com.redhat.lightblue.crud.CRUDFindRequest;
3939
import com.redhat.lightblue.crud.DocCtx;
4040
import com.redhat.lightblue.crud.DocumentStream;
41-
import com.redhat.lightblue.crud.MapDocumentStream;
4241
import com.redhat.lightblue.crud.ListDocumentStream;
4342

4443
import com.redhat.lightblue.util.JsonDoc;
@@ -57,7 +56,7 @@ public Search(ExecutionBlock block) {
5756
}
5857

5958
protected DocumentStream<ResultDocument> postProcess(OperationContext result, ExecutionContext ctx) {
60-
return new MapDocumentStream<DocCtx,ResultDocument>(result.getDocumentStream(),x->new ResultDocument(block,x.getOutputDocument()));
59+
return DocumentStream.map(result.getDocumentStream(),x->new ResultDocument(block,x.getOutputDocument()));
6160
}
6261

6362
public OperationContext search(ExecutionContext ctx) {
@@ -96,13 +95,14 @@ public JsonNode explain(ExecutionContext ctx) {
9695
OperationContext searchCtx = ctx.getOperationContext().
9796
getDerivedOperationContext(block.getMetadata().getName(), req);
9897
new SimpleFindImpl(block.getMetadata(), searchCtx.getFactory()).explain(searchCtx,req);
99-
Iterator<DocCtx> itr=searchCtx.getDocumentStream().getDocuments();
98+
DocumentStream<DocCtx> itr=searchCtx.getDocumentStream();
10099
List<JsonDoc> docs=new ArrayList<>();
101100
while(itr.hasNext()) {
102101
DocCtx doc=itr.next();
103102
if(!doc.hasErrors())
104103
docs.add(doc.getOutputDocument());
105104
}
105+
itr.close();
106106
if(!docs.isEmpty()) {
107107
if(docs.size()==1) {
108108
node.set("implementation",docs.get(0).getRoot());

crud/src/main/java/com/redhat/lightblue/assoc/ep/StepResultDocumentStream.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,22 @@
1111
*/
1212
public class StepResultDocumentStream implements DocumentStream<DocCtx> {
1313

14-
private final StepResult<DocCtx> result;
14+
private final Iterator<DocCtx> itr;
1515

1616
public StepResultDocumentStream(StepResult<DocCtx> result) {
17-
this.result=result;
17+
this.itr=result.stream().iterator();
1818
}
1919

2020
@Override
21-
public Iterator<DocCtx> getDocuments() {
22-
return result.stream().iterator();
21+
public boolean hasNext() {
22+
return itr.hasNext();
2323
}
24+
25+
@Override
26+
public DocCtx next() {
27+
return itr.next();
28+
}
29+
30+
@Override
31+
public void close() {}
2432
}

crud/src/main/java/com/redhat/lightblue/crud/CRUDOperationContext.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ public CRUDOperationContext(CRUDOperation op,
7171
this.CRUDOperation = op;
7272
this.entityName = entityName;
7373
this.factory = f;
74-
if(docs!=null)
75-
documents=docs.stream().map(x->new DocCtx(x)).collect(Collectors.toList());
74+
if(docs!=null) {
75+
setInputDocuments(docs.stream().map(x->new DocCtx(x)).collect(Collectors.toList()));
76+
}
7677
this.hookManager = new HookManager(factory.getHookResolver(), factory.getNodeFactory());
7778
this.callerRoles = new HashSet<>();
7879
this.executionOptions = eo;
@@ -88,8 +89,9 @@ public CRUDOperationContext(CRUDOperation op,
8889
this.CRUDOperation = op;
8990
this.entityName = entityName;
9091
this.factory = f;
91-
if(docs!=null)
92-
documents=docs.stream().map(x->new DocCtx(x)).collect(Collectors.toList());
92+
if(docs!=null) {
93+
setInputDocuments(docs.stream().map(x->new DocCtx(x)).collect(Collectors.toList()));
94+
}
9395
this.callerRoles = callerRoles;
9496
this.hookManager = hookManager;
9597
this.executionOptions = eo;
@@ -109,7 +111,7 @@ public CRUDOperationContext(CRUDOperation op,
109111
this.CRUDOperation = op;
110112
this.entityName = entityName;
111113
this.factory = f;
112-
this.documents = docs;
114+
setInputDocuments(docs);
113115
this.callerRoles = callerRoles;
114116
this.hookManager = hookManager;
115117
this.executionOptions = eo;
@@ -183,26 +185,22 @@ public void setDocumentStream(DocumentStream<DocCtx> stream) {
183185
* to iterate through those documents.
184186
*/
185187
public DocumentStream<DocCtx> getDocumentStream() {
186-
return documentStream==null?new ListDocumentStream<DocCtx>(documents==null?new ArrayList<DocCtx>():documents):documentStream;
188+
return documentStream;
187189
}
188190

189191
/**
190-
* Returns the list of documents in the context.
191-
*
192-
* If the list of documents is not set, but the document stream is set, this call constructs a list from the
193-
* document stream and returns that
192+
* Returns the list of input documents in the context.
194193
*/
195-
public List<DocCtx> getDocuments() {
196-
if(documents==null&&documentStream!=null) {
197-
documents=new ArrayList<DocCtx>();
198-
for(Iterator<DocCtx> itr=documentStream.getDocuments();itr.hasNext();)
199-
documents.add(itr.next());
200-
}
194+
public List<DocCtx> getInputDocuments() {
201195
return documents;
202196
}
203197

204-
public void setDocuments(List<DocCtx> docs) {
198+
public void setInputDocuments(List<DocCtx> docs) {
205199
documents = docs;
200+
if(documents!=null) {
201+
// Set the documentStream to input documents. A later call to setDocumentStream may set the output stream
202+
documentStream=new ListDocumentStream(documents);
203+
}
206204
}
207205

208206
/**

crud/src/main/java/com/redhat/lightblue/crud/DocumentStream.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,32 @@
1919
package com.redhat.lightblue.crud;
2020

2121
import java.util.Iterator;
22+
import java.util.function.Function;
2223

2324
/**
2425
* This interface is used to stream documents. A call to getDocuments
2526
* should return an iterator of the document resultset.
2627
*/
27-
public interface DocumentStream<T> {
28-
Iterator<T> getDocuments();
28+
public interface DocumentStream<T> extends Iterator<T>{
29+
/**
30+
* Close the document stream
31+
*/
32+
void close();
33+
34+
public static <S,D> DocumentStream<D> map(final DocumentStream<S> source,final Function<S,D> map) {
35+
return new DocumentStream<D>() {
36+
@Override
37+
public boolean hasNext() {
38+
return source.hasNext();
39+
}
40+
@Override
41+
public void close() {
42+
source.close();
43+
}
44+
@Override
45+
public D next() {
46+
return map.apply(source.next());
47+
}
48+
};
49+
}
2950
}

crud/src/main/java/com/redhat/lightblue/crud/ListDocumentStream.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,39 @@
1818
*/
1919
package com.redhat.lightblue.crud;
2020

21-
import java.util.ArrayList;
2221
import java.util.Iterator;
2322
import java.util.List;
2423

2524
/**
26-
* Default trivial implementation of document stream using a list
25+
* Default trivial implementation of document stream that uses a list to hold its elements
2726
*/
2827
public class ListDocumentStream<T> implements DocumentStream<T> {
2928

3029
private final List<T> documents;
31-
32-
public ListDocumentStream(T item) {
33-
this.documents=new ArrayList<T>(1);
34-
this.documents.add(item);
35-
}
30+
private final Iterator<T> itr;
3631

3732
public ListDocumentStream(List<T> list) {
3833
this.documents=list;
34+
this.itr=documents.iterator();
35+
}
36+
37+
/**
38+
* Return the underlying list
39+
*/
40+
public List<T> getDocuments() {
41+
return documents;
3942
}
4043

4144
@Override
42-
public Iterator<T> getDocuments() {
43-
return documents.iterator();
45+
public boolean hasNext() {
46+
return itr.hasNext();
4447
}
48+
49+
@Override
50+
public T next() {
51+
return itr.next();
52+
}
53+
54+
@Override
55+
public void close() {}
4556
}

0 commit comments

Comments
 (0)