Skip to content

Commit a99a34d

Browse files
committed
Fix emitting record when stream is closed
By implementing the interfaces instead of extending DefaultObjectPipe the "closeStream" method can be declared, pushing the record to the downstream module. - add test Complements #296.
1 parent 76eb9fd commit a99a34d

File tree

2 files changed

+71
-23
lines changed

2 files changed

+71
-23
lines changed
Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
/*
2-
* Copyright 2019 hbz
1+
/* Copyright 2019 Pascal Christoph (hbz)
32
*
43
* Licensed under the Apache License, Version 2.0 the "License";
54
* you may not use this file except in compliance with the License.
@@ -13,46 +12,81 @@
1312
* See the License for the specific language governing permissions and
1413
* limitations under the License.
1514
*/
15+
1616
package org.metafacture.strings;
1717

1818
import org.metafacture.framework.FluxCommand;
19+
import org.metafacture.framework.ObjectPipe;
1920
import org.metafacture.framework.ObjectReceiver;
2021
import org.metafacture.framework.annotations.Description;
2122
import org.metafacture.framework.annotations.In;
2223
import org.metafacture.framework.annotations.Out;
23-
import org.metafacture.framework.helpers.DefaultObjectPipe;
2424

2525
/**
2626
* Collects strings and emits them as records when a line matches the pattern.
27-
* Appends to every incoming line a line feed so that the original structure is
28-
* preserved.
27+
* Appends to every incoming line a line feed so that the original structure
28+
* is preserved.
2929
*
3030
* @author Pascal Christoph (dr0i).
3131
*
3232
*/
33-
@Description("Collects strings and emits them as records when a line matches the pattern.")
34-
@In(String.class)
35-
@Out(String.class)
36-
@FluxCommand("lines-to-records")
33+
@Description ( "Collects strings and emits them as records when a"
34+
+" line matches the pattern or the stream is closed." )
35+
@In ( String.class )
36+
@Out ( String.class )
37+
@FluxCommand ( "lines-to-records" )
3738
public final class LineRecorder
38-
extends DefaultObjectPipe<String, ObjectReceiver<String>> {
39+
implements ObjectPipe<String,ObjectReceiver<String>> {
40+
41+
private final static int SB_CAPACITY=4096*7;
42+
// empty line is the default
43+
private String recordMarkerRegexp="^\\s*$";
44+
StringBuilder record=new StringBuilder(
45+
SB_CAPACITY);
46+
ObjectReceiver<String> receiver;
47+
48+
public void setRecordMarkerRegexp ( final String regexp ) {
49+
recordMarkerRegexp=regexp;
50+
}
51+
52+
@Override
53+
public void process ( final String line ) {
54+
if(line.matches(
55+
recordMarkerRegexp)){
56+
getReceiver().process(
57+
record.toString());
58+
record=new StringBuilder(
59+
SB_CAPACITY);
60+
}else
61+
record.append(
62+
line+"\n");
63+
}
3964

40-
private final int SB_CAPACITY = 4096 * 7;
41-
private String recordMarkerRegexp = "^\\s*$"; // empty line is default
42-
StringBuilder record = new StringBuilder(SB_CAPACITY);
65+
@Override
66+
public void resetStream ( ) {
67+
record=new StringBuilder(
68+
SB_CAPACITY);
69+
}
4370

44-
public void setRecordMarkerRegexp(final String regexp) {
45-
this.recordMarkerRegexp = regexp;
71+
@Override
72+
public void closeStream ( ) {
73+
getReceiver().process(
74+
record.toString());
4675
}
4776

4877
@Override
49-
public void process(final String line) {
50-
assert !isClosed();
51-
if (line.matches(recordMarkerRegexp)) {
52-
getReceiver().process(record.toString());
53-
record = new StringBuilder(SB_CAPACITY);
54-
} else
55-
record.append(line + "\n");
78+
public <R extends ObjectReceiver<String>> R setReceiver ( R receiver ) {
79+
this.receiver=receiver;
80+
return receiver;
81+
}
82+
83+
/**
84+
* Returns a reference to the downstream module.
85+
*
86+
* @return reference to the downstream module
87+
*/
88+
protected final ObjectReceiver<String> getReceiver ( ) {
89+
return receiver;
5690
}
5791

5892
}

metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 Pascal Christoph
2+
* Copyright 2019 Pascal Christoph (hbz)
33
*
44
* Licensed under the Apache License, Version 2.0 the "License";
55
* you may not use this file except in compliance with the License.
@@ -99,4 +99,18 @@ public void shouldEmitRecordWithNonDefaultRecordMarker() {
9999
ordered.verifyNoMoreInteractions();
100100
}
101101

102+
@Test
103+
public void shouldEmitLastRecordWithoutRecordMarkerWhenClosingStream() {
104+
lineRecorder.process(RECORD3_PART1);
105+
lineRecorder.process(RECORD3_PART2);
106+
lineRecorder.closeStream();
107+
final InOrder ordered = inOrder(receiver);
108+
ordered.verify(receiver).process(
109+
RECORD3_PART1 +
110+
LINE_SEPARATOR +
111+
RECORD3_PART2 +
112+
LINE_SEPARATOR);
113+
ordered.verifyNoMoreInteractions();
114+
}
115+
102116
}

0 commit comments

Comments
 (0)