Skip to content

Commit 7fe2157

Browse files
rudrakhplogand22
authored andcommitted
perf: skip infra ir and status subscription for followers (envoyproxy#7145)
Signed-off-by: Rudrakh Panigrahi <rudrakh97@gmail.com>
1 parent a32adc8 commit 7fe2157

File tree

4 files changed

+40
-19
lines changed

4 files changed

+40
-19
lines changed

internal/cmd/server.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -237,15 +237,9 @@ func startRunners(ctx context.Context, cfg *config.Server) (err error) {
237237
// Wait until done
238238
<-ctx.Done()
239239

240-
// Close messages
241-
closeChannels := []interface{ Close() }{
242-
channels.pResources,
243-
channels.xdsIR,
244-
channels.infraIR,
245-
}
246-
for _, ch := range closeChannels {
247-
ch.Close()
248-
}
240+
// Close xdsIR channel
241+
// No need to close infraIR and pResources channels since they are already closed
242+
channels.xdsIR.Close()
249243

250244
cfg.Logger.Info("runners are shutting down")
251245
for _, r := range runners {

internal/infrastructure/runner/runner.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ func (r *Runner) Start(ctx context.Context) (err error) {
5757
return err
5858
}
5959

60-
sub := r.InfraIR.Subscribe(ctx)
61-
initInfra := func() {
60+
// This is a blocking function that subscribes to the infraIR and initializes the infrastructure.
61+
subscribeInitInfraAndCloseInfraIRMessage := func() {
62+
// Subscribe and Close in same goroutine to avoid race condition.
63+
sub := r.InfraIR.Subscribe(ctx)
6264
go r.subscribeToProxyInfraIR(ctx, sub)
6365

6466
// Enable global ratelimit if it has been configured.
@@ -73,6 +75,9 @@ func (r *Runner) Start(ctx context.Context) (err error) {
7375
}()
7476
}
7577
r.Logger.Info("started")
78+
<-ctx.Done()
79+
r.InfraIR.Close()
80+
r.Logger.Info("shutting down")
7681
}
7782

7883
// When leader election is active, infrastructure initialization occurs only upon acquiring leadership
@@ -82,14 +87,18 @@ func (r *Runner) Start(ctx context.Context) (err error) {
8287
go func() {
8388
select {
8489
case <-ctx.Done():
90+
// As a follower EG instance close infraIR when the context is done.
91+
r.InfraIR.Close()
8592
return
8693
case <-r.Elected:
87-
initInfra()
94+
// As a leader EG instance subscribe to infraIR to initialize the infrastructure and Close when the context is done.
95+
subscribeInitInfraAndCloseInfraIRMessage()
8896
}
8997
}()
90-
return
98+
} else {
99+
// Since leader election is disabled subscribe to infraIR to initialize the infrastructure and Close when the context is done.
100+
go subscribeInitInfraAndCloseInfraIRMessage()
91101
}
92-
initInfra()
93102
return
94103
}
95104

internal/message/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (p *ProviderResources) Close() {
7070
p.GatewayAPIResources.Close()
7171
p.GatewayAPIStatuses.Close()
7272
p.PolicyStatuses.Close()
73+
p.ExtensionStatuses.Close()
7374
}
7475

7576
// GatewayAPIStatuses contains gateway API resources statuses
@@ -122,6 +123,10 @@ func (p *PolicyStatuses) Close() {
122123
p.ExtensionPolicyStatuses.Close()
123124
}
124125

126+
func (e *ExtensionStatuses) Close() {
127+
e.BackendStatuses.Close()
128+
}
129+
125130
// XdsIR message
126131
type XdsIR struct {
127132
watchable.Map[string, *ir.Xds]

internal/provider/kubernetes/controller.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,23 +199,36 @@ func newGatewayAPIController(ctx context.Context, mgr manager.Manager, cfg *conf
199199
return fmt.Errorf("error watching resources: %w", err)
200200
}
201201

202-
// Do not call .Subscribe() inside Goroutine since it is supposed to be called from the same
203-
// Goroutine where Close() is called.
204-
r.subscribeToResources(ctx)
202+
// This is a blocking function that subscribes to the resource updates and updates the status.
203+
subscribeUpdateStatusAndCloseResources := func() {
204+
// Subscribe to resource updates
205+
r.subscribeToResources(ctx)
206+
// Update status
207+
go r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.ExtensionManager != nil)
208+
r.log.Info("started")
209+
// Close resources if the context is done.
210+
<-ctx.Done()
211+
r.resources.Close()
212+
r.log.Info("shutting down")
213+
}
205214

206215
// When leader election is enabled, only subscribe to status updates upon acquiring leadership.
207216
if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes &&
208217
!ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
209218
go func() {
210219
select {
211220
case <-ctx.Done():
221+
// As a follower EG instance close resources when the context is done.
222+
r.resources.Close()
212223
return
213224
case <-cfg.Elected:
214-
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.ExtensionManager != nil)
225+
// As a leader EG instance subscribe to resource updates and Close resources when the context is done.
226+
subscribeUpdateStatusAndCloseResources()
215227
}
216228
}()
217229
} else {
218-
r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.ExtensionManager != nil)
230+
// Since leader election is disabled subscribe to resource updates and Close resources when the context is done.
231+
go subscribeUpdateStatusAndCloseResources()
219232
}
220233
return nil
221234
}

0 commit comments

Comments
 (0)