Skip to content

Commit 1110af9

Browse files
authored
feat: config center (#2997)
* chore: config * chore: config * chore: config * chore: config * chore: config * feat: config * fix: config * fix: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * feat: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config * fix: config
1 parent 66abd9e commit 1110af9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1039
-278
lines changed

config/openim-push.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ prometheus:
2121
maxConcurrentWorkers: 3
2222
#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.
2323
enable:
24-
geTui:
24+
getui:
2525
pushUrl: https://restapi.getui.com/v2/$appId
2626
masterSecret:
2727
appKey:

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ services:
8282
- ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
8383
- ETCD_INITIAL_CLUSTER_TOKEN=tkn
8484
- ETCD_INITIAL_CLUSTER_STATE=new
85+
volumes:
86+
- "${DATA_DIR}/components/etcd:/etcd-data"
8587
restart: always
8688
networks:
8789
- openim

internal/api/config_manager.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package api
2+
3+
import (
4+
"encoding/json"
5+
"reflect"
6+
"strconv"
7+
"time"
8+
9+
"github.com/gin-gonic/gin"
10+
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
11+
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
12+
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
13+
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
14+
"github.com/openimsdk/open-im-server/v3/version"
15+
"github.com/openimsdk/tools/apiresp"
16+
"github.com/openimsdk/tools/errs"
17+
"github.com/openimsdk/tools/log"
18+
"github.com/openimsdk/tools/utils/runtimeenv"
19+
clientv3 "go.etcd.io/etcd/client/v3"
20+
)
21+
22+
type ConfigManager struct {
23+
imAdminUserID []string
24+
config *config.AllConfig
25+
client *clientv3.Client
26+
configPath string
27+
runtimeEnv string
28+
}
29+
30+
func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
31+
return &ConfigManager{
32+
imAdminUserID: IMAdminUserID,
33+
config: cfg,
34+
client: client,
35+
configPath: configPath,
36+
runtimeEnv: runtimeEnv,
37+
}
38+
}
39+
40+
func (cm *ConfigManager) CheckAdmin(c *gin.Context) {
41+
if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil {
42+
apiresp.GinError(c, err)
43+
c.Abort()
44+
}
45+
}
46+
47+
func (cm *ConfigManager) GetConfig(c *gin.Context) {
48+
var req apistruct.GetConfigReq
49+
if err := c.BindJSON(&req); err != nil {
50+
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
51+
return
52+
}
53+
conf := cm.config.Name2Config(req.ConfigName)
54+
if conf == nil {
55+
apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap())
56+
return
57+
}
58+
b, err := json.Marshal(conf)
59+
if err != nil {
60+
apiresp.GinError(c, err)
61+
return
62+
}
63+
apiresp.GinSuccess(c, string(b))
64+
}
65+
66+
func (cm *ConfigManager) GetConfigList(c *gin.Context) {
67+
var resp apistruct.GetConfigListResp
68+
resp.ConfigNames = cm.config.GetConfigNames()
69+
resp.Environment = runtimeenv.PrintRuntimeEnvironment()
70+
resp.Version = version.Version
71+
72+
apiresp.GinSuccess(c, resp)
73+
}
74+
75+
func (cm *ConfigManager) SetConfig(c *gin.Context) {
76+
if cm.config.Discovery.Enable != config.ETCD {
77+
apiresp.GinError(c, errs.New("only etcd support set config").Wrap())
78+
return
79+
}
80+
var req apistruct.SetConfigReq
81+
if err := c.BindJSON(&req); err != nil {
82+
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
83+
return
84+
}
85+
var err error
86+
switch req.ConfigName {
87+
case cm.config.Discovery.GetConfigFileName():
88+
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
89+
case cm.config.Kafka.GetConfigFileName():
90+
err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
91+
case cm.config.LocalCache.GetConfigFileName():
92+
err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
93+
case cm.config.Log.GetConfigFileName():
94+
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
95+
case cm.config.Minio.GetConfigFileName():
96+
err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
97+
case cm.config.Mongo.GetConfigFileName():
98+
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
99+
case cm.config.Notification.GetConfigFileName():
100+
err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
101+
case cm.config.API.GetConfigFileName():
102+
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
103+
case cm.config.CronTask.GetConfigFileName():
104+
err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
105+
case cm.config.MsgGateway.GetConfigFileName():
106+
err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
107+
case cm.config.MsgTransfer.GetConfigFileName():
108+
err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
109+
case cm.config.Push.GetConfigFileName():
110+
err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
111+
case cm.config.Auth.GetConfigFileName():
112+
err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
113+
case cm.config.Conversation.GetConfigFileName():
114+
err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
115+
case cm.config.Friend.GetConfigFileName():
116+
err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
117+
case cm.config.Group.GetConfigFileName():
118+
err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
119+
case cm.config.Msg.GetConfigFileName():
120+
err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
121+
case cm.config.Third.GetConfigFileName():
122+
err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
123+
case cm.config.User.GetConfigFileName():
124+
err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
125+
case cm.config.Redis.GetConfigFileName():
126+
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
127+
case cm.config.Share.GetConfigFileName():
128+
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
129+
case cm.config.Webhooks.GetConfigFileName():
130+
err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
131+
default:
132+
apiresp.GinError(c, errs.ErrArgs.Wrap())
133+
return
134+
}
135+
if err != nil {
136+
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
137+
return
138+
}
139+
apiresp.GinSuccess(c, nil)
140+
}
141+
142+
func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error {
143+
conf := new(T)
144+
err := json.Unmarshal([]byte(req.Data), &conf)
145+
if err != nil {
146+
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
147+
}
148+
eq := reflect.DeepEqual(old, conf)
149+
if eq {
150+
return nil
151+
}
152+
data, err := json.Marshal(conf)
153+
if err != nil {
154+
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
155+
}
156+
_, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
157+
if err != nil {
158+
return errs.WrapMsg(err, "save to etcd failed")
159+
}
160+
return nil
161+
}
162+
163+
func (cm *ConfigManager) ResetConfig(c *gin.Context) {
164+
go cm.resetConfig(c)
165+
apiresp.GinSuccess(c, nil)
166+
}
167+
168+
func (cm *ConfigManager) resetConfig(c *gin.Context) {
169+
txn := cm.client.Txn(c)
170+
type initConf struct {
171+
old any
172+
new any
173+
isChanged bool
174+
}
175+
configMap := map[string]*initConf{
176+
cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)},
177+
cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)},
178+
cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)},
179+
cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)},
180+
cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)},
181+
cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)},
182+
cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)},
183+
cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)},
184+
cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)},
185+
cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)},
186+
cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)},
187+
cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)},
188+
cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)},
189+
cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)},
190+
cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)},
191+
cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)},
192+
cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)},
193+
cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)},
194+
cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)},
195+
cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)},
196+
cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)},
197+
cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)},
198+
}
199+
200+
changedKeys := make([]string, 0, len(configMap))
201+
for k, v := range configMap {
202+
err := config.Load(
203+
cm.configPath,
204+
k,
205+
config.EnvPrefixMap[k],
206+
cm.runtimeEnv,
207+
v.new,
208+
)
209+
if err != nil {
210+
log.ZError(c, "load config failed", err)
211+
continue
212+
}
213+
v.isChanged = reflect.DeepEqual(v.old, v.new)
214+
if !v.isChanged {
215+
changedKeys = append(changedKeys, k)
216+
}
217+
}
218+
219+
ops := make([]clientv3.Op, 0)
220+
for _, k := range changedKeys {
221+
data, err := json.Marshal(configMap[k].new)
222+
if err != nil {
223+
log.ZError(c, "marshal config failed", err)
224+
continue
225+
}
226+
ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data)))
227+
}
228+
if len(ops) > 0 {
229+
txn.Then(ops...)
230+
_, err := txn.Commit()
231+
if err != nil {
232+
log.ZError(c, "commit etcd txn failed", err)
233+
return
234+
}
235+
}
236+
}
237+
238+
func (cm *ConfigManager) Restart(c *gin.Context) {
239+
go cm.restart(c)
240+
apiresp.GinSuccess(c, nil)
241+
}
242+
243+
func (cm *ConfigManager) restart(c *gin.Context) {
244+
time.Sleep(time.Millisecond * 200) // wait for Restart http call return
245+
t := time.Now().Unix()
246+
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
247+
if err != nil {
248+
log.ZError(c, "restart etcd put key failed", err)
249+
}
250+
}

internal/api/init.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import (
2727
"time"
2828

2929
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
30-
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
30+
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
31+
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
3132
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
3233
"github.com/openimsdk/tools/discovery/etcd"
3334
"github.com/openimsdk/tools/errs"
@@ -43,11 +44,10 @@ import (
4344
)
4445

4546
type Config struct {
46-
API conf.API
47-
Share conf.Share
48-
Discovery conf.Discovery
47+
*conf.AllConfig
4948

5049
RuntimeEnv string
50+
ConfigPath string
5151
}
5252

5353
func Start(ctx context.Context, index int, config *Config) error {
@@ -139,22 +139,33 @@ func Start(ctx context.Context, index int, config *Config) error {
139139
if err != nil && !errors.Is(err, http.ErrServerClosed) {
140140
netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr))
141141
netDone <- struct{}{}
142-
143142
}
144143
}()
145144

145+
if config.Discovery.Enable == conf.ETCD {
146+
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
147+
cm.Watch(ctx)
148+
}
149+
146150
sigs := make(chan os.Signal, 1)
147151
signal.Notify(sigs, syscall.SIGTERM)
148152

149-
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
150-
defer cancel()
151-
select {
152-
case <-sigs:
153-
program.SIGTERMExit()
153+
shutdown := func() error {
154+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
155+
defer cancel()
154156
err := server.Shutdown(ctx)
155157
if err != nil {
156158
return errs.WrapMsg(err, "shutdown err")
157159
}
160+
return nil
161+
}
162+
disetcd.RegisterShutDown(shutdown)
163+
select {
164+
case <-sigs:
165+
program.SIGTERMExit()
166+
if err := shutdown(); err != nil {
167+
return err
168+
}
158169
case <-netDone:
159170
close(netDone)
160171
return netErr

0 commit comments

Comments
 (0)