Skip to content

Commit 49f24df

Browse files
author
mgeipel
committed
quick fix for #108.
1 parent a5083f5 commit 49f24df

File tree

4 files changed

+128
-1
lines changed

4 files changed

+128
-1
lines changed

examples/beacon/create/create.flux

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
//creates a beacon file based on a pica+ dump of the DNB CBS data.
22

3+
default type = "ALL";
34
default out = dump + "-" + type + ".beacon";
45
default header = FLUX_DIR + "header.txt";
5-
default type = "ALL";
6+
67

78
//read header
89
"reading header " + header | write("stdout");
@@ -28,6 +29,7 @@ template("${s}")|
2829
@Y;
2930

3031
@Y|
32+
wait-for-inputs("2")|
3133
write(out);
3234

3335

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.pipe;
17+
18+
import org.culturegraph.mf.framework.ObjectPipe;
19+
import org.culturegraph.mf.framework.ObjectReceiver;
20+
import org.culturegraph.mf.framework.annotations.Description;
21+
import org.culturegraph.mf.framework.annotations.In;
22+
import org.culturegraph.mf.framework.annotations.Out;
23+
24+
/**
25+
*
26+
* @param <T>
27+
* object type
28+
*
29+
* @author Markus Geipel
30+
*
31+
*/
32+
@Description("")
33+
@In(Object.class)
34+
@Out(Object.class)
35+
public final class CloseSupressor<T> implements ObjectPipe<T, ObjectReceiver<T>> {
36+
37+
private ObjectReceiver<T> receiver;
38+
private final int numCloses;
39+
private int count;
40+
41+
public CloseSupressor(final int numCloses) {
42+
this.numCloses = numCloses;
43+
}
44+
45+
public CloseSupressor(final String numCloses) {
46+
this.numCloses = Integer.parseInt(numCloses);
47+
}
48+
49+
@Override
50+
public void process(final T obj) {
51+
if (receiver != null) {
52+
receiver.process(obj);
53+
}
54+
}
55+
56+
@Override
57+
public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
58+
this.receiver = receiver;
59+
return receiver;
60+
}
61+
62+
@Override
63+
public void resetStream() {
64+
count = 0;
65+
if (receiver != null) {
66+
receiver.resetStream();
67+
}
68+
}
69+
70+
@Override
71+
public void closeStream() {
72+
++count;
73+
if (count == numCloses && receiver != null) {
74+
receiver.closeStream();
75+
}
76+
}
77+
}

src/main/resources/flux-commands.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ collect-triples org.culturegraph.mf.stream.pipe.sort.TripleCollect
1616
stream-to-triples org.culturegraph.mf.stream.converter.StreamToTriples
1717
filter-triples org.culturegraph.mf.stream.pipe.TripleFilter
1818

19+
calculate-metrics org.culturegraph.mf.stream.pipe.stat.CooccurrenceMetricCalculator
20+
1921
jscript org.culturegraph.mf.stream.pipe.JScriptObjectPipe
2022

2123
as-lines org.culturegraph.mf.stream.converter.LineReader
@@ -66,6 +68,7 @@ filter-duplicate-objects org.culturegraph.mf.stream.pipe.DuplicateObjectFilter
6668

6769
object-tee org.culturegraph.mf.stream.pipe.ObjectTee
6870
stream-tee org.culturegraph.mf.stream.pipe.StreamTee
71+
wait-for-inputs org.culturegraph.mf.stream.pipe.CloseSupressor
6972

7073
stream-to-xml org.culturegraph.mf.stream.sink.SimpleXmlWriter
7174
rdf-macros org.culturegraph.mf.stream.pipe.RdfMacroPipe
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.pipe;
17+
18+
import org.culturegraph.mf.framework.ObjectReceiver;
19+
import org.junit.Test;
20+
import org.mockito.Mockito;
21+
22+
/**
23+
* Tests for {@link CloseSupressor}.
24+
*
25+
* @author Markus Geipel
26+
*/
27+
public final class CloseSuppressorTest {
28+
29+
@SuppressWarnings("unchecked")
30+
@Test
31+
public void testSuppression() {
32+
33+
final CloseSupressor<Object> supressor = new CloseSupressor<Object>(3);
34+
final ObjectReceiver<Object> receiver = Mockito.mock(ObjectReceiver.class);
35+
supressor.setReceiver(receiver);
36+
supressor.closeStream();
37+
supressor.closeStream();
38+
Mockito.verifyZeroInteractions(receiver);
39+
supressor.closeStream();
40+
Mockito.verify(receiver).closeStream();
41+
supressor.closeStream();
42+
Mockito.verifyNoMoreInteractions(receiver);
43+
}
44+
45+
}

0 commit comments

Comments
 (0)