Skip to content

Commit 298b63e

Browse files
authored
Merge branch 'master' into make-some-options-public
2 parents 983853d + 4ed0f00 commit 298b63e

File tree

11 files changed

+335
-42
lines changed

11 files changed

+335
-42
lines changed

internal/query/options/retry.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88
)
99

1010
var (
11-
_ DoOption = retryOptionsOption(nil)
12-
_ DoOption = traceOption{}
11+
_ DoOption = RetryOptionsOption(nil)
12+
_ DoOption = TraceOption{}
1313

14-
_ DoTxOption = retryOptionsOption(nil)
15-
_ DoTxOption = traceOption{}
14+
_ DoTxOption = RetryOptionsOption(nil)
15+
_ DoTxOption = TraceOption{}
1616
_ DoTxOption = doTxSettingsOption{}
1717
)
1818

@@ -35,8 +35,8 @@ type (
3535
txSettings tx.Settings
3636
}
3737

38-
retryOptionsOption []retry.Option
39-
traceOption struct {
38+
RetryOptionsOption []retry.Option
39+
TraceOption struct {
4040
t *trace.Query
4141
}
4242
doTxSettingsOption struct {
@@ -56,19 +56,19 @@ func (s *doTxSettings) TxSettings() tx.Settings {
5656
return s.txSettings
5757
}
5858

59-
func (opt traceOption) applyDoOption(s *doSettings) {
59+
func (opt TraceOption) applyDoOption(s *doSettings) {
6060
s.trace = s.trace.Compose(opt.t)
6161
}
6262

63-
func (opt traceOption) applyDoTxOption(s *doTxSettings) {
63+
func (opt TraceOption) applyDoTxOption(s *doTxSettings) {
6464
opt.applyDoOption(&s.doSettings)
6565
}
6666

67-
func (opts retryOptionsOption) applyDoOption(s *doSettings) {
67+
func (opts RetryOptionsOption) applyDoOption(s *doSettings) {
6868
s.retryOpts = append(s.retryOpts, opts...)
6969
}
7070

71-
func (opts retryOptionsOption) applyDoTxOption(s *doTxSettings) {
71+
func (opts RetryOptionsOption) applyDoTxOption(s *doTxSettings) {
7272
opts.applyDoOption(&s.doSettings)
7373
}
7474

@@ -80,19 +80,19 @@ func WithTxSettings(txSettings tx.Settings) doTxSettingsOption {
8080
return doTxSettingsOption{txSettings: txSettings}
8181
}
8282

83-
func WithIdempotent() retryOptionsOption {
83+
func WithIdempotent() RetryOptionsOption {
8484
return []retry.Option{retry.WithIdempotent(true)}
8585
}
8686

87-
func WithLabel(lbl string) retryOptionsOption {
87+
func WithLabel(lbl string) RetryOptionsOption {
8888
return []retry.Option{retry.WithLabel(lbl)}
8989
}
9090

91-
func WithTrace(t *trace.Query) traceOption {
92-
return traceOption{t: t}
91+
func WithTrace(t *trace.Query) TraceOption {
92+
return TraceOption{t: t}
9393
}
9494

95-
func WithRetryBudget(b budget.Budget) retryOptionsOption {
95+
func WithRetryBudget(b budget.Budget) RetryOptionsOption {
9696
return []retry.Option{retry.WithBudget(b)}
9797
}
9898

internal/query/scanner/indexed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ func TestIndexed(t *testing.T) {
399399
{func(v uint64) *uint64 { return &v }(100500)},
400400
{func(v int64) *int64 { return &v }(100500)},
401401
{func(v int32) *int32 { return &v }(100500)},
402-
{func(v time.Time) *time.Time { return &v }(time.Unix(8683200000, 0))},
402+
{func(v time.Time) *time.Time { return &v }(time.Unix(8683200000, 0).UTC())},
403403
},
404404
},
405405
{

internal/query/scanner/named_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ func TestNamed(t *testing.T) {
411411
{func(v uint64) *uint64 { return &v }(100500)},
412412
{func(v int64) *int64 { return &v }(100500)},
413413
{func(v int32) *int32 { return &v }(100500)},
414-
{func(v time.Time) *time.Time { return &v }(time.Unix(8683200000, 0))},
414+
{func(v time.Time) *time.Time { return &v }(time.Unix(8683200000, 0).UTC())},
415415
},
416416
},
417417
{

internal/query/scanner/struct_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ func TestStruct(t *testing.T) {
477477
Int8Int32: 123,
478478
Int8Int16: 123,
479479
BoolBool: true,
480-
DateTime: time.Unix(8683200000, 0),
480+
DateTime: time.Unix(8683200000, 0).UTC(),
481481
DatetimeTime: time.Unix(100500, 0),
482482
TimestampTime: time.Unix(12345678987, 654321000),
483483
}, dst)

internal/topic/topicreaderinternal/committer.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package topicreaderinternal
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"sync/atomic"
87
"time"
98

@@ -53,12 +52,17 @@ type committer struct {
5352
commits CommitRanges
5453
}
5554

56-
func newCommitterStopped(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc, readerID int64) *committer { //nolint:lll,revive
55+
func newCommitterStopped(
56+
tracer *trace.Topic,
57+
lifeContext context.Context, //nolint:revive
58+
mode PublicCommitMode,
59+
send sendMessageToServerFunc,
60+
) *committer {
5761
res := &committer{
5862
mode: mode,
5963
clock: clockwork.NewRealClock(),
6064
send: send,
61-
backgroundWorker: *background.NewWorker(lifeContext, fmt.Sprintf("ydb-topic-reader-committer: %v", readerID)),
65+
backgroundWorker: *background.NewWorker(lifeContext, "ydb-topic-reader-committer"),
6266
tracer: tracer,
6367
}
6468
res.initChannels()

internal/topic/topicreaderinternal/committer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ func TestCommitterBuffer(t *testing.T) {
379379
func newTestCommitter(ctx context.Context, t testing.TB) *committer {
380380
res := newCommitterStopped(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error {
381381
return nil
382-
}, -1)
382+
})
383383
res.Start()
384384
t.Cleanup(func() {
385385
if err := res.Close(ctx, errors.New("test committer closed")); err != nil {

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,9 @@ func newTopicStreamReaderStopped(
156156
rawMessagesFromBuffer: make(chan rawtopicreader.ServerMessage, 1),
157157
}
158158

159-
res.backgroundWorkers = *background.NewWorker(stopPump, fmt.Sprintf(
160-
"topic-reader-stream-background: %v",
161-
res.readerID,
162-
))
159+
res.backgroundWorkers = *background.NewWorker(stopPump, "topic-reader-stream-background")
163160

164-
res.committer = newCommitterStopped(cfg.Trace, labeledContext, cfg.CommitMode, res.send, res.readerID)
161+
res.committer = newCommitterStopped(cfg.Trace, labeledContext, cfg.CommitMode, res.send)
165162
res.committer.BufferTimeLagTrigger = cfg.CommitterBatchTimeLag
166163
res.committer.BufferCountTrigger = cfg.CommitterBatchCounterTrigger
167164
res.sessionController.init()

internal/topic/topicwriterinternal/writer_single_stream.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ func newSingleStreamWriterStopped(
8484
) *SingleStreamWriter {
8585
return &SingleStreamWriter{
8686
cfg: cfg,
87-
background: *background.NewWorker(xcontext.ValueOnly(ctxForPProfLabelsOnly), fmt.Sprintf(
88-
"ydb-topic-stream-writer-background: %v",
89-
cfg.reconnectorInstanceID,
90-
)),
87+
background: *background.NewWorker(
88+
xcontext.ValueOnly(ctxForPProfLabelsOnly),
89+
"ydb-topic-stream-writer-background",
90+
),
9191
closeCompleted: make(empty.Chan),
9292
}
9393
}

0 commit comments

Comments
 (0)