Skip to content

Commit d73d736

Browse files
committed
feat(backend): db-event-consumer 启用只用配置模式 TencentBlueKing#16453
1 parent bcbf588 commit d73d736

File tree

10 files changed

+54
-43
lines changed

10 files changed

+54
-43
lines changed

dbm-services/common/db-config/internal/api/config_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type BKBizIDDef struct {
1616
// RequestType TODO
1717
type RequestType struct {
1818
// 配置文件修改动作的请求类型,`SaveOnly`: 仅保存, `SaveAndPublish`保存并发布
19-
ReqType string `json:"req_type" form:"req_type" validate:"required,enums" enums:"SaveOnly,SaveAndPublish"`
19+
ReqType string `json:"req_type" form:"req_type" validate:"enums" enums:"SaveOnly,SaveAndPublish"`
2020
}
2121

2222
// OperationType TODO

dbm-services/common/db-config/internal/handler/simple/config_item.go

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package simple
22

33
import (
4+
"fmt"
5+
46
"bk-dbconfig/internal/api"
57
"bk-dbconfig/internal/handler"
68
"bk-dbconfig/internal/service/simpleconfig"
79
"bk-dbconfig/pkg/constvar"
810
"bk-dbconfig/pkg/core/logger"
911
"bk-dbconfig/pkg/util"
10-
"fmt"
1112

1213
"github.com/gin-gonic/gin"
1314
)
@@ -156,10 +157,20 @@ func (cf *Config) UpdateConfigFileItems(ctx *gin.Context) {
156157
handler.SendResponse(ctx, err, nil)
157158
return
158159
}
159-
if err = simpleconfig.CheckValidConfType(r.ConfFileInfo.Namespace, r.ConfFileInfo.ConfType,
160-
r.ConfFileInfo.ConfFile, r.LevelName, 1); err != nil {
160+
if versioned, err := simpleconfig.CheckValidConfFile(r.ConfFileInfo.Namespace, r.ConfFileInfo.ConfType,
161+
r.ConfFileInfo.ConfFile, r.LevelName); err != nil {
161162
handler.SendResponse(ctx, err, nil)
162163
return
164+
} else if versioned != "" {
165+
r = api.UpsertConfItemsReq{
166+
RequestType: api.RequestType{ReqType: constvar.MethodGenAndPublish},
167+
SaveConfItemsReq: r.SaveConfItemsReq,
168+
}
169+
} else {
170+
r = api.UpsertConfItemsReq{
171+
RequestType: api.RequestType{ReqType: constvar.MethodSave},
172+
SaveConfItemsReq: r.SaveConfItemsReq,
173+
}
163174
}
164175
opUser := api.GetHeaderUsername(ctx.GetHeader(constvar.BKApiAuthorization))
165176
if resp, err = simpleconfig.UpdateConfigFileItems(&r, opUser); err != nil {
@@ -184,33 +195,5 @@ func (cf *Config) UpdateConfigFileItems(ctx *gin.Context) {
184195
// @Failure 400 {object} api.HTTPClientErrResp
185196
// @Router /bkconfig/v1/confitem/save [post]
186197
func (cf *Config) SaveConfigFileItems(ctx *gin.Context) {
187-
var r api.SaveConfItemsReq
188-
var resp *api.UpsertConfItemsResp
189-
var err error
190-
defer util.LoggerErrorStack(logger.Error, err)
191-
192-
if err = ctx.BindJSON(&r); err != nil {
193-
handler.SendResponse(ctx, err, nil)
194-
return
195-
}
196-
if err = r.Validate(); err != nil {
197-
handler.SendResponse(ctx, err, nil)
198-
return
199-
}
200-
if err = simpleconfig.CheckValidConfType(r.ConfFileInfo.Namespace, r.ConfFileInfo.ConfType,
201-
r.ConfFileInfo.ConfFile, r.LevelName, 0); err != nil {
202-
handler.SendResponse(ctx, err, nil)
203-
return
204-
}
205-
r2 := api.UpsertConfItemsReq{
206-
RequestType: api.RequestType{ReqType: constvar.MethodSave},
207-
SaveConfItemsReq: r,
208-
}
209-
opUser := api.GetHeaderUsername(ctx.GetHeader(constvar.BKApiAuthorization))
210-
if resp, err = simpleconfig.UpdateConfigFileItems(&r2, opUser); err != nil {
211-
handler.SendResponse(ctx, err, nil)
212-
return
213-
} else {
214-
handler.SendResponse(ctx, nil, resp)
215-
}
198+
cf.UpdateConfigFileItems(ctx)
216199
}

dbm-services/common/db-config/internal/service/simpleconfig/config_meta.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,23 @@ func QueryConfigTypeInfo(r *api.QueryConfigTypeReq) (*api.QueryConfigTypeResp, e
149149
return resp, nil
150150
}
151151

152+
func CheckValidConfFile(namespace, confType, confFile, levelName string) (string, error) {
153+
msg := fmt.Sprintf("namespace=%s, conf_type=%s, conf_file=%s", namespace, confType, confFile)
154+
fd := api.BaseConfFileDef{Namespace: namespace, ConfType: confType, ConfFile: confFile}
155+
if f, e := model.CacheGetConfigFile(fd); e != nil {
156+
return "", errors.Wrapf(errno.ErrConfFile, "ErrFound:%s for %s", e.Error(), msg)
157+
} else if f == nil {
158+
return "", errors.Wrapf(errno.ErrNamespaceType, msg)
159+
} else {
160+
if levelName != "" {
161+
if !util.StringsHas(f.LevelNameList, levelName) {
162+
return "", errors.Wrapf(errno.ErrLevelName, "allowed [%s] but given %s", f.LevelNames, levelName)
163+
}
164+
}
165+
return f.LevelVersioned, nil
166+
}
167+
}
168+
152169
// CheckValidConfType 检查 namespace, conf_type, conf_file, level_name 的合法性
153170
// 如果 level_name = "" 不检查 level_name
154171
// 如果 needVersioned >=2 不做版本化相关检查

dbm-services/common/db-event-consumer/README.md

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,26 @@ db-event-consumer -c config.yaml
4141

4242
### 2. 非严格 schema
4343
自动根据 kafka 消息的内容,使用 `map[string]interface{}` 来反序列化,然后直接生成 insert 语句。
44+
这种方式不需要在 db-event-consumer 里定义 model 的代码,直接启动时拉起配置即可。
4445

45-
这种方式需要提前在目标 datasource 创建好表结构,如果表结构字段缺少,会导致入库失败。
46+
这种方式需要提前在目标 datasource 创建好表结构,或者在其他地方有 migrate 来维护表结构和索引约束。
47+
如果表结构字段缺少,会导致入库失败。
4648

4749
```
4850
- topic: "mysql_binlog_result"
4951
model_table: "tb_mysql_binlog_result"
5052
strict_schema: false
51-
datasource: "prod_bk_dbm_report"
53+
skip_migrate_schema: true
54+
write_mode: upsert
55+
datasource: "prod_bk_dbm_report_raw"
56+
omit_fields: []
5257
```
53-
`strict_schema=false` 时,model_table 的值用于拼成 insert table name.
5458

55-
非严格 schema 方式可以快速验证入库效果,不用代码定义 model 。但不方便在各个环境环境移植,不推荐。
59+
- `strict_schema=false` 时,model_table 的值用于拼成 insert table name.
60+
- `write_mode=upsert`, 当遇到冲突时,使用何种处理方式,`replace`,`upsert`,`insert_ignore`, `insert`
61+
- `omit_fields`, 忽略哪些字段,不拼在 insert values 里
62+
63+
非严格 schema 方式可以快速验证入库效果,不用代码定义 model。如果你需要对上报的内容做重组,二次加工后再入库,则不适合这个方式。
5664

5765
## 自定义入库方式
5866
可以自定义 schema migrate 方式和数据入库方式,可以参考 `MysqlBackupResultModel`

dbm-services/common/db-event-consumer/pkg/config/sinker_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type SinkerConfig struct {
3333
FetchMinBytes int32 `yaml:"fetch_min_bytes"`
3434
// SinkBatchSize 一次 fetch 可能有多条记录,sink_batch_size 控制多少次 fetch 合并成一次 sink. default 1
3535
SinkBatchSize int `yaml:"sink_batch_size"`
36-
// WriteMode default is insert_ignore, allowed: insert_ignore, insert, upsert
36+
// WriteMode default is upsert, allowed: insert_ignore, insert, upsert
3737
WriteMode string `yaml:"write_mode"`
3838
Datasource string `yaml:"datasource"`
3939
}

dbm-services/common/db-event-consumer/pkg/consumer/consumer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type AnySinker struct {
3434
strictSchema bool
3535
}
3636

37+
// Setup run default migrate or custom migrate
3738
func (s *AnySinker) Setup(sarama.ConsumerGroupSession) error {
3839
var err error
3940
if s.Sinker.RuntimeConfig.SkipMigrateSchema || !s.strictSchema {

dbm-services/common/db-event-consumer/pkg/consumer/new_consumer_group.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (s *Sinker) NewSinkHandler() (sarama.ConsumerGroupHandler, error) {
3737
s.DSWriter.SetWriteMode(s.RuntimeConfig.WriteMode)
3838
if ok {
3939
if !*s.RuntimeConfig.StrictSchema {
40-
return nil, fmt.Errorf("registerd table[%s] need strict_schema=true", s.RuntimeConfig.ModelTable)
40+
return nil, fmt.Errorf("registered table[%s] need strict_schema=true", s.RuntimeConfig.ModelTable)
4141
}
4242
modelType := reflect.TypeOf(modelTable).Elem()
4343
if modelType.Kind() == reflect.Ptr {

dbm-services/common/db-event-consumer/pkg/sinker/mysql_raw_writer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ func (w *MysqlRawWriter) WriteBatch(table interface{}, models interface{}) error
7979
}
8080
if w.writeMode == cst.ModeUpsert || w.writeMode == cst.ModeReplace {
8181
_, err = w.session.Replace()
82-
} else {
82+
} else if w.writeMode == cst.ModeInsertIgnore {
8383
_, err = w.session.InsertIgnore()
84+
} else {
85+
_, err = w.session.Insert()
8486
}
8587
return err
8688
}

dbm-ui/backend/components/dbconfig/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ def __init__(self):
5656
self.save_conf_item = self.generate_data_api(
5757
method="POST",
5858
url="bkconfig/v1/confitem/save",
59-
description=_("保存不可变配置(如字符集等)"),
59+
description=_("保存无版本概念的配置(如部署,备份,监控),现在与 upsert 接口完全相同了"),
6060
)
6161
self.upsert_conf_item = self.generate_data_api(
6262
method="POST",
6363
url="bkconfig/v1/confitem/upsert",
64-
description=_("编辑发布层级(业务、集群、模块)配置"),
64+
description=_("保存有版本概念的配置(如参数)"),
6565
)
6666
self.batch_get_conf_item = self.generate_data_api(
6767
method="POST",

helm-charts/bk-dbm/templates/configmaps/db-event-consumer-configmap.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ data:
4545
password: "{{ $reportDB.password }}"
4646
database: "{{ $reportDB.name }}"
4747
charset: utf8
48-
- name: prod_bk_dbm_report2
48+
- name: prod_bk_dbm_report_raw
4949
type: mysql_raw
5050
dsn:
5151
address: "{{ $reportDB.host }}:{{ $reportDB.port }}"

0 commit comments

Comments
 (0)