Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/global/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type ValueType interface {
string | int | uint16 | uint32 | uint64 | bool | time.Duration
}

const (
K8sClientQPS = "k8s.client/qps"
K8sClientBurst = "k8s.client/burst"
K8sBrokerClientQPS = "k8s.broker.client/qps"
K8sBrokerClientBurst = "k8s.broker.client/burst"
)

var (
configMap sync.Map
logger = log.Logger{Logger: logf.Log.WithName("Global")}
Expand Down
4 changes: 2 additions & 2 deletions pkg/syncer/broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type brokerSpecification struct {
APIServer string
APIServerToken string
RemoteNamespace string
Insecure bool `default:"false"`
Ca string
Secret string
Insecure bool `default:"false"`
}

const brokerConfigPrefix = "broker_k8s"
Expand All @@ -43,7 +43,7 @@ func getBrokerSpecification() (*brokerSpecification, error) {

err := envconfig.Process(brokerConfigPrefix, &brokerSpec)
if err != nil {
return nil, errors.Wrap(err, "error processing env configuration")
return nil, errors.Wrap(err, "error processing broker env configuration")
}

return &brokerSpec, nil
Expand Down
7 changes: 6 additions & 1 deletion pkg/syncer/broker/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/global"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/syncer"
Expand Down Expand Up @@ -327,7 +328,6 @@ func (c *SyncerConfig) createBrokerClient() error {
}

c.BrokerNamespace = spec.RemoteNamespace

// If we have a secret, try to use it
if spec.Secret != "" {
c.BrokerRestConfig, authorized, err = resource.GetAuthorizedRestConfigFromFiles(spec.APIServer,
Expand All @@ -343,6 +343,11 @@ func (c *SyncerConfig) createBrokerClient() error {
c.BrokerRestConfig, authorized, err = resource.GetAuthorizedRestConfigFromData(spec.APIServer, spec.APIServerToken, spec.Ca,
&rest.TLSClientConfig{Insecure: spec.Insecure}, *gvr, spec.RemoteNamespace)
}

if c.BrokerRestConfig != nil {
c.BrokerRestConfig.QPS = float32(global.Get(global.K8sBrokerClientQPS, 0))
c.BrokerRestConfig.Burst = global.Get(global.K8sBrokerClientBurst, 0)
}
}

if !authorized {
Expand Down
55 changes: 53 additions & 2 deletions pkg/syncer/broker/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/submariner-io/admiral/pkg/fake"
fakefederator "github.com/submariner-io/admiral/pkg/federate/fake"
"github.com/submariner-io/admiral/pkg/global"
resourceutils "github.com/submariner-io/admiral/pkg/resource"
sync "github.com/submariner-io/admiral/pkg/syncer"
"github.com/submariner-io/admiral/pkg/syncer/broker"
Expand Down Expand Up @@ -64,6 +65,7 @@ var _ = Describe("Broker Syncer", func() {
initialLocalResources []runtime.Object
initialBrokerResources []runtime.Object
stopCh chan struct{}
actualLocalRestConfig *rest.Config
actualBrokerRestConfig *rest.Config
expectInitError bool
)
Expand All @@ -74,8 +76,10 @@ var _ = Describe("Broker Syncer", func() {
os.Unsetenv("BROKER_K8S_REMOTENAMESPACE")
os.Unsetenv("BROKER_K8S_INSECURE")
os.Unsetenv("BROKER_K8S_SECRET")
global.Init()

expectInitError = false
actualLocalRestConfig = nil
actualBrokerRestConfig = nil
initialLocalResources = nil
initialBrokerResources = nil
Expand Down Expand Up @@ -122,6 +126,7 @@ var _ = Describe("Broker Syncer", func() {
if config.LocalRestConfig != nil || config.BrokerRestConfig != nil || brokerAPIServer != "" {
resourceutils.NewDynamicClient = func(inConfig *rest.Config) (dynamic.Interface, error) {
if equality.Semantic.DeepDerivative(inConfig, config.LocalRestConfig) {
actualLocalRestConfig = inConfig
return localDynClient, nil
} else if equality.Semantic.DeepDerivative(inConfig, config.BrokerRestConfig) ||
(brokerAPIServer != "" && strings.HasSuffix(inConfig.Host, brokerAPIServer)) {
Expand Down Expand Up @@ -628,11 +633,15 @@ var _ = Describe("Broker Syncer", func() {
When("rest config instances are specified", func() {
BeforeEach(func() {
config.LocalRestConfig = &rest.Config{
Host: "https://local",
Host: "https://local",
QPS: 30,
Burst: 100,
}

config.BrokerRestConfig = &rest.Config{
Host: "https://broker",
Host: "https://broker",
QPS: 50,
Burst: 200,
}
})

Expand All @@ -641,6 +650,32 @@ var _ = Describe("Broker Syncer", func() {
test.AwaitResource(brokerClient, resource.GetName())
})

It("should use the local rest config QPS and Burst values", func() {
Expect(actualLocalRestConfig.QPS).To(Equal(float32(30)))
Expect(actualLocalRestConfig.Burst).To(Equal(100))
})

It("should use the broker rest config QPS and Burst values", func() {
Expect(actualBrokerRestConfig.QPS).To(Equal(float32(50)))
Expect(actualBrokerRestConfig.Burst).To(Equal(200))
})

Context("and global config QPS/Burst are also set", func() {
BeforeEach(func() {
global.Init(&corev1.ConfigMap{
Data: map[string]string{
global.K8sBrokerClientQPS: "100",
global.K8sBrokerClientBurst: "500",
},
})
})

It("should prioritize the broker rest config over global config", func() {
Expect(actualBrokerRestConfig.QPS).To(Equal(float32(50)))
Expect(actualBrokerRestConfig.Burst).To(Equal(200))
})
})

Context("and broker authorization fails", func() {
BeforeEach(func() {
expectInitError = true
Expand Down Expand Up @@ -687,5 +722,21 @@ var _ = Describe("Broker Syncer", func() {
test.AwaitResource(brokerClient, resource.GetName())
})
})

Context("and global QPS/Burst config is set", func() {
BeforeEach(func() {
global.Init(&corev1.ConfigMap{
Data: map[string]string{
global.K8sBrokerClientQPS: "100",
global.K8sBrokerClientBurst: "500",
},
})
})

It("should apply the QPS and Burst settings to the broker rest config", func() {
Expect(actualBrokerRestConfig.QPS).To(Equal(float32(100)))
Expect(actualBrokerRestConfig.Burst).To(Equal(500))
})
})
})
})
Loading