Skip to content

Commit cb43933

Browse files
committed
Merge pull request #294 from hbz-5.0.1-hbzAddTest
2 parents ccdad61 + 4037063 commit cb43933

File tree

5 files changed

+199
-7
lines changed

5 files changed

+199
-7
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ build
2929
.classpath
3030
.project
3131
.settings/
32+
bin/
3233

3334
# Ignore IntelliJ project files:
3435
*.ipr

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectPipeDecoupler.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013, 2014 Deutsche Nationalbibliothek
2+
* Copyright 2013-2019 Deutsche Nationalbibliothek and others
33
*
44
* Licensed under the Apache License, Version 2.0 the "License";
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@
3333
* @param <T> Object type
3434
*
3535
* @author Markus Micheal Geipel
36+
* @author Pascal Christoph (dr0i)
3637
*/
3738
@In(Object.class)
3839
@Out(Object.class)
@@ -49,15 +50,15 @@ public final class ObjectPipeDecoupler<T> implements ObjectPipe<T, ObjectReceive
4950
private boolean debug;
5051

5152
public ObjectPipeDecoupler() {
52-
queue = new LinkedBlockingQueue<Object>(DEFAULT_CAPACITY);
53+
queue = new LinkedBlockingQueue<>(DEFAULT_CAPACITY);
5354
}
5455

5556
public ObjectPipeDecoupler(final int capacity) {
56-
queue = new LinkedBlockingQueue<Object>(capacity);
57+
queue = new LinkedBlockingQueue<>(capacity);
5758
}
5859

5960
public ObjectPipeDecoupler(final String capacity) {
60-
queue = new LinkedBlockingQueue<Object>(Integer.parseInt(capacity));
61+
queue = new LinkedBlockingQueue<>(Integer.parseInt(capacity));
6162
}
6263

6364
public void setDebug(final boolean debug) {
@@ -73,7 +74,7 @@ public void process(final T obj) {
7374
try {
7475
queue.put(obj);
7576
if (debug) {
76-
LOG.info("Current buffer size: " + queue.size());
77+
LOG.info("Current buffer size: {}", queue.size());
7778
}
7879
} catch (InterruptedException e) {
7980
Thread.currentThread().interrupt();
@@ -97,13 +98,17 @@ public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
9798

9899
@Override
99100
public void resetStream() {
100-
queue.add(Feeder.BLUE_PILL);
101+
try {
102+
queue.put(Feeder.BLUE_PILL);
103+
} catch (InterruptedException e) {
104+
Thread.currentThread().interrupt();
105+
}
101106
}
102107

103108
@Override
104109
public void closeStream() {
105-
queue.add(Feeder.RED_PILL);
106110
try {
111+
queue.put(Feeder.RED_PILL);
107112
thread.join();
108113
} catch (InterruptedException e) {
109114
Thread.currentThread().interrupt();
@@ -147,6 +152,7 @@ public void run() {
147152
}
148153
} catch (InterruptedException e) {
149154
Thread.currentThread().interrupt();
155+
return;
150156
}
151157
}
152158
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/* Copyright 2019 Pascal Christoph (hbz), and others.
2+
*
3+
* Licensed under the Apache License, Version 2.0 the "License";
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package org.metafacture.flowcontrol;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
21+
import org.metafacture.framework.FluxCommand;
22+
import org.metafacture.framework.ObjectPipe;
23+
import org.metafacture.framework.ObjectReceiver;
24+
import org.metafacture.framework.Tee;
25+
import org.metafacture.framework.annotations.Description;
26+
import org.metafacture.framework.annotations.In;
27+
import org.metafacture.framework.annotations.Out;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Divides incoming objects and distributes them to added receivers. These
33+
* receivers are coupled with an
34+
* {@link org.metafacture.flowcontrol.ObjectPipeDecoupler}, so each added
35+
* receiver runs in its own thread.
36+
*
37+
* @param <T> Object type
38+
*
39+
* @author Pascal Christoph (dr0i)
40+
* @author Fabian Steeg (fsteeg)
41+
*
42+
*/
43+
@In(Object.class)
44+
@Out(Object.class)
45+
@Description("Incoming objects are distributed to the added receivers, running in their own threads.")
46+
@FluxCommand("thread-object-tee")
47+
public class ObjectThreader<T> implements Tee<ObjectReceiver<T>>, ObjectPipe<T, ObjectReceiver<T>> {
48+
49+
private static final Logger LOG = LoggerFactory.getLogger(ObjectThreader.class);
50+
private final List<ObjectReceiver<T>> receivers = new ArrayList<ObjectReceiver<T>>();
51+
private int objectNumber = 0;
52+
53+
@Override
54+
public void process(final T obj) {
55+
receivers.get(objectNumber).process(obj);
56+
if (objectNumber == receivers.size() - 1) {
57+
objectNumber = 0;
58+
} else {
59+
objectNumber++;
60+
}
61+
}
62+
63+
@Override
64+
public Tee<ObjectReceiver<T>> addReceiver(final ObjectReceiver<T> receiver) {
65+
LOG.info("Adding thread {}", (receivers.size() + 1));
66+
ObjectPipeDecoupler<T> opd = new ObjectPipeDecoupler<>();
67+
opd.setReceiver(receiver);
68+
receivers.add(opd);
69+
return this;
70+
}
71+
72+
@Override
73+
public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
74+
receivers.clear();
75+
addReceiver(receiver);
76+
return receiver;
77+
}
78+
79+
@Override
80+
public <R extends ObjectReceiver<T>> R setReceivers(R receiver, ObjectReceiver<T> lateralReceiver) {
81+
receivers.clear();
82+
addReceiver(receiver);
83+
addReceiver(lateralReceiver);
84+
return receiver;
85+
}
86+
87+
@Override
88+
public void resetStream() {
89+
receivers.forEach(ObjectReceiver::resetStream);
90+
}
91+
92+
@Override
93+
public void closeStream() {
94+
receivers.forEach(ObjectReceiver::closeStream);
95+
}
96+
97+
@Override
98+
public Tee<ObjectReceiver<T>> removeReceiver(ObjectReceiver<T> receiver) {
99+
receivers.remove(receiver);
100+
return this;
101+
}
102+
103+
@Override
104+
public Tee<ObjectReceiver<T>> clearReceivers() {
105+
receivers.clear();
106+
return this;
107+
}
108+
}

metafacture-flowcontrol/src/main/resources/flux-commands.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ batch-reset org.metafacture.flowcontrol.StreamBatchResetter
2020
reset-object-batch org.metafacture.flowcontrol.ObjectBatchResetter
2121
defer-stream org.metafacture.flowcontrol.StreamDeferrer
2222
catch-stream-exception org.metafacture.flowcontrol.StreamExceptionCatcher
23+
thread-object-tee org.metafacture.flowcontrol.ObjectThreader
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2019 Pascal Christoph, hbz.
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.flowcontrol;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import static org.mockito.Mockito.verify;
21+
import static org.mockito.Mockito.atMost;
22+
import static org.mockito.Mockito.atLeast;
23+
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.metafacture.flowcontrol.ObjectThreader;
27+
import org.metafacture.framework.ObjectReceiver;
28+
29+
import org.mockito.Mock;
30+
import org.mockito.MockitoAnnotations;
31+
32+
/**
33+
* Tests for class {@link ObjectThreader} (which itself uses
34+
* {@link org.metafacture.flowcontrol.ObjectPipeDecoupler} to thread receivers).
35+
*
36+
* @author Pascal Christoph (dr0i)
37+
*
38+
*/
39+
public final class ObjectThreaderTest {
40+
41+
@Mock
42+
private ObjectReceiver<String> receiverThread1;
43+
@Mock
44+
private ObjectReceiver<String> receiverThread2;
45+
46+
private final ObjectThreader<String> objectThreader = new ObjectThreader<>();
47+
private static final int ACTIVE_THREADS_AT_BEGINNING = Thread.getAllStackTraces().keySet().size();
48+
49+
@Before
50+
public void setup() {
51+
MockitoAnnotations.initMocks(this);
52+
objectThreader//
53+
.addReceiver(receiverThread1)//
54+
.addReceiver(receiverThread2);
55+
}
56+
57+
@Test
58+
public void shouldSplitAllObjectsToAllThreadedDownStreamReceivers() throws InterruptedException {
59+
objectThreader.process("a");
60+
objectThreader.process("b");
61+
objectThreader.process("a");
62+
objectThreader.process("c");
63+
// check if two more threads were indeed created
64+
assertThat(Thread.getAllStackTraces().keySet().size() - ACTIVE_THREADS_AT_BEGINNING).isEqualTo(2);
65+
objectThreader.closeStream();
66+
// verify thread 1
67+
verify(receiverThread1, atLeast(2)).process("a");
68+
verify(receiverThread1, atMost(0)).process("b");
69+
verify(receiverThread1, atMost(0)).process("c");
70+
// verify thread 2
71+
verify(receiverThread2, atMost(0)).process("a");
72+
verify(receiverThread2, atLeast(1)).process("b");
73+
verify(receiverThread2, atLeast(1)).process("c");
74+
}
75+
76+
}

0 commit comments

Comments
 (0)