diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 213ce6a7f99e..f3bc0d2b323c 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -83,6 +83,9 @@ type config struct { // TODO: run loadtests using these flags to determine optimal default values. MaxIdleProxyConns int `split_words:"true" default:"1000"` MaxIdleProxyConnsPerHost int `split_words:"true" default:"100"` + + ProbeTimeout int `split_words:"true" default:"300"` + ProbeFrequency int `split_words:"true" default:"200"` } func main() { @@ -158,7 +161,8 @@ func main() { // transport so that throttler probe connections can be reused after probing // (via keep-alive) to send real requests, avoiding needing an extra // reconnect for the first request after the probe succeeds. - logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d", env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost) + logger.Debugf("MaxIdleProxyConns: %d, MaxIdleProxyConnsPerHost: %d, ProbeTimeout: %dms, ProbeFrequency: %dms", + env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost, env.ProbeTimeout, env.ProbeFrequency) transport := pkgnet.NewProxyAutoTransport(env.MaxIdleProxyConns, env.MaxIdleProxyConnsPerHost) // Fetch networking configuration to determine whether EnableMeshPodAddressability @@ -191,7 +195,9 @@ func main() { // Start throttler. throttler := activatornet.NewThrottler(ctx, env.PodIP) - go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode) + probeTimeout := time.Duration(env.ProbeTimeout) * time.Millisecond + probeFrequency := time.Duration(env.ProbeFrequency) * time.Millisecond + go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode, probeTimeout, probeFrequency) // Set up our config store configMapWatcher := configmapinformer.NewInformedWatcher(kubeClient, system.Namespace()) diff --git a/pkg/activator/net/revision_backends.go b/pkg/activator/net/revision_backends.go index 7a4f38646ab2..a410055d1ec1 100644 --- a/pkg/activator/net/revision_backends.go +++ b/pkg/activator/net/revision_backends.go @@ -87,11 +87,6 @@ func (d dests) MarshalLogObject(enc zapcore.ObjectEncoder) error { return nil } -const ( - probeTimeout time.Duration = 300 * time.Millisecond - defaultProbeFrequency time.Duration = 200 * time.Millisecond -) - // revisionWatcher watches the podIPs and ClusterIP of the service for a revision. It implements the logic // to supply revisionDestsUpdate events on updateCh type revisionWatcher struct { @@ -131,13 +126,15 @@ type revisionWatcher struct { // cover the revision's ready conditions, for example when an exec probe is // being used. enableProbeOptimisation bool + + probeTimeout time.Duration } func newRevisionWatcher(ctx context.Context, rev types.NamespacedName, protocol pkgnet.ProtocolType, updateCh chan<- revisionDestsUpdate, destsCh chan dests, transport http.RoundTripper, serviceLister corev1listers.ServiceLister, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, - enableProbeOptimisation bool, + enableProbeOptimisation bool, probeTimeout time.Duration, logger *zap.SugaredLogger, ) *revisionWatcher { ctx, cancel := context.WithCancel(ctx) @@ -155,6 +152,7 @@ func newRevisionWatcher(ctx context.Context, rev types.NamespacedName, protocol usePassthroughLb: usePassthroughLb, meshMode: meshMode, enableProbeOptimisation: enableProbeOptimisation, + probeTimeout: probeTimeout, logger: logger.With(zap.String(logkey.Key, rev.String())), } } @@ -219,7 +217,7 @@ func (rw *revisionWatcher) getDest() (string, error) { } func (rw *revisionWatcher) probeClusterIP(dest string) (bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), probeTimeout) + ctx, cancel := context.WithTimeout(context.Background(), rw.probeTimeout) defer cancel() match, _, err := rw.probe(ctx, dest) return match, err @@ -248,7 +246,7 @@ func (rw *revisionWatcher) probePodIPs(ready, notReady sets.Set[string]) (succee } // Context used for our probe requests. - ctx, cancel := context.WithTimeout(context.Background(), probeTimeout) + ctx, cancel := context.WithTimeout(context.Background(), rw.probeTimeout) defer cancel() // Empty errgroup is used as cancellation on first error is not desired, all probes should be @@ -459,19 +457,12 @@ type revisionBackendsManager struct { usePassthroughLb bool meshMode netcfg.MeshCompatibilityMode logger *zap.SugaredLogger + probeTimeout time.Duration probeFrequency time.Duration } -// NewRevisionBackendsManager returns a new RevisionBackendsManager with default -// probe time out. -func newRevisionBackendsManager(ctx context.Context, tr http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) *revisionBackendsManager { - return newRevisionBackendsManagerWithProbeFrequency(ctx, tr, usePassthroughLb, meshMode, defaultProbeFrequency) -} - -// newRevisionBackendsManagerWithProbeFrequency creates a fully spec'd RevisionBackendsManager. -func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.RoundTripper, - usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeFreq time.Duration, -) *revisionBackendsManager { +// newRevisionBackendsManager returns a new RevisionBackendsManager with configurable probe settings. +func newRevisionBackendsManager(ctx context.Context, tr http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeTimeout, probeFreq time.Duration) *revisionBackendsManager { rbm := &revisionBackendsManager{ ctx: ctx, revisionLister: revisioninformer.Get(ctx).Lister(), @@ -482,6 +473,7 @@ func newRevisionBackendsManagerWithProbeFrequency(ctx context.Context, tr http.R usePassthroughLb: usePassthroughLb, meshMode: meshMode, logger: logging.FromContext(ctx), + probeTimeout: probeTimeout, probeFrequency: probeFreq, } endpointsInformer := endpointsinformer.Get(ctx) @@ -565,7 +557,7 @@ func (rbm *revisionBackendsManager) getOrCreateRevisionWatcher(revID types.Names } destsCh := make(chan dests) - rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.logger) + rw := newRevisionWatcher(rbm.ctx, revID, rev.GetProtocol(), rbm.updateCh, destsCh, rbm.transport, rbm.serviceLister, rbm.usePassthroughLb, rbm.meshMode, enableProbeOptimisation, rbm.probeTimeout, rbm.logger) rbm.revisionWatchers[revID] = rw go rw.run(rbm.probeFrequency) return rw, nil diff --git a/pkg/activator/net/revision_backends_test.go b/pkg/activator/net/revision_backends_test.go index 632bf123bfde..4ef9f40deea9 100644 --- a/pkg/activator/net/revision_backends_test.go +++ b/pkg/activator/net/revision_backends_test.go @@ -56,8 +56,9 @@ const ( testNamespace = "test-namespace" testRevision = "test-revision" - probeFreq = 50 * time.Millisecond - updateTimeout = 16 * probeFreq + probeFreq = 50 * time.Millisecond + updateTimeout = 16 * probeFreq + defaultProbeTimeout = 300 * time.Millisecond meshErrorStatusCode = http.StatusServiceUnavailable ) @@ -555,6 +556,7 @@ func TestRevisionWatcher(t *testing.T) { tc.usePassthroughLb, // usePassthroughLb tc.meshMode, true, + defaultProbeTimeout, logger) rw.clusterIPHealthy = tc.initialClusterIPState @@ -993,7 +995,7 @@ func TestRevisionBackendManagerAddEndpoint(t *testing.T) { t.Fatal("Failed to start informers:", err) } - rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq) + rbm := newRevisionBackendsManager(ctx, rt, false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq) defer func() { cancel() waitInformers() @@ -1456,7 +1458,7 @@ func TestRevisionDeleted(t *testing.T) { ri.Informer().GetIndexer().Add(rev) fakeRT := activatortest.FakeRoundTripper{} - rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq) + rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq) defer func() { cancel() waitInformers() @@ -1512,7 +1514,7 @@ func TestServiceDoesNotExist(t *testing.T) { }}, }, } - rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq) + rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq) defer func() { cancel() waitInformers() @@ -1576,7 +1578,7 @@ func TestServiceMoreThanOne(t *testing.T) { }}, }, } - rbm := newRevisionBackendsManagerWithProbeFrequency(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, probeFreq) + rbm := newRevisionBackendsManager(ctx, pkgnetwork.RoundTripperFunc(fakeRT.RT), false /*usePassthroughLb*/, netcfg.MeshCompatibilityModeAuto, defaultProbeTimeout, probeFreq) defer func() { cancel() waitInformers() @@ -1855,6 +1857,7 @@ func TestProbePodIPs(t *testing.T) { enableProbeOptimisation: input.enableProbeOptimization, meshMode: input.meshMode, healthyPods: input.healthy, + probeTimeout: defaultProbeTimeout, } healthy, noop, notMesh, err := rw.probePodIPs(input.current.ready, input.current.notReady) diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index 0ef298c48db4..993ef0ae9a2f 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -22,6 +22,7 @@ import ( "sort" "sync" "sync/atomic" + "time" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/sets" @@ -497,8 +498,8 @@ func NewThrottler(ctx context.Context, ipAddr string) *Throttler { } // Run starts the throttler and blocks until the context is done. -func (t *Throttler) Run(ctx context.Context, probeTransport http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode) { - rbm := newRevisionBackendsManager(ctx, probeTransport, usePassthroughLb, meshMode) +func (t *Throttler) Run(ctx context.Context, probeTransport http.RoundTripper, usePassthroughLb bool, meshMode netcfg.MeshCompatibilityMode, probeTimeout, probeFrequency time.Duration) { + rbm := newRevisionBackendsManager(ctx, probeTransport, usePassthroughLb, meshMode, probeTimeout, probeFrequency) // Update channel is closed when ctx is done. t.run(rbm.updates()) }