@@ -1402,10 +1402,15 @@ def process(self, element, five):
14021402 assert_that (d , equal_to ([6 , 7 , 8 ]))
14031403 self .p .run ()
14041404
1405- def test_composite_child_without_explicit_hints_falls_back_to_any (self ):
1405+ def test_child_with_both_input_and_output_hints_binds_typevars_correctly (
1406+ self ):
14061407 """
1407- Verifies that child transforms in a composite need explicit type hints.
1408- Without them, type variables fall back to Any despite parent decorators.
1408+ When a child transform has both input and output type hints with type
1409+ variables, those variables bind correctly from the actual input data.
1410+
1411+ Example: Child with .with_input_types(Tuple[K, V])
1412+ .with_output_types(Tuple[K, V]) receiving Tuple['a', 'hello'] will bind
1413+ K=str, V=str correctly.
14091414 """
14101415 K = typehints .TypeVariable ('K' )
14111416 V = typehints .TypeVariable ('V' )
@@ -1419,18 +1424,83 @@ def process(self, element):
14191424 yield (k , v .upper ())
14201425
14211426 def expand (self , pcoll ):
1422- return pcoll | beam .ParDo (self .MyDoFn ()).with_output_types (tuple [K , V ])
1423- # return pcoll | beam.ParDo(self.MyDoFn()).with_output_types(tuple[str,str])
1427+ return (
1428+ pcoll
1429+ | beam .ParDo (self .MyDoFn ()).with_input_types (
1430+ tuple [K , V ]).with_output_types (tuple [K , V ]))
1431+
1432+ with TestPipeline () as p :
1433+ result = (
1434+ p
1435+ | beam .Create ([('a' , 'hello' ), ('b' , 'world' )])
1436+ | TransformWithoutChildHints ())
1437+
1438+ self .assertEqual (result .element_type , typehints .Tuple [str , str ])
1439+
1440+ def test_child_without_input_hints_fails_to_bind_typevars (self ):
1441+ """
1442+ When a child transform lacks input type hints, type variables in its output
1443+ hints cannot bind and default to Any, even when parent composite has
1444+ decorated type hints.
1445+
1446+ This test demonstrates the current limitation: without explicit input hints
1447+ on the child, the type variable K in .with_output_types(Tuple[K, str])
1448+ remains unbound, resulting in Tuple[Any, str] instead of the expected
1449+ Tuple[str, str].
1450+ """
1451+ K = typehints .TypeVariable ('K' )
1452+
1453+ @typehints .with_input_types (typehints .Tuple [K , str ])
1454+ @typehints .with_output_types (typehints .Tuple [K , str ])
1455+ class TransformWithoutChildHints (beam .PTransform ):
1456+ class MyDoFn (beam .DoFn ):
1457+ def process (self , element ):
1458+ k , v = element
1459+ yield (k , v .upper ())
1460+
1461+ def expand (self , pcoll ):
1462+ return (
1463+ pcoll
1464+ | beam .ParDo (self .MyDoFn ()).with_output_types (tuple [K , str ]))
1465+
1466+ with TestPipeline () as p :
1467+ result = (
1468+ p
1469+ | beam .Create ([('a' , 'hello' ), ('b' , 'world' )])
1470+ | TransformWithoutChildHints ())
1471+
1472+ self .assertEqual (result .element_type , typehints .Tuple [typehints .Any , str ])
1473+
1474+ def test_child_without_output_hints_infers_partial_types_from_dofn (self ):
1475+ """
1476+ When a child transform has input hints but no output hints, type inference
1477+ from the DoFn's process method produces partially inferred types.
1478+
1479+ With .with_input_types(Tuple[K, V]) and actual input Tuple['a', 'hello'], K
1480+ binds to str from the input. However, without output hints, the DoFn's
1481+ yield statement inference produces Tuple[str, Any].
1482+ """
1483+ K = typehints .TypeVariable ('K' )
1484+ V = typehints .TypeVariable ('K' )
1485+
1486+ @typehints .with_input_types (typehints .Tuple [K , V ])
1487+ @typehints .with_output_types (typehints .Tuple [K , V ])
1488+ class TransformWithoutChildHints (beam .PTransform ):
1489+ class MyDoFn (beam .DoFn ):
1490+ def process (self , element ):
1491+ k , v = element
1492+ yield (k , v .upper ())
1493+
1494+ def expand (self , pcoll ):
1495+ return (pcoll | beam .ParDo (self .MyDoFn ()).with_input_types (tuple [K , V ]))
14241496
14251497 with TestPipeline () as p :
14261498 result = (
14271499 p
14281500 | beam .Create ([('a' , 'hello' ), ('b' , 'world' )])
14291501 | TransformWithoutChildHints ())
14301502
1431- # Current behavior: Types become Any
1432- self .assertEqual (
1433- result .element_type , typehints .Tuple [typehints .Any , typehints .Any ])
1503+ self .assertEqual (result .element_type , typehints .Tuple [str , typing .Any ])
14341504
14351505 def test_do_fn_pipeline_pipeline_type_check_violated (self ):
14361506 @with_input_types (str , str )
0 commit comments