@@ -344,7 +344,7 @@ def add(self, *edges: EdgePath[StateT, DepsT]) -> None:
344
344
"""Add one or more edge paths to the graph.
345
345
346
346
This method processes edge paths and automatically creates any necessary
347
- fork nodes for broadcasts and spreads .
347
+ fork nodes for broadcasts and maps .
348
348
349
349
Args:
350
350
*edges: The edge paths to add to the graph
@@ -358,12 +358,12 @@ def _handle_path(p: Path):
358
358
"""
359
359
for item in p .items :
360
360
if isinstance (item , BroadcastMarker ):
361
- new_node = Fork [Any , Any ](id = item .fork_id , is_spread = False )
361
+ new_node = Fork [Any , Any ](id = item .fork_id , is_map = False )
362
362
self ._insert_node (new_node )
363
363
for path in item .paths :
364
364
_handle_path (Path (items = [* path .items ]))
365
365
elif isinstance (item , SpreadMarker ):
366
- new_node = Fork [Any , Any ](id = item .fork_id , is_spread = True )
366
+ new_node = Fork [Any , Any ](id = item .fork_id , is_map = True )
367
367
self ._insert_node (new_node )
368
368
elif isinstance (item , DestinationMarker ):
369
369
pass
@@ -407,34 +407,34 @@ def add_edge(self, source: Source[T], destination: Destination[T], *, label: str
407
407
builder = builder .label (label )
408
408
self .add (builder .to (destination ))
409
409
410
- def add_spreading_edge (
410
+ def add_mapping_edge (
411
411
self ,
412
412
source : Source [Iterable [T ]],
413
- spread_to : Destination [T ],
413
+ map_to : Destination [T ],
414
414
* ,
415
- pre_spread_label : str | None = None ,
416
- post_spread_label : str | None = None ,
415
+ pre_map_label : str | None = None ,
416
+ post_map_label : str | None = None ,
417
417
fork_id : ForkID | None = None ,
418
418
downstream_join_id : JoinID | None = None ,
419
419
) -> None :
420
- """Add an edge that spreads iterable data across parallel paths.
420
+ """Add an edge that maps iterable data across parallel paths.
421
421
422
422
Args:
423
423
source: The source node that produces iterable data
424
- spread_to : The destination node that receives individual items
425
- pre_spread_label : Optional label before the spread operation
426
- post_spread_label : Optional label after the spread operation
427
- fork_id: Optional ID for the fork node produced for this spread operation
428
- downstream_join_id: Optional ID of a join node that will always be downstream of this spread .
429
- Specifying this ensures correct handling if you try to spread an empty iterable.
424
+ map_to : The destination node that receives individual items
425
+ pre_map_label : Optional label before the map operation
426
+ post_map_label : Optional label after the map operation
427
+ fork_id: Optional ID for the fork node produced for this map operation
428
+ downstream_join_id: Optional ID of a join node that will always be downstream of this map .
429
+ Specifying this ensures correct handling if you try to map an empty iterable.
430
430
"""
431
431
builder = self .edge_from (source )
432
- if pre_spread_label is not None :
433
- builder = builder .label (pre_spread_label )
434
- builder = builder .spread (fork_id = fork_id , downstream_join_id = downstream_join_id )
435
- if post_spread_label is not None :
436
- builder = builder .label (post_spread_label )
437
- self .add (builder .to (spread_to ))
432
+ if pre_map_label is not None :
433
+ builder = builder .label (pre_map_label )
434
+ builder = builder .map (fork_id = fork_id , downstream_join_id = downstream_join_id )
435
+ if post_map_label is not None :
436
+ builder = builder .label (post_map_label )
437
+ self .add (builder .to (map_to ))
438
438
439
439
# TODO(P2): Support adding subgraphs ... not sure exactly what that looks like yet..
440
440
# probably similar to a step, but with some tweaks
@@ -590,17 +590,17 @@ def _get_new_broadcast_id(self, from_: str | None = None) -> str:
590
590
index += 1
591
591
return node_id
592
592
593
- def _get_new_spread_id (self , from_ : str | None = None , to : str | None = None ) -> str :
594
- """Generate a unique ID for a new spread fork.
593
+ def _get_new_map_id (self , from_ : str | None = None , to : str | None = None ) -> str :
594
+ """Generate a unique ID for a new map fork.
595
595
596
596
Args:
597
597
from_: Optional source identifier to include in the ID
598
598
to: Optional destination identifier to include in the ID
599
599
600
600
Returns:
601
- A unique spread fork ID
601
+ A unique map fork ID
602
602
"""
603
- prefix = 'spread '
603
+ prefix = 'map '
604
604
if from_ is not None :
605
605
prefix += f'_from_{ from_ } '
606
606
if to is not None :
@@ -744,13 +744,13 @@ def _normalize_forks(
744
744
paths_to_handle .extend (edges_from_source )
745
745
746
746
node = nodes [source_id ]
747
- if isinstance (node , Fork ) and not node .is_spread :
747
+ if isinstance (node , Fork ) and not node .is_map :
748
748
new_edges [source_id ] = edges_from_source
749
749
continue # broadcast fork; nothing to do
750
750
if len (edges_from_source ) == 1 :
751
751
new_edges [source_id ] = edges_from_source
752
752
continue
753
- new_fork = Fork [Any , Any ](id = ForkID (NodeID (f'{ node .id } _broadcast_fork' )), is_spread = False )
753
+ new_fork = Fork [Any , Any ](id = ForkID (NodeID (f'{ node .id } _broadcast_fork' )), is_map = False )
754
754
new_nodes [new_fork .id ] = new_fork
755
755
new_edges [source_id ] = [Path (items = [BroadcastMarker (fork_id = new_fork .id , paths = edges_from_source )])]
756
756
new_edges [new_fork .id ] = edges_from_source
@@ -764,7 +764,7 @@ def _normalize_forks(
764
764
if isinstance (item , BroadcastMarker ):
765
765
assert item .fork_id in new_nodes
766
766
# if item.fork_id not in new_nodes:
767
- # new_nodes[new_fork.id] = Fork[Any, Any](id=item.fork_id, is_spread =False)
767
+ # new_nodes[new_fork.id] = Fork[Any, Any](id=item.fork_id, is_map =False)
768
768
new_edges [item .fork_id ] = [* item .paths ]
769
769
paths_to_handle .extend (item .paths )
770
770
0 commit comments