Skip to content

Commit 53ab3ca

Browse files
committed
Merge pull request #142 from cboehme/issue-140-record-reader
Fix #140: Added RecordReader for reading records from a reader.
2 parents 37c0502 + 79e5aea commit 53ab3ca

File tree

3 files changed

+343
-0
lines changed

3 files changed

+343
-0
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2013 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.converter;
17+
18+
import java.io.IOException;
19+
import java.io.Reader;
20+
21+
import org.culturegraph.mf.exceptions.MetafactureException;
22+
import org.culturegraph.mf.framework.DefaultObjectPipe;
23+
import org.culturegraph.mf.framework.ObjectReceiver;
24+
import org.culturegraph.mf.framework.annotations.Description;
25+
import org.culturegraph.mf.framework.annotations.In;
26+
import org.culturegraph.mf.framework.annotations.Out;
27+
28+
/**
29+
* <p>Reads data from a {@code Reader} and splits it into individual
30+
* records.</p>
31+
*
32+
* <p>The default separator is the global separator character (0x001d).
33+
* Empty records are skipped by default.</p>
34+
*
35+
* @author Christoph Böhme
36+
*
37+
*/
38+
@Description("Reads data from a Reader and splits it into individual records")
39+
@In(Reader.class)
40+
@Out(String.class)
41+
public final class RecordReader extends
42+
DefaultObjectPipe<Reader, ObjectReceiver<String>> {
43+
44+
public static final char DEFAULT_SEPARATOR = '\u001d';
45+
46+
private static final int BUFFER_SIZE = 1024 * 1024 * 16;
47+
48+
private final StringBuilder builder = new StringBuilder();
49+
private final char[] buffer = new char[BUFFER_SIZE];
50+
51+
private char separator = DEFAULT_SEPARATOR;
52+
private boolean skipEmptyRecords = true;
53+
54+
public void setSeparator(final String separator) {
55+
if (separator.length() >= 1) {
56+
this.separator = separator.charAt(0);
57+
} else {
58+
this.separator = DEFAULT_SEPARATOR;
59+
}
60+
}
61+
62+
public void setSeparator(final char separator) {
63+
this.separator = separator;
64+
}
65+
66+
public char getSeparator() {
67+
return separator;
68+
}
69+
70+
public void setSkipEmptyRecords(final boolean skipEmptyRecords) {
71+
this.skipEmptyRecords = skipEmptyRecords;
72+
}
73+
74+
public boolean getSkipEmptyRecords() {
75+
return skipEmptyRecords;
76+
}
77+
78+
@Override
79+
public void process(final Reader reader) {
80+
assert !isClosed();
81+
82+
try {
83+
boolean nothingRead = true;
84+
int size;
85+
while ((size = reader.read(buffer)) != -1) {
86+
nothingRead = false;
87+
int offset = 0;
88+
for (int i = 0; i < size; ++i) {
89+
if (buffer[i] == separator) {
90+
builder.append(buffer, offset, i - offset);
91+
offset = i + 1;
92+
emitRecord();
93+
}
94+
}
95+
builder.append(buffer, offset, size - offset);
96+
}
97+
if (!nothingRead) {
98+
emitRecord();
99+
}
100+
101+
} catch (final IOException e) {
102+
throw new MetafactureException(e);
103+
}
104+
}
105+
106+
private void emitRecord() {
107+
final String record = builder.toString();
108+
if (!skipEmptyRecords || !record.isEmpty()) {
109+
getReceiver().process(record);
110+
builder.delete(0, builder.length());
111+
}
112+
}
113+
114+
}

src/main/resources/flux-commands.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ calculate-metrics org.culturegraph.mf.stream.pipe.stat.CooccurrenceMetricCalcula
2121
jscript org.culturegraph.mf.stream.pipe.JScriptObjectPipe
2222

2323
as-lines org.culturegraph.mf.stream.converter.LineReader
24+
as-records org.culturegraph.mf.stream.converter.RecordReader
2425

2526
# Decoders:
2627
decode-pica org.culturegraph.mf.stream.converter.bib.PicaDecoder
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright 2013 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.converter;
17+
18+
import static org.mockito.Mockito.inOrder;
19+
import static org.mockito.Mockito.times;
20+
import static org.mockito.Mockito.verifyNoMoreInteractions;
21+
import static org.mockito.Mockito.verifyZeroInteractions;
22+
23+
import java.io.StringReader;
24+
25+
import org.culturegraph.mf.framework.ObjectReceiver;
26+
import org.junit.After;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
import org.mockito.InOrder;
30+
import org.mockito.Mock;
31+
import org.mockito.MockitoAnnotations;
32+
33+
/**
34+
* Tests for {@link RecordReader}.
35+
*
36+
* @author Christoph Böhme
37+
*
38+
*/
39+
public final class RecordReaderTest {
40+
41+
private static final String RECORD1 = "record1";
42+
private static final String RECORD2 = "record2";
43+
private static final String EMPTY_RECORD = "";
44+
private static final char SEPARATOR = ':';
45+
private static final char DEFAULT_SEPARATOR = '\u001d';
46+
47+
private RecordReader recordReader;
48+
49+
@Mock
50+
private ObjectReceiver<String> receiver;
51+
52+
@Before
53+
public void setup() {
54+
MockitoAnnotations.initMocks(this);
55+
recordReader = new RecordReader();
56+
recordReader.setReceiver(receiver);
57+
}
58+
59+
@After
60+
public void cleanup() {
61+
recordReader.closeStream();
62+
}
63+
64+
@Test
65+
public void testShouldProcessRecordsFollowedbySeparator() {
66+
recordReader.setSeparator(SEPARATOR);
67+
68+
recordReader.process(new StringReader(
69+
RECORD1 + SEPARATOR +
70+
RECORD2 + SEPARATOR));
71+
72+
final InOrder ordered = inOrder(receiver);
73+
ordered.verify(receiver).process(RECORD1);
74+
ordered.verify(receiver).process(RECORD2);
75+
verifyNoMoreInteractions(receiver);
76+
}
77+
78+
@Test
79+
public void testShouldProcessRecordsPrecededbySeparator() {
80+
recordReader.setSeparator(SEPARATOR);
81+
82+
recordReader.process(new StringReader(
83+
SEPARATOR + RECORD1 +
84+
SEPARATOR + RECORD2));
85+
86+
final InOrder ordered = inOrder(receiver);
87+
ordered.verify(receiver).process(RECORD1);
88+
ordered.verify(receiver).process(RECORD2);
89+
verifyNoMoreInteractions(receiver);
90+
}
91+
92+
@Test
93+
public void testShouldProcessRecordsSeparatedBySeparator() {
94+
recordReader.setSeparator(SEPARATOR);
95+
96+
recordReader.process(new StringReader(
97+
RECORD1 + SEPARATOR +
98+
RECORD2));
99+
100+
final InOrder ordered = inOrder(receiver);
101+
ordered.verify(receiver).process(RECORD1);
102+
ordered.verify(receiver).process(RECORD2);
103+
verifyNoMoreInteractions(receiver);
104+
}
105+
106+
@Test
107+
public void testShouldProcessSingleRecordWithoutSeparator() {
108+
recordReader.setSeparator(SEPARATOR);
109+
110+
recordReader.process(new StringReader(
111+
RECORD1));
112+
113+
final InOrder ordered = inOrder(receiver);
114+
ordered.verify(receiver).process(RECORD1);
115+
verifyNoMoreInteractions(receiver);
116+
}
117+
118+
@Test
119+
public void testShouldNotEmitRecordIfInputIsEmpty() {
120+
recordReader.setSeparator(SEPARATOR);
121+
// Make sure empty records are
122+
// normally emitted:
123+
recordReader.setSkipEmptyRecords(false);
124+
125+
recordReader.process(new StringReader(
126+
EMPTY_RECORD));
127+
128+
verifyZeroInteractions(receiver);
129+
}
130+
131+
@Test
132+
public void testShouldSkipEmptyRecordsByDefault() {
133+
recordReader.setSeparator(SEPARATOR);
134+
135+
recordReader.process(new StringReader(
136+
RECORD1 + SEPARATOR +
137+
EMPTY_RECORD + SEPARATOR +
138+
RECORD2));
139+
140+
final InOrder ordered = inOrder(receiver);
141+
ordered.verify(receiver).process(RECORD1);
142+
ordered.verify(receiver).process(RECORD2);
143+
verifyNoMoreInteractions(receiver);
144+
}
145+
146+
@Test
147+
public void testShouldOutputEmptyRecordsIfConfigured() {
148+
recordReader.setSeparator(SEPARATOR);
149+
recordReader.setSkipEmptyRecords(false);
150+
151+
recordReader.process(new StringReader(
152+
RECORD1 + SEPARATOR +
153+
EMPTY_RECORD + SEPARATOR +
154+
RECORD2));
155+
156+
final InOrder ordered = inOrder(receiver);
157+
ordered.verify(receiver).process(RECORD1);
158+
ordered.verify(receiver).process(EMPTY_RECORD);
159+
ordered.verify(receiver).process(RECORD2);
160+
verifyNoMoreInteractions(receiver);
161+
}
162+
163+
@Test
164+
public void testShouldOutputEmptyRecordsAtStartOfInputIfConfigured() {
165+
recordReader.setSeparator(SEPARATOR);
166+
recordReader.setSkipEmptyRecords(false);
167+
168+
recordReader.process(new StringReader(
169+
EMPTY_RECORD + SEPARATOR +
170+
RECORD1 + SEPARATOR +
171+
RECORD2));
172+
173+
final InOrder ordered = inOrder(receiver);
174+
ordered.verify(receiver).process(EMPTY_RECORD);
175+
ordered.verify(receiver).process(RECORD1);
176+
ordered.verify(receiver).process(RECORD2);
177+
verifyNoMoreInteractions(receiver);
178+
}
179+
180+
@Test
181+
public void testShouldOutputEmptyRecordsAtEndOfInputIfConfigured() {
182+
recordReader.setSeparator(SEPARATOR);
183+
recordReader.setSkipEmptyRecords(false);
184+
185+
recordReader.process(new StringReader(
186+
RECORD1 + SEPARATOR +
187+
RECORD2 + SEPARATOR +
188+
EMPTY_RECORD));
189+
190+
final InOrder ordered = inOrder(receiver);
191+
ordered.verify(receiver).process(RECORD1);
192+
ordered.verify(receiver).process(RECORD2);
193+
ordered.verify(receiver).process(EMPTY_RECORD);
194+
verifyNoMoreInteractions(receiver);
195+
}
196+
197+
@Test
198+
public void testShouldUseGlobalSeparatorAsDefaultSeparator() {
199+
recordReader.process(new StringReader(
200+
RECORD1 + DEFAULT_SEPARATOR +
201+
RECORD2 + DEFAULT_SEPARATOR));
202+
203+
final InOrder ordered = inOrder(receiver);
204+
ordered.verify(receiver).process(RECORD1);
205+
ordered.verify(receiver).process(RECORD2);
206+
verifyNoMoreInteractions(receiver);
207+
}
208+
209+
@Test
210+
public void testShouldProcessMultipleReaders() {
211+
recordReader.setSeparator(SEPARATOR);
212+
213+
recordReader.process(new StringReader(
214+
RECORD1 + SEPARATOR +
215+
RECORD2));
216+
recordReader.process(new StringReader(
217+
RECORD2 + SEPARATOR +
218+
RECORD1));
219+
220+
final InOrder ordered = inOrder(receiver);
221+
ordered.verify(receiver).process(RECORD1);
222+
ordered.verify(receiver, times(2)).process(RECORD2);
223+
ordered.verify(receiver).process(RECORD1);
224+
verifyNoMoreInteractions(receiver);
225+
226+
}
227+
228+
}

0 commit comments

Comments
 (0)