Skip to content

Commit eb1cb22

Browse files
committed
Add ObjectThreader for multithreading using pipelines
The ObjectThreader divides incoming objects and passes them to receivers, each internally plumbed with an ObjectPipeDecoupler. Thus, a multithreaded object pipeline is created. The receivers are to be added like every Tee receiver. - change methods of DefaultTee to not be final - add test The ObjectThreader makes use of the DefaultTee and overrides two methods, so they musn't be final. See hbz/lobid-resources#967.
1 parent ccdad61 commit eb1cb22

File tree

3 files changed

+146
-2
lines changed

3 files changed

+146
-2
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/* Copyright 2019 hbz, Pascal Christoph.
2+
*
3+
* Licensed under the Apache License, Version 2.0 the "License";
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package org.metafacture.flowcontrol;
17+
18+
import org.metafacture.framework.FluxCommand;
19+
import org.metafacture.framework.ObjectPipe;
20+
import org.metafacture.framework.ObjectReceiver;
21+
import org.metafacture.framework.Tee;
22+
import org.metafacture.framework.annotations.Description;
23+
import org.metafacture.framework.annotations.In;
24+
import org.metafacture.framework.annotations.Out;
25+
import org.metafacture.framework.helpers.DefaultTee;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
/**
30+
* Divides incoming objects and distributes them to added receivers. These
31+
* receivers are coupled with an
32+
* {@link org.metafacture.flowcontrol.ObjectPipeDecoupler}, so each added
33+
* receiver runs in its own thread.
34+
*
35+
* @param <T> Object type
36+
*
37+
* @author Pascal Christoph(dr0i)
38+
*
39+
*/
40+
@In(Object.class)
41+
@Out(Object.class)
42+
@Description("incoming objects are distributed to the added receivers, running in their own threads")
43+
@FluxCommand("threader")
44+
public class ObjectThreader<T> extends DefaultTee<ObjectReceiver<T>> implements ObjectPipe<T, ObjectReceiver<T>> {
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(ObjectThreader.class);
47+
private int objectNumber = 0;
48+
49+
@Override
50+
public void process(final T obj) {
51+
getReceivers().get(objectNumber).process(obj);
52+
if (objectNumber == getReceivers().size() - 1)
53+
objectNumber = 0;
54+
else
55+
objectNumber++;
56+
}
57+
58+
@Override
59+
public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
60+
return super.setReceiver(new ObjectPipeDecoupler<T>().setReceiver(receiver));
61+
}
62+
63+
@Override
64+
public Tee<ObjectReceiver<T>> addReceiver(final ObjectReceiver<T> receiver) {
65+
LOG.info("Adding thread " + (getReceivers().size() + 1));
66+
ObjectPipeDecoupler<T> opd = new ObjectPipeDecoupler<>();
67+
opd.setReceiver(receiver);
68+
return super.addReceiver(opd);
69+
}
70+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2019 hbz
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.flowcontrol;
17+
18+
import static org.mockito.Mockito.verify;
19+
import static org.mockito.Mockito.atMost;
20+
import static org.mockito.Mockito.atLeast;
21+
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
import org.metafacture.flowcontrol.ObjectThreader;
25+
import org.metafacture.framework.ObjectReceiver;
26+
27+
import org.mockito.Mock;
28+
import org.mockito.MockitoAnnotations;
29+
30+
/**
31+
* Tests for class {@link ObjectThreader} (which itself uses
32+
* {@link org.metafacture.flowcontrol.ObjectPipeDecoupler} to thread receivers).
33+
*
34+
* @author Pascal Christoph(dr0i)
35+
*
36+
*/
37+
public final class ObjectThreaderTest {
38+
39+
@Mock
40+
private ObjectReceiver<String> receiverThread1;
41+
@Mock
42+
private ObjectReceiver<String> receiverThread2;
43+
44+
private final ObjectThreader<String> objectThreader = new ObjectThreader<>();
45+
private static final int ACTIVE_THREADS_AT_BEGINNING = Thread.getAllStackTraces().keySet().size();
46+
47+
@Before
48+
public void setup() {
49+
MockitoAnnotations.initMocks(this);
50+
objectThreader//
51+
.addReceiver(receiverThread1)//
52+
.addReceiver(receiverThread2);
53+
}
54+
55+
@Test
56+
public void shouldSplitAllObjectsToAllThreadedDownStreamReceivers() throws InterruptedException {
57+
objectThreader.process("a");
58+
objectThreader.process("b");
59+
objectThreader.process("a");
60+
objectThreader.process("c");
61+
// check if two more threads were indeed created
62+
assert (Thread.getAllStackTraces().keySet().size() - ACTIVE_THREADS_AT_BEGINNING == 2);
63+
objectThreader.closeStream();
64+
// verify thread 1
65+
verify(receiverThread1, atLeast(2)).process("a");
66+
verify(receiverThread1, atMost(0)).process("b");
67+
verify(receiverThread1, atMost(0)).process("c");
68+
// verify thread 2
69+
verify(receiverThread2, atMost(0)).process("a");
70+
verify(receiverThread2, atLeast(1)).process("b");
71+
verify(receiverThread2, atLeast(1)).process("c");
72+
}
73+
74+
}

metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class DefaultTee<T extends Receiver> implements Tee<T> {
3535
private final List<T> receivers = new ArrayList<T>();
3636

3737
@Override
38-
public final <R extends T> R setReceiver(final R receiver) {
38+
public <R extends T> R setReceiver(final R receiver) {
3939
receivers.clear();
4040
receivers.add(receiver);
4141
onChangeReceivers();
@@ -52,7 +52,7 @@ public final <R extends T> R setReceivers(final R receiver, final T lateralRecei
5252
}
5353

5454
@Override
55-
public final Tee<T> addReceiver(final T receiver) {
55+
public Tee<T> addReceiver(final T receiver) {
5656
receivers.add(receiver);
5757
onChangeReceivers();
5858
return this;

0 commit comments

Comments
 (0)