Skip to content

Commit 4c31fbb

Browse files
authored
[format] Introduce text format table (#6879)
1 parent c38eede commit 4c31fbb

File tree

21 files changed

+587
-149
lines changed

21 files changed

+587
-149
lines changed

docs/content/concepts/spec/fileformat.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,33 @@ The following table lists the type mapping from Paimon type to CSV type.
506506
</tbody>
507507
</table>
508508

509+
## TEXT
510+
511+
Experimental feature, not recommended for production.
512+
513+
Format Options:
514+
515+
<table class="table table-bordered">
516+
<thead>
517+
<tr>
518+
<th class="text-left" style="width: 25%">Option</th>
519+
<th class="text-center" style="width: 7%">Default</th>
520+
<th class="text-center" style="width: 10%">Type</th>
521+
<th class="text-center" style="width: 42%">Description</th>
522+
</tr>
523+
</thead>
524+
<tbody>
525+
<tr>
526+
<td><h5>text.line-delimiter</h5></td>
527+
<td style="word-wrap: break-word;"><code>\n</code></td>
528+
<td>String</td>
529+
<td>The line delimiter for TEXT format</td>
530+
</tr>
531+
</tbody>
532+
</table>
533+
534+
The Paimon text table contains only one field, and it is of string type.
535+
509536
## JSON
510537

511538
Experimental feature, not recommended for production.

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ public InlineElement getDescription() {
230230
public static final String FILE_FORMAT_AVRO = "avro";
231231
public static final String FILE_FORMAT_PARQUET = "parquet";
232232
public static final String FILE_FORMAT_CSV = "csv";
233+
public static final String FILE_FORMAT_TEXT = "text";
233234
public static final String FILE_FORMAT_JSON = "json";
234235

235236
public static final ConfigOption<String> FILE_FORMAT =
@@ -2379,6 +2380,7 @@ public String formatTableFileCompression() {
23792380
case FILE_FORMAT_ORC:
23802381
return "zstd";
23812382
case FILE_FORMAT_CSV:
2383+
case FILE_FORMAT_TEXT:
23822384
case FILE_FORMAT_JSON:
23832385
return "none";
23842386
default:

paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ enum Format {
7676
ORC,
7777
PARQUET,
7878
CSV,
79+
TEXT,
7980
JSON
8081
}
8182

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.paimon.format.csv;
2020

2121
import org.apache.paimon.data.InternalRow;
22-
import org.apache.paimon.format.text.TextFileReader;
22+
import org.apache.paimon.format.text.AbstractTextFileReader;
2323
import org.apache.paimon.fs.FileIO;
2424
import org.apache.paimon.fs.Path;
2525
import org.apache.paimon.types.RowType;
@@ -29,7 +29,7 @@
2929
import java.io.IOException;
3030

3131
/** CSV file reader implementation. */
32-
public class CsvFileReader extends TextFileReader {
32+
public class CsvFileReader extends AbstractTextFileReader {
3333

3434
private final boolean includeHeader;
3535
private final CsvParser csvParser;

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.paimon.casting.CastExecutor;
2222
import org.apache.paimon.casting.CastExecutors;
2323
import org.apache.paimon.data.InternalRow;
24-
import org.apache.paimon.format.text.TextFileWriter;
24+
import org.apache.paimon.format.text.AbstractTextFileWriter;
2525
import org.apache.paimon.fs.PositionOutputStream;
2626
import org.apache.paimon.types.DataType;
2727
import org.apache.paimon.types.DataTypeRoot;
@@ -33,7 +33,7 @@
3333
import java.util.concurrent.ConcurrentHashMap;
3434

3535
/** CSV format writer implementation. */
36-
public class CsvFormatWriter extends TextFileWriter {
36+
public class CsvFormatWriter extends AbstractTextFileWriter {
3737

3838
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
3939
// Performance optimization: Cache frequently used cast executors

paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.paimon.data.GenericMap;
2626
import org.apache.paimon.data.GenericRow;
2727
import org.apache.paimon.data.InternalRow;
28-
import org.apache.paimon.format.text.TextFileReader;
28+
import org.apache.paimon.format.text.AbstractTextFileReader;
2929
import org.apache.paimon.fs.FileIO;
3030
import org.apache.paimon.fs.Path;
3131
import org.apache.paimon.types.ArrayType;
@@ -49,7 +49,7 @@
4949
import java.util.Map;
5050

5151
/** JSON file reader. */
52-
public class JsonFileReader extends TextFileReader {
52+
public class JsonFileReader extends AbstractTextFileReader {
5353

5454
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
5555

paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.paimon.data.InternalArray;
2525
import org.apache.paimon.data.InternalMap;
2626
import org.apache.paimon.data.InternalRow;
27-
import org.apache.paimon.format.text.TextFileWriter;
27+
import org.apache.paimon.format.text.AbstractTextFileWriter;
2828
import org.apache.paimon.fs.PositionOutputStream;
2929
import org.apache.paimon.types.ArrayType;
3030
import org.apache.paimon.types.DataField;
@@ -45,7 +45,7 @@
4545
import java.util.Map;
4646

4747
/** Json format writer implementation. */
48-
public class JsonFormatWriter extends TextFileWriter {
48+
public class JsonFormatWriter extends AbstractTextFileWriter {
4949

5050
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
5151

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.format.text;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.fs.Path;
24+
import org.apache.paimon.reader.FileRecordIterator;
25+
import org.apache.paimon.reader.FileRecordReader;
26+
import org.apache.paimon.types.RowType;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.io.IOException;
31+
import java.io.InputStream;
32+
33+
import static org.apache.paimon.format.text.HadoopCompressionUtils.createDecompressedInputStream;
34+
35+
/** Base class for text-based file readers that provides common functionality. */
36+
public abstract class AbstractTextFileReader implements FileRecordReader<InternalRow> {
37+
38+
private final Path filePath;
39+
private final TextRecordIterator reader;
40+
41+
protected final RowType rowType;
42+
protected final long offset;
43+
protected final TextLineReader lineReader;
44+
45+
protected boolean readerClosed = false;
46+
47+
protected AbstractTextFileReader(
48+
FileIO fileIO,
49+
Path filePath,
50+
RowType rowType,
51+
String delimiter,
52+
long offset,
53+
@Nullable Long length)
54+
throws IOException {
55+
this.filePath = filePath;
56+
this.rowType = rowType;
57+
this.offset = offset;
58+
InputStream decompressedStream =
59+
createDecompressedInputStream(fileIO.newInputStream(filePath), filePath);
60+
this.lineReader = TextLineReader.create(decompressedStream, delimiter, offset, length);
61+
this.reader = new TextRecordIterator();
62+
}
63+
64+
/**
65+
* Parses a single line of text into an InternalRow. Subclasses must implement this method to
66+
* handle their specific format.
67+
*/
68+
@Nullable
69+
protected abstract InternalRow parseLine(String line) throws IOException;
70+
71+
/**
72+
* Performs any additional setup before reading records. Subclasses can override this method if
73+
* they need to perform setup operations like skipping headers.
74+
*/
75+
protected void setupReading() throws IOException {
76+
// Default implementation does nothing
77+
}
78+
79+
@Override
80+
@Nullable
81+
public FileRecordIterator<InternalRow> readBatch() throws IOException {
82+
if (readerClosed) {
83+
return null;
84+
}
85+
86+
// Perform any setup needed before reading
87+
setupReading();
88+
89+
if (reader.end) {
90+
return null;
91+
}
92+
return reader;
93+
}
94+
95+
@Override
96+
public void close() throws IOException {
97+
if (!readerClosed) {
98+
if (lineReader != null) {
99+
lineReader.close();
100+
}
101+
readerClosed = true;
102+
}
103+
}
104+
105+
/** Record iterator for text-based file readers. */
106+
private class TextRecordIterator implements FileRecordIterator<InternalRow> {
107+
108+
protected long currentPosition = 0;
109+
protected boolean end = false;
110+
111+
@Override
112+
public InternalRow next() throws IOException {
113+
while (true) {
114+
if (readerClosed) {
115+
return null;
116+
}
117+
String nextLine = readLine();
118+
if (nextLine == null) {
119+
end = true;
120+
return null;
121+
}
122+
123+
currentPosition++;
124+
InternalRow row = parseLine(nextLine);
125+
if (row != null) {
126+
return row;
127+
}
128+
}
129+
}
130+
131+
@Override
132+
public void releaseBatch() {
133+
// Default implementation does nothing
134+
}
135+
136+
@Override
137+
public Path filePath() {
138+
return filePath;
139+
}
140+
141+
@Override
142+
public long returnedPosition() {
143+
if (offset > 0) {
144+
throw new UnsupportedOperationException(
145+
"Cannot return position with reading offset.");
146+
}
147+
return Math.max(0, currentPosition - 1);
148+
}
149+
}
150+
151+
/**
152+
* Reads a single line from the input stream, using either the default line delimiter or a
153+
* custom delimiter.
154+
*
155+
* <p>This method supports multi-character custom delimiters by using a simple pattern matching
156+
* algorithm. For standard delimiters (null or empty), it delegates to BufferedReader's
157+
* readLine() for optimal performance.
158+
*
159+
* <p>The algorithm maintains a partial match index and accumulates bytes until:
160+
*
161+
* <ul>
162+
* <li>A complete delimiter is found (returns line without delimiter)
163+
* <li>End of stream is reached (returns accumulated data or null if empty)
164+
* <li>Maximum line length is exceeded (throws IOException)
165+
* </ul>
166+
*
167+
* @return the next line as a string (without delimiter), or null if end of stream
168+
* @throws IOException if an I/O error occurs or line exceeds maximum length
169+
*/
170+
protected String readLine() throws IOException {
171+
return lineReader.readLine();
172+
}
173+
}

paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java renamed to paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@
3131
import java.nio.charset.StandardCharsets;
3232

3333
/** Base class for text-based format writers that provides common functionality. */
34-
public abstract class TextFileWriter implements FormatWriter {
34+
public abstract class AbstractTextFileWriter implements FormatWriter {
3535

3636
protected final PositionOutputStream outputStream;
3737
protected final BufferedWriter writer;
3838
protected final RowType rowType;
3939

40-
protected TextFileWriter(PositionOutputStream outputStream, RowType rowType, String compression)
40+
protected AbstractTextFileWriter(
41+
PositionOutputStream outputStream, RowType rowType, String compression)
4142
throws IOException {
4243
this.outputStream = outputStream;
4344
OutputStream compressedStream =

0 commit comments

Comments
 (0)