|
| 1 | +# Concurrent Collections |
| 2 | + |
| 3 | + |
| 4 | +## `ConcurrentDictionary` |
| 5 | + |
| 6 | +All operations of `ConcurrentDictionary<,>` including methods, properties and indexer are thread-safe. |
| 7 | +There's some unique methods dedicated to `ConcurrentDictionary<,>` rather than `Dictionary<,>` |
| 8 | + |
| 9 | +- `AddOrUpdate`: add or update the value dependent on old value. |
| 10 | +- `GetOrAdd`: get or add the value dependent on the key. |
| 11 | +- `TryRemove(key, out var value)`: remove if key exists. |
| 12 | + |
| 13 | +> [!TIP] |
| 14 | +> If the value you would add or update does not dependent on the existing value, using indexer would be better. |
| 15 | +> If you care to **get** the value, always use `TryGet*` or `GetOrAdd` to avoid exceptions. |
| 16 | +
|
| 17 | +```cs |
| 18 | +ConcurrentDictionary<string, string> dict = []; |
| 19 | + |
| 20 | +string? val; |
| 21 | + |
| 22 | +// if key is registered, update with the transformed value |
| 23 | +// if key is not registered, callback wouldn't be called |
| 24 | +val = dict.AddOrUpdate("key", "newValue", (key, old) => $"transformed {old} for {key}"); |
| 25 | +// add with transformed key |
| 26 | +// or |
| 27 | +// update with transformed with key and old value. |
| 28 | +val = dict.AddOrUpdate( |
| 29 | + "key", |
| 30 | + key => $"transformed from {key} on add", |
| 31 | + (key, old) => $"transformed {old} for {key} on update" |
| 32 | +); |
| 33 | + |
| 34 | +// if the key is not registered, add with the generated value |
| 35 | +// or just return the value |
| 36 | +val = dict.GetOrAdd("foo", key => $"generated using {key}"); |
| 37 | +// add with raw value |
| 38 | +val = dict.GetOrAdd("foo", "newValue"); |
| 39 | + |
| 40 | +// value is the value corresponds to the key |
| 41 | +if (dict.TryRemove("key", out string? value)) |
| 42 | + Console.WriteLine($"value {value} has been removed"); |
| 43 | +``` |
| 44 | + |
| 45 | +## `ConcurrentQueue` |
| 46 | + |
| 47 | +```cs |
| 48 | +using System.Collections.Concurrent; |
| 49 | + |
| 50 | +ConcurrentQueue<string> queue = []; |
| 51 | + |
| 52 | +queue.Enqueue("foo"); |
| 53 | + |
| 54 | +if (queue.TryDequeue(out var result)) |
| 55 | + Console.WriteLine(result); |
| 56 | + |
| 57 | +if (queue.TryPeek(out var head)) |
| 58 | + Console.WriteLine(head); |
| 59 | +``` |
| 60 | + |
| 61 | +## `ConcurrentStack` |
| 62 | + |
| 63 | +```cs |
| 64 | +using System.Collections.Concurrent; |
| 65 | + |
| 66 | +ConcurrentStack<int> stack = []; |
| 67 | + |
| 68 | +stack.Push(1); |
| 69 | +stack.PushRange([2, 3, 4]); |
| 70 | + |
| 71 | +if (stack.TryPeek(out var top)) |
| 72 | + Console.WriteLine(top); |
| 73 | + |
| 74 | +if (stack.TryPop(out var result)) |
| 75 | + Console.WriteLine(result); |
| 76 | + |
| 77 | +// pop a range to another collection |
| 78 | +// the new container should have valid Length or Count |
| 79 | +int[] dest = new int[stack.Count]; |
| 80 | +if (stack.TryPopRange(dest, startIndex: 0, count: stack.Count) > 0) |
| 81 | + Console.WriteLine(string.Join(", ", dest)); |
| 82 | + |
| 83 | +// or pop them all directly |
| 84 | +if (stack.TryPopRange(dest) > 0) |
| 85 | + Console.WriteLine(string.Join(", ", dest)); |
| 86 | +``` |
| 87 | + |
| 88 | +## `ConcurrentBag` |
| 89 | + |
| 90 | +`ConcurrentBag<T>` is a collection that its order is not guaranteed. But each peek or take within the same thread always returns the same value added by the thread. |
| 91 | + |
| 92 | +```cs |
| 93 | +using System.Collections.Concurrent; |
| 94 | + |
| 95 | +ConcurrentBag<int> bag = []; |
| 96 | + |
| 97 | +var tasks = Enumerable.Range(1, 10).Select(i => { |
| 98 | + return Task.Run(() => { |
| 99 | + Console.WriteLine($"i: {i}"); |
| 100 | + bag.Add(i); |
| 101 | + if (bag.TryPeek(out var result)) // [!code highlight] |
| 102 | + Console.WriteLine($"Peek: {result}"); // result is always the i been added within the same thread // [!code highlight] |
| 103 | + }); |
| 104 | +}); |
| 105 | + |
| 106 | +Task.WaitAll(tasks); |
| 107 | + |
| 108 | +if (bag.TryPeek(out var result)) |
| 109 | + Console.WriteLine(result); // random when access from main thread |
| 110 | +``` |
| 111 | + |
| 112 | +That is because `ConcurrentBag` creates dedicated list for each thread so each thread wouldn't have to content for a same list, and you can only access those items been added within the same thread. |
| 113 | + |
| 114 | +```cs |
| 115 | +var tasks = Enumerable.Range(1, 10).Select(i => { |
| 116 | + return Task.Run(() => { |
| 117 | + bag.Add(i); bag.Add(i); bag.Add(i); // add 3 times in the thread // [!code highlight] |
| 118 | +
|
| 119 | + while (bag.TryTake(out var result)) // [!code highlight] |
| 120 | + Console.WriteLine(result); // write i for 3 times since it was added 3 times within the same thread // [!code highlight] |
| 121 | + }); |
| 122 | +}); |
| 123 | +``` |
| 124 | + |
| 125 | +## `BlockingCollection` |
| 126 | + |
| 127 | +Dedicated concurrent collections except `ConcurrentDictionary` implements a special interface `IProducerConsumerCollection<T>` |
| 128 | + |
| 129 | +```cs |
| 130 | +// Defines methods to manipulate thread-safe collections intended for producer/consumer usage. |
| 131 | +// This interface provides a unified representation for producer/consumer collections |
| 132 | +// so that higher level abstractions such as BlockingCollection<T> can use the collection as the underlying storage mechanism. |
| 133 | +public interface IProducerConsumerCollection<T> : IEnumerable<T>, IEnumerable, ICollection |
| 134 | +{ |
| 135 | + void CopyTo(T[] array, int index); |
| 136 | + |
| 137 | + T[] ToArray(); |
| 138 | + |
| 139 | + bool TryAdd(T item); |
| 140 | + |
| 141 | + bool TryTake([MaybeNullWhen(false)] out T item); |
| 142 | +} |
| 143 | +``` |
| 144 | + |
| 145 | +`BlockingCollection<T>` is a dedicated **wrapper** to serve for any `IProducerConsumerCollection<T>` by limiting the maximum concurrent item count could a inner `IProducerConsumerCollection<T>` have. |
| 146 | +If the limit were reached, any operation on it including **producing** and **consuming** would be blocked(methods without indicator such as `Add`) or failed(methods such as `TryAdd`) on the thread. |
| 147 | +Such **Producer-Consumer** pattern is done by indicators returned from `bool IProducerConsumerCollection<T>.TryAdd(out var _)` and so on... |
| 148 | + |
| 149 | +> [!NOTE] |
| 150 | +> - **consume**: meaning the item was taken and removed from the inner concurrent collection |
| 151 | +> - **produce**: adding a item to the collection |
| 152 | +
|
| 153 | +```cs |
| 154 | +using System.Collections.Concurrent; |
| 155 | + |
| 156 | +BlockingCollection<int> items = new(new ConcurrentBag<int>(), boundedCapacity: 5); // [!code highlight] |
| 157 | +CancellationTokenSource cts = new(); |
| 158 | + |
| 159 | +var produce = Task.Run(() => { |
| 160 | + while (true) { |
| 161 | + cts.Token.ThrowIfCancellationRequested(); |
| 162 | + var next = Random.Shared.Next(1, 9); |
| 163 | + if (items.TryAdd(next)) // would fail if reached the boundedCapacity // [!code highlight] |
| 164 | + Console.WriteLine($"{next} was produced"); |
| 165 | + // no throttling here so producing is way faster than consuming // [!code highlight] |
| 166 | + } |
| 167 | +}, cts.Token); |
| 168 | + |
| 169 | +var consume = Task.Run(() => { |
| 170 | + foreach (var item in items.GetConsumingEnumerable()) { |
| 171 | + cts.Token.ThrowIfCancellationRequested(); |
| 172 | + Console.WriteLine($"{item} was consumed"); |
| 173 | + Thread.Sleep(Random.Shared.Next(500, 1000)); // throttling |
| 174 | + } |
| 175 | +}, cts.Token); |
| 176 | + |
| 177 | +Console.ReadKey(); |
| 178 | +cts.Cancel(); |
| 179 | +``` |
| 180 | + |
| 181 | +As you run the example you would find even the producer should produce faster but it blocks the appending when it reached the limit. |
0 commit comments