1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion-examples/examples/csv_json_opener.rs
@@ -121,6 +121,7 @@ async fn json_opener() -> Result<()> {
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
false,
);

let scan_config = FileScanConfigBuilder::new(
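The new trailing `false` is the array-format flag this PR threads through the opener; passing `false` keeps the existing newline-delimited behavior. A minimal sketch of the updated call site, assuming the flag is the final parameter of `JsonOpener::new` and with `batch_size` and `projected` standing in for the arguments elided above the hunk:

```rust
// Sketch only: mirrors the call site in the diff above. The final
// boolean is the new array-format flag (false = NDJSON input,
// true = a single top-level JSON array). `batch_size` and `projected`
// are placeholders for arguments not shown in the hunk.
let opener = JsonOpener::new(
    batch_size,
    projected,
    FileCompressionType::UNCOMPRESSED,
    Arc::new(object_store),
    false, // format_array: keep newline-delimited parsing
);
```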
17 changes: 17 additions & 0 deletions datafusion/common/src/config.rs
@@ -2813,6 +2813,23 @@ config_namespace! {
pub struct JsonOptions {
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: Option<usize>, default = None
pub compression_level: Option<usize>, default = None
/// The format of JSON input files.
///
/// When `false` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `true`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
pub format_array: bool, default = false
}
}
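Downstream, the same flag is exposed on `NdJsonReadOptions` via `format_array`, which the tests below exercise. A minimal usage sketch based on that API; the table name `events` and the path `data/events.json` are hypothetical:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // format_array(true) switches the reader from newline-delimited
    // JSON to a single top-level array of objects.
    let options = NdJsonReadOptions::default().format_array(true);
    ctx.register_json("events", "data/events.json", options)
        .await?;
    ctx.sql("SELECT * FROM events").await?.show().await?;
    Ok(())
}
```

Leaving the option at its default (`false`) preserves the current NDJSON behavior, so existing pipelines are unaffected.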

313 changes: 313 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -47,6 +47,7 @@ mod tests {
use datafusion_common::stats::Precision;

use datafusion_common::Result;
use datafusion_datasource::file_compression_type::FileCompressionType;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
@@ -349,4 +350,316 @@ mod tests {
fn fmt_batches(batches: &[RecordBatch]) -> String {
pretty::pretty_format_batches(batches).unwrap().to_string()
}

#[tokio::test]
async fn test_write_empty_json_from_sql() -> Result<()> {
let ctx = SessionContext::new();
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// The file should exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}

#[tokio::test]
async fn test_write_empty_json_from_record_batch() -> Result<()> {
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
]));
let empty_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
Arc::new(arrow::array::StringArray::from(Vec::<Option<&str>>::new())),
],
)?;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
let df = ctx.read_batch(empty_batch.clone())?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// The file should exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}

#[tokio::test]
async fn test_json_array_format() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;

// Create a temporary file with JSON array format
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1, "b": 2.0, "c": true},
{"a": 2, "b": 3.5, "c": false},
{"a": 3, "b": 4.0, "c": true}
]"#,
)?;

// Test with format_array = true
let format = JsonFormat::default().with_format_array(true);
let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
.expect("Schema inference");

let fields = file_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);

Ok(())
}

#[tokio::test]
async fn test_json_array_format_empty() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, "[]")?;

let format = JsonFormat::default().with_format_array(true);
let result = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await;

assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("JSON array is empty"));

Ok(())
}

#[tokio::test]
async fn test_json_array_format_with_limit() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1},
{"a": 2, "b": "extra"}
]"#,
)?;

// Only infer the schema from the first record
let format = JsonFormat::default()
.with_format_array(true)
.with_schema_infer_max_rec(1);

let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
.expect("Schema inference");

// Should only have field "a" since we limited to 1 record
let fields = file_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(vec!["a: Int64"], fields);

Ok(())
}

#[tokio::test]
async fn test_json_array_format_read_data() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let task_ctx = ctx.task_ctx();
let store = Arc::new(LocalFileSystem::new()) as _;

// Create a temporary file with JSON array format
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1, "b": 2.0, "c": true},
{"a": 2, "b": 3.5, "c": false},
{"a": 3, "b": 4.0, "c": true}
]"#,
)?;

let format = JsonFormat::default().with_format_array(true);

// Infer schema
let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await?;

// Scan and read data
let exec = scan_format(
&ctx,
&format,
Some(file_schema),
tmp_dir.path().to_str().unwrap(),
"array.json",
None,
None,
)
.await?;
let batches = collect(exec, task_ctx).await?;

assert_eq!(1, batches.len());
assert_eq!(3, batches[0].num_columns());
assert_eq!(3, batches[0].num_rows());

// Verify data
let array_a = as_int64_array(batches[0].column(0))?;
assert_eq!(
vec![1, 2, 3],
(0..3).map(|i| array_a.value(i)).collect::<Vec<_>>()
);

Ok(())
}

#[tokio::test]
async fn test_json_array_format_with_projection() -> Result<()> {
let session = SessionContext::new();
let ctx = session.state();
let task_ctx = ctx.task_ctx();
let store = Arc::new(LocalFileSystem::new()) as _;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?;

let format = JsonFormat::default().with_format_array(true);
let file_schema = format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await?;

// Project only column "a"
let exec = scan_format(
&ctx,
&format,
Some(file_schema),
tmp_dir.path().to_str().unwrap(),
"array.json",
Some(vec![0]),
None,
)
.await?;
let batches = collect(exec, task_ctx).await?;

assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns()); // Only 1 column projected
assert_eq!(2, batches[0].num_rows());

Ok(())
}

#[tokio::test]
async fn test_ndjson_read_options_format_array() -> Result<()> {
let ctx = SessionContext::new();

// Create a temporary file with JSON array format
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
std::fs::write(
&path,
r#"[
{"a": 1, "b": "hello"},
{"a": 2, "b": "world"},
{"a": 3, "b": "test"}
]"#,
)?;

// Use NdJsonReadOptions with format_array = true
let options = NdJsonReadOptions::default().format_array(true);

ctx.register_json("json_array_table", &path, options)
.await?;

let result = ctx
.sql("SELECT a, b FROM json_array_table ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
| 3 | test |
+---+-------+
");

Ok(())
}

#[tokio::test]
async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;

let ctx = SessionContext::new();

// Create a temporary gzip-compressed JSON array file
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());

let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#;
let file = std::fs::File::create(&path)?;
let mut encoder = GzEncoder::new(file, Compression::default());
encoder.write_all(json_content.as_bytes())?;
encoder.finish()?;

// Use NdJsonReadOptions with format_array and GZIP compression
let options = NdJsonReadOptions::default()
.format_array(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension(".json.gz");

ctx.register_json("json_array_gzip", &path, options).await?;

let result = ctx
.sql("SELECT a, b FROM json_array_gzip ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+
");

Ok(())
}
}