Skip to content

Commit c25594d

Browse files
authored
Merge pull request #1424 Added write topic writer ack
2 parents d10ca1b + 7cc86b9 commit c25594d

File tree

5 files changed

+135
-0
lines changed

5 files changed

+135
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added log topic writer ack
2+
13
## v3.77.0
24
* Changed log message about send topic message
35
* Added experimental support for executing scripts over query service client (`query.Client.ExecuteScript` and `query.CLient.FetchScriptResults`)

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"reflect"
7+
"strconv"
78
"time"
89

910
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
@@ -221,6 +222,48 @@ func (r *WriteResult) fromProto(response *Ydb_Topic.StreamWriteMessage_WriteResp
221222
return r.WriteStatistics.fromProto(response.GetWriteStatistics())
222223
}
223224

225+
// GetAcks implemtnts trace.TopicWriterResultMessagesInfoAcks interface
226+
func (r *WriteResult) GetAcks() (res traceAck) {
227+
res.AcksCount = len(r.Acks)
228+
if res.AcksCount > 0 {
229+
res.SeqNoMin = r.Acks[0].SeqNo
230+
res.WrittenOffsetMin = r.Acks[0].MessageWriteStatus.WrittenOffset
231+
}
232+
for i := range r.Acks {
233+
ack := &r.Acks[i]
234+
if ack.MessageWriteStatus.Type == WriteStatusTypeWritten {
235+
res.WrittenCount++
236+
}
237+
if ack.MessageWriteStatus.Type == WriteStatusTypeSkipped {
238+
res.SkipCount++
239+
}
240+
241+
if ack.SeqNo < res.SeqNoMin {
242+
res.SeqNoMin = ack.SeqNo
243+
} else if ack.SeqNo > res.SeqNoMax {
244+
res.SeqNoMax = ack.SeqNo
245+
}
246+
247+
if ack.MessageWriteStatus.WrittenOffset < res.SeqNoMin {
248+
res.WrittenOffsetMin = ack.MessageWriteStatus.WrittenOffset
249+
} else if ack.MessageWriteStatus.WrittenOffset > res.WrittenOffsetMax {
250+
res.WrittenOffsetMax = ack.MessageWriteStatus.WrittenOffset
251+
}
252+
}
253+
254+
return res
255+
}
256+
257+
type traceAck = struct {
258+
AcksCount int
259+
SeqNoMin int64
260+
SeqNoMax int64
261+
WrittenOffsetMin int64
262+
WrittenOffsetMax int64
263+
WrittenCount int
264+
SkipCount int
265+
}
266+
224267
type WriteAck struct {
225268
SeqNo int64
226269
MessageWriteStatus MessageWriteStatus
@@ -269,6 +312,19 @@ const (
269312
WriteStatusTypeSkipped
270313
)
271314

315+
func (t WriteStatusType) String() string {
316+
switch t {
317+
case WriteStatusTypeUnknown:
318+
return "Unknown"
319+
case WriteStatusTypeSkipped:
320+
return "Skipped"
321+
case WriteStatusTypeWritten:
322+
return "Written"
323+
default:
324+
return strconv.Itoa(int(t))
325+
}
326+
}
327+
272328
type WriteStatusSkipReason int
273329

274330
const (

log/topic.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,25 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
820820
}
821821
}
822822
}
823+
t.OnWriterReceiveResult = func(info trace.TopicWriterResultMessagesInfo) {
824+
if d.Details()&trace.TopicWriterStreamEvents == 0 {
825+
return
826+
}
827+
acks := info.Acks.GetAcks()
828+
ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "receive", "result")
829+
l.Log(ctx, "topic writer receive result from server",
830+
String("writer_instance_id", info.WriterInstanceID),
831+
String("session_id", info.SessionID),
832+
Int("acks_count", acks.AcksCount),
833+
Int64("seq_no_min", acks.SeqNoMin),
834+
Int64("seq_no_max", acks.SeqNoMax),
835+
Int64("written_offset_min", acks.WrittenOffsetMin),
836+
Int64("written_offset_max", acks.WrittenOffsetMax),
837+
Int("written_offset_count", acks.WrittenCount),
838+
Int("skip_count", acks.SkipCount),
839+
versionField(),
840+
)
841+
}
823842
t.OnWriterReadUnknownGrpcMessage = func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) {
824843
if d.Details()&trace.TopicWriterStreamEvents == 0 {
825844
return

trace/topic.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ type (
121121
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
122122
OnWriterSendMessages func(TopicWriterSendMessagesStartInfo) func(TopicWriterSendMessagesDoneInfo)
123123
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
124+
OnWriterReceiveResult func(TopicWriterResultMessagesInfo)
125+
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
124126
OnWriterReadUnknownGrpcMessage func(TopicOnWriterReadUnknownGrpcMessageInfo)
125127
}
126128

@@ -470,6 +472,27 @@ type (
470472
Error error
471473
}
472474

475+
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
476+
TopicWriterResultMessagesInfo struct {
477+
WriterInstanceID string
478+
SessionID string
479+
PartitionID int64
480+
Acks TopicWriterResultMessagesInfoAcks
481+
}
482+
483+
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
484+
TopicWriterResultMessagesInfoAcks interface {
485+
GetAcks() struct {
486+
AcksCount int
487+
SeqNoMin int64
488+
SeqNoMax int64
489+
WrittenOffsetMin int64
490+
WrittenOffsetMax int64
491+
WrittenCount int
492+
SkipCount int
493+
}
494+
}
495+
473496
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
474497
TopicOnWriterReadUnknownGrpcMessageInfo struct {
475498
WriterInstanceID string

trace/topic_gtrace.go

Lines changed: 35 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)