|
20 | 20 | import functools
|
21 | 21 | import itertools
|
22 | 22 | import typing
|
23 |
| -from typing import ( |
24 |
| - Callable, |
25 |
| - Dict, |
26 |
| - Generator, |
27 |
| - Iterable, |
28 |
| - Mapping, |
29 |
| - Sequence, |
30 |
| - Set, |
31 |
| - Tuple, |
32 |
| - Union, |
33 |
| -) |
| 23 | +from typing import Callable, Dict, Generator, Iterable, Mapping, Sequence, Tuple, Union |
34 | 24 |
|
35 | 25 | from bigframes.core import expression, field, identifiers
|
36 | 26 | import bigframes.core.schema as schemata
|
@@ -309,33 +299,31 @@ def unique_nodes(
|
309 | 299 | seen.add(item)
|
310 | 300 | stack.extend(item.child_nodes)
|
311 | 301 |
|
312 |
| - def edges( |
| 302 | + def iter_nodes_topo( |
313 | 303 | self: BigFrameNode,
|
314 |
| - ) -> Generator[Tuple[BigFrameNode, BigFrameNode], None, None]: |
315 |
| - for item in self.unique_nodes(): |
316 |
| - for child in item.child_nodes: |
317 |
| - yield (item, child) |
318 |
| - |
319 |
| - def iter_nodes_topo(self: BigFrameNode) -> Generator[BigFrameNode, None, None]: |
320 |
| - """Returns nodes from bottom up.""" |
321 |
| - queue = collections.deque( |
322 |
| - [node for node in self.unique_nodes() if not node.child_nodes] |
323 |
| - ) |
324 |
| - |
| 304 | + ) -> Generator[BigFrameNode, None, None]: |
| 305 | + """Returns nodes in reverse topological order, using Kahn's algorithm.""" |
325 | 306 | child_to_parents: Dict[
|
326 |
| - BigFrameNode, Set[BigFrameNode] |
327 |
| - ] = collections.defaultdict(set) |
328 |
| - for parent, child in self.edges(): |
329 |
| - child_to_parents[child].add(parent) |
330 |
| - |
331 |
| - yielded = set() |
| 307 | + BigFrameNode, list[BigFrameNode] |
| 308 | + ] = collections.defaultdict(list) |
| 309 | + out_degree: Dict[BigFrameNode, int] = collections.defaultdict(int) |
| 310 | + |
| 311 | + queue: collections.deque["BigFrameNode"] = collections.deque() |
| 312 | + for node in list(self.unique_nodes()): |
| 313 | + num_children = len(node.child_nodes) |
| 314 | + out_degree[node] = num_children |
| 315 | + if num_children == 0: |
| 316 | + queue.append(node) |
| 317 | + for child in node.child_nodes: |
| 318 | + child_to_parents[child].append(node) |
332 | 319 |
|
333 | 320 | while queue:
|
334 | 321 | item = queue.popleft()
|
335 | 322 | yield item
|
336 |
| - yielded.add(item) |
337 |
| - for parent in child_to_parents[item]: |
338 |
| - if set(parent.child_nodes).issubset(yielded): |
| 323 | + parents = child_to_parents.get(item, []) |
| 324 | + for parent in parents: |
| 325 | + out_degree[parent] -= 1 |
| 326 | + if out_degree[parent] == 0: |
339 | 327 | queue.append(parent)
|
340 | 328 |
|
341 | 329 | def top_down(
|
|
0 commit comments