Skip to content

Commit f7a7f95

Browse files
support regex in flow and isolation for resource name (#595)
1 parent 368e01c commit f7a7f95

File tree

8 files changed

+419
-40
lines changed

8 files changed

+419
-40
lines changed

core/flow/rule.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ type Rule struct {
117117
HighMemUsageThreshold int64 `json:"highMemUsageThreshold"`
118118
MemLowWaterMarkBytes int64 `json:"memLowWaterMarkBytes"`
119119
MemHighWaterMarkBytes int64 `json:"memHighWaterMarkBytes"`
120+
// Regex indicates whether the rule is a regex rule
121+
Regex bool `json:"regex"`
120122
}
121123

122124
func (r *Rule) isEqualsTo(newRule *Rule) bool {

core/flow/rule_manager.go

Lines changed: 112 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package flow
1717
import (
1818
"fmt"
1919
"reflect"
20+
"regexp"
2021
"sync"
2122

2223
"github.com/alibaba/sentinel-golang/core/base"
@@ -41,10 +42,12 @@ type trafficControllerGenKey struct {
4142
type TrafficControllerMap map[string][]*TrafficShapingController
4243

4344
var (
44-
tcGenFuncMap = make(map[trafficControllerGenKey]TrafficControllerGenFunc, 6)
45-
tcMap = make(TrafficControllerMap)
46-
tcMux = new(sync.RWMutex)
47-
nopStat = &standaloneStatistic{
45+
tcGenFuncMap = make(map[trafficControllerGenKey]TrafficControllerGenFunc, 6)
46+
tcMap = make(TrafficControllerMap)
47+
tcRegexMap = make(TrafficControllerMap)
48+
tcRegexCacheMap = make(TrafficControllerMap)
49+
tcMux = new(sync.RWMutex)
50+
nopStat = &standaloneStatistic{
4851
reuseResourceStat: false,
4952
readOnlyMetric: base.NopReadStat(),
5053
writeOnlyMetric: base.NopWriteStat(),
@@ -204,24 +207,35 @@ func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {
204207
start := util.CurrentTimeNano()
205208

206209
tcMux.RLock()
207-
tcMapClone := make(TrafficControllerMap, len(validResRulesMap))
210+
tcMapClone := make(TrafficControllerMap, len(tcMap)+len(tcRegexMap))
208211
for res, tcs := range tcMap {
209212
resTcClone := make([]*TrafficShapingController, 0, len(tcs))
210213
resTcClone = append(resTcClone, tcs...)
211214
tcMapClone[res] = resTcClone
212215
}
216+
for res, tcs := range tcRegexMap {
217+
resTcClone := make([]*TrafficShapingController, 0, len(tcs))
218+
resTcClone = append(resTcClone, tcs...)
219+
tcMapClone[res] = resTcClone
220+
}
213221
tcMux.RUnlock()
214222

215-
m := make(TrafficControllerMap, len(validResRulesMap))
223+
m := make(TrafficControllerMap)
224+
regexM := make(TrafficControllerMap)
216225
for res, rulesOfRes := range validResRulesMap {
217-
newTcsOfRes := buildResourceTrafficShapingController(res, rulesOfRes, tcMapClone[res])
226+
newTcsOfRes, newRegexTcsOfRes := buildResourceTrafficShapingController(res, rulesOfRes, tcMapClone[res])
218227
if len(newTcsOfRes) > 0 {
219228
m[res] = newTcsOfRes
220229
}
230+
if len(newRegexTcsOfRes) > 0 {
231+
regexM[res] = newRegexTcsOfRes
232+
}
221233
}
222234

223235
tcMux.Lock()
224236
tcMap = m
237+
tcRegexMap = regexM
238+
tcRegexCacheMap = make(TrafficControllerMap)
225239
tcMux.Unlock()
226240
currentRules = rawResRulesMap
227241

@@ -277,15 +291,22 @@ func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) {
277291
oldResTcs := make([]*TrafficShapingController, 0)
278292
tcMux.RLock()
279293
oldResTcs = append(oldResTcs, tcMap[res]...)
294+
oldResTcs = append(oldResTcs, tcRegexMap[res]...)
280295
tcMux.RUnlock()
281-
newResTcs := buildResourceTrafficShapingController(res, validResRules, oldResTcs)
296+
newResTcs, newRegexResTcs := buildResourceTrafficShapingController(res, validResRules, oldResTcs)
282297

283298
tcMux.Lock()
284299
if len(newResTcs) == 0 {
285300
delete(tcMap, res)
286301
} else {
287302
tcMap[res] = newResTcs
288303
}
304+
if len(newRegexResTcs) == 0 {
305+
delete(tcRegexMap, res)
306+
} else {
307+
tcRegexMap[res] = newRegexResTcs
308+
}
309+
tcRegexCacheMap = make(TrafficControllerMap)
289310
tcMux.Unlock()
290311
currentRules[res] = rawResRules
291312
logging.Debug("[Flow onResourceRuleUpdate] Time statistic(ns) for updating flow rule", "timeCost", util.CurrentTimeNano()-start)
@@ -308,6 +329,8 @@ func LoadRulesOfResource(res string, rules []*Rule) (bool, error) {
308329
// clear tcMap
309330
tcMux.Lock()
310331
delete(tcMap, res)
332+
delete(tcRegexMap, res)
333+
tcRegexCacheMap = make(TrafficControllerMap)
311334
tcMux.Unlock()
312335
logging.Info("[Flow] clear resource level rules", "resource", res)
313336
return true, nil
@@ -329,19 +352,31 @@ func getRules() []*Rule {
329352
tcMux.RLock()
330353
defer tcMux.RUnlock()
331354

332-
return rulesFrom(tcMap)
355+
return append(rulesFrom(tcMap), rulesFrom(tcRegexMap)...)
333356
}
334357

335358
// getRulesOfResource returns specific resource's rules。Any changes of rules take effect for flow module
336359
// getRulesOfResource is an internal interface.
337360
func getRulesOfResource(res string) []*Rule {
338361
tcMux.RLock()
339-
defer tcMux.RUnlock()
340362

341-
resTcs, exist := tcMap[res]
342-
if !exist {
343-
return nil
363+
resTcs := make([]*TrafficShapingController, 0)
364+
if tcs, ok := tcMap[res]; ok {
365+
resTcs = append(resTcs, tcs...)
366+
}
367+
cacheRegexTcs, ok := tcRegexCacheMap[res]
368+
tcMux.RUnlock()
369+
370+
if ok {
371+
resTcs = append(resTcs, cacheRegexTcs...)
372+
} else {
373+
tcMux.Lock()
374+
regexTcs := regexTcMatches(res)
375+
resTcs = append(resTcs, regexTcs...)
376+
tcRegexCacheMap[res] = regexTcs
377+
tcMux.Unlock()
344378
}
379+
345380
ret := make([]*Rule, 0, len(resTcs))
346381
for _, tc := range resTcs {
347382
ret = append(ret, tc.BoundRule())
@@ -507,9 +542,28 @@ func RemoveTrafficShapingGenerator(tokenCalculateStrategy TokenCalculateStrategy
507542

508543
func getTrafficControllerListFor(name string) []*TrafficShapingController {
509544
tcMux.RLock()
510-
defer tcMux.RUnlock()
545+
rules := make([]*TrafficShapingController, 0)
546+
rules = append(rules, tcMap[name]...)
547+
regexCacheRules, ok := tcRegexCacheMap[name]
548+
tcMux.RUnlock()
549+
550+
if ok {
551+
rules = append(rules, regexCacheRules...)
552+
return rules
553+
}
511554

512-
return tcMap[name]
555+
tcMux.Lock()
556+
defer tcMux.Unlock()
557+
// double check
558+
if regexCacheRules, ok = tcRegexCacheMap[name]; ok {
559+
rules = append(rules, regexCacheRules...)
560+
return rules
561+
}
562+
regexRules := regexTcMatches(name)
563+
tcRegexCacheMap[name] = regexRules
564+
rules = append(rules, regexRules...)
565+
566+
return rules
513567
}
514568

515569
func calculateReuseIndexFor(r *Rule, oldResTcs []*TrafficShapingController) (equalIdx, reuseStatIdx int) {
@@ -539,8 +593,10 @@ func calculateReuseIndexFor(r *Rule, oldResTcs []*TrafficShapingController) (equ
539593
}
540594

541595
// buildResourceTrafficShapingController builds TrafficShapingController slice from rules. the resource of rules must be equals to res
542-
func buildResourceTrafficShapingController(res string, rulesOfRes []*Rule, oldResTcs []*TrafficShapingController) []*TrafficShapingController {
596+
func buildResourceTrafficShapingController(res string, rulesOfRes []*Rule,
597+
oldResTcs []*TrafficShapingController) (simpleControllers []*TrafficShapingController, regexControllers []*TrafficShapingController) {
543598
newTcsOfRes := make([]*TrafficShapingController, 0, len(rulesOfRes))
599+
newRegexTcsOfRes := make([]*TrafficShapingController, 0, len(rulesOfRes))
544600
for _, rule := range rulesOfRes {
545601
if res != rule.Resource {
546602
logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, rule.Resource), "Unmatched resource name in flow.buildResourceTrafficShapingController()", "rule", rule)
@@ -582,9 +638,15 @@ func buildResourceTrafficShapingController(res string, rulesOfRes []*Rule, oldRe
582638
// remove old tc from oldResTcs
583639
oldResTcs = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...)
584640
}
585-
newTcsOfRes = append(newTcsOfRes, tc)
641+
642+
if tc.rule != nil && tc.rule.Regex {
643+
newRegexTcsOfRes = append(newRegexTcsOfRes, tc)
644+
} else {
645+
newTcsOfRes = append(newTcsOfRes, tc)
646+
}
647+
586648
}
587-
return newTcsOfRes
649+
return newTcsOfRes, newRegexTcsOfRes
588650
}
589651

590652
// IsValidRule checks whether the given Rule is valid.
@@ -649,3 +711,35 @@ func IsValidRule(rule *Rule) error {
649711

650712
return nil
651713
}
714+
715+
func regexTcMatches(resource string) []*TrafficShapingController {
716+
result := make([]*TrafficShapingController, 0)
717+
for pattern, controllers := range tcRegexMap {
718+
re := regexp.MustCompile(pattern)
719+
if !re.MatchString(resource) {
720+
continue
721+
}
722+
controllersCopy := make([]*TrafficShapingController, len(controllers))
723+
for resourceName, controller := range controllers {
724+
generator, supported := tcGenFuncMap[trafficControllerGenKey{
725+
tokenCalculateStrategy: controller.rule.TokenCalculateStrategy,
726+
controlBehavior: controller.rule.ControlBehavior,
727+
}]
728+
if !supported || generator == nil {
729+
logging.Error(errors.New("get trafficShapingController copy failed, unsupported flow control strategy"),
730+
"Ignoring the rule due to unsupported control behavior in flow."+
731+
"buildResourceTrafficShapingController()", "rule", controller.rule)
732+
continue
733+
}
734+
if tc, e := generator(controller.rule, nil); e == nil && tc != nil {
735+
controllersCopy[resourceName] = tc
736+
} else {
737+
logging.Error(errors.New("get trafficShapingController copy failed, bad generated traffic controller"),
738+
"Ignoring the rule due to bad generated traffic controller in flow."+
739+
"buildResourceTrafficShapingController()", "rule", controller.rule)
740+
}
741+
}
742+
result = append(result, controllersCopy...)
743+
}
744+
return result
745+
}

core/flow/rule_manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func Test_buildResourceTrafficShapingController(t *testing.T) {
263263
MaxQueueingTimeMs: 10,
264264
}
265265
assert.True(t, len(tcMap["abc1"]) == 0)
266-
tcs := buildResourceTrafficShapingController("abc1", []*Rule{r1, r2}, tcMap["abc1"])
266+
tcs, _ := buildResourceTrafficShapingController("abc1", []*Rule{r1, r2}, tcMap["abc1"])
267267
assert.True(t, len(tcs) == 2)
268268
assert.True(t, tcs[0].BoundRule() == r1)
269269
assert.True(t, tcs[1].BoundRule() == r2)
@@ -415,7 +415,7 @@ func Test_buildResourceTrafficShapingController(t *testing.T) {
415415
StatIntervalInMs: 50000,
416416
}
417417

418-
tcs := buildResourceTrafficShapingController("abc1", []*Rule{r12, r22, r32, r42}, tcMap["abc1"])
418+
tcs, _ := buildResourceTrafficShapingController("abc1", []*Rule{r12, r22, r32, r42}, tcMap["abc1"])
419419
assert.True(t, len(tcs) == 4)
420420

421421
assert.True(t, tcs[0].BoundRule() == r12)

core/flow/slot_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,68 @@ func Test_FlowSlot_StandaloneStat(t *testing.T) {
6666
}
6767
assert.True(t, getTrafficControllerListFor("abc")[0].boundStat.readOnlyMetric.GetSum(base.MetricEventPass) == 50)
6868
}
69+
70+
func Test_RegexFlowSlot_StandaloneStat(t *testing.T) {
71+
slot := &Slot{}
72+
statSLot := &StandaloneStatSlot{}
73+
res1 := base.NewResourceWrapper("abc/123", base.ResTypeCommon, base.Inbound)
74+
res2 := base.NewResourceWrapper("abc/456", base.ResTypeCommon, base.Inbound)
75+
resNode1 := stat.GetOrCreateResourceNode("abc/123", base.ResTypeCommon)
76+
resNode2 := stat.GetOrCreateResourceNode("abc/456", base.ResTypeCommon)
77+
ctx1 := &base.EntryContext{
78+
Resource: res1,
79+
StatNode: resNode1,
80+
Input: &base.SentinelInput{
81+
BatchCount: 1,
82+
},
83+
RuleCheckResult: nil,
84+
Data: nil,
85+
}
86+
ctx2 := &base.EntryContext{
87+
Resource: res2,
88+
StatNode: resNode2,
89+
Input: &base.SentinelInput{
90+
BatchCount: 1,
91+
},
92+
RuleCheckResult: nil,
93+
Data: nil,
94+
}
95+
96+
r1 := &Rule{
97+
Resource: "abc/*",
98+
TokenCalculateStrategy: Direct,
99+
ControlBehavior: Reject,
100+
// Use standalone statistic, using single-bucket-sliding-windows
101+
StatIntervalInMs: 20000,
102+
Threshold: 100,
103+
RelationStrategy: CurrentResource,
104+
Regex: true,
105+
}
106+
_, e := LoadRules([]*Rule{r1})
107+
if e != nil {
108+
logging.Error(e, "")
109+
t.Fail()
110+
return
111+
}
112+
113+
for i := 0; i < 80; i++ {
114+
ret := slot.Check(ctx1)
115+
if ret != nil {
116+
t.Fail()
117+
return
118+
}
119+
statSLot.OnEntryPassed(ctx1)
120+
}
121+
for i := 0; i < 80; i++ {
122+
ret := slot.Check(ctx2)
123+
if ret != nil {
124+
t.Fail()
125+
return
126+
}
127+
statSLot.OnEntryPassed(ctx2)
128+
}
129+
assert.True(t, getTrafficControllerListFor("abc/123")[0].boundStat.readOnlyMetric.GetSum(base.
130+
MetricEventPass) == 80)
131+
assert.True(t, getTrafficControllerListFor("abc/456")[0].boundStat.readOnlyMetric.GetSum(base.
132+
MetricEventPass) == 80)
133+
}

core/isolation/rule.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,16 @@ type Rule struct {
4646
// Currently Concurrency is supported for concurrency limiting.
4747
MetricType MetricType `json:"metricType"`
4848
Threshold uint32 `json:"threshold"`
49+
// Regex indicates whether the rule is a regex rule
50+
Regex bool `json:"regex"`
4951
}
5052

5153
func (r *Rule) String() string {
5254
b, err := json.Marshal(r)
5355
if err != nil {
5456
// Return the fallback string
55-
return fmt.Sprintf("{Id=%s, Resource=%s, MetricType=%s, Threshold=%d}", r.ID, r.Resource, r.MetricType.String(), r.Threshold)
57+
return fmt.Sprintf("{Id=%s, Resource=%s, MetricType=%s, Threshold=%d, Regex=%v}", r.ID, r.Resource,
58+
r.MetricType.String(), r.Threshold, r.Regex)
5659
}
5760
return string(b)
5861
}

0 commit comments

Comments
 (0)