@@ -22,11 +22,17 @@ package resources
2222
2323import (
2424 "context"
25+ "net/http"
26+ "sync"
2527
2628 meta "k8s.io/apimachinery/pkg/apis/meta/v1"
29+ "k8s.io/apimachinery/pkg/types"
30+
31+ "github.com/arangodb/go-driver"
2732
2833 api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
2934 "github.com/arangodb/kube-arangodb/pkg/apis/shared"
35+ "github.com/arangodb/kube-arangodb/pkg/deployment/features"
3036 "github.com/arangodb/kube-arangodb/pkg/deployment/patch"
3137 "github.com/arangodb/kube-arangodb/pkg/util/errors"
3238 "github.com/arangodb/kube-arangodb/pkg/util/globals"
@@ -125,8 +131,8 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
125131 if s , ok := cachedStatus .Service ().V1 ().GetSimple (leaderAgentSvcName ); ok {
126132 if err , adjusted := r .adjustService (ctx , s , shared .ArangoPort , selector ); err == nil {
127133 if ! adjusted {
128- // The service is not changed.
129- return nil
134+ // The service is not changed, so single server leader can be set .
135+ return r . ensureSingleServerLeader ( ctx , cachedStatus )
130136 }
131137
132138 return errors .Reconcile ()
@@ -149,3 +155,208 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
149155 // The service has been created.
150156 return errors .Reconcile ()
151157}
158+
159+ // getSingleServerLeaderID returns id of a single server leader.
160+ func (r * Resources ) getSingleServerLeaderID (ctx context.Context ) (string , error ) {
161+ status , _ := r .context .GetStatus ()
162+ var mutex sync.Mutex
163+ var leaderID string
164+ var anyError error
165+
166+ dbServers := func (group api.ServerGroup , list api.MemberStatusList ) error {
167+ if len (list ) == 0 {
168+ return nil
169+ }
170+ ctxCancel , cancel := context .WithCancel (ctx )
171+ defer func () {
172+ cancel ()
173+ }()
174+
175+ // Fetch availability of each single server.
176+ var wg sync.WaitGroup
177+ wg .Add (len (list ))
178+ for _ , m := range list {
179+ go func (id string ) {
180+ defer wg .Done ()
181+ err := globals .GetGlobalTimeouts ().ArangoD ().RunWithTimeout (ctxCancel , func (ctxChild context.Context ) error {
182+ c , err := r .context .GetServerClient (ctxChild , api .ServerGroupSingle , id )
183+ if err != nil {
184+ return err
185+ }
186+
187+ if available , err := isServerAvailable (ctxChild , c ); err != nil {
188+ return err
189+ } else if ! available {
190+ return errors .New ("not available" )
191+ }
192+
193+ // Other requests can be interrupted, because a leader is known already.
194+ cancel ()
195+ mutex .Lock ()
196+ leaderID = id
197+ mutex .Unlock ()
198+ return nil
199+ })
200+
201+ if err != nil {
202+ mutex .Lock ()
203+ anyError = err
204+ mutex .Unlock ()
205+ }
206+ }(m .ID )
207+ }
208+ wg .Wait ()
209+
210+ return nil
211+ }
212+
213+ if err := status .Members .ForeachServerInGroups (dbServers , api .ServerGroupSingle ); err != nil {
214+ return "" , err
215+ }
216+
217+ if len (leaderID ) > 0 {
218+ return leaderID , nil
219+ }
220+
221+ if anyError != nil {
222+ return "" , errors .WithMessagef (anyError , "unable to get a leader" )
223+ }
224+
225+ return "" , errors .New ("unable to get a leader" )
226+ }
227+
228+ // setSingleServerLeadership adds or removes leadership label on a single server pod.
229+ func (r * Resources ) ensureSingleServerLeader (ctx context.Context , cachedStatus inspectorInterface.Inspector ) error {
230+ changed := false
231+
232+ enabled := features .FailoverLeadership ().Enabled ()
233+ var leaderID string
234+ if enabled {
235+ var err error
236+ if leaderID , err = r .getSingleServerLeaderID (ctx ); err != nil {
237+ return err
238+ }
239+ }
240+
241+ singleServers := func (group api.ServerGroup , list api.MemberStatusList ) error {
242+ for _ , m := range list {
243+ pod , exist := cachedStatus .Pod ().V1 ().GetSimple (m .PodName )
244+ if ! exist {
245+ continue
246+ }
247+
248+ labels := pod .GetLabels ()
249+ if enabled && m .ID == leaderID {
250+ if value , ok := labels [k8sutil .LabelKeyArangoLeader ]; ok && value == "true" {
251+ // Single server is available, and it has a leader label.
252+ continue
253+ }
254+
255+ labels = addLabel (labels , k8sutil .LabelKeyArangoLeader , "true" )
256+ } else {
257+ if _ , ok := labels [k8sutil .LabelKeyArangoLeader ]; ! ok {
258+ // Single server is not available, and it does not have a leader label.
259+ continue
260+ }
261+
262+ delete (labels , k8sutil .LabelKeyArangoLeader )
263+ }
264+
265+ err := r .context .ApplyPatchOnPod (ctx , pod , patch .ItemReplace (patch .NewPath ("metadata" , "labels" ), labels ))
266+ if err != nil {
267+ return errors .WithMessagef (err , "unable to change leader label for pod %s" , m .PodName )
268+ }
269+ changed = true
270+ }
271+
272+ return nil
273+ }
274+
275+ status , _ := r .context .GetStatus ()
276+ if err := status .Members .ForeachServerInGroups (singleServers , api .ServerGroupSingle ); err != nil {
277+ return err
278+ }
279+
280+ if changed {
281+ return errors .Reconcile ()
282+ }
283+
284+ return r .ensureSingleServerLeaderServices (ctx , cachedStatus )
285+ }
286+
287+ // ensureSingleServerLeaderServices adds a leadership label to deployment service and external deployment service.
288+ func (r * Resources ) ensureSingleServerLeaderServices (ctx context.Context , cachedStatus inspectorInterface.Inspector ) error {
289+ // Add a leadership label to deployment service and external deployment service.
290+ deploymentName := r .context .GetAPIObject ().GetName ()
291+ changed := false
292+ services := []string {
293+ k8sutil .CreateDatabaseClientServiceName (deploymentName ),
294+ k8sutil .CreateDatabaseExternalAccessServiceName (deploymentName ),
295+ }
296+
297+ enabled := features .FailoverLeadership ().Enabled ()
298+ for _ , svcName := range services {
299+ svc , exists := cachedStatus .Service ().V1 ().GetSimple (svcName )
300+ if ! exists {
301+ // It will be created later with a leadership label.
302+ continue
303+ }
304+ selector := svc .Spec .Selector
305+ if enabled {
306+ if v , ok := selector [k8sutil .LabelKeyArangoLeader ]; ok && v == "true" {
307+ // It is already OK.
308+ continue
309+ }
310+
311+ selector = addLabel (selector , k8sutil .LabelKeyArangoLeader , "true" )
312+ } else {
313+ if _ , ok := selector [k8sutil .LabelKeyArangoLeader ]; ! ok {
314+ // Service does not have a leader label, and it should not have.
315+ continue
316+ }
317+
318+ delete (selector , k8sutil .LabelKeyArangoLeader )
319+ }
320+
321+ parser := patch .Patch ([]patch.Item {patch .ItemReplace (patch .NewPath ("spec" , "selector" ), selector )})
322+ data , err := parser .Marshal ()
323+ if err != nil {
324+ return errors .WithMessagef (err , "unable to marshal labels for service %s" , svcName )
325+ }
326+
327+ err = globals .GetGlobalTimeouts ().Kubernetes ().RunWithTimeout (ctx , func (ctxChild context.Context ) error {
328+ _ , err := cachedStatus .ServicesModInterface ().V1 ().Patch (ctxChild , svcName , types .JSONPatchType , data , meta.PatchOptions {})
329+ return err
330+ })
331+ if err != nil {
332+ return errors .WithMessagef (err , "unable to patch labels for service %s" , svcName )
333+ }
334+ changed = true
335+ }
336+
337+ if changed {
338+ return errors .Reconcile ()
339+ }
340+
341+ return nil
342+ }
343+
344+ // isServerAvailable returns true when server is available.
345+ // In active fail-over mode one of the server should be available.
346+ func isServerAvailable (ctx context.Context , c driver.Client ) (bool , error ) {
347+ req , err := c .Connection ().NewRequest ("GET" , "_admin/server/availability" )
348+ if err != nil {
349+ return false , errors .WithStack (err )
350+ }
351+
352+ resp , err := c .Connection ().Do (ctx , req )
353+ if err != nil {
354+ return false , errors .WithStack (err )
355+ }
356+
357+ if err := resp .CheckStatus (http .StatusOK , http .StatusServiceUnavailable ); err != nil {
358+ return false , errors .WithStack (err )
359+ }
360+
361+ return resp .StatusCode () == http .StatusOK , nil
362+ }
0 commit comments