|
1 | | -# dag-stream |
| 1 | +# futures-dag |
| 2 | + |
| 3 | +[](https://github.com/riberk/futures-dag/actions/workflows/ci.yml) |
| 4 | +[](https://crates.io/crates/futures-dag) |
| 5 | +[](https://riberk.github.io/futures-dag/coverage_report/index.html) |
| 6 | + |
| 7 | +A dynamic DAG scheduler for async futures with a `Stream`-based API. |
| 8 | + |
| 9 | +This crate executes futures according to a directed acyclic graph (DAG) of |
| 10 | +dependencies and yields their results as a stream. A future starts executing |
| 11 | +**only after all of its dependencies have completed**. |
| 12 | + |
| 13 | +The graph can be extended while it is running, and missing dependencies are |
| 14 | +handled automatically. |
| 15 | + |
| 16 | +--- |
| 17 | + |
| 18 | +## Features |
| 19 | + |
| 20 | +* **Dependency-aware execution** |
| 21 | + Futures run only when all declared dependencies are satisfied. |
| 22 | + |
| 23 | +* **Dynamic graph** |
| 24 | + Nodes and dependencies can be inserted while the DAG is already running. |
| 25 | + |
| 26 | +* **Lazy by design** |
| 27 | + No future is polled until the stream itself is polled. |
| 28 | + |
| 29 | +* **Concurrent execution** |
| 30 | + Independent nodes run concurrently using `FuturesUnordered`. |
| 31 | + |
| 32 | +* **Stream-based API** |
| 33 | + Results are yielded as `(Key, Output)` pairs as soon as futures complete. |
| 34 | + |
| 35 | +* **Placeholder dependencies** |
| 36 | + Dependencies may be referenced before their futures are inserted. |
| 37 | + |
| 38 | +* **Cycle detection** |
| 39 | + If execution stalls and the graph contains a cycle, polling the stream panics. |
| 40 | + |
| 41 | +* **Thread-safe insertion** |
| 42 | + A mutex-backed wrapper allows inserting nodes from multiple tasks or threads. |
| 43 | + |
| 44 | +--- |
| 45 | + |
| 46 | +## Basic usage |
| 47 | + |
| 48 | +```rust |
| 49 | +use futures::StreamExt; |
| 50 | +use std::future::ready; |
| 51 | +use crate::FuturesDag; |
| 52 | + |
| 53 | +let mut dag = FuturesDag::new(); |
| 54 | + |
| 55 | +dag.insert(1, [2, 3].into(), ready("node 1")).unwrap(); |
| 56 | +dag.insert(2, [3].into(), ready("node 2")).unwrap(); |
| 57 | +dag.insert(3, [].into(), ready("node 3")).unwrap(); |
| 58 | + |
| 59 | +let results = dag.collect::<Vec<_>>().await; |
| 60 | + |
| 61 | +assert_eq!( |
| 62 | + results, |
| 63 | + vec![ |
| 64 | + (3, "node 3"), |
| 65 | + (2, "node 2"), |
| 66 | + (1, "node 1"), |
| 67 | + ] |
| 68 | +); |
| 69 | +``` |
| 70 | + |
| 71 | +The order of yielded items reflects **completion order**, not insertion order. |
| 72 | + |
| 73 | +--- |
| 74 | + |
| 75 | +## Dynamic insertion |
| 76 | + |
| 77 | +Nodes may depend on keys that are not yet present in the DAG. |
| 78 | + |
| 79 | +```rust |
| 80 | +use futures::{StreamExt, FutureExt}; |
| 81 | +use futures_dag::BoxFuturesDag; |
| 82 | + |
| 83 | +let mut dag = BoxFuturesDag::default(); |
| 84 | + |
| 85 | +dag.insert_box(1, [2].into(), async {}).unwrap(); |
| 86 | + |
| 87 | +assert!(dag.next().now_or_never().is_none()); |
| 88 | + |
| 89 | +dag.insert_box(2, [].into(), async {}).unwrap(); |
| 90 | + |
| 91 | +assert_eq!(dag.next().await.unwrap().0, 2); |
| 92 | +assert_eq!(dag.next().await.unwrap().0, 1); |
| 93 | +assert_eq!(dag.next().await, None); |
| 94 | +``` |
| 95 | + |
| 96 | +Missing dependencies are tracked as *placeholders* and automatically resolved |
| 97 | +once their futures are inserted and completed. |
| 98 | + |
| 99 | +--- |
| 100 | + |
| 101 | +## Thread-safe insertion |
| 102 | + |
| 103 | +If nodes need to be inserted from multiple tasks or threads while another task |
| 104 | +drives the stream, use `FuturesMutexDag`. |
| 105 | + |
| 106 | +```rust |
| 107 | +use futures::StreamExt; |
| 108 | +use futures_dag::{BoxFuturesDag, FuturesMutexDag}; |
| 109 | + |
| 110 | +let dag = FuturesMutexDag::new(BoxFuturesDag::default()); |
| 111 | + |
| 112 | +let dag_clone = dag.clone(); |
| 113 | +tokio::spawn(async move { |
| 114 | + dag_clone.insert_box(2, [].into(), async {}).unwrap(); |
| 115 | +}); |
| 116 | + |
| 117 | +dag.insert_box(1, [2].into(), async {}).unwrap(); |
| 118 | + |
| 119 | +let results = dag.map(|v| v.0).collect::<Vec<_>>().await; |
| 120 | +assert_eq!(results, vec![2, 1]); |
| 121 | +``` |
| 122 | + |
| 123 | +`FuturesMutexDag` stores the DAG in `Arc<Mutex<_>>`. |
| 124 | + |
| 125 | +--- |
| 126 | + |
| 127 | +## Execution model |
| 128 | + |
| 129 | +* Futures are **not** spawned on a runtime. |
| 130 | +* All polling happens when the stream is polled. |
| 131 | +* Ready futures are driven concurrently via `FuturesUnordered`. |
| 132 | +* The DAG completes when all nodes have completed and yielded their outputs. |
| 133 | + |
| 134 | +If the DAG cannot make progress and not all nodes are completed, the graph is |
| 135 | +checked for cycles and polling panics if a cycle is found. |
| 136 | + |
| 137 | +--- |
| 138 | +## When to use this crate |
| 139 | + |
| 140 | +This crate is a good fit when you need: |
| 141 | + |
| 142 | +* dependency-aware async execution, |
| 143 | +* incremental graph construction, |
| 144 | +* streaming of completion results, |
| 145 | +* a lightweight alternative to a full task scheduler. |
| 146 | + |
| 147 | +--- |
| 148 | + |
| 149 | +## License |
| 150 | + |
| 151 | +MIT |
0 commit comments