Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions common/extension/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,35 @@ import (
)

var (
authenticators = make(map[string]func() filter.Authenticator)
accessKeyStorages = make(map[string]func() filter.AccessKeyStorage)
authenticators = NewRegistry[func() filter.Authenticator]("authenticator")
accessKeyStorages = NewRegistry[func() filter.AccessKeyStorage]("access key storage")
)

// SetAuthenticator puts the @fcn into map with name
func SetAuthenticator(name string, fcn func() filter.Authenticator) {
authenticators[name] = fcn
authenticators.Register(name, fcn)
}

// GetAuthenticator finds the Authenticator with @name
// Panic if not found
func GetAuthenticator(name string) (filter.Authenticator, bool) {
if authenticators[name] == nil {
fcn, ok := authenticators.Get(name)
if !ok {
return nil, false
}
return authenticators[name](), true
return fcn(), true
}

// SetAccessKeyStorages will set the @fcn into map with this name
func SetAccessKeyStorages(name string, fcn func() filter.AccessKeyStorage) {
accessKeyStorages[name] = fcn
accessKeyStorages.Register(name, fcn)
}

// GetAccessKeyStorages finds the storage with the @name.
// Panic if not found
func GetAccessKeyStorages(name string) (filter.AccessKeyStorage, error) {
f := accessKeyStorages[name]
if f == nil {
f, ok := accessKeyStorages.Get(name)
if !ok {
return nil, errors.New("accessKeyStorages for " + name + " is not existing, make sure you have import the package.")
}
return f(), nil
Expand Down
9 changes: 5 additions & 4 deletions common/extension/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

var clusters = make(map[string]func() cluster.Cluster)
var clusters = NewRegistry[func() cluster.Cluster]("cluster")

// SetCluster sets the cluster fault-tolerant mode with @name
// For example: available/failfast/broadcast/failfast/failsafe/...
func SetCluster(name string, fcn func() cluster.Cluster) {
clusters[name] = fcn
clusters.Register(name, fcn)
}

// GetCluster finds the cluster fault-tolerant mode with @name
func GetCluster(name string) (cluster.Cluster, error) {
if clusters[name] == nil {
fcn, ok := clusters.Get(name)
if !ok {
return nil, errors.New(fmt.Sprintf(constant.NonImportErrorMsgFormat, constant.ClusterKeyFailover))
}
return clusters[name](), nil
return fcn(), nil
}
229 changes: 229 additions & 0 deletions common/extension/concurrency_guards_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package extension

import (
"container/list"
"sync"
"sync/atomic"
"testing"
)

import (
gxset "github.com/dubbogo/gost/container/set"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/common"
commonconfig "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/registry"
)

type mockDir struct{}

func (m *mockDir) GetURL() *common.URL { return &common.URL{} }
func (m *mockDir) IsAvailable() bool { return true }
func (m *mockDir) Destroy() {}

Check failure on line 47 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yP5&open=AZz-pOxu6xdaFpLM1yP5&pullRequest=3264
func (m *mockDir) List(invocation base.Invocation) []base.Invoker { return nil }
func (m *mockDir) Subscribe(url *common.URL) error { return nil }

type mockCustomizer struct{ priority int }

func (m mockCustomizer) GetPriority() int { return m.priority }
func (m mockCustomizer) Customize(instance registry.ServiceInstance) {}

Check failure on line 54 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yP6&open=AZz-pOxu6xdaFpLM1yP6&pullRequest=3264

type mockServiceNameMapping struct{}

func (m *mockServiceNameMapping) Map(url *common.URL) error { return nil }
func (m *mockServiceNameMapping) Get(url *common.URL, listener mapping.MappingListener) (*gxset.HashSet, error) {
return nil, nil
}
func (m *mockServiceNameMapping) Remove(url *common.URL) error { return nil }

type mockPostProcessor struct{}

func (m mockPostProcessor) PostProcessReferenceConfig(url *common.URL) {}

Check failure on line 66 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yP7&open=AZz-pOxu6xdaFpLM1yP7&pullRequest=3264
func (m mockPostProcessor) PostProcessServiceConfig(url *common.URL) {}

Check failure on line 67 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yP8&open=AZz-pOxu6xdaFpLM1yP8&pullRequest=3264

func TestGetAllCustomShutdownCallbacksReturnsCopy(t *testing.T) {
customShutdownCallbacksLock.Lock()
original := customShutdownCallbacks
customShutdownCallbacks = list.New()
customShutdownCallbacksLock.Unlock()

t.Cleanup(func() {
customShutdownCallbacksLock.Lock()
customShutdownCallbacks = original
customShutdownCallbacksLock.Unlock()
})

AddCustomShutdownCallback(func() {})

Check failure on line 81 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yP9&open=AZz-pOxu6xdaFpLM1yP9&pullRequest=3264
AddCustomShutdownCallback(func() {})

Check failure on line 82 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yP-&open=AZz-pOxu6xdaFpLM1yP-&pullRequest=3264

callbacks := GetAllCustomShutdownCallbacks()
assert.Len(t, asSlice(callbacks), 2)

callbacks.PushBack(func() {})

Check failure on line 87 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yP_&open=AZz-pOxu6xdaFpLM1yP_&pullRequest=3264
callbacksAgain := GetAllCustomShutdownCallbacks()
assert.Len(t, asSlice(callbacksAgain), 2)
}

func TestGetDirectoryInstanceUsesProtocolAndFallback(t *testing.T) {
originalDirectories := directories
directories = NewRegistry[registryDirectory]("registry directory test")
t.Cleanup(func() {
directories = originalDirectories
})

if oldDefault := defaultDirectory.Load(); oldDefault != nil {
t.Cleanup(func() { defaultDirectory.Store(oldDefault.(registryDirectory)) })
} else {
t.Cleanup(func() { defaultDirectory = atomic.Value{} })
}

defaultHit := 0
protocolHit := 0

SetDefaultRegistryDirectory(func(url *common.URL, reg registry.Registry) (directory.Directory, error) {
defaultHit++
return &mockDir{}, nil
})

SetDirectory("polaris", func(url *common.URL, reg registry.Registry) (directory.Directory, error) {
protocolHit++
return &mockDir{}, nil
})

_, err := GetDirectoryInstance(&common.URL{Protocol: "polaris"}, nil)
require.NoError(t, err)
assert.Equal(t, 1, protocolHit)
assert.Equal(t, 0, defaultHit)

_, err = GetDirectoryInstance(&common.URL{Protocol: ""}, nil)
require.NoError(t, err)
assert.Equal(t, 1, defaultHit)

_, err = GetDirectoryInstance(&common.URL{Protocol: "unknown"}, nil)
require.NoError(t, err)
assert.Equal(t, 2, defaultHit)
}

func TestCustomizersAreSortedAndReturnedAsCopy(t *testing.T) {
customizersLock.Lock()
original := customizers
customizers = make([]registry.ServiceInstanceCustomizer, 0, 8)
customizersLock.Unlock()

t.Cleanup(func() {
customizersLock.Lock()
customizers = original
customizersLock.Unlock()
})

AddCustomizers(mockCustomizer{priority: 20})
AddCustomizers(mockCustomizer{priority: 10})

got := GetCustomizers()
require.Len(t, got, 2)
assert.Equal(t, 10, got[0].GetPriority())
assert.Equal(t, 20, got[1].GetPriority())

_ = append(got, mockCustomizer{priority: 1})
assert.Len(t, GetCustomizers(), 2)
}

func TestGlobalServiceNameMappingCreator(t *testing.T) {
if old := globalNameMappingCreator.Load(); old != nil {
t.Cleanup(func() { globalNameMappingCreator.Store(old.(ServiceNameMappingCreator)) })
}

expected := &mockServiceNameMapping{}
SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping {
return expected
})

got := GetGlobalServiceNameMapping()
assert.Same(t, expected, got)
}

func TestConfigPostProcessorRegistrySnapshot(t *testing.T) {
originalProcessors := processors
processors = NewRegistry[commonconfig.ConfigPostProcessor]("config post processor test")
t.Cleanup(func() {
processors = originalProcessors
})

SetConfigPostProcessor("p1", mockPostProcessor{})
SetConfigPostProcessor("p2", mockPostProcessor{})

assert.NotNil(t, GetConfigPostProcessor("p1"))
all := GetConfigPostProcessors()
assert.Len(t, all, 2)
}

func TestConcurrentCustomShutdownCallbacksAndCustomizers(t *testing.T) {
customShutdownCallbacksLock.Lock()
originalCallbacks := customShutdownCallbacks
customShutdownCallbacks = list.New()
customShutdownCallbacksLock.Unlock()

customizersLock.Lock()
originalCustomizers := customizers
customizers = make([]registry.ServiceInstanceCustomizer, 0, 8)
customizersLock.Unlock()

t.Cleanup(func() {
customShutdownCallbacksLock.Lock()
customShutdownCallbacks = originalCallbacks
customShutdownCallbacksLock.Unlock()

customizersLock.Lock()
customizers = originalCustomizers
customizersLock.Unlock()
})

var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(p int) {
defer wg.Done()
AddCustomShutdownCallback(func() {})

Check failure on line 211 in common/extension/concurrency_guards_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this function is empty or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZz-pOxu6xdaFpLM1yQA&open=AZz-pOxu6xdaFpLM1yQA&pullRequest=3264
AddCustomizers(mockCustomizer{priority: p})
_ = GetAllCustomShutdownCallbacks()
_ = GetCustomizers()
}(i)
}
wg.Wait()

assert.Len(t, asSlice(GetAllCustomShutdownCallbacks()), 20)
assert.Len(t, GetCustomizers(), 20)
}

func asSlice(l *list.List) []any {
ret := make([]any, 0, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
ret = append(ret, e.Value)
}
return ret
}
4 changes: 2 additions & 2 deletions common/extension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package extension

var (
configs = map[string]Config{}
configs = NewRegistry[Config]("config")
)

type Config interface {
Prefix() string
}

func SetConfig(c Config) {
configs[c.Prefix()] = c
configs.Register(c.Prefix(), c)
}
8 changes: 4 additions & 4 deletions common/extension/config_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ import (
"dubbo.apache.org/dubbo-go/v3/config_center"
)

var configCenters = make(map[string]func(config *common.URL) (config_center.DynamicConfiguration, error))
var configCenters = NewRegistry[func(config *common.URL) (config_center.DynamicConfiguration, error)]("config center")

// SetConfigCenter sets the DynamicConfiguration with @name
func SetConfigCenter(name string, v func(*common.URL) (config_center.DynamicConfiguration, error)) {
configCenters[name] = v
configCenters.Register(name, v)
}

// GetConfigCenter finds the DynamicConfiguration with @name
func GetConfigCenter(name string, config *common.URL) (config_center.DynamicConfiguration, error) {
configCenterFactory := configCenters[name]
if configCenterFactory == nil {
configCenterFactory, ok := configCenters.Get(name)
if !ok {
return nil, errors.New("config center for " + name + " is not existing, make sure you have import the package.")
}
configCenter, err := configCenterFactory(config)
Expand Down
9 changes: 5 additions & 4 deletions common/extension/config_center_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ import (
"dubbo.apache.org/dubbo-go/v3/config_center"
)

var configCenterFactories = make(map[string]func() config_center.DynamicConfigurationFactory)
var configCenterFactories = NewRegistry[func() config_center.DynamicConfigurationFactory]("config center")

// SetConfigCenterFactory sets the DynamicConfigurationFactory with @name
func SetConfigCenterFactory(name string, v func() config_center.DynamicConfigurationFactory) {
configCenterFactories[name] = v
configCenterFactories.Register(name, v)
}

// GetConfigCenterFactory finds the DynamicConfigurationFactory with @name
func GetConfigCenterFactory(name string) (config_center.DynamicConfigurationFactory, error) {
if configCenterFactories[name] == nil {
v, ok := configCenterFactories.Get(name)
if !ok {
return nil, errors.New("config center for " + name + " is not existing, make sure you have import the package.")
}
return configCenterFactories[name](), nil
return v(), nil
}
12 changes: 7 additions & 5 deletions common/extension/config_post_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/config"
)

var processors = make(map[string]config.ConfigPostProcessor)
var processors = NewRegistry[config.ConfigPostProcessor]("config post processor")

// SetConfigPostProcessor registers a ConfigPostProcessor with the given name.
func SetConfigPostProcessor(name string, processor config.ConfigPostProcessor) {
processors[name] = processor
processors.Register(name, processor)
}

// GetConfigPostProcessor finds a ConfigPostProcessor by name.
func GetConfigPostProcessor(name string) config.ConfigPostProcessor {
return processors[name]
v, _ := processors.Get(name)
return v
}

// GetConfigPostProcessors returns all registered instances of ConfigPostProcessor.
func GetConfigPostProcessors() []config.ConfigPostProcessor {
ret := make([]config.ConfigPostProcessor, 0, len(processors))
for _, v := range processors {
snapshot := processors.Snapshot()
ret := make([]config.ConfigPostProcessor, 0, len(snapshot))
for _, v := range snapshot {
ret = append(ret, v)
}
return ret
Expand Down
Loading
Loading