Skip to content

Commit 72255b8

Browse files
committed
add semaphore throtttler
1 parent 74208af commit 72255b8

File tree

6 files changed

+126
-53
lines changed

6 files changed

+126
-53
lines changed

README.MD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ You can find list of returning error types for all existing throttlers in thrott
171171
| retry | `func NewThrottlerRetry(thr Throttler, retries uint64) Throttler` | Retries provided throttler error up until the provided retries threshold.<br> If provided onthreshold flag is set even `ErrorThreshold` errors will be retried.<br> Internally retry uses square throttler with `DefaultRetriedDuration` initial duration.<br> - could return any underlying throttler error; |
172172
| cache | `func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler` | Caches provided throttler calls for the provided cache duration, throttler release resulting resets cache.<br> Only non throttling calls are cached for the provided cache duration.<br> - could return any underlying throttler error; |
173173
| generator | `func NewThrottlerGenerator(gen Generator, capacity uint64, eviction float64) Throttler` | Creates new throttler instance that throttles if found key matching throttler throttles.<br> If no key matching throttler has been found generator used insted to provide new throttler that will be added to existing throttlers map.<br> Generated throttlers are kept in bounded map with capacity *c* defined by the specified capacity and eviction rate *e* defined by specified eviction value is normalized to [0.0, 1.0], where eviction rate affects number of throttlers that will be removed from the map after bounds overflow.<br> Use `WithKey` to specify key for throttler matching and generation.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |
174+
| semaphore | `func NewThrottlerSemaphore(weight int64) Throttler` | Creates new throttler instance that throttles call if underlying semaphore throttles.<br>Use `WithWeight` to override context call weight, 1 by default.<br> - could return `ErrorThreshold`; |
174175

175176
## Integrations
176177

context.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313
ghctxmessage ghctxid = "gohalt_context_message"
1414
ghctxtimestamp ghctxid = "gohalt_context_timestamp"
1515
ghctxmarshaler ghctxid = "gohalt_context_marshaler"
16+
ghctxweight ghctxid = "gohalt_context_weight"
1617
)
1718

1819
// WithTimestamp adds the provided timestamp to the provided context
@@ -153,3 +154,19 @@ func (ctx ctxthr) Err() (err error) {
153154
func (ctx ctxthr) Throttler() Throttler {
154155
return ctx.thr
155156
}
157+
158+
// WithWeight adds the provided weight to the provided context
159+
// to differ `Acquire` weight levels.
160+
// Resulted context is used by: `semaphore` throtttler.
161+
func WithWeight(ctx context.Context, weight int64) context.Context {
162+
return context.WithValue(ctx, ghctxweight, weight)
163+
}
164+
165+
func ctxWeight(ctx context.Context) int64 {
166+
if val := ctx.Value(ghctxweight); val != nil {
167+
if weight, ok := val.(int64); ok && weight > 0 {
168+
return weight
169+
}
170+
}
171+
return 1
172+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/stretchr/testify v1.6.1
2121
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
2222
golang.org/x/net v0.0.0-20200927032502-5d4f70055728 // indirect
23+
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
2324
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c // indirect
2425
golang.org/x/text v0.3.3 // indirect
2526
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
345345
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
346346
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
347347
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
348+
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
348349
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
349350
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
350351
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

throttlers.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"regexp"
77
"sync"
88
"time"
9+
10+
"golang.org/x/sync/semaphore"
911
)
1012

1113
// Throttler defines core gohalt throttler abstraction and exposes pair of counterpart methods: `Acquire` and `Release`.
@@ -1075,3 +1077,32 @@ func (thr *tgenerator) Release(ctx context.Context) error {
10751077
}
10761078
return nil
10771079
}
1080+
1081+
type tsemaphore struct {
1082+
sem *semaphore.Weighted
1083+
}
1084+
1085+
// NewThrottlerSemaphore creates new throttler instance that
1086+
// throttles call if underlying semaphore throttles.
1087+
// Use `WithWeight` to override context call weight, 1 by default.
1088+
// - could return `ErrorThreshold`;
1089+
func NewThrottlerSemaphore(weight int64) Throttler {
1090+
return tsemaphore{sem: semaphore.NewWeighted(weight)}
1091+
}
1092+
1093+
func (thr tsemaphore) Acquire(ctx context.Context) error {
1094+
if ok := thr.sem.TryAcquire(ctxWeight(ctx)); !ok {
1095+
return ErrorThreshold{
1096+
Throttler: "semaphore",
1097+
Threshold: strbool(ok),
1098+
}
1099+
}
1100+
return nil
1101+
}
1102+
1103+
func (thr tsemaphore) Release(ctx context.Context) error {
1104+
// prevent over releasing panic.
1105+
defer func() { _ = recover() }()
1106+
thr.sem.Release(ctxWeight(ctx))
1107+
return nil
1108+
}

throttlers_test.go

Lines changed: 75 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const (
2727
ms30_0 time.Duration = 30 * time.Millisecond
2828
)
2929

30-
var trun Runner = NewRunnerSync(context.Background(), NewThrottlerBuffered(1))
30+
var trun Runner = NewRunnerSync(context.TODO(), NewThrottlerBuffered(1))
3131

3232
type tcase struct {
3333
tms uint64 // number of sub runs inside one case
@@ -46,7 +46,7 @@ type tcase struct {
4646

4747
func (t *tcase) run(index int) (dur time.Duration, err error) {
4848
// get context with fallback
49-
ctx := context.Background()
49+
ctx := context.TODO()
5050
if index < len(t.ctxs) {
5151
ctx = t.ctxs[index]
5252
}
@@ -124,7 +124,7 @@ func (t *tcase) result(index int) (dur time.Duration, err error) {
124124

125125
func TestThrottlers(t *testing.T) {
126126
DefaultRetriedDuration = time.Millisecond
127-
cctx, cancel := context.WithCancel(context.Background())
127+
cctx, cancel := context.WithCancel(context.TODO())
128128
cancel()
129129
testerr := errors.New("test")
130130
table := map[string]tcase{
@@ -219,7 +219,7 @@ func TestThrottlers(t *testing.T) {
219219
thr: NewThrottlerContext(),
220220
ctxs: []context.Context{
221221
cctx,
222-
context.Background(),
222+
context.TODO(),
223223
cctx,
224224
},
225225
errs: []error{
@@ -301,9 +301,9 @@ func TestThrottlers(t *testing.T) {
301301
tms: 3,
302302
thr: NewThrottlerPast(time.Date(2000, 1, 1, 10, 0, 0, 0, time.Local)),
303303
ctxs: []context.Context{
304-
WithTimestamp(context.Background(), time.Date(1900, 1, 1, 10, 0, 0, 0, time.UTC)),
305-
WithTimestamp(context.Background(), time.Date(2000, 1, 1, 9, 59, 0, 0, time.Local)),
306-
WithTimestamp(context.Background(), time.Date(2000, 1, 1, 10, 59, 0, 0, time.Local)),
304+
WithTimestamp(context.TODO(), time.Date(1900, 1, 1, 10, 0, 0, 0, time.UTC)),
305+
WithTimestamp(context.TODO(), time.Date(2000, 1, 1, 9, 59, 0, 0, time.Local)),
306+
WithTimestamp(context.TODO(), time.Date(2000, 1, 1, 10, 59, 0, 0, time.Local)),
307307
},
308308
errs: []error{
309309
ErrorThreshold{
@@ -327,9 +327,9 @@ func TestThrottlers(t *testing.T) {
327327
tms: 3,
328328
thr: NewThrottlerFuture(time.Date(2000, 1, 1, 10, 0, 0, 0, time.Local)),
329329
ctxs: []context.Context{
330-
WithTimestamp(context.Background(), time.Date(1900, 1, 1, 10, 0, 0, 0, time.UTC)),
331-
WithTimestamp(context.Background(), time.Date(2000, 1, 1, 9, 59, 0, 0, time.Local)),
332-
WithTimestamp(context.Background(), time.Date(2000, 1, 1, 10, 59, 0, 0, time.Local)),
330+
WithTimestamp(context.TODO(), time.Date(1900, 1, 1, 10, 0, 0, 0, time.UTC)),
331+
WithTimestamp(context.TODO(), time.Date(2000, 1, 1, 9, 59, 0, 0, time.Local)),
332+
WithTimestamp(context.TODO(), time.Date(2000, 1, 1, 10, 59, 0, 0, time.Local)),
333333
},
334334
errs: []error{
335335
nil,
@@ -447,13 +447,13 @@ func TestThrottlers(t *testing.T) {
447447
delayed(ms3_0, nope),
448448
},
449449
ctxs: []context.Context{
450-
WithPriority(context.Background(), 1),
451-
WithPriority(context.Background(), 1),
452-
WithPriority(context.Background(), 1),
453-
WithPriority(context.Background(), 2),
454-
WithPriority(context.Background(), 2),
455-
WithPriority(context.Background(), 2),
456-
WithPriority(context.Background(), 2),
450+
WithPriority(context.TODO(), 1),
451+
WithPriority(context.TODO(), 1),
452+
WithPriority(context.TODO(), 1),
453+
WithPriority(context.TODO(), 2),
454+
WithPriority(context.TODO(), 2),
455+
WithPriority(context.TODO(), 2),
456+
WithPriority(context.TODO(), 2),
457457
},
458458
durs: []time.Duration{
459459
ms0_0,
@@ -792,9 +792,9 @@ func TestThrottlers(t *testing.T) {
792792
tms: 3,
793793
thr: NewThrottlerEnqueue(enqmock{}),
794794
ctxs: []context.Context{
795-
WithMarshaler(context.Background(), nil),
796-
WithMarshaler(context.Background(), nil),
797-
WithMarshaler(context.Background(), nil),
795+
WithMarshaler(context.TODO(), nil),
796+
WithMarshaler(context.TODO(), nil),
797+
WithMarshaler(context.TODO(), nil),
798798
},
799799
errs: []error{
800800
ErrorInternal{Throttler: "enqueue", Message: "context doesn't contain required marshaler"},
@@ -815,9 +815,9 @@ func TestThrottlers(t *testing.T) {
815815
tms: 3,
816816
thr: NewThrottlerEnqueue(enqmock{}),
817817
ctxs: []context.Context{
818-
WithMarshaler(WithMessage(context.Background(), "test"), marshal(testerr)),
819-
WithMarshaler(WithMessage(context.Background(), "test"), marshal(testerr)),
820-
WithMarshaler(WithMessage(context.Background(), "test"), marshal(testerr)),
818+
WithMarshaler(WithMessage(context.TODO(), "test"), marshal(testerr)),
819+
WithMarshaler(WithMessage(context.TODO(), "test"), marshal(testerr)),
820+
WithMarshaler(WithMessage(context.TODO(), "test"), marshal(testerr)),
821821
},
822822
errs: []error{
823823
ErrorInternal{Throttler: "enqueue", Message: testerr.Error()},
@@ -829,9 +829,9 @@ func TestThrottlers(t *testing.T) {
829829
tms: 3,
830830
thr: NewThrottlerEnqueue(enqmock{err: testerr}),
831831
ctxs: []context.Context{
832-
WithMessage(context.Background(), "test"),
833-
WithMessage(context.Background(), "test"),
834-
WithMessage(context.Background(), "test"),
832+
WithMessage(context.TODO(), "test"),
833+
WithMessage(context.TODO(), "test"),
834+
WithMessage(context.TODO(), "test"),
835835
},
836836
errs: []error{
837837
ErrorInternal{Throttler: "enqueue", Message: testerr.Error()},
@@ -843,9 +843,9 @@ func TestThrottlers(t *testing.T) {
843843
tms: 3,
844844
thr: NewThrottlerEnqueue(enqmock{}),
845845
ctxs: []context.Context{
846-
WithMarshaler(WithMessage(context.Background(), "test"), marshal(nil)),
847-
WithMessage(context.Background(), "test"),
848-
WithMessage(context.Background(), "test"),
846+
WithMarshaler(WithMessage(context.TODO(), "test"), marshal(nil)),
847+
WithMessage(context.TODO(), "test"),
848+
WithMessage(context.TODO(), "test"),
849849
},
850850
},
851851
"Throttler adaptive should throttle on throttling adoptee": {
@@ -883,9 +883,9 @@ func TestThrottlers(t *testing.T) {
883883
tms: 3,
884884
thr: NewThrottlerPattern(),
885885
ctxs: []context.Context{
886-
context.Background(),
887-
WithKey(context.Background(), ""),
888-
WithKey(context.Background(), "test"),
886+
context.TODO(),
887+
WithKey(context.TODO(), ""),
888+
WithKey(context.TODO(), "test"),
889889
},
890890
errs: []error{
891891
ErrorInternal{Throttler: "pattern", Message: "known key is not found"},
@@ -906,11 +906,11 @@ func TestThrottlers(t *testing.T) {
906906
},
907907
),
908908
ctxs: []context.Context{
909-
context.Background(),
910-
WithKey(context.Background(), "125"),
911-
WithKey(context.Background(), "test"),
912-
WithKey(context.Background(), "nontest"),
913-
WithKey(context.Background(), "non"),
909+
context.TODO(),
910+
WithKey(context.TODO(), "125"),
911+
WithKey(context.TODO(), "test"),
912+
WithKey(context.TODO(), "nontest"),
913+
WithKey(context.TODO(), "non"),
914914
},
915915
errs: []error{
916916
ErrorInternal{Throttler: "pattern", Message: "known key is not found"},
@@ -1132,9 +1132,9 @@ func TestThrottlers(t *testing.T) {
11321132
0.1,
11331133
),
11341134
ctxs: []context.Context{
1135-
WithKey(context.Background(), "test"),
1136-
WithKey(context.Background(), "nontest"),
1137-
WithKey(context.Background(), "111"),
1135+
WithKey(context.TODO(), "test"),
1136+
WithKey(context.TODO(), "nontest"),
1137+
WithKey(context.TODO(), "111"),
11381138
},
11391139
errs: []error{
11401140
ErrorInternal{
@@ -1161,11 +1161,11 @@ func TestThrottlers(t *testing.T) {
11611161
0.1,
11621162
),
11631163
ctxs: []context.Context{
1164-
WithKey(context.Background(), "125"),
1165-
WithKey(context.Background(), "125"),
1166-
WithKey(context.Background(), "test"),
1167-
WithKey(context.Background(), "nontest"),
1168-
WithKey(context.Background(), "125"),
1164+
WithKey(context.TODO(), "125"),
1165+
WithKey(context.TODO(), "125"),
1166+
WithKey(context.TODO(), "test"),
1167+
WithKey(context.TODO(), "nontest"),
1168+
WithKey(context.TODO(), "125"),
11691169
},
11701170
errs: []error{
11711171
nil,
@@ -1191,13 +1191,13 @@ func TestThrottlers(t *testing.T) {
11911191
1000,
11921192
),
11931193
ctxs: []context.Context{
1194-
WithKey(context.Background(), "111"),
1195-
WithKey(context.Background(), "test"),
1196-
WithKey(context.Background(), "111"),
1197-
WithKey(context.Background(), "test1"),
1198-
WithKey(context.Background(), "kkk"),
1199-
WithKey(context.Background(), "kkk"),
1200-
WithKey(context.Background(), "test2"),
1194+
WithKey(context.TODO(), "111"),
1195+
WithKey(context.TODO(), "test"),
1196+
WithKey(context.TODO(), "111"),
1197+
WithKey(context.TODO(), "test1"),
1198+
WithKey(context.TODO(), "kkk"),
1199+
WithKey(context.TODO(), "kkk"),
1200+
WithKey(context.TODO(), "test2"),
12011201
},
12021202
errs: []error{
12031203
nil,
@@ -1215,6 +1215,28 @@ func TestThrottlers(t *testing.T) {
12151215
nil,
12161216
},
12171217
},
1218+
"Throttler semaphore should throttle on semaphore threshold": {
1219+
tms: 3,
1220+
thr: NewThrottlerSemaphore(5),
1221+
ctxs: []context.Context{
1222+
WithWeight(context.TODO(), 2),
1223+
WithWeight(context.TODO(), 1),
1224+
WithWeight(context.TODO(), 3),
1225+
},
1226+
acts: []Runnable{
1227+
delayed(ms1_0, nope),
1228+
delayed(ms1_0, nope),
1229+
delayed(ms1_0, nope),
1230+
},
1231+
errs: []error{
1232+
nil,
1233+
nil,
1234+
ErrorThreshold{
1235+
Throttler: "semaphore",
1236+
Threshold: strbool(false),
1237+
},
1238+
},
1239+
},
12181240
}
12191241
for tname, ptrtcase := range table {
12201242
t.Run(tname, func(t *testing.T) {
@@ -1256,7 +1278,7 @@ func BenchmarkComplexThrottlers(b *testing.B) {
12561278
Stats{MEMAlloc: 1000},
12571279
),
12581280
)
1259-
ctx := context.Background()
1281+
ctx := context.TODO()
12601282
for i := 0; i < b.N; i++ {
12611283
_ = thr.Acquire(ctx)
12621284
_ = thr.Release(ctx)

0 commit comments

Comments
 (0)