Skip to content

Commit de10795

Browse files
committed
Ensures point balance will not update if value is ErrPts
1 parent 4128f7b commit de10795

File tree

4 files changed

+73
-27
lines changed

4 files changed

+73
-27
lines changed

README.md

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,49 @@ No external dependencies for this package.
1212

1313
Create a new Semaphore instance by supplying the capacity of the number of Goroutines you wish to run concurrently and information about the GraphQL point balance.
1414

15-
`Aquire(ctx context.Context)` accepts a context which will return an error, if one has happened such as a context timeout.
15+
The two key methods are:
1616

17-
`Release(pts int32)` accepts an integer representing the remaining point balance returned by Shopify's GraphQL API response.
17+
* `Aquire(ctx context.Context)` which accepts a context which will return an error, if one has happened (such as a context timeout).
18+
* `Release(pts int32)` which accepts an integer representing the remaining point balance returned by Shopify's GraphQL API response.
1819

1920
Example usage:
2021

2122
```go
2223
package main
2324

24-
import ssem "github.com/gniktr/shopifysemaphore"
25+
import (
26+
"log"
27+
ssem "github.com/gniktr/shopifysemaphore"
28+
)
2529

2630
func work(id int, wg *sync.WaitGroup, ctx context.Context, sem *ssem.Semaphore) {
2731
err := sem.Aquire(ctx)
2832
if err != nil {
29-
// Context timeout.
33+
// Possible context timeout.
34+
log.Printf("work: %w", err)
3035
wg.Done()
3136
return
3237
}
3338

34-
points, err := graphQLCall() // Return remaining points from call.
39+
// Return remaining points from call.
40+
points, err := graphQLCall()
3541
if err != nil {
36-
// Handle error.
42+
log.Printf("work: %w", err)
43+
44+
// If error is a network error or bad request for example, essentially
45+
// any error which would cause the response to *not* return point information,
46+
// then you should set the points to ErrPts to not trigger a point
47+
// update in Balance.
48+
points := ssem.ErrPts
3749
}
3850
fmt.Printf("remaining: %d points", points)
51+
52+
wg.Done()
3953
sem.Release(points)
4054
}
4155

4256
func main() {
43-
fmt.Println("started!")
57+
log.Println("started!")
4458
done := make(chan bool)
4559
ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Minute)
4660

@@ -51,10 +65,10 @@ func main() {
5165
10,
5266
ssem.NewBalance(200, 2000, 100),
5367
ssem.WithPauseFunc(func (pts int32, dur time.Duration) {
54-
fmt.Printf("pausing for %s due to remaining points of %d...", dur, pts)
68+
log.Printf("pausing for %s due to remaining points of %d...", dur, pts)
5569
}),
5670
ssem.WithResumeFunc(func () {
57-
fmt.Println("resuming...")
71+
log.Println("resuming...")
5872
})
5973
)
6074

@@ -73,11 +87,11 @@ func main() {
7387

7488
select {
7589
case <-ctx.Done():
76-
fmt.Println("timeout happened.")
90+
log.Println("timeout happened.")
7791
case <-done:
78-
fmt.Println("work finished.")
92+
log.Println("work finished.")
7993
}
80-
fmt.Println("completed.")
94+
log.Println("completed.")
8195
}
8296
```
8397

@@ -114,9 +128,15 @@ package shopifysemaphore // import "github.com/gnikyt/shopify-semaphore"
114128
VARIABLES
115129
116130
var (
117-
DefaultAquireBuffer = 200 * time.Millisecond
118-
DefaultPauseBuffer = 1 * time.Second
131+
DefaultAquireBuffer = 200 * time.Millisecond // Default aquire throttle duration.
132+
DefaultPauseBuffer = 1 * time.Second // Default pause buffer to append to pause duration calculation.
119133
)
134+
var ErrPts int32 = -1
135+
ErrPts is the points value to pass in if a network or other error happens.
136+
Essentially to be used for situations where no response containing point
137+
information was returned. This is used to know if the Update method should
138+
actually update the remaining point balance or not.
139+
120140
121141
FUNCTIONS
122142
@@ -141,10 +161,10 @@ func WithResumeFunc(fn func()) func(*Semaphore)
141161
TYPES
142162
143163
type Balance struct {
144-
Remaining atomic.Int32 // Point balance remaining.
145-
Threshold int32 // Minimum point balance where we would consider handling with a "pause".
146-
Limit int32 // Maximum points available.
147-
RefillRate int32 // Number of points refilled per second.
164+
Remaining atomic.Int32 // Point balance remaining.
165+
Threshold int32 // Minimum point balance where we would consider handling with a "pause".
166+
Limit int32 // Maximum points available.
167+
RefillRate int32 // Number of points refilled per second.
148168
}
149169
Balance represents the information of point values and keeps track of items
150170
such as the remaining points, threshold, limit, and refill rate.
@@ -167,14 +187,14 @@ func (b *Balance) Update(points int32)
167187
Update accepts a new value of remaining points to store.
168188
169189
type Semaphore struct {
170-
*Balance // Point information and tracking.
190+
*Balance // Point information and tracking.
171191
172-
PauseFunc func(int32, time.Duration) // Optional callback for when pause happens.
173-
ResumeFunc func() // Optional callback for when resume happens.
174-
PauseBuffer time.Duration // Buffer of time to wait before attempting to re-aquire a spot.
175-
AquireBuffer time.Duration // Buffer of time to extend the pause with.
192+
PauseFunc func(int32, time.Duration) // Optional callback for when pause happens.
193+
ResumeFunc func() // Optional callback for when resume happens.
194+
PauseBuffer time.Duration // Buffer of time to wait before attempting to re-aquire a spot.
195+
AquireBuffer time.Duration // Buffer of time to extend the pause with.
176196
177-
// Has unexported fields.
197+
// Has unexported fields.
178198
}
179199
Semaphore is responsible regulating when to pause and resume processing
180200
of Goroutines. Points remaining, point thresholds, and point refill rates

balance.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import (
55
"time"
66
)
77

8+
// ErrPts is the points value to pass in if a network or other error happens.
9+
// Essentially to be used for situations where no response containing point
10+
// information was returned. This is used to know if the Update method should
11+
// actually update the remaining point balance or not.
12+
var ErrPts int32 = -1
13+
814
// Balance represents the information of point values and keeps track of
915
// items such as the remaining points, threshold, limit, and refill rate.
1016
type Balance struct {
@@ -28,7 +34,9 @@ func NewBalance(thld int32, max int32, rr int32) *Balance {
2834

2935
// Update accepts a new value of remaining points to store.
3036
func (b *Balance) Update(points int32) {
31-
b.Remaining.Store(points)
37+
if points > ErrPts {
38+
b.Remaining.Store(points)
39+
}
3240
}
3341

3442
// RefillDuration accounts for the remaining points, the limit, and the refill rate to

balance_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,21 @@ func TestNewBalance(t *testing.T) {
6868
t.Errorf("Balance.Remaining = %d; want %d", b.Remaining.Load(), limit)
6969
}
7070
}
71+
72+
// TestUpdate should ensure we only update points above a value of ErrPts.
73+
func TestUpdate(t *testing.T) {
74+
b := newBalance()
75+
b.Update(500)
76+
77+
var expts int32 = 500
78+
rpts := b.Remaining.Load()
79+
if rpts != expts {
80+
t.Errorf("Balance.Remaining = %d; want %d", rpts, expts)
81+
}
82+
83+
b.Update(ErrPts)
84+
rpts = b.Remaining.Load()
85+
if rpts != expts {
86+
t.Errorf("Balance.Remaining = %d; want %d", rpts, expts)
87+
}
88+
}

semaphore.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
)
88

99
var (
10-
DefaultAquireBuffer = 200 * time.Millisecond
11-
DefaultPauseBuffer = 1 * time.Second
10+
DefaultAquireBuffer = 200 * time.Millisecond // Default aquire throttle duration.
11+
DefaultPauseBuffer = 1 * time.Second // Default pause buffer to append to pause duration calculation.
1212
)
1313

1414
// Semaphore is responsible regulating when to pause and resume processing of Goroutines.

0 commit comments

Comments
 (0)