Skip to content

Commit 71fbffa

Browse files
author
Oren (electricessence)
committed
Improved drop handling.
1 parent 29c31bb commit 71fbffa

File tree

2 files changed

+32
-10
lines changed

2 files changed

+32
-10
lines changed

Filters/Filter.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,50 @@
44
namespace Open.Threading.Dataflow
55
{
66
internal class TargetBlockFilter<T> : TargetBlockFilterBase<T>
7-
{
7+
{
88
private readonly DataflowMessageStatus _filterDeclineStatus;
99
private readonly Func<T, bool> _filter;
1010

1111
public TargetBlockFilter(
1212
ITargetBlock<T> target,
1313
DataflowMessageStatus filterDeclineStatus,
14-
Func<T, bool> filter) :base(target)
15-
{
14+
Func<T, bool> filter) : base(target)
15+
{
1616
_filter = filter;
17+
switch (filterDeclineStatus)
18+
{
19+
case DataflowMessageStatus.Postponed:
20+
case DataflowMessageStatus.NotAvailable:
21+
throw new ArgumentException("Block filter does not support: " + Enum.GetName(typeof(DataflowMessageStatus), filterDeclineStatus));
22+
23+
}
24+
1725
_filterDeclineStatus = filterDeclineStatus;
26+
1827
}
1928

29+
private static readonly ITargetBlock<T> NullTarget = DataflowBlock.NullTarget<T>();
30+
2031
protected virtual bool Accept(T messageValue)
2132
=> _filter(messageValue);
2233

2334
public override DataflowMessageStatus OfferMessage(
2435
DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
25-
=> Accepting && !Accept(messageValue)
26-
? _filterDeclineStatus
27-
: base.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
36+
{
37+
if (!Accepting)
38+
return DataflowMessageStatus.DecliningPermanently;
39+
40+
if (Accept(messageValue))
41+
return base.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
42+
43+
if (_filterDeclineStatus == DataflowMessageStatus.Accepted)
44+
return NullTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
45+
46+
return _filterDeclineStatus;
47+
}
2848
}
2949

50+
3051
public static partial class DataFlowExtensions
3152
{
3253
/// <summary>

Open.Threading.Dataflow.csproj

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ Part of the "Open" set of libraries.</Description>
1515
<RepositoryUrl>https://github.com/electricessence/Open.Threading.Dataflow/</RepositoryUrl>
1616
<RepositoryType>git</RepositoryType>
1717
<PackageTags>dotnet, dotnet-core, dotnetcore, cs, dataflow, tpl, extensions</PackageTags>
18-
<Version>2.2.1</Version>
19-
<PackageReleaseNotes>Cleanup of filters with and added .Filter extension for ITargetBlock&lt;T&gt;.</PackageReleaseNotes>
20-
<AssemblyVersion>2.2.1.0</AssemblyVersion>
21-
<FileVersion>2.2.1.0</FileVersion>
18+
<Version>2.2.2</Version>
19+
<PackageReleaseNotes>Cleanup of filters with and added .Filter extension for ITargetBlock&lt;T&gt;.
20+
Better drop handing using a NullTarget.</PackageReleaseNotes>
21+
<AssemblyVersion>2.2.2.0</AssemblyVersion>
22+
<FileVersion>2.2.2.0</FileVersion>
2223
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2324
</PropertyGroup>
2425

0 commit comments

Comments
 (0)