2424
2525/**
2626 * Collects strings and emits them as records when a line matches the pattern.
27- * Appends to every incoming line a line feed so that the original structure
28- * is preserved.
27+ * Appends to every incoming line a line feed so that the original structure is
28+ * preserved.
2929 *
3030 * @author Pascal Christoph (dr0i).
3131 *
3232 */
33- @ Description ( "Collects strings and emits them as records when a"
34- +" line matches the pattern or the stream is closed." )
35- @ In ( String .class )
36- @ Out ( String .class )
37- @ FluxCommand ( "lines-to-records" )
38- public final class LineRecorder
39- implements ObjectPipe <String ,ObjectReceiver <String >> {
33+ @ Description ("Collects strings and emits them as records when a line matches the pattern or the stream is closed." )
34+ @ In (String .class )
35+ @ Out (String .class )
36+ @ FluxCommand ("lines-to-records" )
37+ public final class LineRecorder implements ObjectPipe <String , ObjectReceiver <String >> {
4038
41- private final static int SB_CAPACITY = 4096 * 7 ;
39+ private final static int SB_CAPACITY = 4096 * 7 ;
4240 // empty line is the default
43- private String recordMarkerRegexp = "^\\ s*$" ;
44- StringBuilder record = new StringBuilder (
45- SB_CAPACITY ) ;
46- ObjectReceiver < String > receiver ;
41+ private String recordMarkerRegexp = "^\\ s*$" ;
42+ private StringBuilder record = new StringBuilder (SB_CAPACITY );
43+ private ObjectReceiver < String > receiver ;
44+ private boolean isClosed = false ;
4745
48- public void setRecordMarkerRegexp ( final String regexp ) {
49- recordMarkerRegexp = regexp ;
46+ public void setRecordMarkerRegexp ( final String regexp ) {
47+ recordMarkerRegexp = regexp ;
5048 }
5149
5250 @ Override
53- public void process ( final String line ) {
54- if (line .matches (
55- recordMarkerRegexp )){
56- getReceiver ().process (
57- record .toString ());
58- record =new StringBuilder (
59- SB_CAPACITY );
60- }else
61- record .append (
62- line +"\n " );
51+ public void process (final String line ) {
52+ assert !isClosed ();
53+ if (line .matches (recordMarkerRegexp )) {
54+ getReceiver ().process (record .toString ());
55+ record = new StringBuilder (SB_CAPACITY );
56+ } else
57+ record .append (line + "\n " );
58+ }
59+
60+ private boolean isClosed () {
61+ return isClosed ;
6362 }
6463
6564 @ Override
66- public void resetStream ( ) {
67- record =new StringBuilder (
68- SB_CAPACITY );
65+ public void resetStream () {
66+ record = new StringBuilder (SB_CAPACITY );
6967 }
7068
7169 @ Override
72- public void closeStream ( ) {
73- getReceiver ().process (
74- record . toString ()) ;
70+ public void closeStream ( ) {
71+ getReceiver ().process (record . toString ());
72+ isClosed = true ;
7573 }
7674
7775 @ Override
78- public <R extends ObjectReceiver <String >> R setReceiver ( R receiver ) {
79- this .receiver = receiver ;
76+ public <R extends ObjectReceiver <String >> R setReceiver ( R receiver ) {
77+ this .receiver = receiver ;
8078 return receiver ;
8179 }
8280
@@ -85,7 +83,7 @@ public <R extends ObjectReceiver<String>> R setReceiver ( R receiver ) {
8583 *
8684 * @return reference to the downstream module
8785 */
88- protected final ObjectReceiver <String > getReceiver ( ) {
86+ protected final ObjectReceiver <String > getReceiver ( ) {
8987 return receiver ;
9088 }
9189
0 commit comments