Skip to content

Commit f24837d

Browse files
committed
fix linters
1 parent a89720e commit f24837d

File tree

7 files changed

+188
-291
lines changed

7 files changed

+188
-291
lines changed

internal/topic/topicreaderinternal/committer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type committer struct {
5454
commits CommitRanges
5555
}
5656

57-
func newCommitter(tracer trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc) *committer {
57+
func newCommitter(tracer trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc) *committer { //nolint:lll,revive
5858
res := &committer{
5959
mode: mode,
6060
clock: clockwork.NewRealClock(),
@@ -141,7 +141,7 @@ func (c *committer) pushCommitsLoop(ctx context.Context) {
141141

142142
commits.optimize()
143143

144-
onDone := trace.TopicOnReaderStreamSendCommitMessage(c.tracer, &commits)
144+
onDone := trace.TopicOnReaderSendCommitMessage(c.tracer, &commits)
145145
err := sendCommitMessage(c.send, commits)
146146
onDone(err)
147147

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (r *topicStreamReaderImpl) ReadMessageBatch(
164164
ctx context.Context,
165165
opts ReadMessageBatchOptions,
166166
) (batch *PublicBatch, err error) {
167-
onDone := trace.TopicOnReaderStreamReadMessages(
167+
onDone := trace.TopicOnReaderReadMessages(
168168
r.cfg.Tracer,
169169
ctx,
170170
opts.MinCount,
@@ -330,7 +330,7 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRa
330330
session = *commitRange.partitionSession
331331
}
332332

333-
onDone := trace.TopicOnReaderStreamCommit(
333+
onDone := trace.TopicOnReaderCommit(
334334
r.cfg.Tracer,
335335
ctx,
336336
session.Topic,
@@ -371,7 +371,7 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange commitRange) error
371371
func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error {
372372
err := r.stream.Send(msg)
373373
if err != nil {
374-
trace.TopicOnReaderStreamError(r.cfg.Tracer, r.readConnectionID, err)
374+
trace.TopicOnReaderError(r.cfg.Tracer, r.readConnectionID, err)
375375
_ = r.CloseWithError(r.ctx, err)
376376
}
377377
return err
@@ -406,7 +406,7 @@ func (r *topicStreamReaderImpl) setStarted() error {
406406
func (r *topicStreamReaderImpl) initSession() (err error) {
407407
initMessage := r.cfg.initMessage()
408408

409-
onDone := trace.TopicOnReaderStreamInit(r.cfg.Tracer, r.readConnectionID, initMessage)
409+
onDone := trace.TopicOnReaderInit(r.cfg.Tracer, r.readConnectionID, initMessage)
410410
defer func() {
411411
onDone(r.readConnectionID, err)
412412
}()
@@ -453,9 +453,9 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
453453
for {
454454
serverMessage, err := r.stream.Recv()
455455
if err != nil {
456-
trace.TopicOnReaderStreamError(r.cfg.Tracer, r.readConnectionID, err)
456+
trace.TopicOnReaderError(r.cfg.Tracer, r.readConnectionID, err)
457457
if errors.Is(err, rawtopicreader.ErrUnexpectedMessageType) {
458-
trace.TopicOnReaderStreamUnknownGrpcMessage(r.cfg.Tracer, r.readConnectionID, err)
458+
trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Tracer, r.readConnectionID, err)
459459
// new messages can be added to protocol, it must be backward compatible to old programs
460460
// and skip message is safe
461461
continue
@@ -497,7 +497,7 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
497497
case *rawtopicreader.UpdateTokenResponse:
498498
// skip
499499
default:
500-
trace.TopicOnReaderStreamUnknownGrpcMessage(
500+
trace.TopicOnReaderUnknownGrpcMessage(
501501
r.cfg.Tracer,
502502
r.readConnectionID,
503503
xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
@@ -537,7 +537,7 @@ func (r *topicStreamReaderImpl) dataRequestLoop(ctx context.Context) {
537537
}
538538

539539
resCapacity := r.addRestBufferBytes(sum)
540-
trace.TopicOnReaderStreamSentDataRequest(r.cfg.Tracer, r.readConnectionID, sum, resCapacity)
540+
trace.TopicOnReaderSentDataRequest(r.cfg.Tracer, r.readConnectionID, sum, resCapacity)
541541
if err := r.sendDataRequest(sum); err != nil {
542542
return
543543
}
@@ -577,7 +577,7 @@ func (r *topicStreamReaderImpl) updateTokenLoop(ctx context.Context) {
577577

578578
func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) (err error) {
579579
resCapacity := r.addRestBufferBytes(-msg.BytesSize)
580-
onDone := trace.TopicOnReaderStreamReceiveDataResponse(r.cfg.Tracer, r.readConnectionID, resCapacity, msg)
580+
onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Tracer, r.readConnectionID, resCapacity, msg)
581581
defer func() {
582582
onDone(err)
583583
}()
@@ -624,7 +624,7 @@ func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse)
624624
}
625625

626626
func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error) (closeErr error) {
627-
onDone := trace.TopicOnReaderStreamClose(r.cfg.Tracer, r.readConnectionID, reason)
627+
onDone := trace.TopicOnReaderClose(r.cfg.Tracer, r.readConnectionID, reason)
628628
defer onDone(closeErr)
629629

630630
isFirstClose := false
@@ -672,7 +672,7 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse
672672
}
673673
partition.setCommittedOffset(commit.CommittedOffset)
674674

675-
trace.TopicOnReaderStreamCommittedNotify(
675+
trace.TopicOnReaderCommittedNotify(
676676
r.cfg.Tracer,
677677
r.readConnectionID,
678678
partition.Topic,
@@ -688,7 +688,7 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse
688688
}
689689

690690
func (r *topicStreamReaderImpl) updateToken(ctx context.Context) {
691-
onUpdateToken := trace.TopicOnReaderStreamUpdateToken(
691+
onUpdateToken := trace.TopicOnReaderUpdateToken(
692692
r.cfg.Tracer,
693693
r.readConnectionID,
694694
)
@@ -765,11 +765,7 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer(
765765
respMessage.CommitOffset.FromInt64Pointer(commitOffset)
766766
}
767767

768-
if err = r.send(respMessage); err != nil {
769-
return err
770-
}
771-
772-
return nil
768+
return r.send(respMessage)
773769
}
774770

775771
func (r *topicStreamReaderImpl) onStopPartitionSessionRequest(m *rawtopicreader.StopPartitionSessionRequest) error {

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
145145
readMessagesCtx, readMessagesCtxCancel := xcontext.WithErrCancel(context.Background())
146146
committedOffset := int64(222)
147147

148-
e.reader.cfg.Tracer.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) {
148+
e.reader.cfg.Tracer.OnReaderPartitionReadStopResponse = func(info trace.TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo trace.TopicReaderPartitionReadStopResponseDoneInfo) { //nolint:lll
149149
expected := trace.TopicReaderPartitionReadStopResponseStartInfo{
150150
ReaderConnectionID: e.reader.readConnectionID,
151151
PartitionContext: e.partitionSession.ctx,

internal/topic/topicreaderinternal/trace_wrappers.go

Lines changed: 0 additions & 49 deletions
This file was deleted.

topic/reader_e2e_test.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ package topic_test
66
import (
77
"context"
88
"fmt"
9-
"io"
10-
"io/ioutil"
119
"runtime/pprof"
1210
"sync/atomic"
1311
"testing"
@@ -23,7 +21,6 @@ import (
2321
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
2422
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
2523
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
26-
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2724
)
2825

2926
func TestReadMessages(t *testing.T) {
@@ -188,26 +185,6 @@ WITH (
188185
}
189186

190187
func createReader(t *testing.T, db ydb.Connection, opts ...topicoptions.ReaderOption) *topicreader.Reader {
191-
js := func(v interface{ JSONData() io.Reader }) string {
192-
data, err := ioutil.ReadAll(v.JSONData())
193-
if err != nil {
194-
t.Fatal(err)
195-
}
196-
return string(data)
197-
}
198-
199-
tracer := trace.Topic{
200-
OnReadStreamRawSent: func(info trace.OnReadStreamRawSentInfo) {
201-
t.Logf("sent: %v %v\n%v", info.ClientMessage.Type(), info.Error, js(info.ClientMessage))
202-
},
203-
OnReadStreamRawReceived: func(info trace.OnReadStreamRawReceivedInfo) {
204-
t.Logf("received: %v %v\n%v", info.ServerMessage.Type(), info.Error, js(info.ServerMessage))
205-
},
206-
}
207-
_ = tracer
208-
209-
opts = append(opts[:len(opts):len(opts)], topicoptions.WithTracer(tracer))
210-
211188
topicPath := db.Name() + "/test/feed"
212189
reader, err := db.Topic().StartReader("test", []topicoptions.ReadSelector{
213190
{

0 commit comments

Comments
 (0)