Skip to content

Commit 4037063

Browse files
committed
Implement Tee directly, revert final removals
See #294
1 parent f816929 commit 4037063

File tree

2 files changed

+50
-13
lines changed

2 files changed

+50
-13
lines changed

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2019 Pascal Christoph, hbz.
1+
/* Copyright 2019 Pascal Christoph (hbz), and others.
22
*
33
* Licensed under the Apache License, Version 2.0 the "License";
44
* you may not use this file except in compliance with the License.
@@ -15,14 +15,16 @@
1515

1616
package org.metafacture.flowcontrol;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
1821
import org.metafacture.framework.FluxCommand;
1922
import org.metafacture.framework.ObjectPipe;
2023
import org.metafacture.framework.ObjectReceiver;
2124
import org.metafacture.framework.Tee;
2225
import org.metafacture.framework.annotations.Description;
2326
import org.metafacture.framework.annotations.In;
2427
import org.metafacture.framework.annotations.Out;
25-
import org.metafacture.framework.helpers.DefaultTee;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

@@ -35,37 +37,72 @@
3537
* @param <T> Object type
3638
*
3739
* @author Pascal Christoph (dr0i)
40+
* @author Fabian Steeg (fsteeg)
3841
*
3942
*/
4043
@In(Object.class)
4144
@Out(Object.class)
4245
@Description("Incoming objects are distributed to the added receivers, running in their own threads.")
4346
@FluxCommand("thread-object-tee")
44-
public class ObjectThreader<T> extends DefaultTee<ObjectReceiver<T>> implements ObjectPipe<T, ObjectReceiver<T>> {
47+
public class ObjectThreader<T> implements Tee<ObjectReceiver<T>>, ObjectPipe<T, ObjectReceiver<T>> {
4548

4649
private static final Logger LOG = LoggerFactory.getLogger(ObjectThreader.class);
50+
private final List<ObjectReceiver<T>> receivers = new ArrayList<ObjectReceiver<T>>();
4751
private int objectNumber = 0;
4852

4953
@Override
5054
public void process(final T obj) {
51-
getReceivers().get(objectNumber).process(obj);
52-
if (objectNumber == getReceivers().size() - 1) {
55+
receivers.get(objectNumber).process(obj);
56+
if (objectNumber == receivers.size() - 1) {
5357
objectNumber = 0;
5458
} else {
5559
objectNumber++;
5660
}
5761
}
5862

63+
@Override
64+
public Tee<ObjectReceiver<T>> addReceiver(final ObjectReceiver<T> receiver) {
65+
LOG.info("Adding thread {}", (receivers.size() + 1));
66+
ObjectPipeDecoupler<T> opd = new ObjectPipeDecoupler<>();
67+
opd.setReceiver(receiver);
68+
receivers.add(opd);
69+
return this;
70+
}
71+
5972
@Override
6073
public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
61-
return super.setReceiver(new ObjectPipeDecoupler<T>().setReceiver(receiver));
74+
receivers.clear();
75+
addReceiver(receiver);
76+
return receiver;
6277
}
6378

6479
@Override
65-
public Tee<ObjectReceiver<T>> addReceiver(final ObjectReceiver<T> receiver) {
66-
LOG.info("Adding thread {}", (getReceivers().size() + 1));
67-
ObjectPipeDecoupler<T> opd = new ObjectPipeDecoupler<>();
68-
opd.setReceiver(receiver);
69-
return super.addReceiver(opd);
80+
public <R extends ObjectReceiver<T>> R setReceivers(R receiver, ObjectReceiver<T> lateralReceiver) {
81+
receivers.clear();
82+
addReceiver(receiver);
83+
addReceiver(lateralReceiver);
84+
return receiver;
85+
}
86+
87+
@Override
88+
public void resetStream() {
89+
receivers.forEach(ObjectReceiver::resetStream);
90+
}
91+
92+
@Override
93+
public void closeStream() {
94+
receivers.forEach(ObjectReceiver::closeStream);
95+
}
96+
97+
@Override
98+
public Tee<ObjectReceiver<T>> removeReceiver(ObjectReceiver<T> receiver) {
99+
receivers.remove(receiver);
100+
return this;
101+
}
102+
103+
@Override
104+
public Tee<ObjectReceiver<T>> clearReceivers() {
105+
receivers.clear();
106+
return this;
70107
}
71108
}

metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class DefaultTee<T extends Receiver> implements Tee<T> {
3535
private final List<T> receivers = new ArrayList<T>();
3636

3737
@Override
38-
public <R extends T> R setReceiver(final R receiver) {
38+
public final <R extends T> R setReceiver(final R receiver) {
3939
receivers.clear();
4040
receivers.add(receiver);
4141
onChangeReceivers();
@@ -52,7 +52,7 @@ public final <R extends T> R setReceivers(final R receiver, final T lateralRecei
5252
}
5353

5454
@Override
55-
public Tee<T> addReceiver(final T receiver) {
55+
public final Tee<T> addReceiver(final T receiver) {
5656
receivers.add(receiver);
5757
onChangeReceivers();
5858
return this;

0 commit comments

Comments
 (0)