Skip to content

Commit b764cab

Browse files
committed
Fix #140: Added RecordReader for reading records from a reader.
The RecordReader class reads data from a Reader and splits it at a user-defined character.
1 parent 12e44d7 commit b764cab

File tree

2 files changed

+334
-0
lines changed

2 files changed

+334
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2013 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.converter;
17+
18+
import java.io.IOException;
19+
import java.io.Reader;
20+
21+
import org.culturegraph.mf.exceptions.MetafactureException;
22+
import org.culturegraph.mf.framework.DefaultObjectPipe;
23+
import org.culturegraph.mf.framework.ObjectReceiver;
24+
import org.culturegraph.mf.framework.annotations.Description;
25+
import org.culturegraph.mf.framework.annotations.In;
26+
import org.culturegraph.mf.framework.annotations.Out;
27+
28+
/**
29+
* <p>Reads data from a {@code Reader} and splits it into individual
30+
* records.</p>
31+
*
32+
* <p>The default separator is the new line character (0x0a). Empty
33+
* records are skipped by default.</p>
34+
*
35+
* @author Christoph Böhme
36+
*
37+
*/
38+
@Description("Reads data from a Reader and splits it into individual records")
39+
@In(Reader.class)
40+
@Out(String.class)
41+
public final class RecordReader extends
42+
DefaultObjectPipe<Reader, ObjectReceiver<String>> {
43+
44+
public static final char DEFAULT_SEPARATOR = '\n';
45+
46+
private static final int BUFFER_SIZE = 1024 * 1024 * 16;
47+
48+
private final StringBuilder builder = new StringBuilder();
49+
private final char[] buffer = new char[BUFFER_SIZE];
50+
51+
private char separator = DEFAULT_SEPARATOR;
52+
private boolean skipEmptyRecords = true;
53+
54+
public void setSeparator(final char separator) {
55+
this.separator = separator;
56+
}
57+
58+
public char getSeparator() {
59+
return separator;
60+
}
61+
62+
public void setSkipEmptyRecords(final boolean skipEmptyRecords) {
63+
this.skipEmptyRecords = skipEmptyRecords;
64+
}
65+
66+
public boolean getSkipEmptyRecords() {
67+
return skipEmptyRecords;
68+
}
69+
70+
@Override
71+
public void process(final Reader reader) {
72+
assert !isClosed();
73+
74+
try {
75+
boolean nothingRead = true;
76+
int size;
77+
while ((size = reader.read(buffer)) != -1) {
78+
nothingRead = false;
79+
int offset = 0;
80+
for (int i = 0; i < size; ++i) {
81+
if (buffer[i] == separator) {
82+
builder.append(buffer, offset, i - offset);
83+
offset = i + 1;
84+
emitRecord();
85+
}
86+
}
87+
builder.append(buffer, offset, size - offset);
88+
}
89+
if (!nothingRead) {
90+
emitRecord();
91+
}
92+
93+
} catch (final IOException e) {
94+
throw new MetafactureException(e);
95+
}
96+
}
97+
98+
private void emitRecord() {
99+
final String record = builder.toString();
100+
if (!skipEmptyRecords || !record.isEmpty()) {
101+
getReceiver().process(record);
102+
builder.delete(0, builder.length());
103+
}
104+
}
105+
106+
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright 2013 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.converter;
17+
18+
import static org.mockito.Mockito.inOrder;
19+
import static org.mockito.Mockito.times;
20+
import static org.mockito.Mockito.verifyNoMoreInteractions;
21+
import static org.mockito.Mockito.verifyZeroInteractions;
22+
23+
import java.io.StringReader;
24+
25+
import org.culturegraph.mf.framework.ObjectReceiver;
26+
import org.junit.After;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
import org.mockito.InOrder;
30+
import org.mockito.Mock;
31+
import org.mockito.MockitoAnnotations;
32+
33+
/**
34+
* Tests for {@link RecordReader}.
35+
*
36+
* @author Christoph Böhme
37+
*
38+
*/
39+
public final class RecordReaderTest {
40+
41+
private static final String RECORD1 = "record1";
42+
private static final String RECORD2 = "record2";
43+
private static final String EMPTY_RECORD = "";
44+
private static final char SEPARATOR = ':';
45+
private static final char DEFAULT_SEPARATOR = '\n';
46+
47+
private RecordReader recordReader;
48+
49+
@Mock
50+
private ObjectReceiver<String> receiver;
51+
52+
@Before
53+
public void setup() {
54+
MockitoAnnotations.initMocks(this);
55+
recordReader = new RecordReader();
56+
recordReader.setReceiver(receiver);
57+
}
58+
59+
@After
60+
public void cleanup() {
61+
recordReader.closeStream();
62+
}
63+
64+
@Test
65+
public void testShouldProcessRecordsFollowedbySeparator() {
66+
recordReader.setSeparator(SEPARATOR);
67+
68+
recordReader.process(new StringReader(
69+
RECORD1 + SEPARATOR +
70+
RECORD2 + SEPARATOR));
71+
72+
final InOrder ordered = inOrder(receiver);
73+
ordered.verify(receiver).process(RECORD1);
74+
ordered.verify(receiver).process(RECORD2);
75+
verifyNoMoreInteractions(receiver);
76+
}
77+
78+
@Test
79+
public void testShouldProcessRecordsPrecededbySeparator() {
80+
recordReader.setSeparator(SEPARATOR);
81+
82+
recordReader.process(new StringReader(
83+
SEPARATOR + RECORD1 +
84+
SEPARATOR + RECORD2));
85+
86+
final InOrder ordered = inOrder(receiver);
87+
ordered.verify(receiver).process(RECORD1);
88+
ordered.verify(receiver).process(RECORD2);
89+
verifyNoMoreInteractions(receiver);
90+
}
91+
92+
@Test
93+
public void testShouldProcessRecordsSeparatedBySeparator() {
94+
recordReader.setSeparator(SEPARATOR);
95+
96+
recordReader.process(new StringReader(
97+
RECORD1 + SEPARATOR +
98+
RECORD2));
99+
100+
final InOrder ordered = inOrder(receiver);
101+
ordered.verify(receiver).process(RECORD1);
102+
ordered.verify(receiver).process(RECORD2);
103+
verifyNoMoreInteractions(receiver);
104+
}
105+
106+
@Test
107+
public void testShouldProcessSingleRecordWithoutSeparator() {
108+
recordReader.setSeparator(SEPARATOR);
109+
110+
recordReader.process(new StringReader(
111+
RECORD1));
112+
113+
final InOrder ordered = inOrder(receiver);
114+
ordered.verify(receiver).process(RECORD1);
115+
verifyNoMoreInteractions(receiver);
116+
}
117+
118+
@Test
119+
public void testShouldNotEmitRecordIfInputIsEmpty() {
120+
recordReader.setSeparator(SEPARATOR);
121+
// Make sure empty records are
122+
// normally emitted:
123+
recordReader.setSkipEmptyRecords(false);
124+
125+
recordReader.process(new StringReader(
126+
EMPTY_RECORD));
127+
128+
verifyZeroInteractions(receiver);
129+
}
130+
131+
@Test
132+
public void testShouldSkipEmptyRecordsByDefault() {
133+
recordReader.setSeparator(SEPARATOR);
134+
135+
recordReader.process(new StringReader(
136+
RECORD1 + SEPARATOR +
137+
EMPTY_RECORD + SEPARATOR +
138+
RECORD2));
139+
140+
final InOrder ordered = inOrder(receiver);
141+
ordered.verify(receiver).process(RECORD1);
142+
ordered.verify(receiver).process(RECORD2);
143+
verifyNoMoreInteractions(receiver);
144+
}
145+
146+
@Test
147+
public void testShouldOutputEmptyRecordsIfConfigured() {
148+
recordReader.setSeparator(SEPARATOR);
149+
recordReader.setSkipEmptyRecords(false);
150+
151+
recordReader.process(new StringReader(
152+
RECORD1 + SEPARATOR +
153+
EMPTY_RECORD + SEPARATOR +
154+
RECORD2));
155+
156+
final InOrder ordered = inOrder(receiver);
157+
ordered.verify(receiver).process(RECORD1);
158+
ordered.verify(receiver).process(EMPTY_RECORD);
159+
ordered.verify(receiver).process(RECORD2);
160+
verifyNoMoreInteractions(receiver);
161+
}
162+
163+
@Test
164+
public void testShouldOutputEmptyRecordsAtStartOfInputIfConfigured() {
165+
recordReader.setSeparator(SEPARATOR);
166+
recordReader.setSkipEmptyRecords(false);
167+
168+
recordReader.process(new StringReader(
169+
EMPTY_RECORD + SEPARATOR +
170+
RECORD1 + SEPARATOR +
171+
RECORD2));
172+
173+
final InOrder ordered = inOrder(receiver);
174+
ordered.verify(receiver).process(EMPTY_RECORD);
175+
ordered.verify(receiver).process(RECORD1);
176+
ordered.verify(receiver).process(RECORD2);
177+
verifyNoMoreInteractions(receiver);
178+
}
179+
180+
@Test
181+
public void testShouldOutputEmptyRecordsAtEndOfInputIfConfigured() {
182+
recordReader.setSeparator(SEPARATOR);
183+
recordReader.setSkipEmptyRecords(false);
184+
185+
recordReader.process(new StringReader(
186+
RECORD1 + SEPARATOR +
187+
RECORD2 + SEPARATOR +
188+
EMPTY_RECORD));
189+
190+
final InOrder ordered = inOrder(receiver);
191+
ordered.verify(receiver).process(RECORD1);
192+
ordered.verify(receiver).process(RECORD2);
193+
ordered.verify(receiver).process(EMPTY_RECORD);
194+
verifyNoMoreInteractions(receiver);
195+
}
196+
197+
@Test
198+
public void testShouldUseNewLineAsDefaultSeparator() {
199+
recordReader.process(new StringReader(
200+
RECORD1 + DEFAULT_SEPARATOR +
201+
RECORD2 + DEFAULT_SEPARATOR));
202+
203+
final InOrder ordered = inOrder(receiver);
204+
ordered.verify(receiver).process(RECORD1);
205+
ordered.verify(receiver).process(RECORD2);
206+
verifyNoMoreInteractions(receiver);
207+
}
208+
209+
@Test
210+
public void testShouldProcessMultipleReaders() {
211+
recordReader.setSeparator(SEPARATOR);
212+
213+
recordReader.process(new StringReader(
214+
RECORD1 + SEPARATOR +
215+
RECORD2));
216+
recordReader.process(new StringReader(
217+
RECORD2 + SEPARATOR +
218+
RECORD1));
219+
220+
final InOrder ordered = inOrder(receiver);
221+
ordered.verify(receiver).process(RECORD1);
222+
ordered.verify(receiver, times(2)).process(RECORD2);
223+
ordered.verify(receiver).process(RECORD1);
224+
verifyNoMoreInteractions(receiver);
225+
226+
}
227+
228+
}

0 commit comments

Comments
 (0)