@@ -1940,6 +1940,7 @@ def test_write_parquet_with_options_encoding(tmp_path, encoding, data_types, res
19401940 data ["float" ] = [1.01 , 2.02 , 3.03 ]
19411941 elif data_type == "str" :
19421942 data ["str" ] = ["a" , "b" , "c" ]
1943+
19431944 elif data_type == "bool" :
19441945 data ["bool" ] = [True , False , True ]
19451946
@@ -2668,3 +2669,77 @@ def test_join_deduplicate_select():
26682669 assert multi_result .column (0 ).to_pylist () == expected_data ["id" ]
26692670 assert multi_result .column (1 ).to_pylist () == expected_data ["name" ]
26702671 assert multi_result .column (2 ).to_pylist () == expected_data ["city" ]
2672+
2673+
def test_join_deduplicate_all_types():
    """Test deduplicated joins for every join type (inner, left, right, outer).

    Two in-memory frames sharing an ``id`` column are joined on ``id`` with
    ``deduplicate=True``. For each join type the result, sorted by ``id``,
    must equal the expected ``id`` / ``left_value`` / ``right_value`` mapping,
    with ``None`` wherever one side has no matching row. The ``id`` column
    must also be selectable on its own without changing the row count.
    """
    ctx = SessionContext()

    # Left frame: ids 1 and 4 have no counterpart on the right side.
    left_batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 2, 3, 4]), pa.array(["a", "b", "c", "d"])],
        names=["id", "left_value"],
    )
    left_df = ctx.create_dataframe([[left_batch]], "left")

    # Right frame: ids 5 and 6 have no counterpart on the left side, and the
    # "id" column name collides with the left frame's.
    right_batch = pa.RecordBatch.from_arrays(
        [pa.array([2, 3, 5, 6]), pa.array(["x", "y", "z", "w"])],
        names=["id", "right_value"],
    )
    right_df = ctx.create_dataframe([[right_batch]], "right")

    # Expected result per join type, in ascending ``id`` order.
    expected_by_join_type = {
        # Only the rows present on both sides (ids 2 and 3).
        "inner": {
            "id": [2, 3],
            "left_value": ["b", "c"],
            "right_value": ["x", "y"],
        },
        # Every left row; nulls where the right side has no match.
        "left": {
            "id": [1, 2, 3, 4],
            "left_value": ["a", "b", "c", "d"],
            "right_value": [None, "x", "y", None],
        },
        # Every right row; nulls where the left side has no match.
        "right": {
            "id": [2, 3, 5, 6],
            "left_value": ["b", "c", None, None],
            "right_value": ["x", "y", "z", "w"],
        },
        # Every row from both sides; nulls for whichever side is missing.
        "outer": {
            "id": [1, 2, 3, 4, 5, 6],
            "left_value": ["a", "b", "c", "d", None, None],
            "right_value": [None, "x", "y", None, "z", "w"],
        },
    }

    for join_type, expected in expected_by_join_type.items():
        joined = left_df.join(right_df, on="id", how=join_type, deduplicate=True)

        # Sort before collecting so the comparison does not depend on the
        # order in which the join emits rows.
        result = joined.sort([column("id")]).collect()[0]
        assert result.to_pydict() == expected, (
            f"unexpected result for {join_type} join"
        )

        # Selecting the deduplicated column must not raise and must keep every
        # row. Rows are counted across all batches because an unsorted join is
        # not guaranteed to return its output as a single record batch.
        selected = joined.select(column("id"))
        selected_rows = sum(len(batch) for batch in selected.collect())
        joined_rows = sum(len(batch) for batch in joined.collect())
        assert selected_rows == joined_rows == len(expected["id"]), (
            f"row count mismatch when selecting 'id' after {join_type} join"
        )
0 commit comments