diff --git a/Cargo.lock b/Cargo.lock
index d712eecfcc72e..2cfdd82851d67 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2156,6 +2156,7 @@ dependencies = [
  "datafusion-session",
  "futures",
  "object_store",
+ "serde_json",
  "tokio",
 ]
diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs
index ef2a3eaca0c88..e7b809c82ed3d 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -121,6 +121,7 @@ async fn json_opener() -> Result<()> {
         projected,
         FileCompressionType::UNCOMPRESSED,
         Arc::new(object_store),
+        false,
     );
 
     let scan_config = FileScanConfigBuilder::new(
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index a77fd764eea06..68022c2f06fe9 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -2813,6 +2813,23 @@ config_namespace! {
     pub struct JsonOptions {
         pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
         pub schema_infer_max_rec: Option<usize>, default = None
+        pub compression_level: Option<usize>, default = None
+        /// The format of JSON input files.
+        ///
+        /// When `false` (default), expects newline-delimited JSON (NDJSON):
+        /// ```text
+        /// {"key1": 1, "key2": "val"}
+        /// {"key1": 2, "key2": "vals"}
+        /// ```
+        ///
+        /// When `true`, expects JSON array format:
+        /// ```text
+        /// [
+        ///   {"key1": 1, "key2": "val"},
+        ///   {"key1": 2, "key2": "vals"}
+        /// ]
+        /// ```
+        pub format_array: bool, default = false
     }
 }
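Note: a minimal sketch of how the new option above can be set programmatically, assuming the plain struct generated by `config_namespace!` and the existing `JsonFormat::with_options` builder that appears later in this diff (function name is illustrative, not part of the patch):

```rust
use datafusion::common::config::JsonOptions;
use datafusion::datasource::file_format::json::JsonFormat;

// Sketch: build JsonOptions with the new flag and hand it to JsonFormat.
// This is the same path `format.format_array` takes when set via SQL OPTIONS.
fn array_format() -> JsonFormat {
    let options = JsonOptions {
        format_array: true,
        ..Default::default()
    };
    JsonFormat::default().with_options(options)
}
```
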
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 34d3d64f07fb2..cce8ead46cd78 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -47,6 +47,7 @@ mod tests {
     use datafusion_common::stats::Precision;
     use datafusion_common::Result;
+    use datafusion_datasource::file_compression_type::FileCompressionType;
     use futures::StreamExt;
     use insta::assert_snapshot;
     use object_store::local::LocalFileSystem;
@@ -349,4 +350,316 @@ mod tests {
     fn fmt_batches(batches: &[RecordBatch]) -> String {
         pretty::pretty_format_batches(batches).unwrap().to_string()
     }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
+                Arc::new(arrow::array::StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.read_batch(empty_batch.clone())?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1, "b": 2.0, "c": true},
+                {"a": 2, "b": 3.5, "c": false},
+                {"a": 3, "b": 4.0, "c": true}
+            ]"#,
+        )?;
+
+        // Test with format_array = true
+        let format = JsonFormat::default().with_format_array(true);
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+            .expect("Schema inference");
+
+        let fields = file_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect::<Vec<_>>();
+        assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_empty() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, "[]")?;
+
+        let format = JsonFormat::default().with_format_array(true);
+        let result = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await;
+
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("JSON array is empty"));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_with_limit() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1},
+                {"a": 2, "b": "extra"}
+            ]"#,
+        )?;
+
+        // Only infer from first record
+        let format = JsonFormat::default()
+            .with_format_array(true)
+            .with_schema_infer_max_rec(1);
+
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+            .expect("Schema inference");
+
+        // Should only have field "a" since we limited to 1 record
+        let fields = file_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect::<Vec<_>>();
+        assert_eq!(vec!["a: Int64"], fields);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_read_data() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let task_ctx = ctx.task_ctx();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1, "b": 2.0, "c": true},
+                {"a": 2, "b": 3.5, "c": false},
+                {"a": 3, "b": 4.0, "c": true}
+            ]"#,
+        )?;
+
+        let format = JsonFormat::default().with_format_array(true);
+
+        // Infer schema
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await?;
+
+        // Scan and read data
+        let exec = scan_format(
+            &ctx,
+            &format,
+            Some(file_schema),
+            tmp_dir.path().to_str().unwrap(),
+            "array.json",
+            None,
+            None,
+        )
+        .await?;
+        let batches = collect(exec, task_ctx).await?;
+
+        assert_eq!(1, batches.len());
+        assert_eq!(3, batches[0].num_columns());
+        assert_eq!(3, batches[0].num_rows());
+
+        // Verify data
+        let array_a = as_int64_array(batches[0].column(0))?;
+        assert_eq!(
+            vec![1, 2, 3],
+            (0..3).map(|i| array_a.value(i)).collect::<Vec<_>>()
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_with_projection() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let task_ctx = ctx.task_ctx();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?;
+
+        let format = JsonFormat::default().with_format_array(true);
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await?;
+
+        // Project only column "a"
+        let exec = scan_format(
+            &ctx,
+            &format,
+            Some(file_schema),
+            tmp_dir.path().to_str().unwrap(),
+            "array.json",
+            Some(vec![0]),
+            None,
+        )
+        .await?;
+        let batches = collect(exec, task_ctx).await?;
+
+        assert_eq!(1, batches.len());
+        assert_eq!(1, batches[0].num_columns()); // Only 1 column projected
+        assert_eq!(2, batches[0].num_rows());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_read_options_format_array() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1, "b": "hello"},
+                {"a": 2, "b": "world"},
+                {"a": 3, "b": "test"}
+            ]"#,
+        )?;
+
+        // Use NdJsonReadOptions with format_array = true
+        let options = NdJsonReadOptions::default().format_array(true);
+
+        ctx.register_json("json_array_table", &path, options)
+            .await?;
+
+        let result = ctx
+            .sql("SELECT a, b FROM json_array_table ORDER BY a")
+            .await?
+            .collect()
+            .await?;
+
+        assert_snapshot!(batches_to_string(&result), @r"
+            +---+-------+
+            | a | b     |
+            +---+-------+
+            | 1 | hello |
+            | 2 | world |
+            | 3 | test  |
+            +---+-------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> {
+        use flate2::write::GzEncoder;
+        use flate2::Compression;
+        use std::io::Write;
+
+        let ctx = SessionContext::new();
+
+        // Create a temporary gzip compressed JSON array file
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());
+
+        let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#;
+        let file = std::fs::File::create(&path)?;
+        let mut encoder = GzEncoder::new(file, Compression::default());
+        encoder.write_all(json_content.as_bytes())?;
+        encoder.finish()?;
+
+        // Use NdJsonReadOptions with format_array and GZIP compression
+        let options = NdJsonReadOptions::default()
+            .format_array(true)
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension(".json.gz");
+
+        ctx.register_json("json_array_gzip", &path, options).await?;
+
+        let result = ctx
+            .sql("SELECT a, b FROM json_array_gzip ORDER BY a")
+            .await?
+            .collect()
+            .await?;
+
+        assert_snapshot!(batches_to_string(&result), @r"
+            +---+-------+
+            | a | b     |
+            +---+-------+
+            | 1 | hello |
+            | 2 | world |
+            +---+-------+
+        ");
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index e78c5f09553cc..fb707ed3d6c57 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -465,6 +465,9 @@ pub struct NdJsonReadOptions<'a> {
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<SortExpr>>,
+    /// Whether the JSON file is in array format `[{...}, {...}]` instead of
+    /// line-delimited format. Defaults to `false`.
+    pub format_array: bool,
 }
 
 impl Default for NdJsonReadOptions<'_> {
@@ -477,6 +480,7 @@ impl Default for NdJsonReadOptions<'_> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
+            format_array: false,
         }
     }
 }
@@ -523,6 +527,19 @@ impl<'a> NdJsonReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
+
+    /// Specify how many rows to read for schema inference
+    pub fn schema_infer_max_records(mut self, schema_infer_max_records: usize) -> Self {
+        self.schema_infer_max_records = schema_infer_max_records;
+        self
+    }
+
+    /// Specify whether the JSON file is in array format `[{...}, {...}]`
+    /// instead of line-delimited format.
+    pub fn format_array(mut self, format_array: bool) -> Self {
+        self.format_array = format_array;
+        self
+    }
 }
 
 #[async_trait]
@@ -657,7 +674,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
         let file_format = JsonFormat::default()
             .with_options(table_options.json)
             .with_schema_infer_max_rec(self.schema_infer_max_records)
-            .with_file_compression_type(self.file_compression_type.to_owned());
+            .with_file_compression_type(self.file_compression_type.to_owned())
+            .with_format_array(self.format_array);
 
         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
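Note: a minimal end-to-end sketch of the `NdJsonReadOptions::format_array` builder added above, for readers who want to see the reader-side API in one place (the path is illustrative, not part of this patch):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Read a `[{...}, {...}]` style file instead of newline-delimited JSON.
    let options = NdJsonReadOptions::default().format_array(true);
    let df = ctx.read_json("data/events_array.json", options).await?;

    df.filter(col("a").gt(lit(1)))?.show().await?;
    Ok(())
}
```
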
diff --git a/datafusion/core/tests/data/json_array.json b/datafusion/core/tests/data/json_array.json
new file mode 100644
index 0000000000000..1a8716dbf4beb
--- /dev/null
+++ b/datafusion/core/tests/data/json_array.json
@@ -0,0 +1,5 @@
+[
+  {"a": 1, "b": "hello"},
+  {"a": 2, "b": "world"},
+  {"a": 3, "b": "test"}
+]
diff --git a/datafusion/core/tests/data/json_empty_array.json b/datafusion/core/tests/data/json_empty_array.json
new file mode 100644
index 0000000000000..fe51488c7066f
--- /dev/null
+++ b/datafusion/core/tests/data/json_empty_array.json
@@ -0,0 +1 @@
+[]
diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml
index 37fa8d43a0816..168ae8880eee7 100644
--- a/datafusion/datasource-json/Cargo.toml
+++ b/datafusion/datasource-json/Cargo.toml
@@ -44,6 +44,7 @@ datafusion-physical-plan = { workspace = true }
 datafusion-session = { workspace = true }
 futures = { workspace = true }
 object_store = { workspace = true }
+serde_json = { workspace = true }
 tokio = { workspace = true }
 
 # Note: add additional linter rules in lib.rs.
diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs
index 51f4bd7e963e0..589ad6bea9c8b 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
+//! [`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt;
 use std::fmt::Debug;
-use std::io::BufReader;
+use std::io::{BufReader, Read};
 use std::sync::Arc;
 
 use crate::source::JsonSource;
@@ -47,6 +47,8 @@ use datafusion_datasource::file_format::{
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
+
+use datafusion_datasource::source::DataSourceExec;
 use datafusion_datasource::write::demux::DemuxedStreamReceiver;
 use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
 use datafusion_datasource::write::BatchSerializer;
@@ -58,7 +60,6 @@ use datafusion_session::Session;
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
-use datafusion_datasource::source::DataSourceExec;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
 #[derive(Default)]
@@ -131,7 +132,23 @@ impl Debug for JsonFormatFactory {
     }
 }
 
-/// New line delimited JSON `FileFormat` implementation.
+/// JSON `FileFormat` implementation supporting both line-delimited and array formats.
+///
+/// # Supported Formats
+///
+/// ## Line-Delimited JSON (default)
+/// ```text
+/// {"key1": 1, "key2": "val"}
+/// {"key1": 2, "key2": "vals"}
+/// ```
+///
+/// ## JSON Array Format (when `format_array` option is true)
+/// ```text
+/// [
+///   {"key1": 1, "key2": "val"},
+///   {"key1": 2, "key2": "vals"}
+/// ]
+/// ```
 #[derive(Debug, Default)]
 pub struct JsonFormat {
     options: JsonOptions,
@@ -165,6 +182,49 @@ impl JsonFormat {
         self.options.compression = file_compression_type.into();
         self
     }
+
+    /// Set whether to expect JSON array format instead of line-delimited format.
+    ///
+    /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]`
+    /// When `false` (default), expects input like:
+    /// ```text
+    /// {"a": 1}
+    /// {"a": 2}
+    /// ```
+    pub fn with_format_array(mut self, format_array: bool) -> Self {
+        self.options.format_array = format_array;
+        self
+    }
+}
+
+/// Infer schema from a JSON array format file.
+///
+/// This function reads JSON data in array format `[{...}, {...}]` and infers
+/// the Arrow schema from the contained objects.
+fn infer_json_schema_from_json_array<R: Read>(
+    reader: &mut R,
+    max_records: usize,
+) -> std::result::Result<Schema, ArrowError> {
+    let mut content = String::new();
+    reader.read_to_string(&mut content).map_err(|e| {
+        ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
+    })?;
+
+    // Parse as JSON array using serde_json
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
+        .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON array: {e}")))?;
+
+    // Take only max_records for schema inference
+    let values_to_infer: Vec<_> = values.into_iter().take(max_records).collect();
+
+    if values_to_infer.is_empty() {
+        return Err(ArrowError::JsonError(
+            "JSON array is empty, cannot infer schema".to_string(),
+        ));
+    }
+
+    // Use arrow's schema inference on the parsed values
+    infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok))
 }
 
 #[async_trait]
@@ -201,6 +261,8 @@ impl FileFormat for JsonFormat {
             .schema_infer_max_rec
             .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
         let file_compression_type = FileCompressionType::from(self.options.compression);
+        let is_array_format = self.options.format_array;
+
         for object in objects {
             let mut take_while = || {
                 let should_take = records_to_read > 0;
@@ -216,15 +278,29 @@
                 GetResultPayload::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+
+                    if is_array_format {
+                        infer_json_schema_from_json_array(&mut reader, records_to_read)?
+                    } else {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    }
                 }
                 GetResultPayload::Stream(_) => {
                     let data = r.bytes().await?;
                     let decoder = file_compression_type.convert_read(data.reader())?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+
+                    if is_array_format {
+                        infer_json_schema_from_json_array(&mut reader, records_to_read)?
+                    } else {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    }
                 }
             };
 
@@ -253,7 +329,8 @@ impl FileFormat for JsonFormat {
         _state: &dyn Session,
         conf: FileScanConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let source = Arc::new(JsonSource::new());
+        let source =
+            Arc::new(JsonSource::new().with_format_array(self.options.format_array));
         let conf = FileScanConfigBuilder::from(conf)
             .with_file_compression_type(FileCompressionType::from(
                 self.options.compression,
@@ -282,7 +359,7 @@ impl FileFormat for JsonFormat {
     }
 
     fn file_source(&self) -> Arc<dyn FileSource> {
-        Arc::new(JsonSource::default())
+        Arc::new(JsonSource::new().with_format_array(self.options.format_array))
     }
 }
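Note: a condensed, standalone sketch of the inference approach used by `infer_json_schema_from_json_array` above, for experimentation outside this crate (assumes `arrow` and `serde_json` as direct dependencies; unlike the real helper, it does not special-case an empty array):

```rust
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::json::reader::infer_json_schema_from_iterator;

// Parse the whole document as a JSON array, then reuse arrow-json's
// iterator-based schema inference on at most `max_records` objects.
fn infer_schema_from_array_text(json: &str, max_records: usize) -> Result<Schema, ArrowError> {
    let values: Vec<serde_json::Value> = serde_json::from_str(json)
        .map_err(|e| ArrowError::JsonError(format!("not a JSON array: {e}")))?;
    infer_json_schema_from_iterator(values.into_iter().take(max_records).map(Ok))
}
```
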
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 52ed0def03f18..d1107b0b97751 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -36,6 +36,7 @@ use datafusion_datasource::{
 };
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
+use arrow::array::RecordBatch;
 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
 use datafusion_common::Statistics;
@@ -55,6 +56,7 @@ pub struct JsonOpener {
     projected_schema: SchemaRef,
     file_compression_type: FileCompressionType,
     object_store: Arc<dyn ObjectStore>,
+    format_array: bool,
 }
 
 impl JsonOpener {
@@ -64,12 +66,14 @@ impl JsonOpener {
         projected_schema: SchemaRef,
         file_compression_type: FileCompressionType,
         object_store: Arc<dyn ObjectStore>,
+        format_array: bool,
     ) -> Self {
         Self {
             batch_size,
             projected_schema,
             file_compression_type,
             object_store,
+            format_array,
         }
     }
 }
@@ -81,6 +85,7 @@ pub struct JsonSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    format_array: bool,
 }
 
 impl JsonSource {
@@ -88,6 +93,12 @@ impl JsonSource {
     pub fn new() -> Self {
         Self::default()
     }
+
+    /// Set whether to expect JSON array format
+    pub fn with_format_array(mut self, format_array: bool) -> Self {
+        self.format_array = format_array;
+        self
+    }
 }
 
 impl From<JsonSource> for Arc<dyn FileSource> {
@@ -110,6 +121,7 @@ impl FileSource for JsonSource {
             projected_schema: base_config.projected_file_schema(),
             file_compression_type: base_config.file_compression_type,
             object_store,
+            format_array: self.format_array,
         })
     }
 
@@ -181,6 +193,16 @@ impl FileOpener for JsonOpener {
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
         let file_compression_type = self.file_compression_type.to_owned();
+        let format_array = self.format_array;
+
+        // JSON array format requires reading the complete file
+        if format_array && partitioned_file.range.is_some() {
+            return Err(DataFusionError::NotImplemented(
+                "JSON array format does not support range-based file scanning. \
+                 Disable repartition_file_scans or use line-delimited JSON format."
+                    .to_string(),
+            ));
+        }
 
         Ok(Box::pin(async move {
             let calculated_range =
@@ -217,33 +239,94 @@ impl FileOpener for JsonOpener {
                        }
                    };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if format_array {
+                        // Handle JSON array format
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(bytes),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    } else {
+                        let reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(BufReader::new(bytes))?;
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if format_array {
+                        // For streaming, we need to collect all bytes first
+                        let bytes = s
+                            .map_err(DataFusionError::from)
+                            .try_fold(Vec::new(), |mut acc, chunk| async move {
+                                acc.extend_from_slice(&chunk);
+                                Ok(acc)
+                            })
+                            .await?;
+                        let decompressed = file_compression_type
+                            .convert_read(std::io::Cursor::new(bytes))?;
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(decompressed),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    } else {
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    }
                 }
             }
         }))
     }
 }
 
+/// Read JSON array format and convert to RecordBatches
+fn read_json_array_to_batches<R: Read>(
+    mut reader: R,
+    schema: SchemaRef,
+    batch_size: usize,
+) -> Result<Vec<RecordBatch>> {
+    use arrow::json::ReaderBuilder;
+
+    let mut content = String::new();
+    reader.read_to_string(&mut content)?;
+
+    // Parse JSON array
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    if values.is_empty() {
+        return Ok(vec![RecordBatch::new_empty(schema)]);
+    }
+
+    // Convert to NDJSON string for arrow-json reader
+    let ndjson: String = values
+        .iter()
+        .map(|v| v.to_string())
+        .collect::<Vec<_>>()
+        .join("\n");
+
+    let cursor = std::io::Cursor::new(ndjson);
+    let reader = ReaderBuilder::new(schema)
+        .with_batch_size(batch_size)
+        .build(cursor)?;
+
+    reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
+}
+
 pub async fn plan_to_json(
     task_ctx: Arc<TaskContext>,
     plan: Arc<dyn ExecutionPlan>,
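Note: because of the `NotImplemented` guard in `JsonOpener::open` above, array-format files must be read as whole files. A hedged sketch of the workaround the error message points to, using the existing `repartition_file_scans` knob (table name and path are illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Keep each file in a single scan range so array-format files are not split.
    let config = SessionConfig::new().with_repartition_file_scans(false);
    let ctx = SessionContext::new_with_config(config);

    let options = NdJsonReadOptions::default().format_array(true);
    ctx.register_json("events", "data/events_array.json", options)
        .await?;

    ctx.sql("SELECT count(*) FROM events").await?.show().await?;
    Ok(())
}
```
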
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 267953556b166..c598b8dcb644a 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -467,6 +467,8 @@ message CsvOptions {
 message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
   optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
+  optional uint32 compression_level = 3; // Optional compression level
+  bool format_array = 4; // Whether the JSON is in array format [{},...] (default false = line-delimited)
 }
 
 message TableParquetOptions {
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index 4ede5b970eaeb..71f8cc3e78dc5 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -1092,6 +1092,8 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
         Ok(JsonOptions {
             compression: compression.into(),
             schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
+            compression_level: proto_opts.compression_level.map(|h| h as usize),
+            format_array: proto_opts.format_array,
         })
     }
 }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index e63f345459b8f..cc646e48b02d8 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4548,6 +4548,12 @@ impl serde::Serialize for JsonOptions {
         if self.schema_infer_max_rec.is_some() {
             len += 1;
         }
+        if self.compression_level.is_some() {
+            len += 1;
+        }
+        if self.format_array {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?;
         if self.compression != 0 {
             let v = CompressionTypeVariant::try_from(self.compression)
@@ -4559,6 +4565,12 @@ impl serde::Serialize for JsonOptions {
             #[allow(clippy::needless_borrows_for_generic_args)]
             struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?;
         }
+        if let Some(v) = self.compression_level.as_ref() {
+            struct_ser.serialize_field("compressionLevel", v)?;
+        }
+        if self.format_array {
+            struct_ser.serialize_field("formatArray", &self.format_array)?;
+        }
         struct_ser.end()
     }
 }
@@ -4572,12 +4584,18 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
             "compression",
             "schema_infer_max_rec",
             "schemaInferMaxRec",
+            "compression_level",
+            "compressionLevel",
+            "format_array",
+            "formatArray",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Compression,
             SchemaInferMaxRec,
+            CompressionLevel,
+            FormatArray,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -4601,6 +4619,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                         match value {
                             "compression" => Ok(GeneratedField::Compression),
                             "schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec),
+                            "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel),
+                            "formatArray" | "format_array" => Ok(GeneratedField::FormatArray),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -4622,6 +4642,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
             {
                 let mut compression__ = None;
                 let mut schema_infer_max_rec__ = None;
+                let mut compression_level__ = None;
+                let mut format_array__ = None;
                 while let Some(k) = map_.next_key()? {
                    match k {
                        GeneratedField::Compression => {
@@ -4638,11 +4660,27 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                                map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
                            ;
                        }
+                        GeneratedField::CompressionLevel => {
+                            if compression_level__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("compressionLevel"));
+                            }
+                            compression_level__ =
+                                map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
+                            ;
+                        }
+                        GeneratedField::FormatArray => {
+                            if format_array__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("formatArray"));
+                            }
+                            format_array__ = Some(map_.next_value()?);
+                        }
                    }
                }
                Ok(JsonOptions {
                    compression: compression__.unwrap_or_default(),
                    schema_infer_max_rec: schema_infer_max_rec__,
+                    compression_level: compression_level__,
+                    format_array: format_array__.unwrap_or_default(),
                })
            }
        }
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index aa7c3d51a9d6d..24d6600e4d21f 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -659,6 +659,12 @@ pub struct JsonOptions {
     /// Optional max records for schema inference
     #[prost(uint64, optional, tag = "2")]
     pub schema_infer_max_rec: ::core::option::Option<u64>,
+    /// Optional compression level
+    #[prost(uint32, optional, tag = "3")]
+    pub compression_level: ::core::option::Option<u32>,
+    /// Whether the JSON is in array format \[{},...\] (default false = line-delimited)
+    #[prost(bool, tag = "4")]
+    pub format_array: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index e9de1d9e9a9ef..69f3c269c78a4 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -986,6 +986,8 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
         Ok(protobuf::JsonOptions {
             compression: compression.into(),
             schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
+            compression_level: opts.compression_level.map(|h| h as u32),
+            format_array: opts.format_array,
         })
     }
 }
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index aa7c3d51a9d6d..24d6600e4d21f 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -659,6 +659,12 @@ pub struct JsonOptions {
     /// Optional max records for schema inference
     #[prost(uint64, optional, tag = "2")]
     pub schema_infer_max_rec: ::core::option::Option<u64>,
+    /// Optional compression level
+    #[prost(uint32, optional, tag = "3")]
+    pub compression_level: ::core::option::Option<u32>,
+    /// Whether the JSON is in array format \[{},...\] (default false = line-delimited)
+    #[prost(bool, tag = "4")]
+    pub format_array: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index d32bfb22ffddd..3685e615420fe 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -238,6 +238,8 @@ impl JsonOptionsProto {
             JsonOptionsProto {
                 compression: options.compression as i32,
                 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
+                compression_level: options.compression_level.map(|v| v as u32),
+                format_array: options.format_array,
             }
         } else {
             JsonOptionsProto::default()
@@ -256,6 +258,8 @@ impl From<&JsonOptionsProto> for JsonOptions {
                 _ => CompressionTypeVariant::UNCOMPRESSED,
             },
             schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
+            compression_level: proto.compression_level.map(|v| v as usize),
+            format_array: proto.format_array,
         }
     }
 }
diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt
index b46b8c49d6623..4442a6a2d5af2 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -146,3 +146,31 @@ EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2
 ----
 logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)]
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id], file_type=json
+
+##########
+## JSON Array Format Tests
+##########
+
+# Test reading JSON array format file with format_array=true
+statement ok
+CREATE EXTERNAL TABLE json_array_test
+STORED AS JSON
+OPTIONS ('format.format_array' 'true')
+LOCATION '../core/tests/data/json_array.json';
+
+query IT rowsort
+SELECT a, b FROM json_array_test
+----
+1 hello
+2 world
+3 test
+
+statement ok
+DROP TABLE json_array_test;
+
+# Test that reading JSON array format WITHOUT format_array option fails
+# (default is line-delimited mode which can't parse array format correctly)
+statement error Not valid JSON
+CREATE EXTERNAL TABLE json_array_as_ndjson
+STORED AS JSON
+LOCATION '../core/tests/data/json_array.json';
\ No newline at end of file
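
Note: `json_empty_array.json` is added as fixture data above but is not referenced by `json.slt`; a possible follow-up check (sketch only, not part of this patch) would assert the empty-array inference error in the same sqllogictest style:

```
# Sketch: schema inference should fail on an empty JSON array
statement error JSON array is empty
CREATE EXTERNAL TABLE json_empty_array_test
STORED AS JSON
OPTIONS ('format.format_array' 'true')
LOCATION '../core/tests/data/json_empty_array.json';
```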