
Commit 8583685

Array json support for datafusion (#27)
1 parent b3385dd commit 8583685

File tree

18 files changed, +635 −30 lines


Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered by default.

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 1 addition & 0 deletions
@@ -121,6 +121,7 @@ async fn json_opener() -> Result<()> {
         projected,
         FileCompressionType::UNCOMPRESSED,
         Arc::new(object_store),
+        false,
     );

     let scan_config = FileScanConfigBuilder::new(
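Note: the new trailing `false` argument presumably maps to the `format_array` flag this commit introduces in `JsonOptions` below, keeping the example on newline-delimited JSON; the opener's signature is not shown in this hunk, so treat that reading as an assumption.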

datafusion/common/src/config.rs

Lines changed: 17 additions & 0 deletions
@@ -2813,6 +2813,23 @@ config_namespace! {
     pub struct JsonOptions {
         pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
         pub schema_infer_max_rec: Option<usize>, default = None
+        pub compression_level: Option<usize>, default = None
+        /// The format of JSON input files.
+        ///
+        /// When `false` (default), expects newline-delimited JSON (NDJSON):
+        /// ```text
+        /// {"key1": 1, "key2": "val"}
+        /// {"key1": 2, "key2": "vals"}
+        /// ```
+        ///
+        /// When `true`, expects JSON array format:
+        /// ```text
+        /// [
+        ///     {"key1": 1, "key2": "val"},
+        ///     {"key1": 2, "key2": "vals"}
+        /// ]
+        /// ```
+        pub format_array: bool, default = false
     }
 }
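For orientation, here is a minimal sketch of consuming this option from the read path, assuming a build that includes this commit; it uses only the `NdJsonReadOptions::format_array` builder and `register_json` call exercised by the tests below, and the table name and file path are placeholders:

use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Switch the JSON reader from NDJSON (the default) to JSON array input.
    let options = NdJsonReadOptions::default().format_array(true);
    // Register a file containing e.g. [{"a": 1}, {"a": 2}] as a table.
    ctx.register_json("t", "/path/to/array.json", options).await?;
    ctx.sql("SELECT a FROM t").await?.show().await?;
    Ok(())
}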

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 313 additions & 0 deletions
@@ -47,6 +47,7 @@ mod tests {
     use datafusion_common::stats::Precision;

     use datafusion_common::Result;
+    use datafusion_datasource::file_compression_type::FileCompressionType;
     use futures::StreamExt;
     use insta::assert_snapshot;
     use object_store::local::LocalFileSystem;
@@ -349,4 +350,316 @@ mod tests {
     fn fmt_batches(batches: &[RecordBatch]) -> String {
         pretty::pretty_format_batches(batches).unwrap().to_string()
     }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
+                Arc::new(arrow::array::StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.read_batch(empty_batch.clone())?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1, "b": 2.0, "c": true},
+                {"a": 2, "b": 3.5, "c": false},
+                {"a": 3, "b": 4.0, "c": true}
+            ]"#,
+        )?;
+
+        // Test with format_array = true
+        let format = JsonFormat::default().with_format_array(true);
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+            .expect("Schema inference");
+
+        let fields = file_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect::<Vec<_>>();
+        assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);

+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_empty() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, "[]")?;
+
+        let format = JsonFormat::default().with_format_array(true);
+        let result = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await;
+
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("JSON array is empty"));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_with_limit() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1},
+                {"a": 2, "b": "extra"}
+            ]"#,
+        )?;
+
+        // Only infer from first record
+        let format = JsonFormat::default()
+            .with_format_array(true)
+            .with_schema_infer_max_rec(1);
+
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+            .expect("Schema inference");
+
+        // Should only have field "a" since we limited to 1 record
+        let fields = file_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect::<Vec<_>>();
+        assert_eq!(vec!["a: Int64"], fields);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_read_data() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let task_ctx = ctx.task_ctx();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1, "b": 2.0, "c": true},
+                {"a": 2, "b": 3.5, "c": false},
+                {"a": 3, "b": 4.0, "c": true}
+            ]"#,
+        )?;
+
+        let format = JsonFormat::default().with_format_array(true);
+
+        // Infer schema
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await?;
+
+        // Scan and read data
+        let exec = scan_format(
+            &ctx,
+            &format,
+            Some(file_schema),
+            tmp_dir.path().to_str().unwrap(),
+            "array.json",
+            None,
+            None,
+        )
+        .await?;
+        let batches = collect(exec, task_ctx).await?;
+
+        assert_eq!(1, batches.len());
+        assert_eq!(3, batches[0].num_columns());
+        assert_eq!(3, batches[0].num_rows());
+
+        // Verify data
+        let array_a = as_int64_array(batches[0].column(0))?;
+        assert_eq!(
+            vec![1, 2, 3],
+            (0..3).map(|i| array_a.value(i)).collect::<Vec<_>>()
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_with_projection() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let task_ctx = ctx.task_ctx();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?;
+
+        let format = JsonFormat::default().with_format_array(true);
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await?;
+
+        // Project only column "a"
+        let exec = scan_format(
+            &ctx,
+            &format,
+            Some(file_schema),
+            tmp_dir.path().to_str().unwrap(),
+            "array.json",
+            Some(vec![0]),
+            None,
+        )
+        .await?;
+        let batches = collect(exec, task_ctx).await?;
+
+        assert_eq!(1, batches.len());
+        assert_eq!(1, batches[0].num_columns()); // Only 1 column projected
+        assert_eq!(2, batches[0].num_rows());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_read_options_format_array() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1, "b": "hello"},
+                {"a": 2, "b": "world"},
+                {"a": 3, "b": "test"}
+            ]"#,
+        )?;
+
+        // Use NdJsonReadOptions with format_array = true
+        let options = NdJsonReadOptions::default().format_array(true);
+
+        ctx.register_json("json_array_table", &path, options)
+            .await?;
+
+        let result = ctx
+            .sql("SELECT a, b FROM json_array_table ORDER BY a")
+            .await?
+            .collect()
+            .await?;
+
+        assert_snapshot!(batches_to_string(&result), @r"
+        +---+-------+
+        | a | b     |
+        +---+-------+
+        | 1 | hello |
+        | 2 | world |
+        | 3 | test  |
+        +---+-------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> {
+        use flate2::write::GzEncoder;
+        use flate2::Compression;
+        use std::io::Write;
+
+        let ctx = SessionContext::new();
+
+        // Create a temporary gzip compressed JSON array file
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());
+
+        let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#;
+        let file = std::fs::File::create(&path)?;
+        let mut encoder = GzEncoder::new(file, Compression::default());
+        encoder.write_all(json_content.as_bytes())?;
+        encoder.finish()?;
+
+        // Use NdJsonReadOptions with format_array and GZIP compression
+        let options = NdJsonReadOptions::default()
+            .format_array(true)
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension(".json.gz");
+
+        ctx.register_json("json_array_gzip", &path, options).await?;
+
+        let result = ctx
+            .sql("SELECT a, b FROM json_array_gzip ORDER BY a")
+            .await?
+            .collect()
+            .await?;
+
+        assert_snapshot!(batches_to_string(&result), @r"
+        +---+-------+
+        | a | b     |
+        +---+-------+
+        | 1 | hello |
+        | 2 | world |
+        +---+-------+
+        ");
+
+        Ok(())
+    }
 }
