Skip to content

Commit c66c90a

Browse files
committed
small fixes
1 parent 569e38a commit c66c90a

File tree

7 files changed

+37
-57
lines changed

7 files changed

+37
-57
lines changed

internal/grpcwrapper/rawtopic/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ package rawtopic
33
import (
44
"context"
55
"fmt"
6-
"github.com/google/uuid"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
86

7+
"github.com/google/uuid"
98
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
109

1110
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
1211
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
1312
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1413
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1515
)
1616

1717
type Client struct {

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rawtopicwriter
33
import (
44
"errors"
55
"fmt"
6-
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
76
"reflect"
87
"sync"
98
"sync/atomic"
@@ -15,6 +14,7 @@ import (
1514
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1615
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1716
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1818
)
1919

2020
var errConcurencyReadDenied = xerrors.Wrap(errors.New("ydb: read from rawtopicwriter in parallel"))
@@ -38,6 +38,7 @@ type StreamWriter struct {
3838
sessionID string
3939
}
4040

41+
//nolint:funlen
4142
func (w *StreamWriter) Recv() (ServerMessage, error) {
4243
readCnt := atomic.AddInt32(&w.readCounter, 1)
4344
defer atomic.AddInt32(&w.readCounter, -1)
@@ -46,22 +47,27 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
4647
return nil, xerrors.WithStackTrace(errConcurencyReadDenied)
4748
}
4849

49-
grpcMsg, err := w.Stream.Recv()
50+
grpcMsg, sendErr := w.Stream.Recv()
5051
w.readMessagesCount++
51-
trace.TopicOnWriterReceiveGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, err)
52-
if err != nil {
53-
if !xerrors.IsErrorFromServer(err) {
54-
err = xerrors.Transport(err)
52+
defer func() {
53+
// defer needs for set good session id on first init response before trace the message
54+
trace.TopicOnWriterReceiveGRPCMessage(
55+
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
56+
)
57+
}()
58+
if sendErr != nil {
59+
if !xerrors.IsErrorFromServer(sendErr) {
60+
sendErr = xerrors.Transport(sendErr)
5561
}
5662

5763
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
5864
"ydb: failed to read grpc message from writer stream: %w",
59-
err,
65+
sendErr,
6066
)))
6167
}
6268

6369
var meta rawtopiccommon.ServerMessageMetadata
64-
if err = meta.MetaFromStatusAndIssues(grpcMsg); err != nil {
70+
if err := meta.MetaFromStatusAndIssues(grpcMsg); err != nil {
6571
return nil, err
6672
}
6773
if !meta.Status.IsSuccess() {
@@ -79,7 +85,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
7985
case *Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse:
8086
var res WriteResult
8187
res.ServerMessageMetadata = meta
82-
err = res.fromProto(v.WriteResponse)
88+
err := res.fromProto(v.WriteResponse)
8389
if err != nil {
8490
return nil, err
8591
}

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"context"
77
"errors"
88
"fmt"
9-
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
109
"io"
1110
"math"
1211
"sort"
@@ -25,6 +24,7 @@ import (
2524
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2625
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2726
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
27+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2828
)
2929

3030
var testCommonEncoders = NewMultiEncoder()

log/topic.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package log
22

33
import (
44
"context"
5+
"time"
6+
57
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
68
"google.golang.org/protobuf/encoding/protojson"
79
"google.golang.org/protobuf/proto"
8-
"time"
910

1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/kv"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -948,17 +949,17 @@ func (s lazyProtoStringifer) String() string {
948949
Data []byte
949950
Metadata []*Ydb_Topic.MetadataItem
950951
}
951-
storage := make([]messDataType, len(data.Messages))
952-
for i := range data.Messages {
953-
storage[i].Data = data.Messages[i].Data
952+
storage := make([]messDataType, len(data.GetMessages()))
953+
for i := range data.GetMessages() {
954+
storage[i].Data = data.GetMessages()[i].GetData()
954955
data.Messages[i] = nil
955956

956-
storage[i].Metadata = data.Messages[i].MetadataItems
957+
storage[i].Metadata = data.GetMessages()[i].GetMetadataItems()
957958
data.Messages[i].MetadataItems = nil
958959
}
959960

960961
defer func() {
961-
for i := range data.Messages {
962+
for i := range data.GetMessages() {
962963
data.Messages[i].Data = storage[i].Data
963964
data.Messages[i].MetadataItems = storage[i].Metadata
964965
}
@@ -967,5 +968,6 @@ func (s lazyProtoStringifer) String() string {
967968
}
968969

969970
res := protojson.MarshalOptions{AllowPartial: true}.Format(s.message)
971+
970972
return res
971973
}

tests/integration/topic_read_writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"context"
99
"encoding/binary"
1010
"errors"
11-
"github.com/ydb-platform/ydb-go-sdk/v3/log"
1211
"io"
1312
"os"
1413
"runtime/pprof"
@@ -27,6 +26,7 @@ import (
2726
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
2827
"github.com/ydb-platform/ydb-go-sdk/v3/internal/version"
2928
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
29+
"github.com/ydb-platform/ydb-go-sdk/v3/log"
3030
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
3131
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
3232
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"

trace/topic.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package trace
22

33
import (
44
"context"
5-
"google.golang.org/protobuf/proto"
5+
6+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
67
)
78

89
// tool gtrace used from ./internal/cmd/gtrace
@@ -544,7 +545,7 @@ type (
544545
TopicStreamInternalID string
545546
SessionID string
546547
MessageNumber int
547-
Message proto.Message
548+
Message *Ydb_Topic.StreamWriteMessage_FromClient
548549
Error error
549550
}
550551

@@ -553,7 +554,7 @@ type (
553554
TopicStreamInternalID string
554555
SessionID string
555556
MessageNumber int
556-
Message proto.Message
557+
Message *Ydb_Topic.StreamWriteMessage_FromServer
557558
Error error
558559
}
559560

0 commit comments

Comments
 (0)