Skip to content

Commit 7be5465

Browse files
fireworkmarkscxt90730
authored and committed
Kafka plugin and bugs fixed (#242)
* Turn kafka into a plugin * Re-modification of CopyObject and bug fixes for ListObject
1 parent 26ce3b1 commit 7be5465

31 files changed

+424
-566
lines changed

.travis.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,6 @@ script:
3737
- go test -v
3838
- popd
3939
- cat lc.log
40-
- pushd messagebus/tests
41-
- go test -check.vv
42-
- popd
4340

4441
before_deploy:
4542
- echo "prepare package"

api/access-log-handler.go

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@ import (
77
"time"
88

99
"github.com/journeymidnight/yig/helper"
10-
bus "github.com/journeymidnight/yig/messagebus"
11-
"github.com/journeymidnight/yig/messagebus/types"
1210
"github.com/journeymidnight/yig/meta"
11+
bus "github.com/journeymidnight/yig/mq"
1312
)
1413

1514
type ResponseRecorder struct {
@@ -56,15 +55,12 @@ func (a AccessLogHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
5655
response := newReplacer.Replace(a.format)
5756

5857
helper.AccessLogger.Println(response)
59-
// send the entries in access logger to message bus.
58+
// send the entries in access logger to message queue.
6059
elems := newReplacer.GetReplacedValues()
6160
a.notify(elems)
6261
}
6362

6463
func (a AccessLogHandler) notify(elems map[string]string) {
65-
if !helper.CONFIG.MsgBus.Enabled {
66-
return
67-
}
6864
if len(elems) == 0 {
6965
return
7066
}
@@ -74,29 +70,14 @@ func (a AccessLogHandler) notify(elems map[string]string) {
7470
return
7571
}
7672

77-
sender, err := bus.GetMessageSender()
78-
if err != nil {
79-
helper.Logger.Error("Failed to get message bus sender, err:", err)
80-
return
81-
}
82-
83-
// send the message to message bus async.
84-
// don't set the ErrChan.
85-
msg := &types.Message{
86-
Topic: helper.CONFIG.MsgBus.Topic,
87-
Key: "",
88-
ErrChan: nil,
89-
Value: val,
90-
}
91-
92-
err = sender.AsyncSend(msg)
73+
err = bus.MsgSender.AsyncSend(val)
9374
if err != nil {
9475
helper.Logger.Error(
95-
fmt.Sprintf("Failed to send message [%v] to message bus, err: %v",
76+
fmt.Sprintf("Failed to send message [%v] to message queue, err: %v",
9677
elems, err))
9778
return
9879
}
99-
helper.Logger.Info(fmt.Sprintf("Succeed to send message [%v] to message bus.",
80+
helper.Logger.Info(fmt.Sprintf("Succeed to send message [%v] to message queue.",
10081
elems))
10182
}
10283

api/object-handlers.go

Lines changed: 40 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -492,25 +492,52 @@ func (api ObjectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
492492
return
493493
}
494494

495+
// TODO: In a versioning-enabled bucket, you cannot change the storage class
496+
// of a specific version of an object. When you copy it, Amazon S3 gives it
497+
// a new version ID.
498+
storageClassStr := r.Header.Get("X-Amz-Storage-Class")
499+
var targetStorageClass meta.StorageClass
500+
if storageClassStr != "" {
501+
helper.Logger.Info("Get storage class header:", storageClassStr)
502+
targetStorageClass, err = meta.MatchStorageClassIndex(storageClassStr)
503+
if err != nil {
504+
WriteErrorResponse(w, r, err)
505+
return
506+
}
507+
} else {
508+
targetStorageClass = sourceObject.StorageClass
509+
}
510+
if targetStorageClass == meta.ObjectStorageClassGlacier || targetStorageClass == meta.ObjectStorageClassDeepArchive {
511+
WriteErrorResponse(w, r, ErrInvalidCopySourceStorageClass)
512+
return
513+
}
514+
495515
// maximum Upload size for object in a single CopyObject operation.
496516
if isMaxObjectSize(sourceObject.Size) {
497517
WriteErrorResponseWithResource(w, r, ErrEntityTooLarge, copySource)
498518
return
499519
}
500520

521+
var isMetadataOnly bool
522+
if sourceBucketName == targetBucketName && sourceObjectName == targetObjectName {
523+
isMetadataOnly = true
524+
}
525+
501526
pipeReader, pipeWriter := io.Pipe()
502-
go func() {
503-
startOffset := int64(0) // Read the whole file.
504-
// Get the object.
505-
err = api.ObjectAPI.GetObject(sourceObject, startOffset, sourceObject.Size,
506-
pipeWriter, sseRequest)
507-
if err != nil {
508-
logger.Error("Unable to read an object:", err)
509-
pipeWriter.CloseWithError(err)
510-
return
511-
}
512-
pipeWriter.Close()
513-
}()
527+
if !isMetadataOnly {
528+
go func() {
529+
startOffset := int64(0) // Read the whole file.
530+
// Get the object.
531+
err = api.ObjectAPI.GetObject(sourceObject, startOffset, sourceObject.Size,
532+
pipeWriter, sseRequest)
533+
if err != nil {
534+
logger.Error("Unable to read an object:", err)
535+
pipeWriter.CloseWithError(err)
536+
return
537+
}
538+
pipeWriter.Close()
539+
}()
540+
}
514541

515542
targetACL, err := getAclFromHeader(r.Header)
516543
if err != nil {
@@ -527,55 +554,25 @@ func (api ObjectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
527554
targetObject.Etag = sourceObject.Etag
528555
targetObject.Parts = sourceObject.Parts
529556
targetObject.Type = sourceObject.Type
557+
targetObject.StorageClass = targetStorageClass
530558

531559
directive := r.Header.Get("X-Amz-Metadata-Directive")
532560
if directive == "COPY" || directive == "" {
533561
targetObject.CustomAttributes = sourceObject.CustomAttributes
534-
targetObject.StorageClass = sourceObject.StorageClass
535562
targetObject.ContentType = sourceObject.ContentType
536563
} else if directive == "REPLACE" {
537-
// TODO: In a versioning-enabled bucket, you cannot change the storage class
538-
// of a specific version of an object. When you copy it, Amazon S3 gives it
539-
// a new version ID.
540-
storageClassFromHeader, err := getStorageClassFromHeader(r)
541-
if err != nil {
542-
WriteErrorResponse(w, r, err)
543-
return
544-
}
545-
if storageClassFromHeader == meta.ObjectStorageClassGlacier || storageClassFromHeader == meta.ObjectStorageClassDeepArchive {
546-
WriteErrorResponse(w, r, ErrInvalidCopySourceStorageClass)
547-
return
548-
}
549564
newMetadata := extractMetadataFromHeader(r.Header)
550565
if c, ok := newMetadata["content-type"]; ok {
551566
targetObject.ContentType = c
552567
} else {
553568
targetObject.ContentType = sourceObject.ContentType
554569
}
555570
targetObject.CustomAttributes = newMetadata
556-
targetObject.StorageClass = storageClassFromHeader
557571
} else {
558572
WriteErrorResponse(w, r, ErrInvalidCopyRequest)
559573
return
560574
}
561575

562-
var isMetadataOnly bool
563-
if sourceBucketName == targetBucketName && sourceObjectName == targetObjectName {
564-
isMetadataOnly = true
565-
}
566-
567-
// Check if x-amz-metadata-directive was not set to REPLACE and source,
568-
// desination are same objects. Apply this restriction also when
569-
// metadataOnly is true indicating that we are not overwriting the object.
570-
// if encryption is enabled we do not need explicit "REPLACE" metadata to
571-
// be enabled as well - this is to allow for key-rotation.
572-
// TODO: something need to Encrypted with SSE
573-
//if directive != "REPLACE" && isMetadataOnly && !IsEncrypted(targetObject.CustomAttributes) {
574-
if directive != "REPLACE" && isMetadataOnly {
575-
WriteErrorResponse(w, r, ErrInvalidCopyRequestWithSameObject)
576-
return
577-
}
578-
579576
// Create the object.
580577
result, err := api.ObjectAPI.CopyObject(reqCtx, targetObject, pipeReader, credential, sseRequest, isMetadataOnly)
581578
if err != nil {

collector.go

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,24 +55,28 @@ func (c *Metrics) Collect(ch chan<- prometheus.Metric) {
5555

5656
GaugeMetricDataForBucket := c.GenerateBucketUsageData()
5757
for bucket, data := range GaugeMetricDataForBucket {
58-
ch <- prometheus.MustNewConstMetric(c.metrics["bucket_usage_byte_metric"], prometheus.GaugeValue, float64(data.value), bucket, data.owner, data.storageClass)
58+
for _, v := range data {
59+
ch <- prometheus.MustNewConstMetric(c.metrics["bucket_usage_byte_metric"], prometheus.GaugeValue, float64(v.value), bucket, v.owner, v.storageClass)
60+
}
5961
}
6062

6163
GaugeMetricDataForUid := c.GenerateUserUsageData()
6264
for uid, data := range GaugeMetricDataForUid {
63-
ch <- prometheus.MustNewConstMetric(c.metrics["user_usage_byte_metric"], prometheus.GaugeValue, float64(data.value), uid, data.storageClass)
65+
for _, v := range data {
66+
ch <- prometheus.MustNewConstMetric(c.metrics["user_usage_byte_metric"], prometheus.GaugeValue, float64(v.value), uid, v.storageClass)
67+
}
6468
}
6569
}
6670

6771
// Get bucket usage cache which like <key><value> = <u_b_test><STANDARD:233333>
68-
func (c *Metrics) GenerateBucketUsageData() (GaugeMetricData map[string]UsageDataWithBucket) {
72+
func (c *Metrics) GenerateBucketUsageData() (GaugeMetricData map[string][]UsageDataWithBucket) {
6973
buckets, err := adminServer.Yig.MetaStorage.GetBuckets()
7074
if err != nil {
7175
helper.Logger.Error("Get usage data for prometheus failed:",
7276
err.Error())
7377
return
7478
}
75-
GaugeMetricData = make(map[string]UsageDataWithBucket)
79+
GaugeMetricData = make(map[string][]UsageDataWithBucket)
7680
for _, bucket := range buckets {
7781
key := BucketUsagePrefix + bucket.Name
7882
usageCache, err := redis.GetUsage(key)
@@ -81,65 +85,68 @@ func (c *Metrics) GenerateBucketUsageData() (GaugeMetricData map[string]UsageDat
8185
err.Error())
8286
return
8387
}
84-
data, err := parseUsage(usageCache)
88+
datas, err := parseUsage(usageCache)
8589
if err != nil {
8690
helper.Logger.Error("Parse usage data from redis for prometheus failed:",
8791
err.Error())
8892
return
8993
}
90-
if data.storageClass != "" {
91-
GaugeMetricData[bucket.Name] = UsageDataWithBucket{data.value, bucket.OwnerId, data.storageClass}
94+
for _, data := range datas {
95+
GaugeMetricData[bucket.Name] = append(GaugeMetricData[bucket.Name], UsageDataWithBucket{data.value, bucket.OwnerId, data.storageClass})
9296
}
9397
}
9498
return
9599
}
96100

97101
// Get bucket usage cache which like <key><value> = <u_p_hehehehe><STANDARD 233333>
98-
func (c *Metrics) GenerateUserUsageData() (GaugeMetricData map[string]UsageData) {
102+
func (c *Metrics) GenerateUserUsageData() (GaugeMetricData map[string][]UsageData) {
99103
buckets, err := adminServer.Yig.MetaStorage.GetBuckets()
100104
if err != nil {
101105
helper.Logger.Error("Get usage data for prometheus failed:",
102106
err.Error())
103107
return
104108
}
105-
GaugeMetricData = make(map[string]UsageData)
109+
GaugeMetricData = make(map[string][]UsageData)
106110
for _, bucket := range buckets {
107-
if GaugeMetricData[bucket.OwnerId].storageClass == "" {
111+
if len(GaugeMetricData[bucket.OwnerId]) == 0 {
108112
key := PidUsagePrefix + bucket.OwnerId
109113
usageCache, err := redis.GetUsage(key)
110114
if err != nil {
111115
helper.Logger.Error("Get usage data from redis for prometheus failed:",
112116
err.Error())
113117
return
114118
}
115-
data, err := parseUsage(usageCache)
119+
datas, err := parseUsage(usageCache)
116120
if err != nil {
117121
helper.Logger.Error("Parse usage data from redis for prometheus failed:",
118122
err.Error())
119123
return
120124
}
121-
if data.storageClass != "" {
122-
GaugeMetricData[bucket.OwnerId] = UsageData{value: data.value, storageClass: data.storageClass}
125+
for _, data := range datas {
126+
GaugeMetricData[bucket.OwnerId] = append(GaugeMetricData[bucket.OwnerId], UsageData{value: data.value, storageClass: data.storageClass})
123127
}
124128
}
125129
}
126130
return
127131
}
128132

129133
// get usage from redis
130-
// <Storage-Class>:<usagenumber>
134+
// <Storage-Class1>:<usagenumber>,<Storage-Class2>:<usagenumber>
131135
// eg. STANDARD:2222
132-
func parseUsage(value string) (*UsageData, error) {
133-
var err error
134-
data := new(UsageData)
136+
func parseUsage(value string) (datas []*UsageData, err error) {
135137
if value == "" {
136-
return data, nil
138+
return
137139
}
138-
allParams := strings.Split(value, ":")
139-
data.value, err = strconv.ParseInt(allParams[1], 10, 64)
140-
if err != nil {
141-
return data, err
140+
storageClass := strings.Split(value, ",")
141+
for _, v := range storageClass {
142+
data := new(UsageData)
143+
allParams := strings.Split(v, ":")
144+
data.value, err = strconv.ParseInt(allParams[1], 10, 64)
145+
if err != nil {
146+
return
147+
}
148+
data.storageClass = allParams[0]
149+
datas = append(datas, data)
142150
}
143-
data.storageClass = allParams[0]
144-
return data, nil
151+
return
145152
}

conf/yig.toml

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ meta_cache_type = 2
2323
meta_store = "tidb"
2424
tidb_info = "root:@tcp(10.5.0.17:4000)/yig"
2525
keepalive = true
26+
enable_usage_push = false
2627
redis_group = ["redis:6379"]
2728
redis_password = "hehehehe"
2829
redis_connection_number = 10
@@ -61,15 +62,13 @@ kms_secret = "your_secret"
6162
version = 0
6263
keyname = "yig"
6364

64-
[msg_bus]
65-
msg_bus_enable = true
66-
msg_bus_type = 1
67-
msg_bus_topic = "testTopic2"
68-
[msg_bus.msg_bus_server]
69-
broker_list = "kafka:29092"
70-
71-
7265
# Plugin Config
66+
[plugins.dummy_mq]
67+
path = "/etc/yig/plugins/dummy_mq_plugin.so"
68+
enable = true
69+
[plugins.dummy_mq.args]
70+
topic = "testTopic2"
71+
url = "kafka:29092"
7372

7473
[plugins.dummy_iam]
7574
path = "/etc/yig/plugins/dummy_iam_plugin.so"

helper/config.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,6 @@ type Config struct {
8080
UploadMaxChunkSize int64 `toml:"upload_max_chunk_size"`
8181

8282
KMS KMSConfig `toml:"kms"`
83-
84-
// Message Bus
85-
MsgBus MsgBusConfig `toml:"msg_bus"`
8683
}
8784

8885
type PluginConfig struct {
@@ -100,25 +97,6 @@ type KMSConfig struct {
10097
Keyname string
10198
}
10299

103-
type MsgBusConfig struct {
104-
// Controls whether to enable message bus when receive the request.
105-
Enabled bool `toml:"msg_bus_enable"`
106-
// Controls the under implementation of message bus: 1 for kafka.
107-
Type int `toml:"msg_bus_type"`
108-
// Controls the message topic used by message bus.
109-
Topic string `toml:"msg_bus_topic"`
110-
// Controls the request timeout for sending a req through message bus.
111-
RequestTimeoutMs int `toml:"msg_bus_request_timeout_ms"`
112-
// Controls the total timeout for sending a req through message bus.
113-
// It will timeout if min(MessageTimeoutMs, SendMaxRetries * RequestTimeoutMs) meets.
114-
MessageTimeoutMs int `toml:"msg_bus_message_timeout_ms"`
115-
// Controls the retry time used by message bus if it fails to send a req.
116-
SendMaxRetries int `toml:"msg_bus_send_max_retries"`
117-
// Controls the settings for the implementation of message bus.
118-
// For kafka, the 'broker_list' must be set, like 'broker_list = "kafka:29092"'
119-
Server map[string]interface{} `toml:"msg_bus_server"`
120-
}
121-
122100
var CONFIG Config
123101

124102
func SetupConfig() {
@@ -203,10 +181,5 @@ func MarshalTOMLConfig() error {
203181

204182
CONFIG.KMS = c.KMS
205183

206-
CONFIG.MsgBus = c.MsgBus
207-
CONFIG.MsgBus.RequestTimeoutMs = Ternary(c.MsgBus.RequestTimeoutMs == 0, 3000, c.MsgBus.RequestTimeoutMs).(int)
208-
CONFIG.MsgBus.MessageTimeoutMs = Ternary(c.MsgBus.MessageTimeoutMs == 0, 5000, c.MsgBus.MessageTimeoutMs).(int)
209-
CONFIG.MsgBus.SendMaxRetries = Ternary(c.MsgBus.SendMaxRetries == 0, 2, c.MsgBus.SendMaxRetries).(int)
210-
211184
return nil
212185
}

0 commit comments

Comments
 (0)