11//! Structures for writing output data.
22
33use crate :: { options:: Options , value:: Value , Error , Result } ;
4- use object_store:: { local :: LocalFileSystem , ObjectStore , PutResult } ;
5- use stac:: { io:: Config , Format } ;
6- use std:: { io :: IsTerminal , path:: Path , pin:: Pin } ;
4+ use object_store:: PutResult ;
5+ use stac:: io:: { Format , IntoFormattedBytes } ;
6+ use std:: { path:: Path , pin:: Pin } ;
77use tokio:: {
88 fs:: File ,
99 io:: { AsyncWrite , AsyncWriteExt } ,
@@ -15,7 +15,7 @@ pub(crate) struct Output {
1515 pub ( crate ) format : Format ,
1616 href : Option < String > ,
1717 stream : Pin < Box < dyn AsyncWrite + Send > > ,
18- config : Config ,
18+ options : Options ,
1919}
2020
2121impl Output {
@@ -26,10 +26,9 @@ impl Output {
2626 options : impl Into < Options > ,
2727 create_parent_directories : bool ,
2828 ) -> Result < Output > {
29- let mut format = format
29+ let format = format
3030 . or_else ( || href. as_deref ( ) . and_then ( Format :: infer_from_href) )
3131 . unwrap_or_default ( ) ;
32- let config = Config :: new ( ) . format ( Some ( format) ) . options ( options. into ( ) ) ;
3332 let stream = if let Some ( href) = href. as_deref ( ) {
3433 if let Ok ( url) = Url :: parse ( href) {
3534 if url. scheme ( ) == "file" {
@@ -39,53 +38,39 @@ impl Output {
3938 )
4039 . await ?
4140 } else {
42- Box :: pin ( config. buf_writer ( & url) ?)
41+ unimplemented ! ( "streaming to object stores is not supported" ) ;
42+ // FIXME turn this into an actual error
4343 }
4444 } else {
4545 create_file_stream ( href, create_parent_directories) . await ?
4646 }
4747 } else {
48- if std:: io:: stdout ( ) . is_terminal ( ) {
49- format = format. pretty ( ) ;
50- }
5148 Box :: pin ( tokio:: io:: stdout ( ) )
5249 } ;
5350 Ok ( Output {
5451 href,
5552 format,
5653 stream,
57- config ,
54+ options : options . into ( ) ,
5855 } )
5956 }
6057
6158 /// Streams a value to the output
6259 pub ( crate ) async fn stream ( & mut self , value : Value ) -> Result < ( ) > {
63- let bytes = value. into_ndjson ( ) ?;
60+ let bytes = value. into_formatted_bytes ( Format :: NdJson ) ?;
6461 self . stream . write_all ( & bytes) . await ?;
6562 self . stream . flush ( ) . await ?;
6663 Ok ( ( ) )
6764 }
6865
6966 /// Puts a value to the output.
7067 pub ( crate ) async fn put ( & mut self , value : Value ) -> Result < Option < PutResult > > {
71- let bytes = match value {
72- Value :: Json ( value) => self . format . json_to_vec ( value) ?,
73- Value :: Stac ( value) => self . format . value_to_vec ( value) ?,
74- } ;
7568 if let Some ( href) = self . href . as_deref ( ) {
76- let ( object_store, path) : ( Box < dyn ObjectStore > , _ ) = match Url :: parse ( href) {
77- Ok ( url) => self . config . object_store ( & url) ?,
78- Err ( _) => {
79- let path = object_store:: path:: Path :: from_filesystem_path ( href) ?;
80- ( Box :: new ( LocalFileSystem :: new ( ) ) , path)
81- }
82- } ;
83- object_store
84- . put ( & path, bytes. into ( ) )
69+ stac:: io:: put_format_opts ( href, value, self . format , self . options . iter ( ) )
8570 . await
86- . map ( Some )
8771 . map_err ( Error :: from)
8872 } else {
73+ let bytes = value. into_formatted_bytes ( self . format ) ?;
8974 self . stream . write_all ( & bytes) . await ?;
9075 self . stream . flush ( ) . await ?;
9176 Ok ( None )
0 commit comments