|
4 | 4 | "context" |
5 | 5 | "errors" |
6 | 6 | "net" |
| 7 | + "sync" |
7 | 8 | "time" |
8 | 9 |
|
9 | 10 | "github.com/google/uuid" |
@@ -57,11 +58,11 @@ type K8sClientWrapper struct { |
57 | 58 | } |
58 | 59 |
|
59 | 60 | type ApiConnectivityCheckWrapper struct { |
60 | | - apicheck.ApiConnectivityCheck |
| 61 | + *apicheck.ApiConnectivityCheck |
61 | 62 | ShouldSimulatePeerResponses bool |
62 | 63 |
|
63 | | - // store responses that we should override for any peer responses |
64 | | - SimulatePeerResponses []selfNodeRemediation.HealthCheckResponseCode |
| 64 | + responsesMu sync.Mutex |
| 65 | + simulatedPeerResponses []selfNodeRemediation.HealthCheckResponseCode |
65 | 66 | } |
66 | 67 |
|
67 | 68 | func (kcw *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { |
@@ -107,29 +108,75 @@ func GetRandomIpAddress() (randomIP string) { |
107 | 108 | } |
108 | 109 |
|
109 | 110 | func NewApiConnectivityCheckWrapper(ck *apicheck.ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) (ckw *ApiConnectivityCheckWrapper) { |
| 111 | + inner := apicheck.New(ck, controlPlaneManager) |
110 | 112 | ckw = &ApiConnectivityCheckWrapper{ |
111 | | - ApiConnectivityCheck: *apicheck.New(ck, controlPlaneManager), |
| 113 | + ApiConnectivityCheck: inner, |
112 | 114 | ShouldSimulatePeerResponses: false, |
113 | | - SimulatePeerResponses: []selfNodeRemediation.HealthCheckResponseCode{}, |
| 115 | + simulatedPeerResponses: []selfNodeRemediation.HealthCheckResponseCode{}, |
114 | 116 | } |
115 | 117 |
|
116 | | - ckw.ApiConnectivityCheck.SetHealthStatusFunc(func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { |
117 | | - switch { |
118 | | - case ckw.ShouldSimulatePeerResponses: |
119 | | - for _, code := range ckw.SimulatePeerResponses { |
120 | | - results <- code |
121 | | - } |
122 | | - |
| 118 | + inner.SetHealthStatusFunc(func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { |
| 119 | + if ckw.ShouldSimulatePeerResponses { |
| 120 | + results <- ckw.nextSimulatedPeerResponse() |
123 | 121 | return |
124 | | - default: |
125 | | - ckw.ApiConnectivityCheck.GetDefaultPeerHealthCheckFunc()(endpointIp, results) |
126 | | - break |
127 | 122 | } |
| 123 | + |
| 124 | + ckw.GetDefaultPeerHealthCheckFunc()(endpointIp, results) |
128 | 125 | }) |
129 | 126 |
|
130 | 127 | return |
131 | 128 | } |
132 | 129 |
|
| 130 | +func (ckw *ApiConnectivityCheckWrapper) nextSimulatedPeerResponse() selfNodeRemediation.HealthCheckResponseCode { |
| 131 | + ckw.responsesMu.Lock() |
| 132 | + defer ckw.responsesMu.Unlock() |
| 133 | + |
| 134 | + if len(ckw.simulatedPeerResponses) == 0 { |
| 135 | + return selfNodeRemediation.RequestFailed |
| 136 | + } |
| 137 | + |
| 138 | + code := ckw.simulatedPeerResponses[0] |
| 139 | + if len(ckw.simulatedPeerResponses) > 1 { |
| 140 | + ckw.simulatedPeerResponses = append([]selfNodeRemediation.HealthCheckResponseCode{}, ckw.simulatedPeerResponses[1:]...) |
| 141 | + } |
| 142 | + |
| 143 | + return code |
| 144 | +} |
| 145 | + |
| 146 | +func (ckw *ApiConnectivityCheckWrapper) AppendSimulatedPeerResponse(code selfNodeRemediation.HealthCheckResponseCode) { |
| 147 | + ckw.responsesMu.Lock() |
| 148 | + defer ckw.responsesMu.Unlock() |
| 149 | + ckw.simulatedPeerResponses = append(ckw.simulatedPeerResponses, code) |
| 150 | +} |
| 151 | + |
| 152 | +func (ckw *ApiConnectivityCheckWrapper) ClearSimulatedPeerResponses() { |
| 153 | + ckw.responsesMu.Lock() |
| 154 | + defer ckw.responsesMu.Unlock() |
| 155 | + ckw.simulatedPeerResponses = nil |
| 156 | +} |
| 157 | + |
| 158 | +func (ckw *ApiConnectivityCheckWrapper) SnapshotSimulatedPeerResponses() []selfNodeRemediation.HealthCheckResponseCode { |
| 159 | + ckw.responsesMu.Lock() |
| 160 | + defer ckw.responsesMu.Unlock() |
| 161 | + if len(ckw.simulatedPeerResponses) == 0 { |
| 162 | + return nil |
| 163 | + } |
| 164 | + snapshot := make([]selfNodeRemediation.HealthCheckResponseCode, len(ckw.simulatedPeerResponses)) |
| 165 | + copy(snapshot, ckw.simulatedPeerResponses) |
| 166 | + return snapshot |
| 167 | +} |
| 168 | + |
| 169 | +func (ckw *ApiConnectivityCheckWrapper) RestoreSimulatedPeerResponses(codes []selfNodeRemediation.HealthCheckResponseCode) { |
| 170 | + ckw.responsesMu.Lock() |
| 171 | + defer ckw.responsesMu.Unlock() |
| 172 | + if len(codes) == 0 { |
| 173 | + ckw.simulatedPeerResponses = nil |
| 174 | + return |
| 175 | + } |
| 176 | + ckw.simulatedPeerResponses = make([]selfNodeRemediation.HealthCheckResponseCode, len(codes)) |
| 177 | + copy(ckw.simulatedPeerResponses, codes) |
| 178 | +} |
| 179 | + |
133 | 180 | func GenerateTestConfig() *selfnoderemediationv1alpha1.SelfNodeRemediationConfig { |
134 | 181 | return &selfnoderemediationv1alpha1.SelfNodeRemediationConfig{ |
135 | 182 | TypeMeta: metav1.TypeMeta{ |
|
0 commit comments