Skip to content

Commit 9df208d

Browse files
committed
A better "dequeue" abstraction defined on reviews table
1 parent 749e746 commit 9df208d

File tree

3 files changed

+55
-40
lines changed

3 files changed

+55
-40
lines changed

tests/database_test.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,6 @@ func (db *Database) WithTx(tx pgx.Tx) *Database {
4141
return initDB(pgkitDB)
4242
}
4343

44-
func (db *Database) Close() { db.DB.Conn.Close() }
45-
46-
type accountsTable struct {
47-
*pgkit.Table[Account, *Account, int64]
48-
}
49-
50-
type articlesTable struct {
51-
*pgkit.Table[Article, *Article, uint64]
52-
}
53-
54-
type reviewsTable struct {
55-
*pgkit.Table[Review, *Review, uint64]
44+
func (db *Database) Close() {
45+
db.DB.Conn.Close()
5646
}

tests/table_test.go

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"slices"
66
"sync"
77
"testing"
8-
"time"
98

109
sq "github.com/Masterminds/squirrel"
1110
"github.com/stretchr/testify/require"
@@ -183,49 +182,28 @@ func TestLockForUpdates(t *testing.T) {
183182
err = db.Reviews.Save(ctx, reviews...)
184183
require.NoError(t, err, "create review")
185184

186-
where := sq.Eq{
187-
"status": ReviewStatusPending,
188-
"deleted_at": nil,
189-
}
190-
orderBy := []string{
191-
"created_at ASC",
192-
}
193-
limit := uint64(10)
194-
195-
var processedIDs [][]uint64 = make([][]uint64, 10)
185+
var ids [][]uint64 = make([][]uint64, 10)
196186
var wg sync.WaitGroup
197187

198188
for range 10 {
199189
wg.Add(1)
200190
go func() {
201191
defer wg.Done()
202192

203-
var processReviews []*Review
193+
reviews, err := db.Reviews.DequeueForProcessing(ctx, 10)
194+
require.NoError(t, err, "dequeue reviews")
204195

205-
err := db.Reviews.LockForUpdates(ctx, where, orderBy, limit, func(reviews []*Review) {
206-
now := time.Now().UTC()
207-
for _, review := range reviews {
208-
review.Status = ReviewStatusProcessing
209-
review.ProcessedAt = &now
210-
}
211-
212-
processReviews = reviews
213-
})
214-
require.NoError(t, err, "lock for update")
215-
216-
for _, review := range processReviews {
196+
for i, review := range reviews {
217197
go worker.ProcessReview(ctx, review)
218-
}
219198

220-
for i, review := range processReviews {
221-
processedIDs[i] = append(processedIDs[i], review.ID)
199+
ids[i] = append(ids[i], review.ID)
222200
}
223201
}()
224202
}
225203
wg.Wait()
226204

227205
// Ensure that all reviews were picked up for processing exactly once.
228-
uniqueIDs := slices.Concat(processedIDs...)
206+
uniqueIDs := slices.Concat(ids...)
229207
slices.Sort(uniqueIDs)
230208
uniqueIDs = slices.Compact(uniqueIDs)
231209
require.Equal(t, 100, len(uniqueIDs), "number of unique reviews picked up for processing should be 100")

tests/tables_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package pgkit_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
sq "github.com/Masterminds/squirrel"
9+
"github.com/goware/pgkit/v2"
10+
)
11+
12+
type accountsTable struct {
13+
*pgkit.Table[Account, *Account, int64]
14+
}
15+
16+
type articlesTable struct {
17+
*pgkit.Table[Article, *Article, uint64]
18+
}
19+
20+
type reviewsTable struct {
21+
*pgkit.Table[Review, *Review, uint64]
22+
}
23+
24+
func (w *reviewsTable) DequeueForProcessing(ctx context.Context, limit uint64) ([]*Review, error) {
25+
var dequeued []*Review
26+
where := sq.Eq{
27+
"status": ReviewStatusPending,
28+
"deleted_at": nil,
29+
}
30+
orderBy := []string{
31+
"created_at ASC",
32+
}
33+
34+
err := w.LockForUpdates(ctx, where, orderBy, limit, func(reviews []*Review) {
35+
now := time.Now().UTC()
36+
for _, review := range reviews {
37+
review.Status = ReviewStatusProcessing
38+
review.ProcessedAt = &now
39+
}
40+
dequeued = reviews
41+
})
42+
if err != nil {
43+
return nil, fmt.Errorf("lock for updates: %w", err)
44+
}
45+
46+
return dequeued, nil
47+
}

0 commit comments

Comments
 (0)