Skip to content

Commit 7e744c7

Browse files
committed
metafacture-flowcontrol/ (main): Fix Checkstyle violations.
1 parent cfef405 commit 7e744c7

15 files changed

+108
-75
lines changed

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/CloseSuppressor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package org.metafacture.flowcontrol;
1718

1819
import org.metafacture.framework.FluxCommand;
@@ -59,9 +60,9 @@ public void process(final T obj) {
5960
}
6061

6162
@Override
62-
public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
63-
this.receiver = receiver;
64-
return receiver;
63+
public <R extends ObjectReceiver<T>> R setReceiver(final R newReceiver) {
64+
receiver = newReceiver;
65+
return newReceiver;
6566
}
6667

6768
@Override

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectBatchResetter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package org.metafacture.flowcontrol;
1718

1819
import org.metafacture.framework.FluxCommand;
@@ -41,6 +42,9 @@ public class ObjectBatchResetter<T> extends DefaultObjectPipe<T, ObjectReceiver<
4142
private long batchCount;
4243
private int objectCount;
4344

45+
public ObjectBatchResetter() {
46+
}
47+
4448
/**
4549
* Number of objects after which a <i>reset-stream</i> event is triggered.
4650
* <p>
@@ -53,8 +57,7 @@ public class ObjectBatchResetter<T> extends DefaultObjectPipe<T, ObjectReceiver<
5357
* @param batchSize number of objects before a <i>reset-stream</i> event is
5458
* triggered
5559
*/
56-
public void setBatchSize(int batchSize) {
57-
60+
public void setBatchSize(final int batchSize) {
5861
this.batchSize = batchSize;
5962
}
6063

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectExceptionCatcher.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,21 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.metafacture.flowcontrol;
1716

18-
import java.io.PrintWriter;
19-
import java.io.StringWriter;
17+
package org.metafacture.flowcontrol;
2018

2119
import org.metafacture.framework.FluxCommand;
2220
import org.metafacture.framework.ObjectReceiver;
2321
import org.metafacture.framework.annotations.Description;
2422
import org.metafacture.framework.annotations.In;
2523
import org.metafacture.framework.annotations.Out;
2624
import org.metafacture.framework.helpers.DefaultObjectPipe;
25+
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

29+
import java.io.PrintWriter;
30+
import java.io.StringWriter;
3031

3132
/**
3233
* Wraps the call to the process method of the downstream module
@@ -57,7 +58,6 @@ public ObjectExceptionCatcher() {
5758
}
5859

5960
public ObjectExceptionCatcher(final String logPrefix) {
60-
super();
6161
this.logPrefix = logPrefix;
6262
}
6363

@@ -81,7 +81,8 @@ public boolean isLogStackTrace() {
8181
public void process(final T obj) {
8282
try {
8383
getReceiver().process(obj);
84-
} catch(final Exception e) {
84+
}
85+
catch (final Exception e) { // checkstyle-disable-line IllegalCatch
8586
LOG.error("{}'{}' while processing object: {}", logPrefix, e.getMessage(), obj);
8687

8788
if (logStackTrace) {

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectPipeDecoupler.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,22 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.metafacture.flowcontrol;
1716

18-
import java.util.concurrent.BlockingQueue;
19-
import java.util.concurrent.LinkedBlockingQueue;
17+
package org.metafacture.flowcontrol;
2018

2119
import org.metafacture.framework.FluxCommand;
2220
import org.metafacture.framework.ObjectPipe;
2321
import org.metafacture.framework.ObjectReceiver;
2422
import org.metafacture.framework.annotations.Description;
2523
import org.metafacture.framework.annotations.In;
2624
import org.metafacture.framework.annotations.Out;
25+
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

29+
import java.util.concurrent.BlockingQueue;
30+
import java.util.concurrent.LinkedBlockingQueue;
31+
3032
/**
3133
* Creates a new thread in which subsequent flow elements run.
3234
*
@@ -76,7 +78,8 @@ public void process(final T obj) {
7678
if (debug) {
7779
LOG.info("Current buffer size: {}", queue.size());
7880
}
79-
} catch (InterruptedException e) {
81+
}
82+
catch (final InterruptedException e) {
8083
Thread.currentThread().interrupt();
8184
}
8285
}
@@ -87,20 +90,21 @@ private void start() {
8790
}
8891

8992
@Override
90-
public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
93+
public <R extends ObjectReceiver<T>> R setReceiver(final R newReceiver) {
9194
if (null != thread) {
9295
throw new IllegalStateException("Receiver cannot be changed while processing thread is running.");
9396
}
9497

95-
this.receiver = receiver;
96-
return receiver;
98+
receiver = newReceiver;
99+
return newReceiver;
97100
}
98101

99102
@Override
100103
public void resetStream() {
101104
try {
102105
queue.put(Feeder.BLUE_PILL);
103-
} catch (InterruptedException e) {
106+
}
107+
catch (final InterruptedException e) {
104108
Thread.currentThread().interrupt();
105109
}
106110
}
@@ -110,7 +114,8 @@ public void closeStream() {
110114
try {
111115
queue.put(Feeder.RED_PILL);
112116
thread.join();
113-
} catch (InterruptedException e) {
117+
}
118+
catch (final InterruptedException e) {
114119
Thread.currentThread().interrupt();
115120
}
116121
thread = null;
@@ -128,7 +133,7 @@ static final class Feeder<T> implements Runnable {
128133
private final ObjectReceiver<T> receiver;
129134
private final BlockingQueue<Object> queue;
130135

131-
public Feeder(final ObjectReceiver<T> receiver, final BlockingQueue<Object> queue) {
136+
Feeder(final ObjectReceiver<T> receiver, final BlockingQueue<Object> queue) {
132137
this.receiver = receiver;
133138
this.queue = queue;
134139
}
@@ -150,7 +155,8 @@ public void run() {
150155
}
151156
receiver.process((T) object);
152157
}
153-
} catch (InterruptedException e) {
158+
}
159+
catch (final InterruptedException e) {
154160
Thread.currentThread().interrupt();
155161
return;
156162
}

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,20 @@
1515

1616
package org.metafacture.flowcontrol;
1717

18-
import java.util.ArrayList;
19-
import java.util.List;
20-
2118
import org.metafacture.framework.FluxCommand;
2219
import org.metafacture.framework.ObjectPipe;
2320
import org.metafacture.framework.ObjectReceiver;
2421
import org.metafacture.framework.Tee;
2522
import org.metafacture.framework.annotations.Description;
2623
import org.metafacture.framework.annotations.In;
2724
import org.metafacture.framework.annotations.Out;
25+
2826
import org.slf4j.Logger;
2927
import org.slf4j.LoggerFactory;
3028

29+
import java.util.ArrayList;
30+
import java.util.List;
31+
3132
/**
3233
* Divides incoming objects and distributes them to added receivers. These
3334
* receivers are coupled with an
@@ -48,22 +49,26 @@ public class ObjectThreader<T> implements Tee<ObjectReceiver<T>>, ObjectPipe<T,
4849

4950
private static final Logger LOG = LoggerFactory.getLogger(ObjectThreader.class);
5051
private final List<ObjectReceiver<T>> receivers = new ArrayList<ObjectReceiver<T>>();
51-
private int objectNumber = 0;
52+
private int objectNumber;
53+
54+
public ObjectThreader() {
55+
}
5256

5357
@Override
5458
public void process(final T obj) {
5559
receivers.get(objectNumber).process(obj);
5660
if (objectNumber == receivers.size() - 1) {
5761
objectNumber = 0;
58-
} else {
59-
objectNumber++;
62+
}
63+
else {
64+
++objectNumber;
6065
}
6166
}
6267

6368
@Override
6469
public Tee<ObjectReceiver<T>> addReceiver(final ObjectReceiver<T> receiver) {
65-
LOG.info("Adding thread {}", (receivers.size() + 1));
66-
ObjectPipeDecoupler<T> opd = new ObjectPipeDecoupler<>();
70+
LOG.info("Adding thread {}", receivers.size() + 1);
71+
final ObjectPipeDecoupler<T> opd = new ObjectPipeDecoupler<>();
6772
opd.setReceiver(receiver);
6873
receivers.add(opd);
6974
return this;
@@ -77,7 +82,7 @@ public <R extends ObjectReceiver<T>> R setReceiver(final R receiver) {
7782
}
7883

7984
@Override
80-
public <R extends ObjectReceiver<T>> R setReceivers(R receiver, ObjectReceiver<T> lateralReceiver) {
85+
public <R extends ObjectReceiver<T>> R setReceivers(final R receiver, final ObjectReceiver<T> lateralReceiver) {
8186
receivers.clear();
8287
addReceiver(receiver);
8388
addReceiver(lateralReceiver);
@@ -95,7 +100,7 @@ public void closeStream() {
95100
}
96101

97102
@Override
98-
public Tee<ObjectReceiver<T>> removeReceiver(ObjectReceiver<T> receiver) {
103+
public Tee<ObjectReceiver<T>> removeReceiver(final ObjectReceiver<T> receiver) {
99104
receivers.remove(receiver);
100105
return this;
101106
}

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/StreamBatchResetter.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package org.metafacture.flowcontrol;
1718

1819
import org.metafacture.framework.FluxCommand;
@@ -40,35 +41,38 @@ public final class StreamBatchResetter extends ForwardingStreamPipe {
4041
private long recordCount;
4142
private long batchCount;
4243

43-
public final void setBatchSize(final int batchSize) {
44+
public StreamBatchResetter() {
45+
}
46+
47+
public void setBatchSize(final int batchSize) {
4448
this.batchSize = batchSize;
4549
}
4650

47-
public final long getBatchSize() {
51+
public long getBatchSize() {
4852
return batchSize;
4953
}
5054

51-
public final long getBatchCount() {
55+
public long getBatchCount() {
5256
return batchCount;
5357
}
5458

55-
public final long getRecordCount() {
59+
public long getRecordCount() {
5660
return recordCount;
5761
}
5862

5963
@Override
60-
public final void endRecord() {
64+
public void endRecord() {
6165
getReceiver().endRecord();
62-
recordCount++;
66+
++recordCount;
6367
recordCount %= batchSize;
6468
if (recordCount == 0) {
65-
batchCount++;
69+
++batchCount;
6670
getReceiver().resetStream();
6771
}
6872
}
6973

7074
@Override
71-
protected final void onResetStream() {
75+
protected void onResetStream() {
7276
recordCount = 0;
7377
batchCount = 0;
7478
}

0 commit comments

Comments
 (0)