Skip to content

Commit 61f11b5

Browse files
authored
Merge pull request #378 from ydb-platform/fix-update-token
fixed update token for topic reader
2 parents 91383d8 + ae810ef commit 61f11b5

File tree

5 files changed

+24
-3
lines changed

5 files changed

+24
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed update token for topic reader
12
* Marked sessions which creates from `database/sql` driver as supported server-side session balancing
23

34
## v3.37.7

config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type Config struct {
3535
excludeGRPCCodesForPessimization []grpcCodes.Code
3636
}
3737

38+
func (c Config) Credentials() credentials.Credentials {
39+
return c.credentials
40+
}
41+
3842
// ExcludeGRPCCodesForPessimization defines grpc codes for exclude its from pessimization trigger
3943
func (c Config) ExcludeGRPCCodesForPessimization() []grpcCodes.Code {
4044
return c.excludeGRPCCodesForPessimization

connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func (c *connection) Scripting() scripting.Client {
330330
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
331331
func (c *connection) Topic() topic.Client {
332332
c.topicOnce.Init(func() closeFunc {
333-
c.topic = topicclientinternal.New(c.balancer, c.topicOptions...)
333+
c.topic = topicclientinternal.New(c.balancer, c.config.Credentials(), c.topicOptions...)
334334
return c.topic.Close
335335
})
336336
return c.topic

internal/topic/topicclientinternal/client.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
77
"google.golang.org/grpc"
88

9+
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
910
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
@@ -18,11 +19,12 @@ import (
1819

1920
type Client struct {
2021
cfg topic.Config
22+
cred credentials.Credentials
2123
defaultOperationParams rawydb.OperationParams
2224
rawClient rawtopic.Client
2325
}
2426

25-
func New(conn grpc.ClientConnInterface, opts ...topicoptions.TopicOption) *Client {
27+
func New(conn grpc.ClientConnInterface, cred credentials.Credentials, opts ...topicoptions.TopicOption) *Client {
2628
rawClient := rawtopic.NewClient(Ydb_Topic_V1.NewTopicServiceClient(conn))
2729

2830
cfg := newTopicConfig(opts...)
@@ -32,6 +34,7 @@ func New(conn grpc.ClientConnInterface, opts ...topicoptions.TopicOption) *Clien
3234

3335
return &Client{
3436
cfg: cfg,
37+
cred: cred,
3538
defaultOperationParams: defaultOperationParams,
3639
rawClient: rawClient,
3740
}
@@ -160,7 +163,10 @@ func (c *Client) StartReader(
160163
return c.rawClient.StreamRead(ctx)
161164
}
162165

163-
defaultOpts := []topicoptions.ReaderOption{topicoptions.WithCommonConfig(c.cfg.Common)}
166+
defaultOpts := []topicoptions.ReaderOption{
167+
topicoptions.WithCommonConfig(c.cfg.Common),
168+
topicreaderinternal.WithCredentials(c.cred),
169+
}
164170
opts = append(defaultOpts, opts...)
165171

166172
internalReader := topicreaderinternal.NewReader(connector, consumer, readSelectors, opts...)

internal/topic/topicreaderinternal/reader.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync"
88
"time"
99

10+
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
@@ -192,6 +193,15 @@ type ReaderConfig struct {
192193
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
193194
type PublicReaderOption func(cfg *ReaderConfig)
194195

196+
func WithCredentials(cred credentials.Credentials) PublicReaderOption {
197+
return func(cfg *ReaderConfig) {
198+
if cred == nil {
199+
cred = credentials.NewAnonymousCredentials()
200+
}
201+
cfg.Cred = cred
202+
}
203+
}
204+
195205
func convertNewParamsToStreamConfig(
196206
consumer string,
197207
readSelectors []PublicReadSelector,

0 commit comments

Comments
 (0)