4646)
4747from pydantic_graph .beta .step import NodeStep , Step , StepFunction , StepNode
4848from pydantic_graph .beta .util import TypeOrTypeExpression , get_callable_name , unpack_type_expression
49+ from pydantic_graph .exceptions import GraphBuildingError
4950from pydantic_graph .nodes import BaseNode , End
5051
5152StateT = TypeVar ('StateT' , infer_variance = True )
6061T = TypeVar ('T' , infer_variance = True )
6162
6263
63- class GraphBuildingError (ValueError ):
64- """An error raised during graph-building."""
65-
66- pass
67-
68-
6964@dataclass (init = False )
7065class GraphBuilder (Generic [StateT , DepsT , GraphInputT , GraphOutputT ]):
7166 """A builder for constructing executable graph definitions.
@@ -297,7 +292,6 @@ def join(
297292 initial_factory = lambda : initial # pyright: ignore[reportAssignmentType] # noqa E731
298293
299294 return Join [StateT , DepsT , InputT , OutputT ](
300- # TODO: Find a way to use the reducer name here, but still allow duplicates. It makes for a better node id.
301295 id = JoinID (NodeID (node_id or generate_placeholder_node_id (get_callable_name (reducer )))),
302296 reducer = reducer ,
303297 initial_factory = cast (Callable [[], OutputT ], initial_factory ),
@@ -470,7 +464,7 @@ def match_node(
470464 source : type [SourceNodeT ],
471465 * ,
472466 matches : Callable [[Any ], bool ] | None = None ,
473- ) -> DecisionBranch [SourceNodeT ]:
467+ ) -> DecisionBranch [SourceNodeT ]: # pragma: no cover # TODO: We should cover this
474468 """Create a decision branch for BaseNode subclasses.
475469
476470 This is similar to match() but specifically designed for matching
@@ -483,7 +477,6 @@ def match_node(
483477 Returns:
484478 A DecisionBranch for the BaseNode type
485479 """
486- # TODO: Need to cover this in a test
487480 node = NodeStep (source )
488481 path = Path (items = [DestinationMarker (node .id )])
489482 return DecisionBranch (source = source , matches = matches , path = path , destinations = [node ])
@@ -772,7 +765,7 @@ def _handle_path(path: Path, last_source_id: NodeID):
772765 path: The path to process
773766 last_source_id: The current source node ID
774767 """
775- for item in path .items :
768+ for item in path .items : # pragma: no branch
776769 # No need to handle MapMarker or BroadcastMarker here as these should have all been removed
777770 # by the call to `_flatten_paths`
778771 if isinstance (item , DestinationMarker ):
@@ -798,25 +791,23 @@ def _handle_path(path: Path, last_source_id: NodeID):
798791 dominating_forks : dict [JoinID , ParentFork [NodeID ]] = {}
799792 for join in joins :
800793 dominating_fork = finder .find_parent_fork (
801- join .id , explicit_fork_id = join .parent_fork_id , prefer_closest = join .preferred_parent_fork == 'closest'
794+ join .id , parent_fork_id = join .parent_fork_id , prefer_closest = join .preferred_parent_fork == 'closest'
802795 )
803- if dominating_fork is None :
796+ if dominating_fork is None : # pragma: no cover # TODO: We should cover this
804797 rendered_mermaid_graph = build_mermaid_graph (graph_nodes , graph_edges_by_source ).render ()
805- error_message = f"""\
798+ raise GraphBuildingError ( f"""\
806799 For every Join J in the graph, there must be a Fork F between the StartNode and J satisfying:
807800* Every path from the StartNode to J passes through F
808- * There are no cycles in the graph including both J and F.
801+ * There are no cycles in the graph including J that don't pass through F.
809802In this case, F is called a "dominating fork" for J.
810803
811804This is used to determine when all tasks upstream of this Join are complete and we can proceed with execution.
812805
813806Mermaid diagram:
814807{ rendered_mermaid_graph }
815808
816- Join { join .id !r} in this graph has no dominating fork.\
817- """
818- # TODO: Need to cover this in a test
819- raise GraphBuildingError (error_message )
809+ Join { join .id !r} in this graph has no dominating fork in this graph.\
810+ """ )
820811 dominating_forks [join .id ] = dominating_fork
821812
822813 return dominating_forks
0 commit comments