
newline_delimited_stream incorrect processing Generic { store: "LineDelimiter", source: UnterminatedString } on valid CSVs (impacts datafusion) #650

@bboissin

Description


Describe the bug

We've had issues with DataFusion schema inference, which uses newline_delimited_stream to chunk the input stream.

It manifested as Generic { store: "LineDelimiter", source: UnterminatedString } errors on a valid CSV file (the CSV parser alone handles the file fine, but DataFusion fails because of the incorrect chunking).

To Reproduce

Minimal test cases that can be added to delimited.rs:

#[tokio::test]
async fn test_delimiter_quotes_stream() {
    let input = vec!["x,y,z\n,\"new\nline\",\"with ", "space\""];
    let input_stream =
        futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
    let stream = newline_delimited_stream(input_stream);

    let results: Vec<_> = stream.try_collect().await.unwrap();
    assert_eq!(
        results,
        vec![
            Bytes::from("x,y,z\n"),
            Bytes::from("\"new\nline\",\"with space\"")
        ]
    )
}

#[tokio::test]
async fn test_delimiter_escape_stream() {
    let input = vec!["hello\n\n\"\\ttabulated\"", "world"];
    let input_stream =
        futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
    let stream = newline_delimited_stream(input_stream);

    let results: Vec<_> = stream.try_collect().await.unwrap();
    assert_eq!(
        results,
        vec![
            Bytes::from("hello\n"),
            Bytes::from("\"\ttabulated\"\"world\n")
        ]
    )
}

Additional context

Both bugs have the same root cause: record_ends is a DoubleEndedIterator, so when we call next_back() the escaping/quote logic runs in reverse instead of forward, and the internal state kept by the filter_map closure gets corrupted.

One obvious fix is to materialize all the split points into a Vec, e.g.:

let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
    ...
}).collect::<Vec<_>>();
