Skip to content

Commit f792995

Browse files
author
mgeipel
committed
fixed #7
1 parent 5757cbb commit f792995

File tree

2 files changed

+158
-154
lines changed

2 files changed

+158
-154
lines changed

src/main/java/org/culturegraph/mf/stream/pipe/AbstractBatcher.java

Lines changed: 81 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -13,79 +13,84 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.culturegraph.mf.stream.pipe;
17-
18-
import org.culturegraph.mf.framework.StreamReceiver;
19-
20-
/**
21-
* Base class for pipes that perform an action every N records.
22-
*
23-
* @author Markus Geipel
24-
*
25-
*/
26-
public abstract class AbstractBatcher
27-
extends WrappingStreamPipe<StreamReceiver> {
28-
29-
public static final long DEFAULT_BATCH_SIZE = 1000;
30-
31-
private long batchSize = DEFAULT_BATCH_SIZE;
32-
private long recordCount;
33-
private long batchCount;
34-
35-
public final void setBatchSize(final int batchSize) {
36-
this.batchSize = batchSize;
37-
}
38-
39-
public final long getBatchSize() {
40-
return batchSize;
41-
}
42-
43-
public final long getBatchCount() {
44-
return batchCount;
45-
}
46-
47-
public final long getRecordCount() {
48-
return recordCount;
49-
}
50-
51-
@Override
52-
public final void startRecord(final String identifier) {
53-
getInternalReceiver().startRecord(identifier);
54-
}
55-
56-
@Override
57-
public final void endRecord() {
58-
getInternalReceiver().endRecord();
59-
60-
++recordCount;
61-
recordCount %= batchSize;
62-
if (0 == recordCount) {
63-
++batchCount;
64-
onBatchComplete();
65-
}
66-
}
67-
68-
@Override
69-
public final void startEntity(final String name) {
70-
getInternalReceiver().startEntity(name);
71-
}
72-
73-
@Override
74-
public final void endEntity() {
75-
getInternalReceiver().endEntity();
76-
}
77-
78-
@Override
79-
public final void literal(final String name, final String value) {
80-
getInternalReceiver().literal(name, value);
81-
}
82-
83-
@Override
84-
protected void onResetStream() {
85-
recordCount = 0;
86-
batchCount = 0;
87-
}
88-
89-
protected abstract void onBatchComplete();
90-
91-
}
16+
package org.culturegraph.mf.stream.pipe;
17+
18+
import org.culturegraph.mf.framework.StreamReceiver;
19+
20+
/**
21+
* Base class for pipes that perform an action every N records.
22+
*
23+
* @author Markus Geipel
24+
*
25+
*/
26+
public abstract class AbstractBatcher
27+
extends WrappingStreamPipe<StreamReceiver> {
28+
29+
public static final long DEFAULT_BATCH_SIZE = 1000;
30+
31+
private long batchSize = DEFAULT_BATCH_SIZE;
32+
private long recordCount;
33+
private long batchCount;
34+
35+
public final void setBatchSize(final int batchSize) {
36+
this.batchSize = batchSize;
37+
}
38+
39+
public final long getBatchSize() {
40+
return batchSize;
41+
}
42+
43+
public final long getBatchCount() {
44+
return batchCount;
45+
}
46+
47+
public final long getRecordCount() {
48+
return recordCount;
49+
}
50+
51+
@Override
52+
public final void startRecord(final String identifier) {
53+
getInternalReceiver().startRecord(identifier);
54+
}
55+
56+
@Override
57+
public final void endRecord() {
58+
getInternalReceiver().endRecord();
59+
60+
++recordCount;
61+
recordCount %= batchSize;
62+
if (0 == recordCount) {
63+
++batchCount;
64+
onBatchComplete();
65+
}
66+
}
67+
68+
@Override
69+
public final void startEntity(final String name) {
70+
getInternalReceiver().startEntity(name);
71+
}
72+
73+
@Override
74+
public final void endEntity() {
75+
getInternalReceiver().endEntity();
76+
}
77+
78+
@Override
79+
public final void literal(final String name, final String value) {
80+
getInternalReceiver().literal(name, value);
81+
}
82+
83+
@Override
84+
protected final void onResetStream() {
85+
recordCount = 0;
86+
batchCount = 0;
87+
onReset();
88+
}
89+
90+
protected void onReset(){
91+
//default: do nothing
92+
}
93+
94+
protected abstract void onBatchComplete();
95+
96+
}

src/main/java/org/culturegraph/mf/stream/pipe/StreamBatchMerger.java

Lines changed: 77 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -13,81 +13,80 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.culturegraph.mf.stream.pipe;
17-
18-
import org.culturegraph.mf.framework.DefaultStreamReceiver;
19-
import org.culturegraph.mf.framework.StreamReceiver;
20-
import org.culturegraph.mf.framework.annotations.Description;
21-
import org.culturegraph.mf.framework.annotations.In;
22-
import org.culturegraph.mf.framework.annotations.Out;
23-
24-
/**
25-
* Merges a sequence of {@code batchSize} records. On a close-stream event,
26-
* a record containing less than {@code batchSize} source records may be
27-
* created.
28-
*
29-
* @author Christoph Böhme
30-
*
31-
*/
32-
@Description("Merges a sequence of batchSize records")
33-
@In(StreamReceiver.class)
34-
@Out(StreamReceiver.class)
35-
public final class StreamBatchMerger extends AbstractBatcher {
36-
37-
private boolean inRecord;
38-
39-
public StreamBatchMerger() {
40-
super();
41-
setInternalReceiver(new Merger());
42-
}
43-
44-
@Override
45-
protected void onBatchComplete() {
46-
getReceiver().endRecord();
47-
inRecord = false;
48-
}
49-
50-
@Override
51-
protected void onResetStream() {
52-
super.onResetStream();
53-
inRecord = false;
54-
}
55-
56-
@Override
57-
protected void onCloseStream() {
58-
if (inRecord) {
59-
onBatchComplete();
60-
}
61-
}
62-
63-
/**
64-
* Helper class for merging.
65-
*/
66-
private final class Merger extends DefaultStreamReceiver {
67-
68-
@Override
69-
public void startRecord(final String identifier) {
70-
if (!inRecord) {
71-
getReceiver().startRecord(identifier);
72-
inRecord = true;
73-
}
74-
}
75-
76-
@Override
77-
public void startEntity(final String name) {
78-
getReceiver().startEntity(name);
79-
}
80-
81-
@Override
82-
public void endEntity() {
83-
getReceiver().endEntity();
84-
}
85-
86-
@Override
87-
public void literal(final String name, final String value) {
88-
getReceiver().literal(name, value);
89-
}
90-
91-
}
92-
93-
}
16+
package org.culturegraph.mf.stream.pipe;
17+
18+
import org.culturegraph.mf.framework.DefaultStreamReceiver;
19+
import org.culturegraph.mf.framework.StreamReceiver;
20+
import org.culturegraph.mf.framework.annotations.Description;
21+
import org.culturegraph.mf.framework.annotations.In;
22+
import org.culturegraph.mf.framework.annotations.Out;
23+
24+
/**
25+
* Merges a sequence of {@code batchSize} records. On a close-stream event,
26+
* a record containing less than {@code batchSize} source records may be
27+
* created.
28+
*
29+
* @author Christoph Böhme
30+
*
31+
*/
32+
@Description("Merges a sequence of batchSize records")
33+
@In(StreamReceiver.class)
34+
@Out(StreamReceiver.class)
35+
public final class StreamBatchMerger extends AbstractBatcher {
36+
37+
private boolean inRecord;
38+
39+
public StreamBatchMerger() {
40+
super();
41+
setInternalReceiver(new Merger());
42+
}
43+
44+
@Override
45+
protected void onBatchComplete() {
46+
getReceiver().endRecord();
47+
inRecord = false;
48+
}
49+
50+
@Override
51+
protected void onReset() {
52+
inRecord = false;
53+
}
54+
55+
@Override
56+
protected void onCloseStream() {
57+
if (inRecord) {
58+
onBatchComplete();
59+
}
60+
}
61+
62+
/**
63+
* Helper class for merging.
64+
*/
65+
private final class Merger extends DefaultStreamReceiver {
66+
67+
@Override
68+
public void startRecord(final String identifier) {
69+
if (!inRecord) {
70+
getReceiver().startRecord(identifier);
71+
inRecord = true;
72+
}
73+
}
74+
75+
@Override
76+
public void startEntity(final String name) {
77+
getReceiver().startEntity(name);
78+
}
79+
80+
@Override
81+
public void endEntity() {
82+
getReceiver().endEntity();
83+
}
84+
85+
@Override
86+
public void literal(final String name, final String value) {
87+
getReceiver().literal(name, value);
88+
}
89+
90+
}
91+
92+
}

0 commit comments

Comments
 (0)