@@ -5,17 +5,28 @@ import (
55 "sync/atomic"
66
77 "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreaderinternal"
8+ "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
89)
910
10- // Reader allow to read message from YDB topics
11- // reader methods must not call concurrency
11+ // Reader allow to read message from YDB topics.
12+ // ReadMessage or ReadMessageBatch can call concurrency with Commit, other concurrency call is denied.
13+ //
14+ // In other words you can have one goroutine for read messages and one goroutine for commit messages.
15+ //
16+ // Concurrency table
17+ // | Method | ReadMessage | ReadMessageBatch | Commit | Close |
18+ // | ReadMessage | - | - | + | - |
19+ // | ReadMessageBatch | - | - | + | - |
20+ // | Commit | + | + | - | - |
21+ // | Close | - | - | - | - |
1222//
1323// Experimental
1424//
1525// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
1626type Reader struct {
17- reader topicreaderinternal.Reader
18- inFlyght int64
27+ reader topicreaderinternal.Reader
28+ readInFlyght int32
29+ commitInFlyght int32
1930}
2031
2132// NewReader
@@ -34,10 +45,10 @@ func NewReader(internalReader topicreaderinternal.Reader) *Reader {
3445//
3546// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
3647func (r * Reader ) ReadMessage (ctx context.Context ) (* Message , error ) {
37- if err := r .inCall (); err != nil {
48+ if err := r .inCall (& r . readInFlyght ); err != nil {
3849 return nil , err
3950 }
40- defer r .outCall ()
51+ defer r .outCall (& r . readInFlyght )
4152
4253 return r .reader .ReadMessage (ctx )
4354}
@@ -62,10 +73,10 @@ type MessageContentUnmarshaler = topicreaderinternal.PublicMessageContentUnmarsh
6273//
6374// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
6475func (r * Reader ) Commit (ctx context.Context , obj CommitRangeGetter ) error {
65- if err := r .inCall (); err != nil {
76+ if err := r .inCall (& r . commitInFlyght ); err != nil {
6677 return err
6778 }
68- defer r .outCall ()
79+ defer r .outCall (& r . commitInFlyght )
6980
7081 return r .reader .Commit (ctx , obj )
7182}
@@ -86,10 +97,10 @@ type CommitRangeGetter = topicreaderinternal.PublicCommitRangeGetter
8697//
8798// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
8899func (r * Reader ) ReadMessageBatch (ctx context.Context , opts ... ReadBatchOption ) (* Batch , error ) {
89- if err := r .inCall (); err != nil {
100+ if err := r .inCall (& r . readInFlyght ); err != nil {
90101 return nil , err
91102 }
92- defer r .outCall ()
103+ defer r .outCall (& r . readInFlyght )
93104
94105 return r .reader .ReadMessageBatch (ctx , opts ... )
95106}
@@ -116,25 +127,33 @@ type ReadBatchOption = topicreaderinternal.PublicReadBatchOption
116127//
117128// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
118129func (r * Reader ) Close (ctx context.Context ) error {
119- if err := r .inCall (); err != nil {
130+ // close must be non-concurrent with read and commit
131+
132+ if err := r .inCall (& r .readInFlyght ); err != nil {
120133 return err
121134 }
122- defer r .outCall ()
135+ defer r .outCall (& r .readInFlyght )
136+
137+ if err := r .inCall (& r .commitInFlyght ); err != nil {
138+ return err
139+ }
140+ defer r .outCall (& r .commitInFlyght )
123141
124142 return r .reader .Close (ctx )
125143}
126144
127- func (r * Reader ) inCall () error {
128- if atomic .CompareAndSwapInt64 ( & r . inFlyght , 0 , 1 ) {
145+ func (r * Reader ) inCall (inFlight * int32 ) error {
146+ if atomic .CompareAndSwapInt32 ( inFlight , 0 , 1 ) {
129147 return nil
130148 }
131149
132- return ErrConcurrencyCall
150+ return xerrors . WithStackTrace ( ErrConcurrencyCall )
133151}
134152
135- func (r * Reader ) outCall () {
136- if atomic .CompareAndSwapInt64 ( & r . inFlyght , 1 , 0 ) {
153+ func (r * Reader ) outCall (inFlight * int32 ) {
154+ if atomic .CompareAndSwapInt32 ( inFlight , 1 , 0 ) {
137155 return
138156 }
139- panic ("ydb: reader outcall without in call, must be never" )
157+
158+ panic ("ydb: topic reader out call without in call, must be never" )
140159}
0 commit comments