File tree Expand file tree Collapse file tree 2 files changed +34
-3
lines changed Expand file tree Collapse file tree 2 files changed +34
-3
lines changed Original file line number Diff line number Diff line change @@ -55,15 +55,13 @@ impl PyMergeBuilder {
5555 custom_execute_handler : Option < Arc < dyn CustomExecuteHandler > > ,
5656 ) -> DeltaResult < Self > {
5757 let ctx = SessionContext :: new ( ) ;
58- let schema = source
59- . schema_ref ( )
60- . map_err ( |e| DeltaTableError :: generic ( e. to_string ( ) ) ) ?;
6158
6259 let source = source
6360 . into_reader ( )
6461 . map_err ( |e| DeltaTableError :: generic ( e. to_string ( ) ) ) ?;
6562
6663 let source = maybe_lazy_cast_reader ( source, batch_schema. into_inner ( ) ) ;
64+ let schema = source. schema ( ) ;
6765
6866 let source_df = if streamed_exec {
6967 let arrow_stream_batch_generator: Arc < RwLock < dyn LazyBatchGenerator > > =
Original file line number Diff line number Diff line change 11import pytest
22from arro3 .core import DataType , Field , Schema
33
4+ from deltalake import DeltaTable , write_deltalake
45from deltalake .writer ._conversion import _convert_arro3_schema_to_delta
56
67
385386)
386387def test_schema_conversion (input_schema : Schema , expected_schema : Schema ):
387388 assert expected_schema == _convert_arro3_schema_to_delta (input_schema )
389+
390+
391+ @pytest .mark .pandas
392+ def test_merge_casting_table_provider (tmp_path ):
393+ import pandas as pd
394+
395+ df = pd .DataFrame (
396+ {
397+ "a" : 1 ,
398+ "ts" : pd .date_range (
399+ "2021-01-01" , "2021-01-02" , freq = "h" , tz = "America/Chicago"
400+ ),
401+ }
402+ )
403+ write_deltalake (tmp_path , df , mode = "overwrite" )
404+
405+ df2 = pd .DataFrame (
406+ {
407+ "a" : 2 ,
408+ "ts" : pd .date_range (
409+ "2021-01-01" , "2021-01-03" , freq = "h" , tz = "America/Chicago"
410+ ),
411+ }
412+ )
413+
414+ dt = DeltaTable (tmp_path )
415+ dt .merge (
416+ df2 ,
417+ predicate = "source.ts = target.ts" ,
418+ source_alias = "source" ,
419+ target_alias = "target" ,
420+ ).when_matched_update_all ().when_not_matched_insert_all ().execute ()
You can’t perform that action at this time.
0 commit comments