|
| 1 | +use rayon::{ |
| 2 | + prelude::ParallelIterator, |
| 3 | + slice::{ParallelSlice, ParallelSliceMut}, |
| 4 | +}; |
| 5 | + |
1 | 6 | use crate::{
|
2 | 7 | asyncjob::{AsyncJob, RunParams},
|
3 | 8 | error::Result,
|
4 | 9 | sync::{self, CommitId, RepoPath, SharedCommitFilterFn},
|
5 | 10 | AsyncGitNotification, ProgressPercent,
|
6 | 11 | };
|
7 | 12 | use std::{
|
8 |
| - sync::{Arc, Mutex}, |
| 13 | + sync::{atomic::AtomicUsize, Arc, Mutex}, |
9 | 14 | time::{Duration, Instant},
|
10 | 15 | };
|
11 | 16 |
|
@@ -69,41 +74,73 @@ impl AsyncCommitFilterJob {
|
69 | 74 | commits: Vec<CommitId>,
|
70 | 75 | params: &RunParams<AsyncGitNotification, ProgressPercent>,
|
71 | 76 | ) -> JobState {
|
72 |
| - let response = sync::repo(repo_path) |
73 |
| - .map(|repo| self.filter_commits(&repo, commits, params)) |
| 77 | + let result = self |
| 78 | + .filter_commits(repo_path, commits, params) |
74 | 79 | .map(|(start, result)| CommitFilterResult {
|
75 | 80 | result,
|
76 | 81 | duration: start.elapsed(),
|
77 | 82 | });
|
78 | 83 |
|
79 |
| - JobState::Response(response) |
| 84 | + JobState::Response(result) |
80 | 85 | }
|
81 | 86 |
|
82 | 87 | fn filter_commits(
|
83 | 88 | &self,
|
84 |
| - repo: &git2::Repository, |
| 89 | + repo_path: &RepoPath, |
85 | 90 | commits: Vec<CommitId>,
|
86 | 91 | params: &RunParams<AsyncGitNotification, ProgressPercent>,
|
87 |
| - ) -> (Instant, Vec<CommitId>) { |
| 92 | + ) -> Result<(Instant, Vec<CommitId>)> { |
88 | 93 | let total_amount = commits.len();
|
89 | 94 | let start = Instant::now();
|
90 | 95 |
|
91 |
| - let result = commits |
92 |
| - .into_iter() |
93 |
| - .enumerate() |
94 |
| - .filter_map(|(idx, c)| { |
95 |
| - Self::update_progress( |
96 |
| - params, |
97 |
| - ProgressPercent::new(idx, total_amount), |
98 |
| - ); |
99 |
| - |
100 |
| - (*self.filter)(repo, &c) |
101 |
| - .ok() |
102 |
| - .and_then(|res| res.then_some(c)) |
103 |
| - }) |
104 |
| - .collect::<Vec<_>>(); |
105 |
| - |
106 |
| - (start, result) |
| 96 | + //note: for some reason >4 threads degrades search performance |
| 97 | + let pool = |
| 98 | + rayon::ThreadPoolBuilder::new().num_threads(4).build()?; |
| 99 | + |
| 100 | + let idx = AtomicUsize::new(0); |
| 101 | + |
| 102 | + let mut result = pool.install(|| { |
| 103 | + commits |
| 104 | + .into_iter() |
| 105 | + .enumerate() |
| 106 | + .collect::<Vec<(usize, CommitId)>>() |
| 107 | + .par_chunks(1000) |
| 108 | + .filter_map(|c| { |
| 109 | + //TODO: error log repo open errors |
| 110 | + sync::repo(repo_path).ok().map(|repo| { |
| 111 | + c.iter() |
| 112 | + .filter_map(|(e, c)| { |
| 113 | + let idx = idx.fetch_add( |
| 114 | + 1, |
| 115 | + std::sync::atomic::Ordering::Relaxed, |
| 116 | + ); |
| 117 | + |
| 118 | + Self::update_progress( |
| 119 | + params, |
| 120 | + ProgressPercent::new( |
| 121 | + idx, |
| 122 | + total_amount, |
| 123 | + ), |
| 124 | + ); |
| 125 | + |
| 126 | + (*self.filter)(&repo, c) |
| 127 | + .ok() |
| 128 | + .and_then(|res| { |
| 129 | + res.then_some((*e, *c)) |
| 130 | + }) |
| 131 | + }) |
| 132 | + .collect::<Vec<_>>() |
| 133 | + }) |
| 134 | + }) |
| 135 | + .flatten() |
| 136 | + .collect::<Vec<_>>() |
| 137 | + }); |
| 138 | + |
| 139 | + result.par_sort_by(|a, b| a.0.cmp(&b.0)); |
| 140 | + |
| 141 | + let result = result.into_iter().map(|c| c.1).collect(); |
| 142 | + |
| 143 | + Ok((start, result)) |
107 | 144 | }
|
108 | 145 |
|
109 | 146 | fn update_progress(
|
|
0 commit comments