Skip to content

Commit 291bacb

Browse files
committed
Leader election and unique PAR identity
1 parent 1d5cd3b commit 291bacb

File tree

3 files changed

+35
-23
lines changed

3 files changed

+35
-23
lines changed

cmd/cluster-agent/subcommands/start/command.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ func start(log log.Component,
551551
}
552552

553553
if config.GetBool("private_action_runner.enabled") {
554-
drain, err := startPrivateActionRunner(mainCtx, config, hostnameGetter, rcClient, log)
554+
drain, err := startPrivateActionRunner(mainCtx, config, hostnameGetter, rcClient, le, log)
555555
if err != nil {
556556
log.Errorf("Cannot start private action runner: %v", err)
557557
} else {
@@ -688,11 +688,19 @@ func startPrivateActionRunner(
688688
config config.Component,
689689
hostnameGetter hostnameinterface.Component,
690690
rcClient *rcclient.Client,
691+
le *leaderelection.LeaderEngine,
691692
log log.Component,
692693
) (func(), error) {
693694
if rcClient == nil {
694695
return nil, errors.New("Remote config is disabled or failed to initialize, remote config is a required dependency for private action runner")
695696
}
697+
if !config.GetBool("leader_election") {
698+
return nil, errors.New("leader election is not enabled on the Cluster Agent. The private action runner needs leader election for identity coordination across replicas")
699+
}
700+
err := le.EnsureLeaderElectionRuns()
701+
if err != nil {
702+
return nil, err
703+
}
696704
app, err := privateactionrunner.NewPrivateActionRunner(ctx, config, hostnameGetter, rcClient, log)
697705
if err != nil {
698706
return nil, err

pkg/privateactionrunner/enrollment/k8s_secret.go

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,28 +46,26 @@ func getIdentityFromK8sSecret(ctx context.Context, cfg configModel.Reader) (*Per
4646
ns := namespace.GetResourcesNamespace()
4747
secretName := getSecretName(cfg)
4848

49-
// Check if we're a follower - if so, we may need to wait for leader to create the secret
5049
le, err := leaderelection.GetLeaderEngine()
51-
isFollower := err == nil && !le.IsLeader()
50+
if err != nil {
51+
return nil, err
52+
}
5253

5354
secret, err := client.CoreV1().Secrets(ns).Get(ctx, secretName, metav1.GetOptions{})
5455
if err != nil {
55-
if k8serrors.IsNotFound(err) {
56-
// If we're a follower and secret doesn't exist, wait for leader to create it
57-
if isFollower {
58-
log.Info("Follower replica: waiting for leader to create PAR identity secret")
59-
secret, err = waitForSecretCreation(ctx, client, ns, secretName)
60-
if err != nil {
61-
return nil, err
62-
}
63-
// Continue to parse the secret below
64-
} else {
65-
// Leader or no leader election - return nil to trigger self-enrollment
66-
return nil, nil
67-
}
68-
} else {
56+
if !k8serrors.IsNotFound(err) {
6957
return nil, fmt.Errorf("failed to get identity secret: %w", err)
7058
}
59+
if le.IsLeader() {
60+
// Leader - return nil to trigger self-enrollment
61+
return nil, nil
62+
}
63+
// Follower - since secret doesn't exist, wait for leader to create it
64+
log.Info("Follower replica: waiting for leader to create PAR identity secret")
65+
secret, err = waitForSecretCreation(ctx, client, ns, secretName)
66+
if err != nil {
67+
return nil, err
68+
}
7169
}
7270

7371
privateKey, ok := secret.Data[privateKeyField]
@@ -90,12 +88,11 @@ func getIdentityFromK8sSecret(ctx context.Context, cfg configModel.Reader) (*Per
9088

9189
// persistIdentityToK8sSecret saves the enrollment result to a Kubernetes secret
9290
func persistIdentityToK8sSecret(ctx context.Context, cfg configModel.Reader, result *Result) error {
93-
// Check if this replica is the leader before creating the secret
9491
le, err := leaderelection.GetLeaderEngine()
9592
if err != nil {
96-
log.Warnf("Failed to get leader engine, proceeding without leader check: %v", err)
97-
// Fall through to create secret anyway if leader election is not available
98-
} else if !le.IsLeader() {
93+
return err
94+
}
95+
if !le.IsLeader() {
9996
log.Info("Not leader, skipping PAR identity secret creation (leader will handle it)")
10097
return nil
10198
}

pkg/util/kubernetes/apiserver/leaderelection/leaderelection.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ func (le *LeaderEngine) init() error {
191191
func (le *LeaderEngine) StartLeaderElectionRun() {
192192
le.once.Do(
193193
func() {
194+
log.Infof("Starting leader election goroutine for %q", le.HolderIdentity)
194195
go le.runLeaderElection()
195196
},
196197
)
@@ -209,14 +210,20 @@ func (le *LeaderEngine) EnsureLeaderElectionRuns() error {
209210

210211
le.StartLeaderElectionRun()
211212

213+
log.Infof("Waiting for leader election to determine a leader (timeout: %s)...", getLeaderTimeout)
212214
timeout := time.After(getLeaderTimeout)
213215
tick := time.NewTicker(time.Second)
214216
defer tick.Stop()
215217
for {
216-
log.Tracef("Waiting for new leader identity...")
217218
select {
218219
case <-tick.C:
219-
leaderIdentity := le.GetLeader()
220+
// Query Kubernetes directly to see current leader, don't wait for callback
221+
leaderIdentity, err := le.getCurrentLeader()
222+
if err != nil {
223+
log.Warnf("Error querying current leader: %v", err)
224+
continue
225+
}
226+
log.Infof("Polling for leader: current leader identity = %q", leaderIdentity)
220227
if leaderIdentity != "" {
221228
log.Infof("Leader election running, current leader is %q", leaderIdentity)
222229
le.running = true

0 commit comments

Comments
 (0)