Skip to content

Commit a24b91a

Browse files
authored
Continuation of #2648 (#2651)
* Modified ForEachAsync to release items as they are processed (#2644). The memory leak is caused by Task.WhenAll, which requires all of the tasks in the bulk operation to complete before the done() delegate in ForEachAsync is invoked. Because WhenAll holds references to all of the tasks, the GC cannot collect items that have already been processed. * Continuation of #2648: @sergii-sakharov rightfully pointed out that Task.WhenAll is not the most resource-friendly solution and replaced it with a bounded list: once the list is full we await Task.WhenAny, remove the completed task, and then continue. That change did, however, have an impact on the way we relied on Task.WhenAll being blocking and on being able to catch exceptions in Subscribe. This is no longer possible because we now return early from calling an async method. This is fine in our case because the returned Observable allows you to track and dispose that async process. This also introduces Wait(TimeSpan, Action onNext) extension methods, which block for at most the given period and propagate exceptions out of the subscribe call as expected. * Undo change to test.default.yaml
1 parent a4890fc commit a24b91a

File tree

10 files changed

+191
-114
lines changed

10 files changed

+191
-114
lines changed

src/Nest/CommonAbstractions/Extensions/Extensions.cs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,22 +206,39 @@ internal static Dictionary<TKey, TValue> NullIfNoKeys<TKey, TValue>(this Diction
206206

207207
internal static IEnumerable<T> EmptyIfNull<T>(this IEnumerable<T> xs) => xs ?? new T[0];
208208

209-
internal static Task ForEachAsync<TSource, TResult>(
209+
internal static async Task ForEachAsync<TSource, TResult>(
210210
this IEnumerable<TSource> lazyList,
211211
Func<TSource, long, Task<TResult>> taskSelector,
212212
Action<TSource, TResult> resultProcessor,
213-
Action<Task> done,
213+
Action<Exception> done,
214214
int maxDegreeOfParallelism,
215215
SemaphoreSlim additionalRateLimitter = null
216216
)
217217
{
218218
var semaphore = new SemaphoreSlim(initialCount: maxDegreeOfParallelism, maxCount: maxDegreeOfParallelism);
219219
long page = 0;
220220

221-
return Task.WhenAll(
222-
from item in lazyList
223-
select ProcessAsync<TSource, TResult>(item, taskSelector, resultProcessor, semaphore, additionalRateLimitter, page++)
224-
).ContinueWith(done);
221+
try
222+
{
223+
var tasks = new List<Task>();
224+
foreach (var item in lazyList)
225+
{
226+
tasks.Add(ProcessAsync(item, taskSelector, resultProcessor, semaphore, additionalRateLimitter, page++));
227+
if (tasks.Count <= maxDegreeOfParallelism)
228+
continue;
229+
230+
var task = await Task.WhenAny(tasks);
231+
tasks.Remove(task);
232+
}
233+
234+
await Task.WhenAll(tasks);
235+
done(null);
236+
}
237+
catch (Exception e)
238+
{
239+
done(e);
240+
throw;
241+
}
225242
}
226243

227244
private static async Task ProcessAsync<TSource, TResult>(
@@ -239,10 +256,6 @@ private static async Task ProcessAsync<TSource, TResult>(
239256
var result = await taskSelector(item, page).ConfigureAwait(false);
240257
resultProcessor(item, result);
241258
}
242-
catch
243-
{
244-
throw;
245-
}
246259
finally
247260
{
248261
localRateLimiter?.Release();
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using System;
2+
using System.Threading;
3+
4+
namespace Nest
5+
{
6+
public static class BlockingSubscribeExtensions
7+
{
8+
public static BulkAllObserver Wait<T>(this BulkAllObservable<T> observable, TimeSpan maximumRunTime, Action<IBulkAllResponse> onNext)
9+
where T : class =>
10+
WaitOnObservable<BulkAllObservable<T>, IBulkAllResponse, BulkAllObserver>(
11+
observable, maximumRunTime, (e, c) => new BulkAllObserver(onNext, e, c));
12+
13+
public static ScrollAllObserver<T> Wait<T>(this IObservable<IScrollAllResponse<T>> observable, TimeSpan maximumRunTime, Action<IScrollAllResponse<T>> onNext)
14+
where T : class =>
15+
WaitOnObservable<IObservable<IScrollAllResponse<T>>, IScrollAllResponse<T>, ScrollAllObserver<T>>(
16+
observable, maximumRunTime, (e, c) => new ScrollAllObserver<T>(onNext, e, c));
17+
18+
public static ReindexObserver Wait(this IObservable<IBulkAllResponse> observable, TimeSpan maximumRunTime, Action<IBulkAllResponse> onNext) =>
19+
WaitOnObservable<IObservable<IBulkAllResponse>, IBulkAllResponse, ReindexObserver>(
20+
observable, maximumRunTime, (e, c) => new ReindexObserver(onNext, e, c));
21+
22+
private static TObserver WaitOnObservable<TObservable, TObserve, TObserver>(
23+
TObservable observable,
24+
TimeSpan maximumRunTime,
25+
Func<Action<Exception>, Action, TObserver> factory
26+
)
27+
where TObservable : IObservable<TObserve>
28+
where TObserver : IObserver<TObserve>
29+
{
30+
observable.ThrowIfNull(nameof(observable));
31+
maximumRunTime.ThrowIfNull(nameof(maximumRunTime));
32+
Exception ex = null;
33+
var handle = new ManualResetEvent(false);
34+
var observer = factory(
35+
e =>
36+
{
37+
ex = e;
38+
handle.Set();
39+
},
40+
() => handle.Set()
41+
);
42+
observable.Subscribe(observer);
43+
handle.WaitOne(maximumRunTime);
44+
if (ex != null) throw ex;
45+
return observer;
46+
}
47+
}
48+
}

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ public BulkAllObservable(
2525
IElasticClient client,
2626
IBulkAllRequest<T> partionedBulkRequest,
2727
CancellationToken cancellationToken = default(CancellationToken)
28-
)
28+
)
2929
{
3030
this._client = client;
3131
this._partionedBulkRequest = partionedBulkRequest;
3232
this._backOffRetries = this._partionedBulkRequest.BackOffRetries.GetValueOrDefault(CoordinatedRequestDefaults.BulkAllBackOffRetriesDefault);
33-
this._backOffTime = (this._partionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault);
33+
this._backOffTime = (this._partionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault);
3434
this._bulkSize = this._partionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault;
3535
this._maxDegreeOfParallelism = _partionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault;
3636

@@ -42,20 +42,13 @@ public IDisposable Subscribe(BulkAllObserver observer)
4242
{
4343
_incrementFailed = observer.IncrementTotalNumberOfFailedBuffers;
4444
_incrementRetries = observer.IncrementTotalNumberOfRetries;
45-
return this.Subscribe((IObserver<IBulkAllResponse>)observer);
45+
return this.Subscribe((IObserver<IBulkAllResponse>) observer);
4646
}
4747

4848
public IDisposable Subscribe(IObserver<IBulkAllResponse> observer)
4949
{
5050
observer.ThrowIfNull(nameof(observer));
51-
try
52-
{
53-
this.BulkAll(observer);
54-
}
55-
catch (Exception e)
56-
{
57-
observer.OnError(e);
58-
}
51+
this.BulkAll(observer);
5952
return this;
6053
}
6154

@@ -66,62 +59,67 @@ private void BulkAll(IObserver<IBulkAllResponse> observer)
6659
{
6760
var documents = this._partionedBulkRequest.Documents;
6861
var partioned = new PartitionHelper<T>(documents, this._bulkSize);
62+
#pragma warning disable 4014
6963
partioned.ForEachAsync(
64+
#pragma warning restore 4014
7065
(buffer, page) => this.BulkAsync(buffer, page, 0),
7166
(buffer, response) => observer.OnNext(response),
72-
t => OnCompleted(t, observer),
67+
ex => OnCompleted(ex, observer),
7368
this._maxDegreeOfParallelism
7469
);
7570
}
7671

77-
private void OnCompleted(Task task, IObserver<IBulkAllResponse> observer)
72+
private void OnCompleted(Exception exception, IObserver<IBulkAllResponse> observer)
7873
{
79-
switch (task.Status)
74+
if (exception != null)
75+
observer.OnError(exception);
76+
else
8077
{
81-
case System.Threading.Tasks.TaskStatus.RanToCompletion:
82-
if (this._partionedBulkRequest.RefreshOnCompleted)
83-
{
84-
var refresh = this._client.Refresh(this._partionedBulkRequest.Index);
85-
if (!refresh.IsValid) throw Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall);
86-
}
78+
try
79+
{
80+
RefreshOnCompleted();
8781
observer.OnCompleted();
88-
break;
89-
case System.Threading.Tasks.TaskStatus.Faulted:
90-
observer.OnError(task.Exception.InnerException);
91-
break;
92-
case System.Threading.Tasks.TaskStatus.Canceled:
93-
observer.OnError(new TaskCanceledException(task));
94-
break;
95-
default:
96-
throw new ArgumentOutOfRangeException();
82+
}
83+
catch (Exception e)
84+
{
85+
observer.OnError(e);
86+
}
9787
}
9888
}
9989

90+
private void RefreshOnCompleted()
91+
{
92+
if (!this._partionedBulkRequest.RefreshOnCompleted) return;
93+
var refresh = this._client.Refresh(this._partionedBulkRequest.Index);
94+
if (!refresh.IsValid) throw Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall);
95+
}
96+
10097
private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int backOffRetries)
10198
{
10299
this._compositeCancelToken.ThrowIfCancellationRequested();
103100

104101
var r = this._partionedBulkRequest;
105102
var response = await this._client.BulkAsync(s =>
106-
{
107-
s.Index(r.Index).Type(r.Type);
108-
if (r.BufferToBulk != null) r.BufferToBulk(s, buffer);
109-
else s.IndexMany(buffer);
110-
if (!string.IsNullOrEmpty(r.Pipeline)) s.Pipeline(r.Pipeline);
111-
if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value);
112-
if (!string.IsNullOrEmpty(r.Routing)) s.Routing(r.Routing);
113-
if (r.WaitForActiveShards.HasValue) s.WaitForActiveShards(r.WaitForActiveShards.ToString());
103+
{
104+
s.Index(r.Index).Type(r.Type);
105+
if (r.BufferToBulk != null) r.BufferToBulk(s, buffer);
106+
else s.IndexMany(buffer);
107+
if (!string.IsNullOrEmpty(r.Pipeline)) s.Pipeline(r.Pipeline);
108+
if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value);
109+
if (!string.IsNullOrEmpty(r.Routing)) s.Routing(r.Routing);
110+
if (r.WaitForActiveShards.HasValue) s.WaitForActiveShards(r.WaitForActiveShards.ToString());
114111

115-
return s;
116-
}, this._compositeCancelToken).ConfigureAwait(false);
112+
return s;
113+
}, this._compositeCancelToken)
114+
.ConfigureAwait(false);
117115

118116
this._compositeCancelToken.ThrowIfCancellationRequested();
119117
if (!response.IsValid && backOffRetries < this._backOffRetries)
120118
{
121119
this._incrementRetries();
122120
//wait before or after fishing out retriable docs?
123121
await Task.Delay(this._backOffTime, this._compositeCancelToken).ConfigureAwait(false);
124-
var retryDocuments = response.Items.Zip(buffer, (i, d) => new { i, d })
122+
var retryDocuments = response.Items.Zip(buffer, (i, d) => new {i, d})
125123
.Where(x => x.i.Status == 429)
126124
.Select(x => x.d)
127125
.ToList();
@@ -135,10 +133,11 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
135133
throw Throw($"Bulk indexing failed and after retrying {backOffRetries} times", response.ApiCall);
136134
}
137135
this._partionedBulkRequest.BackPressure?.Release();
138-
return new BulkAllResponse { Retries = backOffRetries, Page = page };
136+
return new BulkAllResponse {Retries = backOffRetries, Page = page};
139137
}
140138

141139
public bool IsDisposed { get; private set; }
140+
142141
public void Dispose()
143142
{
144143
this.IsDisposed = true;

src/Nest/Document/Multiple/ScrollAll/ScrollAllObservable.cs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class ScrollAllObservable<T> : IDisposable, IObservable<IScrollAllRespons
2222
private readonly ProducerConsumerBackPressure _backPressure;
2323

2424
public ScrollAllObservable(
25-
IElasticClient client,
25+
IElasticClient client,
2626
IScrollAllRequest scrollAllRequest,
2727
CancellationToken cancellationToken = default(CancellationToken)
2828
)
@@ -41,14 +41,7 @@ public ScrollAllObservable(
4141
public IDisposable Subscribe(IObserver<IScrollAllResponse<T>> observer)
4242
{
4343
observer.ThrowIfNull(nameof(observer));
44-
try
45-
{
46-
this.ScrollAll(observer);
47-
}
48-
catch (Exception e)
49-
{
50-
observer.OnError(e);
51-
}
44+
this.ScrollAll(observer);
5245
return this;
5346
}
5447

@@ -57,7 +50,9 @@ private void ScrollAll(IObserver<IScrollAllResponse<T>> observer)
5750
var slices = this._scrollAllRequest.Slices;
5851
var maxSlicesAtOnce = this._scrollAllRequest.MaxDegreeOfParallelism ?? this._scrollAllRequest.Slices;
5952

60-
Enumerable.Range(0, slices).ForEachAsync<int, bool>(
53+
#pragma warning disable 4014
54+
Enumerable.Range(0, slices).ForEachAsync(
55+
#pragma warning restore 4014
6156
(slice, l) => this.ScrollSliceAsync(observer, slice),
6257
(slice, r) => { },
6358
t => OnCompleted(t, observer),
@@ -128,20 +123,12 @@ private async Task<ISearchResponse<T>> InitiateSearchAsync(int slice)
128123
finally { _scrollInitiationLock.Release(); }
129124
}
130125

131-
private static void OnCompleted(Task task, IObserver<IScrollAllResponse<T>> observer)
126+
private static void OnCompleted(Exception exception, IObserver<IScrollAllResponse<T>> observer)
132127
{
133-
switch (task.Status)
134-
{
135-
case System.Threading.Tasks.TaskStatus.RanToCompletion:
136-
observer.OnCompleted();
137-
break;
138-
case System.Threading.Tasks.TaskStatus.Faulted:
139-
observer.OnError(task.Exception.InnerException);
140-
break;
141-
case System.Threading.Tasks.TaskStatus.Canceled:
142-
observer.OnError(new TaskCanceledException(task));
143-
break;
144-
}
128+
if (exception == null)
129+
observer.OnCompleted();
130+
else
131+
observer.OnError(exception);
145132
}
146133

147134
public bool IsDisposed { get; private set; }

src/Nest/Document/Multiple/ScrollAll/ScrollAllObserver.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ public ScrollAllObserver(
1111
: base(onNext, onError, onCompleted) { }
1212

1313
}
14+
15+
1416
}

src/Nest/Nest.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@
482482
<Compile Include="CommonAbstractions\LowLevelDispatch\IHighLevelToLowLevelDispatcher.cs" />
483483
<Compile Include="CommonAbstractions\LowLevelDispatch\LowLevelDispatch.cs" />
484484
<Compile Include="CommonAbstractions\RawJson\RawJson.cs" />
485+
<Compile Include="CommonAbstractions\Reactive\BlockingSubscribeExtensions.cs" />
485486
<Compile Include="CommonAbstractions\Reactive\CoordinatedRequestObserverBase.cs" />
486487
<Compile Include="CommonAbstractions\Reactive\GetEnumerator.cs" />
487488
<Compile Include="CommonAbstractions\Reactive\PartitionHelper.cs" />

0 commit comments

Comments
 (0)