|
1 | | -use std::collections::VecDeque; |
2 | | - |
3 | 1 | use crate::{Error, Json, Result}; |
4 | 2 | use pyo3::{ |
5 | 3 | exceptions::PyStopAsyncIteration, pyclass, pyfunction, pymethods, types::PyDict, Bound, Py, |
6 | 4 | PyAny, PyResult, Python, |
7 | 5 | }; |
8 | | -use stac::{Container, Node, Value}; |
| 6 | +use stac::{Container, Item, Links, Node, SelfHref, Value}; |
| 7 | +use std::collections::VecDeque; |
| 8 | +use std::sync::Arc; |
| 9 | +use tokio::sync::Mutex; |
9 | 10 |
|
10 | 11 | #[pyfunction] |
11 | 12 | pub fn walk(container: Bound<'_, PyDict>) -> Result<Walk> { |
12 | | - let value: Value = pythonize::depythonize(&container)?; |
| 13 | + let mut value: Value = pythonize::depythonize(&container)?; |
| 14 | + if let Some(link) = value.link("self").cloned() { |
| 15 | + *value.self_href_mut() = Some(link.href); |
| 16 | + } |
13 | 17 | let container: Container = value.try_into()?; |
14 | 18 | let node = Node::from(container); |
15 | 19 | let mut walks = VecDeque::new(); |
16 | 20 | walks.push_back(node); |
17 | | - Ok(Walk(walks)) |
| 21 | + Ok(Walk(Arc::new(Mutex::new(walks)))) |
18 | 22 | } |
19 | 23 |
|
20 | 24 | #[pyclass] |
21 | | -pub struct Walk(VecDeque<Node>); |
| 25 | +pub struct Walk(Arc<Mutex<VecDeque<Node>>>); |
22 | 26 |
|
23 | 27 | #[pymethods] |
24 | 28 | impl Walk { |
25 | 29 | fn __aiter__(slf: Py<Self>) -> Py<Self> { |
26 | 30 | slf |
27 | 31 | } |
28 | 32 |
|
29 | | - fn __anext__<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
30 | | - if let Some(node) = self.0.pop_front() { |
31 | | - pyo3_async_runtimes::tokio::future_into_py(py, async move { |
32 | | - let node = node.resolve().await.map_err(Error::from)?; |
33 | | - let mut children = Vec::with_capacity(node.children.len()); |
34 | | - for child in node.children { |
35 | | - children.push(Json(child.value.clone())); |
36 | | - self.0.push_back(child); |
37 | | - } |
38 | | - let items: Vec<_> = node.items.iter().map(|item| Json(item.clone())).collect(); |
39 | | - Ok((Json(node.value), children, items)) |
40 | | - }) |
41 | | - } else { |
42 | | - Err(PyStopAsyncIteration::new_err("walk complete")) |
| 33 | + fn __anext__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
| 34 | + let nodes = self.0.clone(); |
| 35 | + pyo3_async_runtimes::tokio::future_into_py(py, async move { next_walk(nodes).await }) |
| 36 | + } |
| 37 | +} |
| 38 | + |
| 39 | +type WalkStep = (Value, Vec<Container>, VecDeque<Item>); |
| 40 | + |
| 41 | +async fn next_walk(nodes: Arc<Mutex<VecDeque<Node>>>) -> PyResult<Json<WalkStep>> { |
| 42 | + let mut nodes = nodes.lock().await; |
| 43 | + if let Some(node) = nodes.pop_front() { |
| 44 | + let mut node = node.resolve().await.map_err(Error::from)?; |
| 45 | + let items = std::mem::take(&mut node.items); |
| 46 | + let mut children = Vec::with_capacity(node.children.len()); |
| 47 | + for child in node.children { |
| 48 | + children.push(child.value.clone()); |
| 49 | + nodes.push_back(child); |
43 | 50 | } |
| 51 | + Ok(Json((node.value.into(), children, items))) |
| 52 | + } else { |
| 53 | + Err(PyStopAsyncIteration::new_err("done walking")) |
44 | 54 | } |
45 | 55 | } |
0 commit comments