Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit d9ad8dc

Browse files
committed
Merge pull request #2305 from Clockwork-Muse/PLINQ_2239
PLINQ - Minor fixes
2 parents ce056a8 + ca16c3d commit d9ad8dc

12 files changed

+167
-104
lines changed

src/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Inlined/NullableDecimalAverageAggregationOperator.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,11 @@ protected override bool MoveNextCore(ref Pair<decimal, long> currentElement)
118118

119119
if (current.HasValue)
120120
{
121-
sum += current.GetValueOrDefault();
122-
count++;
121+
checked
122+
{
123+
sum += current.GetValueOrDefault();
124+
count++;
125+
}
123126
}
124127
}
125128

src/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Inlined/NullableDoubleAverageAggregationOperator.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,11 @@ protected override bool MoveNextCore(ref Pair<double, long> currentElement)
118118
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
119119
CancellationState.ThrowIfCanceled(_cancellationToken);
120120

121-
sum += current.GetValueOrDefault();
122-
count++;
121+
checked
122+
{
123+
sum += current.GetValueOrDefault();
124+
count++;
125+
}
123126
}
124127
}
125128

src/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Inlined/NullableFloatAverageAggregationOperator.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@ protected override bool MoveNextCore(ref Pair<double, long> currentElement)
119119

120120
if (current.HasValue)
121121
{
122-
sum += current.GetValueOrDefault();
123-
count++;
122+
checked
123+
{
124+
sum += current.GetValueOrDefault();
125+
count++;
126+
}
124127
}
125128
}
126129

src/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Inlined/NullableIntAverageAggregationOperator.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@ protected override bool MoveNextCore(ref Pair<long, long> currentElement)
119119

120120
if (current.HasValue)
121121
{
122-
sum += current.GetValueOrDefault();
123-
count++;
122+
checked
123+
{
124+
sum += current.GetValueOrDefault();
125+
count++;
126+
}
124127
}
125128
}
126129

src/System.Linq.Parallel/src/System/Linq/Parallel/QueryOperators/Inlined/NullableLongAverageAggregationOperator.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@ protected override bool MoveNextCore(ref Pair<long, long> currentElement)
119119
CancellationState.ThrowIfCanceled(_cancellationToken);
120120
if (current.HasValue)
121121
{
122-
sum += current.GetValueOrDefault();
123-
count++;
122+
checked
123+
{
124+
sum += current.GetValueOrDefault();
125+
count++;
126+
}
124127
}
125128
}
126129

src/System.Linq.Parallel/src/System/Linq/Parallel/Utils/ReverseComparer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ internal ReverseComparer(IComparer<T> comparer)
2727

2828
public int Compare(T x, T y)
2929
{
30-
return -_comparer.Compare(x, y);
30+
return _comparer.Compare(y, x);
3131
}
3232
}
3333
}

src/System.Linq.Parallel/src/System/Linq/ParallelEnumerable.cs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -352,29 +352,10 @@ public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(this Paral
352352
/// <exception cref="T:System.InvalidOperationException">
353353
/// WithCancellation is used multiple times in the query.
354354
/// </exception>
355-
/// <exception cref="T:System.ObjectDisposedException">
356-
/// The <see cref="T:System.Threading.CancellationTokenSource"/> associated with the <paramref name="cancellationToken"/> has been disposed.
357-
/// </exception>
358355
public static ParallelQuery<TSource> WithCancellation<TSource>(this ParallelQuery<TSource> source, CancellationToken cancellationToken)
359356
{
360357
if (source == null) throw new ArgumentNullException("source");
361358

362-
// also a convenience check whether the cancellationTokenSource backing the token is already disposed.
363-
// do this via a dummy registration as there is no public IsDipsosed property on CT.
364-
CancellationTokenRegistration dummyRegistration = new CancellationTokenRegistration();
365-
try
366-
{
367-
dummyRegistration = cancellationToken.Register(() => { });
368-
}
369-
catch (ObjectDisposedException)
370-
{
371-
throw new ArgumentException(SR.ParallelEnumerable_WithCancellation_TokenSourceDisposed, "cancellationToken");
372-
}
373-
finally
374-
{
375-
dummyRegistration.Dispose();
376-
}
377-
378359
QuerySettings settings = QuerySettings.Empty;
379360
settings.CancellationState = new CancellationState(cancellationToken);
380361

src/System.Linq.Parallel/tests/ExchangeTests.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Concurrent;
66
using System.Collections.Generic;
77
using System.Linq;
8+
using System.Threading;
89
using Xunit;
910

1011
namespace Test
@@ -131,22 +132,26 @@ public static void Merge_Ordered_Longrunning(Labeled<ParallelQuery<int>> labeled
131132
}
132133

133134
[Theory]
134-
[MemberData("MergeData", (object)(new int[] { 16, 1024 }))]
135-
[MemberData("ThrowOnCount_AllMergeOptions_MemberData", (object)(new int[] { 16, 1024 }))]
135+
[MemberData("ThrowOnCount_AllMergeOptions_MemberData", (object)(new int[] { 4, 8 }))]
136136
// FailingMergeData has enumerables that throw errors when attempting to perform the nth enumeration.
137137
// This test checks whether the query runs in a pipelined or buffered fashion.
138138
public static void Merge_Ordered_Pipelining(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
139139
{
140-
Assert.Equal(0, labeled.Item.WithMergeOptions(options).Select(x => x).First());
140+
Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).First());
141141
}
142142

143143
[Theory]
144-
[OuterLoop]
145-
[MemberData("MergeData", (object)(new int[] { 1024 * 4, 1024 * 1024 }))]
146-
[MemberData("ThrowOnCount_AllMergeOptions_MemberData", (object)(new int[] { 1024 * 4, 1024 * 1024 }))]
147-
public static void Merge_Ordered_Pipelining_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
144+
[MemberData("MergeData", (object)(new int[] { 4, 8 }))]
145+
// This test checks whether the query runs in a pipelined or buffered fashion.
146+
public static void Merge_Ordered_Pipelining_Select(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
148147
{
149-
Merge_Ordered_Pipelining(labeled, count, options);
148+
int countdown = count;
149+
Func<int, int> down = i =>
150+
{
151+
if (Interlocked.Decrement(ref countdown) == 0) throw new DeliberateTestException();
152+
return i;
153+
};
154+
Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).Select(down).First());
150155
}
151156

152157
[Theory]

src/System.Linq.Parallel/tests/Helpers/Comparers.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,24 @@ public int Compare(int x, int y)
7272
}
7373
}
7474

75+
/// <summary>
76+
/// Returns an extreme value from non-equal comparisons.
77+
/// </summary>
78+
/// <remarks>Helper for regression test against PLINQ's version of #2239 .</remarks>
79+
/// <typeparam name="T">The type being compared.</typeparam>
80+
internal class ExtremeComparer<T> : IComparer<T>
81+
{
82+
private IComparer<T> _def = Comparer<T>.Default;
83+
84+
public int Compare(T x, T y)
85+
{
86+
int direction = _def.Compare(x, y);
87+
return direction == 0 ? 0 :
88+
direction > 0 ? int.MaxValue :
89+
int.MinValue;
90+
}
91+
}
92+
7593
internal static class DelgatedComparable
7694
{
7795
public static DelegatedComparable<T> Delegate<T>(T value, IComparer<T> comparer) where T : IComparable<T>

src/System.Linq.Parallel/tests/QueryOperators/AverageTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public class AverageTests
1414
//
1515
// Average
1616
//
17+
18+
// Get a set of ranges from 0 to each count, with an extra parameter containing the expected average.
1719
public static IEnumerable<object[]> AverageData(object[] counts)
1820
{
1921
Func<int, double> average = x => (x - 1) / 2.0;
@@ -76,6 +78,16 @@ public static void Average_Long_Longrunning(Labeled<ParallelQuery<int>> labeled,
7678
Average_Long(labeled, count, average);
7779
}
7880

81+
[Theory]
82+
[MemberData("Ranges", (object)(new int[] { 2 }), MemberType = typeof(UnorderedSources))]
83+
public static void Average_Long_Overflow(Labeled<ParallelQuery<int>> labeled, int count)
84+
{
85+
Functions.AssertThrowsWrapped<OverflowException>(() => labeled.Item.Select(x => x == 0 ? 1 : long.MaxValue).Average());
86+
Functions.AssertThrowsWrapped<OverflowException>(() => labeled.Item.Select(x => x == 0 ? (long?)1 : long.MaxValue).Average());
87+
Functions.AssertThrowsWrapped<OverflowException>(() => labeled.Item.Average(x => x == 0 ? -1 : long.MinValue));
88+
Functions.AssertThrowsWrapped<OverflowException>(() => labeled.Item.Average(x => x == 0 ? (long?)-1 : long.MinValue));
89+
}
90+
7991
[Theory]
8092
[MemberData("AverageData", (object)(new int[] { 1, 2, 16 }))]
8193
public static void Average_Long_SomeNull(Labeled<ParallelQuery<int>> labeled, int count, double average)

0 commit comments

Comments
 (0)