@@ -3,6 +3,9 @@ package agents
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "maps"
7
+ "slices"
8
+ "time"
6
9
7
10
"go.uber.org/zap"
8
11
"golang.org/x/xerrors"
@@ -30,6 +33,8 @@ type retryParams struct {
30
33
retrials int
31
34
}
32
35
36
// RollingChangeArgs is an exported string constant whose value equals its own name.
// NOTE(review): its consumers are not visible in this chunk — presumably it is used as a lookup key; confirm and document the intent.
+ const RollingChangeArgs = "RollingChangeArgs"
37
+
33
38
// EnsureAgentKeySecretExists checks if the Secret with specified name (<groupId>-group-secret) exists, otherwise tries to
34
39
// generate agent key using OM public API and create Secret containing this key. Generation of a key is expected to be
35
40
// a rare operation as the group creation api generates agent key already (so the only possible situation is when the group
@@ -107,6 +112,184 @@ func getAgentRegisterError(errorMsg string) error {
107
112
"name ('cluster.local'): %s" , errorMsg ))
108
113
}
109
114
115
// StaleProcessDuration is the maximum age of the last agent ping before
// ProcessState.IsStale reports the process as stale.
const StaleProcessDuration = time.Minute * 2

// ProcessState represents the state of the mongodb process.
// Most importantly it contains the information whether the node is down (precisely whether the agent running
// next to mongod is actively reporting pings to OM), what is the last version of the automation config achieved
// and the step on which the agent is currently executing the plan.
type ProcessState struct {
	// Hostname identifies the host this state refers to.
	Hostname string
	// LastAgentPing is the time of the last agent ping; the zero time means no ping is known.
	LastAgentPing time.Time
	// GoalVersionAchieved is the last automation config version achieved; -1 means unknown.
	GoalVersionAchieved int
	// Plan holds the steps of the plan the agent is executing; nil when unknown.
	Plan []string
	// ProcessName is the name of the process; empty when unknown.
	ProcessName string
}

// NewProcessState should be used to create new instances of ProcessState as it sets some reasonable default values.
// As ProcessState is combining the data from two sources, we don't have any guarantees that we'll have the
// information about the given hostname available from both sources, therefore we need to always assume some defaults.
func NewProcessState(hostname string) ProcessState {
	return ProcessState{
		Hostname: hostname,
		// Zero time: no ping seen yet, which IsStale treats as stale.
		LastAgentPing: time.Time{},
		// -1 marks "no goal version reported".
		GoalVersionAchieved: -1,
		Plan:                nil,
	}
}

// IsStale returns true if this process is considered down, i.e. the last ping of the agent is older than
// StaleProcessDuration (2 minutes). A process that has never pinged (zero LastAgentPing) is stale.
// We use an in-the-middle value when considering the process to be down:
//   - in waitForAgentsToRegister we use 1 min to consider the process "not registered"
//   - Ops Manager is using 5 mins as a default for considering process as stale
func (p ProcessState) IsStale() bool {
	return time.Since(p.LastAgentPing) > StaleProcessDuration
}
147
+
148
+ // MongoDBClusterStateInOM represents the state of the whole deployment from the Ops Manager's perspective by combining signals about the processes from two sources:
149
+ // - from om.Connection.ReadAutomationAgents to get last ping of the agent (/groups/<groupId>/agents/AUTOMATION)
150
+ // - from om.Connection.ReadAutomationStatus to get the list of agent health statuses, AC version achieved, step of the agent's plan (/groups/<groupId>/automationStatus)
151
+ type MongoDBClusterStateInOM struct {
152
+ GoalVersion int // goal version taken from the automation status
153
+ ProcessStateMap map [string ]ProcessState // per-process state keyed by hostname
154
+ }
155
+
156
+ // GetMongoDBClusterState executes requests to OM from the given omConnection to gather the current deployment state.
157
+ // It combines the data from the automation status and the list of automation agents.
158
+ func GetMongoDBClusterState (omConnection om.Connection ) (MongoDBClusterStateInOM , error ) {
159
+ var agentStatuses []om.AgentStatus
160
+ _ , err := om .TraversePages (
161
+ omConnection .ReadAutomationAgents ,
162
+ func (aa interface {}) bool {
163
+ agentStatuses = append (agentStatuses , aa .(om.AgentStatus ))
164
+ return false
165
+ },
166
+ )
167
+ if err != nil {
168
+ return MongoDBClusterStateInOM {}, xerrors .Errorf ("error when reading automation agent pages: %v" , err )
169
+ }
170
+
171
+ automationStatus , err := omConnection .ReadAutomationStatus ()
172
+ if err != nil {
173
+ return MongoDBClusterStateInOM {}, xerrors .Errorf ("error reading automation status: %v" , err )
174
+ }
175
+
176
+ processStateMap , err := calculateProcessStateMap (automationStatus .Processes , agentStatuses )
177
+ if err != nil {
178
+ return MongoDBClusterStateInOM {}, err
179
+ }
180
+
181
+ return MongoDBClusterStateInOM {
182
+ GoalVersion : automationStatus .GoalVersion ,
183
+ ProcessStateMap : processStateMap ,
184
+ }, nil
185
+ }
186
+
187
+ func (c * MongoDBClusterStateInOM ) GetProcessState (hostname string ) ProcessState {
188
+ if processState , ok := c .ProcessStateMap [hostname ]; ok {
189
+ return processState
190
+ }
191
+
192
+ return NewProcessState (hostname )
193
+ }
194
+
195
+ func (c * MongoDBClusterStateInOM ) GetProcesses () []ProcessState {
196
+ return slices .Collect (maps .Values (c .ProcessStateMap ))
197
+ }
198
+
199
+ func (c * MongoDBClusterStateInOM ) GetProcessesNotInGoalState () []ProcessState {
200
+ return slices .DeleteFunc (slices .Collect (maps .Values (c .ProcessStateMap )), func (processState ProcessState ) bool {
201
+ return processState .GoalVersionAchieved >= c .GoalVersion
202
+ })
203
+ }
204
+
205
+ // calculateProcessStateMap combines information from ProcessStatuses and AgentStatuses returned by OpsManager
206
+ // and maps them to a unified data structure.
207
+ //
208
+ // The resulting ProcessState combines information from both agent and process status when refer to the same hostname.
209
+ // It is not guaranteed that we'll have the information from two sources, so in case one side is missing the defaults
210
+ // would be present as defined in NewProcessState.
211
+ // If multiple statuses exist for the same hostname, subsequent entries overwrite ones.
212
+ // Fields such as GoalVersionAchieved default to -1 if never set, and Plan defaults to nil.
213
+ // LastAgentPing defaults to the zero time if no AgentStatus entry is available.
214
+ func calculateProcessStateMap (processStatuses []om.ProcessStatus , agentStatuses []om.AgentStatus ) (map [string ]ProcessState , error ) {
215
+ processStates := map [string ]ProcessState {}
216
+ for _ , agentStatus := range agentStatuses {
217
+ if agentStatus .TypeName != "AUTOMATION" {
218
+ return nil , xerrors .Errorf ("encountered unexpected agent type in agent status type in %+v" , agentStatus )
219
+ }
220
+ processState , ok := processStates [agentStatus .Hostname ]
221
+ if ! ok {
222
+ processState = NewProcessState (agentStatus .Hostname )
223
+ }
224
+ lastPing , err := time .Parse (time .RFC3339 , agentStatus .LastConf )
225
+ if err != nil {
226
+ return nil , xerrors .Errorf ("wrong format for lastConf field: expected UTC format but the value is %s, agentStatus=%+v: %v" , agentStatus .LastConf , agentStatus , err )
227
+ }
228
+ processState .LastAgentPing = lastPing
229
+
230
+ processStates [agentStatus .Hostname ] = processState
231
+ }
232
+
233
+ for _ , processStatus := range processStatuses {
234
+ processState , ok := processStates [processStatus .Hostname ]
235
+ if ! ok {
236
+ processState = NewProcessState (processStatus .Hostname )
237
+ }
238
+ processState .GoalVersionAchieved = processStatus .LastGoalVersionAchieved
239
+ processState .ProcessName = processStatus .Name
240
+ processState .Plan = processStatus .Plan
241
+ processStates [processStatus .Hostname ] = processState
242
+ }
243
+
244
+ return processStates , nil
245
+ }
246
+
247
+ func agentCheck (omConnection om.Connection , agentHostnames []string , log * zap.SugaredLogger ) (string , bool ) {
248
+ registeredHostnamesSet := map [string ]struct {}{}
249
+ predicateFunc := func (aa interface {}) bool {
250
+ automationAgent := aa .(om.Status )
251
+ for _ , hostname := range agentHostnames {
252
+ if automationAgent .IsRegistered (hostname , log ) {
253
+ registeredHostnamesSet [hostname ] = struct {}{}
254
+ if len (registeredHostnamesSet ) == len (agentHostnames ) {
255
+ return true
256
+ }
257
+ }
258
+ }
259
+ return false
260
+ }
261
+
262
+ _ , err := om .TraversePages (
263
+ omConnection .ReadAutomationAgents ,
264
+ predicateFunc ,
265
+ )
266
+ if err != nil {
267
+ return fmt .Sprintf ("Received error when reading automation agent pages: %v" , err ), false
268
+ }
269
+
270
+ // convert to list of keys only for pretty printing in the error message
271
+ var registeredHostnamesList []string
272
+ for hostname := range registeredHostnamesSet {
273
+ registeredHostnamesList = append (registeredHostnamesList , hostname )
274
+ }
275
+
276
+ var msg string
277
+ if len (registeredHostnamesList ) == 0 {
278
+ return fmt .Sprintf ("None of %d expected agents has registered with OM, expected hostnames: %+v" , len (agentHostnames ), agentHostnames ), false
279
+ } else if len (registeredHostnamesList ) == len (agentHostnames ) {
280
+ return fmt .Sprintf ("All of %d expected agents have registered with OM, hostnames: %+v" , len (registeredHostnamesList ), registeredHostnamesList ), true
281
+ } else {
282
+ var missingHostnames []string
283
+ for _ , expectedHostname := range agentHostnames {
284
+ if _ , ok := registeredHostnamesSet [expectedHostname ]; ! ok {
285
+ missingHostnames = append (missingHostnames , expectedHostname )
286
+ }
287
+ }
288
+ msg = fmt .Sprintf ("Only %d of %d expected agents have registered with OM, missing hostnames: %+v, registered hostnames in OM: %+v, expected hostnames: %+v" , len (registeredHostnamesList ), len (agentHostnames ), missingHostnames , registeredHostnamesList , agentHostnames )
289
+ return msg , false
290
+ }
291
+ }
292
+
110
293
// waitUntilRegistered waits until all agents with 'agentHostnames' are registered in OM. Note, that wait
111
294
// happens after retrial - this allows to skip waiting in case agents are already registered
112
295
func waitUntilRegistered (omConnection om.Connection , log * zap.SugaredLogger , r retryParams , agentHostnames ... string ) (bool , string ) {
@@ -120,47 +303,7 @@ func waitUntilRegistered(omConnection om.Connection, log *zap.SugaredLogger, r r
120
303
retrials := env .ReadIntOrDefault (util .PodWaitRetriesEnv , r .retrials )
121
304
122
305
agentsCheckFunc := func () (string , bool ) {
123
- registeredHostnamesMap := map [string ]struct {}{}
124
- _ , err := om .TraversePages (
125
- omConnection .ReadAutomationAgents ,
126
- func (aa interface {}) bool {
127
- automationAgent := aa .(om.Status )
128
- for _ , hostname := range agentHostnames {
129
- if automationAgent .IsRegistered (hostname , log ) {
130
- registeredHostnamesMap [hostname ] = struct {}{}
131
- if len (registeredHostnamesMap ) == len (agentHostnames ) {
132
- return true
133
- }
134
- }
135
- }
136
- return false
137
- },
138
- )
139
- if err != nil {
140
- log .Errorw ("Received error when reading automation agent pages" , "err" , err )
141
- }
142
-
143
- // convert to list of keys only for pretty printing in the error message
144
- var registeredHostnamesList []string
145
- for hostname := range registeredHostnamesMap {
146
- registeredHostnamesList = append (registeredHostnamesList , hostname )
147
- }
148
-
149
- var msg string
150
- if len (registeredHostnamesList ) == 0 {
151
- return fmt .Sprintf ("None of %d expected agents has registered with OM, expected hostnames: %+v" , len (agentHostnames ), agentHostnames ), false
152
- } else if len (registeredHostnamesList ) == len (agentHostnames ) {
153
- return fmt .Sprintf ("All of %d expected agents have registered with OM, hostnames: %+v" , len (registeredHostnamesList ), registeredHostnamesList ), true
154
- } else {
155
- var missingHostnames []string
156
- for _ , expectedHostname := range agentHostnames {
157
- if _ , ok := registeredHostnamesMap [expectedHostname ]; ! ok {
158
- missingHostnames = append (missingHostnames , expectedHostname )
159
- }
160
- }
161
- msg = fmt .Sprintf ("Only %d of %d expected agents have registered with OM, missing hostnames: %+v, registered hostnames in OM: %+v, expected hostnames: %+v" , len (registeredHostnamesList ), len (agentHostnames ), missingHostnames , registeredHostnamesList , agentHostnames )
162
- return msg , false
163
- }
306
+ return agentCheck (omConnection , agentHostnames , log )
164
307
}
165
308
166
309
return util .DoAndRetry (agentsCheckFunc , log , retrials , waitSeconds )
0 commit comments