Skip to content

Commit 72617a0

Browse files
committed
Call LineRecorder receiver's closeStream() (#433)
Under some circumstances (e.g. if the JVM is not stopped after processing) the streams of receivers connected to LineRecorder are not closed, resulting in empty or missing output data. By switching from "implements ObjectPipe" to "extends DefaultObjectPipe" in LineRecorde the "DefaultSender" is used which implements the Sender interface. This ensures the proper closing of streams. - complement test
1 parent 1ecaa75 commit 72617a0

File tree

2 files changed

+9
-26
lines changed

2 files changed

+9
-26
lines changed

metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
package org.metafacture.strings;
1717

1818
import org.metafacture.framework.FluxCommand;
19-
import org.metafacture.framework.ObjectPipe;
2019
import org.metafacture.framework.ObjectReceiver;
2120
import org.metafacture.framework.annotations.Description;
2221
import org.metafacture.framework.annotations.In;
2322
import org.metafacture.framework.annotations.Out;
23+
import org.metafacture.framework.helpers.DefaultObjectPipe;
2424

2525
/**
2626
* Collects strings and emits them as records when a line matches the pattern.
@@ -34,7 +34,7 @@
3434
@In(String.class)
3535
@Out(String.class)
3636
@FluxCommand("lines-to-records")
37-
public final class LineRecorder implements ObjectPipe<String, ObjectReceiver<String>> {
37+
public final class LineRecorder extends DefaultObjectPipe<String, ObjectReceiver<String>> {
3838

3939
private static final int SB_CAPACITY = 4096 * 7;
4040
// empty line is the default
@@ -70,34 +70,16 @@ record = new StringBuilder(SB_CAPACITY);
7070
}
7171
}
7272

73-
private boolean isClosed() {
74-
return isClosed;
75-
}
76-
7773
@Override
78-
public void resetStream() {
79-
record = new StringBuilder(SB_CAPACITY);
80-
}
81-
82-
@Override
83-
public void closeStream() {
84-
getReceiver().process(record.toString());
85-
isClosed = true;
74+
protected void onCloseStream() {
75+
if (record.length() > 0) {
76+
getReceiver().process(record.toString());
77+
}
8678
}
8779

8880
@Override
89-
public <R extends ObjectReceiver<String>> R setReceiver(final R newReceiver) {
90-
receiver = newReceiver;
91-
return newReceiver;
92-
}
93-
94-
/**
95-
* Returns a reference to the downstream module.
96-
*
97-
* @return reference to the downstream module
98-
*/
99-
protected ObjectReceiver<String> getReceiver() {
100-
return receiver;
81+
public void onResetStream() {
82+
record = new StringBuilder(SB_CAPACITY);
10183
}
10284

10385
}

metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public void shouldEmitLastRecordWithoutRecordMarkerWhenClosingStream() {
111111
LINE_SEPARATOR +
112112
RECORD3_PART2 +
113113
LINE_SEPARATOR);
114+
ordered.verify(receiver).closeStream();
114115
ordered.verifyNoMoreInteractions();
115116
}
116117

0 commit comments

Comments
 (0)