Skip to content

Commit 1fc7895

Browse files
author
Burak Serdar
committed
Add rewindable stream as a separate interface
1 parent 286f35b commit 1fc7895

File tree

6 files changed

+102
-39
lines changed

6 files changed

+102
-39
lines changed

crud/src/main/java/com/redhat/lightblue/assoc/CompositeFindImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.HashSet;
2323
import java.util.List;
2424
import java.util.ArrayList;
25+
import java.util.Arrays;
2526

2627
import java.util.concurrent.Executors;
2728

@@ -144,9 +145,7 @@ public void explain(OperationContext ctx,
144145
initialize(ctx,req);
145146
ExecutionContext executionContext = new ExecutionContext(ctx,null);
146147
JsonDoc doc=new JsonDoc(executionPlan.explain(executionContext));
147-
ArrayList<DocCtx> l=new ArrayList<>(1);
148-
l.add(new DocCtx(doc));
149-
ctx.setDocumentStream(new ListDocumentStream<DocCtx>(l));
148+
ctx.setDocumentStream(new ListDocumentStream<DocCtx>(Arrays.asList(new DocCtx(doc))));
150149
}
151150

152151
@Override

crud/src/main/java/com/redhat/lightblue/crud/DocumentStream.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,24 +38,17 @@ public interface DocumentStream<T> extends Iterator<T>{
3838
*/
3939
void tee(Consumer<T> dest);
4040

41-
/**
42-
* returns true if rewind works
43-
*/
44-
default boolean canRewind() { return false; }
45-
46-
/**
47-
* returns a new stream that starts the same resultset from the beginning. Only works if canRewind() is true
48-
*/
49-
default DocumentStream<T> rewind() { throw new UnsupportedOperationException(); }
50-
5141
public static <S,D> DocumentStream<D> map(final DocumentStream<S> source,final Function<S,D> map) {
52-
return new DocumentStreamMapper<S,D>(source,map);
42+
if(source instanceof RewindableDocumentStream)
43+
return new RewindableDocumentStream.RewindableDocumentStreamMapper<S,D>((RewindableDocumentStream<S>)source,map);
44+
else
45+
return new DocumentStreamMapper<S,D>(source,map);
5346
}
5447

55-
static final class DocumentStreamMapper<S,D> implements DocumentStream<D> {
56-
private final ArrayList<Consumer<D>> listeners=new ArrayList<>();
57-
private final DocumentStream<S> source;
58-
private final Function<S,D> map;
48+
static class DocumentStreamMapper<S,D> implements DocumentStream<D> {
49+
final ArrayList<Consumer<D>> listeners=new ArrayList<>();
50+
final DocumentStream<S> source;
51+
final Function<S,D> map;
5952
DocumentStreamMapper(DocumentStream<S> source,Function<S,D> map) {
6053
this.source=source;
6154
this.map=map;
@@ -76,14 +69,6 @@ public D next() {
7669
return d;
7770
}
7871
@Override
79-
public boolean canRewind() {
80-
return source.canRewind();
81-
}
82-
@Override
83-
public DocumentStream<D> rewind() {
84-
return new DocumentStreamMapper<S,D>(source.rewind(),map);
85-
}
86-
@Override
8772
public void tee(Consumer<D> t) {
8873
listeners.add(t);
8974
}

crud/src/main/java/com/redhat/lightblue/crud/ListDocumentStream.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
/**
2727
* Default trivial implementation of document stream that uses a list to hold its elements
2828
*/
29-
public class ListDocumentStream<T> implements DocumentStream<T> {
29+
public class ListDocumentStream<T> implements RewindableDocumentStream<T> {
3030

3131
private final List<T> documents;
3232
private Iterator<T> itr;
@@ -37,12 +37,7 @@ public ListDocumentStream(List<T> list) {
3737
}
3838

3939
@Override
40-
public boolean canRewind() {
41-
return true;
42-
}
43-
44-
@Override
45-
public DocumentStream<T> rewind() {
40+
public RewindableDocumentStream<T> rewind() {
4641
return new ListDocumentStream<T>(documents);
4742
}
4843

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
Copyright 2013 Red Hat, Inc. and/or its affiliates.
3+
4+
This file is part of lightblue.
5+
6+
This program is free software: you can redistribute it and/or modify
7+
it under the terms of the GNU General Public License as published by
8+
the Free Software Foundation, either version 3 of the License, or
9+
(at your option) any later version.
10+
11+
This program is distributed in the hope that it will be useful,
12+
but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
GNU General Public License for more details.
15+
16+
You should have received a copy of the GNU General Public License
17+
along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*/
19+
package com.redhat.lightblue.crud;
20+
21+
import java.util.Iterator;
22+
import java.util.ArrayList;
23+
import java.util.function.Function;
24+
import java.util.function.Consumer;
25+
26+
/**
27+
* A document stream than can be rewound
28+
*/
29+
public interface RewindableDocumentStream<T> extends DocumentStream<T> {
30+
31+
/**
32+
* returns a new stream that starts the same resultset from the beginning. Only works if canRewind() is true
33+
*/
34+
RewindableDocumentStream<T> rewind();
35+
36+
static class RewindableDocumentStreamMapper<S,D> extends DocumentStream.DocumentStreamMapper<S,D>
37+
implements RewindableDocumentStream<D> {
38+
RewindableDocumentStreamMapper(RewindableDocumentStream<S> source,Function<S,D> map) {
39+
super(source,map);
40+
}
41+
@Override
42+
public RewindableDocumentStream<D> rewind() {
43+
return new RewindableDocumentStreamMapper<S,D>(((RewindableDocumentStream<S>)source).rewind(),map);
44+
}
45+
}
46+
47+
}

crud/src/main/java/com/redhat/lightblue/hooks/HookManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.redhat.lightblue.crud.CrudConstants;
3434
import com.redhat.lightblue.crud.DocCtx;
3535
import com.redhat.lightblue.crud.DocumentStream;
36+
import com.redhat.lightblue.crud.RewindableDocumentStream;
3637
import com.redhat.lightblue.eval.Projector;
3738
import com.redhat.lightblue.mediator.OperationContext;
3839
import com.redhat.lightblue.metadata.EntityMetadata;
@@ -272,8 +273,8 @@ private void queueHooks(CRUDOperationContext ctx, boolean mediatorHooks) {
272273
}
273274

274275
DocumentStream<DocCtx> documents=ctx.getDocumentStream();
275-
if(documents.canRewind()) {
276-
DocumentStream<DocCtx> stream=documents.rewind();
276+
if(documents instanceof RewindableDocumentStream) {
277+
RewindableDocumentStream<DocCtx> stream=((RewindableDocumentStream<DocCtx>)documents).rewind();
277278
while(stream.hasNext()) {
278279
addDocument(hookList,stream.next());
279280
}

crud/src/test/java/com/redhat/lightblue/hooks/HookManagerTest.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.List;
2222
import java.util.ArrayList;
2323
import java.util.Map;
24+
import java.util.Iterator;
25+
import java.util.function.Consumer;
2426

2527
import javax.management.RuntimeErrorException;
2628

@@ -52,6 +54,7 @@
5254
import com.redhat.lightblue.crud.Factory;
5355
import com.redhat.lightblue.crud.DocCtx;
5456
import com.redhat.lightblue.crud.ListDocumentStream;
57+
import com.redhat.lightblue.crud.DocumentStream;
5558

5659
import com.redhat.lightblue.util.test.AbstractJsonNodeTest;
5760
import com.redhat.lightblue.util.JsonDoc;
@@ -332,15 +335,48 @@ public void crudFindQueueTest() throws Exception {
332335
}
333336

334337

338+
private static class NonRewindableDocumentStream<T> implements DocumentStream<T> {
339+
340+
private final List<T> documents;
341+
private Iterator<T> itr;
342+
private final ArrayList<Consumer<T>> listeners=new ArrayList<>();
343+
344+
public NonRewindableDocumentStream(List<T> list) {
345+
this.documents=list;
346+
}
347+
348+
@Override
349+
public boolean hasNext() {
350+
if(itr==null)
351+
itr=documents.iterator();
352+
return itr.hasNext();
353+
}
354+
355+
@Override
356+
public T next() {
357+
if(itr==null)
358+
itr=documents.iterator();
359+
T doc=itr.next();
360+
for(Consumer<T> c:listeners)
361+
c.accept(doc);
362+
return doc;
363+
}
364+
365+
@Override
366+
public void close() {}
367+
368+
@Override
369+
public void tee(Consumer<T> listener) {
370+
listeners.add(listener);
371+
}
372+
}
373+
335374
@Test
336375
public void crudFindQueueTest_deferredProcessing() throws Exception {
337376
HookManager hooks = new HookManager(resolver, nodeFactory);
338377
TestOperationContext ctx = setupContext(CRUDOperation.FIND);
339378

340-
ctx.setDocumentStream(new ListDocumentStream<DocCtx>(ctx.getInputDocuments()) {
341-
@Override
342-
public boolean canRewind() {return false;}
343-
});
379+
ctx.setDocumentStream(new NonRewindableDocumentStream<DocCtx>(ctx.getInputDocuments()));
344380

345381
// hook2 should be called
346382
hooks.queueHooks(ctx);

0 commit comments

Comments
 (0)