Skip to content

Commit bf09a8b

Browse files
committed
[CSV-318] printRecord() hangs if fed a parallel stream
Add tests
1 parent 17b0f99 commit bf09a8b

File tree

1 file changed

+145
-0
lines changed

1 file changed

+145
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.commons.csv;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
24+
import java.io.ByteArrayOutputStream;
25+
import java.io.IOException;
26+
import java.io.PrintWriter;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.stream.Stream;
30+
31+
import org.apache.commons.io.function.IOConsumer;
32+
import org.apache.commons.io.function.IOStream;
33+
import org.apache.commons.lang3.ArrayUtils;
34+
import org.junit.jupiter.api.Disabled;
35+
import org.junit.jupiter.api.Test;
36+
37+
/**
38+
* Tests https://issues.apache.org/jira/projects/CSV/issues/CSV-318?filter=allopenissues
39+
*
40+
* @see CSVPrinter
41+
*/
42+
public class JiraCsv318Test {
43+
44+
private void checkOutput(final ByteArrayOutputStream baos) {
45+
checkOutput(baos.toString());
46+
}
47+
48+
private void checkOutput(final String string) {
49+
assertEquals("col a,col b,col c", string.trim());
50+
}
51+
52+
private Stream<String> newParallelStream() {
53+
// returned stream is intermediate
54+
return newStream().parallel();
55+
}
56+
57+
private CSVPrinter newPrinter(final ByteArrayOutputStream baos) throws IOException {
58+
return new CSVPrinter(new PrintWriter(baos), CSVFormat.DEFAULT);
59+
}
60+
61+
private Stream<String> newSequentialStream() {
62+
// returned stream is intermediate
63+
return newStream().sequential();
64+
}
65+
66+
private Stream<String> newStream() {
67+
return Stream.of("col a", "col b", "col c");
68+
}
69+
70+
public synchronized void printRecord(final Stream<?> values) throws IOException {
71+
// IOStream.adapt(values).forEachOrdered(this::print);
72+
}
73+
74+
@Test
75+
void testDefaultStream() throws IOException {
76+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
77+
try (CSVPrinter printer = newPrinter(baos)) {
78+
printer.printRecord(newStream());
79+
}
80+
checkOutput(baos);
81+
}
82+
83+
@SuppressWarnings("resource")
84+
@Test
85+
void testParallelIOStream() throws IOException {
86+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
87+
try (CSVPrinter printer = newPrinter(baos)) {
88+
IOStream.adapt(newParallelStream()).forEachOrdered(printer::print);
89+
}
90+
// No EOR marker in this test intentionally, so checkOutput will trim.
91+
checkOutput(baos);
92+
}
93+
94+
@SuppressWarnings("resource")
95+
@Test
96+
@Disabled("Deadlock because CSVPrinter.print(Object) is synchronized")
97+
void testParallelIOStreamSynchronizedPrinter() throws IOException {
98+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
99+
try (CSVPrinter printer = newPrinter(baos)) {
100+
synchronized (printer) {
101+
IOStream.adapt(newParallelStream()).forEachOrdered(printer::print);
102+
}
103+
}
104+
// No EOR marker in this test intentionally, so checkOutput will trim.
105+
checkOutput(baos);
106+
}
107+
108+
@SuppressWarnings("resource")
109+
@Test
110+
void testParallelIOStreamSynchronizedPrinterNotUsed() throws IOException {
111+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
112+
try (CSVPrinter printer = newPrinter(baos)) {
113+
synchronized (printer) {
114+
IOStream.adapt(newParallelStream()).forEachOrdered(IOConsumer.noop());
115+
}
116+
}
117+
final List<String> list = new ArrayList<>();
118+
try (CSVPrinter printer = newPrinter(baos)) {
119+
synchronized (printer) {
120+
IOStream.adapt(newParallelStream()).forEachOrdered(list::add);
121+
}
122+
}
123+
// No EOR marker in this test intentionally, so checkOutput will trim.
124+
checkOutput(String.join(",", list.toArray(ArrayUtils.EMPTY_STRING_ARRAY)));
125+
}
126+
127+
@Test
128+
@Disabled("Deadlock because CSVPrinter.print(Object) is synchronized")
129+
void testParallelStream() throws IOException {
130+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
131+
try (CSVPrinter printer = newPrinter(baos)) {
132+
printer.printRecord(newParallelStream());
133+
}
134+
checkOutput(baos);
135+
}
136+
137+
@Test
138+
void testSequentialStream() throws IOException {
139+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
140+
try (CSVPrinter printer = newPrinter(baos)) {
141+
printer.printRecord(newSequentialStream());
142+
}
143+
checkOutput(baos);
144+
}
145+
}

0 commit comments

Comments
 (0)