|
1 | 1 | using System.Collections.Concurrent;
|
| 2 | +using System.Net; |
2 | 3 | using System.Runtime.Serialization;
|
3 | 4 | using System.Text.Json;
|
4 | 5 |
|
@@ -59,39 +60,58 @@ private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
|
59 | 60 | {
|
60 | 61 | try
|
61 | 62 | {
|
62 |
| - await foreach ((WatchEventType type, TEntity? entity) in client.WatchAsync<TEntity>( |
63 |
| - settings.Namespace, |
64 |
| - cancellationToken: stoppingToken)) |
| 63 | + while (!stoppingToken.IsCancellationRequested) |
65 | 64 | {
|
| 65 | + await foreach ((WatchEventType type, TEntity? entity) in client.WatchAsync<TEntity>( |
| 66 | + settings.Namespace, |
| 67 | + cancellationToken: stoppingToken)) |
| 68 | + { |
66 | 69 | #pragma warning disable SA1312
|
67 |
| - using var _ = logger.BeginScope(new |
| 70 | + using var _ = logger.BeginScope(new |
68 | 71 | #pragma warning restore SA1312
|
69 |
| - { |
70 |
| - EventType = type, |
71 |
| - |
72 |
| - // ReSharper disable once RedundantAnonymousTypePropertyName |
73 |
| - Kind = entity?.Kind, |
74 |
| - Name = entity?.Name(), |
75 |
| - ResourceVersion = entity?.ResourceVersion(), |
76 |
| - }); |
77 |
| - logger.LogInformation( |
78 |
| - """Received watch event "{EventType}" for "{Kind}/{Name}", last observed resource version: {ResourceVersion}.""", |
79 |
| - type, |
80 |
| - entity?.Kind, |
81 |
| - entity?.Name(), |
82 |
| - entity?.ResourceVersion()); |
83 |
| - try |
84 |
| - { |
85 |
| - await OnEventAsync(type, entity, stoppingToken); |
86 |
| - } |
87 |
| - catch (Exception e) |
88 |
| - { |
89 |
| - logger.LogError( |
90 |
| - e, |
91 |
| - "Reconciliation of {EventType} for {Kind}/{Name} failed.", |
| 72 | + { |
| 73 | + EventType = type, |
| 74 | + |
| 75 | + // ReSharper disable once RedundantAnonymousTypePropertyName |
| 76 | + Kind = entity?.Kind, |
| 77 | + Name = entity?.Name(), |
| 78 | + ResourceVersion = entity?.ResourceVersion(), |
| 79 | + }); |
| 80 | + logger.LogInformation( |
| 81 | + """Received watch event "{EventType}" for "{Kind}/{Name}", last observed resource version: {ResourceVersion}.""", |
92 | 82 | type,
|
93 | 83 | entity?.Kind,
|
94 |
| - entity.Name()); |
| 84 | + entity?.Name(), |
| 85 | + entity?.ResourceVersion()); |
| 86 | + try |
| 87 | + { |
| 88 | + await OnEventAsync(type, entity, stoppingToken); |
| 89 | + } |
| 90 | + catch (KubernetesException e) |
| 91 | + { |
| 92 | + if (e.Status.Code == (int)HttpStatusCode.Gone) |
| 93 | + { |
| 94 | + logger.LogDebug("Watch restarting due to 410 HTTP Gone"); |
| 95 | + |
| 96 | + break; |
| 97 | + } |
| 98 | + |
| 99 | + LogReconciliationFailed(e); |
| 100 | + } |
| 101 | + catch (Exception e) |
| 102 | + { |
| 103 | + LogReconciliationFailed(e); |
| 104 | + } |
| 105 | + |
| 106 | + void LogReconciliationFailed(Exception exception) |
| 107 | + { |
| 108 | + logger.LogError( |
| 109 | + exception, |
| 110 | + "Reconciliation of {EventType} for {Kind}/{Name} failed.", |
| 111 | + type, |
| 112 | + entity?.Kind, |
| 113 | + entity.Name()); |
| 114 | + } |
95 | 115 | }
|
96 | 116 | }
|
97 | 117 | }
|
|
0 commit comments