Skip to content

Commit 26901be

Browse files
authored
Merge pull request #319 from ydb-platform/allow-read-and-commit-in-parallel
allow parallel read and commit with topic api reader
2 parents 420b7a1 + 5ad0a48 commit 26901be

File tree

3 files changed

+40
-19
lines changed

3 files changed

+40
-19
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Allow read and commit messages in parallel
2+
13
## v3.31.0
24
* Extended the ydb.Connection interface with experimental db.Topic() client (control plane and reader API)
35
* Removed `ydb.RegisterParser()` function (was needed for `database/sql` driver outside `ydb-go-sdk` repository, necessity of `ydb.RegisterParser()` disappeared with implementation `database/sql` driver in same repository)

topic/topicreader/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ var ErrUnexpectedCodec = topicreaderinternal.PublicErrUnexpectedCodec
2020
// Experimental
2121
//
2222
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
23-
var ErrConcurrencyCall = xerrors.Wrap(errors.New("concurrency call"))
23+
var ErrConcurrencyCall = xerrors.Wrap(errors.New("ydb: concurrency call denied"))

topic/topicreader/reader.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,28 @@ import (
55
"sync/atomic"
66

77
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreaderinternal"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
89
)
910

10-
// Reader allow to read message from YDB topics
11-
// reader methods must not call concurrency
11+
// Reader allow to read message from YDB topics.
12+
// ReadMessage or ReadMessageBatch can call concurrency with Commit, other concurrency call is denied.
13+
//
14+
// In other words you can have one goroutine for read messages and one goroutine for commit messages.
15+
//
16+
// Concurrency table
17+
// | Method | ReadMessage | ReadMessageBatch | Commit | Close |
18+
// | ReadMessage | - | - | + | - |
19+
// | ReadMessageBatch | - | - | + | - |
20+
// | Commit | + | + | - | - |
21+
// | Close | - | - | - | - |
1222
//
1323
// Experimental
1424
//
1525
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
1626
type Reader struct {
17-
reader topicreaderinternal.Reader
18-
inFlyght int64
27+
reader topicreaderinternal.Reader
28+
readInFlyght int32
29+
commitInFlyght int32
1930
}
2031

2132
// NewReader
@@ -34,10 +45,10 @@ func NewReader(internalReader topicreaderinternal.Reader) *Reader {
3445
//
3546
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
3647
func (r *Reader) ReadMessage(ctx context.Context) (*Message, error) {
37-
if err := r.inCall(); err != nil {
48+
if err := r.inCall(&r.readInFlyght); err != nil {
3849
return nil, err
3950
}
40-
defer r.outCall()
51+
defer r.outCall(&r.readInFlyght)
4152

4253
return r.reader.ReadMessage(ctx)
4354
}
@@ -62,10 +73,10 @@ type MessageContentUnmarshaler = topicreaderinternal.PublicMessageContentUnmarsh
6273
//
6374
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
6475
func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error {
65-
if err := r.inCall(); err != nil {
76+
if err := r.inCall(&r.commitInFlyght); err != nil {
6677
return err
6778
}
68-
defer r.outCall()
79+
defer r.outCall(&r.commitInFlyght)
6980

7081
return r.reader.Commit(ctx, obj)
7182
}
@@ -86,10 +97,10 @@ type CommitRangeGetter = topicreaderinternal.PublicCommitRangeGetter
8697
//
8798
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
8899
func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error) {
89-
if err := r.inCall(); err != nil {
100+
if err := r.inCall(&r.readInFlyght); err != nil {
90101
return nil, err
91102
}
92-
defer r.outCall()
103+
defer r.outCall(&r.readInFlyght)
93104

94105
return r.reader.ReadMessageBatch(ctx, opts...)
95106
}
@@ -116,25 +127,33 @@ type ReadBatchOption = topicreaderinternal.PublicReadBatchOption
116127
//
117128
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
118129
func (r *Reader) Close(ctx context.Context) error {
119-
if err := r.inCall(); err != nil {
130+
// close must be non-concurrent with read and commit
131+
132+
if err := r.inCall(&r.readInFlyght); err != nil {
120133
return err
121134
}
122-
defer r.outCall()
135+
defer r.outCall(&r.readInFlyght)
136+
137+
if err := r.inCall(&r.commitInFlyght); err != nil {
138+
return err
139+
}
140+
defer r.outCall(&r.commitInFlyght)
123141

124142
return r.reader.Close(ctx)
125143
}
126144

127-
func (r *Reader) inCall() error {
128-
if atomic.CompareAndSwapInt64(&r.inFlyght, 0, 1) {
145+
func (r *Reader) inCall(inFlight *int32) error {
146+
if atomic.CompareAndSwapInt32(inFlight, 0, 1) {
129147
return nil
130148
}
131149

132-
return ErrConcurrencyCall
150+
return xerrors.WithStackTrace(ErrConcurrencyCall)
133151
}
134152

135-
func (r *Reader) outCall() {
136-
if atomic.CompareAndSwapInt64(&r.inFlyght, 1, 0) {
153+
func (r *Reader) outCall(inFlight *int32) {
154+
if atomic.CompareAndSwapInt32(inFlight, 1, 0) {
137155
return
138156
}
139-
panic("ydb: reader outcall without in call, must be never")
157+
158+
panic("ydb: topic reader out call without in call, must be never")
140159
}

0 commit comments

Comments
 (0)