Skip to content

Commit f1f9c5f

Browse files
committed
Factor out CharsetDecoder, fix bugs, add feature flag to README
1 parent 0acebf2 commit f1f9c5f

File tree

3 files changed

+100
-56
lines changed

3 files changed

+100
-56
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ Optional features:
131131

132132
- `avro`: support for reading the [Apache Avro] format
133133
- `backtrace`: include backtrace information in error messages
134+
- `encoding_rs`: support for reading files with character encodings other than UTF-8
134135
- `parquet_encryption`: support for using [Parquet Modular Encryption]
135136
- `serde`: enable arrow-schema's `serde` feature
136137

datafusion/datasource-csv/src/charset.rs

Lines changed: 95 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -41,53 +41,43 @@ pub fn lookup_charset(enc: Option<&str>) -> Result<Option<&'static Encoding>> {
4141
}
4242
}
4343

44-
/// A `Decoder` that decodes input bytes from the specified character encoding
45-
/// to UTF-8 before passing them onto the inner `Decoder`.
46-
pub struct CharsetDecoder<T> {
44+
/// A record batch `Decoder` that decodes input bytes from the specified
45+
/// character encoding to UTF-8 before passing them onto the inner `Decoder`.
46+
#[derive(Debug)]
47+
pub struct CharsetBatchDecoder<T> {
4748
inner: T,
48-
charset_decoder: encoding_rs::Decoder,
49-
buffer: Buffer,
49+
decoder: CharsetDecoder,
5050
}
5151

52-
impl<T> CharsetDecoder<T> {
52+
impl<T> CharsetBatchDecoder<T> {
5353
pub fn new(inner: T, encoding: &'static Encoding) -> Self {
54-
Self {
55-
inner,
56-
charset_decoder: encoding.new_decoder(),
57-
buffer: Buffer::with_capacity(DECODE_BUFFER_CAP),
58-
}
54+
let decoder = CharsetDecoder::new(encoding);
55+
Self { inner, decoder }
5956
}
6057
}
6158

62-
impl<T: Decoder> Decoder for CharsetDecoder<T> {
59+
impl<T: Decoder> Decoder for CharsetBatchDecoder<T> {
6360
fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
6461
let last = buf.is_empty();
6562
let mut buf_offset = 0;
6663

67-
if !self.buffer.is_empty() {
68-
let decoded = self.inner.decode(self.buffer.read_buf())?;
69-
self.buffer.consume(decoded);
64+
if !self.decoder.is_empty() {
65+
let decoded = self.inner.decode(self.decoder.read())?;
66+
self.decoder.consume(decoded);
7067

7168
if decoded == 0 {
7269
return Ok(buf_offset);
7370
}
7471
}
7572

7673
loop {
77-
self.buffer.backshift();
78-
79-
let (res, read, written, _) = self.charset_decoder.decode_to_utf8(
80-
&buf[buf_offset..],
81-
self.buffer.write_buf(),
82-
last,
83-
);
74+
let (read, input_empty) = self.decoder.fill(&buf[buf_offset..], last);
8475
buf_offset += read;
85-
self.buffer.advance(written);
8676

87-
let decoded = self.inner.decode(self.buffer.read_buf())?;
88-
self.buffer.consume(decoded);
77+
let decoded = self.inner.decode(self.decoder.read())?;
78+
self.decoder.consume(decoded);
8979

90-
if res == CoderResult::InputEmpty || decoded == 0 {
80+
if input_empty || decoded == 0 {
9181
break;
9282
}
9383
}
@@ -104,28 +94,18 @@ impl<T: Decoder> Decoder for CharsetDecoder<T> {
10494
}
10595
}
10696

107-
impl<T: Debug> Debug for CharsetDecoder<T> {
108-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109-
f.debug_struct("CharsetDecoder")
110-
.field("inner", &self.inner)
111-
.field("charset_decoder", self.charset_decoder.encoding())
112-
.finish()
113-
}
114-
}
115-
97+
/// A `BufRead` adapter that decodes input bytes from the
98+
/// specified character encoding to UTF-8.
99+
#[derive(Debug)]
116100
pub struct CharsetReader<R> {
117101
inner: R,
118-
charset_decoder: encoding_rs::Decoder,
119-
buffer: Buffer,
102+
decoder: CharsetDecoder,
120103
}
121104

122105
impl<R: BufRead> CharsetReader<R> {
123106
pub fn new(inner: R, encoding: &'static Encoding) -> Self {
124-
Self {
125-
inner,
126-
charset_decoder: encoding.new_decoder(),
127-
buffer: Buffer::with_capacity(DECODE_BUFFER_CAP),
128-
}
107+
let decoder = CharsetDecoder::new(encoding);
108+
Self { inner, decoder }
129109
}
130110
}
131111

@@ -140,27 +120,90 @@ impl<R: BufRead> Read for CharsetReader<R> {
140120

141121
impl<R: BufRead> BufRead for CharsetReader<R> {
142122
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
143-
if self.buffer.is_empty() {
144-
self.buffer.backshift();
145-
123+
if self.decoder.needs_input() {
146124
let buf = self.inner.fill_buf()?;
147-
let (_, read, written, _) = self.charset_decoder.decode_to_utf8(
148-
buf,
149-
self.buffer.write_buf(),
150-
buf.is_empty(),
151-
);
125+
let (read, _) = self.decoder.fill(buf, buf.is_empty());
152126
self.inner.consume(read);
153-
self.buffer.advance(written);
154127
}
155128

156-
Ok(self.buffer.read_buf())
129+
Ok(self.decoder.read())
130+
}
131+
132+
fn consume(&mut self, amount: usize) {
133+
self.decoder.consume(amount);
134+
}
135+
}
136+
137+
/// Converts bytes from some character encoding to UTF-8,
138+
/// using an internal fixed-size buffer
139+
pub struct CharsetDecoder {
140+
charset_decoder: encoding_rs::Decoder,
141+
buffer: Buffer,
142+
finished: bool,
143+
}
144+
145+
impl CharsetDecoder {
146+
/// Creates a new `CharsetDecoder`.
147+
fn new(encoding: &'static Encoding) -> Self {
148+
Self {
149+
charset_decoder: encoding.new_decoder(),
150+
buffer: Buffer::with_capacity(DECODE_BUFFER_CAP),
151+
finished: false,
152+
}
153+
}
154+
155+
/// Returns `true` if the internal buffer is empty.
156+
fn is_empty(&self) -> bool {
157+
self.buffer.is_empty()
158+
}
159+
160+
/// Returns `true` if the decoder needs more input to make progress.
161+
fn needs_input(&self) -> bool {
162+
!self.finished && self.buffer.is_empty()
163+
}
164+
165+
/// Fills the internal buffer by decoding the provided bytes, returning
166+
/// the number of bytes consumed and whether the input was exhausted.
167+
fn fill(&mut self, src: &[u8], last: bool) -> (usize, bool) {
168+
if self.finished {
169+
return (0, true);
170+
}
171+
172+
self.buffer.backshift();
173+
174+
let dst = self.buffer.write_buf();
175+
let (res, read, written, _) = self.charset_decoder.decode_to_utf8(src, dst, last);
176+
self.buffer.advance(written);
177+
178+
if last && res == CoderResult::InputEmpty {
179+
self.finished = true;
180+
}
181+
182+
(read, res == CoderResult::InputEmpty)
183+
}
184+
185+
/// Returns the unread decoded bytes in the internal buffer.
186+
fn read(&self) -> &[u8] {
187+
self.buffer.read_buf()
157188
}
158189

190+
/// Marks the given amount of bytes from the internal buffer as having been read.
191+
/// Subsequent calls to `read` only return bytes that have not been marked as read.
159192
fn consume(&mut self, amount: usize) {
160193
self.buffer.consume(amount);
161194
}
162195
}
163196

197+
impl Debug for CharsetDecoder {
198+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199+
f.debug_struct("CharsetDecoder")
200+
.field("charset_decoder", self.charset_decoder.encoding())
201+
.field("buffer", &self.buffer)
202+
.field("finished", &self.finished)
203+
.finish()
204+
}
205+
}
206+
164207
mod buffer {
165208
/// A fixed-sized buffer that maintains both
166209
/// a read position and a write position

datafusion/datasource-csv/src/source.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -415,8 +415,8 @@ impl FileOpener for CsvOpener {
415415
let mut reader = match charset {
416416
#[cfg(feature = "encoding_rs")]
417417
Some(enc) => {
418-
use crate::charset::CharsetDecoder;
419-
let decoder = CharsetDecoder::new(decoder, enc);
418+
use crate::charset::CharsetBatchDecoder;
419+
let decoder = CharsetBatchDecoder::new(decoder, enc);
420420
deserialize_reader(reader, decoder)
421421
}
422422
None => deserialize_reader(reader, decoder),
@@ -440,8 +440,8 @@ impl FileOpener for CsvOpener {
440440
let stream = match charset {
441441
#[cfg(feature = "encoding_rs")]
442442
Some(enc) => {
443-
use crate::charset::CharsetDecoder;
444-
let decoder = CharsetDecoder::new(decoder, enc);
443+
use crate::charset::CharsetBatchDecoder;
444+
let decoder = CharsetBatchDecoder::new(decoder, enc);
445445
deserialize_stream(stream, DecoderDeserializer::new(decoder))
446446
}
447447
None => {

0 commit comments

Comments
 (0)