Skip to content

Commit 571a604

Browse files
committed
prepare traces for logging
1 parent 9b2c717 commit 571a604

File tree

9 files changed

+345
-333
lines changed

9 files changed

+345
-333
lines changed

internal/topic/topicreaderinternal/commit_range.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,28 @@ func (r *CommitRanges) len() int {
2323
return len(r.ranges)
2424
}
2525

26-
func NewCommitRangesWithCapacity(capacity int) CommitRanges {
27-
return CommitRanges{ranges: make([]commitRange, 0, capacity)}
26+
// PartitionIDs implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
27+
func (r *CommitRanges) PartitionIDs() []int64 {
28+
res := make([]int64, len(r.ranges))
29+
for i := range res {
30+
res[i] = r.ranges[i].partitionSession.PartitionID
31+
}
32+
return res
2833
}
2934

30-
func NewCommitRanges(commitable ...PublicCommitRangeGetter) CommitRanges {
31-
var res CommitRanges
32-
res.Append(commitable...)
35+
// PartitionSessionIDs implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
36+
func (r *CommitRanges) PartitionSessionIDs() []int64 {
37+
res := make([]int64, len(r.ranges))
38+
for i := range res {
39+
res[i] = r.ranges[i].partitionSession.partitionSessionID.ToInt64()
40+
}
3341
return res
3442
}
3543

44+
func NewCommitRangesWithCapacity(capacity int) CommitRanges {
45+
return CommitRanges{ranges: make([]commitRange, 0, capacity)}
46+
}
47+
3648
func NewCommitRangesFromPublicCommits(ranges []PublicCommitRange) CommitRanges {
3749
res := CommitRanges{}
3850
res.ranges = make([]commitRange, len(ranges))
@@ -72,7 +84,7 @@ func (r *CommitRanges) Reset() {
7284
r.ranges = r.ranges[:0]
7385
}
7486

75-
func (r CommitRanges) toPartitionsOffsets() []rawtopicreader.PartitionCommitOffset {
87+
func (r *CommitRanges) toPartitionsOffsets() []rawtopicreader.PartitionCommitOffset {
7688
if len(r.ranges) == 0 {
7789
return nil
7890
}

internal/topic/topicreaderinternal/commit_range_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,9 @@ func TestCommitsToRawPartitionCommitOffset(t *testing.T) {
280280
})
281281
}
282282
}
283+
284+
func testNewCommitRanges(commitable ...PublicCommitRangeGetter) *CommitRanges {
285+
var res CommitRanges
286+
res.Append(commitable...)
287+
return &res
288+
}

internal/topic/topicreaderinternal/committer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (c *committer) pushCommitsLoop(ctx context.Context) {
141141

142142
commits.optimize()
143143

144-
onDone := trace.TopicOnReaderStreamSentCommitMessage(c.tracer)
144+
onDone := trace.TopicOnReaderStreamSendCommitMessage(c.tracer, &commits)
145145
err := sendCommitMessage(c.send, commits)
146146
onDone(err)
147147

internal/topic/topicreaderinternal/committer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func TestCommitterCommitAsync(t *testing.T) {
6262
close(sendCalled)
6363
require.Equal(t,
6464
&rawtopicreader.CommitOffsetRequest{
65-
CommitOffsets: NewCommitRanges(&cRange).toPartitionsOffsets(),
65+
CommitOffsets: testNewCommitRanges(&cRange).toPartitionsOffsets(),
6666
},
6767
msg)
6868
return nil
@@ -93,7 +93,7 @@ func TestCommitterCommitSync(t *testing.T) {
9393
sendCalled = true
9494
require.Equal(t,
9595
&rawtopicreader.CommitOffsetRequest{
96-
CommitOffsets: NewCommitRanges(&cRange).toPartitionsOffsets(),
96+
CommitOffsets: testNewCommitRanges(&cRange).toPartitionsOffsets(),
9797
},
9898
msg)
9999
c.OnCommitNotify(session, cRange.commitOffsetEnd)

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,13 @@ func (r *topicStreamReaderImpl) ReadMessageBatch(
173173
)
174174
defer func() {
175175
if batch == nil {
176-
onDone(0, "", -1, -1, -1, r.getRestBufferBytes(), err)
176+
onDone(0, "", -1, -1, -1, -1, r.getRestBufferBytes(), err)
177177
} else {
178178
onDone(
179179
len(batch.Messages),
180180
batch.Topic(),
181181
batch.PartitionID(),
182+
batch.partitionSession().partitionSessionID.ToInt64(),
182183
batch.commitRange.commitOffsetStart.ToInt64(),
183184
batch.commitRange.commitOffsetEnd.ToInt64(),
184185
r.getRestBufferBytes(),
@@ -273,13 +274,13 @@ func (r *topicStreamReaderImpl) consumeRawMessageFromBuffer(ctx context.Context)
273274

274275
func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer(
275276
msg *rawtopicreader.StopPartitionSessionRequest,
276-
) error {
277+
) (err error) {
277278
session, err := r.sessionController.Get(msg.PartitionSessionID)
278279
if err != nil {
279280
return err
280281
}
281282

282-
trace.TopicOnReaderPartitionReadStop(
283+
onDone := trace.TopicOnReaderPartitionReadStopResponse(
283284
r.cfg.Tracer,
284285
r.readConnectionID,
285286
session.Context(),
@@ -289,6 +290,9 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer(
289290
msg.CommittedOffset.ToInt64(),
290291
msg.Graceful,
291292
)
293+
defer func() {
294+
onDone(err)
295+
}()
292296

293297
if msg.Graceful {
294298
resp := &rawtopicreader.StopPartitionSessionResponse{
@@ -667,6 +671,16 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse
667671
return err
668672
}
669673
partition.setCommittedOffset(commit.CommittedOffset)
674+
675+
trace.TopicOnReaderStreamCommittedNotify(
676+
r.cfg.Tracer,
677+
r.readConnectionID,
678+
partition.Topic,
679+
partition.PartitionID,
680+
partition.partitionSessionID.ToInt64(),
681+
commit.CommittedOffset.ToInt64(),
682+
)
683+
670684
r.committer.OnCommitNotify(partition, commit.CommittedOffset)
671685
}
672686

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,6 @@ func (r *readerReconnector) reconnect(ctx context.Context, oldReader batchedStre
239239
}
240240

241241
func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err error) {
242-
traceDone := trace.TopicOnReaderConnect(r.tracer)
243-
defer traceDone(err)
244-
245242
bgContext := r.background.Context()
246243

247244
if err = bgContext.Err(); err != nil {

trace/details.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const (
2626

2727
TopicReaderStreamLifeCycleEvents
2828
TopicReaderStreamEvents
29+
TopicReaderMessageEvents
2930
TopicReaderPartitionEvents
3031

3132
RetryEvents
@@ -68,7 +69,7 @@ const (
6869
TableSessionQueryEvents |
6970
TableSessionTransactionEvents
7071

71-
TopicReaderAllEvents = TopicReaderStreamEvents |
72+
TopicReaderAllEvents = TopicReaderStreamEvents | TopicReaderMessageEvents |
7273
TopicReaderPartitionEvents |
7374
TopicReaderStreamLifeCycleEvents
7475

@@ -112,6 +113,7 @@ var (
112113
TopicControlPlaneEvents: "ydb.topic.controlplane",
113114
TopicReaderAllEvents: "ydb.topic.reader",
114115
TopicReaderStreamEvents: "ydb.topic.reader.stream",
116+
TopicReaderMessageEvents: "ydb.topic.reader.message",
115117
TopicReaderPartitionEvents: "ydb.topic.reader.partition",
116118
TopicReaderStreamLifeCycleEvents: "ydb.topic.reader.lifecycle",
117119
}

trace/topic.go

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,24 @@ type (
1717
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1818
// later release.
1919
Topic struct {
20-
OnReaderConnect func(TopicReaderConnectStartInfo) func(TopicReaderConnectDoneInfo)
21-
OnReaderReconnect func(TopicReaderReconnectStartInfo) func(TopicReaderReconnectDoneInfo)
22-
OnReaderReconnectRequest func(TopicReaderReconnectRequestInfo)
23-
24-
OnReaderPartitionReadStartResponse func(TopicReaderPartitionReadStartResponseStartInfo) func(TopicReaderPartitionReadStartResponseDoneInfo)
25-
OnReaderPartitionReadStop func(TopicReaderPartitionReadStopInfo)
26-
27-
OnReaderStreamCommit func(TopicReaderStreamCommitStartInfo) func(TopicReaderStreamCommitDoneInfo)
28-
OnReaderStreamSentCommitMessage func(TopicReaderStreamSentCommitMessageStartInfo) func(TopicReaderStreamSentCommitMessageDoneInfo)
29-
OnReaderStreamCommittedNotify func(TopicReaderStreamCommittedInfo)
30-
OnReaderStreamClose func(TopicReaderStreamCloseStartInfo) func(TopicReaderStreamCloseDoneInfo)
31-
OnReaderStreamInit func(TopicReaderStreamInitStartInfo) func(TopicReaderStreamInitDoneInfo)
32-
OnReaderStreamError func(TopicReaderStreamErrorInfo)
33-
OnReaderStreamSentDataRequest func(TopicReaderStreamSentDataRequestInfo)
34-
OnReaderStreamReceiveDataResponse func(TopicReaderStreamReceiveDataResponseStartInfo) func(TopicReaderStreamReceiveDataResponseDoneInfo)
35-
OnReaderStreamReadMessages func(TopicReaderStreamReadMessagesStartInfo) func(TopicReaderStreamReadMessagesDoneInfo)
36-
OnReaderStreamUnknownGrpcMessage func(OnReadStreamUnknownGrpcMessageInfo)
37-
OnReaderStreamUpdateToken func(OnReadStreamUpdateTokenStartInfo) func(OnReadStreamUpdateTokenMiddleTokenReceivedInfo) func(OnReadStreamUpdateTokenDoneInfo)
20+
OnReaderReconnect func(startInfo TopicReaderReconnectStartInfo) func(doneInfo TopicReaderReconnectDoneInfo)
21+
OnReaderReconnectRequest func(info TopicReaderReconnectRequestInfo)
22+
23+
OnReaderPartitionReadStartResponse func(startInfo TopicReaderPartitionReadStartResponseStartInfo) func(doneInfo TopicReaderPartitionReadStartResponseDoneInfo)
24+
OnReaderPartitionReadStopResponse func(startInfo TopicReaderPartitionReadStopResponseStartInfo) func(doneInfo TopicReaderPartitionReadStopResponseDoneInfo)
25+
26+
OnReaderStreamCommit func(startInfo TopicReaderStreamCommitStartInfo) func(doneInfo TopicReaderStreamCommitDoneInfo)
27+
OnReaderStreamSendCommitMessage func(startInfo TopicReaderStreamSendCommitMessageStartInfo) func(doneInfo TopicReaderStreamSendCommitMessageDoneInfo)
28+
OnReaderStreamCommittedNotify func(info TopicReaderStreamCommittedNotifyInfo)
29+
OnReaderStreamClose func(startInfo TopicReaderStreamCloseStartInfo) func(doneInfo TopicReaderStreamCloseDoneInfo)
30+
OnReaderStreamInit func(startInfo TopicReaderStreamInitStartInfo) func(doneInfo TopicReaderStreamInitDoneInfo)
31+
OnReaderStreamError func(info TopicReaderStreamErrorInfo)
32+
OnReaderStreamUpdateToken func(startInfo OnReadStreamUpdateTokenStartInfo) func(updateTokenInfo OnReadStreamUpdateTokenMiddleTokenReceivedInfo) func(doneInfo OnReadStreamUpdateTokenDoneInfo)
33+
34+
OnReaderStreamSentDataRequest func(startInfo TopicReaderStreamSentDataRequestInfo)
35+
OnReaderStreamReceiveDataResponse func(startInfo TopicReaderStreamReceiveDataResponseStartInfo) func(doneInfo TopicReaderStreamReceiveDataResponseDoneInfo)
36+
OnReaderStreamReadMessages func(startInfo TopicReaderStreamReadMessagesStartInfo) func(doneInfo TopicReaderStreamReadMessagesDoneInfo)
37+
OnReaderStreamUnknownGrpcMessage func(info OnReadStreamUnknownGrpcMessageInfo)
3838
}
3939

4040
// TopicReaderPartitionReadStartResponseStartInfo
@@ -61,13 +61,13 @@ type (
6161
Error error
6262
}
6363

64-
// TopicReaderPartitionReadStopInfo
64+
// TopicReaderPartitionReadStopResponseStartInfo
6565
//
6666
// Experimental
6767
//
6868
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
6969
// later release.
70-
TopicReaderPartitionReadStopInfo struct {
70+
TopicReaderPartitionReadStopResponseStartInfo struct {
7171
ReaderConnectionID string
7272
PartitionContext context.Context
7373
Topic string
@@ -77,34 +77,59 @@ type (
7777
Graceful bool
7878
}
7979

80-
// TopicReaderStreamSentCommitMessageStartInfo
80+
// TopicReaderPartitionReadStopResponseDoneInfo
8181
//
8282
// Experimental
8383
//
8484
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
8585
// later release.
86-
TopicReaderStreamSentCommitMessageStartInfo struct{}
86+
TopicReaderPartitionReadStopResponseDoneInfo struct {
87+
Error error
88+
}
89+
90+
// TopicReaderStreamSendCommitMessageStartInfo
91+
//
92+
// Experimental
93+
//
94+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
95+
// later release.
96+
TopicReaderStreamSendCommitMessageStartInfo struct {
97+
// ReaderConnectionID string unimplemented yet - need some internal changes
98+
CommitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo
99+
}
87100

88-
// TopicReaderStreamSentCommitMessageDoneInfo
101+
// TopicReaderStreamSendCommitMessageStartMessageInfo
89102
//
90103
// Experimental
91104
//
92105
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
93106
// later release.
94-
TopicReaderStreamSentCommitMessageDoneInfo struct {
107+
TopicReaderStreamSendCommitMessageStartMessageInfo interface {
108+
PartitionIDs() []int64
109+
PartitionSessionIDs() []int64
110+
}
111+
112+
// TopicReaderStreamSendCommitMessageDoneInfo
113+
//
114+
// Experimental
115+
//
116+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
117+
// later release.
118+
TopicReaderStreamSendCommitMessageDoneInfo struct {
95119
Error error
96120
}
97121

98-
// TopicReaderStreamCommittedInfo
122+
// TopicReaderStreamCommittedNotifyInfo
99123
//
100124
// Experimental
101125
//
102126
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
103127
// later release.
104-
TopicReaderStreamCommittedInfo struct {
128+
TopicReaderStreamCommittedNotifyInfo struct {
105129
ReaderConnectionID string
106130
Topic string
107131
PartitionID int64
132+
PartitionSessionID int64
108133
CommittedOffset int64
109134
}
110135

@@ -181,6 +206,7 @@ type (
181206
MessagesCount int
182207
Topic string
183208
PartitionID int64
209+
PartitionSessionID int64
184210
OffsetStart int64
185211
OffsetEnd int64
186212
FreeBufferCapacity int
@@ -198,24 +224,6 @@ type (
198224
Error error
199225
}
200226

201-
// TopicReaderConnectStartInfo
202-
//
203-
// Experimental
204-
//
205-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
206-
// later release.
207-
TopicReaderConnectStartInfo struct{}
208-
209-
// TopicReaderConnectDoneInfo
210-
//
211-
// Experimental
212-
//
213-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
214-
// later release.
215-
TopicReaderConnectDoneInfo struct {
216-
Error error
217-
}
218-
219227
// TopicReaderReconnectStartInfo
220228
//
221229
// Experimental
@@ -347,8 +355,8 @@ type (
347355
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
348356
// later release.
349357
TopicReaderStreamInitDoneInfo struct {
350-
NewReaderConnectionID string
351-
Error error
358+
ReaderConnectionID string
359+
Error error
352360
}
353361

354362
// OnReadStreamUpdateTokenStartInfo

0 commit comments

Comments
 (0)