Skip to content

Commit a911869

Browse files
committed
Add ObjectBatchResetter module
An object-processing module which emits a reset-stream event after it has processed a batch of records.
1 parent c92478f commit a911869

File tree

4 files changed

+232
-1
lines changed

4 files changed

+232
-1
lines changed

metafacture-flowcontrol/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ dependencies {
2222
implementation 'org.slf4j:slf4j-api:1.7.21'
2323
testImplementation 'junit:junit:4.12'
2424
testImplementation 'org.mockito:mockito-core:2.5.5'
25+
testImplementation 'org.assertj:assertj-core:3.11.1'
2526
testRuntimeOnly 'org.slf4j:slf4j-simple:1.7.21'
2627
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2018 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.flowcontrol;
17+
18+
import org.metafacture.framework.FluxCommand;
19+
import org.metafacture.framework.ObjectReceiver;
20+
import org.metafacture.framework.annotations.Description;
21+
import org.metafacture.framework.annotations.In;
22+
import org.metafacture.framework.annotations.Out;
23+
import org.metafacture.framework.helpers.DefaultObjectPipe;
24+
25+
/**
26+
* Resets the downstream modules every {@link #setBatchSize(int) batch-size} objects.
27+
*
28+
* @param <T> object type
29+
* @author Christoph Böhme
30+
*/
31+
@Description("Resets the downstream modules every batch-size objects")
32+
@FluxCommand("reset-object-batch")
33+
@In(Object.class)
34+
@Out(Object.class)
35+
public class ObjectBatchResetter<T> extends DefaultObjectPipe<T, ObjectReceiver<T>> {
36+
37+
public static final int DEFAULT_BATCH_SIZE = 1000;
38+
39+
private int batchSize = DEFAULT_BATCH_SIZE;
40+
41+
private long batchCount;
42+
private int objectCount;
43+
44+
/**
45+
* Number of objects after which a <i>reset-stream</i> event is triggered.
46+
* <p>
47+
* The default value is {@value DEFAULT_BATCH_SIZE}.
48+
* <p>
49+
* This parameter can be changed anytime during processing. If the new value
50+
* is less than the number of received objects a <i>reset-stream</i> event is
51+
* emitted when the next object is received.
52+
*
53+
* @param batchSize number of objects before a <i>reset-stream</i> event is
54+
* triggered
55+
*/
56+
public void setBatchSize(int batchSize) {
57+
58+
this.batchSize = batchSize;
59+
}
60+
61+
public int getBatchSize() {
62+
return batchSize;
63+
}
64+
65+
/**
66+
* Returns the number of batches that were processed.
67+
* <p>
68+
* This counter is reset when this module receives a <i>reset-stream</i> event.
69+
*
70+
* @return number of batches
71+
*/
72+
public long getBatchCount() {
73+
return batchCount;
74+
}
75+
76+
/**
77+
* Returns the number of objects in the current batch.
78+
* <p>
79+
* This counter is reset after each batch and also when the module receives a <i>reset-stream</i> event.
80+
*
81+
* @return number of objects in the current batch
82+
*/
83+
public int getObjectCount() {
84+
return objectCount;
85+
}
86+
87+
@Override
88+
public void process(final T obj) {
89+
90+
getReceiver().process(obj);
91+
92+
objectCount += 1;
93+
if (objectCount >= batchSize) {
94+
getReceiver().resetStream();
95+
batchCount += 1;
96+
objectCount = 0;
97+
}
98+
}
99+
100+
@Override
101+
protected void onResetStream() {
102+
103+
batchCount = 0;
104+
objectCount = 0;
105+
}
106+
107+
}

metafacture-flowcontrol/src/main/resources/flux-commands.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright 2016 Christoph Böhme
2+
# Copyright 2016, 2018 Christoph Böhme, Deutsche Nationalbibliothek
33
#
44
# Licensed under the Apache License, Version 2.0 the "License";
55
# you may not use this file except in compliance with the License.
@@ -17,5 +17,6 @@ wait-for-inputs org.metafacture.flowcontrol.CloseSuppressor
1717
catch-object-exception org.metafacture.flowcontrol.ObjectExceptionCatcher
1818
decouple org.metafacture.flowcontrol.ObjectPipeDecoupler
1919
batch-reset org.metafacture.flowcontrol.StreamBatchResetter
20+
reset-object-batch org.metafacture.flowcontrol.ObjectBatchResetter
2021
defer-stream org.metafacture.flowcontrol.StreamDeferrer
2122
catch-stream-exception org.metafacture.flowcontrol.StreamExceptionCatcher
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2018 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.flowcontrol;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Mockito.inOrder;
20+
import static org.mockito.Mockito.verifyNoMoreInteractions;
21+
22+
import org.junit.Before;
23+
import org.junit.Rule;
24+
import org.junit.Test;
25+
import org.metafacture.framework.ObjectReceiver;
26+
import org.mockito.InOrder;
27+
import org.mockito.Mock;
28+
import org.mockito.junit.MockitoJUnit;
29+
import org.mockito.junit.MockitoRule;
30+
31+
/**
32+
* Tests for class {@link ObjectBatchResetter}.
33+
*/
34+
public class ObjectBatchResetterTest {
35+
36+
private ObjectBatchResetter<String> systemUnderTest;
37+
38+
@Before
39+
public void setupSystemUnderTest() {
40+
41+
systemUnderTest = new ObjectBatchResetter<>();
42+
systemUnderTest.setReceiver(receiver);
43+
}
44+
45+
@Test
46+
public void shouldEmitResetStreamAfterBatchSizeObjects() {
47+
48+
systemUnderTest.setBatchSize(3);
49+
50+
systemUnderTest.process("1");
51+
systemUnderTest.process("2");
52+
systemUnderTest.process("3");
53+
54+
InOrder ordered = inOrder(receiver);
55+
ordered.verify(receiver).process("1");
56+
ordered.verify(receiver).process("2");
57+
ordered.verify(receiver).process("3");
58+
ordered.verify(receiver).resetStream();
59+
verifyNoMoreInteractions(receiver);
60+
}
61+
62+
@Test
63+
public void shouldIncreaseObjectCounterAfterEachObject() {
64+
65+
systemUnderTest.setBatchSize(3);
66+
67+
systemUnderTest.process("1");
68+
systemUnderTest.process("2");
69+
70+
assertThat(systemUnderTest.getObjectCount())
71+
.isEqualTo(2);
72+
}
73+
74+
@Test
75+
public void shouldResetObjectCountOnBatchCompletion() {
76+
77+
systemUnderTest.setBatchSize(2);
78+
79+
systemUnderTest.process("1");
80+
systemUnderTest.process("2");
81+
82+
assertThat(systemUnderTest.getObjectCount())
83+
.isZero();
84+
}
85+
86+
@Test
87+
public void shouldIncreaseBatchCountAfterEachBatch() {
88+
89+
systemUnderTest.setBatchSize(2);
90+
91+
systemUnderTest.process("1");
92+
systemUnderTest.process("2");
93+
systemUnderTest.process("3");
94+
systemUnderTest.process("4");
95+
96+
assertThat(systemUnderTest.getBatchCount())
97+
.isEqualTo(2);
98+
}
99+
100+
@Test
101+
public void shouldResetCountsOnResetStream() {
102+
103+
systemUnderTest.setBatchSize(2);
104+
105+
systemUnderTest.process("1");
106+
systemUnderTest.process("2");
107+
systemUnderTest.process("3");
108+
systemUnderTest.resetStream();
109+
110+
assertThat(systemUnderTest.getBatchCount())
111+
.isZero();
112+
assertThat(systemUnderTest.getObjectCount())
113+
.isZero();
114+
}
115+
116+
@Rule
117+
public MockitoRule mockitoRule = MockitoJUnit.rule();
118+
119+
@Mock
120+
private ObjectReceiver<String> receiver;
121+
122+
}

0 commit comments

Comments
 (0)