Skip to content

Commit 3568e10

Browse files
author
Oren (electricessence)
committed
Reformat and reference update.
1 parent 8443b45 commit 3568e10

12 files changed

+892
-895
lines changed

Extensions.ActionBlock.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,24 @@
44

55
namespace Open.Threading.Dataflow
66
{
7-
public static class ActionBlock
8-
{
9-
public static ActionBlock<T> New<T>(Action<T> action)
10-
=> new ActionBlock<T>(action);
7+
public static class ActionBlock
8+
{
9+
public static ActionBlock<T> New<T>(Action<T> action)
10+
=> new ActionBlock<T>(action);
1111

12-
public static ActionBlock<T> New<T>(Action<T> action, ExecutionDataflowBlockOptions options)
13-
=> new ActionBlock<T>(action, options);
12+
public static ActionBlock<T> New<T>(Action<T> action, ExecutionDataflowBlockOptions options)
13+
=> new ActionBlock<T>(action, options);
1414

15-
public static ActionBlock<T> New<T>(Action<T> consumer, int maxParallel)
16-
=> new ActionBlock<T>(consumer, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });
15+
public static ActionBlock<T> New<T>(Action<T> consumer, int maxParallel)
16+
=> new ActionBlock<T>(consumer, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });
1717

18-
public static ActionBlock<T> NewAsync<T>(Func<T, Task> action)
19-
=> new ActionBlock<T>(action);
18+
public static ActionBlock<T> NewAsync<T>(Func<T, Task> action)
19+
=> new ActionBlock<T>(action);
2020

21-
public static ActionBlock<T> NewAsync<T>(Func<T, Task> action, ExecutionDataflowBlockOptions options)
22-
=> new ActionBlock<T>(action, options);
21+
public static ActionBlock<T> NewAsync<T>(Func<T, Task> action, ExecutionDataflowBlockOptions options)
22+
=> new ActionBlock<T>(action, options);
2323

24-
public static ActionBlock<T> NewAsync<T>(Func<T, Task> consumer, int maxParallel)
25-
=> new ActionBlock<T>(consumer, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });
26-
}
24+
public static ActionBlock<T> NewAsync<T>(Func<T, Task> consumer, int maxParallel)
25+
=> new ActionBlock<T>(consumer, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });
26+
}
2727
}

Extensions.LinkTo.cs

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,48 +6,48 @@
66

77
namespace Open.Threading.Dataflow
88
{
9-
public static partial class DataFlowExtensions
10-
{
11-
public static IDisposable LinkTo<T>(this ISourceBlock<T> producer,
12-
Action<T> consumer)
13-
=> producer.LinkTo(new ActionBlock<T>(consumer));
9+
public static partial class DataFlowExtensions
10+
{
11+
public static IDisposable LinkTo<T>(this ISourceBlock<T> producer,
12+
Action<T> consumer)
13+
=> producer.LinkTo(new ActionBlock<T>(consumer));
1414

15-
public static IDisposable LinkToAsync<T>(this ISourceBlock<T> producer,
16-
Func<T, Task> consumer)
17-
=> producer.LinkTo(new ActionBlock<T>(consumer));
15+
public static IDisposable LinkToAsync<T>(this ISourceBlock<T> producer,
16+
Func<T, Task> consumer)
17+
=> producer.LinkTo(new ActionBlock<T>(consumer));
1818

19-
public static IDisposable LinkToWithCompletion<T>(this ISourceBlock<T> producer,
20-
ITargetBlock<T> consumer)
21-
=> producer.LinkTo(consumer, new DataflowLinkOptions() { PropagateCompletion = true });
19+
public static IDisposable LinkToWithCompletion<T>(this ISourceBlock<T> producer,
20+
ITargetBlock<T> consumer)
21+
=> producer.LinkTo(consumer, new DataflowLinkOptions() { PropagateCompletion = true });
2222

2323

24-
public static T PropagateFaultsTo<T>(this T source, params IDataflowBlock[] targets)
25-
where T : IDataflowBlock
26-
{
27-
source.Completion.OnFaulted(ex =>
28-
{
29-
foreach (var target in targets.Where(t => t != null))
30-
target.Fault(ex.InnerException);
31-
});
32-
return source;
33-
}
24+
public static T PropagateFaultsTo<T>(this T source, params IDataflowBlock[] targets)
25+
where T : IDataflowBlock
26+
{
27+
source.Completion.OnFaulted(ex =>
28+
{
29+
foreach (var target in targets.Where(t => t != null))
30+
target.Fault(ex.InnerException);
31+
});
32+
return source;
33+
}
3434

35-
public static T PropagateCompletionTo<T>(this T source, params IDataflowBlock[] targets)
36-
where T : IDataflowBlock
37-
{
38-
source.Completion.ContinueWith(task =>
39-
{
40-
foreach (var target in targets.Where(t => t != null))
41-
{
42-
if (task.IsFaulted)
43-
// ReSharper disable once PossibleNullReferenceException
44-
target.Fault(task.Exception.InnerException);
45-
else
46-
target.Complete();
47-
}
48-
});
49-
return source;
50-
}
35+
public static T PropagateCompletionTo<T>(this T source, params IDataflowBlock[] targets)
36+
where T : IDataflowBlock
37+
{
38+
source.Completion.ContinueWith(task =>
39+
{
40+
foreach (var target in targets.Where(t => t != null))
41+
{
42+
if (task.IsFaulted)
43+
// ReSharper disable once PossibleNullReferenceException
44+
target.Fault(task.Exception.InnerException);
45+
else
46+
target.Complete();
47+
}
48+
});
49+
return source;
50+
}
5151

52-
}
52+
}
5353
}

Extensions.Observable.cs

Lines changed: 56 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2,60 +2,60 @@
22

33
namespace Open.Threading.Dataflow
44
{
5-
public static partial class DataFlowExtensions
6-
{
7-
class Observer<T> : IObserver<T>, IDisposable
8-
{
9-
public static IObserver<T> New(
10-
Action<T> onNext,
11-
Action<Exception> onError,
12-
Action onCompleted)
13-
=> new Observer<T>()
14-
{
15-
_onNext = onNext,
16-
_onError = onError,
17-
_onCompleted = onCompleted
18-
};
19-
20-
Action _onCompleted;
21-
Action<Exception> _onError;
22-
Action<T> _onNext;
23-
24-
25-
public void OnNext(T value)
26-
{
27-
_onNext?.Invoke(value);
28-
}
29-
30-
public void OnError(Exception error)
31-
{
32-
_onError?.Invoke(error);
33-
}
34-
35-
public void OnCompleted()
36-
{
37-
_onCompleted?.Invoke();
38-
}
39-
40-
41-
public void Dispose()
42-
{
43-
_onNext = null;
44-
_onError = null;
45-
_onCompleted = null;
46-
}
47-
}
48-
49-
public static IDisposable Subscribe<T>(this IObservable<T> observable,
50-
Action<T> onNext,
51-
Action<Exception> onError,
52-
Action onCompleted = null)
53-
=> observable.Subscribe(Observer<T>.New(onNext, onError, onCompleted));
54-
55-
public static IDisposable Subscribe<T>(this IObservable<T> observable,
56-
Action<T> onNext,
57-
Action onCompleted = null)
58-
=> observable.Subscribe(Observer<T>.New(onNext, null, onCompleted));
59-
60-
}
5+
public static partial class DataFlowExtensions
6+
{
7+
class Observer<T> : IObserver<T>, IDisposable
8+
{
9+
public static IObserver<T> New(
10+
Action<T> onNext,
11+
Action<Exception> onError,
12+
Action onCompleted)
13+
=> new Observer<T>()
14+
{
15+
_onNext = onNext,
16+
_onError = onError,
17+
_onCompleted = onCompleted
18+
};
19+
20+
Action _onCompleted;
21+
Action<Exception> _onError;
22+
Action<T> _onNext;
23+
24+
25+
public void OnNext(T value)
26+
{
27+
_onNext?.Invoke(value);
28+
}
29+
30+
public void OnError(Exception error)
31+
{
32+
_onError?.Invoke(error);
33+
}
34+
35+
public void OnCompleted()
36+
{
37+
_onCompleted?.Invoke();
38+
}
39+
40+
41+
public void Dispose()
42+
{
43+
_onNext = null;
44+
_onError = null;
45+
_onCompleted = null;
46+
}
47+
}
48+
49+
public static IDisposable Subscribe<T>(this IObservable<T> observable,
50+
Action<T> onNext,
51+
Action<Exception> onError,
52+
Action onCompleted = null)
53+
=> observable.Subscribe(Observer<T>.New(onNext, onError, onCompleted));
54+
55+
public static IDisposable Subscribe<T>(this IObservable<T> observable,
56+
Action<T> onNext,
57+
Action onCompleted = null)
58+
=> observable.Subscribe(Observer<T>.New(onNext, null, onCompleted));
59+
60+
}
6161
}

0 commit comments

Comments
 (0)