Skip to content

Commit 3667419

Browse files
authored
Merge pull request #828 from Alegr37/issue-826
issue-826: write to log offset ranges for OnReaderSendCommitMessage
2 parents bc56fcf + 079f9c1 commit 3667419

File tree

5 files changed

+61
-35
lines changed

5 files changed

+61
-35
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Improved OnReaderSendCommitMessage logs
12
* Added `table.Session.CopyTables` method
23
* Added `x-ydb-trace-id` header into grpc calls
34
* Improved topic reader logs

internal/topic/topicreaderinternal/commit_range.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"sort"
55

66
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
78
)
89

910
// PublicCommitRangeGetter return data piece for commit messages range
@@ -23,20 +24,17 @@ func (r *CommitRanges) len() int {
2324
return len(r.ranges)
2425
}
2526

26-
// PartitionIDs implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
27-
func (r *CommitRanges) PartitionIDs() []int64 {
28-
res := make([]int64, len(r.ranges))
27+
// GetCommitsInfo implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
28+
func (r *CommitRanges) GetCommitsInfo() []trace.TopicReaderStreamCommitInfo {
29+
res := make([]trace.TopicReaderStreamCommitInfo, len(r.ranges))
2930
for i := range res {
30-
res[i] = r.ranges[i].partitionSession.PartitionID
31-
}
32-
return res
33-
}
34-
35-
// PartitionSessionIDs implements trace.TopicReaderStreamSendCommitMessageStartMessageInfo
36-
func (r *CommitRanges) PartitionSessionIDs() []int64 {
37-
res := make([]int64, len(r.ranges))
38-
for i := range res {
39-
res[i] = r.ranges[i].partitionSession.partitionSessionID.ToInt64()
31+
res[i] = trace.TopicReaderStreamCommitInfo{
32+
Topic: r.ranges[i].partitionSession.Topic,
33+
PartitionID: r.ranges[i].partitionSession.PartitionID,
34+
PartitionSessionID: r.ranges[i].partitionSession.partitionSessionID.ToInt64(),
35+
StartOffset: r.ranges[i].commitOffsetStart.ToInt64(),
36+
EndOffset: r.ranges[i].commitOffsetEnd.ToInt64(),
37+
}
4038
}
4139
return res
4240
}

internal/topic/topicreaderinternal/committer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ func (c *committer) pushCommitsLoop(ctx context.Context) {
144144

145145
commits.optimize()
146146

147-
onDone := trace.TopicOnReaderSendCommitMessage(c.tracer, &commits)
147+
onDone := trace.TopicOnReaderSendCommitMessage(
148+
c.tracer,
149+
&commits,
150+
)
148151
err := sendCommitMessage(c.send, commits)
149152
onDone(err)
150153

log/topic.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -162,25 +162,37 @@ func internalTopic(l *wrapper, d trace.Detailer) (t trace.Topic) { //nolint:gocy
162162
}
163163
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "send", "commit", "message")
164164
start := time.Now()
165-
l.Log(ctx, "start",
166-
Any("partitions_id", info.CommitsInfo.PartitionIDs()),
167-
Any("partitions_session_id", info.CommitsInfo.PartitionSessionIDs()),
168-
)
165+
166+
commitInfo := info.CommitsInfo.GetCommitsInfo()
167+
for i := range commitInfo {
168+
l.Log(ctx, "start",
169+
String("topic", commitInfo[i].Topic),
170+
Int64("partitions_id", commitInfo[i].PartitionID),
171+
Int64("partitions_session_id", commitInfo[i].PartitionSessionID),
172+
Int64("commit_start_offset", commitInfo[i].StartOffset),
173+
Int64("commit_end_offset", commitInfo[i].EndOffset),
174+
)
175+
}
169176
return func(doneInfo trace.TopicReaderSendCommitMessageDoneInfo) {
170-
fields := []Field{
171-
Any("partitions_id", info.CommitsInfo.PartitionIDs()),
172-
Any("partitions_session_id", info.CommitsInfo.PartitionSessionIDs()),
173-
latencyField(start),
174-
}
175-
if doneInfo.Error == nil {
176-
l.Log(ctx, "done", fields...)
177-
} else {
178-
l.Log(WithLevel(ctx, WARN), "commit message sent",
179-
append(fields,
180-
Error(doneInfo.Error),
181-
versionField(),
182-
)...,
183-
)
177+
for i := range commitInfo {
178+
fields := []Field{
179+
String("topic", commitInfo[i].Topic),
180+
Int64("partitions_id", commitInfo[i].PartitionID),
181+
Int64("partitions_session_id", commitInfo[i].PartitionSessionID),
182+
Int64("commit_start_offset", commitInfo[i].StartOffset),
183+
Int64("commit_end_offset", commitInfo[i].EndOffset),
184+
latencyField(start),
185+
}
186+
if doneInfo.Error == nil {
187+
l.Log(ctx, "done", fields...)
188+
} else {
189+
l.Log(WithLevel(ctx, WARN), "commit message sent",
190+
append(fields,
191+
Error(doneInfo.Error),
192+
versionField(),
193+
)...,
194+
)
195+
}
184196
}
185197
}
186198
}

trace/topic.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,31 @@ type (
134134
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
135135
// later release.
136136
TopicReaderSendCommitMessageStartInfo struct {
137-
// ReaderConnectionID string unimplemented yet - need some internal changes
138137
CommitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo
139138
}
140139

140+
// TopicReaderStreamCommitInfo
141+
//
142+
// Experimental
143+
//
144+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
145+
// later release.
146+
TopicReaderStreamCommitInfo struct {
147+
Topic string
148+
PartitionID int64
149+
PartitionSessionID int64
150+
StartOffset int64
151+
EndOffset int64
152+
}
153+
141154
// TopicReaderStreamSendCommitMessageStartMessageInfo
142155
//
143156
// Experimental
144157
//
145158
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
146159
// later release.
147160
TopicReaderStreamSendCommitMessageStartMessageInfo interface {
148-
PartitionIDs() []int64
149-
PartitionSessionIDs() []int64
161+
GetCommitsInfo() []TopicReaderStreamCommitInfo
150162
}
151163

152164
// TopicReaderSendCommitMessageDoneInfo

0 commit comments

Comments
 (0)