Skip to content

Commit 308b0ca

Browse files
committed
WIP
1 parent 56fba95 commit 308b0ca

File tree

8 files changed

+333
-133
lines changed

8 files changed

+333
-133
lines changed
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
////////////////////////////////////////////////////////////////////////////
2+
//
3+
// GitReader - Lightweight Git local repository traversal library.
4+
// Copyright (c) Kouji Matsui (@kozy_kekyo, @kekyo@mi.kekyo.net)
5+
//
6+
// Licensed under Apache-v2: https://opensource.org/licenses/Apache-2.0
7+
//
8+
////////////////////////////////////////////////////////////////////////////
9+
10+
using System;
11+
using System.Collections.Generic;
12+
using System.Linq;
13+
using System.Threading;
14+
using System.Threading.Tasks;
15+
16+
namespace GitReader.Internal;
17+
18+
/// <summary>
19+
/// Concurrent scope for loosely limiting the number of concurrent tasks.
20+
/// </summary>
21+
internal sealed class LooseConcurrentScope
22+
{
23+
private int availableExecutionSeat;
24+
25+
public LooseConcurrentScope() =>
26+
// Exactly, it should be a value that depends on the I/O parallel degree that the machine can accept,
27+
// but it is very difficult to determine it automatically.
28+
// Therefore, we use the number of processors x2 as a substitute.
29+
// * https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
30+
// * Mark Russinovich, Windows Internals 2nd edition
31+
this.availableExecutionSeat = Environment.ProcessorCount * 2;
32+
33+
public LooseConcurrentScope(int targetConcurrentCount) =>
34+
this.availableExecutionSeat = targetConcurrentCount;
35+
36+
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP2_1_OR_GREATER
37+
public ValueTask WhenAll(IEnumerable<ValueTask> tasks) =>
38+
#else
39+
public Task WhenAll(IEnumerable<Task> tasks) =>
40+
#endif
41+
this.WhenAll(default, tasks);
42+
43+
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP2_1_OR_GREATER
44+
public async ValueTask WhenAll(CancellationToken ct, IEnumerable<ValueTask> tasks)
45+
#else
46+
public async Task WhenAll(CancellationToken ct, IEnumerable<Task> tasks)
47+
#endif
48+
{
49+
// Since no collection is done for any exceptions at all,
50+
// the behavior is strictly different from Task.WhenAll().
51+
52+
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP2_1_OR_GREATER
53+
var candidateTasks = new List<ValueTask>();
54+
#else
55+
var candidateTasks = new List<Task>();
56+
#endif
57+
var enumerator = tasks.GetEnumerator();
58+
try
59+
{
60+
while (true)
61+
{
62+
while (true)
63+
{
64+
ct.ThrowIfCancellationRequested();
65+
66+
var isTaskAvailable = enumerator.MoveNext();
67+
if (!isTaskAvailable)
68+
{
69+
// No more tasks.
70+
break;
71+
}
72+
73+
// This task is candidate.
74+
candidateTasks.Add(enumerator.Current);
75+
76+
// When available execution seat (and reserved):
77+
var availableExecutionSeat = Interlocked.Decrement(ref this.availableExecutionSeat);
78+
if (availableExecutionSeat < 0)
79+
{
80+
// No more available execution seats, aggregation is done.
81+
break;
82+
}
83+
}
84+
85+
// Candidate only one task.
86+
if (candidateTasks.Count == 1)
87+
{
88+
try
89+
{
90+
// Execute one task directly.
91+
await candidateTasks[0];
92+
}
93+
finally
94+
{
95+
// Returns an execution seat.
96+
Interlocked.Increment(ref this.availableExecutionSeat);
97+
candidateTasks.Clear();
98+
}
99+
}
100+
// Candidate some tasks.
101+
else if (candidateTasks.Count >= 2)
102+
{
103+
try
104+
{
105+
// Execute in parallel.
106+
await Utilities.WhenAll(candidateTasks);
107+
}
108+
finally
109+
{
110+
// Returns execution seats.
111+
Interlocked.Add(ref this.availableExecutionSeat, candidateTasks.Count);
112+
candidateTasks.Clear();
113+
}
114+
}
115+
else
116+
{
117+
// Done.
118+
break;
119+
}
120+
}
121+
}
122+
finally
123+
{
124+
if (candidateTasks.Count >= 1)
125+
{
126+
// Returns execution seats.
127+
Interlocked.Add(ref this.availableExecutionSeat, candidateTasks.Count);
128+
}
129+
130+
enumerator.Dispose();
131+
}
132+
}
133+
134+
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP
135+
public ValueTask<T[]> WhenAll<T>(IEnumerable<ValueTask<T>> tasks) =>
136+
#else
137+
public Task<T[]> WhenAll<T>(IEnumerable<Task<T>> tasks) =>
138+
#endif
139+
this.WhenAll(default, tasks);
140+
141+
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP
142+
public async ValueTask<T[]> WhenAll<T>(CancellationToken ct, IEnumerable<ValueTask<T>> tasks)
143+
#else
144+
public async Task<T[]> WhenAll<T>(CancellationToken ct, IEnumerable<Task<T>> tasks)
145+
#endif
146+
{
147+
// Since no collection is done for any exceptions at all,
148+
// the behavior is strictly different from Task.WhenAll().
149+
150+
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP
151+
var candidateTasks = new List<ValueTask<T>>();
152+
#else
153+
var candidateTasks = new List<Task<T>>();
154+
#endif
155+
var enumerator = tasks.GetEnumerator();
156+
try
157+
{
158+
var results = new List<T>();
159+
while (true)
160+
{
161+
while (true)
162+
{
163+
ct.ThrowIfCancellationRequested();
164+
165+
var isTaskAvailable = enumerator.MoveNext();
166+
if (!isTaskAvailable)
167+
{
168+
// No more tasks.
169+
break;
170+
}
171+
172+
// This task is candidate.
173+
candidateTasks.Add(enumerator.Current);
174+
175+
// When available execution seat (and reserved):
176+
var availableExecutionSeat = Interlocked.Decrement(ref this.availableExecutionSeat);
177+
if (availableExecutionSeat < 0)
178+
{
179+
// No more available execution seats, aggregation is done.
180+
break;
181+
}
182+
}
183+
184+
// Candidate only one task.
185+
if (candidateTasks.Count == 1)
186+
{
187+
try
188+
{
189+
// Execute one task directly.
190+
results.Add(await candidateTasks[0]);
191+
}
192+
finally
193+
{
194+
// Returns an execution seat.
195+
Interlocked.Increment(ref this.availableExecutionSeat);
196+
candidateTasks.Clear();
197+
}
198+
}
199+
// Candidate some tasks.
200+
else if (candidateTasks.Count >= 2)
201+
{
202+
try
203+
{
204+
// Execute in parallel.
205+
results.AddRange(await Utilities.WhenAll(candidateTasks));
206+
}
207+
finally
208+
{
209+
// Returns execution seats.
210+
Interlocked.Add(ref this.availableExecutionSeat, candidateTasks.Count);
211+
candidateTasks.Clear();
212+
}
213+
}
214+
else
215+
{
216+
// Done.
217+
break;
218+
}
219+
}
220+
return results.ToArray();
221+
}
222+
finally
223+
{
224+
if (candidateTasks.Count >= 1)
225+
{
226+
// Returns execution seats.
227+
Interlocked.Add(ref this.availableExecutionSeat, candidateTasks.Count);
228+
}
229+
230+
enumerator.Dispose();
231+
}
232+
}
233+
234+
public static readonly LooseConcurrentScope Default = new();
235+
}

GitReader.Core/Internal/ObjectAccessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ await MemoizedStream.CreateAsync(zlibStream, (long)objectSize, this.pool, this.f
621621
#endif
622622
{
623623
var files = await this.fileSystem.GetFilesAsync(this.packedBasePath, "pack-*.idx", ct);
624-
var entries = await Utilities.WhenAll(
624+
var entries = await LooseConcurrentScope.Default.WhenAll(ct,
625625
files.Select(indexFilePath =>
626626
this.GetOrCacheIndexEntryAsync(indexFilePath.Substring(this.packedBasePath.Length + 1), ct)));
627627

GitReader.Core/Internal/RepositoryAccessor.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -216,19 +216,19 @@ public static async Task<CandidateFilePath[]> GetCandidateFilePathsAsync(
216216
string relativePathFromGitPath,
217217
string match,
218218
CancellationToken ct) =>
219-
(await Utilities.WhenAll(repository.TryingPathList.
219+
(await LooseConcurrentScope.Default.WhenAll(ct, repository.TryingPathList.
220220
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP
221221
Select((Func<string, ValueTask<CandidateFilePath[]>>)(async gitPath =>
222222
#else
223223
Select((async gitPath =>
224224
#endif
225-
{
226-
var basePath = repository.fileSystem.Combine(gitPath, relativePathFromGitPath);
227-
var candidatePaths = await repository.fileSystem.GetFilesAsync(basePath, match, ct);
228-
return candidatePaths.
229-
Select(candidatePath => new CandidateFilePath(gitPath, basePath, candidatePath)).
230-
ToArray();
231-
})))).
225+
{
226+
var basePath = repository.fileSystem.Combine(gitPath, relativePathFromGitPath);
227+
var candidatePaths = await repository.fileSystem.GetFilesAsync(basePath, match, ct);
228+
return candidatePaths.
229+
Select(candidatePath => new CandidateFilePath(gitPath, basePath, candidatePath)).
230+
ToArray();
231+
})))).
232232
SelectMany(paths => paths).
233233
ToArray();
234234

@@ -616,7 +616,7 @@ public static async Task<PrimitiveReference[]> ReadReferencesAsync(
616616
{
617617
var candidatePaths = await GetCandidateFilePathsAsync(
618618
repository, repository.fileSystem.Combine("refs", GetReferenceTypeName(type)), "*", ct);
619-
var references = (await Utilities.WhenAll(
619+
var references = (await LooseConcurrentScope.Default.WhenAll(ct,
620620
candidatePaths.
621621
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP
622622
Select((Func<CandidateFilePath, ValueTask<PrimitiveReference?>>)(async cp =>
@@ -682,7 +682,7 @@ public static async Task<PrimitiveTagReference[]> ReadTagReferencesAsync(
682682
{
683683
var candidatePaths = await GetCandidateFilePathsAsync(
684684
repository, repository.fileSystem.Combine("refs", "tags"), "*", ct);
685-
var references = (await Utilities.WhenAll(
685+
var references = (await LooseConcurrentScope.Default.WhenAll(ct,
686686
candidatePaths.
687687
#if NET45_OR_GREATER || NETSTANDARD || NETCOREAPP
688688
Select((Func<CandidateFilePath, ValueTask<PrimitiveTagReference?>>)(async cp =>

GitReader.Core/Internal/Utilities.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,9 +571,20 @@ public static async ValueTask<PairResult<T0, T1, T2, T3, T4>> Join<T0, T1, T2, T
571571
#if NET35 || NET40
572572
public static Task<T> FromResult<T>(T result) =>
573573
TaskEx.FromResult(result);
574+
575+
public static Task CompletedTask =>
576+
TaskEx.FromResult(0);
574577
#else
575578
public static Task<T> FromResult<T>(T result) =>
576579
Task.FromResult(result);
580+
581+
#if NET45
582+
public static Task CompletedTask =>
583+
Task.FromResult(0);
584+
#else
585+
public static Task CompletedTask =>
586+
Task.CompletedTask;
587+
#endif
577588
#endif
578589

579590
#if NET35 || NET40

0 commit comments

Comments
 (0)