Skip to content

Commit 29c31bb

Browse files
author
Oren (electricessence)
committed
Flter cleanup.
1 parent d730b57 commit 29c31bb

File tree

8 files changed

+156
-144
lines changed

8 files changed

+156
-144
lines changed

Filters/AcceptOrPass.cs

Lines changed: 0 additions & 63 deletions
This file was deleted.

Filters/AutoComplete.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,36 @@
1-
using Open.Threading;
2-
using System.Threading.Tasks.Dataflow;
1+
using System.Threading.Tasks.Dataflow;
32

43
namespace Open.Threading.Dataflow
54
{
65

7-
internal class AutoCompleteFilter<T> : TargetBlockFilter<T>
6+
internal class AutoCompleteFilter<T> : TargetBlockFilterBase<T>
87
{
98
public AutoCompleteFilter(int limit, ITargetBlock<T> target) : base(target)
109
{
1110
Limit = limit;
1211
}
1312

1413
public int Limit { get; }
15-
public int AllowedCount { get; private set; }
14+
public int AllowedCount { get; private set; }
1615

1716

18-
// The key here is to reject the message ahead of time.
19-
public override DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
17+
// The key here is to reject the message ahead of time.
18+
public override DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
2019
{
2120
var result = DataflowMessageStatus.DecliningPermanently;
2221
var completed = false;
23-
// There are multiple operations happening here that require synchronization to get right.
24-
ThreadSafety.LockConditional(_target,
22+
// There are multiple operations happening here that require synchronization to get right.
23+
ThreadSafety.LockConditional(Target,
2524
() => AllowedCount < Limit,
2625
() =>
2726
{
28-
AllowedCount++;
29-
result = _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
27+
AllowedCount++;
28+
result = Target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
3029
completed = AllowedCount == Limit;
3130
}
3231
);
3332

34-
if (completed) _target.Complete();
33+
if (completed) Target.Complete();
3534

3635
return result;
3736
}

Filters/Changed.cs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,27 @@
1-
using Open.Threading;
2-
using System.Threading.Tasks.Dataflow;
1+
using System.Threading.Tasks.Dataflow;
32

43
namespace Open.Threading.Dataflow
54
{
6-
internal class ChangedFilter<T> : TargetBlockFilter<T>
5+
internal class ChangedFilter<T> : TargetBlockFilter<T>
76
{
8-
readonly DataflowMessageStatus _defaultResponseForDuplicate;
9-
10-
T _last;
11-
12-
public ChangedFilter(DataflowMessageStatus defaultResponseForDuplicate, ITargetBlock<T> target) : base(target)
7+
public ChangedFilter(ITargetBlock<T> target, DataflowMessageStatus defaultResponseForDuplicate)
8+
: base(target, defaultResponseForDuplicate, null)
139
{
14-
_defaultResponseForDuplicate = defaultResponseForDuplicate;
1510
}
1611

17-
// The key here is to reject the message ahead of time.
18-
public override DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
19-
{
20-
return ThreadSafety.LockConditional(
21-
_target,
22-
() => !messageValue.Equals(_last),
23-
() => _last = messageValue)
24-
? _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
25-
: _defaultResponseForDuplicate;
26-
}
12+
readonly object SyncLock = new object();
13+
T _last;
14+
15+
protected override bool Accept(T messageValue)
16+
=> ThreadSafety.LockConditional(
17+
SyncLock,
18+
() => !messageValue.Equals(_last),
19+
() => _last = messageValue);
2720
}
2821

2922
public static partial class DataFlowExtensions
3023
{
3124
public static ITargetBlock<T> OnlyIfChanged<T>(this ITargetBlock<T> target, DataflowMessageStatus defaultResponseForDuplicate)
32-
=> new ChangedFilter<T>(defaultResponseForDuplicate, target);
25+
=> new ChangedFilter<T>(target, defaultResponseForDuplicate);
3326
}
3427
}

Filters/Distinct.cs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,26 @@ namespace Open.Threading.Dataflow
55
{
66
internal class DistinctFilter<T> : TargetBlockFilter<T>
77
{
8-
private readonly DataflowMessageStatus _defaultResponseForDuplicate;
9-
10-
private readonly HashSet<T> _set = new HashSet<T>();
11-
12-
public DistinctFilter(DataflowMessageStatus defaultResponseForDuplicate, ITargetBlock<T> target) : base(target)
8+
public DistinctFilter(ITargetBlock<T> target, DataflowMessageStatus defaultResponseForDuplicate)
9+
: base(target, defaultResponseForDuplicate, null)
1310
{
14-
_defaultResponseForDuplicate = defaultResponseForDuplicate;
1511
}
1612

17-
// The key here is to reject the message ahead of time.
18-
public override DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
13+
private readonly HashSet<T> _set = new HashSet<T>();
14+
15+
protected override bool Accept(T messageValue)
1916
{
2017
bool didntHave;
21-
lock (_target) // Assure order of acceptance.
18+
lock (Target) // Assure order of acceptance.
2219
didntHave = _set.Add(messageValue);
23-
if (didntHave)
24-
return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
25-
26-
return _defaultResponseForDuplicate;
20+
return didntHave;
2721
}
2822
}
2923

3024
public static partial class DataFlowExtensions
3125
{
3226
public static ITargetBlock<T> Distinct<T>(this ITargetBlock<T> target, DataflowMessageStatus defaultResponseForDuplicate)
33-
=> new DistinctFilter<T>(defaultResponseForDuplicate, target);
27+
=> new DistinctFilter<T>(target, defaultResponseForDuplicate);
3428
}
3529

3630
}

Filters/Filter.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using System;
2+
using System.Threading.Tasks.Dataflow;
3+
4+
namespace Open.Threading.Dataflow
5+
{
6+
internal class TargetBlockFilter<T> : TargetBlockFilterBase<T>
7+
{
8+
private readonly DataflowMessageStatus _filterDeclineStatus;
9+
private readonly Func<T, bool> _filter;
10+
11+
public TargetBlockFilter(
12+
ITargetBlock<T> target,
13+
DataflowMessageStatus filterDeclineStatus,
14+
Func<T, bool> filter) :base(target)
15+
{
16+
_filter = filter;
17+
_filterDeclineStatus = filterDeclineStatus;
18+
}
19+
20+
protected virtual bool Accept(T messageValue)
21+
=> _filter(messageValue);
22+
23+
public override DataflowMessageStatus OfferMessage(
24+
DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
25+
=> Accepting && !Accept(messageValue)
26+
? _filterDeclineStatus
27+
: base.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
28+
}
29+
30+
public static partial class DataFlowExtensions
31+
{
32+
/// <summary>
33+
/// Synchronously filters out incomming messages before they arrive to the target.
34+
/// Passes the message to the target if the filter function returns true.
35+
/// If the filter function returns false, then the message will either be accepted and disappear (default), or will be declined if the decli
36+
/// </summary>
37+
/// <typeparam name="T">The message type.</typeparam>
38+
/// <param name="target">The Target-Block to pass messages too.</param>
39+
/// <param name="filter">Passes the message to the target if the filter function returns true.</param>
40+
/// <param name="decline">If true, and the filter returns false, the message will be declined, allowing it to continue elsewhere. If false (default) the message is accepted even though it is not being passed to the target.</param>
41+
/// <returns>A filter block that preceeds the target..</returns>
42+
public static ITargetBlock<T> Filter<T>(this ITargetBlock<T> target,
43+
Func<T, bool> filter,
44+
bool decline = false)
45+
=> new TargetBlockFilter<T>(
46+
target,
47+
decline ? DataflowMessageStatus.Declined : DataflowMessageStatus.Accepted,
48+
filter);
49+
50+
51+
/// <summary>
52+
/// Processes an item through an acceptor function.
53+
/// If the acceptor returns true, then it was accepted and subsequently received/taken from the source (no longer available to downstream targets).
54+
/// </summary>
55+
/// <typeparam name="T">The message type</typeparam>
56+
/// <param name="source">The source block to receive from.</param>
57+
/// <param name="acceptor">The function to process the item and decide if accepted.</param>
58+
/// <returns>The original source block to allow for more acceptors or filters to be applied.</returns>
59+
public static ISourceBlock<T> TakeOrContinue<T>(this ISourceBlock<T> source,
60+
Func<T, bool> acceptor)
61+
{
62+
var receiver = DataflowBlock
63+
.NullTarget<T>() // If the acceptor returns true, the message is dropped.
64+
.Filter(acceptor, true); // Else it's declined and continues on to the next target.
65+
66+
source.LinkToWithCompletion(receiver);
67+
return source;
68+
}
69+
}
70+
71+
}

Open.Threading.Dataflow.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ Part of the "Open" set of libraries.</Description>
1515
<RepositoryUrl>https://github.com/electricessence/Open.Threading.Dataflow/</RepositoryUrl>
1616
<RepositoryType>git</RepositoryType>
1717
<PackageTags>dotnet, dotnet-core, dotnetcore, cs, dataflow, tpl, extensions</PackageTags>
18-
<Version>2.2.0</Version>
19-
<PackageReleaseNotes></PackageReleaseNotes>
20-
<AssemblyVersion>2.2.0.0</AssemblyVersion>
21-
<FileVersion>2.2.0.0</FileVersion>
18+
<Version>2.2.1</Version>
19+
<PackageReleaseNotes>Cleanup of filters with and added .Filter extension for ITargetBlock&lt;T&gt;.</PackageReleaseNotes>
20+
<AssemblyVersion>2.2.1.0</AssemblyVersion>
21+
<FileVersion>2.2.1.0</FileVersion>
2222
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2323
</PropertyGroup>
2424

TargetBlockFilter.cs

Lines changed: 0 additions & 32 deletions
This file was deleted.

TargetBlockFilterBase.cs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using System.Threading.Tasks.Dataflow;
5+
6+
namespace Open.Threading.Dataflow
7+
{
8+
internal abstract class TargetBlockFilterBase<T> : ITargetBlock<T>
9+
{
10+
protected readonly ITargetBlock<T> Target;
11+
12+
protected TargetBlockFilterBase(ITargetBlock<T> target)
13+
{
14+
Target = target ?? throw new ArgumentNullException(nameof(target));
15+
}
16+
17+
protected int _state;
18+
const int ACCEPTING = 0;
19+
const int REJECTING = 1;
20+
21+
protected bool Accepting => _state == ACCEPTING && !Target.Completion.IsCompleted;
22+
23+
protected void CompleteInternal()
24+
{
25+
if(_state == ACCEPTING)
26+
Interlocked.CompareExchange(ref _state, REJECTING, ACCEPTING);
27+
}
28+
29+
public void Complete()
30+
{
31+
CompleteInternal();
32+
Target.Complete();
33+
}
34+
35+
public void Fault(Exception exception)
36+
{
37+
CompleteInternal();
38+
Target.Fault(exception);
39+
}
40+
41+
public Task Completion => Target.Completion;
42+
43+
// The key here is to reject the message ahead of time.
44+
public virtual DataflowMessageStatus OfferMessage(
45+
DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
46+
=> Accepting
47+
? Target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
48+
: DataflowMessageStatus.DecliningPermanently;
49+
}
50+
}

0 commit comments

Comments
 (0)