Skip to content

Commit dba8fee

Browse files
authored
feat: each can flatten streams from the closure without collecting (nushell#16735)
1 parent 58b0593 commit dba8fee

File tree

3 files changed

+90
-16
lines changed

3 files changed

+90
-16
lines changed

crates/nu-cli/src/menus/help_completions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ mod test {
140140
use rstest::rstest;
141141

142142
#[rstest]
143-
#[case("who", 5, 8, &["whoami"])]
143+
#[case("who", 5, 8, &["whoami", "each"])]
144144
#[case("hash", 1, 5, &["hash", "hash md5", "hash sha256"])]
145145
#[case("into f", 0, 6, &["into float", "into filesize"])]
146146
#[case("into nonexistent", 0, 16, &[])]

crates/nu-command/src/filters/each.rs

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,20 @@ iterate over each record, not necessarily each cell within it.
2121
Avoid passing single records to this command. Since a record is a
2222
one-row structure, 'each' will only run once, behaving similar to 'do'.
2323
To iterate over a record's values, use 'items' or try converting it to a table
24-
with 'transpose' first."#
24+
with 'transpose' first.
25+
26+
27+
By default, for each input there is a single output value.
28+
If the closure returns a stream rather than value, the stream is collected
29+
completely, and the resulting value becomes one of the items in `each`'s output.
30+
31+
To receive items from those streams without waiting for the whole stream to be
32+
collected, `each --flatten` can be used.
33+
Instead of waiting for the stream to be collected before returning the result as
34+
a single item, `each --flatten` will return each item as soon as they are received.
35+
36+
This "flattens" the output, turning an output that would otherwise be a
37+
list of lists like `list<list<string>>` into a flat list like `list<string>`."#
2538
}
2639

2740
fn search_terms(&self) -> Vec<&str> {
@@ -44,6 +57,12 @@ with 'transpose' first."#
4457
"The closure to run.",
4558
)
4659
.switch("keep-empty", "keep empty result cells", Some('k'))
60+
.switch(
61+
"flatten",
62+
"combine outputs into a single stream instead of\
63+
collecting them to separate values",
64+
Some('f'),
65+
)
4766
.allow_variants_without_examples(true)
4867
.category(Category::Filters)
4968
}
@@ -95,6 +114,16 @@ with 'transpose' first."#
95114
description: "Update value if not null, otherwise do nothing",
96115
result: None,
97116
},
117+
Example {
118+
description: "Scan through multiple files without pause",
119+
example: "\
120+
ls *.txt \
121+
| each --flatten {|f| open $f.name | lines } \
122+
| find -i 'note: ' \
123+
| str join \"\\n\"\
124+
",
125+
result: None,
126+
},
98127
]
99128
}
100129

@@ -108,6 +137,7 @@ with 'transpose' first."#
108137
let head = call.head;
109138
let closure: Closure = call.req(engine_state, stack, 0)?;
110139
let keep_empty = call.has_flag(engine_state, stack, "keep-empty")?;
140+
let flatten = call.has_flag(engine_state, stack, "flatten")?;
111141

112142
let metadata = input.metadata();
113143
let result = match input {
@@ -119,27 +149,52 @@ with 'transpose' first."#
119149
| PipelineData::ListStream(..) => {
120150
let mut closure = ClosureEval::new(engine_state, stack, closure);
121151

122-
Ok(input
123-
.into_iter()
124-
.map(move |val| {
125-
each_map(val, &mut closure, head)
126-
.unwrap_or_else(|error| Value::error(error, head))
127-
})
128-
.into_pipeline_data(head, engine_state.signals().clone()))
152+
let out = if flatten {
153+
input
154+
.into_iter()
155+
.flat_map(move |value| {
156+
closure.run_with_value(value).unwrap_or_else(|error| {
157+
Value::error(error, head).into_pipeline_data()
158+
})
159+
})
160+
.into_pipeline_data(head, engine_state.signals().clone())
161+
} else {
162+
input
163+
.into_iter()
164+
.map(move |value| {
165+
each_map(value, &mut closure, head)
166+
.unwrap_or_else(|error| Value::error(error, head))
167+
})
168+
.into_pipeline_data(head, engine_state.signals().clone())
169+
};
170+
Ok(out)
129171
}
130172
PipelineData::ByteStream(stream, ..) => {
131173
let Some(chunks) = stream.chunks() else {
132174
return Ok(PipelineData::empty().set_metadata(metadata));
133175
};
134176

135177
let mut closure = ClosureEval::new(engine_state, stack, closure);
136-
Ok(chunks
137-
.map(move |result| {
138-
result
139-
.and_then(|value| each_map(value, &mut closure, head))
140-
.unwrap_or_else(|error| Value::error(error, head))
141-
})
142-
.into_pipeline_data(head, engine_state.signals().clone()))
178+
let out = if flatten {
179+
chunks
180+
.flat_map(move |result| {
181+
result
182+
.and_then(|value| closure.run_with_value(value))
183+
.unwrap_or_else(|error| {
184+
Value::error(error, head).into_pipeline_data()
185+
})
186+
})
187+
.into_pipeline_data(head, engine_state.signals().clone())
188+
} else {
189+
chunks
190+
.map(move |result| {
191+
result
192+
.and_then(|value| each_map(value, &mut closure, head))
193+
.unwrap_or_else(|error| Value::error(error, head))
194+
})
195+
.into_pipeline_data(head, engine_state.signals().clone())
196+
};
197+
Ok(out)
143198
}
144199
// This match allows non-iterables to be accepted,
145200
// which is currently considered undesirable (Nov 2022).

crates/nu-command/tests/commands/each.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,22 @@ fn each_noop_on_single_null() {
7878

7979
assert_eq!(actual.out, "nothing");
8080
}
81+
82+
#[test]
83+
fn each_flatten_dont_collect() {
84+
let collected = nu!(r##"
85+
def round [] { each {|e| print -n $"\(($e)\)"; $e } }
86+
def square [] { each {|e| print -n $"[($e)]"; $e } }
87+
[0 3] | each {|e| $e..<($e + 3) | round } | flatten | square | ignore
88+
"##);
89+
90+
assert_eq!(collected.out, r#"(0)(1)(2)[0][1][2](3)(4)(5)[3][4][5]"#);
91+
92+
let streamed = nu!(r##"
93+
def round [] { each {|e| print -n $"\(($e)\)"; $e } }
94+
def square [] { each {|e| print -n $"[($e)]"; $e } }
95+
[0 3] | each --flatten {|e| $e..<($e + 3) | round } | square | ignore
96+
"##);
97+
98+
assert_eq!(streamed.out, r#"(0)[0](1)[1](2)[2](3)[3](4)[4](5)[5]"#);
99+
}

0 commit comments

Comments
 (0)