Skip to content

Commit a68ea0f

Browse files
authored
perf: skip infra ir and status subscription for followers (#7145)
Signed-off-by: Rudrakh Panigrahi <[email protected]>
1 parent b01639f commit a68ea0f

File tree

4 files changed

+40
-19
lines changed

4 files changed

+40
-19
lines changed

internal/cmd/server.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -237,15 +237,9 @@ func startRunners(ctx context.Context, cfg *config.Server) (err error) {
237237
// Wait until done
238238
<-ctx.Done()
239239

240-
// Close messages
241-
closeChannels := []interface{ Close() }{
242-
channels.pResources,
243-
channels.xdsIR,
244-
channels.infraIR,
245-
}
246-
for _, ch := range closeChannels {
247-
ch.Close()
248-
}
240+
// Close xdsIR channel
241+
// No need to close infraIR and pResources channels since they are already closed
242+
channels.xdsIR.Close()
249243

250244
cfg.Logger.Info("runners are shutting down")
251245
for _, r := range runners {

internal/infrastructure/runner/runner.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ func (r *Runner) Start(ctx context.Context) (err error) {
5757
return err
5858
}
5959

60-
sub := r.InfraIR.Subscribe(ctx)
61-
initInfra := func() {
60+
// This is a blocking function that subscribes to the infraIR and initializes the infrastructure.
61+
subscribeInitInfraAndCloseInfraIRMessage := func() {
62+
// Subscribe and Close in same goroutine to avoid race condition.
63+
sub := r.InfraIR.Subscribe(ctx)
6264
go r.subscribeToProxyInfraIR(ctx, sub)
6365

6466
// Enable global ratelimit if it has been configured.
@@ -73,6 +75,9 @@ func (r *Runner) Start(ctx context.Context) (err error) {
7375
}()
7476
}
7577
r.Logger.Info("started")
78+
<-ctx.Done()
79+
r.InfraIR.Close()
80+
r.Logger.Info("shutting down")
7681
}
7782

7883
// When leader election is active, infrastructure initialization occurs only upon acquiring leadership
@@ -82,14 +87,18 @@ func (r *Runner) Start(ctx context.Context) (err error) {
8287
go func() {
8388
select {
8489
case <-ctx.Done():
90+
// As a follower EG instance close infraIR when the context is done.
91+
r.InfraIR.Close()
8592
return
8693
case <-r.Elected:
87-
initInfra()
94+
// As a leader EG instance subscribe to infraIR to initialize the infrastructure and Close when the context is done.
95+
subscribeInitInfraAndCloseInfraIRMessage()
8896
}
8997
}()
90-
return
98+
} else {
99+
// Since leader election is disabled subscribe to infraIR to initialize the infrastructure and Close when the context is done.
100+
go subscribeInitInfraAndCloseInfraIRMessage()
91101
}
92-
initInfra()
93102
return
94103
}
95104

internal/message/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (p *ProviderResources) Close() {
7070
p.GatewayAPIResources.Close()
7171
p.GatewayAPIStatuses.Close()
7272
p.PolicyStatuses.Close()
73+
p.ExtensionStatuses.Close()
7374
}
7475

7576
// GatewayAPIStatuses contains gateway API resources statuses
@@ -122,6 +123,10 @@ func (p *PolicyStatuses) Close() {
122123
p.ExtensionPolicyStatuses.Close()
123124
}
124125

126+
func (e *ExtensionStatuses) Close() {
127+
e.BackendStatuses.Close()
128+
}
129+
125130
// XdsIR message
126131
type XdsIR struct {
127132
watchable.Map[string, *ir.Xds]

internal/provider/kubernetes/controller.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,23 +198,36 @@ func newGatewayAPIController(ctx context.Context, mgr manager.Manager, cfg *conf
198198
return fmt.Errorf("error watching resources: %w", err)
199199
}
200200

201-
// Do not call .Subscribe() inside Goroutine since it is supposed to be called from the same
202-
// Goroutine where Close() is called.
203-
r.subscribeToResources(ctx)
201+
// This is a blocking function that subscribes to the resource updates and updates the status.
202+
subscribeUpdateStatusAndCloseResources := func() {
203+
// Subscribe to resource updates
204+
r.subscribeToResources(ctx)
205+
// Update status
206+
go r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.ExtensionManager != nil)
207+
r.log.Info("started")
208+
// Close resources if the context is done.
209+
<-ctx.Done()
210+
r.resources.Close()
211+
r.log.Info("shutting down")
212+
}
204213

205214
// When leader election is enabled, only subscribe to status updates upon acquiring leadership.
206215
if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes &&
207216
!ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
208217
go func() {
209218
select {
210219
case <-ctx.Done():
220+
// As a follower EG instance close resources when the context is done.
221+
r.resources.Close()
211222
return
212223
case <-cfg.Elected:
213-
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.ExtensionManager != nil)
224+
// As a leader EG instance subscribe to resource updates and Close resources when the context is done.
225+
subscribeUpdateStatusAndCloseResources()
214226
}
215227
}()
216228
} else {
217-
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.ExtensionManager != nil)
229+
// Since leader election is disabled subscribe to resource updates and Close resources when the context is done.
230+
go subscribeUpdateStatusAndCloseResources()
218231
}
219232
return nil
220233
}

0 commit comments

Comments
 (0)