|
| 1 | +using Microsoft.AspNetCore.Http; |
| 2 | +using Microsoft.Extensions.Primitives; |
| 3 | +using Ocelot.Discovery.Nacos.AcceptanceTests.LoadBalancer; |
| 4 | +using Ocelot.LoadBalancer; |
| 5 | +using Shouldly; |
| 6 | +using System.Collections.Concurrent; |
| 7 | +using System.Diagnostics; |
| 8 | +using System.Net; |
| 9 | +using System.Runtime.CompilerServices; |
| 10 | +using System.Text; |
| 11 | + |
| 12 | +namespace Ocelot.Discovery.Nacos.AcceptanceTests; |
| 13 | + |
| 14 | +public class ConcurrentSteps : Steps, IDisposable |
| 15 | +{ |
| 16 | + protected Task[] _tasks; |
| 17 | + protected ServiceHandler[] _handlers; |
| 18 | + protected ConcurrentDictionary<int, HttpResponseMessage> _responses; |
| 19 | + protected volatile int[] _counters; |
| 20 | + |
| 21 | + public ConcurrentSteps() |
| 22 | + { |
| 23 | + _tasks = Array.Empty<Task>(); |
| 24 | + _handlers = Array.Empty<ServiceHandler>(); |
| 25 | + _responses = new(); |
| 26 | + _counters = Array.Empty<int>(); |
| 27 | + } |
| 28 | + |
| 29 | + public override void Dispose() |
| 30 | + { |
| 31 | + foreach (var handler in _handlers) |
| 32 | + { |
| 33 | + handler?.Dispose(); |
| 34 | + } |
| 35 | + |
| 36 | + foreach (var response in _responses.Values) |
| 37 | + { |
| 38 | + response?.Dispose(); |
| 39 | + } |
| 40 | + |
| 41 | + foreach (var task in _tasks) |
| 42 | + { |
| 43 | + task?.Dispose(); |
| 44 | + } |
| 45 | + |
| 46 | + base.Dispose(); |
| 47 | + GC.SuppressFinalize(this); |
| 48 | + } |
| 49 | + |
| 50 | + protected void GivenServiceInstanceIsRunning(string url, string response) |
| 51 | + => GivenServiceInstanceIsRunning(url, response, HttpStatusCode.OK); |
| 52 | + |
| 53 | + protected void GivenServiceInstanceIsRunning(string url, string response, HttpStatusCode statusCode) |
| 54 | + { |
| 55 | + _handlers = new ServiceHandler[1]; // allocate single instance |
| 56 | + _counters = new int[1]; // single counter |
| 57 | + GivenServiceIsRunning(url, response, 0, statusCode); |
| 58 | + _counters[0] = 0; |
| 59 | + } |
| 60 | + |
| 61 | + protected void GivenThereIsAServiceRunningOn(string url, string basePath, string responseBody) |
| 62 | + { |
| 63 | + var handler = new ServiceHandler(); |
| 64 | + _handlers = new ServiceHandler[] { handler }; |
| 65 | + handler.GivenThereIsAServiceRunningOn(url, basePath, MapGet(basePath, responseBody)); |
| 66 | + } |
| 67 | + |
| 68 | + protected void GivenMultipleServiceInstancesAreRunning(string[] urls, [CallerMemberName] string serviceName = null) |
| 69 | + { |
| 70 | + serviceName ??= new Uri(urls[0]).Host; |
| 71 | + string[] responses = urls.Select(u => $"{serviceName}|url({u})").ToArray(); |
| 72 | + GivenMultipleServiceInstancesAreRunning(urls, responses, HttpStatusCode.OK); |
| 73 | + } |
| 74 | + |
| 75 | + protected void GivenMultipleServiceInstancesAreRunning(string[] urls, string[] responses) |
| 76 | + => GivenMultipleServiceInstancesAreRunning(urls, responses, HttpStatusCode.OK); |
| 77 | + |
| 78 | + protected void GivenMultipleServiceInstancesAreRunning(string[] urls, string[] responses, HttpStatusCode statusCode) |
| 79 | + { |
| 80 | + Debug.Assert(urls.Length == responses.Length, "Length mismatch!"); |
| 81 | + _handlers = new ServiceHandler[urls.Length]; // allocate multiple instances |
| 82 | + _counters = new int[urls.Length]; // multiple counters |
| 83 | + for (int i = 0; i < urls.Length; i++) |
| 84 | + { |
| 85 | + GivenServiceIsRunning(urls[i], responses[i], i, statusCode); |
| 86 | + _counters[i] = 0; |
| 87 | + } |
| 88 | + } |
| 89 | + |
| 90 | + private void GivenServiceIsRunning(string url, string response) |
| 91 | + => GivenServiceIsRunning(url, response, 0, HttpStatusCode.OK); |
| 92 | + private void GivenServiceIsRunning(string url, string response, int index) |
| 93 | + => GivenServiceIsRunning(url, response, index, HttpStatusCode.OK); |
| 94 | + |
| 95 | + private void GivenServiceIsRunning(string url, string response, int index, HttpStatusCode successCode) |
| 96 | + { |
| 97 | + response ??= successCode.ToString(); |
| 98 | + _handlers[index] ??= new(); |
| 99 | + var serviceHandler = _handlers[index]; |
| 100 | + serviceHandler.GivenThereIsAServiceRunningOn(url, MapGet(index, response, successCode)); |
| 101 | + } |
| 102 | + |
| 103 | + protected static RequestDelegate MapGet(string path, string responseBody) => MapGet(path, responseBody, HttpStatusCode.OK); |
| 104 | + protected static RequestDelegate MapGet(string path, string responseBody, HttpStatusCode statusCode) => async context => |
| 105 | + { |
| 106 | + var downstreamPath = !string.IsNullOrEmpty(context.Request.PathBase.Value) |
| 107 | + ? context.Request.PathBase.Value |
| 108 | + : context.Request.Path.Value; |
| 109 | + bool isMatch = downstreamPath == path; |
| 110 | + context.Response.StatusCode = (int)(isMatch ? statusCode : HttpStatusCode.NotFound); |
| 111 | + await context.Response.WriteAsync(isMatch ? responseBody : "Not Found"); |
| 112 | + }; |
| 113 | + |
| 114 | + public static class HeaderNames |
| 115 | + { |
| 116 | + public const string ServiceIndex = nameof(LeaseEventArgs.ServiceIndex); |
| 117 | + public const string Host = nameof(Uri.Host); |
| 118 | + public const string Port = nameof(Uri.Port); |
| 119 | + public const string Counter = nameof(Counter); |
| 120 | + } |
| 121 | + |
| 122 | + protected RequestDelegate MapGet(int index, string body) => MapGet(index, body, HttpStatusCode.OK); |
| 123 | + protected RequestDelegate MapGet(int index, string body, HttpStatusCode successCode) => async context => |
| 124 | + { |
| 125 | + // Don't delay during the first service call |
| 126 | + if (Volatile.Read(ref _counters[index]) > 0) |
| 127 | + { |
| 128 | + await Task.Delay(Random.Shared.Next(5, 15)); // emulate integration delay up to 15 milliseconds |
| 129 | + } |
| 130 | + |
| 131 | + string responseBody; |
| 132 | + var request = context.Request; |
| 133 | + var response = context.Response; |
| 134 | + try |
| 135 | + { |
| 136 | + int count = Interlocked.Increment(ref _counters[index]); |
| 137 | + responseBody = string.Concat(count, ':', body); |
| 138 | + |
| 139 | + response.StatusCode = (int)successCode; |
| 140 | + response.Headers.Append(HeaderNames.ServiceIndex, new StringValues(index.ToString())); |
| 141 | + response.Headers.Append(HeaderNames.Host, new StringValues(request.Host.Host)); |
| 142 | + response.Headers.Append(HeaderNames.Port, new StringValues(request.Host.Port.ToString())); |
| 143 | + response.Headers.Append(HeaderNames.Counter, new StringValues(count.ToString())); |
| 144 | + await response.WriteAsync(responseBody); |
| 145 | + } |
| 146 | + catch (Exception exception) |
| 147 | + { |
| 148 | + responseBody = string.Concat(1, ':', exception.StackTrace); |
| 149 | + response.StatusCode = (int)HttpStatusCode.InternalServerError; |
| 150 | + await response.WriteAsync(responseBody); |
| 151 | + } |
| 152 | + }; |
| 153 | + |
| 154 | + public Task[] WhenIGetUrlOnTheApiGatewayConcurrently(string url, int times) |
| 155 | + => RunParallelRequests(times, (i) => url); |
| 156 | + |
| 157 | + public Task[] WhenIGetUrlOnTheApiGatewayConcurrently(int times, params string[] urls) |
| 158 | + => RunParallelRequests(times, (i) => urls[i % urls.Length]); |
| 159 | + |
| 160 | + protected Task[] RunParallelRequests(int times, Func<int, string> urlFunc) |
| 161 | + { |
| 162 | + _tasks = new Task[times]; |
| 163 | + _responses = new(times, times); |
| 164 | + for (var i = 0; i < times; i++) |
| 165 | + { |
| 166 | + var url = urlFunc(i); |
| 167 | + _tasks[i] = GetParallelResponse(url, i); |
| 168 | + _responses[i] = null; |
| 169 | + } |
| 170 | + |
| 171 | + Task.WaitAll(_tasks); |
| 172 | + return _tasks; |
| 173 | + } |
| 174 | + |
| 175 | + private async Task GetParallelResponse(string url, int threadIndex) |
| 176 | + { |
| 177 | + var response = await _ocelotClient.GetAsync(url); |
| 178 | + var content = await response.Content.ReadAsStringAsync(); |
| 179 | + var counterString = content.Contains(':') |
| 180 | + ? content.Split(':')[0] // let the first fragment is counter value |
| 181 | + : "0"; |
| 182 | + int count = int.Parse(counterString); |
| 183 | + count.ShouldBeGreaterThan(0); |
| 184 | + _responses[threadIndex] = response; |
| 185 | + } |
| 186 | + |
| 187 | + public void ThenAllStatusCodesShouldBe(HttpStatusCode expected) |
| 188 | + => _responses.ShouldAllBe(response => response.Value.StatusCode == expected); |
| 189 | + public void ThenAllResponseBodiesShouldBe(string expectedBody) |
| 190 | + => _responses.ShouldAllBe(response => response.Value.Content.ReadAsStringAsync().Result == expectedBody); |
| 191 | + |
| 192 | + protected string CalledTimesMessage() |
| 193 | + => $"All values are [{string.Join(',', _counters)}]"; |
| 194 | + |
| 195 | + public void ThenAllServicesShouldHaveBeenCalledTimes(int expected) |
| 196 | + => _counters.Sum().ShouldBe(expected, CalledTimesMessage()); |
| 197 | + |
| 198 | + public void ThenServiceShouldHaveBeenCalledTimes(int index, int expected) |
| 199 | + => _counters[index].ShouldBe(expected, CalledTimesMessage()); |
| 200 | + |
| 201 | + public void ThenServicesShouldHaveBeenCalledTimes(params int[] expected) |
| 202 | + { |
| 203 | + for (int i = 0; i < expected.Length; i++) |
| 204 | + { |
| 205 | + _counters[i].ShouldBe(expected[i], CalledTimesMessage()); |
| 206 | + } |
| 207 | + } |
| 208 | + |
| 209 | + public static int Bottom(int totalRequests, int totalServices) |
| 210 | + => totalRequests / totalServices; |
| 211 | + public static int Top(int totalRequests, int totalServices) |
| 212 | + { |
| 213 | + int bottom = Bottom(totalRequests, totalServices); |
| 214 | + return totalRequests - (bottom * totalServices) + bottom; |
| 215 | + } |
| 216 | + |
| 217 | + public void ThenAllServicesCalledRealisticAmountOfTimes(int bottom, int top) |
| 218 | + { |
| 219 | + var customMessage = new StringBuilder() |
| 220 | + .AppendLine($"{nameof(bottom)}: {bottom}") |
| 221 | + .AppendLine($" {nameof(top)}: {top}") |
| 222 | + .AppendLine($" All values are [{string.Join(',', _counters)}]") |
| 223 | + .ToString(); |
| 224 | + int sum = 0, totalSum = _counters.Sum(); |
| 225 | + |
| 226 | + // Last offline services cannot be called at all, thus don't assert zero counters |
| 227 | + for (int i = 0; i < _counters.Length && sum < totalSum; i++) |
| 228 | + { |
| 229 | + int actual = _counters[i]; |
| 230 | + actual.ShouldBeInRange(bottom, top, customMessage); |
| 231 | + sum += actual; |
| 232 | + } |
| 233 | + } |
| 234 | + |
| 235 | + public void ThenAllServicesCalledOptimisticAmountOfTimes(ILoadBalancerAnalyzer analyzer) |
| 236 | + { |
| 237 | + if (analyzer == null) return; |
| 238 | + int bottom = analyzer.BottomOfConnections(), |
| 239 | + top = analyzer.TopOfConnections(); |
| 240 | + ThenAllServicesCalledRealisticAmountOfTimes(bottom, top); // with unstable checkings |
| 241 | + } |
| 242 | + |
| 243 | + public void ThenServiceCountersShouldMatchLeasingCounters(ILoadBalancerAnalyzer analyzer, int[] ports, int totalRequests) |
| 244 | + { |
| 245 | + if (analyzer == null || ports == null) |
| 246 | + return; |
| 247 | + |
| 248 | + analyzer.ShouldNotBeNull().Analyze(); |
| 249 | + analyzer.Events.Count.ShouldBe(totalRequests, $"{nameof(ILoadBalancerAnalyzer.ServiceName)}: {analyzer.ServiceName}"); |
| 250 | + |
| 251 | + var leasingCounters = analyzer?.GetHostCounters() ?? new(); |
| 252 | + var sortedLeasingCountersByPort = ports.Select(port => leasingCounters.FirstOrDefault(kv => kv.Key.DownstreamPort == port).Value).ToArray(); |
| 253 | + for (int i = 0; i < ports.Length; i++) |
| 254 | + { |
| 255 | + var host = leasingCounters.Keys.FirstOrDefault(k => k.DownstreamPort == ports[i]); |
| 256 | + |
| 257 | + // Leasing info/counters can be absent because of offline service instance with exact port in unstable scenario |
| 258 | + if (host != null) |
| 259 | + { |
| 260 | + var customMessage = new StringBuilder() |
| 261 | + .AppendLine($"{nameof(ILoadBalancerAnalyzer.ServiceName)}: {analyzer.ServiceName}") |
| 262 | + .AppendLine($" Port: {ports[i]}") |
| 263 | + .AppendLine($" Host: {host}") |
| 264 | + .AppendLine($" Service counters: [{string.Join(',', _counters)}]") |
| 265 | + .AppendLine($" Leasing counters: [{string.Join(',', sortedLeasingCountersByPort)}]") // should have order of _counters |
| 266 | + .ToString(); |
| 267 | + int counter1 = _counters[i]; |
| 268 | + int counter2 = leasingCounters[host]; |
| 269 | + counter1.ShouldBe(counter2, customMessage); |
| 270 | + } |
| 271 | + } |
| 272 | + } |
| 273 | +} |
0 commit comments