Skip to content

Commit f943aed

Browse files
committed
独立出事件类型处理
1 parent 4085c9b commit f943aed

File tree

10 files changed

+128
-135
lines changed

10 files changed

+128
-135
lines changed

bus/broker/kafka/kafka_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
var (
15-
sourceEvent = event.NewEvent()
15+
sourceEvent = event.NewDefaultEvent()
1616
)
1717

1818
func TestPub(t *testing.T) {

bus/broker/kafka/subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sar
128128
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
129129
// 具体消费消息
130130
for msg := range claim.Messages() {
131-
e := event.NewEvent()
131+
e := event.NewDefaultEvent()
132132
if err := json.Unmarshal(msg.Value, e); err != nil {
133133
s.l.Errorf("unmarshal data to event error, %s", err)
134134
continue

bus/broker/nats/nats.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (b *Broker) Sub(topic string, h bus.EventHandler) error {
137137

138138
fn := func(msg *nats.Msg) {
139139
b.l.Debugf("receive an message from %s, data: %s", msg.Subject, string(msg.Data))
140-
e := event.NewEvent()
140+
e := event.NewDefaultEvent()
141141
if err := json.Unmarshal(msg.Data, e); err != nil {
142142
b.l.Errorf("unmarshal data to event error, %s", err)
143143
return

bus/broker/nats/nats_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestPubSub(t *testing.T) {
1616
should.NoError(err)
1717

1818
b.Debug(zap.L().Named("Nats Bus"))
19-
sourceEvent := event.NewEvent()
19+
sourceEvent := event.NewDefaultEvent()
2020

2121
should.NoError(b.Connect())
2222
err = b.Sub("test", func(topic string, e *event.Event) error {

bus/event/event.go

Lines changed: 108 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,77 @@ package event
22

33
import (
44
"context"
5+
"encoding/json"
6+
"fmt"
57

68
"github.com/rs/xid"
79

810
"github.com/infraboard/mcube/types/ftime"
911
)
1012

11-
// 事件主题定义
13+
// 事件主题定义(由事件类型确定)
1214
// 1. 资源变更事件 (变更成功和变更失败)
1315
// 2. 资源告警事件 ( 更加metric计算触发)
1416

15-
// NewEvent 实例
16-
func NewEvent() *Event {
17+
// NewOperateEvent 实例
18+
func NewOperateEvent(header *Header, e *OperateEvent) *Event {
1719
return &Event{
18-
ID: xid.New().String(),
19-
Time: ftime.Now(),
20-
Label: make(map[string]string),
20+
ID: xid.New().String(),
21+
Time: ftime.Now(),
22+
Type: OperateEventType,
23+
Header: header,
24+
Body: e,
25+
}
26+
}
27+
28+
// NewDefaultEvent todo
29+
func NewDefaultEvent() *Event {
30+
return &Event{
31+
Header: NewHeader(),
32+
Body: &json.RawMessage{},
2133
}
2234
}
2335

2436
// Event 事件数据结构
2537
type Event struct {
26-
ID string `bson:"_id" json:"id"` // 事件ID
27-
Time ftime.Time `bson:"time" json:"time"` // 事件发生时间(毫秒)
28-
Source string `bson:"source" json:"source"` // 事件来源, 比如cmdb
29-
Level Level `bson:"level" json:"level"` // 事件等级
30-
Label map[string]string `bson:"label" json:"label"` // 标签
31-
Meta Meta `bson:"meta" json:"meta"` // 事件的元数据
32-
Type Type `bson:"type" json:"type"` // 事件类型
33-
Body interface{} `bson:"body" json:"body"` // 事件数据
38+
ID string `bson:"_id" json:"id"` // 事件ID
39+
Time ftime.Time `bson:"time" json:"time"` // 事件发生时间(毫秒)
40+
Type Type `bson:"type" json:"type"` // 事件类型
41+
*Header `bson:",inline"` // 事件头
42+
Body interface{} `bson:"body" json:"body"` // 事件数据
43+
44+
ctx context.Context
45+
parsed bool
46+
}
47+
48+
// ParseBody todo
49+
func (e *Event) ParseBody() error {
50+
if e.parsed {
51+
return nil
52+
}
53+
54+
body, err := e.getBytesBody()
55+
if err != nil {
56+
return err
57+
}
3458

35-
ctx context.Context
59+
switch e.Type {
60+
case OperateEventType:
61+
e.Body, err = ParseOperateEventFromBytes(body)
62+
if err != nil {
63+
return err
64+
}
65+
case AlertEventType:
66+
e.Body, err = ParseAlertEventFromBytes(body)
67+
if err != nil {
68+
return err
69+
}
70+
default:
71+
return fmt.Errorf("unknown event type: %s", e.Type)
72+
}
73+
74+
e.parsed = true
75+
return nil
3676
}
3777

3878
// WithContext 添加上下文
@@ -45,13 +85,61 @@ func (e *Event) Context() context.Context {
4585
return e.ctx
4686
}
4787

88+
func (e *Event) getBytesBody() ([]byte, error) {
89+
switch v := e.Body.(type) {
90+
case []byte:
91+
return v, nil
92+
case json.RawMessage:
93+
return v, nil
94+
case *json.RawMessage:
95+
return *v, nil
96+
default:
97+
return nil, fmt.Errorf("body type is not []byte or json.RawMessage")
98+
}
99+
}
100+
101+
// NewHeader todo
102+
func NewHeader() *Header {
103+
return &Header{
104+
Label: make(map[string]string),
105+
}
106+
}
107+
108+
// Header todo
109+
type Header struct {
110+
Source string `bson:"source" json:"source"` // 事件来源, 比如cmdb
111+
Level Level `bson:"level" json:"level"` // 事件等级
112+
Label map[string]string `bson:"label" json:"label"` // 标签
113+
Meta Meta `bson:"meta" json:"meta"` // 事件的元数据
114+
}
115+
116+
// ParseOperateEventFromBytes todo
117+
func ParseOperateEventFromBytes(data []byte) (*OperateEvent, error) {
118+
oe := &OperateEvent{}
119+
if err := json.Unmarshal(data, oe); err != nil {
120+
return nil, err
121+
}
122+
return oe, nil
123+
}
124+
48125
// OperateEvent 事件具体数据
49126
type OperateEvent struct {
50-
ResourceType string `bson:"resource_type" json:"resource_type"` // 资源类型,
51-
ResourceUUID string `bson:"resource_uuid" json:"resource_uuid"` // 资源UUID
52-
ResourceName string `bson:"resource_name" json:"resource_name"` // 资源名称
53-
Action string `bson:"action" json:"action"` // 操作
54-
Data interface{} `bson:"data" json:"data,omitempty"` // 事件数据
127+
OperateSession string `bson:"operate_session" json:"operate_session"` // 回话ID
128+
OperateUser string `bson:"operate_user" json:"operate_user"` // 操作人
129+
ResourceType string `bson:"resource_type" json:"resource_type"` // 资源类型,
130+
ResourceUUID string `bson:"resource_uuid" json:"resource_uuid"` // 资源UUID
131+
ResourceName string `bson:"resource_name" json:"resource_name"` // 资源名称
132+
Action string `bson:"action" json:"action"` // 操作
133+
Data interface{} `bson:"data" json:"data,omitempty"` // 事件数据
134+
}
135+
136+
// ParseAlertEventFromBytes todo
137+
func ParseAlertEventFromBytes(data []byte) (*AlertEvent, error) {
138+
ae := &AlertEvent{}
139+
if err := json.Unmarshal(data, ae); err != nil {
140+
return nil, err
141+
}
142+
return ae, nil
55143
}
56144

57145
// AlertEvent 事件具体数据

bus/global.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,37 @@
11
package bus
22

3+
import (
4+
"fmt"
5+
6+
"github.com/infraboard/mcube/bus/event"
7+
)
8+
39
var (
410
publisher Publisher
511
subscriber Subscriber
612
)
713

8-
// P bus为全局对象
9-
func P() Publisher {
14+
// Pub bus为全局对象
15+
func Pub(e *event.Event) error {
1016
if publisher == nil {
11-
panic("publisher not initail")
17+
return fmt.Errorf("publisher not initail")
1218
}
13-
return publisher
19+
20+
return publisher.Pub(e.Type.String(), e)
1421
}
1522

1623
// SetPublisher 设置pub
1724
func SetPublisher(p Publisher) {
1825
publisher = p
1926
}
2027

21-
// S bus为全局对象
22-
func S() Subscriber {
28+
// Sub bus为全局对象
29+
func Sub(t event.Type, h EventHandler) error {
2330
if subscriber == nil {
24-
panic("subscriber not initial")
31+
return fmt.Errorf("subscriber not initial")
2532
}
26-
return subscriber
33+
34+
return subscriber.Sub(t.String(), h)
2735
}
2836

2937
// SetSubscriber 设置sub

http/response/event.go

Lines changed: 0 additions & 35 deletions
This file was deleted.

http/response/reporter.go

Lines changed: 0 additions & 47 deletions
This file was deleted.

http/response/send.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,5 @@ func Success(w http.ResponseWriter, data interface{}) {
8080

8181
w.WriteHeader(http.StatusOK)
8282
w.Write(respBytes)
83-
84-
// 发送事件
85-
if re, ok := data.(ResourceEvent); HasEventReporter() && ok {
86-
sendEvent(re)
87-
}
8883
return
8984
}

http/router/httprouter/httprouter_test.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"net/http/httptest"
66
"testing"
77

8-
broker "github.com/infraboard/mcube/bus/broker/mock"
98
"github.com/infraboard/mcube/exception"
109
"github.com/infraboard/mcube/http/context"
1110
"github.com/infraboard/mcube/http/label"
@@ -215,18 +214,3 @@ func TestAPIRootOrderOK(t *testing.T) {
215214
should.Equal("/test3", es.Items[2].Path)
216215
}
217216
}
218-
219-
func TestResourceEventOk(t *testing.T) {
220-
should := assert.New(t)
221-
222-
response.SetEventReporter(broker.NewBroker())
223-
224-
r := httprouter.New()
225-
req, _ := http.NewRequest("GET", "/", nil)
226-
w := httptest.NewRecorder()
227-
228-
r.Handle("GET", "/", ResourceEventHandler)
229-
r.ServeHTTP(w, req)
230-
231-
should.Equal(200, w.Code)
232-
}

0 commit comments

Comments
 (0)