Skip to content

Commit c28ae7c

Browse files
feat(topic): log endpoint for topic writer session (#2029)
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent bf07bc2 commit c28ae7c

File tree

11 files changed

+104
-25
lines changed

11 files changed

+104
-25
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `endpoint` (`node_id`, `address`, `location`) to topic writer init stream logs
2+
13
## v3.127.0
24
* Changed behaviour of `table.Client` and `query.Client` internal session pool. When `ydb.WithPreferredNodeID` is set and there is no idle session on preferred node, the pool closes most idle session to create a new one on the preferred node
35
* Fixed a bug when sometimes in cases of context cancellation returner error was not `errors.Is(err, context.Canceled)`

internal/conn/grpc_client_stream.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"google.golang.org/grpc"
99
"google.golang.org/grpc/metadata"
1010

11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
@@ -37,6 +38,12 @@ func (s *grpcClientStream) Context() context.Context {
3738
return s.stream.Context()
3839
}
3940

41+
// Endpoint returns the endpoint of the connection used by this stream.
42+
// It implements the optional interface used by topic writer for session logging.
43+
func (s *grpcClientStream) Endpoint() endpoint.Endpoint {
44+
return s.parentConn.Endpoint()
45+
}
46+
4047
func (s *grpcClientStream) CloseSend() (err error) {
4148
var (
4249
ctx = s.streamCtx

internal/grpcwrapper/rawtopic/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/google/uuid"
88
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
99

10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
@@ -122,11 +123,17 @@ func (c *Client) StreamWrite(
122123
)
123124
}
124125

126+
var ep endpoint.Endpoint
127+
if withEp, ok := protoResp.(interface{ Endpoint() endpoint.Endpoint }); ok {
128+
ep = withEp.Endpoint()
129+
}
130+
125131
return &rawtopicwriter.StreamWriter{
126132
Stream: protoResp,
127133
Tracer: tracer,
128134
InternalStreamID: uuid.New().String(),
129135
LogContext: &ctxStreamLifeTime,
136+
PeerEndpoint: ep,
130137
}, nil
131138
}
132139

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
1212

13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -36,8 +37,13 @@ type StreamWriter struct {
3637
writtenMessagesCount int
3738
sessionID string
3839
LogContext *context.Context
40+
// PeerEndpoint is the endpoint this stream is connected to (for logging).
41+
PeerEndpoint endpoint.Endpoint
3942
}
4043

44+
// Endpoint returns the endpoint this stream is connected to (for logging).
45+
func (w *StreamWriter) Endpoint() endpoint.Endpoint { return w.PeerEndpoint }
46+
4147
//nolint:funlen
4248
func (w *StreamWriter) Recv() (ServerMessage, error) {
4349
readCnt := atomic.AddInt32(&w.readCounter, 1)

internal/topic/topicwriterinternal/raw_topic_writer_stream_mock_test.go

Lines changed: 39 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,13 +610,19 @@ func (w *WriterReconnector) waitFirstInitResponse(ctx context.Context) error {
610610
}
611611

612612
func (w *WriterReconnector) createWriterStreamConfig(stream RawTopicWriterStream) SingleStreamWriterConfig {
613+
var ep trace.EndpointInfo
614+
if stream != nil {
615+
ep = stream.Endpoint() // endpoint.Endpoint implements trace.EndpointInfo
616+
}
617+
613618
cfg := newSingleStreamWriterConfig(
614619
w.cfg.WritersCommonConfig,
615620
stream,
616621
&w.queue,
617622
w.encodersMap,
618623
w.needReceiveLastSeqNo(),
619624
w.writerInstanceID,
625+
ep,
620626
)
621627

622628
return cfg

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
555555
}
556556

557557
initRequest := testCreateInitRequest(w)
558+
strm.EXPECT().Endpoint().Return(nil).AnyTimes()
558559
strm.EXPECT().Send(&initRequest)
559560
strm.EXPECT().Recv().Return(nil, testErr)
560561
strm.EXPECT().CloseSend()
@@ -581,6 +582,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
581582
isFirstConnection := true
582583
newStream := func(name string) *MockRawTopicWriterStream {
583584
strm := NewMockRawTopicWriterStream(mc)
585+
strm.EXPECT().Endpoint().Return(nil).AnyTimes()
584586
initReq := testCreateInitRequest(w)
585587
if isFirstConnection {
586588
isFirstConnection = false
@@ -1194,6 +1196,7 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv {
11941196

11951197
res.writer = newWriterReconnectorStopped(NewWriterReconnectorConfig(writerOptions...))
11961198

1199+
res.stream.EXPECT().Endpoint().Return(nil).AnyTimes()
11971200
res.stream.EXPECT().Recv().DoAndReturn(res.receiveMessageHandler).AnyTimes()
11981201

11991202
req := testCreateInitRequest(res.writer)

internal/topic/topicwriterinternal/writer_single_stream.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/background"
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -26,6 +27,7 @@ type SingleStreamWriterConfig struct {
2627
encodersMap *MultiEncoder
2728
getLastSeqNum bool
2829
reconnectorInstanceID string
30+
endpoint trace.EndpointInfo
2931
}
3032

3133
func newSingleStreamWriterConfig(
@@ -35,6 +37,7 @@ func newSingleStreamWriterConfig(
3537
encodersMap *MultiEncoder,
3638
getLastSeqNum bool,
3739
reconnectorID string,
40+
endpoint trace.EndpointInfo,
3841
) SingleStreamWriterConfig {
3942
return SingleStreamWriterConfig{
4043
WritersCommonConfig: common,
@@ -43,6 +46,7 @@ func newSingleStreamWriterConfig(
4346
encodersMap: encodersMap,
4447
getLastSeqNum: getLastSeqNum,
4548
reconnectorInstanceID: reconnectorID,
49+
endpoint: endpoint,
4650
}
4751
}
4852

@@ -125,16 +129,16 @@ func (w *SingleStreamWriter) start() {
125129
}
126130

127131
func (w *SingleStreamWriter) initStream() (err error) {
132+
logCtx := w.cfg.LogContext
133+
traceOnDone := trace.TopicOnWriterInitStream(
134+
w.cfg.Tracer,
135+
&logCtx,
136+
w.cfg.reconnectorInstanceID,
137+
w.cfg.topic,
138+
w.cfg.producerID,
139+
)
128140
defer func() {
129-
logCtx := w.cfg.LogContext
130-
traceOnDone := trace.TopicOnWriterInitStream(
131-
w.cfg.Tracer,
132-
&logCtx,
133-
w.cfg.reconnectorInstanceID,
134-
w.cfg.topic,
135-
w.cfg.producerID,
136-
)
137-
traceOnDone(w.SessionID, err)
141+
traceOnDone(w.SessionID, w.cfg.endpoint, err)
138142
}()
139143

140144
req := w.createInitRequest()
@@ -322,6 +326,7 @@ type RawTopicWriterStream interface {
322326
Recv() (rawtopicwriter.ServerMessage, error)
323327
Send(mess rawtopicwriter.ClientMessage) error
324328
CloseSend() error
329+
Endpoint() endpoint.Endpoint
325330
}
326331

327332
func sendMessagesToStream(

log/topic.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -702,23 +702,25 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
702702
)
703703

704704
return func(doneInfo trace.TopicWriterInitStreamDoneInfo) {
705-
if doneInfo.Error == nil {
706-
l.Log(WithLevel(ctx, DEBUG), "topic writer init stream done",
707-
kv.Error(doneInfo.Error),
708-
kv.String("topic", info.Topic),
709-
kv.String("producer_id", info.ProducerID),
710-
kv.String("writer_instance_id", info.WriterInstanceID),
711-
kv.Latency(start),
712-
kv.String("session_id", doneInfo.SessionID),
705+
fields := []Field{
706+
kv.String("topic", info.Topic),
707+
kv.String("producer_id", info.ProducerID),
708+
kv.String("writer_instance_id", info.WriterInstanceID),
709+
kv.Latency(start),
710+
kv.String("session_id", doneInfo.SessionID),
711+
}
712+
if doneInfo.Endpoint != nil {
713+
fields = append(fields,
714+
kv.Int64("node_id", int64(doneInfo.Endpoint.NodeID())),
715+
kv.String("address", doneInfo.Endpoint.Address()),
716+
kv.String("location", doneInfo.Endpoint.Location()),
713717
)
718+
}
719+
if doneInfo.Error == nil {
720+
l.Log(WithLevel(ctx, DEBUG), "topic writer init stream done", fields...)
714721
} else {
715722
l.Log(WithLevel(ctx, WARN), "topic writer init stream failed",
716-
kv.Error(doneInfo.Error),
717-
kv.String("topic", info.Topic),
718-
kv.String("producer_id", info.ProducerID),
719-
kv.String("writer_instance_id", info.WriterInstanceID),
720-
kv.Latency(start),
721-
kv.String("session_id", doneInfo.SessionID),
723+
append(append(fields, kv.Error(doneInfo.Error)), kv.Version())...,
722724
)
723725
}
724726
}

trace/topic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ type (
546546
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
547547
TopicWriterInitStreamDoneInfo struct {
548548
SessionID string
549+
Endpoint EndpointInfo
549550
Error error
550551
}
551552

0 commit comments

Comments
 (0)