Skip to content

Commit fe5a5b7

Browse files
committed
Emit reset-stream event after updating counts
1 parent 9c1d2f4 commit fe5a5b7

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectBatchResetter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ public void process(final T obj) {
9191

9292
objectCount += 1;
9393
if (objectCount >= batchSize) {
94-
getReceiver().resetStream();
9594
batchCount += 1;
9695
objectCount = 0;
96+
getReceiver().resetStream();
9797
}
9898
}
9999

metafacture-flowcontrol/src/test/java/org/metafacture/flowcontrol/ObjectBatchResetterTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.junit.Rule;
2424
import org.junit.Test;
2525
import org.metafacture.framework.ObjectReceiver;
26+
import org.metafacture.framework.helpers.DefaultObjectReceiver;
2627
import org.mockito.InOrder;
2728
import org.mockito.Mock;
2829
import org.mockito.junit.MockitoJUnit;
@@ -113,6 +114,22 @@ public void shouldResetCountsOnResetStream() {
113114
.isZero();
114115
}
115116

117+
@Test
118+
public void shouldEmitResetStreamEventAfterUpdatingCounts() {
119+
120+
systemUnderTest.setBatchSize(2);
121+
systemUnderTest.setReceiver(new DefaultObjectReceiver<String>() {
122+
@Override
123+
public void resetStream() {
124+
assertThat(systemUnderTest.getObjectCount()).isZero();
125+
assertThat(systemUnderTest.getBatchCount()).isEqualTo(1);
126+
}
127+
});
128+
129+
systemUnderTest.process("1");
130+
systemUnderTest.process("2");
131+
}
132+
116133
@Rule
117134
public MockitoRule mockitoRule = MockitoJUnit.rule();
118135

0 commit comments

Comments
 (0)