@@ -47,7 +47,6 @@ mod tests {
4747 use datafusion_common:: stats:: Precision ;
4848
4949 use datafusion_common:: Result ;
50- use datafusion_datasource:: file_compression_type:: FileCompressionType ;
5150 use futures:: StreamExt ;
5251 use insta:: assert_snapshot;
5352 use object_store:: local:: LocalFileSystem ;
@@ -350,316 +349,4 @@ mod tests {
350349 fn fmt_batches ( batches : & [ RecordBatch ] ) -> String {
351350 pretty:: pretty_format_batches ( batches) . unwrap ( ) . to_string ( )
352351 }
353-
354- #[ tokio:: test]
355- async fn test_write_empty_json_from_sql ( ) -> Result < ( ) > {
356- let ctx = SessionContext :: new ( ) ;
357- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
358- let path = format ! ( "{}/empty_sql.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
359- let df = ctx. sql ( "SELECT CAST(1 AS BIGINT) AS id LIMIT 0" ) . await ?;
360- df. write_json ( & path, crate :: dataframe:: DataFrameWriteOptions :: new ( ) , None )
361- . await ?;
362- // Expected the file to exist and be empty
363- assert ! ( std:: path:: Path :: new( & path) . exists( ) ) ;
364- let metadata = std:: fs:: metadata ( & path) ?;
365- assert_eq ! ( metadata. len( ) , 0 ) ;
366- Ok ( ( ) )
367- }
368-
369- #[ tokio:: test]
370- async fn test_write_empty_json_from_record_batch ( ) -> Result < ( ) > {
371- let ctx = SessionContext :: new ( ) ;
372- let schema = Arc :: new ( Schema :: new ( vec ! [
373- Field :: new( "id" , DataType :: Int64 , false ) ,
374- Field :: new( "name" , DataType :: Utf8 , true ) ,
375- ] ) ) ;
376- let empty_batch = RecordBatch :: try_new (
377- schema. clone ( ) ,
378- vec ! [
379- Arc :: new( arrow:: array:: Int64Array :: from( Vec :: <i64 >:: new( ) ) ) ,
380- Arc :: new( arrow:: array:: StringArray :: from( Vec :: <Option <& str >>:: new( ) ) ) ,
381- ] ,
382- ) ?;
383-
384- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
385- let path = format ! ( "{}/empty_batch.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
386- let df = ctx. read_batch ( empty_batch. clone ( ) ) ?;
387- df. write_json ( & path, crate :: dataframe:: DataFrameWriteOptions :: new ( ) , None )
388- . await ?;
389- // Expected the file to exist and be empty
390- assert ! ( std:: path:: Path :: new( & path) . exists( ) ) ;
391- let metadata = std:: fs:: metadata ( & path) ?;
392- assert_eq ! ( metadata. len( ) , 0 ) ;
393- Ok ( ( ) )
394- }
395-
396- #[ tokio:: test]
397- async fn test_json_array_format ( ) -> Result < ( ) > {
398- let session = SessionContext :: new ( ) ;
399- let ctx = session. state ( ) ;
400- let store = Arc :: new ( LocalFileSystem :: new ( ) ) as _ ;
401-
402- // Create a temporary file with JSON array format
403- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
404- let path = format ! ( "{}/array.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
405- std:: fs:: write (
406- & path,
407- r#"[
408- {"a": 1, "b": 2.0, "c": true},
409- {"a": 2, "b": 3.5, "c": false},
410- {"a": 3, "b": 4.0, "c": true}
411- ]"# ,
412- ) ?;
413-
414- // Test with format_array = true
415- let format = JsonFormat :: default ( ) . with_format_array ( true ) ;
416- let file_schema = format
417- . infer_schema ( & ctx, & store, & [ local_unpartitioned_file ( & path) ] )
418- . await
419- . expect ( "Schema inference" ) ;
420-
421- let fields = file_schema
422- . fields ( )
423- . iter ( )
424- . map ( |f| format ! ( "{}: {:?}" , f. name( ) , f. data_type( ) ) )
425- . collect :: < Vec < _ > > ( ) ;
426- assert_eq ! ( vec![ "a: Int64" , "b: Float64" , "c: Boolean" ] , fields) ;
427-
428- Ok ( ( ) )
429- }
430-
431- #[ tokio:: test]
432- async fn test_json_array_format_empty ( ) -> Result < ( ) > {
433- let session = SessionContext :: new ( ) ;
434- let ctx = session. state ( ) ;
435- let store = Arc :: new ( LocalFileSystem :: new ( ) ) as _ ;
436-
437- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
438- let path = format ! ( "{}/empty_array.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
439- std:: fs:: write ( & path, "[]" ) ?;
440-
441- let format = JsonFormat :: default ( ) . with_format_array ( true ) ;
442- let result = format
443- . infer_schema ( & ctx, & store, & [ local_unpartitioned_file ( & path) ] )
444- . await ;
445-
446- assert ! ( result. is_err( ) ) ;
447- assert ! ( result
448- . unwrap_err( )
449- . to_string( )
450- . contains( "JSON array is empty" ) ) ;
451-
452- Ok ( ( ) )
453- }
454-
455- #[ tokio:: test]
456- async fn test_json_array_format_with_limit ( ) -> Result < ( ) > {
457- let session = SessionContext :: new ( ) ;
458- let ctx = session. state ( ) ;
459- let store = Arc :: new ( LocalFileSystem :: new ( ) ) as _ ;
460-
461- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
462- let path = format ! ( "{}/array_limit.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
463- std:: fs:: write (
464- & path,
465- r#"[
466- {"a": 1},
467- {"a": 2, "b": "extra"}
468- ]"# ,
469- ) ?;
470-
471- // Only infer from first record
472- let format = JsonFormat :: default ( )
473- . with_format_array ( true )
474- . with_schema_infer_max_rec ( 1 ) ;
475-
476- let file_schema = format
477- . infer_schema ( & ctx, & store, & [ local_unpartitioned_file ( & path) ] )
478- . await
479- . expect ( "Schema inference" ) ;
480-
481- // Should only have field "a" since we limited to 1 record
482- let fields = file_schema
483- . fields ( )
484- . iter ( )
485- . map ( |f| format ! ( "{}: {:?}" , f. name( ) , f. data_type( ) ) )
486- . collect :: < Vec < _ > > ( ) ;
487- assert_eq ! ( vec![ "a: Int64" ] , fields) ;
488-
489- Ok ( ( ) )
490- }
491-
492- #[ tokio:: test]
493- async fn test_json_array_format_read_data ( ) -> Result < ( ) > {
494- let session = SessionContext :: new ( ) ;
495- let ctx = session. state ( ) ;
496- let task_ctx = ctx. task_ctx ( ) ;
497- let store = Arc :: new ( LocalFileSystem :: new ( ) ) as _ ;
498-
499- // Create a temporary file with JSON array format
500- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
501- let path = format ! ( "{}/array.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
502- std:: fs:: write (
503- & path,
504- r#"[
505- {"a": 1, "b": 2.0, "c": true},
506- {"a": 2, "b": 3.5, "c": false},
507- {"a": 3, "b": 4.0, "c": true}
508- ]"# ,
509- ) ?;
510-
511- let format = JsonFormat :: default ( ) . with_format_array ( true ) ;
512-
513- // Infer schema
514- let file_schema = format
515- . infer_schema ( & ctx, & store, & [ local_unpartitioned_file ( & path) ] )
516- . await ?;
517-
518- // Scan and read data
519- let exec = scan_format (
520- & ctx,
521- & format,
522- Some ( file_schema) ,
523- tmp_dir. path ( ) . to_str ( ) . unwrap ( ) ,
524- "array.json" ,
525- None ,
526- None ,
527- )
528- . await ?;
529- let batches = collect ( exec, task_ctx) . await ?;
530-
531- assert_eq ! ( 1 , batches. len( ) ) ;
532- assert_eq ! ( 3 , batches[ 0 ] . num_columns( ) ) ;
533- assert_eq ! ( 3 , batches[ 0 ] . num_rows( ) ) ;
534-
535- // Verify data
536- let array_a = as_int64_array ( batches[ 0 ] . column ( 0 ) ) ?;
537- assert_eq ! (
538- vec![ 1 , 2 , 3 ] ,
539- ( 0 ..3 ) . map( |i| array_a. value( i) ) . collect:: <Vec <_>>( )
540- ) ;
541-
542- Ok ( ( ) )
543- }
544-
545- #[ tokio:: test]
546- async fn test_json_array_format_with_projection ( ) -> Result < ( ) > {
547- let session = SessionContext :: new ( ) ;
548- let ctx = session. state ( ) ;
549- let task_ctx = ctx. task_ctx ( ) ;
550- let store = Arc :: new ( LocalFileSystem :: new ( ) ) as _ ;
551-
552- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
553- let path = format ! ( "{}/array.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
554- std:: fs:: write ( & path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"# ) ?;
555-
556- let format = JsonFormat :: default ( ) . with_format_array ( true ) ;
557- let file_schema = format
558- . infer_schema ( & ctx, & store, & [ local_unpartitioned_file ( & path) ] )
559- . await ?;
560-
561- // Project only column "a"
562- let exec = scan_format (
563- & ctx,
564- & format,
565- Some ( file_schema) ,
566- tmp_dir. path ( ) . to_str ( ) . unwrap ( ) ,
567- "array.json" ,
568- Some ( vec ! [ 0 ] ) ,
569- None ,
570- )
571- . await ?;
572- let batches = collect ( exec, task_ctx) . await ?;
573-
574- assert_eq ! ( 1 , batches. len( ) ) ;
575- assert_eq ! ( 1 , batches[ 0 ] . num_columns( ) ) ; // Only 1 column projected
576- assert_eq ! ( 2 , batches[ 0 ] . num_rows( ) ) ;
577-
578- Ok ( ( ) )
579- }
580-
581- #[ tokio:: test]
582- async fn test_ndjson_read_options_format_array ( ) -> Result < ( ) > {
583- let ctx = SessionContext :: new ( ) ;
584-
585- // Create a temporary file with JSON array format
586- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
587- let path = format ! ( "{}/array.json" , tmp_dir. path( ) . to_string_lossy( ) ) ;
588- std:: fs:: write (
589- & path,
590- r#"[
591- {"a": 1, "b": "hello"},
592- {"a": 2, "b": "world"},
593- {"a": 3, "b": "test"}
594- ]"# ,
595- ) ?;
596-
597- // Use NdJsonReadOptions with format_array = true
598- let options = NdJsonReadOptions :: default ( ) . format_array ( true ) ;
599-
600- ctx. register_json ( "json_array_table" , & path, options)
601- . await ?;
602-
603- let result = ctx
604- . sql ( "SELECT a, b FROM json_array_table ORDER BY a" )
605- . await ?
606- . collect ( )
607- . await ?;
608-
609- assert_snapshot ! ( batches_to_string( & result) , @r"
610- +---+-------+
611- | a | b |
612- +---+-------+
613- | 1 | hello |
614- | 2 | world |
615- | 3 | test |
616- +---+-------+
617- " ) ;
618-
619- Ok ( ( ) )
620- }
621-
622- #[ tokio:: test]
623- async fn test_ndjson_read_options_format_array_with_compression ( ) -> Result < ( ) > {
624- use flate2:: write:: GzEncoder ;
625- use flate2:: Compression ;
626- use std:: io:: Write ;
627-
628- let ctx = SessionContext :: new ( ) ;
629-
630- // Create a temporary gzip compressed JSON array file
631- let tmp_dir = tempfile:: TempDir :: new ( ) ?;
632- let path = format ! ( "{}/array.json.gz" , tmp_dir. path( ) . to_string_lossy( ) ) ;
633-
634- let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"# ;
635- let file = std:: fs:: File :: create ( & path) ?;
636- let mut encoder = GzEncoder :: new ( file, Compression :: default ( ) ) ;
637- encoder. write_all ( json_content. as_bytes ( ) ) ?;
638- encoder. finish ( ) ?;
639-
640- // Use NdJsonReadOptions with format_array and GZIP compression
641- let options = NdJsonReadOptions :: default ( )
642- . format_array ( true )
643- . file_compression_type ( FileCompressionType :: GZIP )
644- . file_extension ( ".json.gz" ) ;
645-
646- ctx. register_json ( "json_array_gzip" , & path, options) . await ?;
647-
648- let result = ctx
649- . sql ( "SELECT a, b FROM json_array_gzip ORDER BY a" )
650- . await ?
651- . collect ( )
652- . await ?;
653-
654- assert_snapshot ! ( batches_to_string( & result) , @r"
655- +---+-------+
656- | a | b |
657- +---+-------+
658- | 1 | hello |
659- | 2 | world |
660- +---+-------+
661- " ) ;
662-
663- Ok ( ( ) )
664- }
665352}
0 commit comments