Skip to content

Commit b42d0e4

Browse files
committed
sort: Rework merge batching logic
Fix bug #6944. Rework the way batching is done in sort so that it does not open more input files than necessary. Previously, the code would always open one extra input file, which caused problems in ulimit scenarios. Add an additional test case.
1 parent a259952 commit b42d0e4

File tree

4 files changed

+67
-41
lines changed

4 files changed

+67
-41
lines changed

src/uu/sort/src/ext_sort.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,12 @@ fn reader_writer<
9898
)?;
9999
match read_result {
100100
ReadResult::WroteChunksToFile { tmp_files } => {
101-
let merger = merge::merge_with_file_limit::<_, _, Tmp>(
101+
merge::merge_with_file_limit::<_, _, Tmp>(
102102
tmp_files.into_iter().map(|c| c.reopen()),
103103
settings,
104+
output,
104105
tmp_dir,
105106
)?;
106-
merger.write_all(settings, output)?;
107107
}
108108
ReadResult::SortedSingleChunk(chunk) => {
109109
if settings.unique {

src/uu/sort/src/merge.rs

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use std::{
2525
};
2626

2727
use compare::Compare;
28-
use itertools::Itertools;
2928
use uucore::error::UResult;
3029

3130
use crate::{
@@ -67,58 +66,63 @@ fn replace_output_file_in_input_files(
6766
///
6867
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
6968
/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
70-
pub fn merge<'a>(
69+
pub fn merge(
7170
files: &mut [OsString],
72-
settings: &'a GlobalSettings,
73-
output: Option<&str>,
71+
settings: &GlobalSettings,
72+
output: Output,
7473
tmp_dir: &mut TmpDirWrapper,
75-
) -> UResult<FileMerger<'a>> {
76-
replace_output_file_in_input_files(files, output, tmp_dir)?;
74+
) -> UResult<()> {
75+
replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?;
76+
let files = files
77+
.iter()
78+
.map(|file| open(file).map(|file| PlainMergeInput { inner: file }));
7779
if settings.compress_prog.is_none() {
78-
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
79-
files
80-
.iter()
81-
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
82-
settings,
83-
tmp_dir,
84-
)
80+
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir)
8581
} else {
86-
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
87-
files
88-
.iter()
89-
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
90-
settings,
91-
tmp_dir,
92-
)
82+
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir)
9383
}
9484
}
9585

9686
// Merge already sorted `MergeInput`s.
9787
pub fn merge_with_file_limit<
98-
'a,
9988
M: MergeInput + 'static,
10089
F: ExactSizeIterator<Item = UResult<M>>,
10190
Tmp: WriteableTmpFile + 'static,
10291
>(
10392
files: F,
104-
settings: &'a GlobalSettings,
93+
settings: &GlobalSettings,
94+
output: Output,
10595
tmp_dir: &mut TmpDirWrapper,
106-
) -> UResult<FileMerger<'a>> {
107-
if files.len() > settings.merge_batch_size {
108-
let mut remaining_files = files.len();
109-
let batches = files.chunks(settings.merge_batch_size);
110-
let mut batches = batches.into_iter();
96+
) -> UResult<()> {
97+
if files.len() <= settings.merge_batch_size {
98+
let merger = merge_without_limit(files, settings);
99+
merger?.write_all(settings, output)
100+
} else {
111101
let mut temporary_files = vec![];
112-
while remaining_files != 0 {
113-
// Work around the fact that `Chunks` is not an `ExactSizeIterator`.
114-
remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
115-
let merger = merge_without_limit(batches.next().unwrap(), settings)?;
102+
let mut batch = vec![];
103+
for file in files {
104+
batch.push(file);
105+
if batch.len() >= settings.merge_batch_size {
106+
assert_eq!(batch.len(), settings.merge_batch_size);
107+
let merger = merge_without_limit(batch.into_iter(), settings)?;
108+
batch = vec![];
109+
110+
let mut tmp_file =
111+
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
112+
merger.write_all_to(settings, tmp_file.as_write())?;
113+
temporary_files.push(tmp_file.finished_writing()?);
114+
}
115+
}
116+
// Merge any remaining files that didn't get merged in a full batch above.
117+
if !batch.is_empty() {
118+
assert!(batch.len() < settings.merge_batch_size);
119+
let merger = merge_without_limit(batch.into_iter(), settings)?;
120+
116121
let mut tmp_file =
117122
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
118123
merger.write_all_to(settings, tmp_file.as_write())?;
119124
temporary_files.push(tmp_file.finished_writing()?);
120125
}
121-
assert!(batches.next().is_none());
122126
merge_with_file_limit::<_, _, Tmp>(
123127
temporary_files
124128
.into_iter()
@@ -127,10 +131,9 @@ pub fn merge_with_file_limit<
127131
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
128132
>),
129133
settings,
134+
output,
130135
tmp_dir,
131136
)
132-
} else {
133-
merge_without_limit(files, settings)
134137
}
135138
}
136139

@@ -260,7 +263,7 @@ struct PreviousLine {
260263
}
261264

262265
/// Merges files together. This is **not** an iterator because of lifetime problems.
263-
pub struct FileMerger<'a> {
266+
struct FileMerger<'a> {
264267
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
265268
request_sender: Sender<(usize, RecycledChunk)>,
266269
prev: Option<PreviousLine>,
@@ -269,12 +272,12 @@ pub struct FileMerger<'a> {
269272

270273
impl FileMerger<'_> {
271274
/// Write the merged contents to the output file.
272-
pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
275+
fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
273276
let mut out = output.into_write();
274277
self.write_all_to(settings, &mut out)
275278
}
276279

277-
pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
280+
fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
278281
while self.write_next(settings, out) {}
279282
drop(self.request_sender);
280283
self.reader_join_handle.join().unwrap()

src/uu/sort/src/sort.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1567,8 +1567,7 @@ fn exec(
15671567
tmp_dir: &mut TmpDirWrapper,
15681568
) -> UResult<()> {
15691569
if settings.merge {
1570-
let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?;
1571-
file_merger.write_all(settings, output)
1570+
merge::merge(files, settings, output, tmp_dir)
15721571
} else if settings.check {
15731572
if files.len() > 1 {
15741573
Err(UUsageError::new(2, "only one file allowed with -c"))

tests/by-util/test_sort.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,6 +1084,30 @@ fn test_merge_batch_size() {
10841084
.stdout_only_fixture("merge_ints_interleaved.expected");
10851085
}
10861086

1087+
#[test]
1088+
fn test_merge_batch_size_with_limit() {
1089+
use rlimit::Resource;
1090+
// Currently need...
1091+
// 3 descriptors for stdin, stdout, stderr
1092+
// 2 descriptors for CTRL+C handling logic (to be reworked at some point)
1093+
// 2 descriptors for the input files (i.e. batch-size of 2).
1094+
let limit_fd = 3 + 2 + 2;
1095+
TestScenario::new(util_name!())
1096+
.ucmd()
1097+
.limit(Resource::NOFILE, limit_fd, limit_fd)
1098+
.arg("--batch-size=2")
1099+
.arg("-m")
1100+
.arg("--unique")
1101+
.arg("merge_ints_interleaved_1.txt")
1102+
.arg("merge_ints_interleaved_2.txt")
1103+
.arg("merge_ints_interleaved_3.txt")
1104+
.arg("merge_ints_interleaved_3.txt")
1105+
.arg("merge_ints_interleaved_2.txt")
1106+
.arg("merge_ints_interleaved_1.txt")
1107+
.succeeds()
1108+
.stdout_only_fixture("merge_ints_interleaved.expected");
1109+
}
1110+
10871111
#[test]
10881112
fn test_sigpipe_panic() {
10891113
let mut cmd = new_ucmd!();

0 commit comments

Comments (0)