Skip to content

Commit df0c4c0

Browse files
author
Burak Serdar
committed
Add listener support to stream
1 parent 5e57e62 commit df0c4c0

File tree

6 files changed

+86
-22
lines changed

6 files changed

+86
-22
lines changed

crud/src/main/java/com/redhat/lightblue/assoc/ep/JoinSearch.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.Iterator;
2424
import java.util.NoSuchElementException;
25+
import java.util.function.Consumer;
2526

2627
import com.fasterxml.jackson.databind.JsonNode;
2728
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -71,6 +72,7 @@ private class BatchQueryIterator implements DocumentStream<ResultDocument> {
7172
private final int batchSize;
7273
private final ExecutionContext ctx;
7374
private final Iterator<JoinTuple> sourceStream;
75+
private final ArrayList<Consumer<ResultDocument>> listeners=new ArrayList<>();
7476

7577
private DocumentStream<DocCtx> currentIterator;
7678
private boolean done=false; // Are we still iterating, or are we done?
@@ -100,8 +102,12 @@ public ResultDocument next() {
100102
if(!done) {
101103
if(currentIterator==null||!currentIterator.hasNext())
102104
retrieveNextBatch();
103-
if(currentIterator!=null)
104-
return new ResultDocument(block,currentIterator.next().getOutputDocument());
105+
if(currentIterator!=null) {
106+
ResultDocument doc=new ResultDocument(block,currentIterator.next().getOutputDocument());
107+
for(Consumer<ResultDocument> l:listeners)
108+
l.accept(doc);
109+
return doc;
110+
}
105111
}
106112
throw new NoSuchElementException();
107113
}
@@ -110,7 +116,12 @@ public ResultDocument next() {
110116
public void close() {
111117
if(currentIterator!=null)
112118
currentIterator.close();
113-
}
119+
}
120+
121+
@Override
122+
public void tee(Consumer<ResultDocument> listener) {
123+
listeners.add(listener);
124+
}
114125

115126
private void retrieveNextBatch() {
116127
do {

crud/src/main/java/com/redhat/lightblue/assoc/ep/StepResultDocumentStream.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.redhat.lightblue.assoc.ep;
22

33
import java.util.Iterator;
4+
import java.util.ArrayList;
5+
import java.util.function.Consumer;
46
import java.util.stream.Stream;
57

68
import com.redhat.lightblue.crud.DocCtx;
@@ -12,6 +14,7 @@
1214
public class StepResultDocumentStream implements DocumentStream<DocCtx> {
1315

1416
private final Iterator<DocCtx> itr;
17+
private final ArrayList<Consumer<DocCtx>> listeners=new ArrayList<>();
1518

1619
public StepResultDocumentStream(StepResult<DocCtx> result) {
1720
this.itr=result.stream().iterator();
@@ -24,9 +27,17 @@ public boolean hasNext() {
2427

2528
@Override
2629
public DocCtx next() {
27-
return itr.next();
30+
DocCtx doc=itr.next();
31+
for(Consumer<DocCtx> c:listeners)
32+
c.accept(doc);
33+
return doc;
2834
}
2935

3036
@Override
3137
public void close() {}
38+
39+
@Override
40+
public void tee(Consumer<DocCtx> listener) {
41+
listeners.add(listener);
42+
}
3243
}

crud/src/main/java/com/redhat/lightblue/crud/DocumentStream.java

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package com.redhat.lightblue.crud;
2020

2121
import java.util.Iterator;
22+
import java.util.ArrayList;
2223
import java.util.function.Function;
24+
import java.util.function.Consumer;
2325

2426
/**
2527
* This interface is used to stream documents. A call to getDocuments
@@ -31,6 +33,11 @@ public interface DocumentStream<T> extends Iterator<T>{
3133
*/
3234
void close();
3335

36+
/**
37+
* Adds a listener that will be called for each document of the stream when next() is called
38+
*/
39+
void tee(Consumer<T> dest);
40+
3441
/**
3542
* returns true if rewind works
3643
*/
@@ -42,19 +49,44 @@ public interface DocumentStream<T> extends Iterator<T>{
4249
default DocumentStream<T> rewind() { throw new UnsupportedOperationException(); }
4350

4451
public static <S,D> DocumentStream<D> map(final DocumentStream<S> source,final Function<S,D> map) {
45-
return new DocumentStream<D>() {
46-
@Override
47-
public boolean hasNext() {
48-
return source.hasNext();
49-
}
50-
@Override
51-
public void close() {
52-
source.close();
53-
}
54-
@Override
55-
public D next() {
56-
return map.apply(source.next());
57-
}
58-
};
52+
return new DocumentStreamMapper<S,D>(source,map);
5953
}
54+
55+
static final class DocumentStreamMapper<S,D> implements DocumentStream<D> {
56+
private final ArrayList<Consumer<D>> listeners=new ArrayList<>();
57+
private final DocumentStream<S> source;
58+
private final Function<S,D> map;
59+
DocumentStreamMapper(DocumentStream<S> source,Function<S,D> map) {
60+
this.source=source;
61+
this.map=map;
62+
}
63+
@Override
64+
public boolean hasNext() {
65+
return source.hasNext();
66+
}
67+
@Override
68+
public void close() {
69+
source.close();
70+
}
71+
@Override
72+
public D next() {
73+
D d=map.apply(source.next());
74+
for(Consumer<D> c:listeners)
75+
c.accept(d);
76+
return d;
77+
}
78+
@Override
79+
public boolean canRewind() {
80+
return source.canRewind();
81+
}
82+
@Override
83+
public DocumentStream<D> rewind() {
84+
return new DocumentStreamMapper<S,D>(source.rewind(),map);
85+
}
86+
@Override
87+
public void tee(Consumer<D> t) {
88+
listeners.add(t);
89+
}
90+
}
91+
6092
}

crud/src/main/java/com/redhat/lightblue/crud/ListDocumentStream.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import java.util.Iterator;
2222
import java.util.List;
23+
import java.util.ArrayList;
24+
import java.util.function.Consumer;
2325

2426
/**
2527
* Default trivial implementation of document stream that uses a list to hold its elements
@@ -28,6 +30,7 @@ public class ListDocumentStream<T> implements DocumentStream<T> {
2830

2931
private final List<T> documents;
3032
private Iterator<T> itr;
33+
private final ArrayList<Consumer<T>> listeners=new ArrayList<>();
3134

3235
public ListDocumentStream(List<T> list) {
3336
this.documents=list;
@@ -58,9 +61,17 @@ public boolean hasNext() {
5861
public T next() {
5962
if(itr==null)
6063
itr=documents.iterator();
61-
return itr.next();
64+
T doc=itr.next();
65+
for(Consumer<T> c:listeners)
66+
c.accept(doc);
67+
return doc;
6268
}
6369

6470
@Override
6571
public void close() {}
72+
73+
@Override
74+
public void tee(Consumer<T> listener) {
75+
listeners.add(listener);
76+
}
6677
}

crud/src/main/java/com/redhat/lightblue/hooks/HookManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private void queueHooks(CRUDOperationContext ctx, boolean mediatorHooks) {
309309
// Queue the hooks
310310
queuedHooks.addAll(hookCache.values());
311311
} else {
312-
// TODO: Use a closure to tap into the stream
312+
// Stream is one-way, and cannot be rewound
313313
}
314314
}
315315
}

metadata/src/main/java/com/redhat/lightblue/hooks/CRUDHook.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ public interface CRUDHook {
3636
/**
3737
* Process the hook
3838
*
39-
* @param ctx The operation context. This contains the operation performed
40-
* on the entity, and the complete list of documents
39+
* @param md Entity metadata
4140
* @param cfg The hook configuration as specified in the metadata
4241
* @param processedDocuments Contains the list of documents that are
4342
* operated on. If the hook specifies a projection in metadata, the input

0 commit comments

Comments
 (0)