Skip to content

Commit 56515a5

Browse files
committed
chore(PubSub): Move pool to its own package
1 parent 367889a commit 56515a5

File tree

3 files changed

+10
-8
lines changed

3 files changed

+10
-8
lines changed

pkg/pubsub/kafka/partitionconsumer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import (
77

88
sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
99
sdklogger "github.com/scribd/go-sdk/pkg/logger"
10+
"github.com/scribd/go-sdk/pkg/pubsub/pool"
1011
)
1112

1213
type pconsumer struct {
13-
pool *pool
14+
pool *pool.Pool
1415

1516
quit chan struct{}
1617
done chan struct{}

pkg/pubsub/kafka/subscriber.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
1414
sdklogger "github.com/scribd/go-sdk/pkg/logger"
15+
"github.com/scribd/go-sdk/pkg/pubsub/pool"
1516
)
1617

1718
type (
@@ -152,7 +153,7 @@ func (s *Subscriber) assigned(_ context.Context, cl *kgo.Client, assigned map[st
152153
pc := pconsumer{
153154
quit: make(chan struct{}),
154155
recs: make(chan *sdkkafka.FetchPartition),
155-
pool: newPool(s.numWorkers),
156+
pool: pool.New(s.numWorkers),
156157
done: make(chan struct{}),
157158
}
158159
s.consumers[topic][partition] = pc

pkg/pubsub/kafka/pool.go renamed to pkg/pubsub/pool/pool.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1-
package kafka
1+
package pool
22

3-
type pool struct {
3+
type Pool struct {
44
sem chan struct{}
55
work chan func()
66
}
77

8-
func newPool(size int) *pool {
9-
p := &pool{
8+
func New(size int) *Pool {
9+
p := &Pool{
1010
sem: make(chan struct{}, size),
1111
work: make(chan func()),
1212
}
1313

1414
return p
1515
}
1616

17-
func (p *pool) Schedule(task func()) {
17+
func (p *Pool) Schedule(task func()) {
1818
select {
1919
case p.work <- task:
2020
return
@@ -23,7 +23,7 @@ func (p *pool) Schedule(task func()) {
2323
}
2424
}
2525

26-
func (p *pool) worker(task func()) {
26+
func (p *Pool) worker(task func()) {
2727
defer func() { <-p.sem }()
2828

2929
for {

0 commit comments

Comments
 (0)